Add First-Class Gemini SDK Integration to Contrib #1378
JasonSteving99 wants to merge 3 commits into main
- Semgrep found 1 Risk: Affected versions of litellm are vulnerable to Exposure of Sensitive Information to an Unauthorized Actor / Use of Password Hash With Insufficient Computational Effort / Use of a Broken or Risky Cryptographic Algorithm. LiteLLM exposes password hashes via authenticated API endpoints and its login flow accepts a stored SHA-256 hash as a valid password, allowing an authenticated user to steal another user's hash and log in as that user, resulting in full authentication bypass and privilege escalation. Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102. Reference(s): GHSA-69x8-hrgq-fjj8
- Semgrep found 1 Risk: Affected versions of litellm are vulnerable to Incorrect Authorization. This vulnerability stems from inadequate enforcement of access control policies, allowing authenticated users to perform actions beyond their intended privilege level and potentially alter sensitive system configurations or access restricted resources. Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102. Reference(s): GHSA-53mr-6c8q-9789, CVE-2026-35029
- Semgrep found 1 Risk: Affected versions of google-adk are vulnerable to Missing Authentication for Critical Function. Google Agent Development Kit (ADK) contains a missing authentication flaw that can let an unauthenticated remote attacker reach code-injection paths and execute arbitrary code on the server running the ADK instance, including ADK Web deployments on Python, Cloud Run, and GKE. Fix: Upgrade this library to at least version 1.28.1 at sdk-python/uv.lock:982. Reference(s): GHSA-rg7c-g689-fr3x, CVE-2026-4810
# Temporal Integration for the Google Gemini SDK
This adds a first-class integration that lets users call the Gemini SDK's `AsyncClient` directly from within Temporal workflows. Every API call and tool invocation becomes a durable Temporal activity — giving full crash recovery, visibility in workflow event history, and replay safety — while keeping credentials entirely on the worker side.
## How it works
The integration shims three layers of the Gemini SDK so that workflows can use `client.models`, `client.files`, `client.file_search_stores`, `client.chats`, and all other SDK modules naturally:
### `TemporalApiClient` (`_temporal_api_client.py`)
A `BaseApiClient` subclass that replaces the SDK's HTTP layer. Instead of making network calls, `async_request` and `async_request_streamed` serialize the request and dispatch it through `workflow.execute_activity`. The real HTTP call happens inside the activity on the worker, where the actual `genai.Client` with real credentials lives. Sync methods raise immediately. Per-request `http_options` are validated (non-serializable fields like `httpx_client` are rejected), and `timeout` is mapped to Temporal's `start_to_close_timeout`.
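The routing idea can be sketched without Temporal or the Gemini SDK installed. In this minimal illustration, `SketchRequest`, `SketchApiClient`, `fake_activity`, and the `dispatch` callable are all hypothetical stand-ins (the shim described above subclasses `BaseApiClient` and dispatches through `workflow.execute_activity`). It only shows the shape: async requests are serialized and handed off, while sync requests fail fast.

```python
import asyncio
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class SketchRequest:
    """Hypothetical serializable request that crosses the activity boundary."""

    http_method: str
    path: str
    body: dict[str, Any] = field(default_factory=dict)


class SketchApiClient:
    """Stand-in for a workflow-side API client shim (not the PR's class)."""

    def __init__(self, dispatch: Callable[[str], Awaitable[str]]) -> None:
        # `dispatch` plays the role of workflow.execute_activity in this sketch.
        self._dispatch = dispatch

    def request(self, http_method: str, path: str, body: dict[str, Any]) -> Any:
        # A sync call cannot be routed through an awaited activity, so fail fast.
        raise RuntimeError("sync requests are not supported inside a workflow")

    async def async_request(
        self, http_method: str, path: str, body: dict[str, Any]
    ) -> dict[str, Any]:
        # Serialize the request, hand it off, and decode the serialized reply.
        payload = json.dumps(asdict(SketchRequest(http_method, path, body)))
        return json.loads(await self._dispatch(payload))


async def fake_activity(payload: str) -> str:
    """Pretend worker-side activity; a real one would make the HTTP call."""
    req = json.loads(payload)
    return json.dumps({"echo_path": req["path"], "method": req["http_method"]})


result = asyncio.run(
    SketchApiClient(fake_activity).async_request(
        "post", "models/x:generateContent", {"q": "hi"}
    )
)
print(result)
```

Because only serialized data crosses the `dispatch` boundary, nothing credential-bearing needs to exist on the calling side in this sketch.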
### `TemporalAsyncFiles` / `TemporalAsyncFileSearchStores` (`_temporal_files.py`, `_temporal_file_search_stores.py`)
Subclasses of `AsyncFiles` and `AsyncFileSearchStores` that override `upload`, `download`, `register_files`, and `upload_to_file_search_store` to dispatch the entire operation as a Temporal activity. This avoids filesystem access (`os` module) and credential token refresh in the workflow sandbox. Methods like `get`, `delete`, `list` are inherited and work through the `TemporalApiClient`'s `async_request` activity. File uploads accept `str` paths (resolved on the worker), `os.PathLike`, or `io.IOBase` (bytes serialized across the activity boundary).
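A minimal sketch of how the three accepted input kinds might be normalized before crossing the activity boundary. `SketchUploadRequest` and `build_upload_request` are hypothetical names, not the PR's models, and the bytes check is an assumption added here for safety rather than something the description above states.

```python
import io
import os
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SketchUploadRequest:
    """Hypothetical request model: exactly one of the two fields is set."""

    file_path: Optional[str] = None
    file_bytes: Optional[bytes] = None


def build_upload_request(
    file: Union[str, os.PathLike, io.IOBase],
) -> SketchUploadRequest:
    if isinstance(file, io.IOBase):
        # Streams are read eagerly so that only bytes are serialized.
        data = file.read()
        if not isinstance(data, bytes):
            raise TypeError("file must be a binary stream; read() must return bytes")
        return SketchUploadRequest(file_bytes=data)
    # Paths are passed as strings and would be opened later on the worker.
    return SketchUploadRequest(file_path=os.fspath(file))


print(build_upload_request("data/report.pdf"))
print(build_upload_request(io.BytesIO(b"abc")))
```

Sending a path keeps the payload small but assumes the file exists on the worker's filesystem; sending bytes avoids that assumption at the cost of a larger payload.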
### `TemporalAsyncClient` (`_temporal_async_client.py`)
An `AsyncClient` subclass that wires in `TemporalAsyncFiles` and `TemporalAsyncFileSearchStores`. All other SDK modules (`models`, `tunings`, `caches`, `batches`, `live`, `tokens`, `operations`) are inherited unchanged since they only use `async_request` under the hood.
### `GeminiPlugin` (`_gemini_plugin.py`)
A `SimplePlugin` that registers all activities, configures the Pydantic data converter, and passes `google.genai` through the workflow sandbox. Users pass a fully configured `genai.Client` — the plugin never constructs one itself. An optional `extra_credentials` parameter supports operations like `register_files` that need separate GCS credentials.
### `activity_as_tool` (`workflow.py`)
Wraps any `@activity.defn` function so it looks like a regular async callable to Gemini's automatic function calling (AFC). When the model decides to call the tool, the SDK invokes the wrapper, which dispatches through `workflow.execute_activity`. Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.
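The wrapping technique can be illustrated with the standard library alone. In this sketch, `as_tool_sketch` and `run_as_activity` are hypothetical stand-ins for `activity_as_tool` and `workflow.execute_activity`; the point is only that `functools.wraps` keeps the name, docstring, and introspectable signature that a function-calling layer would typically inspect.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable


async def run_as_activity(
    fn: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any
) -> Any:
    """Hypothetical stand-in for dispatching through workflow.execute_activity."""
    return await fn(*args, **kwargs)


def as_tool_sketch(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    @functools.wraps(fn)  # copies __name__/__doc__ and sets __wrapped__
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The caller sees a plain coroutine function; the work is dispatched.
        return await run_as_activity(fn, *args, **kwargs)

    return wrapper


async def get_weather(city: str) -> str:
    """Return a canned weather report for a city."""
    return f"Sunny in {city}"


tool = as_tool_sketch(get_weather)
# inspect.signature follows __wrapped__, so the original parameters are visible.
print(tool.__name__, inspect.signature(tool))
print(asyncio.run(tool("Paris")))
```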
### Batched streaming
`generate_content_stream` is supported via a batched approach: the `async_request_streamed` activity collects all chunks from the real streaming response and returns them as a list. The workflow-side `TemporalApiClient` yields them back as an async generator so the SDK sees the expected interface.
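The batched approach can be sketched as follows. The function names are hypothetical and plain coroutines stand in for the activity call; the sketch shows only the collect-then-replay shape described above.

```python
import asyncio
from typing import AsyncIterator, List


async def real_stream() -> AsyncIterator[str]:
    """Stand-in for the SDK's real streaming response on the worker."""
    for chunk in ("Hel", "lo, ", "world"):
        yield chunk


async def collect_chunks_activity() -> List[str]:
    """Worker side: drain the stream and return every chunk at once."""
    return [chunk async for chunk in real_stream()]


async def workflow_side_stream() -> AsyncIterator[str]:
    """Workflow side: re-expose the batched list as an async generator."""
    for chunk in await collect_chunks_activity():
        yield chunk


async def main() -> str:
    # The consumer iterates exactly as it would over a live stream.
    return "".join([chunk async for chunk in workflow_side_stream()])


text = asyncio.run(main())
print(text)
```

One consequence of this shape is that the consumer receives no chunk until the whole response has been collected, so the streaming interface is preserved but incremental latency is not.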
## Usage
```python
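import os

from google import genai
from google.genai import types
from temporalio import workflow

# Import path assumed from the package layout in this PR.
from temporalio.contrib.google_gemini_sdk import (
    GeminiPlugin,
    activity_as_tool,
    gemini_client,
)

# Worker side: the real client (and its credentials) stays on the worker.
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)


# Workflow side
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        client = gemini_client()
        response = await client.models.generate_content(
            model="gemini-2.5-flash",
            contents=query,
            config=types.GenerateContentConfig(
                # my_tool is an @activity.defn function defined elsewhere.
                tools=[activity_as_tool(my_tool)],
            ),
        )
        return response.text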
```
## Testing
31 integration tests covering:
- Basic `generate_content` and multi-chunk streaming
- AFC tool calling (single-arg, multi-arg, workflow methods, sequential multi-tool, failure propagation)
- Per-request `http_options` propagation (headers, api_version, base_url)
- File upload via str path and `io.BytesIO`, file download
- File search store upload
- Multi-turn chat via `client.chats`
- `TemporalAsyncClient` wiring verification
- `TemporalApiClient` error paths (sync raises, low-level upload/download raises)
- `activity_as_tool` validation and signature preservation
- A full end-to-end integration test that exercises all real activity implementations (generate, stream, file upload, download, store upload, RAG query, store delete) with a mocked `genai.Client` — ensuring the actual activity code in `_gemini_activity.py` is covered, not just the workflow-side shims.
|
|
||
| This integration lets you use the Gemini SDK's async client with full | ||
| automatic function calling (AFC) support, where every API call and every | ||
| tool invocation is a **durable Temporal activity**. |
Can we run deterministic tools in the workflow?
Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.
Mention this here?
|
|
||
| Quickstart:: | ||
|
|
||
| # ---- worker setup (outside sandbox) ---- |
Can we specify "Temporal Python Sandbox" here?
| def __init__( | ||
| self, | ||
| client: GeminiClient, | ||
| credentials: google.auth.credentials.Credentials | None = None, |
Are these optional because you can use environment variables instead? Or can you actually use Gemini SDK without credentials?
| class AgentWorkflow: | ||
| @workflow.run | ||
| async def run(self, query: str) -> str: | ||
| client = gemini_client() |
What would happen if someone forgot to do this and used the client defined outside of the workflow instead? This seems like a common pitfall across a lot of our plugins...
Also is there any use case for multiple clients? What if we had two clients and two API keys?
| def activities(self) -> Sequence[Callable]: | ||
| """Return activities that route SDK calls through this client.""" | ||
|
|
||
| @activity.defn(name="gemini_api_client_async_request") |
Does Temporal default to this as the name if it isn't passed?
| No credentials are passed to or from the workflow. Auth material never | ||
| appears in Temporal's event history. | ||
|
|
||
| Example (Gemini Developer API):: |
Is this really how we format our docs? I wonder if the AI was just copying from the .. warning:: above.
| Example (Gemini Developer API):: | |
| Example (Gemini Developer API): |
(two more instances below)
| @@ -0,0 +1,1301 @@ | |||
| """Integration tests for the Google Gemini SDK Temporal integration. | |||
|
|
|||
| Tests cover: | |||
One file per bullet point?
| { name = "typing-extensions", specifier = ">=4.2.0,<5" }, | ||
| ] | ||
| provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "langgraph", "langsmith", "lambda-worker-otel", "aioboto3"] | ||
| provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "langgraph", "langsmith", "lambda-worker-otel", "aioboto3", "google-gemini"] |
Separate with newlines and alphabetize? This list is going to get long.
Pull request overview
Adds a first-class Temporal contrib integration for the Google Gemini (google-genai) SDK so workflows can use an AsyncClient while executing all SDK HTTP calls and tool invocations as durable Temporal activities (credentials stay on the worker).
Changes:
- Introduces `temporalio.contrib.google_gemini_sdk` with a worker plugin, workflow helpers, activity implementations, and serializable request/response models.
- Adds Temporal-aware shims for Gemini file operations and streamed responses.
- Adds a comprehensive integration test suite and wires the optional `google-genai` dependency via a new extra.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| uv.lock | Adds google-genai dependency via a new google-gemini extra and updates lock metadata. |
| pyproject.toml | Defines google-gemini optional dependency group (google-genai>=1.66.0). |
| tests/contrib/google_gemini_sdk/`__init__.py` | Test package marker for the new contrib integration. |
| tests/contrib/google_gemini_sdk/test_gemini.py | Integration + unit tests for API calls, streaming, file ops, tool calling, and plugin wiring. |
| temporalio/contrib/google_gemini_sdk/`__init__.py` | Public module entry points and docs for GeminiPlugin, gemini_client, and activity_as_tool. |
| temporalio/contrib/google_gemini_sdk/workflow.py | Workflow-side helpers: gemini_client() and activity_as_tool() wrapper. |
| temporalio/contrib/google_gemini_sdk/justfile | Local dev commands/examples for running workers and sample workflows. |
| temporalio/contrib/google_gemini_sdk/_models.py | Pydantic models that cross the activity boundary (requests/responses/options). |
| temporalio/contrib/google_gemini_sdk/_temporal_api_client.py | Workflow-side BaseApiClient shim that routes SDK HTTP calls through activities. |
| temporalio/contrib/google_gemini_sdk/_temporal_async_client.py | AsyncClient subclass wiring Temporal-aware files and file_search_stores. |
| temporalio/contrib/google_gemini_sdk/_temporal_files.py | Temporal-aware AsyncFiles overriding upload/download/register to run on the worker. |
| temporalio/contrib/google_gemini_sdk/_temporal_file_search_stores.py | Temporal-aware AsyncFileSearchStores overriding upload to run on the worker. |
| temporalio/contrib/google_gemini_sdk/_gemini_activity.py | Worker-side activities that execute real SDK calls using the provided genai.Client. |
| temporalio/contrib/google_gemini_sdk/_gemini_plugin.py | GeminiPlugin that registers activities, configures the data converter, and sandbox passthrough. |
| _validate_http_options(upload_config.http_options) | ||
|
|
||
| if isinstance(file, io.IOBase): | ||
| req = _GeminiUploadFileRequest(file_bytes=file.read(), config=upload_config) |
When file is an io.IOBase, file.read() may return str for text streams (e.g. io.StringIO/TextIOBase), but _GeminiUploadFileRequest.file_bytes is typed as bytes and the worker-side activity expects bytes. Consider validating that read() returned bytes (or restricting accepted types to binary streams) and raising a clear error if not.
| req = _GeminiUploadFileRequest(file_bytes=file.read(), config=upload_config) | |
| file_bytes = file.read() | |
| if not isinstance(file_bytes, bytes): | |
| raise TypeError( | |
| "file must be a binary stream when passing an io.IOBase; " | |
| "file.read() must return bytes" | |
| ) | |
| req = _GeminiUploadFileRequest( | |
| file_bytes=file_bytes, config=upload_config | |
| ) |
| self._activity_config = activity_config or ActivityConfig( | ||
| start_to_close_timeout=timedelta(seconds=60), |
activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.
| self._activity_config = activity_config or ActivityConfig( | |
| start_to_close_timeout=timedelta(seconds=60), | |
| self._activity_config = ( | |
| ActivityConfig(start_to_close_timeout=timedelta(seconds=60)) | |
| if activity_config is None | |
| else activity_config |
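The difference between the two forms is easy to reproduce in isolation. In this small sketch, plain dicts stand in for `ActivityConfig` (the comment above treats it as a config dict), and `with_or` / `with_none_check` are hypothetical helper names.

```python
from datetime import timedelta
from typing import Optional

DEFAULT = {"start_to_close_timeout": timedelta(seconds=60)}


def with_or(config: Optional[dict]) -> dict:
    # An empty dict is falsy, so it is silently replaced by the default.
    return config or dict(DEFAULT)


def with_none_check(config: Optional[dict]) -> dict:
    # Only a missing config (None) triggers the default.
    return dict(DEFAULT) if config is None else config


print(with_or({}))          # the caller's empty config is lost
print(with_none_check({}))  # the caller's empty config is kept
```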
| if isinstance(file, io.IOBase): | ||
| req = _GeminiUploadToFileSearchStoreRequest( | ||
| file_search_store_name=file_search_store_name, | ||
| file_bytes=file.read(), | ||
| config=upload_config, | ||
| ) |
When file is an io.IOBase, file.read() may return str for text streams, but _GeminiUploadToFileSearchStoreRequest.file_bytes is typed as bytes and the activity constructs a BytesIO. Validate that read() returned bytes (or restrict to binary streams) and raise a clear error if not.
| self._activity_config = activity_config or ActivityConfig( | ||
| start_to_close_timeout=__import__("datetime").timedelta(seconds=60), |
activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.
| self._activity_config = activity_config or ActivityConfig( | |
| start_to_close_timeout=__import__("datetime").timedelta(seconds=60), | |
| self._activity_config = ( | |
| ActivityConfig(start_to_close_timeout=timedelta(seconds=60)) | |
| if activity_config is None | |
| else activity_config |
| self.custom_base_url: str | None = None | ||
|
|
||
| self._activity_config = activity_config or ActivityConfig( | ||
| start_to_close_timeout=__import__("datetime").timedelta(seconds=60), |
__import__("datetime").timedelta(...) is unnecessary here since timedelta is already imported at the top of the module. Using the direct timedelta(...) call would be clearer and avoids surprising dynamic imports in workflow code.
| start_to_close_timeout=__import__("datetime").timedelta(seconds=60), | |
| start_to_close_timeout=timedelta(seconds=60), |
| tools=[ | ||
| activity_as_tool( | ||
| get_weather, | ||
| start_to_close_timeout=timedelta(seconds=30), |
The quickstart example passes start_to_close_timeout=... directly to activity_as_tool, but the function signature only accepts activity_config. Update the example to pass an ActivityConfig (or adjust the API) so the documented usage matches the actual callable signature.
| start_to_close_timeout=timedelta(seconds=30), | |
| activity_config=ActivityConfig( | |
| start_to_close_timeout=timedelta(seconds=30), | |
| ), |
| config: ActivityConfig = { | ||
| **( | ||
| activity_config | ||
| or ActivityConfig( | ||
| start_to_close_timeout=timedelta(seconds=30), | ||
| ) | ||
| ) | ||
| } |
activity_config or ActivityConfig(...) treats an empty dict as falsy and silently replaces it with defaults. If a caller intentionally passes {} (or a config that becomes empty after filtering), their choice is lost. Prefer an explicit if activity_config is None check when applying defaults.
| self._activity_config = activity_config or ActivityConfig( | ||
| start_to_close_timeout=timedelta(seconds=60), | ||
| ) |
activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.
| self._activity_config = activity_config or ActivityConfig( | |
| start_to_close_timeout=timedelta(seconds=60), | |
| ) | |
| if activity_config is None: | |
| self._activity_config = ActivityConfig( | |
| start_to_close_timeout=timedelta(seconds=60), | |
| ) | |
| else: | |
| self._activity_config = activity_config |