Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from opentelemetry import trace as trace_api

from ..hooks import AfterModelCallEvent, BeforeModelCallEvent, MessageAddedEvent
from ..hooks import AfterModelCallEvent, AfterToolCallEvent, BeforeModelCallEvent, MessageAddedEvent
from ..telemetry.metrics import Trace
from ..telemetry.tracer import Tracer, get_tracer
from ..tools._validator import validate_and_prepare_tools
Expand Down Expand Up @@ -476,15 +476,48 @@ async def _handle_tool_execution(
validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids)
tool_uses = [tool_use for tool_use in tool_uses if tool_use.get("toolUseId") not in invalid_tool_use_ids]

interrupts = []

if agent._interrupt_state.activated:
tool_results.extend(agent._interrupt_state.context["tool_results"])

# Replay after-tool-call hooks for tools that were interrupted after execution.
# The tool result is already preserved in tool_results; we re-fire the hook so
# the callback receives the human response via event.interrupt() return value.
after_tool_snapshots: list[dict[str, Any]] = agent._interrupt_state.context.get("after_tool_events", [])
for snapshot in after_tool_snapshots:
tool_name = snapshot["tool_use"]["name"]
tool_func = agent.tool_registry.dynamic_tools.get(tool_name) or agent.tool_registry.registry.get(tool_name)
original_exception = Exception(snapshot["exception"]) if snapshot.get("exception") else None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: Original exception type is lost during serialization/deserialization.

The exception is stored as str(evt.exception) at L575 and restored as Exception(snapshot["exception"]) here. This means the original exception type (e.g., RuntimeError, ValueError) is replaced with a generic Exception. Callbacks that use isinstance checks on the exception type during replay will see different behavior than the original invocation.

Suggestion: Consider preserving the exception type name and restoring it, or at minimum document this limitation in the docstring. If type fidelity matters, you could store both:

"exception": str(evt.exception) if evt.exception else None,
"exception_type": type(evt.exception).__name__ if evt.exception else None,

Though for session persistence across processes, the generic Exception approach may be the pragmatic choice — just make sure the docstring calls out this behavior.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented this limitation in both the AfterToolCallEvent docstring and as an inline comment in the replay loop. The generic Exception approach is the pragmatic choice for session persistence across processes — the docstring now explicitly calls out the behavior.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented in both the AfterToolCallEvent docstring and the replay loop inline comments. Generic Exception is the pragmatic choice for session persistence — the docstring now explicitly calls out the behavior.

original_event = AfterToolCallEvent(
agent=agent,
selected_tool=tool_func,
tool_use=snapshot["tool_use"],
invocation_state=invocation_state,
result=snapshot["result"],
exception=original_exception,
cancel_message=snapshot.get("cancel_message"),
)
replayed, new_interrupts = await agent.hooks.invoke_callbacks_async(original_event)
if new_interrupts:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: Missing test coverage for the re-interrupt-during-replay path.

Codecov reports 80% patch coverage on this file with 2 missing lines and 3 partials. The path at L502-503 (replay produces new interrupts) appears untested. Given the bug identified above (snapshots not re-added for re-interrupts), this would be a valuable test case. A test for the "callback interrupts again during replay" scenario would both validate the fix and cover the gap.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test_after_tool_interrupt_re_interrupt_during_replay which covers this path and validates the bug fix from the first review comment.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test_after_tool_interrupt_re_interrupt_during_replay covering this path and validating the snapshot re-persistence fix.

interrupts.extend(new_interrupts)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: Re-interrupt during replay loses the after-tool snapshot.

When the replay loop (L501-503) produces new_interrupts, these get added to the top-level interrupts list, which triggers the interrupt path at L589. However, after_tool_interrupt_events (L564) is only populated from the tool_events loop at L566-577 — snapshots for tools that re-interrupt during the replay loop are never added to after_tool_interrupt_events. This means _interrupt_state.context["after_tool_events"] won't contain the snapshot for the re-interrupted tool, so the next resume won't replay it.

Suggestion: When the replay loop encounters new_interrupts, re-add the snapshot to after_tool_interrupt_events:

if new_interrupts:
    interrupts.extend(new_interrupts)
    after_tool_interrupt_events.append(snapshot)
    continue

Note that after_tool_interrupt_events would need to be declared before the if agent._interrupt_state.activated block (move it up from L564) to be accessible in the replay loop.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Moved after_tool_interrupt_events declaration before the if agent._interrupt_state.activated block and added after_tool_interrupt_events.append(snapshot) when replay produces new interrupts. Added a test (test_after_tool_interrupt_re_interrupt_during_replay) that validates the snapshot is re-persisted for the next resume.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Moved after_tool_interrupt_events declaration before the if agent._interrupt_state.activated block and added after_tool_interrupt_events.append(snapshot) when replay produces new interrupts. Added test_after_tool_interrupt_re_interrupt_during_replay to validate.

continue
tool_use_id = original_event.tool_use["toolUseId"]
if getattr(replayed, "retry", False):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: The _can_write method doesn't include "retry" as a writable property alongside interrupts.

Looking at the interaction between retry and interrupt: when both are set on the same event, interrupt raises InterruptException which short-circuits the callback before retry is typically read. The test test_after_tool_call_interrupt_takes_precedence_over_retry validates this. However, during replay, the callback can set retry=True after getting the interrupt response (without raising). This interplay seems intentional but should be documented more clearly — a comment in the replay logic noting that retry and interrupt responses can coexist on resume would help future readers.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an inline comment in the replay logic explaining that retry and interrupt responses can coexist on resume — the callback receives the human response (interrupt() returns instead of raising) and may then set retry=True.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added inline comment in the replay logic explaining that retry and interrupt responses can coexist on resume — interrupt() returns instead of raising when a response is available, then the callback may set retry=True.

# Hook wants to re-execute the tool — remove preserved result and re-queue
tool_results[:] = [tr for tr in tool_results if tr["toolUseId"] != tool_use_id]
tool_uses.append(original_event.tool_use)
else:
# Update result in case the hook modified it
for i, tr in enumerate(tool_results):
if tr["toolUseId"] == tool_use_id:
tool_results[i] = replayed.result
break

# Filter to only the interrupted tools when resuming from interrupt (tool uses without results)
tool_use_ids = {tool_result["toolUseId"] for tool_result in tool_results}
tool_uses = [tool_use for tool_use in tool_uses if tool_use["toolUseId"] not in tool_use_ids]

interrupts = []

# Check for cancellation before tool execution
# Add tool_result for each tool_use to maintain valid conversation state
if agent._cancel_signal.is_set():
Expand Down Expand Up @@ -528,9 +561,20 @@ async def _handle_tool_execution(
tool_events = agent.tool_executor._execute(
agent, tool_uses, tool_results, cycle_trace, cycle_span, invocation_state, structured_output_context
)
after_tool_interrupt_events: list[dict[str, Any]] = []
async for tool_event in tool_events:
if isinstance(tool_event, ToolInterruptEvent):
interrupts.extend(tool_event["tool_interrupt_event"]["interrupts"])
if isinstance(tool_event.source_event, AfterToolCallEvent):
evt = tool_event.source_event
after_tool_interrupt_events.append(
{
"tool_use": evt.tool_use,
"result": evt.result,
"cancel_message": evt.cancel_message,
"exception": str(evt.exception) if evt.exception else None,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue: The exception field in the snapshot dictionary is inconsistent with the cancel_message and other snapshot fields.

Looking at the snapshot structure at L570-577, the exception key is always present (set to str(evt.exception) or None). But at L491, the code uses snapshot.get("exception") which suggests it might be absent. These two approaches are consistent in practice since get() with a missing key returns None, but it would be cleaner to check snapshot["exception"] directly (since the key is always set) or use snapshot.get("exception") consistently in both write and read paths.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to direct key access (snapshot["exception"], snapshot["cancel_message"]) since the keys are always set in the serialization path. Updated test fixtures to include the "exception" key to match.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to direct key access (snapshot["exception"], snapshot["cancel_message"]) since the keys are always set at serialization (L574-575). Updated test fixtures to include "exception" key to match.

}
)

yield tool_event

Expand All @@ -544,7 +588,11 @@ async def _handle_tool_execution(

if interrupts:
# Session state stored on AfterInvocationEvent.
agent._interrupt_state.context = {"tool_use_message": message, "tool_results": tool_results}
agent._interrupt_state.context = {
"tool_use_message": message,
"tool_results": tool_results,
"after_tool_events": after_tool_interrupt_events,
}
agent._interrupt_state.activate()

agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
Expand Down
20 changes: 19 additions & 1 deletion src/strands/hooks/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _interrupt_id(self, name: str) -> str:


@dataclass
class AfterToolCallEvent(HookEvent):
class AfterToolCallEvent(HookEvent, _Interruptible):
"""Event triggered after a tool invocation completes.

This event is fired after the agent has finished executing a tool,
Expand All @@ -193,6 +193,12 @@ class AfterToolCallEvent(HookEvent):
- ToolResultEvent is NOT emitted for discarded attempts - only the final attempt's
result is emitted and added to the conversation history

Interrupts:
Hook callbacks can call ``event.interrupt(name, reason)`` to pause agent execution
and request human input. The tool result is preserved and the tool will not be
re-executed on resume. See :func:`strands.event_loop.event_loop._handle_tool_execution`
for the replay mechanism.

Attributes:
selected_tool: The tool that was invoked. It may be None if tool lookup failed.
tool_use: The tool parameters that were passed to the tool invoked.
Expand Down Expand Up @@ -221,6 +227,18 @@ def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True

@override
def _interrupt_id(self, name: str) -> str:
"""Unique id for the interrupt.

Args:
name: User defined name for the interrupt.

Returns:
Interrupt id.
"""
return f"v1:after_tool_call:{self.tool_use['toolUseId']}:{uuid.uuid5(uuid.NAMESPACE_OID, name)}"


@dataclass
class BeforeModelCallEvent(HookEvent):
Expand Down
31 changes: 27 additions & 4 deletions src/strands/tools/executors/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def _stream(
"content": [{"text": cancel_message}],
}

after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
after_event, interrupts = await ToolExecutor._invoke_after_tool_call_hook(
agent,
None,
tool_use,
Expand All @@ -179,6 +179,12 @@ async def _stream(
exception=Exception(cancel_message),
cancel_message=cancel_message,
)

if interrupts:
tool_results.append(after_event.result)
yield ToolInterruptEvent(tool_use, interrupts, source_event=after_event)
return

yield ToolResultEvent(after_event.result, exception=after_event.exception)
tool_results.append(after_event.result)
return
Expand Down Expand Up @@ -209,9 +215,15 @@ async def _stream(
}

unknown_tool_error = Exception(f"Unknown tool: {tool_name}")
after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
after_event, interrupts = await ToolExecutor._invoke_after_tool_call_hook(
agent, selected_tool, tool_use, invocation_state, result, exception=unknown_tool_error
)

if interrupts:
tool_results.append(after_event.result)
yield ToolInterruptEvent(tool_use, interrupts, source_event=after_event)
return

# Check if retry requested for unknown tool error
# Use getattr because BidiAfterToolCallEvent doesn't have retry attribute
if getattr(after_event, "retry", False):
Expand Down Expand Up @@ -256,10 +268,15 @@ async def _stream(

result = cast(ToolResult, event)

after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
after_event, interrupts = await ToolExecutor._invoke_after_tool_call_hook(
agent, selected_tool, tool_use, invocation_state, result, exception=exception
)

if interrupts:
tool_results.append(after_event.result)
yield ToolInterruptEvent(tool_use, interrupts, source_event=after_event)
return

# Check if retry requested (getattr for BidiAfterToolCallEvent compatibility)
if getattr(after_event, "retry", False):
logger.debug("tool_name=<%s> | retry requested, retrying tool call", tool_name)
Expand All @@ -277,9 +294,15 @@ async def _stream(
"content": [{"text": f"Error: {str(e)}"}],
}

after_event, _ = await ToolExecutor._invoke_after_tool_call_hook(
after_event, interrupts = await ToolExecutor._invoke_after_tool_call_hook(
agent, selected_tool, tool_use, invocation_state, error_result, exception=e
)

if interrupts:
tool_results.append(after_event.result)
yield ToolInterruptEvent(tool_use, interrupts, source_event=after_event)
return

# Check if retry requested (getattr for BidiAfterToolCallEvent compatibility)
if getattr(after_event, "retry", False):
logger.debug("tool_name=<%s> | retry requested after exception, retrying tool call", tool_name)
Expand Down
19 changes: 17 additions & 2 deletions src/strands/types/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
if TYPE_CHECKING:
from ..agent import AgentResult
from ..agent._agent_as_tool import _AgentAsTool
from ..hooks.registry import BaseHookEvent
from ..multiagent.base import MultiAgentResult, NodeResult


Expand Down Expand Up @@ -373,11 +374,25 @@ def message(self) -> str:


class ToolInterruptEvent(TypedEvent):
"""Event emitted when a tool is interrupted."""
"""Event emitted when a tool is interrupted.

def __init__(self, tool_use: ToolUse, interrupts: list[Interrupt]) -> None:
Attributes:
source_event: The hook event that raised the interrupt, if available. For interrupts
raised from AfterToolCallEvent, this preserves the original event so the after-hook
can be replayed on resume without re-executing the tool.
"""

def __init__(
self, tool_use: ToolUse, interrupts: list[Interrupt], source_event: "BaseHookEvent | None" = None
) -> None:
"""Set interrupt in the event payload."""
super().__init__({"tool_interrupt_event": {"tool_use": tool_use, "interrupts": interrupts}})
self._source_event = source_event

@property
def source_event(self) -> "BaseHookEvent | None":
"""The hook event that raised the interrupt."""
return self._source_event

@property
def tool_use_id(self) -> str:
Expand Down
Loading
Loading