Add performance optimization metrics for loader and streaming analysis #40

@fordN

The current metrics implementation provides basic observability but lacks the granularity needed for performance optimization and bottleneck analysis across different loaders and workloads.

Current State

We have these metrics instrumented in src/amp/metrics.py (listed as name (type) - labels):

  • records_processed (Counter) - loader, table, connection
  • bytes_processed (Counter) - loader, table
  • processing_latency (Histogram) - loader, operation
  • batch_sizes (Histogram) - loader, table
  • errors (Counter) - loader, error_type, table
  • retry_attempts (Counter) - loader, operation, reason
  • active_connections (Gauge) - loader, target
  • reorg_events (Counter) - loader, network, table

Gaps

1. Flight SQL Client Metrics

Current: Zero instrumentation on the data-fetching side
Proposed:

  • flight_fetch_latency_seconds (Histogram) - query latency from Flight SQL server
  • flight_bytes_received_total (Counter) - bytes received from server
  • flight_batches_received_total (Counter) - Arrow batches received
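A minimal sketch of where these three metrics could be recorded, assuming the Flight SQL client exposes query results as an iterable of batches. `FlightFetchStats`, `instrument_batches` and the `size_of` callback are hypothetical names, and plain Python fields stand in for Prometheus objects so the example runs with the standard library alone. It times the wait for each batch rather than the whole query, which keeps downstream loader time out of the fetch measurement; timing the full query is an equally reasonable reading of the proposal.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


@dataclass
class FlightFetchStats:
    """Hypothetical in-memory stand-ins for the proposed flight_* metrics."""

    fetch_latency_seconds: List[float] = field(default_factory=list)  # histogram observations
    bytes_received_total: int = 0  # counter
    batches_received_total: int = 0  # counter


def instrument_batches(
    batches: Iterable[T],
    stats: FlightFetchStats,
    size_of: Callable[[T], int],
) -> Iterator[T]:
    """Yield batches unchanged, recording wait time, bytes and count for each one."""
    it = iter(batches)
    while True:
        start = time.perf_counter()
        try:
            batch = next(it)  # blocks until the server delivers the next batch
        except StopIteration:
            return
        # Only the time spent inside next() is observed, not the caller's processing time.
        stats.fetch_latency_seconds.append(time.perf_counter() - start)
        stats.bytes_received_total += size_of(batch)
        stats.batches_received_total += 1
        yield batch
```

With pyarrow record batches, `size_of=lambda b: b.nbytes` would be a natural choice, since `RecordBatch.nbytes` reports the bytes held by the batch's buffers.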

2. Parallel Streaming Metrics

Current: Zero instrumentation in src/amp/streaming/
Proposed:

  • streaming_worker_utilization (Gauge) - active workers / total workers
  • streaming_queue_depth (Gauge) - batches waiting per worker
  • streaming_batch_wait_seconds (Histogram) - time batches spend in queue
  • streaming_worker_processing_seconds (Histogram) - per-worker processing time
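One way the streaming metrics could fit together, sketched with `queue.Queue` and threads. This assumes batches are enqueued as `(enqueued_at, batch)` tuples stamped with `time.monotonic()` and that a `None` sentinel stops a worker; `StreamingStats` and `worker` are hypothetical names, not code from src/amp/streaming/. Queue depth is not stored here: `q.qsize()` (documented as approximate) is the obvious source for streaming_queue_depth.

```python
import queue
import threading
import time
from typing import Any, Callable, List


class StreamingStats:
    """Hypothetical in-memory stand-ins for the proposed streaming_* metrics."""

    def __init__(self, total_workers: int) -> None:
        if total_workers <= 0:
            raise ValueError("total_workers must be positive")
        self.total_workers = total_workers
        self._active = 0
        self._lock = threading.Lock()
        self.batch_wait_seconds: List[float] = []  # streaming_batch_wait_seconds
        self.worker_processing_seconds: List[float] = []  # streaming_worker_processing_seconds

    def utilization(self) -> float:
        """streaming_worker_utilization: active workers / total workers."""
        with self._lock:
            return self._active / self.total_workers

    def start_batch(self, enqueued_at: float) -> float:
        """Record queue wait for a dequeued batch and mark one worker busy."""
        now = time.monotonic()
        with self._lock:
            self.batch_wait_seconds.append(now - enqueued_at)
            self._active += 1
        return now

    def finish_batch(self, started_at: float) -> None:
        """Record processing time and mark the worker idle again."""
        with self._lock:
            self.worker_processing_seconds.append(time.monotonic() - started_at)
            self._active -= 1


def worker(q: queue.Queue, stats: StreamingStats, process: Callable[[Any], None]) -> None:
    """Consume (enqueued_at, batch) tuples until a None sentinel arrives."""
    while True:
        item = q.get()
        try:
            if item is None:
                return
            enqueued_at, batch = item  # enqueued_at must come from time.monotonic()
            started_at = stats.start_batch(enqueued_at)
            try:
                process(batch)
            finally:
                stats.finish_batch(started_at)
        finally:
            q.task_done()
```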

3. Phase-Level Latency Breakdown

Current: Only the load_batch operation is tracked
Proposed: Separate histograms or labels for:

  • fetch - time to receive data from Flight SQL
  • transform - time for any data transformation
  • write - time to write to target system
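A sketch of the "labels" option: one histogram with a `phase` label, which is the shape the queries under Use Cases assume (`amp_phase_latency_seconds{phase=...}`). A module-level dict stands in for the histogram, and `phase_timer` and `process_one_batch` are hypothetical names. In prometheus_client the equivalent would be a `Histogram` declared with a `phase` label name and timed through `.labels(...).time()`.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Any, Callable, DefaultDict, Iterator, List

PHASES = ("fetch", "transform", "write")

# Hypothetical stand-in for amp_phase_latency_seconds{phase=...}
phase_latency_seconds: DefaultDict[str, List[float]] = defaultdict(list)


@contextmanager
def phase_timer(phase: str) -> Iterator[None]:
    """Time the enclosed block and record it under a fixed phase label."""
    if phase not in PHASES:
        raise ValueError(f"unknown phase: {phase!r}")
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed writes still show their latency.
        phase_latency_seconds[phase].append(time.perf_counter() - start)


def process_one_batch(
    fetch: Callable[[], Any],
    transform: Callable[[Any], Any],
    write: Callable[[Any], None],
) -> None:
    """Hypothetical pipeline step split into the three proposed phases."""
    with phase_timer("fetch"):
        batch = fetch()
    with phase_timer("transform"):
        batch = transform(batch)
    with phase_timer("write"):
        write(batch)
```

Restricting `phase` to a fixed set keeps label cardinality bounded: the histogram's series grow by a constant factor of three instead of with every new operation name.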

Use Cases This Enables

# Compare loader throughput (records/s per loader)
sum by (loader) (rate(amp_records_processed_total[5m]))

# Identify bottleneck: fetch vs write
histogram_quantile(0.95, rate(amp_phase_latency_seconds_bucket{phase="fetch"}[5m]))
histogram_quantile(0.95, rate(amp_phase_latency_seconds_bucket{phase="write"}[5m]))

# Parallel efficiency
amp_streaming_worker_utilization

# Queue pressure: mean time a batch waits before a worker picks it up
rate(amp_streaming_batch_wait_seconds_sum[5m]) / rate(amp_streaming_batch_wait_seconds_count[5m])
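The queue-pressure query divides two rates taken over the same 5m window, so the window length cancels and the result is the wait time accumulated in the window divided by the number of batches observed in it, i.e. the mean wait per batch. A worked sketch of that arithmetic, using a simplified `rate()` that (as an assumption for clarity) ignores counter resets and Prometheus's boundary extrapolation:

```python
from typing import Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, counter value)


def simple_rate(older: Sample, newer: Sample) -> float:
    """Per-second increase between two samples (no reset handling or extrapolation)."""
    (t0, v0), (t1, v1) = older, newer
    return (v1 - v0) / (t1 - t0)


def mean_batch_wait(
    wait_sum: Tuple[Sample, Sample],
    wait_count: Tuple[Sample, Sample],
) -> float:
    """rate(..._sum) / rate(..._count): the window cancels, leaving delta_sum / delta_count."""
    return simple_rate(*wait_sum) / simple_rate(*wait_count)
```

For example, if over 300 s the `_sum` counter rises from 10.0 to 40.0 and `_count` from 100 to 400, the result is (30 / 300) / (300 / 300) = 0.1 s of mean wait per batch. A rising value means batches sit longer before a worker is free.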

Implementation Notes

  • All new metrics should use the existing _get_or_create_metric() helper for test isolation
  • Consider adding a workload or dataset label for A/B comparisons
  • Memory metrics could be added via psutil if needed for correlation analysis
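The actual signature of `_get_or_create_metric()` is not shown in this issue, so the following is only a hypothetical illustration of the pattern: a repeated call returns the already-registered metric (prometheus_client's registry otherwise raises `ValueError` on duplicate names, a common source of failures when tests recreate metrics), and a call with a different type or label set fails loudly. It also shows a `workload` label for A/B comparisons. `FakeMetric`, `_REGISTRY`, and `get_or_create_metric` are invented names.

```python
from dataclasses import dataclass, field
from typing import Dict, Sequence, Tuple


@dataclass
class FakeMetric:
    """Hypothetical stand-in for a Prometheus metric object."""

    name: str
    kind: str
    labelnames: Tuple[str, ...]
    values: Dict[Tuple[str, ...], float] = field(default_factory=dict)

    def inc(self, amount: float = 1.0, **labels: str) -> None:
        if set(labels) != set(self.labelnames):
            raise ValueError(f"{self.name} expects labels {self.labelnames}, got {sorted(labels)}")
        key = tuple(labels[n] for n in self.labelnames)
        self.values[key] = self.values.get(key, 0.0) + amount


_REGISTRY: Dict[str, FakeMetric] = {}


def get_or_create_metric(name: str, kind: str, labelnames: Sequence[str]) -> FakeMetric:
    """Return the existing metric for `name`, or register a new one."""
    existing = _REGISTRY.get(name)
    if existing is not None:
        # Same name with a different shape is a programming error, not a reuse.
        if existing.kind != kind or existing.labelnames != tuple(labelnames):
            raise ValueError(f"metric {name!r} already registered with a different shape")
        return existing
    metric = FakeMetric(name, kind, tuple(labelnames))
    _REGISTRY[name] = metric
    return metric
```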
