feat: Migrate task states and task messages from MongoDB to PostgreSQL #128
Draft
Implement phased migration strategy for task states:
- Phase 0: Add feature flag TASK_STATE_STORAGE_PHASE
- Add TaskStateORM model and Alembic migration
- Create TaskStatePostgresRepository for PostgreSQL storage
- Create TaskStateDualRepository for phased rollout

Migration phases supported:
- mongodb: Legacy behavior (MongoDB only)
- dual_write: Write to both, read from MongoDB
- dual_read: Write to both, read from both with verification
- postgres: PostgreSQL only (target state)

Includes:
- Datadog StatsD metrics for dual_read verification
- Backfill script for existing MongoDB data
- Verification script for data consistency checks
- Unit tests for all repository operations and metrics
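The four phases can be illustrated with a minimal sketch. This is not the PR's TaskStateDualRepository: the class names, the in-memory stores, and the Counter standing in for a StatsD client are all hypothetical. It also assumes that MongoDB stays the returned source of truth during dual_read and that no metric is emitted when a record is absent from both stores; the PR text does not confirm either detail.

```python
from collections import Counter
from typing import Dict, Optional

PHASES = ("mongodb", "dual_write", "dual_read", "postgres")


class InMemoryStore:
    """Hypothetical stand-in for a MongoDB or PostgreSQL repository."""

    def __init__(self) -> None:
        self.rows: Dict[str, dict] = {}

    def get(self, task_id: str) -> Optional[dict]:
        return self.rows.get(task_id)

    def upsert(self, task_id: str, state: dict) -> None:
        self.rows[task_id] = state


class DualRepositorySketch:
    """Routes reads and writes according to the configured storage phase."""

    def __init__(self, phase: str, mongo: InMemoryStore,
                 postgres: InMemoryStore, metrics: Counter) -> None:
        if phase not in PHASES:
            raise ValueError(f"unknown storage phase: {phase!r}")
        self.phase = phase
        self.mongo = mongo
        self.postgres = postgres
        self.metrics = metrics  # a Counter standing in for a StatsD client

    def upsert(self, task_id: str, state: dict) -> None:
        # Every phase except "postgres" still writes to MongoDB.
        if self.phase != "postgres":
            self.mongo.upsert(task_id, state)
        # Every phase except "mongodb" writes to PostgreSQL.
        if self.phase != "mongodb":
            self.postgres.upsert(task_id, state)

    def get(self, task_id: str) -> Optional[dict]:
        if self.phase == "postgres":
            return self.postgres.get(task_id)
        mongo_row = self.mongo.get(task_id)
        if self.phase == "dual_read":
            self._verify(mongo_row, self.postgres.get(task_id))
        # Assumption: MongoDB remains authoritative until the final phase.
        return mongo_row

    def _verify(self, mongo_row: Optional[dict], pg_row: Optional[dict]) -> None:
        if mongo_row is None and pg_row is None:
            return  # Assumption: nothing to compare, so no metric is emitted.
        if pg_row is None:
            name = "mismatch.missing_postgres"
        elif mongo_row is None:
            name = "mismatch.missing_mongodb"
        elif mongo_row != pg_row:
            name = "mismatch.state_content"
        else:
            name = "match"
        self.metrics[f"task_state.dual_read.{name}"] += 1
```

Because the phase is a plain string checked on every call, moving forward or rolling back only requires changing the flag value; the backends themselves are never swapped out.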
- Add benchmark scripts for comparing MongoDB vs PostgreSQL performance:
  - benchmark_task_state.py: Repository-level benchmarks
  - benchmark_api.py: API-level benchmarks with connection pooling
  - compare_results.py: Generate markdown comparison reports
  - locustfile.py: Cluster load tests with Locust
- Add storage_backend query parameter to dynamically switch backends:
  - Enables benchmarking without server restarts
  - Valid values: mongodb, dual_write, dual_read, postgres
- Fix FastAPI dependency injection in TaskStateDualRepository:
  - Remove 'from __future__ import annotations' which broke DI resolution
  - Use List from typing to avoid 'list' method name shadowing
- Update authorization_shortcuts to use DTaskStateDualRepository

Benchmark results show MongoDB ~20-30% faster than PostgreSQL at API level.
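One way the per-request storage_backend override could interact with the environment flag is sketched below. The function name, its parameters, and the fallback default of mongodb are assumptions made for illustration; the PR does not show how the override is resolved.

```python
import os
from typing import Mapping, Optional

VALID_PHASES = frozenset({"mongodb", "dual_write", "dual_read", "postgres"})


def resolve_storage_phase(
    storage_backend: Optional[str],
    env: Mapping[str, str] = os.environ,
    env_var: str = "TASK_STATE_STORAGE_PHASE",
    default: str = "mongodb",  # assumption: legacy behavior when unset
) -> str:
    """Return the phase for one request.

    A query-parameter value, when present, takes precedence over the
    environment variable, so a benchmark can switch backends without a
    server restart.
    """
    phase = storage_backend if storage_backend is not None else env.get(env_var, default)
    if phase not in VALID_PHASES:
        raise ValueError(
            f"invalid storage phase {phase!r}; expected one of {sorted(VALID_PHASES)}"
        )
    return phase
```

Rejecting unknown values up front keeps a mistyped query parameter from silently falling back to a different backend during a benchmark run.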
Port task messages to PostgreSQL using the same phased dual-write/dual-read strategy used for task states. Controlled by TASK_MESSAGE_STORAGE_PHASE env var with phases: mongodb → dual_write → dual_read → postgres.

New files:
- TaskMessageORM model with JSONB content column and indexes
- Alembic migration for task_messages table
- TaskMessagePostgresRepository with JSONB filter translation and cursor pagination
- TaskMessageDualRepository with 4-phase switching and Datadog metrics
- Backfill and verification scripts for data migration
- Unit tests (63 tests: 47 dual repo + 16 postgres repo)

Modified:
- Wire TaskMessageService to use DualRepository
- Pass raw TaskMessageEntityFilter objects through to storage layer
- Add TASK_MESSAGE_STORAGE_PHASE to environment variables
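As a rough illustration of what JSONB filter translation can look like, the sketch below turns a flat, MongoDB-style equality filter with dotted paths into PostgreSQL JSONB containment (`@>`) predicates. The function names, the example field names, and the psycopg-style `%s` placeholders are assumptions; the actual TaskMessagePostgresRepository may translate filters quite differently.

```python
import json
from typing import Any, Dict, List, Tuple


def nest(path: str, value: Any) -> Dict[str, Any]:
    """Expand a dotted path into a nested dict: ('a.b', 1) -> {'a': {'b': 1}}."""
    result: Any = value
    for key in reversed(path.split(".")):
        result = {key: result}
    return result


def to_jsonb_where(filters: Dict[str, Any], column: str = "content") -> Tuple[str, List[str]]:
    """Build a parameterized WHERE fragment from equality filters.

    Each filter becomes `<column> @> <json>::jsonb`. `column` is interpolated
    into the SQL text, so it must be a trusted identifier and never user
    input; the filter values travel as bound parameters.
    """
    clauses: List[str] = []
    params: List[str] = []
    for path, value in sorted(filters.items()):
        clauses.append(f"{column} @> %s::jsonb")
        params.append(json.dumps(nest(path, value), sort_keys=True))
    return " AND ".join(clauses) or "TRUE", params


# Hypothetical field names, for illustration only.
where, params = to_jsonb_where({"author": "agent", "data.type": "text"})
```

Containment is not a drop-in replacement for every MongoDB operator (array matching, for example, behaves differently), so a translation along these lines would only cover equality-style filters.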
JSONB supports indexing and faster querying compared to JSON, consistent with all other JSON columns in the schema.
Summary
Implements a phased migration strategy for task states and task messages from MongoDB to PostgreSQL with zero-downtime rollout capability. Both use the same dual-write/dual-read pattern controlled by independent feature flags.
Key changes:
- TASK_STATE_STORAGE_PHASE and TASK_MESSAGE_STORAGE_PHASE feature flags
- task_states and task_messages PostgreSQL tables with JSONB columns
- storage_backend query parameter for testing

Migration Phases
Each entity migrates independently via its own env var:
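- mongodb: MongoDB only (legacy behavior)
- dual_write: write to both, read from MongoDB
- dual_read: write to both, read from both with verification
- postgres: PostgreSQL only (target state)

Files Changed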
Task State Migration
- src/adapters/orm.py: TaskStateORM model (JSONB state column)
- src/domain/repositories/task_state_postgres_repository.py: PostgreSQL repository
- src/domain/repositories/task_state_dual_repository.py: Dual-write wrapper
- src/domain/use_cases/states_use_case.py: Updated to use dual repository
- database/migrations/.../postgres_task_state_07fc12196914.py: Alembic migration
- scripts/backfill_task_states.py: Backfill MongoDB → PostgreSQL
- scripts/verify_task_states.py: Verify data consistency
- tests/unit/repositories/test_task_state_dual_repository.py: 35 tests
- tests/unit/repositories/test_task_state_postgres_repository.py: 2 tests

Task Message Migration
- src/adapters/orm.py: TaskMessageORM model (JSONB content column, 3 indexes)
- src/domain/repositories/task_message_postgres_repository.py: PostgreSQL repository with JSONB filter translation and cursor pagination
- src/domain/repositories/task_message_dual_repository.py: Dual-write wrapper with MongoDB filter conversion
- src/domain/services/task_message_service.py: Updated to use dual repository
- src/domain/use_cases/messages_use_case.py: Pass raw filters to storage layer
- database/migrations/.../postgres_messages_b4d5f54e4ba2.py: Alembic migration
- scripts/backfill_task_messages.py: Backfill MongoDB → PostgreSQL
- scripts/verify_task_messages.py: Verify data consistency
- tests/unit/repositories/test_task_message_dual_repository.py: 47 tests
- tests/unit/repositories/test_task_message_postgres_repository.py: 16 tests

Shared / Config
- src/config/environment_variables.py: Both storage phase env vars
- src/adapters/orm.py: Both ORM models

Metrics (dual_read phase)
Task States
- task_state.dual_read.match
- task_state.dual_read.mismatch.missing_postgres
- task_state.dual_read.mismatch.missing_mongodb
- task_state.dual_read.mismatch.state_content
- task_state.dual_read.list_count_mismatch

Task Messages
- task_message.dual_read.match
- task_message.dual_read.mismatch.missing_postgres
- task_message.dual_read.mismatch.missing_mongodb
- task_message.dual_read.mismatch.content
- task_message.dual_read.list_count_mismatch

Rollout Plan
For each entity (task states, then task messages):
1. *_STORAGE_PHASE=mongodb (no behavior change)
2. python scripts/backfill_task_{states,messages}.py
3. *_STORAGE_PHASE=dual_write
4. *_STORAGE_PHASE=dual_read, monitor metrics
5. *_STORAGE_PHASE=postgres

Rollback
Set either *_STORAGE_PHASE env var back to the previous phase at any time.

Test plan