libs/opsqueue_python/python/opsqueue/producer.py (2 additions, 0 deletions)
@@ -133,6 +133,7 @@ def insert_submission(
chunk_size: int,
serialization_format: SerializationFormat = DEFAULT_SERIALIZATION_FORMAT,
metadata: None | bytes = None,
strategic_metadata: None | dict[str, str | int] = None,
) -> SubmissionId:
"""
Inserts a submission into the queue,
@@ -147,6 +148,7 @@ def insert_submission(
return self.insert_submission_chunks(
_chunk_iterator(ops, chunk_size, serialization_format),
metadata=metadata,
strategic_metadata=strategic_metadata,
chunk_size=chunk_size,
)

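For orientation, a minimal usage sketch of the widened producer API (not part of this diff). The client class name, constructor, endpoint, and example values below are assumptions; only the insert_submission() keyword arguments mirror the signature changed above, with strategic_metadata presumably consumed by the queue's own strategy/scheduling logic.

# Hedged sketch: `Client` and its constructor are hypothetical stand-ins for however
# the opsqueue producer client is actually built; only the insert_submission()
# keyword arguments reflect this PR's change.
from opsqueue.producer import Client  # hypothetical class name

client = Client("http://localhost:8080")  # hypothetical endpoint/constructor

submission_id = client.insert_submission(
    ops=[{"value": i} for i in range(1_000)],  # any iterable of operations
    chunk_size=100,
    metadata=b"opaque producer-side bytes",  # unchanged: still optional raw bytes
    # New in this PR: optional str -> (str | int) map alongside the raw metadata.
    strategic_metadata={"tenant": "example-corp", "priority": 3},
)
print(submission_id)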
libs/opsqueue_python/src/common.rs (10 additions, 4 deletions)
@@ -5,6 +5,7 @@ use std::time::Duration;
use chrono::{DateTime, Utc};
use opsqueue::common::errors::TryFromIntError;
use opsqueue::common::submission::Metadata;
use opsqueue::common::StrategicMetadataMap;
use opsqueue::object_store::{ChunkRetrievalError, ChunkType, ObjectStoreClient};
use opsqueue::tracing::CarrierMap;
use pyo3::prelude::*;
@@ -279,6 +280,7 @@ impl From<opsqueue::common::submission::SubmissionCompleted> for SubmissionCompleted {
completed_at: value.completed_at,
chunks_total: value.chunks_total.into(),
metadata: value.metadata,
strategic_metadata: value.strategic_metadata,
}
}
}
@@ -290,6 +292,7 @@ impl From<opsqueue::common::submission::SubmissionFailed> for SubmissionFailed {
failed_at: value.failed_at,
chunks_total: value.chunks_total.into(),
metadata: value.metadata,
strategic_metadata: value.strategic_metadata,
failed_chunk_id: value.failed_chunk_id.into(),
}
}
@@ -373,20 +376,21 @@ impl SubmissionStatus {
impl SubmissionCompleted {
fn __repr__(&self) -> String {
format!(
"SubmissionCompleted(id={0}, chunks_total={1}, completed_at={2}, metadata={3:?})",
"SubmissionCompleted(id={0}, chunks_total={1}, completed_at={2}, metadata={3:?}, strategic_metadata={4:?})",
self.id.__repr__(),
self.chunks_total,
self.completed_at,
self.metadata
self.metadata,
self.strategic_metadata
)
}
}

#[pymethods]
impl SubmissionFailed {
fn __repr__(&self) -> String {
format!("SubmissionFailed(id={0}, chunks_total={1}, failed_at={2}, failed_chunk_id={3}, metadata={4:?})",
self.id.__repr__(), self.chunks_total, self.failed_at, self.failed_chunk_id, self.metadata)
format!("SubmissionFailed(id={0}, chunks_total={1}, failed_at={2}, failed_chunk_id={3}, metadata={4:?}, strategic_metadata={5:?})",
self.id.__repr__(), self.chunks_total, self.failed_at, self.failed_chunk_id, self.metadata, self.strategic_metadata)
}
}

@@ -396,6 +400,7 @@ pub struct SubmissionCompleted {
pub id: SubmissionId,
pub chunks_total: u64,
pub metadata: Option<submission::Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub completed_at: DateTime<Utc>,
}

@@ -405,6 +410,7 @@ pub struct SubmissionFailed {
pub id: SubmissionId,
pub chunks_total: u64,
pub metadata: Option<submission::Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub failed_at: DateTime<Utc>,
pub failed_chunk_id: u64,
}
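
For orientation, a sketch of reading the new field back from the status objects bound above (not part of this diff). How a status object is obtained (get_submission_status) and the import paths are hypothetical; only the strategic_metadata, chunks_total, and failed_chunk_id attribute reads mirror the pyclass fields in the diff, assuming they are exposed as Python attributes the way their __repr__ implementations suggest.

# Hedged sketch: imports, client construction, and get_submission_status() are
# hypothetical; the attribute reads mirror the SubmissionCompleted/SubmissionFailed
# fields extended in this PR.
from opsqueue.producer import Client, SubmissionCompleted, SubmissionFailed  # hypothetical imports

client = Client("http://localhost:8080")  # hypothetical constructor
submission_id = client.insert_submission(
    ops=[{"value": i} for i in range(10)],
    chunk_size=5,
    strategic_metadata={"tenant": "example-corp"},
)

status = client.get_submission_status(submission_id)  # hypothetical method name
if isinstance(status, SubmissionCompleted):
    print(status.chunks_total, status.strategic_metadata)
elif isinstance(status, SubmissionFailed):
    print(status.failed_chunk_id, status.strategic_metadata)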
opsqueue/src/common/submission.rs (40 additions, 11 deletions)
@@ -2,6 +2,7 @@
use std::fmt::Display;
use std::time::Duration;

use crate::common::StrategicMetadataMap;
use chrono::{DateTime, Utc};
use ux::u63;

@@ -164,6 +165,7 @@ pub struct SubmissionCompleted {
pub chunks_total: ChunkCount,
pub chunk_size: ChunkSize,
pub metadata: Option<Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub completed_at: DateTime<Utc>,
pub otel_trace_carrier: String,
}
@@ -175,6 +177,7 @@ pub struct SubmissionFailed {
pub chunks_total: ChunkCount,
pub chunk_size: ChunkSize,
pub metadata: Option<Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub failed_at: DateTime<Utc>,
pub failed_chunk_id: ChunkIndex,
pub otel_trace_carrier: String,
@@ -530,15 +533,18 @@ pub mod db {
return Ok(Some(SubmissionStatus::InProgress(submission)));
}

let completed_submission = query_as!(
SubmissionCompleted,
let completed_row_opt = query!(
r#"
SELECT
id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions_completed.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, completed_at AS "completed_at: DateTime<Utc>"
, otel_trace_carrier
FROM submissions_completed WHERE id = $1
@@ -547,19 +553,32 @@
)
.fetch_optional(conn.get_inner())
.await?;
if let Some(completed_submission) = completed_submission {
return Ok(Some(SubmissionStatus::Completed(completed_submission)));
if let Some(row) = completed_row_opt {
let submission_completed = SubmissionCompleted {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
completed_at: row.completed_at,
otel_trace_carrier: row.otel_trace_carrier,
};
return Ok(Some(SubmissionStatus::Completed(submission_completed)));
}

let failed_submission = query_as!(
SubmissionFailed,
let failed_row_opt = query!(
r#"
SELECT
id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions_failed.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, failed_at AS "failed_at: DateTime<Utc>"
, failed_chunk_id AS "failed_chunk_id: ChunkIndex"
, otel_trace_carrier
@@ -569,15 +588,25 @@
)
.fetch_optional(conn.get_inner())
.await?;
if let Some(failed_submission) = failed_submission {
let failed_chunk_id = (failed_submission.id, failed_submission.failed_chunk_id).into();
if let Some(row) = failed_row_opt {
let failed_submission = SubmissionFailed {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
failed_at: row.failed_at,
failed_chunk_id: row.failed_chunk_id,
otel_trace_carrier: row.otel_trace_carrier,
};
let failed_chunk_id = (row.id, row.failed_chunk_id).into();
let failed_chunk = super::chunk::db::get_chunk_failed(failed_chunk_id, conn).await?;
return Ok(Some(SubmissionStatus::Failed(
failed_submission,
failed_chunk,
)));
}

Ok(None)
}

opsqueue/src/consumer/server/state.rs (0 additions, 1 deletion)
@@ -4,7 +4,6 @@ use std::sync::Mutex;

use axum_prometheus::metrics::histogram;
use opentelemetry::trace::TraceContextExt;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::common::chunk;