diff --git a/.server-changes/realtime-redis-connection-leak.md b/.server-changes/realtime-redis-connection-leak.md new file mode 100644 index 0000000000..e27b200174 --- /dev/null +++ b/.server-changes/realtime-redis-connection-leak.md @@ -0,0 +1,10 @@ +--- +area: webapp +type: fix +--- + +Fix Redis connection leak in realtime streams and broken abort signal propagation. + +**Redis connections**: Non-blocking methods (ingestData, appendPart, getLastChunkIndex) now share a single Redis connection instead of creating one per request. streamResponse still uses dedicated connections (required for XREAD BLOCK) but now tears them down immediately via disconnect() instead of graceful quit(), with a 15s inactivity fallback. + +**Abort signal**: request.signal is broken in Remix/Express due to a Node.js undici GC bug (nodejs/node#55428) that severs the signal chain when Remix clones the Request internally. Added getRequestAbortSignal() wired to Express res.on("close") via httpAsyncStorage, which fires reliably on client disconnect. All SSE/streaming routes updated to use it. diff --git a/apps/webapp/CLAUDE.md b/apps/webapp/CLAUDE.md index dff3ca4eb8..a4de6ab57b 100644 --- a/apps/webapp/CLAUDE.md +++ b/apps/webapp/CLAUDE.md @@ -59,6 +59,17 @@ Use the `chrome-devtools` MCP server to visually verify local dashboard changes. Routes use Remix flat-file convention with dot-separated segments: `api.v1.tasks.$taskId.trigger.ts` -> `/api/v1/tasks/:taskId/trigger` +## Abort Signals + +**Never use `request.signal`** for detecting client disconnects. It is broken due to a Node.js bug ([nodejs/node#55428](https://github.com/nodejs/node/issues/55428)) where the AbortSignal chain is severed when Remix internally clones the Request object. Instead, use `getRequestAbortSignal()` from `app/services/httpAsyncStorage.server.ts`, which is wired directly to Express `res.on("close")` and fires reliably. + +```typescript +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; + +// In route handlers, SSE streams, or any server-side code: +const signal = getRequestAbortSignal(); +``` + ## Environment Variables Access via `env` export from `app/env.server.ts`. **Never use `process.env` directly.** diff --git a/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts index d690b3d083..17a5bda620 100644 --- a/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts @@ -1,6 +1,7 @@ import { type TaskRunAttempt } from "@trigger.dev/database"; import { eventStream } from "remix-utils/sse/server"; import { type PrismaClient, prisma } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; import { projectPubSub } from "~/v3/services/projectPubSub.server"; @@ -63,7 +64,9 @@ export class TasksStreamPresenter { const subscriber = await projectPubSub.subscribe(`project:${project.id}:*`); - return eventStream(request.signal, (send, close) => { + const signal = getRequestAbortSignal(); + + return eventStream(signal, (send, close) => { const safeSend = (args: { event?: string; data: string }) => { try { send(args); @@ -95,7 +98,7 @@ export class TasksStreamPresenter { }); pinger = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return close(); } diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index 822c10a810..aabd83bc9b 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -1,6 +1,7 @@ import { type ActionFunctionArgs } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -129,7 +130,7 @@ export const loader = createLoaderApiRoute( run.realtimeStreamsVersion ); - return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, { + return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), { lastEventId, timeoutInSeconds, }); diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts index 98c348a023..b16b1ca792 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -1,6 +1,7 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getInputStreamWaitpoint, deleteInputStreamWaitpoint, @@ -162,7 +163,7 @@ const loader = createLoaderApiRoute( request, run.friendlyId, `$trigger.input:${params.streamId}`, - request.signal, + getRequestAbortSignal(), { lastEventId, timeoutInSeconds, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index b5763bb4e9..1295adb784 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -21,6 +21,7 @@ import { $replica } from "~/db.server"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; @@ -89,7 +90,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { run.realtimeStreamsVersion ); - return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, { + return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), { lastEventId, }); }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx index 2cfbe6a10b..a4a5b8900b 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx @@ -5,6 +5,7 @@ import { z } from "zod"; import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; import { inflate } from "node:zlib"; @@ -92,7 +93,7 @@ export async function action({ request, params }: ActionFunctionArgs) { const result = streamText({ model: openai(env.AI_RUN_FILTER_MODEL ?? "gpt-5-mini"), temperature: 1, - abortSignal: request.signal, + abortSignal: getRequestAbortSignal(), system: systemPrompt, prompt, tools: { diff --git a/apps/webapp/app/services/httpAsyncStorage.server.ts b/apps/webapp/app/services/httpAsyncStorage.server.ts index 7b709e4bf1..24b5c23f87 100644 --- a/apps/webapp/app/services/httpAsyncStorage.server.ts +++ b/apps/webapp/app/services/httpAsyncStorage.server.ts @@ -5,6 +5,7 @@ export type HttpLocalStorage = { path: string; host: string; method: string; + abortController: AbortController; }; const httpLocalStorage = new AsyncLocalStorage(); @@ -18,3 +19,15 @@ export function runWithHttpContext(context: HttpLocalStorage, fn: () => T): T export function getHttpContext(): HttpLocalStorage | undefined { return httpLocalStorage.getStore(); } + +// Fallback signal that is never aborted, safe for tests and non-Express contexts. +const neverAbortedSignal = new AbortController().signal; + +/** + * Returns an AbortSignal wired to the Express response's "close" event. + * This bypasses the broken request.signal chain in @remix-run/express + * (caused by Node.js undici GC bug nodejs/node#55428). + */ +export function getRequestAbortSignal(): AbortSignal { + return httpLocalStorage.getStore()?.abortController.signal ?? neverAbortedSignal; +} diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index e742f770a9..99ad10c8ee 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -7,7 +7,7 @@ export type RealtimeStreamsOptions = { redis: RedisOptions | undefined; logger?: Logger; logLevel?: LogLevel; - inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 60000) + inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 15000) }; // Legacy constant for backward compatibility (no longer written, but still recognized when reading) @@ -23,10 +23,23 @@ type StreamChunk = export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { private logger: Logger; private inactivityTimeoutMs: number; + // Shared connection for short-lived non-blocking operations (XADD, XREVRANGE, EXPIRE). + // Lazily created on first use so we don't open a connection if only streamResponse is called. + private _sharedRedis: Redis | undefined; constructor(private options: RealtimeStreamsOptions) { this.logger = options.logger ?? new Logger("RedisRealtimeStreams", options.logLevel ?? "info"); - this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds + this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 15000; // Default: 15 seconds + } + + private get sharedRedis(): Redis { + if (!this._sharedRedis) { + this._sharedRedis = new Redis({ + ...this.options.redis, + connectionName: "realtime:shared", + }); + } + return this._sharedRedis; } async initializeStream( @@ -43,7 +56,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { signal: AbortSignal, options?: StreamResponseOptions ): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" }); const streamKey = `stream:${runId}:${streamId}`; let isCleanedUp = false; @@ -269,7 +282,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { async function cleanup() { if (isCleanedUp) return; isCleanedUp = true; - await redis.quit().catch(console.error); + // disconnect() tears down the TCP socket immediately, which causes any + // pending XREAD BLOCK to reject right away instead of waiting for the + // block timeout to elapse. quit() would queue behind the blocking command. + redis.disconnect(); } signal.addEventListener("abort", cleanup, { once: true }); @@ -290,22 +306,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { clientId: string, resumeFromChunk?: number ): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; const startChunk = resumeFromChunk ?? 0; // Start counting from the resume point, not from 0 let currentChunkIndex = startChunk; - const self = this; - - async function cleanup() { - try { - await redis.quit(); - } catch (error) { - self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error }); - } - } - try { const textStream = stream.pipeThrough(new TextDecoderStream()); const reader = textStream.getReader(); @@ -361,13 +367,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { this.logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error }); return new Response(null, { status: 500 }); - } finally { - await cleanup(); } } async appendPart(part: string, partId: string, runId: string, streamId: string): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; await redis.xadd( @@ -386,12 +390,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { // Set TTL for cleanup when stream is done await redis.expire(streamKey, env.REALTIME_STREAM_TTL); - - await redis.quit(); } async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; try { @@ -460,10 +462,6 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { }); // Return -1 to indicate we don't know what the server has return -1; - } finally { - await redis.quit().catch((err) => { - this.logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err }); - }); } } diff --git a/apps/webapp/app/utils/sse.server.ts b/apps/webapp/app/utils/sse.server.ts index 56e7b191af..c8ecce4a85 100644 --- a/apps/webapp/app/utils/sse.server.ts +++ b/apps/webapp/app/utils/sse.server.ts @@ -1,5 +1,6 @@ import { eventStream } from "remix-utils/sse/server"; import { env } from "~/env.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; type SseProps = { @@ -22,6 +23,8 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: return new Response("SSE disabled", { status: 200 }); } + const signal = getRequestAbortSignal(); + let pinger: NodeJS.Timeout | undefined = undefined; let updater: NodeJS.Timeout | undefined = undefined; let timeout: NodeJS.Timeout | undefined = undefined; @@ -32,7 +35,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: clearTimeout(timeout); }; - return eventStream(request.signal, (send, close) => { + return eventStream(signal, (send, close) => { const safeSend = (args: { event?: string; data: string }) => { try { send(args); @@ -60,7 +63,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: }; pinger = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return abort(); } @@ -68,7 +71,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: }, pingInterval); updater = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return abort(); } diff --git a/apps/webapp/app/utils/sse.ts b/apps/webapp/app/utils/sse.ts index 8f396c092e..f48cc9e31f 100644 --- a/apps/webapp/app/utils/sse.ts +++ b/apps/webapp/app/utils/sse.ts @@ -2,6 +2,7 @@ import { type LoaderFunctionArgs } from "@remix-run/node"; import { type Params } from "@remix-run/router"; import { eventStream } from "remix-utils/sse/server"; import { setInterval } from "timers/promises"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; export type SendFunction = Parameters[1]>[0]; @@ -89,15 +90,17 @@ export function createSSELoader(options: SSEOptions) { throw new Response("Internal Server Error", { status: 500 }); }); + const requestAbortSignal = getRequestAbortSignal(); + const combinedSignal = AbortSignal.any([ - request.signal, + requestAbortSignal, timeoutSignal, internalController.signal, ]); log("Start"); - request.signal.addEventListener( + requestAbortSignal.addEventListener( "abort", () => { log(`request signal aborted`); diff --git a/apps/webapp/remix.config.js b/apps/webapp/remix.config.js index ae2f18cd72..a4ad1bd228 100644 --- a/apps/webapp/remix.config.js +++ b/apps/webapp/remix.config.js @@ -30,6 +30,7 @@ module.exports = { "redlock", "parse-duration", "uncrypto", + "std-env", ], browserNodeBuiltinsPolyfill: { modules: { diff --git a/apps/webapp/server.ts b/apps/webapp/server.ts index b2cc938733..e266c6985c 100644 --- a/apps/webapp/server.ts +++ b/apps/webapp/server.ts @@ -145,9 +145,11 @@ if (ENABLE_CLUSTER && cluster.isPrimary) { app.use((req, res, next) => { // Generate a unique request ID for each request const requestId = nanoid(); + const abortController = new AbortController(); + res.on("close", () => abortController.abort()); runWithHttpContext( - { requestId, path: req.url, host: req.hostname, method: req.method }, + { requestId, path: req.url, host: req.hostname, method: req.method, abortController }, next ); });