Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 83 additions & 98 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
type StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
Expand Down Expand Up @@ -140,7 +141,7 @@ export class BlockExecutor {

let normalizedOutput: NormalizedBlockOutput
if (isStreamingExecution) {
const streamingExec = output as { stream: ReadableStream; execution: any }
const streamingExec = output as StreamingExecution

if (ctx.onStream) {
await this.handleStreamingExecution(
Expand Down Expand Up @@ -602,7 +603,7 @@ export class BlockExecutor {
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
streamingExec: { stream: ReadableStream; execution: any },
streamingExec: StreamingExecution,
resolvedInputs: Record<string, any>,
selectedOutputs: string[]
): Promise<void> {
Expand All @@ -613,129 +614,113 @@ export class BlockExecutor {
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
(block.config as Record<string, any> | undefined)?.responseFormat

const stream = streamingExec.stream
if (typeof stream.tee !== 'function') {
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
return
}
const sourceReader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
const accumulated: string[] = []
let drainError: unknown
let sourceFullyDrained = false

const [clientStream, executorStream] = stream.tee()
const clientSource = new ReadableStream<Uint8Array>({
async pull(controller) {
try {
const { done, value } = await sourceReader.read()
if (done) {
const tail = decoder.decode()
if (tail) accumulated.push(tail)
sourceFullyDrained = true
controller.close()
return
}
accumulated.push(decoder.decode(value, { stream: true }))
controller.enqueue(value)
} catch (error) {
drainError = error
controller.error(error)
}
},
async cancel(reason) {
try {
await sourceReader.cancel(reason)
} catch {}
},
})

const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
blockId,
selectedOutputs,
responseFormat
)

const clientStreamingExec = {
...streamingExec,
stream: processedClientStream,
}

const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)

const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
// Cancel the client stream to release the tee'd buffer
await processedClientStream.cancel().catch(() => {})
}
})()

await Promise.all([clientConsumption, executorConsumption])
}

private async forwardStream(
ctx: ExecutionContext,
blockId: string,
streamingExec: { stream: ReadableStream; execution: any },
stream: ReadableStream,
responseFormat: any,
selectedOutputs: string[]
): Promise<void> {
const processedStream = streamingResponseFormatProcessor.processStream(
stream,
clientSource,
blockId,
selectedOutputs,
responseFormat
)

try {
await ctx.onStream?.({
...streamingExec,
stream: processedStream,
stream: processedClientStream,
execution: streamingExec.execution,
})
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
await processedStream.cancel().catch(() => {})
}
}

private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
const chunks: string[] = []

try {
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(decoder.decode(value, { stream: true }))
}
const tail = decoder.decode()
if (tail) chunks.push(tail)
} catch (error) {
this.execLogger.error('Error reading executor stream for block', { blockId, error })
await processedClientStream.cancel().catch(() => {})
} finally {
try {
await reader.cancel().catch(() => {})
sourceReader.releaseLock()
} catch {}
}

const fullContent = chunks.join('')
Comment thread
TheodoreSpeaks marked this conversation as resolved.
if (!fullContent) {
if (drainError) {
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
return
}

const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
// If the onStream consumer exited before the source drained (e.g. it caught
// an internal error and returned normally), `accumulated` holds a truncated
// response. Persisting that to memory or setting it as the block output
// would corrupt downstream state — skip and log instead.
if (!sourceFullyDrained) {
this.execLogger.warn(
'Stream consumer exited before source drained; skipping content persistence',
{
blockId,
}
)
return
}

if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())

streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
const fullContent = accumulated.join('')
if (fullContent) {
const executionOutput = streamingExec.execution?.output
if (executionOutput && typeof executionOutput === 'object') {
let parsedForFormat = false
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
parsedForFormat = true
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
}
if (!parsedForFormat) {
executionOutput.content = fullContent
}
return
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
}

executionOutput.content = fullContent
if (streamingExec.onFullContent) {
try {
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}
}
}
10 changes: 9 additions & 1 deletion apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
streamingExec: StreamingExecution
): StreamingExecution {
return {
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
stream: streamingExec.stream,
execution: streamingExec.execution,
onFullContent: async (content: string) => {
if (!content.trim()) return
try {
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
} catch (error) {
logger.error('Failed to persist streaming response:', error)
}
},
}
}

Expand Down
29 changes: 0 additions & 29 deletions apps/sim/executor/handlers/agent/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,35 +111,6 @@ export class Memory {
})
}

wrapStreamForPersistence(
stream: ReadableStream<Uint8Array>,
ctx: ExecutionContext,
inputs: AgentInputs
): ReadableStream<Uint8Array> {
const chunks: string[] = []
const decoder = new TextDecoder()

const transformStream = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(chunk)
const decoded = decoder.decode(chunk, { stream: true })
chunks.push(decoded)
},

flush: () => {
const content = chunks.join('')
if (content.trim()) {
this.appendToMemory(ctx, inputs, {
role: 'assistant',
content,
}).catch((error) => logger.error('Failed to persist streaming response:', error))
}
},
})

return stream.pipeThrough(transformStream)
}

private requireWorkspaceId(ctx: ExecutionContext): string {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for memory operations')
Expand Down
6 changes: 6 additions & 0 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ export interface ExecutionResult {
export interface StreamingExecution {
stream: ReadableStream
execution: ExecutionResult & { isStreaming?: boolean }
/**
* Invoked with the assembled response text after the stream drains. Lets agent
* blocks persist the full response without interposing a TransformStream on a
* fetch-backed source — that pattern amplifies memory on Bun via #28035.
*/
onFullContent?: (content: string) => void | Promise<void>
}

export interface BlockExecutor {
Expand Down
Loading