Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 61 additions & 10 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,68 @@ uv run ruff check . --fix

### Data Loader System
The core architecture follows a plugin-based loader system with:
- Abstract `DataLoader` base class in `src/amp/loaders/base.py`
- Auto-discovery mechanism for loaders via `__init_subclass__`
- Generic `DataLoader[TConfig]` base class in `src/amp/loaders/base.py`
- Auto-discovery via registry in `src/amp/loaders/registry.py`
- Zero-copy operations using PyArrow for performance
- Connection management with named connections and environment variables

When implementing new loaders:
1. Inherit from `DataLoader` base class
2. Implement required methods: `connect()`, `load_table()`, `close()`
3. Define configuration schema using dataclasses
4. Register supported data types in class attributes
5. Follow existing patterns from PostgreSQL and Redis loaders
- Built-in resilience (retry, backpressure), state management, and reorg handling

**For detailed implementation instructions, see `src/amp/loaders/NEW_LOADER_GUIDE.md`**

#### Quick Reference: Implementing New Loaders

**Files to create:**
1. `src/amp/loaders/implementations/xxx_loader.py` - Main implementation
2. `tests/integration/loaders/backends/test_xxx.py` - Integration tests

**Files to modify:**
1. `src/amp/loaders/implementations/__init__.py` - Add import
2. `tests/conftest.py` - Add testcontainer and config fixtures

**Required implementation:**
```python
@dataclass
class XxxConfig:
host: str = 'localhost'
port: int = 1234
# ... connection settings

class XxxLoader(DataLoader[XxxConfig]):
SUPPORTED_MODES = {LoadMode.APPEND}
SUPPORTS_TRANSACTIONS = False

def connect(self) -> None: ...
def disconnect(self) -> None: ...
def _load_batch_impl(self, batch, table_name, **kwargs) -> int: ...
def _create_table_from_schema(self, schema, table_name) -> None: ...
def table_exists(self, table_name) -> bool: ...
```

**Test implementation:**
```python
class XxxTestConfig(LoaderTestConfig):
loader_class = XxxLoader
config_fixture_name = 'xxx_test_config'

def get_row_count(self, loader, table_name) -> int: ...
def query_rows(self, loader, table_name, where, order_by) -> List[Dict]: ...
def cleanup_table(self, loader, table_name) -> None: ...
def get_column_names(self, loader, table_name) -> List[str]: ...

class TestXxxCore(BaseLoaderTests):
config = XxxTestConfig() # Inherits 6 generalized tests

class TestXxxStreaming(BaseStreamingTests):
config = XxxTestConfig() # Inherits 5 streaming tests
```

#### Existing Loaders (for reference)
- **ClickHouse**: OLAP, columnar, no transactions - `clickhouse_loader.py`
- **PostgreSQL**: OLTP, connection pooling, transactions - `postgresql_loader.py`
- **Redis**: Key-value, multiple data structures - `redis_loader.py`
- **Snowflake**: Cloud warehouse - `snowflake_loader.py`
- **DeltaLake**: File-based, ACID transactions - `deltalake_loader.py`
- **Iceberg**: Catalog-based, partitioned tables - `iceberg_loader.py`
- **LMDB**: Embedded, memory-mapped - `lmdb_loader.py`

### Testing Strategy
- **Unit tests**: Test pure logic and data structures WITHOUT mocking. Unit tests should be simple, fast, and test isolated components (dataclasses, utility functions, partitioning logic, etc.). Do NOT add tests that require mocking to `tests/unit/`.
Expand Down
Loading