Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,11 @@ tests.

### Style

```
# runs ruff + cargo fmt
poe format
```

* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
* We use [ruff](https://docs.astral.sh/ruff/) for formatting, so that takes precedence
* In tests and example code, can import individual classes/functions to make it more readable. Can also do this for
Expand Down
40 changes: 0 additions & 40 deletions temporalio/contrib/opentelemetry/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,46 +355,6 @@ async def start_activity(
):
return await super().start_activity(input)

async def cancel_activity(
self, input: temporalio.client.CancelActivityInput
) -> None:
with self.root._start_as_current_span(
"CancelActivity",
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().cancel_activity(input)

async def terminate_activity(
self, input: temporalio.client.TerminateActivityInput
) -> None:
with self.root._start_as_current_span(
"TerminateActivity",
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().terminate_activity(input)

async def describe_activity(
self, input: temporalio.client.DescribeActivityInput
) -> temporalio.client.ActivityExecutionDescription:
with self.root._start_as_current_span(
"DescribeActivity",
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().describe_activity(input)

async def count_activities(
self, input: temporalio.client.CountActivitiesInput
) -> temporalio.client.ActivityExecutionCount:
with self.root._start_as_current_span(
"CountActivities",
attributes={},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().count_activities(input)


class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
def __init__(
Expand Down
48 changes: 0 additions & 48 deletions temporalio/contrib/opentelemetry/_otel_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,54 +319,6 @@ async def start_activity(
input.headers = _context_to_headers(input.headers)
return await super().start_activity(input)

async def cancel_activity(
self, input: temporalio.client.CancelActivityInput
) -> None:
with _maybe_span(
get_tracer(__name__),
"CancelActivity",
add_temporal_spans=self._add_temporal_spans,
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().cancel_activity(input)

async def terminate_activity(
self, input: temporalio.client.TerminateActivityInput
) -> None:
with _maybe_span(
get_tracer(__name__),
"TerminateActivity",
add_temporal_spans=self._add_temporal_spans,
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().terminate_activity(input)

async def describe_activity(
self, input: temporalio.client.DescribeActivityInput
) -> temporalio.client.ActivityExecutionDescription:
with _maybe_span(
get_tracer(__name__),
"DescribeActivity",
add_temporal_spans=self._add_temporal_spans,
attributes={"temporalActivityID": input.activity_id},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().describe_activity(input)

async def count_activities(
self, input: temporalio.client.CountActivitiesInput
) -> temporalio.client.ActivityExecutionCount:
with _maybe_span(
get_tracer(__name__),
"CountActivities",
add_temporal_spans=self._add_temporal_spans,
attributes={},
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().count_activities(input)


class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor):
def __init__(
Expand Down
40 changes: 10 additions & 30 deletions tests/contrib/opentelemetry/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ async def test_opentelemetry_standalone_activity_tracing(
client = Client(**client_config)

task_queue = f"task_queue_{uuid.uuid4()}"
activity_id = f"activity_{uuid.uuid4()}"
async with Worker(
client,
task_queue=task_queue,
Expand All @@ -968,44 +969,23 @@ async def test_opentelemetry_standalone_activity_tracing(
handle = await client.start_activity(
tracing_activity,
TracingActivityParam(heartbeat=False),
id=f"activity_{uuid.uuid4()}",
id=activity_id,
task_queue=task_queue,
schedule_to_close_timeout=timedelta(seconds=10),
)
await handle.result()

# Use a queue with no worker so activities stay in SCHEDULED state,
# allowing describe/cancel/terminate to be called without a race.
no_worker_queue = f"task_queue_{uuid.uuid4()}"

cancel_handle = await client.start_activity(
tracing_activity,
TracingActivityParam(heartbeat=False),
id=f"activity_{uuid.uuid4()}",
task_queue=no_worker_queue,
schedule_to_close_timeout=timedelta(seconds=30),
)
await cancel_handle.describe()
await cancel_handle.cancel()

terminate_handle = await client.start_activity(
tracing_activity,
TracingActivityParam(heartbeat=False),
id=f"activity_{uuid.uuid4()}",
task_queue=no_worker_queue,
schedule_to_close_timeout=timedelta(seconds=30),
)
await terminate_handle.terminate()

assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
finished_spans = exporter.get_finished_spans()
assert dump_spans(finished_spans, with_attributes=False) == [
"StartActivity:tracing_activity",
" RunActivity:tracing_activity",
"StartActivity:tracing_activity",
"DescribeActivity",
"CancelActivity",
"StartActivity:tracing_activity",
"TerminateActivity",
]
start_activity_span = next(
s for s in finished_spans if s.name == "StartActivity:tracing_activity"
)
assert start_activity_span.attributes is not None
assert start_activity_span.attributes["temporalActivityID"] == activity_id
assert start_activity_span.attributes["temporalActivityType"] == "tracing_activity"


def test_opentelemetry_safe_detach():
Expand Down
44 changes: 16 additions & 28 deletions tests/contrib/opentelemetry/test_opentelemetry_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,49 +585,37 @@ async def test_otel_standalone_activity_tracing(
new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)]
new_client = Client(**new_config)

activity_id = f"activity_{uuid.uuid4()}"
async with new_worker(
new_client,
activities=[simple_no_context_activity],
) as worker:
handle = await new_client.start_activity(
simple_no_context_activity,
id=f"activity_{uuid.uuid4()}",
id=activity_id,
task_queue=worker.task_queue,
schedule_to_close_timeout=timedelta(seconds=10),
)
await handle.result()

# Use a queue with no worker so activities stay in SCHEDULED state,
# allowing describe/cancel/terminate to be called without a race.
no_worker_queue = f"task_queue_{uuid.uuid4()}"

cancel_handle = await new_client.start_activity(
simple_no_context_activity,
id=f"activity_{uuid.uuid4()}",
task_queue=no_worker_queue,
schedule_to_close_timeout=timedelta(seconds=30),
)
await cancel_handle.describe()
await cancel_handle.cancel()

terminate_handle = await new_client.start_activity(
simple_no_context_activity,
id=f"activity_{uuid.uuid4()}",
task_queue=no_worker_queue,
schedule_to_close_timeout=timedelta(seconds=30),
)
await terminate_handle.terminate()

assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [
finished_spans = exporter.get_finished_spans()
assert dump_spans(finished_spans, with_attributes=False) == [
"StartActivity:simple_no_context_activity",
" RunActivity:simple_no_context_activity",
" Activity",
"StartActivity:simple_no_context_activity",
"DescribeActivity",
"CancelActivity",
"StartActivity:simple_no_context_activity",
"TerminateActivity",
]
start_activity_span = next(
s
for s in finished_spans
if s.name == "StartActivity:simple_no_context_activity"
and s.attributes is not None
and s.attributes.get("temporalActivityID") == activity_id
)
assert start_activity_span.attributes is not None
assert (
start_activity_span.attributes["temporalActivityType"]
== "simple_no_context_activity"
)


def test_replay_safe_span_delegates_extra_attributes():
Expand Down
Loading