-
Notifications
You must be signed in to change notification settings - Fork 810
feat: support interrupts from AfterToolCallEvent (#1165) #2178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
|
|
||
| from opentelemetry import trace as trace_api | ||
|
|
||
| from ..hooks import AfterModelCallEvent, BeforeModelCallEvent, MessageAddedEvent | ||
| from ..hooks import AfterModelCallEvent, AfterToolCallEvent, BeforeModelCallEvent, MessageAddedEvent | ||
| from ..telemetry.metrics import Trace | ||
| from ..telemetry.tracer import Tracer, get_tracer | ||
| from ..tools._validator import validate_and_prepare_tools | ||
|
|
@@ -476,15 +476,48 @@ async def _handle_tool_execution( | |
| validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids) | ||
| tool_uses = [tool_use for tool_use in tool_uses if tool_use.get("toolUseId") not in invalid_tool_use_ids] | ||
|
|
||
| interrupts = [] | ||
|
|
||
| if agent._interrupt_state.activated: | ||
| tool_results.extend(agent._interrupt_state.context["tool_results"]) | ||
|
|
||
| # Replay after-tool-call hooks for tools that were interrupted after execution. | ||
| # The tool result is already preserved in tool_results; we re-fire the hook so | ||
| # the callback receives the human response via event.interrupt() return value. | ||
| after_tool_snapshots: list[dict[str, Any]] = agent._interrupt_state.context.get("after_tool_events", []) | ||
| for snapshot in after_tool_snapshots: | ||
| tool_name = snapshot["tool_use"]["name"] | ||
| tool_func = agent.tool_registry.dynamic_tools.get(tool_name) or agent.tool_registry.registry.get(tool_name) | ||
| original_exception = Exception(snapshot["exception"]) if snapshot.get("exception") else None | ||
| original_event = AfterToolCallEvent( | ||
| agent=agent, | ||
| selected_tool=tool_func, | ||
| tool_use=snapshot["tool_use"], | ||
| invocation_state=invocation_state, | ||
| result=snapshot["result"], | ||
| exception=original_exception, | ||
| cancel_message=snapshot.get("cancel_message"), | ||
| ) | ||
| replayed, new_interrupts = await agent.hooks.invoke_callbacks_async(original_event) | ||
| if new_interrupts: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Missing test coverage for the re-interrupt-during-replay path. Codecov reports 80% patch coverage on this file with 2 missing lines and 3 partials. The path at L502-503 (replay produces new interrupts) appears untested. Given the bug identified above (snapshots not re-added for re-interrupts), this would be a valuable test case. A test for the "callback interrupts again during replay" scenario would both validate the fix and cover the gap. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
| interrupts.extend(new_interrupts) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Re-interrupt during replay loses the after-tool snapshot. When the replay loop (L501-503) produces Suggestion: When the replay loop encounters if new_interrupts:
interrupts.extend(new_interrupts)
after_tool_interrupt_events.append(snapshot)
continueNote that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Moved There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Moved |
||
| continue | ||
| tool_use_id = original_event.tool_use["toolUseId"] | ||
| if getattr(replayed, "retry", False): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The Looking at the interaction between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added an inline comment in the replay logic explaining that retry and interrupt responses can coexist on resume — the callback receives the human response (interrupt() returns instead of raising) and may then set retry=True. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added inline comment in the replay logic explaining that retry and interrupt responses can coexist on resume — |
||
| # Hook wants to re-execute the tool — remove preserved result and re-queue | ||
| tool_results[:] = [tr for tr in tool_results if tr["toolUseId"] != tool_use_id] | ||
| tool_uses.append(original_event.tool_use) | ||
| else: | ||
| # Update result in case the hook modified it | ||
| for i, tr in enumerate(tool_results): | ||
| if tr["toolUseId"] == tool_use_id: | ||
| tool_results[i] = replayed.result | ||
| break | ||
|
|
||
| # Filter to only the interrupted tools when resuming from interrupt (tool uses without results) | ||
| tool_use_ids = {tool_result["toolUseId"] for tool_result in tool_results} | ||
| tool_uses = [tool_use for tool_use in tool_uses if tool_use["toolUseId"] not in tool_use_ids] | ||
|
|
||
| interrupts = [] | ||
|
|
||
| # Check for cancellation before tool execution | ||
| # Add tool_result for each tool_use to maintain valid conversation state | ||
| if agent._cancel_signal.is_set(): | ||
|
|
@@ -528,9 +561,20 @@ async def _handle_tool_execution( | |
| tool_events = agent.tool_executor._execute( | ||
| agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context | ||
| ) | ||
| after_tool_interrupt_events: list[dict[str, Any]] = [] | ||
| async for tool_event in tool_events: | ||
| if isinstance(tool_event, ToolInterruptEvent): | ||
| interrupts.extend(tool_event["tool_interrupt_event"]["interrupts"]) | ||
| if isinstance(tool_event.source_event, AfterToolCallEvent): | ||
| evt = tool_event.source_event | ||
| after_tool_interrupt_events.append( | ||
| { | ||
| "tool_use": evt.tool_use, | ||
| "result": evt.result, | ||
| "cancel_message": evt.cancel_message, | ||
| "exception": str(evt.exception) if evt.exception else None, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The Looking at the snapshot structure at L570-577, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched to direct key access ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched to direct key access ( |
||
| } | ||
| ) | ||
|
|
||
| yield tool_event | ||
|
|
||
|
|
@@ -544,7 +588,11 @@ async def _handle_tool_execution( | |
|
|
||
| if interrupts: | ||
| # Session state stored on AfterInvocationEvent. | ||
| agent._interrupt_state.context = {"tool_use_message": message, "tool_results": tool_results} | ||
| agent._interrupt_state.context = { | ||
| "tool_use_message": message, | ||
| "tool_results": tool_results, | ||
| "after_tool_events": after_tool_interrupt_events, | ||
| } | ||
| agent._interrupt_state.activate() | ||
|
|
||
| agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue: Original exception type is lost during serialization/deserialization.
The exception is stored as
str(evt.exception)at L575 and restored asException(snapshot["exception"])here. This means the original exception type (e.g.,RuntimeError,ValueError) is replaced with a genericException. Callbacks that useisinstancechecks on the exception type during replay will see different behavior than the original invocation.Suggestion: Consider preserving the exception type name and restoring it, or at minimum document this limitation in the docstring. If type fidelity matters, you could store both:
Though for session persistence across processes, the generic
Exceptionapproach may be the pragmatic choice — just make sure the docstring calls out this behavior.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documented this limitation in both the
AfterToolCallEventdocstring and as an inline comment in the replay loop. The genericExceptionapproach is the pragmatic choice for session persistence across processes — the docstring now explicitly calls out the behavior.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documented in both the
AfterToolCallEventdocstring and the replay loop inline comments. GenericExceptionis the pragmatic choice for session persistence — the docstring now explicitly calls out the behavior.