Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
722bf3e
Add basic tracing middleware and global control
davidigandan Jan 13, 2026
52cb04d
Instrument on subscribe and add dcid to span attributes
davidigandan Jan 26, 2026
cc9ee12
Add spanid and traceid metadata to greylog
davidigandan Jan 26, 2026
f7cc658
Add recipe_id to spans
davidigandan Jan 26, 2026
8b2a2f1
Add dev and prod dependencies
davidigandan Jan 26, 2026
0686e28
Remove dcid extract from message and inject to span logic. Will be ad…
davidigandan Jan 26, 2026
2d9e21c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
3a5283a
Use plugin configurations to configure connection to OTELCollector
davidigandan Jan 26, 2026
4b999f1
Remove vestigial dcid handling and unnecessary debug statements
davidigandan Jan 26, 2026
4b86715
remove unhelpful docstring
davidigandan Jan 26, 2026
9c13d07
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 26, 2026
3e0b902
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
d446e80
imported OTEL config class to common_service
davidigandan Jan 26, 2026
16b0e10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
7ad857f
add marshmallow dependency
davidigandan Jan 27, 2026
468f940
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 27, 2026
7aae664
add zocalo dependency
davidigandan Jan 27, 2026
902a7df
Fix possibly unbound error
davidigandan Jan 27, 2026
9e5adb7
Moved plugin functionality to python-workflows
davidigandan Feb 2, 2026
1d7457e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["zocalo","marshmallow","bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove zocalo and marshmallow


[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down Expand Up @@ -53,6 +53,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport"
pika = "workflows.util.zocalo.configuration:Pika"
stomp = "workflows.util.zocalo.configuration:Stomp"
transport = "workflows.util.zocalo.configuration:DefaultTransport"
opentelemetry = "workflows.util.zocalo.configuration:OTEL"

[project.scripts]
"workflows.validate_recipe" = "workflows.recipe.validate:main"
Expand Down
4 changes: 4 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
marshmallow
33 changes: 33 additions & 0 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from collections.abc import Callable
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -69,6 +71,37 @@ def unwrap_recipe(header, message):
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
rw = RecipeWrapper(message=message, transport=transport_layer)

# Extract recipe_id on the current span
span = trace.get_current_span()
recipe_id = None

# Extract recipe ID from environment
if isinstance(message, dict):
environment = message.get("environment", {})
if isinstance(environment, dict):
recipe_id = environment.get("ID")
Comment on lines +80 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rw is guaranteed to be initialised here, and line 105 already reads this same data from it, so this can probably be replaced by

recipe_id = rw.environment.get("ID")

or, to taste,

if recipe_id := rw.environment.get("ID"):

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


if recipe_id:
span.set_attribute("recipe_id", recipe_id)
span.add_event(
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
)
Comment on lines +87 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed- fixed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed-fixed


# Extract span_id and trace_id for logging
span_context = span.get_span_context()
if span_context and span_context.is_valid:
span_id = format(span_context.span_id, "016x")
trace_id = format(span_context.trace_id, "032x")

log_extra = {
"span_id": span_id,
"trace_id": trace_id,
}

if recipe_id:
log_extra["recipe_id"] = recipe_id
Comment on lines +97 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this currently appears vestigial; did you mean to use log_extender?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the easiest way to attach this is via log_extender. Possibly contextlib.ExitStack (https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack) helps with handling multiple context managers that may or may not be present.


if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):
return callback(rw, header, message.get("payload"))
Expand Down
57 changes: 57 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +192,56 @@ def start_transport(self):
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)
try:
# Configure OTELTracing if configuration is available
otel_config = (
self.config.opentelemetry
if self.config and hasattr(self.config, "opentelemetry")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like in https://github.com/DiamondLightSource/python-zocalo/blob/b3e3ca4addba6e61fab0bef2fcb825a49e73938a/src/zocalo/configuration/__init__.py#L150 that the name set is _opentelemetry - assuming the self.config object is what we expect

else None
)

if otel_config:
if "endpoint" not in otel_config:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have already validated these in the configuration.OTEL plugin then you do not need to do so again; also - if you have asked to configure but provided wrong values, this is an error

self.log.warning(
"Missing required OTEL configuration field `endpoint`."
)

if "timeout" not in otel_config:
self.log.warning(
"Missing optional OTEL configuration field `timout`. Will default to 10 seconds. "
)

# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)
except Exception as e:
# Continue without tracing if configuration fails
self.log.warning(
"Failed to configure OpenTelemetry tracing: %s", str(e)
)
Comment on lines +239 to +243
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the user has explicitly requested opentelemetry then failing to configure it should probably be an error


metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
34 changes: 34 additions & 0 deletions src/workflows/transport/middleware/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import functools
from collections.abc import Callable

from opentelemetry import trace
from opentelemetry.propagate import extract

from workflows.transport.middleware import BaseTransportMiddleware


class OTELTracingMiddleware(BaseTransportMiddleware):
    """Transport middleware that runs every subscription callback inside a span.

    The span is named ``transport.subscribe`` and is tagged with the
    service name given at construction time and the subscribed channel.
    """

    def __init__(self, tracer: trace.Tracer, service_name: str):
        """Remember the tracer used to open spans and the service name to tag them with."""
        self.tracer = tracer
        self.service_name = service_name

    def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
        """Subscribe via ``call_next`` with a traced version of ``callback``.

        Args:
            call_next: The next subscribe handler in the middleware chain.
            channel: The channel being subscribed to; recorded on each span.
            callback: The user callback, invoked as ``callback(header, message)``.
            **kwargs: Passed through unchanged to ``call_next``.

        Returns:
            Whatever ``call_next`` returns.
        """

        @functools.wraps(callback)
        def wrapped_callback(header, message):
            # Only attempt to pull a parent trace context out of the headers
            # when there are headers to look at; otherwise pass no context.
            parent_context = None
            if header:
                parent_context = extract(header)

            with self.tracer.start_as_current_span(
                "transport.subscribe", context=parent_context
            ) as active_span:
                span_attributes = (
                    ("service_name", self.service_name),
                    ("channel", channel),
                )
                for attribute_name, attribute_value in span_attributes:
                    active_span.set_attribute(attribute_name, attribute_value)

                # The original callback runs while the span is current.
                return callback(header, message)

        # Hand the traced callback on to the rest of the middleware chain.
        return call_next(channel, wrapped_callback, **kwargs)
26 changes: 26 additions & 0 deletions src/workflows/util/zocalo/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@
from workflows.transport.stomp_transport import StompTransport


class OTEL:
    """A Zocalo configuration plugin to pre-populate OTELTracing config defaults"""

    class Schema(PluginSchema):
        # Fields accepted in the plugin's configuration section.
        host = fields.Str(required=True)
        port = fields.Int(required=True)
        endpoint = fields.Str(required=False)
        timeout = fields.Int(required=False, load_default=10)

    # Class-level store that activate() fills in with the resolved settings.
    config = {}

    @staticmethod
    def activate(configuration):
        """Resolve the endpoint and timeout and record them in ``OTEL.config``.

        An explicit ``endpoint`` entry is used as given; when it is absent
        the endpoint URL is assembled from ``host`` and ``port``. The
        timeout falls back to 10 when not present in ``configuration``.
        """
        if "endpoint" in configuration:
            resolved_endpoint = configuration["endpoint"]
        else:
            resolved_endpoint = (
                f"https://{configuration['host']}:{configuration['port']}/v1/traces"
            )

        OTEL.config.update(
            endpoint=resolved_endpoint,
            timeout=configuration.get("timeout", 10),
        )


class Stomp:
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""

Expand Down
Loading