Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,10 @@ src/scaffolding.config

# Visual Studio Code
.vscode

# AI config
.claude/
CLAUDE.md

# User-specific files
.local
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<PackageVersion Include="GitHubActionsTestLogger" Version="3.0.1" />
<PackageVersion Include="HdrHistogram" Version="2.5.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.21" />
<PackageVersion Include="MongoDB.Driver" Version="3.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.21" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.2" />
Expand Down Expand Up @@ -78,6 +79,7 @@
<PackageVersion Include="System.Reactive.Linq" Version="6.0.1" />
<PackageVersion Include="System.Reflection.MetadataLoadContext" Version="8.0.1" />
<PackageVersion Include="System.ServiceProcess.ServiceController" Version="8.0.1" />
<PackageVersion Include="Testcontainers.MongoDb" Version="4.3.0" />
<PackageVersion Include="Validar.Fody" Version="1.9.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;

/// <summary>
/// Stores message bodies on the file system.
/// Useful when message bodies should not be stored in the database.
/// </summary>
class FileSystemBodyStorage : IBodyStorage
{
// TODO: Implement file system body storage
// - Store bodies as files in a configurable directory
// - Use bodyId as filename (with appropriate sanitization)
// - Handle expiration via file timestamps or separate cleanup process

public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
=> throw new NotImplementedException("File system body storage not yet implemented");

public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
=> throw new NotImplementedException("File system body storage not yet implemented");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Collections;
using Documents;
using global::MongoDB.Driver;
using NServiceBus;

/// <summary>
/// Reads message bodies stored inline in the ProcessedMessages collection.
/// Text bodies are stored as UTF-8 strings in the Body field (searchable).
/// Binary bodies are stored as BSON BinData in the BinaryBody field (not searchable).
/// This storage does not implement Store() as bodies are written directly
/// by MongoAuditIngestionUnitOfWork.
/// </summary>
class InlineBodyStorage(IMongoClientProvider clientProvider) : IBodyStorage
{
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
{
// Bodies are stored inline by MongoAuditIngestionUnitOfWork, not through IBodyStorage.Store()
// This method should not be called for inline storage
return Task.CompletedTask;
}

public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
{
var collection = clientProvider.Database.GetCollection<ProcessedMessageDocument>(CollectionNames.ProcessedMessages);

// Query for the document with the body (text or binary)
var filter = Builders<ProcessedMessageDocument>.Filter.Eq(d => d.Id, bodyId);
var projection = Builders<ProcessedMessageDocument>.Projection
.Include(d => d.Body)
.Include(d => d.BinaryBody)
.Include(d => d.Headers);

var document = await collection.Find(filter)
.Project<ProcessedMessageDocument>(projection)
.FirstOrDefaultAsync(cancellationToken)
.ConfigureAwait(false);

// Check for text body first, then binary body
byte[] bodyBytes;
if (document?.Body != null)
{
bodyBytes = System.Text.Encoding.UTF8.GetBytes(document.Body);
}
else if (document?.BinaryBody != null)
{
bodyBytes = document.BinaryBody;
}
else
{
return new StreamResult { HasResult = false };
}

// Get content type from headers
var contentType = document.Headers?.GetValueOrDefault(Headers.ContentType, "text/plain") ?? "text/plain";

return new StreamResult
{
HasResult = true,
Stream = new MemoryStream(bodyBytes),
ContentType = contentType,
BodySize = bodyBytes.Length,
Etag = document.Id
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;

/// <summary>
/// A no-op body storage implementation used when body storage is disabled.
/// </summary>
class NullBodyStorage : IBodyStorage
{
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
=> Task.CompletedTask;

public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
=> Task.FromResult(new StreamResult { HasResult = false });
}
}
23 changes: 23 additions & 0 deletions src/ServiceControl.Audit.Persistence.MongoDB/BodyStorageType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace ServiceControl.Audit.Persistence.MongoDB
{
/// <summary>
/// Specifies where message bodies should be stored.
/// </summary>
public enum BodyStorageType
{
/// <summary>
/// Message bodies are not stored.
/// </summary>
None,

/// <summary>
/// Message bodies are stored in the MongoDB database.
/// </summary>
Database,

/// <summary>
/// Message bodies are stored on the file system.
/// </summary>
FileSystem
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Collections
{
/// <summary>
/// Constants for MongoDB collection names.
/// </summary>
static class CollectionNames
{
public const string ProcessedMessages = "processedMessages";
public const string KnownEndpoints = "knownEndpoints";
public const string SagaSnapshots = "sagaSnapshots";
public const string FailedAuditImports = "failedAuditImports";
public const string MessageBodies = "messageBodies";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System.Collections.Generic;
using global::MongoDB.Bson;
using global::MongoDB.Bson.Serialization.Attributes;

class FailedAuditImportDocument
{
[BsonId]
public ObjectId Id { get; set; }

[BsonElement("messageId")]
public string MessageId { get; set; }

[BsonElement("headers")]
public Dictionary<string, string> Headers { get; set; }

[BsonElement("body")]
public byte[] Body { get; set; }

[BsonElement("exceptionInfo")]
public string ExceptionInfo { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System;
using global::MongoDB.Bson.Serialization.Attributes;

class KnownEndpointDocument
{
[BsonId]
public string Id { get; set; }

[BsonElement("name")]
public string Name { get; set; }

[BsonElement("hostId")]
[BsonGuidRepresentation(global::MongoDB.Bson.GuidRepresentation.Standard)]
public Guid HostId { get; set; }

[BsonElement("host")]
public string Host { get; set; }

[BsonElement("lastSeen")]
public DateTime LastSeen { get; set; }

[BsonElement("expiresAt")]
public DateTime ExpiresAt { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System;
using global::MongoDB.Bson.Serialization.Attributes;

class MessageBodyDocument
{
[BsonId]
public string Id { get; set; }

[BsonElement("contentType")]
public string ContentType { get; set; }

[BsonElement("bodySize")]
public int BodySize { get; set; }

/// <summary>
/// Text body content for text-based content types (JSON, XML, plain text).
/// Stored as string for full-text search support.
/// </summary>
[BsonElement("textBody")]
[BsonIgnoreIfNull]
public string TextBody { get; set; }

/// <summary>
/// Binary body content for non-text content types (protobuf, images, etc.).
/// Stored as byte[] for efficient storage.
/// </summary>
[BsonElement("binaryBody")]
[BsonIgnoreIfNull]
public byte[] BinaryBody { get; set; }

[BsonElement("expiresAt")]
public DateTime ExpiresAt { get; set; }

/// <summary>
/// Determines if the content type represents text-based content that can be searched.
/// </summary>
public static bool IsTextContentType(string contentType)
{
if (string.IsNullOrEmpty(contentType))
{
return false;
}

// TODO: Better way to determine text-based content types?
return contentType.StartsWith("text/", StringComparison.OrdinalIgnoreCase) ||
contentType.Contains("json", StringComparison.OrdinalIgnoreCase) ||
contentType.Contains("xml", StringComparison.OrdinalIgnoreCase);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System;
using System.Collections.Generic;
using global::MongoDB.Bson;
using global::MongoDB.Bson.Serialization.Attributes;

class ProcessedMessageDocument
{
[BsonId]
public string Id { get; set; }

[BsonElement("uniqueMessageId")]
public string UniqueMessageId { get; set; }

[BsonElement("messageMetadata")]
public BsonDocument MessageMetadata { get; set; }

[BsonElement("headers")]
public Dictionary<string, string> Headers { get; set; }

/// <summary>
/// Text body content stored as UTF-8 string. Used for text-based messages (JSON, XML, plain text).
/// This field is included in the text search index for full-text search.
/// </summary>
[BsonElement("body")]
[BsonIgnoreIfNull]
public string Body { get; set; }

/// <summary>
/// Binary body content stored as BSON BinData. Used for non-text messages.
/// This field is NOT included in text search (binary content can't be meaningfully searched).
/// </summary>
[BsonElement("binaryBody")]
[BsonIgnoreIfNull]
public byte[] BinaryBody { get; set; }

[BsonElement("processedAt")]
public DateTime ProcessedAt { get; set; }

[BsonElement("expiresAt")]
public DateTime ExpiresAt { get; set; }

/// <summary>
/// Computed field for full-text search containing concatenated header values.
/// Headers are stored as a dictionary and can't be directly text-indexed,
/// so we flatten the values into a single searchable string.
/// </summary>
[BsonElement("headerSearchText")]
[BsonIgnoreIfNull]
public string HeaderSearchText { get; set; }
}
}
Loading