Skip to content

Add First-Class Gemini SDK Integration to Contrib#1378

Open
JasonSteving99 wants to merge 3 commits intomainfrom
jason-experiment-gemini-sdk-integration
Open

Add First-Class Gemini SDK Integration to Contrib#1378
JasonSteving99 wants to merge 3 commits intomainfrom
jason-experiment-gemini-sdk-integration

Conversation

@JasonSteving99
Copy link
Copy Markdown
Contributor

@JasonSteving99 JasonSteving99 commented Mar 18, 2026

Temporal Integration for the Google Gemini SDK

This adds a first-class integration that lets users call the Gemini SDK's AsyncClient directly from within Temporal workflows. Every API call and tool invocation becomes a durable Temporal activity — giving full crash recovery, visibility in workflow event history, and replay safety — while keeping credentials entirely on the worker side.

How it works

The integration shims three layers of the Gemini SDK so that workflows can use client.models, client.files, client.file_search_stores, client.chats, and all other SDK modules naturally:

TemporalApiClient (_temporal_api_client.py)

A BaseApiClient subclass that replaces the SDK's HTTP layer. Instead of making network calls, async_request and async_request_streamed serialize the request and dispatch it through workflow.execute_activity. The real HTTP call happens inside the activity on the worker, where the actual genai.Client with real credentials lives. Sync methods raise immediately. Per-request http_options are validated (non-serializable fields like httpx_client are rejected), and timeout is mapped to Temporal's start_to_close_timeout.

TemporalAsyncFiles / TemporalAsyncFileSearchStores (_temporal_files.py, _temporal_file_search_stores.py)

Subclasses of AsyncFiles and AsyncFileSearchStores that override upload, download, register_files, and upload_to_file_search_store to dispatch the entire operation as a Temporal activity. This avoids filesystem access (os module) and credential token refresh in the workflow sandbox. Methods like get, delete, list are inherited and work through the TemporalApiClient's async_request activity. File uploads accept str paths (resolved on the worker), os.PathLike, or io.IOBase (bytes serialized across the activity boundary).

TemporalAsyncClient (_temporal_async_client.py)

An AsyncClient subclass that wires in TemporalAsyncFiles and TemporalAsyncFileSearchStores. All other SDK modules (models, tunings, caches, batches, live, tokens, operations) are inherited unchanged since they only use async_request under the hood.

GeminiPlugin (_gemini_plugin.py)

A SimplePlugin that registers all activities, configures the Pydantic data converter, and passes google.genai through the workflow sandbox. Users pass a fully configured genai.Client — the plugin never constructs one itself. An optional extra_credentials parameter supports operations like register_files that need separate GCS credentials.

activity_as_tool (workflow.py)

Wraps any @activity.defn function so it looks like a regular async callable to Gemini's automatic function calling (AFC). When the model decides to call the tool, the SDK invokes the wrapper, which dispatches through workflow.execute_activity. Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.

Batched streaming

generate_content_stream is supported via a batched approach: the async_request_streamed activity collects all chunks from the real streaming response and returns them as a list. The workflow-side TemporalApiClient yields them back as an async generator so the SDK sees the expected interface.

Usage

# Worker side
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)

# Workflow side
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        client = gemini_client()
        response = await client.models.generate_content(
            model="gemini-2.5-flash",
            contents=query,
            config=types.GenerateContentConfig(
                tools=[activity_as_tool(my_tool)],
            ),
        )
        return response.text

Testing

31 integration tests covering:

  • Basic generate_content and multi-chunk streaming
  • AFC tool calling (single-arg, multi-arg, workflow methods, sequential multi-tool, failure propagation)
  • Per-request http_options propagation (headers, api_version, base_url)
  • File upload via str path and io.BytesIO, file download
  • File search store upload
  • Multi-turn chat via client.chats
  • TemporalAsyncClient wiring verification
  • TemporalApiClient error paths (sync raises, low-level upload/download raises)
  • activity_as_tool validation and signature preservation
  • A full end-to-end integration test that exercises all real activity implementations (generate, stream, file upload, download, store upload, RAG query, store delete) with a mocked genai.Client — ensuring the actual activity code in _gemini_activity.py is covered, not just the workflow-side shims.

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Mar 18, 2026

CLA assistant check
All committers have signed the CLA.

Comment thread temporalio/contrib/google_gemini_sdk/_client_store.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/__init__.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/_sensitive_fields_codec.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/justfile
Comment thread temporalio/contrib/google_gemini_sdk/first_class_example/start_workflow.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/_gemini_plugin.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/_gemini_plugin.py Outdated
Comment thread temporalio/contrib/google_gemini_sdk/_gemini_plugin.py Outdated
@JasonSteving99 JasonSteving99 force-pushed the jason-experiment-gemini-sdk-integration branch from 500ab1e to b770922 Compare April 28, 2026 22:06
@semgrep-managed-scans
Copy link
Copy Markdown

Semgrep found 1 ssc-80bc2470-4c90-4cf6-811e-5a265b8f01d5 finding:

Risk: Affected versions of litellm are vulnerable to Exposure of Sensitive Information to an Unauthorized Actor / Use of Password Hash With Insufficient Computational Effort / Use of a Broken or Risky Cryptographic Algorithm. LiteLLM exposes password hashes via authenticated API endpoints and its login flow accepts a stored SHA-256 hash as a valid password, allowing an authenticated user to steal another user's hash and log in as that user, resulting in full authentication bypass and privilege escalation.

Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102.

Reference(s): GHSA-69x8-hrgq-fjj8

Semgrep found 1 ssc-9ebb5c4b-d829-4496-b42b-76424d99a7b1 finding:

Risk: Affected versions of litellm are vulnerable to Incorrect Authorization. This vulnerability stems from inadequate enforcement of access control policies, allowing authenticated users to perform actions beyond their intended privilege level and potentially alter sensitive system configurations or access restricted resources.

Fix: Upgrade this library to at least version 1.83.0 at sdk-python/uv.lock:2102.

Reference(s): GHSA-53mr-6c8q-9789, CVE-2026-35029

Semgrep found 1 ssc-4464d88f-a369-4669-9a83-d26a12248ef5 finding:

Risk: Affected versions of google-adk are vulnerable to Missing Authentication for Critical Function. Google Agent Development Kit (ADK) contains a missing authentication flaw that can let an unauthenticated remote attacker reach code-injection paths and execute arbitrary code on the server running the ADK instance, including ADK Web deployments on Python, Cloud Run, and GKE.

Fix: Upgrade this library to at least version 1.28.1 at sdk-python/uv.lock:982.

Reference(s): GHSA-rg7c-g689-fr3x, CVE-2026-4810

@JasonSteving99 JasonSteving99 force-pushed the jason-experiment-gemini-sdk-integration branch from 19b495c to 6763054 Compare April 29, 2026 00:34
# Temporal Integration for the Google Gemini SDK

This adds a first-class integration that lets users call the Gemini SDK's `AsyncClient` directly from within Temporal workflows. Every API call and tool invocation becomes a durable Temporal activity — giving full crash recovery, visibility in workflow event history, and replay safety — while keeping credentials entirely on the worker side.

## How it works

The integration shims three layers of the Gemini SDK so that workflows can use `client.models`, `client.files`, `client.file_search_stores`, `client.chats`, and all other SDK modules naturally:

### `TemporalApiClient` (`_temporal_api_client.py`)

A `BaseApiClient` subclass that replaces the SDK's HTTP layer. Instead of making network calls, `async_request` and `async_request_streamed` serialize the request and dispatch it through `workflow.execute_activity`. The real HTTP call happens inside the activity on the worker, where the actual `genai.Client` with real credentials lives. Sync methods raise immediately. Per-request `http_options` are validated (non-serializable fields like `httpx_client` are rejected), and `timeout` is mapped to Temporal's `start_to_close_timeout`.

### `TemporalAsyncFiles` / `TemporalAsyncFileSearchStores` (`_temporal_files.py`, `_temporal_file_search_stores.py`)

Subclasses of `AsyncFiles` and `AsyncFileSearchStores` that override `upload`, `download`, `register_files`, and `upload_to_file_search_store` to dispatch the entire operation as a Temporal activity. This avoids filesystem access (`os` module) and credential token refresh in the workflow sandbox. Methods like `get`, `delete`, `list` are inherited and work through the `TemporalApiClient`'s `async_request` activity. File uploads accept `str` paths (resolved on the worker), `os.PathLike`, or `io.IOBase` (bytes serialized across the activity boundary).

### `TemporalAsyncClient` (`_temporal_async_client.py`)

An `AsyncClient` subclass that wires in `TemporalAsyncFiles` and `TemporalAsyncFileSearchStores`. All other SDK modules (`models`, `tunings`, `caches`, `batches`, `live`, `tokens`, `operations`) are inherited unchanged since they only use `async_request` under the hood.

### `GeminiPlugin` (`_gemini_plugin.py`)

A `SimplePlugin` that registers all activities, configures the Pydantic data converter, and passes `google.genai` through the workflow sandbox. Users pass a fully configured `genai.Client` — the plugin never constructs one itself. An optional `extra_credentials` parameter supports operations like `register_files` that need separate GCS credentials.

### `activity_as_tool` (`workflow.py`)

Wraps any `@activity.defn` function so it looks like a regular async callable to Gemini's automatic function calling (AFC). When the model decides to call the tool, the SDK invokes the wrapper, which dispatches through `workflow.execute_activity`. Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.

### Batched streaming

`generate_content_stream` is supported via a batched approach: the `async_request_streamed` activity collects all chunks from the real streaming response and returns them as a list. The workflow-side `TemporalApiClient` yields them back as an async generator so the SDK sees the expected interface.

## Usage

```python
# Worker side
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
plugin = GeminiPlugin(client)

# Workflow side
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        client = gemini_client()
        response = await client.models.generate_content(
            model="gemini-2.5-flash",
            contents=query,
            config=types.GenerateContentConfig(
                tools=[activity_as_tool(my_tool)],
            ),
        )
        return response.text
```

## Testing

31 integration tests covering:
- Basic `generate_content` and multi-chunk streaming
- AFC tool calling (single-arg, multi-arg, workflow methods, sequential multi-tool, failure propagation)
- Per-request `http_options` propagation (headers, api_version, base_url)
- File upload via str path and `io.BytesIO`, file download
- File search store upload
- Multi-turn chat via `client.chats`
- `TemporalAsyncClient` wiring verification
- `TemporalApiClient` error paths (sync raises, low-level upload/download raises)
- `activity_as_tool` validation and signature preservation
- A full end-to-end integration test that exercises all real activity implementations (generate, stream, file upload, download, store upload, RAG query, store delete) with a mocked `genai.Client` — ensuring the actual activity code in `_gemini_activity.py` is covered, not just the workflow-side shims.
@JasonSteving99 JasonSteving99 force-pushed the jason-experiment-gemini-sdk-integration branch from 6763054 to bf58318 Compare April 29, 2026 00:42
@JasonSteving99 JasonSteving99 marked this pull request as ready for review April 29, 2026 19:48
@JasonSteving99 JasonSteving99 requested a review from a team as a code owner April 29, 2026 19:48

This integration lets you use the Gemini SDK's async client with full
automatic function calling (AFC) support, where every API call and every
tool invocation is a **durable Temporal activity**.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we run deterministic tools in the workflow?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users can also pass plain workflow methods directly as tools — these run in-workflow without an activity.

Mention this here?


Quickstart::

# ---- worker setup (outside sandbox) ----
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we specify "Temporal Python Sandbox" here?

def __init__(
self,
client: GeminiClient,
credentials: google.auth.credentials.Credentials | None = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these optional because you can use environment variables instead? Or can you actually use Gemini SDK without credentials?

class AgentWorkflow:
@workflow.run
async def run(self, query: str) -> str:
client = gemini_client()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if someone forgot to do this and used the client defined outside of the workflow instead? This seems like a common pitfall across a lot of our plugins...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also is there any use case for multiple clients? What if we had two clients and two API keys?

def activities(self) -> Sequence[Callable]:
"""Return activities that route SDK calls through this client."""

@activity.defn(name="gemini_api_client_async_request")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Temporal default to this as the name if it isn't passed?

No credentials are passed to or from the workflow. Auth material never
appears in Temporal's event history.

Example (Gemini Developer API)::
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really how we format our docs? I wonder if the AI was just copying from the .. warning:: above.

Suggested change
Example (Gemini Developer API)::
Example (Gemini Developer API):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(two more instances below)

@@ -0,0 +1,1301 @@
"""Integration tests for the Google Gemini SDK Temporal integration.

Tests cover:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One file per bullet point?

Comment thread uv.lock
{ name = "typing-extensions", specifier = ">=4.2.0,<5" },
]
provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "langgraph", "langsmith", "lambda-worker-otel", "aioboto3"]
provides-extras = ["grpc", "opentelemetry", "pydantic", "openai-agents", "google-adk", "langgraph", "langsmith", "lambda-worker-otel", "aioboto3", "google-gemini"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate with newlines and alphabetize? This list is going to get long.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a first-class Temporal contrib integration for the Google Gemini (google-genai) SDK so workflows can use an AsyncClient while executing all SDK HTTP calls and tool invocations as durable Temporal activities (credentials stay on the worker).

Changes:

  • Introduces temporalio.contrib.google_gemini_sdk with a worker plugin, workflow helpers, activity implementations, and serializable request/response models.
  • Adds Temporal-aware shims for Gemini file operations and streamed responses.
  • Adds a comprehensive integration test suite and wires the optional google-genai dependency via a new extra.

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
uv.lock Adds google-genai dependency via a new google-gemini extra and updates lock metadata.
pyproject.toml Defines google-gemini optional dependency group (google-genai>=1.66.0).
tests/contrib/google_gemini_sdk/init.py Test package marker for the new contrib integration.
tests/contrib/google_gemini_sdk/test_gemini.py Integration + unit tests for API calls, streaming, file ops, tool calling, and plugin wiring.
temporalio/contrib/google_gemini_sdk/init.py Public module entry points and docs for GeminiPlugin, gemini_client, and activity_as_tool.
temporalio/contrib/google_gemini_sdk/workflow.py Workflow-side helpers: gemini_client() and activity_as_tool() wrapper.
temporalio/contrib/google_gemini_sdk/justfile Local dev commands/examples for running workers and sample workflows.
temporalio/contrib/google_gemini_sdk/_models.py Pydantic models that cross the activity boundary (requests/responses/options).
temporalio/contrib/google_gemini_sdk/_temporal_api_client.py Workflow-side BaseApiClient shim that routes SDK HTTP calls through activities.
temporalio/contrib/google_gemini_sdk/_temporal_async_client.py AsyncClient subclass wiring Temporal-aware files and file_search_stores.
temporalio/contrib/google_gemini_sdk/_temporal_files.py Temporal-aware AsyncFiles overriding upload/download/register to run on the worker.
temporalio/contrib/google_gemini_sdk/_temporal_file_search_stores.py Temporal-aware AsyncFileSearchStores overriding upload to run on the worker.
temporalio/contrib/google_gemini_sdk/_gemini_activity.py Worker-side activities that execute real SDK calls using the provided genai.Client.
temporalio/contrib/google_gemini_sdk/_gemini_plugin.py GeminiPlugin that registers activities, configures the data converter, and sandbox passthrough.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

_validate_http_options(upload_config.http_options)

if isinstance(file, io.IOBase):
req = _GeminiUploadFileRequest(file_bytes=file.read(), config=upload_config)
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When file is an io.IOBase, file.read() may return str for text streams (e.g. io.StringIO/TextIOBase), but _GeminiUploadFileRequest.file_bytes is typed as bytes and the worker-side activity expects bytes. Consider validating that read() returned bytes (or restricting accepted types to binary streams) and raising a clear error if not.

Suggested change
req = _GeminiUploadFileRequest(file_bytes=file.read(), config=upload_config)
file_bytes = file.read()
if not isinstance(file_bytes, bytes):
raise TypeError(
"file must be a binary stream when passing an io.IOBase; "
"file.read() must return bytes"
)
req = _GeminiUploadFileRequest(
file_bytes=file_bytes, config=upload_config
)

Copilot uses AI. Check for mistakes.
Comment on lines +47 to +48
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=timedelta(seconds=60),
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.

Suggested change
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=timedelta(seconds=60),
self._activity_config = (
ActivityConfig(start_to_close_timeout=timedelta(seconds=60))
if activity_config is None
else activity_config

Copilot uses AI. Check for mistakes.
Comment on lines +77 to +82
if isinstance(file, io.IOBase):
req = _GeminiUploadToFileSearchStoreRequest(
file_search_store_name=file_search_store_name,
file_bytes=file.read(),
config=upload_config,
)
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When file is an io.IOBase, file.read() may return str for text streams, but _GeminiUploadToFileSearchStoreRequest.file_bytes is typed as bytes and the activity constructs a BytesIO. Validate that read() returned bytes (or restrict to binary streams) and raise a clear error if not.

Copilot uses AI. Check for mistakes.
Comment on lines +90 to +91
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=__import__("datetime").timedelta(seconds=60),
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.

Suggested change
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=__import__("datetime").timedelta(seconds=60),
self._activity_config = (
ActivityConfig(start_to_close_timeout=timedelta(seconds=60))
if activity_config is None
else activity_config

Copilot uses AI. Check for mistakes.
self.custom_base_url: str | None = None

self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=__import__("datetime").timedelta(seconds=60),
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__import__("datetime").timedelta(...) is unnecessary here since timedelta is already imported at the top of the module. Using the direct timedelta(...) call would be clearer and avoids surprising dynamic imports in workflow code.

Suggested change
start_to_close_timeout=__import__("datetime").timedelta(seconds=60),
start_to_close_timeout=timedelta(seconds=60),

Copilot uses AI. Check for mistakes.
tools=[
activity_as_tool(
get_weather,
start_to_close_timeout=timedelta(seconds=30),
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quickstart example passes start_to_close_timeout=... directly to activity_as_tool, but the function signature only accepts activity_config. Update the example to pass an ActivityConfig (or adjust the API) so the documented usage matches the actual callable signature.

Suggested change
start_to_close_timeout=timedelta(seconds=30),
activity_config=ActivityConfig(
start_to_close_timeout=timedelta(seconds=30),
),

Copilot uses AI. Check for mistakes.
Comment on lines +80 to +87
config: ActivityConfig = {
**(
activity_config
or ActivityConfig(
start_to_close_timeout=timedelta(seconds=30),
)
)
}
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activity_config or ActivityConfig(...) treats an empty dict as falsy and silently replaces it with defaults. If a caller intentionally passes {} (or a config that becomes empty after filtering), their choice is lost. Prefer an explicit if activity_config is None check when applying defaults.

Copilot uses AI. Check for mistakes.
Comment on lines +51 to +53
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=timedelta(seconds=60),
)
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activity_config or ActivityConfig(...) will override an explicitly provided empty config dict ({}) with the default timeout. Use if activity_config is None to decide when to apply defaults so callers can intentionally provide an empty/partial config without it being replaced.

Suggested change
self._activity_config = activity_config or ActivityConfig(
start_to_close_timeout=timedelta(seconds=60),
)
if activity_config is None:
self._activity_config = ActivityConfig(
start_to_close_timeout=timedelta(seconds=60),
)
else:
self._activity_config = activity_config

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants