Interceptors - Python SDK
Interceptors are SDK hooks that let you intercept inbound and outbound Temporal calls. You use them to add common behavior across many calls, such as tracing and context propagation, before calls reach the SDK's underlying implementation. This is similar to using middleware in web frameworks such as Django, Starlette, and Flask.
The methods you implement on your Interceptor classes can perform side effects and modify incoming or outgoing data.
There are five categories of inbound and outbound calls that you can modify in this way:
| Outbound Client calls | Inbound Workflow calls | Outbound Workflow calls | Inbound Activity calls | Outbound Activity calls |
|---|---|---|---|---|
| Start workflow, signal workflow, list workflows, update schedule | Execute workflow (handles a Workflow Task that starts a new Workflow Execution), handle query, handle signal, handle update handler, handle update validator | Start activity, start child workflow, signal child workflow, signal external workflow, start Nexus operation, start local activity | Execute activity (this is the only inbound Activity call) | Info, heartbeat |
This is not an exhaustive list; refer to the Python SDK methods for details.
The first of these categories is a Client call, and the remaining 4 are Worker calls.
Register an Interceptor
Registering an interceptor means supplying an interceptor instance to the SDK so Temporal can invoke it when matching Client or Worker calls occur. Once registered, the interceptor runs as part of the call path and can observe or modify request and response data.
Register on the Client
Pass interceptors in the interceptors argument of Client.connect(). Client interceptors modify outbound calls such
as starting and signaling Workflows.
client = await Client.connect(
"localhost:7233",
interceptors=[TracingInterceptor()],
)
The interceptors list can contain multiple interceptors. In this case they form a chain: a method implemented on an
interceptor instance in the list can perform side effects, and modify the data, before passing it on to the
corresponding method on the next interceptor in the list.
Register via a Plugin
If you're building a reusable library or want to bundle interceptors with other primitives, you can register them through a Plugin.
Register on the Worker only
If your interceptor doesn't affect the Client, you can pass interceptors in the interceptors argument of Worker().
Worker interceptors modify inbound and outbound Workflow and Activity calls.
worker = Worker(
client,
task_queue="my-task-queue",
interceptors=[SomeWorkerInterceptor()],
# ...
)
If your interceptor class inherits from both client.Interceptor and worker.Interceptor, pass it to
Client.connect() rather than the Worker() constructor. The Worker will use interceptors from its underlying Client
automatically.
Client call Interceptors
To modify outbound Client calls, define a class inheriting from
client.Interceptor, and implement the method
intercept_client() to return an instance of
OutboundInterceptor that implements the
subset of outbound Client calls that you wish to modify.
This example implements an Interceptor on outbound Client calls that sets a certain key in the outbound headers field.
A User ID is context-propagated by being sent in a header field with outbound requests:
class ContextPropagationInterceptor(
temporalio.client.Interceptor, temporalio.worker.Interceptor
):
def __init__(
self,
payload_converter: temporalio.converter.PayloadConverter = temporalio.converter.default().payload_converter,
) -> None:
self._payload_converter = payload_converter
def intercept_client(
self, next: temporalio.client.OutboundInterceptor
) -> temporalio.client.OutboundInterceptor:
return _ContextPropagationClientOutboundInterceptor(
next, self._payload_converter
)
def set_header_from_context(
input: _InputWithHeaders, payload_converter: temporalio.converter.PayloadConverter
) -> None:
user_id_val = user_id.get()
if user_id_val:
input.headers = {
**input.headers,
HEADER_KEY: payload_converter.to_payload(user_id_val),
}
class _ContextPropagationClientOutboundInterceptor(
temporalio.client.OutboundInterceptor
):
def __init__(
self,
next: temporalio.client.OutboundInterceptor,
payload_converter: temporalio.converter.PayloadConverter,
) -> None:
super().__init__(next)
self._payload_converter = payload_converter
async def start_workflow(
self, input: temporalio.client.StartWorkflowInput
) -> temporalio.client.WorkflowHandle[Any, Any]:
set_header_from_context(input, self._payload_converter)
return await super().start_workflow(input)
It often happens that your Worker and Client interceptors will share code because they implement closely related logic.
In the Python SDK, you will typically want to create an interceptor class that inherits from both client.Interceptor
and worker.Interceptor as above, since their method sets do not overlap.
You can then register this interceptor in your client/starter code.
Your interceptor classes need not implement every method; the default implementation is always to pass the data on to the next method in the interceptor chain. During execution, when the SDK encounters an Inbound Activity call, it will look to the first Interceptor instance, get hold of the appropriate intercepted method, and call it. The intercepted method will perform its function then call the same method on the next Interceptor in the chain. At the end of the chain the SDK will call the "real" SDK method.
Worker call Interceptors
To modify inbound and outbound Workflow and Activity calls, define a class inheriting from worker.Interceptor. This is
an interface with two methods named intercept_activity and workflow_interceptor_class, which you can use to
configure interceptions of Activity and Workflow calls, respectively. intercept_activity returns an
ActivityInboundInterceptor.
This example demonstrates using an interceptor to measure Schedule-To-Start latency:
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
Interceptor,
Worker,
)
class SimpleWorkerInterceptor(Interceptor):
def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
return CustomScheduleToStartInterceptor(next)
class CustomScheduleToStartInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput):
schedule_to_start = (
activity.info().started_time
- activity.info().current_attempt_scheduled_time
)
meter = activity.metric_meter()
histogram = meter.create_histogram_timedelta(
"custom_activity_schedule_to_start_latency",
description="Time between activity scheduling and start",
unit="duration",
)
histogram.record(
schedule_to_start, {"workflow_type": activity.info().workflow_type}
)
return await self.next.execute_activity(input)
client = await Client.connect(
"localhost:7233",
)
worker = Worker(
client,
interceptors=[SimpleWorkerInterceptor()],
# ...
)
The workflow_interceptor_class returns a WorkflowInboundInterceptor that works similarly to
ActivityInboundInterceptor.