@@ -0,0 +1,53 @@
"""postgres task state

Revision ID: 07fc12196914
Revises: d024851e790c
Create Date: 2026-01-23 19:57:54.790475

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB


# revision identifiers, used by Alembic.
revision: str = '07fc12196914'
down_revision: Union[str, None] = 'd024851e790c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.create_table(
"task_states",
sa.Column("id", sa.String(), nullable=False),
sa.Column("task_id", sa.String(), nullable=False),
sa.Column("agent_id", sa.String(), nullable=False),
sa.Column("state", JSONB, nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("task_id", "agent_id", name="uq_task_states_task_agent"),
)
op.create_index("idx_task_states_task_id", "task_states", ["task_id"], unique=False)
op.create_index(
"idx_task_states_agent_id", "task_states", ["agent_id"], unique=False
)


def downgrade() -> None:
op.drop_index("idx_task_states_agent_id", table_name="task_states")
op.drop_index("idx_task_states_task_id", table_name="task_states")
op.drop_table("task_states")
@@ -0,0 +1,44 @@
"""postgres messages

Revision ID: b4d5f54e4ba2
Revises: 07fc12196914
Create Date: 2026-02-10 17:53:14.889515

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = 'b4d5f54e4ba2'
down_revision: Union[str, None] = '07fc12196914'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('task_messages',
sa.Column('id', sa.String(), nullable=False),
sa.Column('task_id', sa.String(), nullable=False),
sa.Column('content', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('streaming_status', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index('idx_task_messages_task_id', 'task_messages', ['task_id'], unique=False)
op.create_index('idx_task_messages_task_id_created_at', 'task_messages', ['task_id', 'created_at'], unique=False)
op.create_index('idx_task_messages_task_id_streaming_status', 'task_messages', ['task_id', 'streaming_status'], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('idx_task_messages_task_id_streaming_status', table_name='task_messages')
op.drop_index('idx_task_messages_task_id_created_at', table_name='task_messages')
op.drop_index('idx_task_messages_task_id', table_name='task_messages')
op.drop_table('task_messages')
# ### end Alembic commands ###
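The composite idx_task_messages_task_id_created_at index backs the common read path: all messages for a task, in insertion order. A minimal sketch of such a query (table clause, connection handling, and names are illustrative, not part of this PR):

```python
# Sketch only: the ordered per-task read that the composite
# (task_id, created_at) index is built for. Names are illustrative.
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

task_messages = sa.table(
    "task_messages",
    sa.column("id"),
    sa.column("task_id"),
    sa.column("content", JSONB),
    sa.column("created_at"),
)

def messages_for_task(conn, task_id: str, limit: int = 100):
    stmt = (
        sa.select(task_messages)
        .where(task_messages.c.task_id == task_id)
        .order_by(task_messages.c.created_at.asc())
        .limit(limit)
    )
    return conn.execute(stmt).mappings().all()
```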
4 changes: 3 additions & 1 deletion agentex/database/migrations/migration_history.txt
@@ -1,4 +1,6 @@
-24429f13b8bd -> d024851e790c (head), add_performance_indexes
+07fc12196914 -> b4d5f54e4ba2 (head), postgres messages
+d024851e790c -> 07fc12196914, postgres task state
+24429f13b8bd -> d024851e790c, add_performance_indexes
a5d67f2d7356 -> 24429f13b8bd, add agent input type
329fbafa4ff9 -> a5d67f2d7356, add unhealthy status
d7addd4229e8 -> 329fbafa4ff9, change_default_acp_to_async
171 changes: 171 additions & 0 deletions agentex/scripts/backfill_task_messages.py
@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
Backfill task messages from MongoDB to PostgreSQL.

This script should be run BEFORE enabling dual-write phase to ensure
existing task messages are present in PostgreSQL.

Usage:
python scripts/backfill_task_messages.py [--dry-run] [--batch-size=1000]

Options:
--dry-run Don't actually write to PostgreSQL, just log what would be done
--batch-size Number of records to process per batch (default: 1000)
"""

import argparse
import asyncio
import sys
from pathlib import Path

# Add the project root to sys.path so the `src.`-prefixed imports below resolve
sys.path.insert(0, str(Path(__file__).parent.parent))

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from src.config.dependencies import GlobalDependencies
from src.domain.repositories.task_message_postgres_repository import (
TaskMessagePostgresRepository,
)
from src.domain.repositories.task_message_repository import TaskMessageRepository
from src.utils.logging import make_logger

logger = make_logger(__name__)


async def backfill_messages(
dry_run: bool = False, batch_size: int = 1000
) -> tuple[int, int, int]:
"""
Backfill all task messages from MongoDB to PostgreSQL.

Args:
dry_run: If True, don't actually write to PostgreSQL
batch_size: Number of records to process per batch

Returns:
Tuple of (migrated_count, skipped_count, error_count)
"""
# Initialize dependencies
deps = GlobalDependencies()
await deps.load()

if deps.mongodb_database is None:
logger.error("MongoDB is not configured. Cannot backfill.")
return 0, 0, 0

if deps.database_async_read_write_engine is None:
logger.error("PostgreSQL is not configured. Cannot backfill.")
return 0, 0, 0

# Create repositories
mongo_repo = TaskMessageRepository(deps.mongodb_database)

rw_session_maker = async_sessionmaker(
autoflush=False,
bind=deps.database_async_read_write_engine,
expire_on_commit=False,
class_=AsyncSession,
)
ro_session_maker = async_sessionmaker(
autoflush=False,
bind=deps.database_async_read_only_engine
or deps.database_async_read_write_engine,
expire_on_commit=False,
class_=AsyncSession,
)
postgres_repo = TaskMessagePostgresRepository(rw_session_maker, ro_session_maker)

# Pagination through MongoDB
page = 1
total_migrated = 0
total_skipped = 0
total_errors = 0

logger.info(f"Starting backfill (dry_run={dry_run}, batch_size={batch_size})")

while True:
try:
messages = await mongo_repo.list(limit=batch_size, page_number=page)
except Exception as e:
logger.error(f"Error fetching messages from MongoDB: {e}")
break

if not messages:
break

for message in messages:
try:
# Check if already exists in PostgreSQL by ID
try:
existing = await postgres_repo.get(id=message.id)
except Exception:
existing = None

if existing:
total_skipped += 1
continue

if dry_run:
logger.info(
f"[DRY RUN] Would migrate message {message.id} "
f"(task={message.task_id})"
)
total_migrated += 1
continue

# Create in PostgreSQL with the same ID
await postgres_repo.create(message)
total_migrated += 1

except Exception as e:
logger.error(
f"Error migrating message {message.id}: {e}",
extra={"task_id": message.task_id},
)
total_errors += 1

logger.info(
f"Processed page {page}: "
f"migrated={total_migrated}, skipped={total_skipped}, errors={total_errors}"
)
page += 1

logger.info(
f"Backfill complete: "
f"migrated={total_migrated}, skipped={total_skipped}, errors={total_errors}"
)

return total_migrated, total_skipped, total_errors


def main():
parser = argparse.ArgumentParser(
description="Backfill task messages from MongoDB to PostgreSQL"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Don't actually write to PostgreSQL, just log what would be done",
)
parser.add_argument(
"--batch-size",
type=int,
default=1000,
help="Number of records to process per batch (default: 1000)",
)
args = parser.parse_args()

migrated, skipped, errors = asyncio.run(
backfill_messages(dry_run=args.dry_run, batch_size=args.batch_size)
)

# Exit with error code if there were any errors
if errors > 0:
sys.exit(1)


if __name__ == "__main__":
    main()
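Per the script's own usage notes, run with --dry-run first to gauge volume before the real backfill; both flags come from the argparse block above (the batch size shown here is arbitrary):

```
python scripts/backfill_task_messages.py --dry-run
python scripts/backfill_task_messages.py --batch-size=500
```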