From 4be99f2f180f6303686597f8502fbfdc43a67ffd Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Mon, 26 Jan 2026 14:12:12 +0100 Subject: [PATCH 1/5] Add strategic_metadata to insert_submission --- libs/opsqueue_python/python/opsqueue/producer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index a7d57e6..e5e3015 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -133,6 +133,7 @@ def insert_submission( chunk_size: int, serialization_format: SerializationFormat = DEFAULT_SERIALIZATION_FORMAT, metadata: None | bytes = None, + strategic_metadata: None | dict[str, str | int] = None, ) -> SubmissionId: """ Inserts a submission into the queue, @@ -147,6 +148,7 @@ def insert_submission( return self.insert_submission_chunks( _chunk_iterator(ops, chunk_size, serialization_format), metadata=metadata, + strategic_metadata=strategic_metadata, chunk_size=chunk_size, ) From 9123ed579bca4feeb3e7bd06f2507315481a8ec3 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Mon, 26 Jan 2026 14:44:42 +0100 Subject: [PATCH 2/5] Add strategic_metadata to SubmissionCompleted --- libs/opsqueue_python/src/common.rs | 8 ++++++-- opsqueue/src/common/submission.rs | 25 ++++++++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index ba5e8aa..539f4d3 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; +use opsqueue::common::{StrategicMetadataMap}; use opsqueue::common::errors::TryFromIntError; use opsqueue::common::submission::Metadata; use opsqueue::object_store::{ChunkRetrievalError, ChunkType, ObjectStoreClient}; @@ -279,6 +280,7 @@ impl From for SubmissionCompl completed_at: value.completed_at, chunks_total: value.chunks_total.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, } } } @@ -373,11 +375,12 @@ impl SubmissionStatus { impl SubmissionCompleted { fn __repr__(&self) -> String { format!( - "SubmissionCompleted(id={0}, chunks_total={1}, completed_at={2}, metadata={3:?})", + "SubmissionCompleted(id={0}, chunks_total={1}, completed_at={2}, metadata={3:?}, strategic_metadata={4:?})", self.id.__repr__(), self.chunks_total, self.completed_at, - self.metadata + self.metadata, + self.strategic_metadata ) } } @@ -396,6 +399,7 @@ pub struct SubmissionCompleted { pub id: SubmissionId, pub chunks_total: u64, pub metadata: Option, + pub strategic_metadata: Option, pub completed_at: DateTime, } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 0331180..6f45558 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -2,6 +2,7 @@ use std::fmt::Display; use std::time::Duration; +use crate::common::StrategicMetadataMap; use chrono::{DateTime, Utc}; use ux::u63; @@ -164,6 +165,7 @@ pub struct SubmissionCompleted { pub chunks_total: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, + pub strategic_metadata: Option, pub completed_at: DateTime, pub otel_trace_carrier: String, } @@ -530,15 +532,18 @@ pub mod db { return Ok(Some(SubmissionStatus::InProgress(submission))); } - let completed_submission = query_as!( - SubmissionCompleted, + let row_opt = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_completed.id + ) AS "strategic_metadata: sqlx::types::Json" , completed_at AS "completed_at: DateTime" , otel_trace_carrier FROM submissions_completed WHERE id = $1 @@ -547,8 +552,18 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(completed_submission) = completed_submission { - return Ok(Some(SubmissionStatus::Completed(completed_submission))); + if let Some(row) = row_opt { + let submission_completed = SubmissionCompleted { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + completed_at: row.completed_at, + otel_trace_carrier: row.otel_trace_carrier, + }; + return Ok(Some(SubmissionStatus::Completed(submission_completed))); } let failed_submission = query_as!( From 881d5e9fa72cab4682072f299e996ceab20ab237 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 5 Feb 2026 14:35:54 +0100 Subject: [PATCH 3/5] Add strategic_metadata to SubmissionFailed --- libs/opsqueue_python/src/common.rs | 2 ++ opsqueue/src/common/submission.rs | 30 ++++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 539f4d3..612a5bf 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -292,6 +292,7 @@ impl From for SubmissionFailed { failed_at: value.failed_at, chunks_total: value.chunks_total.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, failed_chunk_id: value.failed_chunk_id.into(), } } @@ -409,6 +410,7 @@ pub struct SubmissionFailed { pub id: SubmissionId, pub chunks_total: u64, pub metadata: Option, + pub strategic_metadata: Option, pub failed_at: DateTime, pub failed_chunk_id: u64, } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 6f45558..497d77d 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -177,6 +177,7 @@ pub struct SubmissionFailed { pub chunks_total: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, + pub strategic_metadata: Option, pub failed_at: DateTime, pub failed_chunk_id: ChunkIndex, pub otel_trace_carrier: String, @@ -532,7 +533,7 @@ pub mod db { return Ok(Some(SubmissionStatus::InProgress(submission))); } - let row_opt = query!( + let completed_row_opt = query!( r#" SELECT id AS "id: SubmissionId" @@ -552,7 +553,7 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(row) = row_opt { + if let Some(row) = completed_row_opt { let submission_completed = SubmissionCompleted { id: row.id, prefix: row.prefix, @@ -566,15 +567,18 @@ pub mod db { return Ok(Some(SubmissionStatus::Completed(submission_completed))); } - let failed_submission = query_as!( - SubmissionFailed, + let failed_row_opt = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_failed.id + ) AS "strategic_metadata: sqlx::types::Json" , failed_at AS "failed_at: DateTime" , failed_chunk_id AS "failed_chunk_id: ChunkIndex" , otel_trace_carrier @@ -584,15 +588,25 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(failed_submission) = failed_submission { - let failed_chunk_id = (failed_submission.id, failed_submission.failed_chunk_id).into(); + if let Some(row) = failed_row_opt { + let failed_submission = SubmissionFailed { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + failed_at: row.failed_at, + failed_chunk_id: row.failed_chunk_id, + otel_trace_carrier: row.otel_trace_carrier, + }; + let failed_chunk_id = (row.id, row.failed_chunk_id).into(); let failed_chunk = super::chunk::db::get_chunk_failed(failed_chunk_id, conn).await?; return Ok(Some(SubmissionStatus::Failed( failed_submission, failed_chunk, ))); } - Ok(None) } From 93c426b220732c03b6e168c90eaa1dec3a34880d Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 5 Feb 2026 16:09:41 +0100 Subject: [PATCH 4/5] fixup! Add strategic_metadata to SubmissionFailed fixup --- libs/opsqueue_python/src/common.rs | 4 ++-- opsqueue/src/consumer/server/state.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 612a5bf..7caf08c 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -389,8 +389,8 @@ impl SubmissionCompleted { #[pymethods] impl SubmissionFailed { fn __repr__(&self) -> String { - format!("SubmissionFailed(id={0}, chunks_total={1}, failed_at={2}, failed_chunk_id={3}, metadata={4:?})", - self.id.__repr__(), self.chunks_total, self.failed_at, self.failed_chunk_id, self.metadata) + format!("SubmissionFailed(id={0}, chunks_total={1}, failed_at={2}, failed_chunk_id={3}, metadata={4:?}, strategic_metadata={5:?})", + self.id.__repr__(), self.chunks_total, self.failed_at, self.failed_chunk_id, self.metadata, self.strategic_metadata) } } diff --git a/opsqueue/src/consumer/server/state.rs b/opsqueue/src/consumer/server/state.rs index 160a9fe..38ce38a 100644 --- a/opsqueue/src/consumer/server/state.rs +++ b/opsqueue/src/consumer/server/state.rs @@ -4,7 +4,6 @@ use std::sync::Mutex; use axum_prometheus::metrics::histogram; use opentelemetry::trace::TraceContextExt; -use tracing::Span; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::common::chunk; From 0b30a8e5af0b63a177f3b68c949395fe217886a2 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 5 Feb 2026 16:27:16 +0100 Subject: [PATCH 5/5] fixup! Add strategic_metadata to SubmissionFailed rustfmt --- libs/opsqueue_python/src/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 7caf08c..9c1b5cc 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; -use opsqueue::common::{StrategicMetadataMap}; use opsqueue::common::errors::TryFromIntError; use opsqueue::common::submission::Metadata; +use opsqueue::common::StrategicMetadataMap; use opsqueue::object_store::{ChunkRetrievalError, ChunkType, ObjectStoreClient}; use opsqueue::tracing::CarrierMap; use pyo3::prelude::*;