diff --git a/apps/code/src/main/di/container.ts b/apps/code/src/main/di/container.ts index 959ea1431..d2bfb2303 100644 --- a/apps/code/src/main/di/container.ts +++ b/apps/code/src/main/di/container.ts @@ -53,6 +53,7 @@ import { McpCallbackService } from "../services/mcp-callback/service"; import { McpProxyService } from "../services/mcp-proxy/service"; import { NotificationService } from "../services/notification/service"; import { OAuthService } from "../services/oauth/service"; +import { PostHogCodeInternalMcpService } from "../services/posthog-code-internal-mcp/service"; import { PosthogPluginService } from "../services/posthog-plugin/service"; import { ProcessTrackingService } from "../services/process-tracking/service"; import { ProvisioningService } from "../services/provisioning/service"; @@ -102,6 +103,9 @@ container.bind(MAIN_TOKENS.AgentService).to(AgentService); container.bind(MAIN_TOKENS.AuthService).to(AuthService); container.bind(MAIN_TOKENS.AuthProxyService).to(AuthProxyService); container.bind(MAIN_TOKENS.McpProxyService).to(McpProxyService); +container + .bind(MAIN_TOKENS.PostHogCodeInternalMcpService) + .to(PostHogCodeInternalMcpService); container.bind(MAIN_TOKENS.ArchiveService).to(ArchiveService); container.bind(MAIN_TOKENS.SuspensionService).to(SuspensionService); container.bind(MAIN_TOKENS.AppLifecycleService).to(AppLifecycleService); diff --git a/apps/code/src/main/di/tokens.ts b/apps/code/src/main/di/tokens.ts index c8225b2b1..1189ff300 100644 --- a/apps/code/src/main/di/tokens.ts +++ b/apps/code/src/main/di/tokens.ts @@ -41,6 +41,9 @@ export const MAIN_TOKENS = Object.freeze({ AuthService: Symbol.for("Main.AuthService"), AuthProxyService: Symbol.for("Main.AuthProxyService"), McpProxyService: Symbol.for("Main.McpProxyService"), + PostHogCodeInternalMcpService: Symbol.for( + "Main.PostHogCodeInternalMcpService", + ), ArchiveService: Symbol.for("Main.ArchiveService"), SuspensionService: Symbol.for("Main.SuspensionService"), AppLifecycleService: Symbol.for("Main.AppLifecycleService"), diff --git a/apps/code/src/main/index.ts b/apps/code/src/main/index.ts index 9a9ea2c4e..1c3eeddd9 100644 --- a/apps/code/src/main/index.ts +++ b/apps/code/src/main/index.ts @@ -22,6 +22,7 @@ import { initializePostHog, trackAppEvent, } from "./services/posthog-analytics"; +import type { PostHogCodeInternalMcpService } from "./services/posthog-code-internal-mcp/service"; import type { PosthogPluginService } from "./services/posthog-plugin/service"; import type { SuspensionService } from "./services/suspension/service"; import type { TaskLinkService } from "./services/task-link/service"; @@ -52,6 +53,11 @@ async function initializeServices(): Promise { await authService.initialize(); + const internalMcp = container.get( + MAIN_TOKENS.PostHogCodeInternalMcpService, + ); + await internalMcp.start(); + // Initialize workspace branch watcher for live branch rename detection const workspaceService = container.get( MAIN_TOKENS.WorkspaceService, diff --git a/apps/code/src/main/services/agent/auth-adapter.test.ts b/apps/code/src/main/services/agent/auth-adapter.test.ts index 4d3aaf1ff..dbd4c37cc 100644 --- a/apps/code/src/main/services/agent/auth-adapter.test.ts +++ b/apps/code/src/main/services/agent/auth-adapter.test.ts @@ -58,6 +58,12 @@ function createDependencies() { (id: string) => `http://127.0.0.1:9998/${encodeURIComponent(id)}`, ), }, + internalMcp: { + getUrl: vi.fn().mockReturnValue("http://127.0.0.1:9997/mcp"), + getAuthHeader: vi + .fn() + .mockReturnValue({ name: "authorization", value: "Bearer test" }), + }, }; } @@ -77,6 +83,7 @@ describe("AgentAuthAdapter", () => { deps.authService as never, deps.authProxy as never, deps.mcpProxy as never, + deps.internalMcp as never, ); }); diff --git a/apps/code/src/main/services/agent/auth-adapter.ts b/apps/code/src/main/services/agent/auth-adapter.ts index 1cfa711fe..afc1d618e 100644 --- a/apps/code/src/main/services/agent/auth-adapter.ts +++ b/apps/code/src/main/services/agent/auth-adapter.ts @@ -11,6 +11,7 @@ import { logger } from "../../utils/logger"; import type { AuthService } from "../auth/service"; import type { AuthProxyService } from "../auth-proxy/service"; import type { McpProxyService } from "../mcp-proxy/service"; +import type { PostHogCodeInternalMcpService } from "../posthog-code-internal-mcp/service"; import type { Credentials } from "./schemas"; const log = logger.scope("agent-auth-adapter"); @@ -63,6 +64,8 @@ export class AgentAuthAdapter { private readonly authProxy: AuthProxyService, @inject(MAIN_TOKENS.McpProxyService) private readonly mcpProxy: McpProxyService, + @inject(MAIN_TOKENS.PostHogCodeInternalMcpService) + private readonly internalMcp: PostHogCodeInternalMcpService, ) {} createPosthogConfig(credentials: Credentials): AgentPosthogConfig { @@ -102,6 +105,19 @@ export class AgentAuthAdapter { ], }); + try { + servers.push({ + name: "posthog-code-internal", + type: "http", + url: this.internalMcp.getUrl(), + headers: [this.internalMcp.getAuthHeader()], + }); + } catch (err) { + // Service should always be running by the time the agent starts a task, + // but don't take down the whole MCP config if it isn't. + log.warn("posthog-code-internal MCP not available", { error: err }); + } + const installations = await this.fetchMcpInstallations(credentials); for (const installation of installations) { diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index 6dcab002c..1d1489e4c 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -190,6 +190,9 @@ function createMockDependencies() { appDataPath: "/mock/userData", logsPath: "/mock/logs", }, + internalMcp: { + on: vi.fn(), + }, }; } @@ -220,6 +223,7 @@ describe("AgentService", () => { deps.bundledResources as never, deps.appMeta as never, deps.storagePaths as never, + deps.internalMcp as never, ); }); diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index af80ec302..bc27e757e 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -15,6 +15,7 @@ import { import { isMcpToolReadOnly, isNotification, + POSTHOG_METHODS, POSTHOG_NOTIFICATIONS, } from "@posthog/agent"; import type { McpToolApprovals } from "@posthog/agent/adapters/claude/mcp/tool-metadata"; @@ -50,6 +51,8 @@ import { logger } from "../../utils/logger"; import { TypedEventEmitter } from "../../utils/typed-event-emitter"; import type { FsService } from "../fs/service"; import type { McpAppsService } from "../mcp-apps/service"; +import { PostHogCodeInternalMcpEvent } from "../posthog-code-internal-mcp/schemas"; +import type { PostHogCodeInternalMcpService } from "../posthog-code-internal-mcp/service"; import type { PosthogPluginService } from "../posthog-plugin/service"; import type { ProcessTrackingService } from "../process-tracking/service"; import type { SleepService } from "../sleep/service"; @@ -247,6 +250,8 @@ interface ManagedSession { mcpToolApprovals: McpToolApprovals; /** Maps tool keys to their installation for backend approval updates */ toolInstallations: McpToolInstallations; + /** Set when an MCP server is installed mid-turn; refresh runs after the turn ends. */ + pendingMcpRefresh: boolean; } /** Get the agent session ID from a managed session, throwing if not set. */ @@ -304,6 +309,8 @@ export class AgentService extends TypedEventEmitter { private readonly appMeta: IAppMeta, @inject(MAIN_TOKENS.StoragePaths) private readonly storagePaths: IStoragePaths, + @inject(MAIN_TOKENS.PostHogCodeInternalMcpService) + internalMcp: PostHogCodeInternalMcpService, ) { super(); this.processTracking = processTracking; @@ -314,6 +321,9 @@ export class AgentService extends TypedEventEmitter { this.mcpAppsService = mcpAppsService; powerManager.onResume(() => this.checkIdleDeadlines()); + internalMcp.on(PostHogCodeInternalMcpEvent.McpServerInstalled, () => { + void this.refreshAllSessionMcpServers(); + }); } private getClaudeCliPath(): string { @@ -395,6 +405,46 @@ export class AgentService extends TypedEventEmitter { this.recordActivity(taskRunId); } + private async refreshSessionMcpServers( + session: ManagedSession, + ): Promise { + try { + const { servers } = await this.agentAuthAdapter.buildMcpServers( + session.config.credentials, + ); + await session.clientSideConnection.extMethod( + POSTHOG_METHODS.REFRESH_SESSION, + { mcpServers: servers }, + ); + log.info("Refreshed MCP servers for session", { + taskRunId: session.taskRunId, + serverCount: servers.length, + }); + } catch (err) { + log.warn("Failed to refresh MCP servers for session", { + taskRunId: session.taskRunId, + err, + }); + } + } + + private async refreshAllSessionMcpServers(): Promise { + const refreshable: ManagedSession[] = []; + for (const session of this.sessions.values()) { + if (session.promptPending) { + // ACP refresh contract requires no prompt in flight; defer until the + // turn completes (see prompt() finally block). + session.pendingMcpRefresh = true; + log.info("Deferring MCP refresh until current turn ends", { + taskRunId: session.taskRunId, + }); + continue; + } + refreshable.push(session); + } + await Promise.all(refreshable.map((s) => this.refreshSessionMcpServers(s))); + } + /** * Check if any sessions are currently active (i.e. have a prompt pending). */ @@ -797,6 +847,7 @@ When creating pull requests, add the following footer at the end of the PR descr inFlightMcpToolCalls: new Map(), mcpToolApprovals: toolApprovals, toolInstallations, + pendingMcpRefresh: false, }; this.sessions.set(taskRunId, session); @@ -885,6 +936,11 @@ When creating pull requests, add the following footer at the end of the PR descr this.recordActivity(sessionId); this.sleepService.release(sessionId); + if (session.pendingMcpRefresh) { + session.pendingMcpRefresh = false; + void this.refreshSessionMcpServers(session); + } + if (!this.hasActiveSessions()) { this.emit(AgentServiceEvent.SessionsIdle, undefined); } diff --git a/apps/code/src/main/services/posthog-code-internal-mcp/schemas.ts b/apps/code/src/main/services/posthog-code-internal-mcp/schemas.ts new file mode 100644 index 000000000..8eb7c6b42 --- /dev/null +++ b/apps/code/src/main/services/posthog-code-internal-mcp/schemas.ts @@ -0,0 +1,19 @@ +import { z } from "zod"; + +export const customInstructionsChanged = z.object({ + customInstructions: z.string(), +}); + +export type CustomInstructionsChanged = z.infer< + typeof customInstructionsChanged +>; + +export const PostHogCodeInternalMcpEvent = { + CustomInstructionsChanged: "custom-instructions-changed", + McpServerInstalled: "mcp-server-installed", +} as const; + +export interface PostHogCodeInternalMcpEvents { + [PostHogCodeInternalMcpEvent.CustomInstructionsChanged]: CustomInstructionsChanged; + [PostHogCodeInternalMcpEvent.McpServerInstalled]: Record; +} diff --git a/apps/code/src/main/services/posthog-code-internal-mcp/service.test.ts b/apps/code/src/main/services/posthog-code-internal-mcp/service.test.ts new file mode 100644 index 000000000..cbad14864 --- /dev/null +++ b/apps/code/src/main/services/posthog-code-internal-mcp/service.test.ts @@ -0,0 +1,105 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// electron-store mkdir's userDataDir at import time, which fails in CI where +// the default mocked path (/mock/userData) isn't writable. The tests below +// don't exercise the store paths, so a no-op mock is safe. +vi.mock("../../utils/store", () => ({ + rendererStore: { + has: () => false, + get: () => undefined, + set: () => {}, + }, +})); + +import { PostHogCodeInternalMcpEvent } from "./schemas"; +import { PostHogCodeInternalMcpService } from "./service"; + +interface FakeAuthService { + getValidAccessToken: () => Promise<{ apiHost: string; token: string }>; + getState: () => { projectId: number }; + authenticatedFetch: ( + fetchImpl: typeof fetch, + url: string, + init?: RequestInit, + ) => Promise; +} + +const createFakeAuth = ( + fetchImpl: (url: string) => Promise, +): FakeAuthService => ({ + getValidAccessToken: async () => ({ + apiHost: "https://example.com", + token: "t", + }), + getState: () => ({ projectId: 1 }), + authenticatedFetch: async (_f, url) => fetchImpl(String(url)), +}); + +const okJson = (body: unknown): Response => + new Response(JSON.stringify(body), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + +describe("PostHogCodeInternalMcpService.pollForOauthCompletion", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("emits McpServerInstalled when pending_oauth flips to false", async () => { + const responses = [ + okJson({ + results: [ + { id: "abc", name: "linear", pending_oauth: true, is_enabled: true }, + ], + }), + okJson({ + results: [ + { id: "abc", name: "linear", pending_oauth: false, is_enabled: true }, + ], + }), + ]; + const auth = createFakeAuth(async () => { + const next = responses.shift(); + if (!next) throw new Error("no more responses"); + return next; + }); + const service = new PostHogCodeInternalMcpService(auth as never); + const handler = vi.fn(); + service.on(PostHogCodeInternalMcpEvent.McpServerInstalled, handler); + + const poll = ( + service as unknown as { + pollForOauthCompletion: (id: string, name: string) => Promise; + } + ).pollForOauthCompletion("abc", "linear"); + + await vi.advanceTimersByTimeAsync(3500); + await vi.advanceTimersByTimeAsync(3500); + await poll; + + expect(handler).toHaveBeenCalledOnce(); + }); + + it("stops polling when installation disappears", async () => { + const auth = createFakeAuth(async () => okJson({ results: [] })); + const service = new PostHogCodeInternalMcpService(auth as never); + const handler = vi.fn(); + service.on(PostHogCodeInternalMcpEvent.McpServerInstalled, handler); + + const poll = ( + service as unknown as { + pollForOauthCompletion: (id: string, name: string) => Promise; + } + ).pollForOauthCompletion("abc", "linear"); + + await vi.advanceTimersByTimeAsync(3500); + await poll; + + expect(handler).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/code/src/main/services/posthog-code-internal-mcp/service.ts b/apps/code/src/main/services/posthog-code-internal-mcp/service.ts new file mode 100644 index 000000000..ea4a43f26 --- /dev/null +++ b/apps/code/src/main/services/posthog-code-internal-mcp/service.ts @@ -0,0 +1,447 @@ +import { randomBytes } from "node:crypto"; +import http from "node:http"; +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import { inject, injectable, preDestroy } from "inversify"; +import { z } from "zod"; +import { MAIN_TOKENS } from "../../di/tokens"; +import { decrypt, encrypt } from "../../utils/encryption"; +import { logger } from "../../utils/logger"; +import { rendererStore } from "../../utils/store"; +import { TypedEventEmitter } from "../../utils/typed-event-emitter"; +import type { AuthService } from "../auth/service"; +import { + PostHogCodeInternalMcpEvent, + type PostHogCodeInternalMcpEvents, +} from "./schemas"; + +const log = logger.scope("posthog-code-internal-mcp"); + +const SETTINGS_STORE_KEY = "settings-storage"; +const SERVER_NAME = "posthog-code-internal"; +const SERVER_VERSION = "1.0.0"; +const OAUTH_POLL_INTERVAL_MS = 3000; +const OAUTH_POLL_TIMEOUT_MS = 10 * 60 * 1000; + +/** + * Local-only HTTP MCP server that exposes a few self-modification tools to + * the running agent: read/write the user's custom instructions, and + * list/add MCP server installations on the active project. + * + * Mirrors {@link McpProxyService}: listens on 127.0.0.1, generates a random + * bearer token at boot, and dies with the app via @preDestroy. + */ +@injectable() +export class PostHogCodeInternalMcpService extends TypedEventEmitter { + private server: http.Server | null = null; + private port: number | null = null; + private bearerToken: string | null = null; + private startPromise: Promise | null = null; + + constructor( + @inject(MAIN_TOKENS.AuthService) + private readonly authService: AuthService, + ) { + super(); + } + + async start(): Promise { + if (this.server && this.port) return; + if (this.startPromise) return this.startPromise; + this.startPromise = this.doStart().catch((err) => { + this.startPromise = null; + throw err; + }); + return this.startPromise; + } + + @preDestroy() + async stop(): Promise { + if (!this.server) return; + const server = this.server; + await new Promise((resolve) => { + server.close(() => { + log.info("PostHog Code internal MCP stopped"); + resolve(); + }); + }); + this.server = null; + this.port = null; + this.bearerToken = null; + this.startPromise = null; + } + + getUrl(): string { + if (!this.port) { + throw new Error("posthog-code-internal MCP server not started"); + } + return `http://127.0.0.1:${this.port}/mcp`; + } + + getAuthHeader(): { name: string; value: string } { + if (!this.bearerToken) { + throw new Error("posthog-code-internal MCP server not started"); + } + return { name: "authorization", value: `Bearer ${this.bearerToken}` }; + } + + private async doStart(): Promise { + this.bearerToken = randomBytes(32).toString("hex"); + + const server = http.createServer((req, res) => { + void this.handleRequest(req, res); + }); + this.server = server; + + await new Promise((resolve, reject) => { + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); + if (typeof addr === "object" && addr) { + this.port = addr.port; + log.info("PostHog Code internal MCP started", { port: this.port }); + resolve(); + } else { + reject(new Error("Failed to get internal MCP address")); + } + }); + server.on("error", (err) => { + log.error("Internal MCP server error", err); + reject(err); + }); + }); + } + + private async handleRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + ): Promise { + const auth = req.headers.authorization; + if (!auth || auth !== `Bearer ${this.bearerToken}`) { + res.writeHead(401).end("Unauthorized"); + return; + } + + let mcpServer: McpServer | null = null; + let transport: StreamableHTTPServerTransport | null = null; + try { + // Stateless per-request: each HTTP request gets a fresh server + + // transport. Avoids cross-request session state and matches the SDK's + // documented stateless pattern. + mcpServer = this.buildServer(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined, + }); + const owned = { mcpServer, transport }; + res.on("close", () => { + try { + owned.transport.close(); + } catch {} + try { + owned.mcpServer.close(); + } catch {} + }); + await mcpServer.connect(transport); + await transport.handleRequest(req, res); + } catch (err) { + log.error("Internal MCP request error", err); + try { + transport?.close(); + } catch {} + try { + mcpServer?.close(); + } catch {} + if (!res.headersSent) { + res.writeHead(500).end("Internal error"); + } else { + res.end(); + } + } + } + + private buildServer(): McpServer { + const server = new McpServer( + { name: SERVER_NAME, version: SERVER_VERSION }, + { capabilities: { tools: {} } }, + ); + + server.tool( + "read_custom_instructions", + "Read the user's custom instructions — extra guidance the user has appended to every agent prompt. Returns the raw text (empty string if unset).", + async () => { + const text = this.readCustomInstructions(); + return { + content: [ + { + type: "text", + text: + text === "" + ? "(empty — no custom instructions are currently configured)" + : text, + }, + ], + }; + }, + ); + + server.tool( + "write_custom_instructions", + "Replace the user's custom instructions. Pass the full new text — this overwrites the existing value. Pass an empty string to clear.", + { instructions: z.string() }, + async ({ instructions }) => { + this.writeCustomInstructions(instructions); + this.emit(PostHogCodeInternalMcpEvent.CustomInstructionsChanged, { + customInstructions: instructions, + }); + return { + content: [{ type: "text", text: "Custom instructions updated." }], + }; + }, + ); + + server.tool( + "list_mcp_servers", + "List the MCP server installations available to the agent in the current project. Returns an array with id, name, url, auth_type, and status flags.", + async () => this.listMcpServers(), + ); + + server.tool( + "add_mcp_server", + 'Install a new MCP server on the current project. Use auth_type="api_key" to attach a static bearer token (provide api_key); use auth_type="oauth" to start an OAuth flow — the response will include a redirect URL the user must visit. Omit api_key for servers that require no authentication.', + { + name: z.string().min(1), + url: z.string().url(), + auth_type: z.enum(["api_key", "oauth"]).default("api_key"), + api_key: z.string().optional(), + description: z.string().optional(), + }, + async (args) => this.addMcpServer(args), + ); + + return server; + } + + private readCustomInstructions(): string { + if (!rendererStore.has(SETTINGS_STORE_KEY)) return ""; + const encrypted = rendererStore.get(SETTINGS_STORE_KEY) as string; + const raw = decrypt(encrypted); + if (!raw) return ""; + try { + const parsed = JSON.parse(raw) as { + state?: { customInstructions?: string }; + }; + return parsed.state?.customInstructions ?? ""; + } catch (err) { + log.warn("Failed to parse settings-storage", { err }); + return ""; + } + } + + private writeCustomInstructions(value: string): void { + let parsed: { state?: Record; version?: number } = { + state: {}, + version: 0, + }; + if (rendererStore.has(SETTINGS_STORE_KEY)) { + const encrypted = rendererStore.get(SETTINGS_STORE_KEY) as string; + const raw = decrypt(encrypted); + if (raw) { + try { + parsed = JSON.parse(raw); + } catch (err) { + log.warn("Settings store corrupted, overwriting with new state", { + err, + }); + } + } + } + parsed.state = { ...(parsed.state ?? {}), customInstructions: value }; + rendererStore.set(SETTINGS_STORE_KEY, encrypt(JSON.stringify(parsed))); + } + + private async listMcpServers(): Promise { + const { apiHost } = await this.authService.getValidAccessToken(); + const projectId = this.authService.getState().projectId; + if (!projectId) { + return { + isError: true, + content: [ + { + type: "text", + text: "No project selected. Sign in and pick a project before listing MCP servers.", + }, + ], + }; + } + const baseUrl = apiHost.replace(/\/+$/, ""); + const url = `${baseUrl}/api/environments/${projectId}/mcp_server_installations/`; + const response = await this.authService.authenticatedFetch(fetch, url, { + headers: { "Content-Type": "application/json" }, + }); + if (!response.ok) { + const errText = await response.text().catch(() => ""); + return { + isError: true, + content: [ + { + type: "text", + text: `Failed to list MCP servers (${response.status}): ${errText.slice(0, 500)}`, + }, + ], + }; + } + const data = (await response.json()) as { + results?: Array<{ + id: string; + name?: string; + display_name?: string; + url?: string; + auth_type?: string; + is_enabled?: boolean; + pending_oauth?: boolean; + needs_reauth?: boolean; + }>; + }; + const servers = (data.results ?? []).map((i) => ({ + id: i.id, + name: i.name ?? i.display_name ?? "(unnamed)", + url: i.url ?? "", + auth_type: i.auth_type ?? "unknown", + is_enabled: i.is_enabled !== false, + pending_oauth: !!i.pending_oauth, + needs_reauth: !!i.needs_reauth, + })); + return { + content: [{ type: "text", text: JSON.stringify(servers, null, 2) }], + }; + } + + private async addMcpServer(input: { + name: string; + url: string; + auth_type: "api_key" | "oauth"; + api_key?: string; + description?: string; + }): Promise { + const { apiHost } = await this.authService.getValidAccessToken(); + const projectId = this.authService.getState().projectId; + if (!projectId) { + return { + isError: true, + content: [ + { + type: "text", + text: "No project selected. Sign in and pick a project before adding an MCP server.", + }, + ], + }; + } + const baseUrl = apiHost.replace(/\/+$/, ""); + const url = `${baseUrl}/api/environments/${projectId}/mcp_server_installations/install_custom/`; + const response = await this.authService.authenticatedFetch(fetch, url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + name: input.name, + url: input.url, + auth_type: input.auth_type, + api_key: input.api_key, + description: input.description, + install_source: "posthog-code", + }), + }); + if (!response.ok) { + const errText = await response.text().catch(() => ""); + return { + isError: true, + content: [ + { + type: "text", + text: `Failed to install MCP server (${response.status}): ${errText.slice(0, 500)}`, + }, + ], + }; + } + const data = (await response.json()) as Record; + if (typeof data.redirect_url === "string") { + const installationId = typeof data.id === "string" ? data.id : undefined; + void this.pollForOauthCompletion(installationId, input.name); + return { + content: [ + { + type: "text", + text: `OAuth flow required. The user must visit: ${data.redirect_url} to finish installing "${input.name}". Once authorized, the session will refresh automatically.`, + }, + ], + }; + } + this.emit(PostHogCodeInternalMcpEvent.McpServerInstalled, {}); + return { + content: [ + { + type: "text", + text: `Installed MCP server "${input.name}" (id=${String(data.id ?? "unknown")}). Refreshing session to make it available immediately.`, + }, + ], + }; + } + + private async pollForOauthCompletion( + installationId: string | undefined, + name: string, + ): Promise { + const { apiHost } = await this.authService.getValidAccessToken(); + const projectId = this.authService.getState().projectId; + if (!projectId) return; + const baseUrl = apiHost.replace(/\/+$/, ""); + const url = `${baseUrl}/api/environments/${projectId}/mcp_server_installations/`; + + log.info("Polling for OAuth completion", { installationId, name }); + + const start = Date.now(); + while (Date.now() - start < OAUTH_POLL_TIMEOUT_MS) { + await new Promise((resolve) => + setTimeout(resolve, OAUTH_POLL_INTERVAL_MS), + ); + + try { + const response = await this.authService.authenticatedFetch(fetch, url, { + headers: { "Content-Type": "application/json" }, + }); + if (!response.ok) continue; + const data = (await response.json()) as { + results?: Array<{ + id: string; + name?: string; + display_name?: string; + pending_oauth?: boolean; + is_enabled?: boolean; + }>; + }; + const inst = (data.results ?? []).find((i) => + installationId + ? i.id === installationId + : i.name === name || i.display_name === name, + ); + if (!inst) { + log.info("OAuth installation no longer in list, stopping poll", { + installationId, + name, + }); + return; + } + if (!inst.pending_oauth && inst.is_enabled !== false) { + log.info("OAuth install completed, triggering session refresh", { + installationId: inst.id, + name, + }); + this.emit(PostHogCodeInternalMcpEvent.McpServerInstalled, {}); + return; + } + } catch (err) { + log.warn("OAuth poll error", { err }); + } + } + log.info("OAuth poll timed out", { installationId, name }); + } +} diff --git a/apps/code/src/main/trpc/router.ts b/apps/code/src/main/trpc/router.ts index 75a5c85c2..16b3db3fc 100644 --- a/apps/code/src/main/trpc/router.ts +++ b/apps/code/src/main/trpc/router.ts @@ -25,6 +25,7 @@ import { mcpCallbackRouter } from "./routers/mcp-callback"; import { notificationRouter } from "./routers/notification"; import { oauthRouter } from "./routers/oauth"; import { osRouter } from "./routers/os"; +import { posthogCodeInternalMcpRouter } from "./routers/posthog-code-internal-mcp"; import { processTrackingRouter } from "./routers/process-tracking"; import { provisioningRouter } from "./routers/provisioning"; import { secureStoreRouter } from "./routers/secure-store"; @@ -65,6 +66,7 @@ export const trpcRouter = router({ oauth: oauthRouter, logs: logsRouter, os: osRouter, + posthogCodeInternalMcp: posthogCodeInternalMcpRouter, processTracking: processTrackingRouter, provisioning: provisioningRouter, sleep: sleepRouter, diff --git a/apps/code/src/main/trpc/routers/posthog-code-internal-mcp.ts b/apps/code/src/main/trpc/routers/posthog-code-internal-mcp.ts new file mode 100644 index 000000000..479c61931 --- /dev/null +++ b/apps/code/src/main/trpc/routers/posthog-code-internal-mcp.ts @@ -0,0 +1,24 @@ +import { container } from "../../di/container"; +import { MAIN_TOKENS } from "../../di/tokens"; +import { PostHogCodeInternalMcpEvent } from "../../services/posthog-code-internal-mcp/schemas"; +import type { PostHogCodeInternalMcpService } from "../../services/posthog-code-internal-mcp/service"; +import { publicProcedure, router } from "../trpc"; + +const getService = () => + container.get( + MAIN_TOKENS.PostHogCodeInternalMcpService, + ); + +export const posthogCodeInternalMcpRouter = router({ + onCustomInstructionsChanged: publicProcedure.subscription( + async function* (opts) { + const service = getService(); + for await (const data of service.toIterable( + PostHogCodeInternalMcpEvent.CustomInstructionsChanged, + { signal: opts.signal }, + )) { + yield data; + } + }, + ), +}); diff --git a/apps/code/src/renderer/App.tsx b/apps/code/src/renderer/App.tsx index 15babd5de..31ae0b42d 100644 --- a/apps/code/src/renderer/App.tsx +++ b/apps/code/src/renderer/App.tsx @@ -13,6 +13,7 @@ import { import { useAuthSession } from "@features/auth/hooks/useAuthSession"; import { OnboardingFlow } from "@features/onboarding/components/OnboardingFlow"; import { useOnboardingStore } from "@features/onboarding/stores/onboardingStore"; +import { initializeSettingsStore } from "@features/settings/stores/settingsStore"; import { Flex, Spinner, Text } from "@radix-ui/themes"; import { initializeConnectivityToast } from "@renderer/features/connectivity/connectivityToast"; import { initializeConnectivityStore } from "@renderer/stores/connectivityStore"; @@ -67,6 +68,11 @@ function App() { return initializeUpdateStore(); }, []); + // Sync settings store when the internal MCP server writes custom instructions + useEffect(() => { + return initializeSettingsStore(); + }, []); + // Dev-only inbox demo command for local QA from the renderer console. useEffect(() => { if (import.meta.env.PROD) { diff --git a/apps/code/src/renderer/features/settings/stores/settingsStore.ts b/apps/code/src/renderer/features/settings/stores/settingsStore.ts index 963b9cfa9..c695fb3bc 100644 --- a/apps/code/src/renderer/features/settings/stores/settingsStore.ts +++ b/apps/code/src/renderer/features/settings/stores/settingsStore.ts @@ -1,9 +1,13 @@ import type { WorkspaceMode } from "@main/services/workspace/schemas"; +import { trpcClient } from "@renderer/trpc/client"; import type { ExecutionMode } from "@shared/types"; import { electronStorage } from "@utils/electronStorage"; +import { logger } from "@utils/logger"; import { create } from "zustand"; import { persist } from "zustand/middleware"; +const log = logger.scope("settings-store"); + export type DefaultRunMode = "local" | "cloud" | "last_used"; export type LocalWorkspaceMode = "worktree" | "local"; export type SendMessagesWith = "enter" | "cmd+enter"; @@ -246,3 +250,24 @@ export const useSettingsStore = create()( }, ), ); + +/** + * Subscribe to custom-instructions writes coming from the agent's internal + * MCP server, so the in-memory store stays in sync after the persisted bucket + * is rewritten in the main process. + */ +export function initializeSettingsStore(): () => void { + const subscription = + trpcClient.posthogCodeInternalMcp.onCustomInstructionsChanged.subscribe( + undefined, + { + onData: ({ customInstructions }) => { + useSettingsStore.setState({ customInstructions }); + }, + onError: (error) => { + log.error("Custom instructions subscription error", { error }); + }, + }, + ); + return () => subscription.unsubscribe(); +} diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 05dc8bae5..bf5b02bc9 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -1,4 +1,8 @@ -export { isNotification, POSTHOG_NOTIFICATIONS } from "./acp-extensions"; +export { + isNotification, + POSTHOG_METHODS, + POSTHOG_NOTIFICATIONS, +} from "./acp-extensions"; export { getMcpToolMetadata, isMcpToolReadOnly,