diff --git a/.gitignore b/.gitignore index ba8d0fb..acb7c64 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,18 @@ data/ # Build artifacts *.tar.gz *.zip + +# Mycelium coordination files (not part of project) +# Remove these lines if you want to commit mycelium to the project +mycelium.py +scripts/forager/ +signals/ +substrate/ +audit/ +logs/ +map.yaml +pyproject.toml +CLAUDE.md +AGENT-QUICKSTART.md +docs/FORAGER-*.md +*.vesicle diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 0000000..4c20d29 --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,232 @@ +# Metrics Instrumentation + +This document describes the metrics instrumentation available in amp for observability and monitoring. + +## Overview + +Amp provides Prometheus-compatible metrics for monitoring data loading operations. The metrics module offers: + +- **Low overhead** instrumentation with optional prometheus_client dependency +- **Graceful degradation** when prometheus_client is not installed +- **Consistent naming** following Prometheus conventions +- **Thread-safe** singleton implementation + +## Installation + +The metrics module works without prometheus_client (using no-op metrics), but to enable actual metric collection: + +```bash +# Install with metrics support +pip install amp[metrics] + +# Or install prometheus_client directly +pip install prometheus-client +``` + +## Quick Start + +```python +from amp.metrics import get_metrics, start_metrics_server + +# Get the global metrics instance +metrics = get_metrics() + +# Start HTTP server on port 8000 for Prometheus scraping +start_metrics_server(port=8000) + +# Record metrics in your code +metrics.records_processed.labels( + loader='postgresql', + table='users', + connection='default' +).inc(1000) + +metrics.processing_latency.labels( + loader='postgresql', + operation='load_batch' +).observe(0.5) +``` + +## Available Metrics + +### Counters + +| Metric | Labels | Description | +|--------|--------|-------------| +| 
`amp_records_processed_total` | loader, table, connection | Total records processed | +| `amp_errors_total` | loader, error_type, table | Total errors by type | +| `amp_bytes_processed_total` | loader, table | Total bytes processed | +| `amp_reorg_events_total` | loader, network, table | Blockchain reorg events | +| `amp_retry_attempts_total` | loader, operation, reason | Retry attempts | + +### Histograms + +| Metric | Labels | Description | +|--------|--------|-------------| +| `amp_processing_latency_seconds` | loader, operation | Processing time distribution | +| `amp_batch_size_records` | loader, table | Batch size distribution | + +### Gauges + +| Metric | Labels | Description | +|--------|--------|-------------| +| `amp_active_connections` | loader, target | Current active connections | +| `amp_queue_depth` | queue_name | Current queue depth | + +### Info + +| Metric | Labels | Description | +|--------|--------|-------------| +| `amp_build_info` | (various) | Build/version information | + +## Context Manager for Operations + +The `track_operation` context manager simplifies instrumentation: + +```python +from amp.metrics import get_metrics + +metrics = get_metrics() + +with metrics.track_operation('postgresql', 'load_batch', table='users') as ctx: + # Your loading code here + rows_loaded = load_data(batch) + + # Set context for automatic metric recording + ctx['records'] = rows_loaded + ctx['bytes'] = batch.nbytes + +# Metrics are automatically recorded: +# - processing_latency is observed +# - records_processed is incremented +# - bytes_processed is incremented +# - errors are recorded if an exception occurs +``` + +## Configuration + +Customize metrics collection with `MetricsConfig`: + +```python +from amp.metrics import get_metrics, MetricsConfig + +config = MetricsConfig( + enabled=True, # Enable/disable all metrics + namespace='amp', # Metric name prefix + subsystem='loader', # Optional subsystem name + default_labels={'env': 'prod'}, # Default labels 
for all metrics + histogram_buckets=( # Custom latency buckets + 0.001, 0.005, 0.01, 0.025, 0.05, + 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 + ), +) + +metrics = get_metrics(config) +``` + +## Prometheus Integration + +### HTTP Endpoint + +Start a metrics server for Prometheus scraping: + +```python +from amp.metrics import start_metrics_server + +# Start on default port 8000 +start_metrics_server() + +# Or specify custom port and address +start_metrics_server(port=9090, addr='0.0.0.0') +``` + +### Generate Metrics Text + +Generate metrics in Prometheus text format for custom export: + +```python +from amp.metrics import generate_metrics_text + +# Get metrics as bytes +metrics_text = generate_metrics_text() + +# Use in your HTTP handler +@app.route('/metrics') +def metrics_endpoint(): + return Response(generate_metrics_text(), mimetype='text/plain') +``` + +### Example Prometheus Config + +```yaml +scrape_configs: + - job_name: 'amp' + static_configs: + - targets: ['localhost:8000'] + scrape_interval: 15s +``` + +## Grafana Dashboard + +Example queries for a Grafana dashboard: + +```promql +# Records processed rate (per second) +rate(amp_records_processed_total[5m]) + +# P99 latency +histogram_quantile(0.99, rate(amp_processing_latency_seconds_bucket[5m])) + +# Error rate percentage (aggregate both sides first: the two metrics carry different label sets, so an unaggregated division matches no series) +sum(rate(amp_errors_total[5m])) / sum(rate(amp_records_processed_total[5m])) * 100 + +# Active connections by loader +amp_active_connections + +# Average batch size +rate(amp_batch_size_records_sum[5m]) / rate(amp_batch_size_records_count[5m]) +``` + +## Graceful Degradation + +When prometheus_client is not installed, the metrics module uses no-op implementations that silently accept all operations: + +```python +from amp.metrics import get_metrics, is_prometheus_available + +if is_prometheus_available(): + print("Prometheus metrics enabled") +else: + print("Metrics disabled - install prometheus-client to enable") + +# Code works the same either way +metrics = get_metrics() 
+metrics.records_processed.labels(loader='test', table='t', connection='c').inc(100) +``` + +## Testing + +For testing, you can reset the metrics singleton: + +```python +from amp.metrics import AmpMetrics, get_metrics + +def test_my_loader(): + # Reset before test + AmpMetrics.reset_instance() + + # Run test with fresh metrics + metrics = get_metrics() + # ... + + # Clean up after test + AmpMetrics.reset_instance() +``` + +## Best Practices + +1. **Use consistent labels** - Keep label values consistent across your codebase +2. **Avoid high cardinality** - Don't use user IDs or request IDs as labels +3. **Use track_operation** - Prefer the context manager for automatic error handling +4. **Set up alerts** - Configure Prometheus alerts for error rates and latency +5. **Dashboard first** - Design your metrics around what you want to see in dashboards diff --git a/pyproject.toml b/pyproject.toml index 258bc24..918354d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,10 @@ lmdb = [ "lmdb>=1.4.0", ] +metrics = [ + "prometheus-client>=0.20.0", +] + all_loaders = [ "psycopg2-binary>=2.9.0", # PostgreSQL "redis>=4.5.0", # Redis @@ -75,6 +79,7 @@ all_loaders = [ "snowflake-connector-python>=4.0.0", # Snowflake "snowpipe-streaming>=1.0.0", # Snowpipe Streaming API "lmdb>=1.4.0", # LMDB + "prometheus-client>=0.20.0", # Metrics ] test = [ diff --git a/src/amp/loaders/base.py b/src/amp/loaders/base.py index 967c1fd..77b886e 100644 --- a/src/amp/loaders/base.py +++ b/src/amp/loaders/base.py @@ -11,6 +11,7 @@ import pyarrow as pa +from ..metrics import get_metrics from ..streaming.resilience import ( AdaptiveRateLimiter, BackPressureConfig, @@ -148,6 +149,18 @@ def close(self) -> None: """Alias for disconnect() for backward compatibility""" self.disconnect() + def _record_connection_opened(self) -> None: + """Record that a connection was opened. 
Call this after establishing connection.""" + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + metrics.active_connections.labels(loader=loader_type, target='default').inc() + + def _record_connection_closed(self) -> None: + """Record that a connection was closed. Call this after closing connection.""" + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + metrics.active_connections.labels(loader=loader_type, target='default').dec() + @abstractmethod def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> int: """ @@ -227,6 +240,16 @@ def load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadRe f'Transient error loading batch (attempt {backoff.attempt}/{self.retry_config.max_retries}): ' f'{last_error}. Retrying in {delay:.1f}s...' ) + # Record retry metric + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + # Determine retry reason + reason = 'transient' + if '429' in last_error or 'rate limit' in last_error.lower(): + reason = 'rate_limit' + elif 'timeout' in last_error.lower() or 'timed out' in last_error.lower(): + reason = 'timeout' + metrics.retry_attempts.labels(loader=loader_type, operation='load_batch', reason=reason).inc() time.sleep(delay) def _try_load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> LoadResult: @@ -320,24 +343,39 @@ def _try_load_batch(self, batch: pa.RecordBatch, table_name: str, **kwargs) -> L duration = time.time() - start_time + # Record metrics for successful batch load + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + metrics.records_processed.labels(loader=loader_type, table=table_name, connection=connection_name).inc( + rows_loaded + ) + metrics.batch_sizes.labels(loader=loader_type, table=table_name).observe(rows_loaded) + metrics.processing_latency.labels(loader=loader_type, 
operation='load_batch').observe(duration) + if hasattr(batch, 'nbytes'): + metrics.bytes_processed.labels(loader=loader_type, table=table_name).inc(batch.nbytes) + return LoadResult( rows_loaded=rows_loaded, duration=duration, ops_per_second=round(rows_loaded / duration, 2) if duration > 0 else 0, table_name=table_name, - loader_type=self.__class__.__name__.replace('Loader', '').lower(), + loader_type=loader_type, success=True, metadata=self._get_batch_metadata(batch, duration, **kwargs), ) except Exception as e: self.logger.error(f'Failed to load batch: {str(e)}') + # Record error metrics + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + metrics.errors.labels(loader=loader_type, error_type=type(e).__name__, table=table_name).inc() return LoadResult( rows_loaded=0, duration=time.time() - start_time, ops_per_second=0, table_name=table_name, - loader_type=self.__class__.__name__.replace('Loader', '').lower(), + loader_type=loader_type, success=False, error=str(e), ) @@ -575,8 +613,14 @@ def _process_reorg_event( # Invalidate affected batches from state store if response.invalidation_ranges: + # Record reorg metrics + loader_type = self.__class__.__name__.replace('Loader', '').lower() + metrics = get_metrics() + # Log reorg details for range_obj in response.invalidation_ranges: + # Record reorg event per network + metrics.reorg_events.labels(loader=loader_type, network=range_obj.network, table=table_name).inc() self.logger.warning( f'Reorg detected on {range_obj.network}: blocks {range_obj.start}-{range_obj.end} invalidated' ) diff --git a/src/amp/metrics.py b/src/amp/metrics.py new file mode 100644 index 0000000..fe7c2c5 --- /dev/null +++ b/src/amp/metrics.py @@ -0,0 +1,482 @@ +""" +Metrics instrumentation for amp data processing. + +This module provides observability metrics using Prometheus client. +Metrics can be exported via HTTP endpoint or push gateway. 
"""
Metrics instrumentation for amp data processing.

This module provides observability metrics using the Prometheus client.
Metrics can be exposed over HTTP (``start_metrics_server``) or rendered in the
Prometheus text format (``generate_metrics_text``).

Usage:
    from amp.metrics import get_metrics, MetricsConfig

    # Get the global metrics instance
    metrics = get_metrics()

    # Record processing metrics (every declared label must be supplied)
    metrics.records_processed.labels(loader='postgresql', table='users', connection='default').inc(1000)
    metrics.processing_latency.labels(loader='postgresql', operation='load_batch').observe(0.5)

    # Start HTTP server to expose metrics
    from amp.metrics import start_metrics_server
    start_metrics_server(port=8000)
"""

import functools
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, Optional, Tuple

# Optional prometheus_client import
try:
    from prometheus_client import (
        REGISTRY,
        CollectorRegistry,
        Counter,
        Gauge,
        Histogram,
        Info,
        generate_latest,
        start_http_server,
    )

    PROMETHEUS_AVAILABLE = True
except ImportError:
    PROMETHEUS_AVAILABLE = False
    # Provide stub types for type hints
    Counter = Any  # type: ignore
    Gauge = Any  # type: ignore
    Histogram = Any  # type: ignore
    Info = Any  # type: ignore
    CollectorRegistry = Any  # type: ignore
    REGISTRY = None  # type: ignore


@dataclass
class MetricsConfig:
    """Configuration for metrics collection.

    Attributes:
        enabled: Whether metrics collection is enabled. When False, AmpMetrics
            installs no-op metrics.
        namespace: Prefix for all metric names (default: 'amp').
        subsystem: Optional subsystem name inserted after the namespace.
        default_labels: Labels intended for all metrics. NOTE: AmpMetrics does
            not read this field when it builds its metrics, so it currently
            has no effect.
        histogram_buckets: Bucket upper bounds, in seconds, for the
            processing-latency histogram.
    """

    enabled: bool = True
    namespace: str = 'amp'
    subsystem: str = ''
    default_labels: Dict[str, str] = field(default_factory=dict)
    histogram_buckets: Tuple[float, ...] = (
        0.001,  # 1ms
        0.005,  # 5ms
        0.01,  # 10ms
        0.025,  # 25ms
        0.05,  # 50ms
        0.1,  # 100ms
        0.25,  # 250ms
        0.5,  # 500ms
        1.0,  # 1s
        2.5,  # 2.5s
        5.0,  # 5s
        10.0,  # 10s
    )


class NullMetric:
    """No-op metric that silently ignores all operations.

    Used when prometheus_client is not available or metrics are disabled.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        pass

    def labels(self, *args: Any, **kwargs: Any) -> 'NullMetric':
        # Returning self keeps chained calls such as .labels(...).inc() working.
        return self

    def inc(self, amount: float = 1) -> None:
        pass

    def dec(self, amount: float = 1) -> None:
        pass

    def set(self, value: float) -> None:
        pass

    def observe(self, value: float) -> None:
        pass

    def info(self, val: Dict[str, str]) -> None:
        pass

    def time(self) -> 'NullTimer':
        return NullTimer()

    @contextmanager
    def track_inprogress(self) -> Iterator[None]:
        yield


class NullTimer:
    """No-op timer context manager."""

    def __enter__(self) -> 'NullTimer':
        return self

    def __exit__(self, *args: Any) -> None:
        pass


class AmpMetrics:
    """Central metrics registry for amp data processing.

    Provides Prometheus metrics for monitoring data loading operations:
    - Records processed (counter)
    - Processing latency (histogram)
    - Error rates (counter by type)
    - Batch sizes (histogram)
    - Active connections (gauge)
    - Queue depths (gauge)

    Process-wide singleton. Instance creation, first-time initialization,
    ``reset`` and ``reset_instance`` all run under one class-level lock.
    """

    _instance: Optional['AmpMetrics'] = None
    _lock = threading.Lock()

    # Attribute names of every metric exposed on the instance. Shared by the
    # no-op setup and by registry clean-up so the two cannot drift apart.
    _METRIC_ATTRS: Tuple[str, ...] = (
        'records_processed',
        'processing_latency',
        'errors',
        'batch_sizes',
        'active_connections',
        'queue_depth',
        'bytes_processed',
        'reorg_events',
        'retry_attempts',
        'build_info',
    )

    def __new__(cls, config: Optional[MetricsConfig] = None) -> 'AmpMetrics':
        """Return the singleton, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self, config: Optional[MetricsConfig] = None) -> None:
        """Initialize metrics on the first construction only.

        Args:
            config: Optional metrics configuration. Ignored once the singleton
                has been initialized.
        """
        if self._initialized:
            return

        # Initialize under the lock so two threads racing on the first call
        # cannot both register metrics or overwrite each other's config.
        with self._lock:
            if self._initialized:
                return
            self._config = config or MetricsConfig()
            self._setup_metrics()
            self._initialized = True

    def _setup_metrics(self) -> None:
        """Create every metric, or no-op stand-ins when collection is off."""
        if not PROMETHEUS_AVAILABLE or not self._config.enabled:
            self._setup_null_metrics()
            return

        ns = self._config.namespace
        ss = self._config.subsystem

        # Records processed counter
        self.records_processed: Counter = self._get_or_create_metric(
            Counter,
            name='records_processed_total',
            documentation='Total number of records processed',
            labelnames=['loader', 'table', 'connection'],
            namespace=ns,
            subsystem=ss,
        )

        # Processing latency histogram
        self.processing_latency: Histogram = self._get_or_create_metric(
            Histogram,
            name='processing_latency_seconds',
            documentation='Time spent processing data',
            labelnames=['loader', 'operation'],
            namespace=ns,
            subsystem=ss,
            buckets=self._config.histogram_buckets,
        )

        # Error counter by type
        self.errors: Counter = self._get_or_create_metric(
            Counter,
            name='errors_total',
            documentation='Total number of errors',
            labelnames=['loader', 'error_type', 'table'],
            namespace=ns,
            subsystem=ss,
        )

        # Batch sizes histogram
        self.batch_sizes: Histogram = self._get_or_create_metric(
            Histogram,
            name='batch_size_records',
            documentation='Number of records per batch',
            labelnames=['loader', 'table'],
            namespace=ns,
            subsystem=ss,
            buckets=(10, 50, 100, 500, 1000, 5000, 10000, 50000, 100000),
        )

        # Active connections gauge
        self.active_connections: Gauge = self._get_or_create_metric(
            Gauge,
            name='active_connections',
            documentation='Number of active connections',
            labelnames=['loader', 'target'],
            namespace=ns,
            subsystem=ss,
        )

        # Queue depth gauge
        self.queue_depth: Gauge = self._get_or_create_metric(
            Gauge,
            name='queue_depth',
            documentation='Number of items in processing queue',
            labelnames=['queue_name'],
            namespace=ns,
            subsystem=ss,
        )

        # Bytes processed counter
        self.bytes_processed: Counter = self._get_or_create_metric(
            Counter,
            name='bytes_processed_total',
            documentation='Total bytes processed',
            labelnames=['loader', 'table'],
            namespace=ns,
            subsystem=ss,
        )

        # Reorg events counter
        self.reorg_events: Counter = self._get_or_create_metric(
            Counter,
            name='reorg_events_total',
            documentation='Total blockchain reorganization events',
            labelnames=['loader', 'network', 'table'],
            namespace=ns,
            subsystem=ss,
        )

        # Retry attempts counter
        self.retry_attempts: Counter = self._get_or_create_metric(
            Counter,
            name='retry_attempts_total',
            documentation='Total retry attempts',
            labelnames=['loader', 'operation', 'reason'],
            namespace=ns,
            subsystem=ss,
        )

        # Build info (static metadata)
        self.build_info: Info = self._get_or_create_metric(
            Info,
            name='build_info',
            documentation='Build information',
            namespace=ns,
            subsystem=ss,
        )

    def _get_or_create_metric(self, metric_class: type, name: str, **kwargs: Any) -> Any:
        """Create a metric, or return the one already registered under its name.

        Handles the case where the metric is already registered (e.g. during
        test runs) by returning the existing collector instead of failing
        with a duplicate-registration error.

        Args:
            metric_class: Metric constructor (Counter, Gauge, Histogram, Info).
            name: Metric name without namespace/subsystem prefix.
            **kwargs: Passed through to the constructor; 'namespace' and
                'subsystem' are also used to rebuild the registered name.

        Returns:
            The new metric, or the already-registered collector.

        Raises:
            ValueError: If construction fails for any reason other than a
                duplicate, or the duplicate cannot be found in the registry.
        """
        try:
            return metric_class(name=name, **kwargs)
        except ValueError as exc:
            if 'Duplicated timeseries' not in str(exc):
                raise

            # Rebuild the registered base name. Only a trailing '_total' is
            # dropped; a '_total' elsewhere in the name is part of the name.
            suffix = '_total'
            base_name = name[: -len(suffix)] if name.endswith(suffix) else name
            full_name = '_'.join(filter(None, [kwargs.get('namespace', ''), kwargs.get('subsystem', ''), base_name]))

            # NOTE: reads a private attribute of the prometheus_client registry.
            existing = REGISTRY._names_to_collectors.get(full_name)
            if existing is None:
                # Could not locate the duplicate; surface the original error.
                raise
            return existing

    def _setup_null_metrics(self) -> None:
        """Set up no-op metrics when prometheus is unavailable or disabled."""
        for attr in self._METRIC_ATTRS:
            setattr(self, attr, NullMetric())

    def _unregister_metrics(self) -> None:
        """Remove this instance's collectors from the global registry.

        Best effort: a collector that is no longer registered is skipped.
        Callers are expected to hold ``_lock``.
        """
        if not PROMETHEUS_AVAILABLE:
            return

        for attr in self._METRIC_ATTRS:
            metric = getattr(self, attr, None)
            if metric is None or isinstance(metric, NullMetric):
                continue
            try:
                REGISTRY.unregister(metric)
            except KeyError:
                # Already unregistered (e.g. shared with an earlier instance).
                pass

    @contextmanager
    def track_operation(
        self, loader: str, operation: str, table: str = '', connection: str = 'default'
    ) -> Iterator[Dict[str, Any]]:
        """Context manager to track an operation's duration and success.

        Usage:
            with metrics.track_operation('postgresql', 'load_batch', table='users') as ctx:
                # do work
                ctx['records'] = 1000

        Args:
            loader: Loader type name
            operation: Operation name (e.g., 'load_batch', 'connect')
            table: Target table name (optional)
            connection: Connection name (optional)

        Yields:
            Context dict with keys 'records', 'bytes' and 'error'. Set
            'records' and 'bytes' inside the block for automatic tracking;
            'error' is filled with the exception class name on failure.

        Raises:
            Exception: Any exception raised inside the block is re-raised
                after the error counter has been incremented.
        """
        ctx: Dict[str, Any] = {'records': 0, 'bytes': 0, 'error': None}
        start_time = time.perf_counter()

        try:
            yield ctx
        except Exception as e:
            ctx['error'] = type(e).__name__
            self.errors.labels(loader=loader, error_type=type(e).__name__, table=table).inc()
            raise
        finally:
            # Latency and any counts set so far are recorded on both the
            # success and the failure path.
            duration = time.perf_counter() - start_time
            self.processing_latency.labels(loader=loader, operation=operation).observe(duration)

            if ctx['records'] > 0:
                self.records_processed.labels(loader=loader, table=table, connection=connection).inc(ctx['records'])
                self.batch_sizes.labels(loader=loader, table=table).observe(ctx['records'])

            if ctx['bytes'] > 0:
                self.bytes_processed.labels(loader=loader, table=table).inc(ctx['bytes'])

    def reset(self) -> None:
        """Reset all metrics in place, keeping the current configuration.

        Existing collectors are unregistered first so fresh ones are created.
        In production you typically don't reset metrics, as they should
        accumulate over the process lifetime.
        """
        with self._lock:
            self._initialized = False
            self._unregister_metrics()
            self._setup_metrics()
            # Restore the flag; otherwise the next AmpMetrics()/get_metrics()
            # call would re-run __init__ and replace the configuration.
            self._initialized = True

    @classmethod
    def reset_instance(cls) -> None:
        """Drop the singleton instance (useful for testing).

        Also unregisters its metrics from the Prometheus registry so the
        next instance can register the same names.
        """
        with cls._lock:
            if cls._instance is not None:
                cls._instance._unregister_metrics()
            cls._instance = None


def get_metrics(config: Optional[MetricsConfig] = None) -> AmpMetrics:
    """Get the global metrics instance.

    Args:
        config: Optional configuration (only used on first call)

    Returns:
        The singleton AmpMetrics instance
    """
    return AmpMetrics(config)


def start_metrics_server(port: int = 8000, addr: str = '') -> None:
    """Start HTTP server to expose Prometheus metrics.

    Args:
        port: Port to listen on (default: 8000)
        addr: Address to bind to (default: all interfaces)

    Raises:
        RuntimeError: If prometheus_client is not available
    """
    if not PROMETHEUS_AVAILABLE:
        raise RuntimeError('prometheus_client is not installed. Install with: pip install prometheus-client')

    start_http_server(port, addr)


def generate_metrics_text() -> bytes:
    """Generate Prometheus metrics in text format.

    Returns:
        Metrics in Prometheus text exposition format

    Raises:
        RuntimeError: If prometheus_client is not available
    """
    if not PROMETHEUS_AVAILABLE:
        raise RuntimeError('prometheus_client is not installed. Install with: pip install prometheus-client')

    return generate_latest(REGISTRY)


def is_prometheus_available() -> bool:
    """Check if prometheus_client is available.

    Returns:
        True if prometheus_client is installed and importable
    """
    return PROMETHEUS_AVAILABLE


# Convenience function for instrumenting a callable
def timed(
    metric_name: str = 'processing_latency',
    loader: str = 'unknown',
    operation: str = 'unknown',
) -> Callable:
    """Decorator to time function execution.

    Usage:
        @timed(loader='postgresql', operation='query')
        def execute_query():
            ...

    Args:
        metric_name: Accepted for backward compatibility but currently
            ignored. Timing goes through ``track_operation``, which always
            observes the ``processing_latency`` histogram.
        loader: Loader label value
        operation: Operation label value

    Returns:
        Decorator that wraps a function in ``track_operation``
    """

    def decorator(func: Callable) -> Callable:
        # wraps() keeps the decorated function's __name__, __doc__, etc.
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            metrics = get_metrics()
            with metrics.track_operation(loader, operation):
                return func(*args, **kwargs)

        return wrapper

    return decorator
"""
Unit tests for amp metrics instrumentation.

Tests the metrics module functionality including:
- Metrics initialization and singleton pattern
- Counter, Gauge, and Histogram operations
- Context manager for tracking operations
- Graceful degradation when prometheus_client unavailable
"""

import pytest

from src.amp.metrics import (
    AmpMetrics,
    MetricsConfig,
    NullMetric,
    get_metrics,
    is_prometheus_available,
)

# Shared marker for tests that need real Prometheus collectors.
requires_prometheus = pytest.mark.skipif(not is_prometheus_available(), reason='prometheus_client not installed')


class _ResetsSingleton:
    """Mixin that drops the AmpMetrics singleton around every test.

    Not collected by pytest itself because the name does not start with 'Test'.
    """

    def setup_method(self):
        """Reset singleton before each test."""
        AmpMetrics.reset_instance()

    def teardown_method(self):
        """Reset singleton after each test."""
        AmpMetrics.reset_instance()


@pytest.mark.unit
class TestMetricsConfig:
    """Test MetricsConfig dataclass."""

    def test_default_values(self):
        """Test default configuration values."""
        config = MetricsConfig()

        assert config.enabled is True
        assert config.namespace == 'amp'
        assert config.subsystem == ''
        assert config.default_labels == {}
        assert len(config.histogram_buckets) > 0

    def test_custom_values(self):
        """Test custom configuration values."""
        config = MetricsConfig(
            enabled=False,
            namespace='myapp',
            subsystem='loader',
            default_labels={'env': 'prod'},
        )

        assert config.enabled is False
        assert config.namespace == 'myapp'
        assert config.subsystem == 'loader'
        assert config.default_labels == {'env': 'prod'}


@pytest.mark.unit
class TestNullMetric:
    """Test NullMetric no-op implementation."""

    def test_null_metric_all_operations(self):
        """Test that NullMetric accepts all operations silently."""
        metric = NullMetric()

        # All operations should succeed without error
        metric.inc()
        metric.inc(10)
        metric.dec()
        metric.dec(5)
        metric.set(100)
        metric.observe(0.5)
        metric.info({'version': '1.0'})

    def test_null_metric_labels(self):
        """Test that labels() returns another NullMetric."""
        metric = NullMetric()

        labeled = metric.labels(foo='bar', baz='qux')
        assert isinstance(labeled, NullMetric)

        # Chained operations should work
        metric.labels(a='1').labels(b='2').inc()

    def test_null_metric_timer(self):
        """Test that time() returns a no-op timer."""
        metric = NullMetric()
        timer = metric.time()

        with timer:
            pass  # Should not raise

    def test_null_metric_track_inprogress(self):
        """Test that track_inprogress context manager works."""
        metric = NullMetric()

        with metric.track_inprogress():
            pass  # Should not raise


@pytest.mark.unit
class TestAmpMetricsSingleton(_ResetsSingleton):
    """Test AmpMetrics singleton pattern."""

    def test_singleton_returns_same_instance(self):
        """Test that multiple calls return the same instance."""
        metrics1 = AmpMetrics()
        metrics2 = AmpMetrics()

        assert metrics1 is metrics2

    def test_get_metrics_returns_singleton(self):
        """Test that get_metrics returns the singleton."""
        metrics1 = get_metrics()
        metrics2 = get_metrics()

        assert metrics1 is metrics2

    def test_config_only_used_on_first_call(self):
        """Test that config is only applied on first initialization."""
        config1 = MetricsConfig(namespace='first')
        config2 = MetricsConfig(namespace='second')

        metrics1 = AmpMetrics(config1)
        metrics2 = AmpMetrics(config2)

        # Both should use the first config
        assert metrics1._config.namespace == 'first'
        assert metrics2._config.namespace == 'first'


@pytest.mark.unit
class TestAmpMetricsWithPrometheus(_ResetsSingleton):
    """Test AmpMetrics when prometheus_client is available."""

    @requires_prometheus
    def test_metrics_initialized(self):
        """Test that metrics are properly initialized."""
        metrics = get_metrics()

        # All expected metrics should be present
        assert hasattr(metrics, 'records_processed')
        assert hasattr(metrics, 'processing_latency')
        assert hasattr(metrics, 'errors')
        assert hasattr(metrics, 'batch_sizes')
        assert hasattr(metrics, 'active_connections')
        assert hasattr(metrics, 'queue_depth')
        assert hasattr(metrics, 'bytes_processed')
        assert hasattr(metrics, 'reorg_events')
        assert hasattr(metrics, 'retry_attempts')
        assert hasattr(metrics, 'build_info')

    @requires_prometheus
    def test_counter_operations(self):
        """Test counter metric operations."""
        metrics = get_metrics()

        # Should not raise
        metrics.records_processed.labels(loader='test', table='users', connection='default').inc()
        metrics.records_processed.labels(loader='test', table='users', connection='default').inc(100)

    @requires_prometheus
    def test_gauge_operations(self):
        """Test gauge metric operations."""
        metrics = get_metrics()

        # Should not raise
        metrics.active_connections.labels(loader='test', target='localhost').inc()
        metrics.active_connections.labels(loader='test', target='localhost').dec()
        metrics.active_connections.labels(loader='test', target='localhost').set(5)

    @requires_prometheus
    def test_histogram_operations(self):
        """Test histogram metric operations."""
        metrics = get_metrics()

        # Should not raise
        metrics.processing_latency.labels(loader='test', operation='load').observe(0.5)
        metrics.batch_sizes.labels(loader='test', table='users').observe(1000)


@pytest.mark.unit
class TestAmpMetricsDisabled(_ResetsSingleton):
    """Test AmpMetrics when disabled."""

    def test_disabled_metrics_use_null_metric(self):
        """Test that disabled metrics use NullMetric."""
        config = MetricsConfig(enabled=False)
        metrics = get_metrics(config)

        # All metrics should be NullMetric instances
        assert isinstance(metrics.records_processed, NullMetric)
        assert isinstance(metrics.processing_latency, NullMetric)
        assert isinstance(metrics.errors, NullMetric)

    def test_disabled_metrics_operations_succeed(self):
        """Test that operations on disabled metrics succeed silently."""
        config = MetricsConfig(enabled=False)
        metrics = get_metrics(config)

        # All operations should work without error
        metrics.records_processed.labels(loader='test', table='t', connection='c').inc(100)
        metrics.processing_latency.labels(loader='test', operation='op').observe(0.5)
        metrics.active_connections.labels(loader='test', target='t').set(10)


@pytest.mark.unit
class TestTrackOperation(_ResetsSingleton):
    """Test the track_operation context manager."""

    def test_track_operation_basic(self):
        """Test basic track_operation usage."""
        config = MetricsConfig(enabled=False)  # Use NullMetric for isolated testing
        metrics = get_metrics(config)

        with metrics.track_operation('test_loader', 'test_op', table='test_table') as ctx:
            ctx['records'] = 100
            ctx['bytes'] = 5000

        # Should complete without error
        assert ctx['records'] == 100
        assert ctx['bytes'] == 5000

    def test_track_operation_error_handling(self):
        """Test track_operation records the error and re-raises it."""
        config = MetricsConfig(enabled=False)
        metrics = get_metrics(config)

        captured = {}
        with pytest.raises(ValueError):
            with metrics.track_operation('test_loader', 'test_op') as ctx:
                captured = ctx  # keep a handle; the raise skips any later binding
                raise ValueError('test error')

        # The context must carry the exception class name after the failure.
        assert captured['error'] == 'ValueError'

    def test_track_operation_default_values(self):
        """Test track_operation with default context values."""
        config = MetricsConfig(enabled=False)
        metrics = get_metrics(config)

        with metrics.track_operation('test_loader', 'test_op') as ctx:
            pass  # Don't set any values

        assert ctx['records'] == 0
        assert ctx['bytes'] == 0
        assert ctx['error'] is None


@pytest.mark.unit
class TestIsPrometheusAvailable:
    """Test is_prometheus_available function."""

    def test_returns_boolean(self):
        """Test that function returns a boolean."""
        result = is_prometheus_available()
        assert isinstance(result, bool)