diff --git a/README.md b/README.md index 38d643881..019d6f576 100644 --- a/README.md +++ b/README.md @@ -2103,6 +2103,11 @@ tests. ### Style +``` +# runs ruff + cargo fmt +poe format +``` + * Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions: * We use [ruff](https://docs.astral.sh/ruff/) for formatting, so that takes precedence * In tests and example code, can import individual classes/functions to make it more readable. Can also do this for diff --git a/temporalio/contrib/opentelemetry/_interceptor.py b/temporalio/contrib/opentelemetry/_interceptor.py index 2c6323707..eb22f8be6 100644 --- a/temporalio/contrib/opentelemetry/_interceptor.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -355,46 +355,6 @@ async def start_activity( ): return await super().start_activity(input) - async def cancel_activity( - self, input: temporalio.client.CancelActivityInput - ) -> None: - with self.root._start_as_current_span( - "CancelActivity", - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().cancel_activity(input) - - async def terminate_activity( - self, input: temporalio.client.TerminateActivityInput - ) -> None: - with self.root._start_as_current_span( - "TerminateActivity", - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().terminate_activity(input) - - async def describe_activity( - self, input: temporalio.client.DescribeActivityInput - ) -> temporalio.client.ActivityExecutionDescription: - with self.root._start_as_current_span( - "DescribeActivity", - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().describe_activity(input) - - async def count_activities( - self, input: temporalio.client.CountActivitiesInput - ) -> temporalio.client.ActivityExecutionCount: - with self.root._start_as_current_span( - "CountActivities", - attributes={}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().count_activities(input) - class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): def __init__( diff --git a/temporalio/contrib/opentelemetry/_otel_interceptor.py b/temporalio/contrib/opentelemetry/_otel_interceptor.py index 089e73da7..6062f5022 100644 --- a/temporalio/contrib/opentelemetry/_otel_interceptor.py +++ b/temporalio/contrib/opentelemetry/_otel_interceptor.py @@ -319,54 +319,6 @@ async def start_activity( input.headers = _context_to_headers(input.headers) return await super().start_activity(input) - async def cancel_activity( - self, input: temporalio.client.CancelActivityInput - ) -> None: - with _maybe_span( - get_tracer(__name__), - "CancelActivity", - add_temporal_spans=self._add_temporal_spans, - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().cancel_activity(input) - - async def terminate_activity( - self, input: temporalio.client.TerminateActivityInput - ) -> None: - with _maybe_span( - get_tracer(__name__), - "TerminateActivity", - add_temporal_spans=self._add_temporal_spans, - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().terminate_activity(input) - - async def describe_activity( - self, input: temporalio.client.DescribeActivityInput - ) -> temporalio.client.ActivityExecutionDescription: - with _maybe_span( - get_tracer(__name__), - "DescribeActivity", - add_temporal_spans=self._add_temporal_spans, - attributes={"temporalActivityID": input.activity_id}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().describe_activity(input) - - async def count_activities( - self, input: temporalio.client.CountActivitiesInput - ) -> temporalio.client.ActivityExecutionCount: - with _maybe_span( - get_tracer(__name__), - "CountActivities", - add_temporal_spans=self._add_temporal_spans, - attributes={}, - kind=opentelemetry.trace.SpanKind.CLIENT, - ): - return await super().count_activities(input) - class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): def __init__( diff --git a/tests/contrib/opentelemetry/test_opentelemetry.py b/tests/contrib/opentelemetry/test_opentelemetry.py index 2dd17e303..94bb3fda5 100644 --- a/tests/contrib/opentelemetry/test_opentelemetry.py +++ b/tests/contrib/opentelemetry/test_opentelemetry.py @@ -960,6 +960,7 @@ async def test_opentelemetry_standalone_activity_tracing( client = Client(**client_config) task_queue = f"task_queue_{uuid.uuid4()}" + activity_id = f"activity_{uuid.uuid4()}" async with Worker( client, task_queue=task_queue, @@ -968,44 +969,23 @@ async def test_opentelemetry_standalone_activity_tracing( handle = await client.start_activity( tracing_activity, TracingActivityParam(heartbeat=False), - id=f"activity_{uuid.uuid4()}", + id=activity_id, task_queue=task_queue, schedule_to_close_timeout=timedelta(seconds=10), ) await handle.result() - # Use a queue with no worker so activities stay in SCHEDULED state, - # allowing describe/cancel/terminate to be called without a race. - no_worker_queue = f"task_queue_{uuid.uuid4()}" - - cancel_handle = await client.start_activity( - tracing_activity, - TracingActivityParam(heartbeat=False), - id=f"activity_{uuid.uuid4()}", - task_queue=no_worker_queue, - schedule_to_close_timeout=timedelta(seconds=30), - ) - await cancel_handle.describe() - await cancel_handle.cancel() - - terminate_handle = await client.start_activity( - tracing_activity, - TracingActivityParam(heartbeat=False), - id=f"activity_{uuid.uuid4()}", - task_queue=no_worker_queue, - schedule_to_close_timeout=timedelta(seconds=30), - ) - await terminate_handle.terminate() - - assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [ + finished_spans = exporter.get_finished_spans() + assert dump_spans(finished_spans, with_attributes=False) == [ "StartActivity:tracing_activity", " RunActivity:tracing_activity", - "StartActivity:tracing_activity", - "DescribeActivity", - "CancelActivity", - "StartActivity:tracing_activity", - "TerminateActivity", ] + start_activity_span = next( + s for s in finished_spans if s.name == "StartActivity:tracing_activity" + ) + assert start_activity_span.attributes is not None + assert start_activity_span.attributes["temporalActivityID"] == activity_id + assert start_activity_span.attributes["temporalActivityType"] == "tracing_activity" def test_opentelemetry_safe_detach(): diff --git a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py index 29acf0b6a..06ad330ed 100644 --- a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py +++ b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py @@ -585,49 +585,37 @@ async def test_otel_standalone_activity_tracing( new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)] new_client = Client(**new_config) + activity_id = f"activity_{uuid.uuid4()}" async with new_worker( new_client, activities=[simple_no_context_activity], ) as worker: handle = await new_client.start_activity( simple_no_context_activity, - id=f"activity_{uuid.uuid4()}", + id=activity_id, task_queue=worker.task_queue, schedule_to_close_timeout=timedelta(seconds=10), ) await handle.result() - # Use a queue with no worker so activities stay in SCHEDULED state, - # allowing describe/cancel/terminate to be called without a race. - no_worker_queue = f"task_queue_{uuid.uuid4()}" - - cancel_handle = await new_client.start_activity( - simple_no_context_activity, - id=f"activity_{uuid.uuid4()}", - task_queue=no_worker_queue, - schedule_to_close_timeout=timedelta(seconds=30), - ) - await cancel_handle.describe() - await cancel_handle.cancel() - - terminate_handle = await new_client.start_activity( - simple_no_context_activity, - id=f"activity_{uuid.uuid4()}", - task_queue=no_worker_queue, - schedule_to_close_timeout=timedelta(seconds=30), - ) - await terminate_handle.terminate() - - assert dump_spans(exporter.get_finished_spans(), with_attributes=False) == [ + finished_spans = exporter.get_finished_spans() + assert dump_spans(finished_spans, with_attributes=False) == [ "StartActivity:simple_no_context_activity", " RunActivity:simple_no_context_activity", " Activity", - "StartActivity:simple_no_context_activity", - "DescribeActivity", - "CancelActivity", - "StartActivity:simple_no_context_activity", - "TerminateActivity", ] + start_activity_span = next( + s + for s in finished_spans + if s.name == "StartActivity:simple_no_context_activity" + and s.attributes is not None + and s.attributes.get("temporalActivityID") == activity_id + ) + assert start_activity_span.attributes is not None + assert ( + start_activity_span.attributes["temporalActivityType"] + == "simple_no_context_activity" + ) def test_replay_safe_span_delegates_extra_attributes():