From 95e2956f149fbf778afd9a449f229c654ea986fe Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 20 Dec 2025 12:04:17 -0600 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=A4=96=20fix:=20retry=20workspace=20o?= =?UTF-8?q?nChat=20subscription=20after=20disconnect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/stores/WorkspaceStore.ts | 198 ++++++++++++++++++++------- 1 file changed, 148 insertions(+), 50 deletions(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index f472790e5..09e5f9e8f 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1108,6 +1108,131 @@ export class WorkspaceStore { } } + private sleepWithAbort(timeoutMs: number, signal: AbortSignal): Promise { + return new Promise((resolve) => { + if (signal.aborted) { + resolve(); + return; + } + + const timeout = setTimeout(() => { + resolve(); + }, timeoutMs); + + signal.addEventListener( + "abort", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true } + ); + }); + } + + /** + * Reset derived UI state for a workspace so a fresh onChat replay can rebuild it. + * + * This is used when an onChat subscription ends unexpectedly (MessagePort/WebSocket hiccup). + * Without clearing, replayed history would be merged into stale state (loadHistoricalMessages + * only adds/overwrites, it doesn't delete messages that disappeared due to compaction/truncation). + */ + private prepareForChatReplay(workspaceId: string): void { + const aggregator = this.aggregators.get(workspaceId); + if (!aggregator) { + return; + } + + // Clear any pending UI bumps from deltas - we're about to rebuild the message list. + this.cancelPendingIdleBump(workspaceId); + + aggregator.clear(); + + // Clear non-persisted UI state that won't be re-emitted during replay. + this.queuedMessages.set(workspaceId, null); + this.liveBashOutput.delete(workspaceId); + + // Reset replay buffers so we batch-load the next replay. + this.caughtUp.set(workspaceId, false); + this.historicalMessages.set(workspaceId, []); + this.pendingStreamEvents.set(workspaceId, []); + this.replayingHistory.delete(workspaceId); + + this.states.bump(workspaceId); + this.checkAndBumpRecencyIfChanged(); + } + + /** + * Subscribe to workspace chat events (history replay + live streaming). + * Retries on unexpected iterator termination to avoid requiring a full app restart. + */ + private async runOnChatSubscription(workspaceId: string, signal: AbortSignal): Promise { + let attempt = 0; + + while (!signal.aborted) { + try { + const client = this.client; + if (!client) { + // Wait for a client to be attached (e.g., initial connect or reconnect). + await this.sleepWithAbort(250, signal); + continue; + } + + const iterator = await client.workspace.onChat({ workspaceId }, { signal }); + + for await (const data of iterator) { + if (signal.aborted) { + return; + } + + queueMicrotask(() => { + this.handleChatMessage(workspaceId, data); + }); + } + + // Iterator ended without an abort - treat as unexpected and retry. + if (signal.aborted) { + return; + } + + console.warn( + `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` + ); + } catch (error) { + // Suppress errors when subscription was intentionally cleaned up + if (signal.aborted) { + return; + } + + // EVENT_ITERATOR_VALIDATION_FAILED with ErrorEvent cause happens when: + // - The workspace was removed on server side (iterator ends with error) + // - Connection dropped (WebSocket/MessagePort error) + // Only suppress if workspace no longer exists (was removed during the race) + const isIteratorError = + error instanceof Error && + "code" in error && + error.code === "EVENT_ITERATOR_VALIDATION_FAILED"; + const workspaceRemoved = !this.states.has(workspaceId); + if (isIteratorError && workspaceRemoved) { + return; + } + + console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); + } + + attempt++; + + // Exponential backoff (kept relatively slow to avoid churn in tests / rapid loops). + const delayMs = Math.min(250 * 2 ** (attempt - 1), 5000); + await this.sleepWithAbort(delayMs, signal); + if (signal.aborted) { + return; + } + + this.prepareForChatReplay(workspaceId); + } + } + /** * Add a workspace and subscribe to its IPC events. */ @@ -1151,59 +1276,32 @@ export class WorkspaceStore { // Subscribe to IPC events // Wrap in queueMicrotask to ensure IPC events don't update during React render - if (this.client) { - const controller = new AbortController(); - const { signal } = controller; - - // Fire and forget the async loop - (async () => { - try { - const iterator = await this.client!.workspace.onChat({ workspaceId }, { signal }); - - for await (const data of iterator) { - if (signal.aborted) break; - queueMicrotask(() => { - this.handleChatMessage(workspaceId, data); - }); - } - } catch (error) { - // Suppress errors when subscription was intentionally cleaned up - if (signal.aborted) return; - - // EVENT_ITERATOR_VALIDATION_FAILED with ErrorEvent cause happens when: - // - The workspace was removed on server side (iterator ends with error) - // - Connection dropped (WebSocket/MessagePort error) - // Only suppress if workspace no longer exists (was removed during the race) - const isIteratorError = - error instanceof Error && - "code" in error && - error.code === "EVENT_ITERATOR_VALIDATION_FAILED"; - const workspaceRemoved = !this.states.has(workspaceId); - if (isIteratorError && workspaceRemoved) return; - - console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); - } - })(); + const controller = new AbortController(); + const { signal } = controller; - this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); + // Fire and forget the subscription loop (retries on errors) + void this.runOnChatSubscription(workspaceId, signal); - // Fetch persisted session usage (fire-and-forget) - this.client.workspace - .getSessionUsage({ workspaceId }) - .then((data) => { - if (data) { - this.sessionUsage.set(workspaceId, data); - this.usageStore.bump(workspaceId); - } - }) - .catch((error) => { - console.warn(`Failed to fetch session usage for ${workspaceId}:`, error); - }); + this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); - if (this.statsEnabled) { - this.subscribeToStats(workspaceId); - } - } else { + // Fetch persisted session usage (fire-and-forget) + this.client?.workspace + .getSessionUsage({ workspaceId }) + .then((data) => { + if (data) { + this.sessionUsage.set(workspaceId, data); + this.usageStore.bump(workspaceId); + } + }) + .catch((error) => { + console.warn(`Failed to fetch session usage for ${workspaceId}:`, error); + }); + + if (this.statsEnabled) { + this.subscribeToStats(workspaceId); + } + + if (!this.client) { console.warn(`[WorkspaceStore] No ORPC client available for workspace ${workspaceId}`); } } From 838e77cc7e95e2878cfc893ad60768ace013f3e6 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 20 Dec 2025 12:35:14 -0600 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=A4=96=20refactor:=20simplify=20Works?= =?UTF-8?q?paceStore=20onChat=20retry=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/stores/WorkspaceStore.ts | 66 ++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 09e5f9e8f..c270612e0 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -219,6 +219,22 @@ export interface WorkspaceConsumersState { isCalculating: boolean; } +const ON_CHAT_RETRY_BASE_MS = 250; +const ON_CHAT_RETRY_MAX_MS = 5000; + +type IteratorValidationFailedError = Error & { code: "EVENT_ITERATOR_VALIDATION_FAILED" }; + +function isIteratorValidationFailed(error: unknown): error is IteratorValidationFailedError { + return ( + error instanceof Error && + (error as { code?: unknown }).code === "EVENT_ITERATOR_VALIDATION_FAILED" + ); +} + +function calculateOnChatBackoffMs(attempt: number): number { + return Math.min(ON_CHAT_RETRY_BASE_MS * 2 ** attempt, ON_CHAT_RETRY_MAX_MS); +} + /** * External store for workspace aggregators and streaming state. * @@ -1130,6 +1146,23 @@ export class WorkspaceStore { }); } + private isWorkspaceSubscribed(workspaceId: string): boolean { + return this.ipcUnsubscribers.has(workspaceId); + } + + private async waitForClient(signal: AbortSignal): Promise | null> { + while (!signal.aborted) { + if (this.client) { + return this.client; + } + + // Wait for a client to be attached (e.g., initial connect or reconnect). + await this.sleepWithAbort(ON_CHAT_RETRY_BASE_MS, signal); + } + + return null; + } + /** * Reset derived UI state for a workspace so a fresh onChat replay can rebuild it. * @@ -1137,7 +1170,7 @@ export class WorkspaceStore { * Without clearing, replayed history would be merged into stale state (loadHistoricalMessages * only adds/overwrites, it doesn't delete messages that disappeared due to compaction/truncation). */ - private prepareForChatReplay(workspaceId: string): void { + private resetChatStateForReplay(workspaceId: string): void { const aggregator = this.aggregators.get(workspaceId); if (!aggregator) { return; @@ -1170,14 +1203,12 @@ export class WorkspaceStore { let attempt = 0; while (!signal.aborted) { - try { - const client = this.client; - if (!client) { - // Wait for a client to be attached (e.g., initial connect or reconnect). - await this.sleepWithAbort(250, signal); - continue; - } + const client = this.client ?? (await this.waitForClient(signal)); + if (!client || signal.aborted) { + return; + } + try { const iterator = await client.workspace.onChat({ workspaceId }, { signal }); for await (const data of iterator) { @@ -1185,6 +1216,9 @@ export class WorkspaceStore { return; } + // Connection is alive again - don't carry old backoff into the next failure. + attempt = 0; + queueMicrotask(() => { this.handleChatMessage(workspaceId, data); }); @@ -1208,28 +1242,22 @@ export class WorkspaceStore { // - The workspace was removed on server side (iterator ends with error) // - Connection dropped (WebSocket/MessagePort error) // Only suppress if workspace no longer exists (was removed during the race) - const isIteratorError = - error instanceof Error && - "code" in error && - error.code === "EVENT_ITERATOR_VALIDATION_FAILED"; - const workspaceRemoved = !this.states.has(workspaceId); - if (isIteratorError && workspaceRemoved) { + if (isIteratorValidationFailed(error) && !this.isWorkspaceSubscribed(workspaceId)) { return; } console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); } + const delayMs = calculateOnChatBackoffMs(attempt); attempt++; - // Exponential backoff (kept relatively slow to avoid churn in tests / rapid loops). - const delayMs = Math.min(250 * 2 ** (attempt - 1), 5000); await this.sleepWithAbort(delayMs, signal); if (signal.aborted) { return; } - this.prepareForChatReplay(workspaceId); + this.resetChatStateForReplay(workspaceId); } } @@ -1279,11 +1307,11 @@ export class WorkspaceStore { const controller = new AbortController(); const { signal } = controller; + this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); + // Fire and forget the subscription loop (retries on errors) void this.runOnChatSubscription(workspaceId, signal); - this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); - // Fetch persisted session usage (fire-and-forget) this.client?.workspace .getSessionUsage({ workspaceId }) From 211af7b7be44be36ed59eb2cb63e72f707d1ddcf Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 20 Dec 2025 13:18:36 -0600 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=A4=96=20refactor:=20centralize=20per?= =?UTF-8?q?-workspace=20transient=20chat=20state?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/stores/WorkspaceStore.ts | 140 +++++++++++++-------------- 1 file changed, 65 insertions(+), 75 deletions(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index c270612e0..b90ecbcc1 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -219,6 +219,26 @@ export interface WorkspaceConsumersState { isCalculating: boolean; } +interface WorkspaceChatTransientState { + caughtUp: boolean; + historicalMessages: MuxMessage[]; + pendingStreamEvents: WorkspaceChatMessage[]; + replayingHistory: boolean; + queuedMessage: QueuedMessage | null; + liveBashOutput: Map; +} + +function createInitialChatTransientState(): WorkspaceChatTransientState { + return { + caughtUp: false, + historicalMessages: [], + pendingStreamEvents: [], + replayingHistory: false, + queuedMessage: null, + liveBashOutput: new Map(), + }; +} + const ON_CHAT_RETRY_BASE_MS = 250; const ON_CHAT_RETRY_MAX_MS = 5000; @@ -263,11 +283,11 @@ export class WorkspaceStore { // Supporting data structures private aggregators = new Map(); private ipcUnsubscribers = new Map void>(); - private caughtUp = new Map(); - private historicalMessages = new Map(); - private pendingStreamEvents = new Map(); + + // Per-workspace ephemeral chat state (buffering, queued message, live bash output, etc.) + private chatTransientState = new Map(); + private workspaceMetadata = new Map(); // Store metadata for name lookup - private queuedMessages = new Map(); // Cached queued messages // Workspace timing stats snapshots (from workspace.stats.subscribe) private statsEnabled = false; @@ -276,9 +296,6 @@ export class WorkspaceStore { private statsUnsubscribers = new Map void>(); // Cumulative session usage (from session-usage.json) - // UI-only incremental bash output streamed via bash-output events (not persisted). - // Keyed by toolCallId. - private liveBashOutput = new Map>(); private sessionUsage = new Map>(); // Idle compaction notification callbacks (called when backend signals idle compaction needed) @@ -406,11 +423,8 @@ export class WorkspaceStore { if (toolCallEnd.toolName === "bash") { const output = (toolCallEnd.result as { output?: unknown } | undefined)?.output; if (typeof output === "string") { - const perWorkspace = this.liveBashOutput.get(workspaceId); - perWorkspace?.delete(toolCallEnd.toolCallId); - if (perWorkspace?.size === 0) { - this.liveBashOutput.delete(workspaceId); - } + const transient = this.chatTransientState.get(workspaceId); + transient?.liveBashOutput.delete(toolCallEnd.toolCallId); } } @@ -467,7 +481,7 @@ export class WorkspaceStore { } : null; - this.queuedMessages.set(workspaceId, queuedMessage); + this.assertChatTransientState(workspaceId).queuedMessage = queuedMessage; this.states.bump(workspaceId); }, "restore-to-input": (workspaceId, _aggregator, data) => { @@ -493,9 +507,6 @@ export class WorkspaceStore { // Track previous sidebar state per workspace (to prevent unnecessary bumps) private previousSidebarValues = new Map(); - // Track workspaces currently replaying buffered history (to avoid O(N) scheduling) - private replayingHistory = new Set(); - // Track model usage (optional integration point for model bookkeeping) private readonly onModelUsed?: (model: string) => void; @@ -688,7 +699,7 @@ export class WorkspaceStore { workspaceId: string, aggregator: StreamingMessageAggregator ): void { - const perWorkspace = this.liveBashOutput.get(workspaceId); + const perWorkspace = this.chatTransientState.get(workspaceId)?.liveBashOutput; if (!perWorkspace || perWorkspace.size === 0) return; const activeToolCallIds = new Set(); @@ -703,10 +714,6 @@ export class WorkspaceStore { perWorkspace.delete(toolCallId); } } - - if (perWorkspace.size === 0) { - this.liveBashOutput.delete(workspaceId); - } } /** @@ -730,8 +737,7 @@ export class WorkspaceStore { }; getBashToolLiveOutput(workspaceId: string, toolCallId: string): LiveBashOutputView | null { - const perWorkspace = this.liveBashOutput.get(workspaceId); - const state = perWorkspace?.get(toolCallId); + const state = this.chatTransientState.get(workspaceId)?.liveBashOutput.get(toolCallId); // Important: return the stored object reference so useSyncExternalStore sees a stable snapshot. // (Returning a fresh object every call can trigger an infinite re-render loop.) @@ -748,6 +754,12 @@ export class WorkspaceStore { return aggregator; } + private assertChatTransientState(workspaceId: string): WorkspaceChatTransientState { + const state = this.chatTransientState.get(workspaceId); + assert(state, `Workspace ${workspaceId} not found - must call addWorkspace() first`); + return state; + } + /** * Get state for a specific workspace. * Lazy computation - only runs when version changes. @@ -759,7 +771,7 @@ export class WorkspaceStore { const aggregator = this.assertGet(workspaceId); const hasMessages = aggregator.hasMessages(); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; + const transient = this.assertChatTransientState(workspaceId); const activeStreams = aggregator.getActiveStreams(); const messages = aggregator.getAllMessages(); const metadata = this.workspaceMetadata.get(workspaceId); @@ -767,11 +779,11 @@ export class WorkspaceStore { return { name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing messages: aggregator.getDisplayedMessages(), - queuedMessage: this.queuedMessages.get(workspaceId) ?? null, + queuedMessage: transient.queuedMessage, canInterrupt: activeStreams.length > 0, isCompacting: aggregator.isCompacting(), awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(), - loading: !hasMessages && !isCaughtUp, + loading: !hasMessages && !transient.caughtUp, muxMessages: messages, currentModel: aggregator.getCurrentModel() ?? null, recencyTimestamp: aggregator.getRecencyTimestamp(), @@ -1044,7 +1056,7 @@ export class WorkspaceStore { */ getWorkspaceConsumers(workspaceId: string): WorkspaceConsumersState { const aggregator = this.aggregators.get(workspaceId); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; + const isCaughtUp = this.chatTransientState.get(workspaceId)?.caughtUp ?? false; // Lazy trigger check (runs on EVERY access, not just when MapStore recomputes) const cached = this.consumerManager.getCachedState(workspaceId); @@ -1104,7 +1116,7 @@ export class WorkspaceStore { metadata?: { usage?: LanguageModelV2Usage } ): void { // During history replay: only bump usage, skip scheduling (caught-up schedules once at end) - if (this.replayingHistory.has(workspaceId)) { + if (this.chatTransientState.get(workspaceId)?.replayingHistory) { if (metadata?.usage) { this.usageStore.bump(workspaceId); } @@ -1181,15 +1193,8 @@ export class WorkspaceStore { aggregator.clear(); - // Clear non-persisted UI state that won't be re-emitted during replay. - this.queuedMessages.set(workspaceId, null); - this.liveBashOutput.delete(workspaceId); - - // Reset replay buffers so we batch-load the next replay. - this.caughtUp.set(workspaceId, false); - this.historicalMessages.set(workspaceId, []); - this.pendingStreamEvents.set(workspaceId, []); - this.replayingHistory.delete(workspaceId); + // Reset per-workspace transient state so the next replay rebuilds from the backend source of truth. + this.chatTransientState.set(workspaceId, createInitialChatTransientState()); this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); @@ -1291,12 +1296,9 @@ export class WorkspaceStore { this.derived.bump("recency"); } - // Initialize state - if (!this.caughtUp.has(workspaceId)) { - this.caughtUp.set(workspaceId, false); - } - if (!this.historicalMessages.has(workspaceId)) { - this.historicalMessages.set(workspaceId, []); + // Initialize transient chat state + if (!this.chatTransientState.has(workspaceId)) { + this.chatTransientState.set(workspaceId, createInitialChatTransientState()); } // Clear stale streaming state @@ -1361,9 +1363,7 @@ export class WorkspaceStore { this.usageStore.delete(workspaceId); this.consumersStore.delete(workspaceId); this.aggregators.delete(workspaceId); - this.caughtUp.delete(workspaceId); - this.historicalMessages.delete(workspaceId); - this.pendingStreamEvents.delete(workspaceId); + this.chatTransientState.delete(workspaceId); this.recencyCache.delete(workspaceId); this.previousSidebarValues.delete(workspaceId); this.sidebarStateCache.delete(workspaceId); @@ -1371,7 +1371,6 @@ export class WorkspaceStore { this.workspaceCreatedAt.delete(workspaceId); this.workspaceStats.delete(workspaceId); this.statsStore.delete(workspaceId); - this.liveBashOutput.delete(workspaceId); this.sessionUsage.delete(workspaceId); } @@ -1417,12 +1416,9 @@ export class WorkspaceStore { this.usageStore.clear(); this.consumersStore.clear(); this.aggregators.clear(); - this.caughtUp.clear(); - this.historicalMessages.clear(); - this.pendingStreamEvents.clear(); + this.chatTransientState.clear(); this.workspaceStats.clear(); this.statsStore.clear(); - this.liveBashOutput.clear(); this.sessionUsage.clear(); this.recencyCache.clear(); this.previousSidebarValues.clear(); @@ -1511,36 +1507,35 @@ export class WorkspaceStore { // Aggregator must exist - IPC subscription happens in addWorkspace() const aggregator = this.assertGet(workspaceId); - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; - const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; + const transient = this.assertChatTransientState(workspaceId); if (isCaughtUpMessage(data)) { // Check if there's an active stream in buffered events (reconnection scenario) - const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? []; + const pendingEvents = transient.pendingStreamEvents; const hasActiveStream = pendingEvents.some( (event) => "type" in event && event.type === "stream-start" ); // Load historical messages first - if (historicalMsgs.length > 0) { - aggregator.loadHistoricalMessages(historicalMsgs, hasActiveStream); - this.historicalMessages.set(workspaceId, []); + if (transient.historicalMessages.length > 0) { + aggregator.loadHistoricalMessages(transient.historicalMessages, hasActiveStream); + transient.historicalMessages.length = 0; } // Mark that we're replaying buffered history (prevents O(N) scheduling) - this.replayingHistory.add(workspaceId); + transient.replayingHistory = true; // Process buffered stream events now that history is loaded for (const event of pendingEvents) { this.processStreamEvent(workspaceId, aggregator, event); } - this.pendingStreamEvents.set(workspaceId, []); + pendingEvents.length = 0; // Done replaying buffered events - this.replayingHistory.delete(workspaceId); + transient.replayingHistory = false; // Mark as caught up - this.caughtUp.set(workspaceId, true); + transient.caughtUp = true; this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); // Messages loaded, update recency @@ -1575,10 +1570,8 @@ export class WorkspaceStore { // // This is especially important for workspaces with long histories (100+ messages), // where unbuffered rendering would cause visible lag and UI stutter. - if (!isCaughtUp && this.isBufferedEvent(data)) { - const pending = this.pendingStreamEvents.get(workspaceId) ?? []; - pending.push(data); - this.pendingStreamEvents.set(workspaceId, pending); + if (!transient.caughtUp && this.isBufferedEvent(data)) { + transient.pendingStreamEvents.push(data); return; } @@ -1630,18 +1623,16 @@ export class WorkspaceStore { if (isBashOutputEvent(data)) { if (data.text.length === 0) return; - const perWorkspace = - this.liveBashOutput.get(workspaceId) ?? new Map(); + const transient = this.assertChatTransientState(workspaceId); - const prev = perWorkspace.get(data.toolCallId); + const prev = transient.liveBashOutput.get(data.toolCallId); const next = appendLiveBashOutputChunk( prev, { text: data.text, isError: data.isError }, BASH_TRUNCATE_MAX_TOTAL_BYTES ); - perWorkspace.set(data.toolCallId, next); - this.liveBashOutput.set(workspaceId, perWorkspace); + transient.liveBashOutput.set(data.toolCallId, next); // High-frequency: throttle UI updates like other delta-style events. this.scheduleIdleStateBump(workspaceId); @@ -1656,12 +1647,11 @@ export class WorkspaceStore { // Regular messages (MuxMessage without type field) if (isMuxMessage(data)) { - const isCaughtUp = this.caughtUp.get(workspaceId) ?? false; - if (!isCaughtUp) { + const transient = this.assertChatTransientState(workspaceId); + + if (!transient.caughtUp) { // Buffer historical MuxMessages - const historicalMsgs = this.historicalMessages.get(workspaceId) ?? []; - historicalMsgs.push(data); - this.historicalMessages.set(workspaceId, historicalMsgs); + transient.historicalMessages.push(data); } else { // Process live events immediately (after history loaded) aggregator.handleMessage(data);