
[SPIKE] Audits using EF #5278

Draft
johnsimons wants to merge 56 commits into master from john/audit_ef

Conversation

@johnsimons
Member

No description provided.

Refactors the upsert logic in several data stores to leverage EF Core's change tracking more efficiently.

Instead of creating a new entity and then calling Update, the code now fetches the existing entity (if any) and modifies its properties directly.
This reduces the overhead and potential issues associated with detached entities.

The RecoverabilityIngestionUnitOfWork is also updated to use change tracking for FailedMessageEntity updates.

This commit was made on the `john/more_interfaces` branch.
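The fetch-then-modify pattern described above can be sketched as follows. This is a minimal illustration only; the entity, context, and property names are assumptions, not the PR's actual types.

```csharp
// Sketch of a tracked-entity upsert: load the row if it exists and mutate it,
// so EF Core's change tracker emits a targeted UPDATE instead of treating a
// freshly constructed, detached entity via Update().
async Task UpsertKnownEndpointAsync(AuditDbContext db, Guid id, string name, DateTime lastSeen)
{
    var existing = await db.KnownEndpoints.FindAsync(id);
    if (existing is null)
    {
        // No row yet: add a new tracked entity (INSERT on SaveChanges).
        db.KnownEndpoints.Add(new KnownEndpointEntity { Id = id, Name = name, LastSeen = lastSeen });
    }
    else
    {
        // Row exists: mutate the tracked instance; EF generates an UPDATE
        // covering only the columns that actually changed.
        existing.Name = name;
        existing.LastSeen = lastSeen;
    }

    await db.SaveChangesAsync();
}
```

`FindAsync` checks the change tracker's identity map before querying the database, which is what makes this cheaper than constructing and attaching detached entities.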
Adds data store and entities required for persisting licensing and throughput data.

This includes adding new tables for licensing metadata, throughput endpoints, and daily throughput data, as well as configurations and a data store implementation to interact with these tables.
Also adds headers to the serialised entity.
Updates data stores to utilize IServiceScopeFactory instead of IServiceProvider for creating database scopes.

This change improves dependency injection and resource management,
ensuring proper scope lifecycle management, especially for asynchronous operations.
Adds full-text search capabilities for error messages, allowing users to search within message headers and, optionally, the message body.

Introduces an interface for full-text search providers to abstract the database-specific implementation.

Stores small message bodies inline for faster retrieval and populates a searchable text field from headers and the message body.

Adds configuration option to set the maximum body size to store inline.
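One possible shape for the provider abstraction mentioned above, shown as a hedged sketch; the interface name and member signature are assumptions, not the PR's actual API.

```csharp
// A database-agnostic full-text search hook: each persistence implementation
// (e.g. SQL Server FTS, PostgreSQL tsvector) supplies its own predicate over
// the searchable text column populated from headers and body.
public interface IFullTextSearchProvider
{
    IQueryable<T> Apply<T>(IQueryable<T> source, string query) where T : class;
}
```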
Adds tracking of known endpoints to the SQL persistence, including the endpoint name, host ID, host, and last seen time.

Introduces a retention cleaner to remove outdated audit data and associated message body files, ensuring the database remains manageable and performs efficiently.

Uses an upsert to improve performance when updating the last seen time of known endpoints.
Replaces the upsert operation with an insert-only approach and a background reconciler to synchronize data with the main table.

This change enhances performance and simplifies the data flow for known endpoints. It introduces a new entity and configuration for insert-only operations and a background service that reconciles the insert-only table with the main table, merging and updating data. The reconciler uses advisory locks to ensure data consistency.
Ensures that the reconciliation process in the insert-only table reconciler is transactional by wrapping the batch reconciliation within a database transaction.

This prevents partial updates and maintains data consistency.

Refactors the KnownEndpointsReconciler to use a table variable to store deleted records before aggregation, improving performance and readability of the SQL MERGE statement.
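The drain-and-merge step described above might look roughly like this. The T-SQL is illustrative only: the table names, column names, and batch size are assumptions based on the descriptions in these commits, and the real reconciler additionally takes an advisory lock before running.

```csharp
// Sketch of one reconciliation batch: delete a batch from the insert-only
// table, capture the deleted rows in a table variable via OUTPUT, then MERGE
// the aggregated result into the main table - all inside one transaction.
const string ReconcileSql = """
    DECLARE @deleted TABLE (Name nvarchar(300), HostId uniqueidentifier, Host nvarchar(300), LastSeen datetime2);

    DELETE TOP (1000) FROM KnownEndpointsInsertOnly
    OUTPUT deleted.Name, deleted.HostId, deleted.Host, deleted.LastSeen INTO @deleted;

    MERGE KnownEndpoints AS target
    USING (SELECT Name, HostId, Host, MAX(LastSeen) AS LastSeen
           FROM @deleted GROUP BY Name, HostId, Host) AS source
    ON target.HostId = source.HostId AND target.Name = source.Name
    WHEN MATCHED THEN UPDATE SET target.LastSeen = source.LastSeen
    WHEN NOT MATCHED THEN INSERT (Name, HostId, Host, LastSeen)
         VALUES (source.Name, source.HostId, source.Host, source.LastSeen);
    """;

await using var tx = await db.Database.BeginTransactionAsync();
await db.Database.ExecuteSqlRawAsync(ReconcileSql);
await tx.CommitAsync();
```

Capturing the deleted rows with `OUTPUT ... INTO` lets the delete and the merge read the same consistent batch, which is why the whole statement sequence sits inside one transaction.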
@johnsimons force-pushed the john/audit_ef branch 2 times, most recently from cfcf1c9 to 9106c1f on February 2, 2026 at 06:06
Refactors audit ingestion to use batching and concurrency for improved performance.
Introduces configurable batch size, parallel writers, and batch timeout settings.
This change optimizes the ingestion process by assembling messages into batches and processing them concurrently, leading to higher throughput and reduced latency.
Combines headers and body into a single `SearchableContent`
column for full-text indexing. This simplifies FTS queries and
improves search performance.

Also refactors `AuditIngestionUnitOfWork` to utilize the new
`SearchableContent` property and streamline body storage logic.

Removes obsolete migration files related to previous FTS
implementations.
Updates Entity Framework Core and related packages to the latest versions.

This ensures compatibility with the latest features and bug fixes in the EF Core ecosystem. It also addresses potential security vulnerabilities and improves overall performance.
Removes the MySQL persistence implementation due to its incomplete state and lack of audit support.
This simplifies the codebase and focuses resources on fully supported persistence options.

The related test projects and SQL persistence files have been removed.
Package versions are updated to align with current versions.
Introduces a setting to control whether message bodies are stored on disk.

This is useful for scenarios where disk space is a concern or message bodies
are not required for auditing purposes. It enhances configuration flexibility.
@johnsimons
Member Author

johnsimons commented Feb 3, 2026

Audit Ingestion Parallel Processing Improvements

Executive Summary

Refactored the AuditIngestion class to support parallel database writes, significantly increasing throughput by decoupling transport message dequeuing from database persistence.


Architecture Comparison

Before: Sequential Processing

Transport (MaxConcurrency threads, e.g., 32)
    ↓
Channel<MessageContext> (capacity = MaxConcurrency = 32)
    ↓
Single Consumer (ExecuteAsync loop)
    ↓
Sequential DB Write (one batch at a time)
    ↓
Complete TaskCompletionSource → Ack message

Bottleneck: Single reader processes one batch at a time. All transport threads wait for DB write to complete before their messages are acknowledged.

After: Parallel Processing

Transport (MaxConcurrency threads, e.g., 100)
    ↓
Channel<MessageContext> (capacity = BatchSize × MaxParallelWriters × 2 = 400)
    ↓
Batch Assembler Task (single reader, assembles batches of 50)
    ↓
Channel<List<MessageContext>> (capacity = MaxParallelWriters × 2 = 8)
    ↓
4 Parallel Writer Tasks → Concurrent DB Writes
    ↓
Complete TaskCompletionSource → Ack message

Improvement: Multiple batches write to DB concurrently while transport continues dequeuing into larger buffer.

New Configuration Settings

| Setting | Default | Range | Description |
|---|---|---|---|
| AuditIngestionBatchSize | 50 | 1-500 | Messages per batch sent to DB |
| AuditIngestionMaxParallelWriters | 4 | 1-16 | Concurrent DB writer tasks |
| AuditIngestionBatchTimeout | 100ms | 10ms-5s | Max wait time for a partial batch to fill |
Environment variables:
SERVICECONTROL_AUDIT_AuditIngestionBatchSize=50
SERVICECONTROL_AUDIT_AuditIngestionMaxParallelWriters=4
SERVICECONTROL_AUDIT_AuditIngestionBatchTimeout=00:00:00.100

Key Code Changes

1. Two-Channel Architecture

Before: Single channel from transport to consumer

readonly Channel<MessageContext> channel;

After: Two channels - messages and assembled batches

readonly Channel<MessageContext> messageChannel;      // Transport → Batch Assembler
readonly Channel<List<MessageContext>> batchChannel;  // Batch Assembler → Writers

2. Batch Assembler Task

New BatchAssemblerLoop that:

  • Reads individual messages from messageChannel
  • Assembles batches up to BatchSize
  • Waits up to BatchTimeout for partial batches to fill
  • Writes assembled batches to batchChannel

3. Parallel Writer Tasks

New WriterLoop (runs MaxParallelWriters instances) that:

  • Reads batches from batchChannel concurrently
  • Calls auditIngestor.Ingest() in parallel
  • Completes TaskCompletionSources on success/failure
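The assembler and writer loops above can be sketched as a standalone program using `System.Threading.Channels`. The constants mirror the PR's defaults, but the message type, loop bodies, and the stand-in for `auditIngestor.Ingest()` are simplifications, not the actual implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int BatchSize = 50;
const int MaxParallelWriters = 4;
var batchTimeout = TimeSpan.FromMilliseconds(100);

// Two-channel pipeline: transport -> assembler -> writers.
var messageChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(BatchSize * MaxParallelWriters * 2)
    {
        FullMode = BoundedChannelFullMode.Wait // back-pressure on the transport
    });
var batchChannel = Channel.CreateBounded<List<string>>(
    new BoundedChannelOptions(MaxParallelWriters * 2)
    {
        FullMode = BoundedChannelFullMode.Wait
    });

var written = new ConcurrentBag<int>(); // sizes of batches "persisted" by writers

// Single assembler: groups messages into batches of up to BatchSize,
// flushing a partial batch once batchTimeout elapses or the channel completes.
async Task BatchAssemblerLoop()
{
    while (await messageChannel.Reader.WaitToReadAsync())
    {
        var batch = new List<string>(BatchSize);
        using var timeout = new CancellationTokenSource(batchTimeout);
        try
        {
            while (batch.Count < BatchSize)
            {
                batch.Add(await messageChannel.Reader.ReadAsync(timeout.Token));
            }
        }
        catch (Exception e) when (e is OperationCanceledException or ChannelClosedException)
        {
            // Timeout hit or channel completed: ship whatever we have.
        }

        if (batch.Count > 0)
        {
            await batchChannel.Writer.WriteAsync(batch);
        }
    }
    batchChannel.Writer.Complete();
}

// Writers: drain batches concurrently (stand-in for auditIngestor.Ingest()).
async Task WriterLoop()
{
    await foreach (var batch in batchChannel.Reader.ReadAllAsync())
    {
        written.Add(batch.Count);
    }
}

// Demo: 120 pre-loaded messages come out as batches of 50, 50 and 20.
for (var i = 0; i < 120; i++)
{
    await messageChannel.Writer.WriteAsync($"msg-{i}");
}
messageChannel.Writer.Complete();

var writers = Enumerable.Range(0, MaxParallelWriters).Select(_ => WriterLoop()).ToArray();
await BatchAssemblerLoop();
await Task.WhenAll(writers);

Console.WriteLine($"batches={written.Count}, messages={written.Sum()}");
// prints: batches=3, messages=120
```

The bounded capacities are what give the back-pressure guarantee listed later: when all writers are busy and both channels are full, `WriteAsync` suspends the transport instead of buffering unboundedly.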

4. Bug Fixes Applied

| Issue | Fix |
|---|---|
| SemaphoreSlim not disposed | Added a Dispose() override |
| Cancellation loses the in-flight batch | Track currentBatch and signal cancellation on shutdown |
| Task.Run with a cancelled token throws immediately | Pass CancellationToken.None to Task.Run |

Throughput Analysis

Before

  • Transport concurrency: 32 (default)
  • Channel capacity: 32
  • DB writes: Sequential (1 at a time)
  • Effective throughput: 32 messages / DB_write_time

After

  • Transport concurrency: Configurable (e.g., 100)
  • Message channel capacity: 400 (50 × 4 × 2)
  • Batch channel capacity: 8 (4 × 2)
  • DB writes: 4 concurrent
  • Effective throughput: 4 × 50 messages / DB_write_time = 200 messages / DB_write_time
    Theoretical improvement: ~6x throughput (varies based on DB latency and transport speed)

Preserved Guarantees

| Guarantee | How it's preserved |
|---|---|
| At-least-once delivery | Each message's TaskCompletionSource is only completed after its batch persists |
| Message ordering | Not required (audit messages are independent) |
| Back-pressure | Bounded channels with FullMode.Wait |
| Graceful shutdown | Drain channels, signal in-flight batches |
| Error isolation | A failed batch only affects the messages in that batch |

Files Modified

  1. src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
    • Added 3 new configuration properties
    • Added validation methods for each setting
  2. src/ServiceControl.Audit/Auditing/AuditIngestion.cs
    • Complete refactor of ExecuteAsync into multi-task architecture
    • Added BatchAssemblerLoop method
    • Added WriterLoop method
    • Updated StopAsync for graceful multi-task shutdown
    • Added Dispose() override for SemaphoreSlim

DB Connection Impact

| Before | After |
|---|---|
| 1 connection at a time | Up to 4 concurrent connections |

This has a negligible impact on typical connection pools (default: 100 connections).

Introduces Azure Blob Storage as an alternative to file system storage for audit message bodies.

This change allows configuring Audit Persistence to store message bodies in Azure Blob Storage by providing a connection string, offering scalability and cost-effectiveness. It also adds compression for larger messages to optimize storage.

The existing file system storage remains an option if a path is configured.
Adds support for a dedicated connection string for message body storage.

This allows users to configure a separate database or storage account
specifically for storing large message bodies, potentially improving
performance and scalability.
Stores message bodies to disk in parallel to improve ingestion performance.

Instead of awaiting the completion of each write operation, it queues them,
allowing multiple write tasks to run concurrently.
It then awaits all tasks before saving the changes to the database.
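The queue-then-await pattern described here can be sketched as below; the storage API and variable names are assumptions, not the PR's actual members.

```csharp
// Start every body write without awaiting it individually, so the disk
// writes overlap; only await the whole set before committing the batch.
var pendingWrites = new List<Task>(batch.Count);
foreach (var message in batch)
{
    pendingWrites.Add(bodyStorage.StoreAsync(message.Id, message.Body));
}

await Task.WhenAll(pendingWrites);   // all bodies are on disk...
await dbContext.SaveChangesAsync();  // ...before the database commit
```

Awaiting `Task.WhenAll` before `SaveChangesAsync` preserves the at-least-once guarantee: a message is never recorded in the database without its body having been stored.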
Updates the configuration to no longer default the message body storage path to a location under `CommonApplicationData`. The path will now be empty by default. This change allows users to explicitly configure the storage location, preventing potential issues with default locations.
Refactors the Azure Blob Storage persistence to streamline its configuration.
It removes the direct instantiation of BlobContainerClient within the base class and
instead, registers the AzureBlobBodyStoragePersistence class for dependency injection,
allowing the constructor to handle the BlobContainerClient creation.

Additionally, it ensures that the ContentType metadata stored in Azure Blob Storage is properly encoded and decoded
to handle special characters.

Also, it adds MessageBodyStorageConnectionStringKey to the configuration keys for both PostgreSQL and SQL Server.
Implements data retention policy for audit messages and saga snapshots using a background service.

This change introduces a base `RetentionCleaner` class that handles the logic for deleting expired audit data in batches.  Database-specific implementations are provided for SQL Server and PostgreSQL, leveraging their respective locking mechanisms (sp_getapplock and advisory locks) to prevent concurrent executions of the cleanup process.

Removes the registration of the `RetentionCleaner` from the base class and registers it on specific implementations.

The cleanup process deletes processed messages and saga snapshots older than the configured retention period, optimizing database space and improving query performance.
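The batched deletion loop might look roughly like this. It is a hedged sketch: the table name, column name, and batch size are assumptions, and the real `RetentionCleaner` additionally guards the whole cycle with sp_getapplock (SQL Server) or an advisory lock (PostgreSQL).

```csharp
// Delete expired rows in fixed-size batches to keep transactions short and
// avoid long lock escalations; a partial batch means the backlog is drained.
var cutoff = DateTime.UtcNow - retentionPeriod;
int deleted;
do
{
    deleted = await db.Database.ExecuteSqlInterpolatedAsync(
        $"DELETE TOP (1000) FROM ProcessedMessages WHERE ProcessedAt < {cutoff}");
}
while (deleted == 1000);
```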
Wraps retention cleanup process in an execution strategy
to handle transient database errors. Moves lock check
to inside the execution strategy, and only logs success
if the lock was acquired.
Resets the total deleted messages and snapshots counters,
as well as the lockAcquired flag, on each retry attempt of
the retention cleaner process. This prevents accumulation
of values across retries when the execution strategy is used.

Also, updates lock acquisition logic to use `AsAsyncEnumerable()`
to prevent errors caused by non-composable SQL in
`SqlQueryRaw` calls.
Adds metrics to monitor the retention cleanup process. This includes metrics for cleanup cycle duration, batch duration, deleted messages, skipped locks, and consecutive failures.

These metrics provide insights into the performance and health of the retention cleanup process, allowing for better monitoring and troubleshooting.