Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions packages/opencode/src/bun/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,33 @@ import { NamedError } from "@opencode-ai/util/error"
import { readableStreamToText } from "bun"
import { Lock } from "../util/lock"
import { PackageRegistry } from "./registry"
import { mkdir, rm } from "fs/promises"

export namespace BunProc {
const log = Log.create({ service: "bun" })

async function getPendingUpdatePath(pkg: string): Promise<string> {
return path.join(Global.Path.cache, ".update-pending", pkg)
}

async function hasPendingUpdate(pkg: string): Promise<boolean> {
const updatePath = await getPendingUpdatePath(pkg)
return await Filesystem.exists(updatePath)
}

async function markUpdatePending(pkg: string): Promise<void> {
const updatePath = await getPendingUpdatePath(pkg)
await mkdir(path.dirname(updatePath), { recursive: true })
await Bun.write(updatePath, Date.now().toString())
}

async function clearPendingUpdate(pkg: string): Promise<void> {
const updatePath = await getPendingUpdatePath(pkg)
if (await Filesystem.exists(updatePath)) {
await rm(updatePath)
}
}

export async function run(cmd: string[], options?: Bun.SpawnOptions.OptionsObject<any, any, any>) {
log.info("running", {
cmd: [which(), ...cmd],
Expand Down Expand Up @@ -76,14 +99,37 @@ export namespace BunProc {
const modExists = await Filesystem.exists(mod)
const cachedVersion = dependencies[pkg]

const pendingUpdate = await hasPendingUpdate(pkg)

if (!modExists || !cachedVersion) {
// continue to install
} else if (version !== "latest" && cachedVersion === version) {
return mod
} else if (version === "latest") {
const isOutdated = await PackageRegistry.isOutdated(pkg, cachedVersion, Global.Path.cache)
if (!isOutdated) return mod
log.info("Cached version is outdated, proceeding with install", { pkg, cachedVersion })
} else if (version === "latest" && pendingUpdate) {
// Update was detected in previous session - install now
log.info("Installing pending plugin update", { pkg, current: cachedVersion })
await clearPendingUpdate(pkg)
// Fall through to install latest version
} else if (version === "latest" && cachedVersion) {
// Return cached version immediately, check for updates in background
queueMicrotask(() => {
void (async () => {
try {
using _lock = await Lock.read("bun-install")
const isOutdated = await PackageRegistry.isOutdated(pkg, cachedVersion, Global.Path.cache)
if (isOutdated) {
await markUpdatePending(pkg)
log.info("Plugin update available - will be installed on next startup", {
pkg,
current: cachedVersion,
})
}
} catch (error) {
log.debug("Background update check failed", { pkg, error })
}
})()
})
return mod
}

const proxied = !!(
Expand Down
13 changes: 11 additions & 2 deletions packages/opencode/src/cli/ink/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { useReducer, useCallback, useState, useRef, useEffect } from "react"
import type { ReactElement } from "react"
import { Box, Text } from "ink"

import { appReducer, initialState } from "./state/reducer"
import { theme } from "./theme"
import { InputLine } from "./components/InputLine"
Expand Down Expand Up @@ -36,6 +37,7 @@ export const App = (): ReactElement => {
})
} catch (error) {
console.error("Session init failed:", error)
dispatch({ type: "CLEAR_STREAMING" })
dispatch({
type: "STREAM_TEXT",
payload: `Error: Failed to initialize session - ${error}\n`,
Expand Down Expand Up @@ -66,8 +68,15 @@ export const App = (): ReactElement => {
})
return
}
// Send message via SDK hook
await sendMessage(value.trim())
try {
await sendMessage(value.trim())
} catch (err) {
dispatch({ type: "CLEAR_STREAMING" })
dispatch({
type: "STREAM_TEXT",
payload: `Failed to send message: ${err}\n`,
})
}
}
},
[dispatch, setUIMode, state.session.id, sendMessage],
Expand Down
5 changes: 4 additions & 1 deletion packages/opencode/src/cli/ink/components/InputLine.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { useState, useCallback, useEffect } from "react"
import type { ReactElement } from "react"
import { Box, Text, useInput } from "ink"

import { theme } from "../theme"

interface InputLineProps {
Expand Down Expand Up @@ -29,7 +30,9 @@ export const InputLine = ({

useInput(
(input, key) => {
if (disabled || !focus) return
if (disabled || !focus) {
return
}

if (key.return) {
onSubmit(state.value)
Expand Down
2 changes: 2 additions & 0 deletions packages/opencode/src/cli/ink/entry.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env bun

import { Global } from "@/global"
import { Log } from "@/util/log"
import { Instance } from "@/project/instance"
Expand All @@ -16,6 +17,7 @@ async function main() {
}

await Global.init()

// Disable Log printing to stdout - Ink owns stdout for TUI rendering
await Log.init({
print: false,
Expand Down
69 changes: 58 additions & 11 deletions packages/opencode/src/cli/ink/hooks/useSDKEvents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { useEffect, useCallback, useRef, useState } from "react"

import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
import type { Action } from "@/cli/ink/state/reducer"
import type { Dispatch } from "react"
Expand All @@ -24,9 +25,13 @@ function createInProcessFetch() {
export function useSDKEvents(sessionId: string | null, dispatch: Dispatch<Action>) {
const sdkRef = useRef<ReturnType<typeof createOpencodeClient> | null>(null)
const [isStreaming, setIsStreaming] = useState(false)
const currentAssistantMessageIdRef = useRef<string | null>(null)
const streamingLockRef = useRef(false)

useEffect(() => {
if (!sessionId) return
if (!sessionId) {
return
}

const abortController = new AbortController()
const sdk = createOpencodeClient({
Expand All @@ -47,7 +52,7 @@ export function useSDKEvents(sessionId: string | null, dispatch: Dispatch<Action
)

for await (const event of events.stream) {
handleEvent(event, dispatch, sessionId, setIsStreaming)
handleEvent(event, dispatch, sessionId, setIsStreaming, currentAssistantMessageIdRef, streamingLockRef)
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
Expand All @@ -67,23 +72,42 @@ export function useSDKEvents(sessionId: string | null, dispatch: Dispatch<Action

const sendMessage = useCallback(
async (content: string) => {
if (!sessionId || !sdkRef.current) return
if (!sessionId || !sdkRef.current) {
throw new Error("Session not ready - SDK client not initialized")
}

// Use ref-based locking to prevent TOCTOU race
if (streamingLockRef.current || isStreaming) {
throw new Error("Message already in progress - please wait")
}

// Acquire lock BEFORE any async operations
streamingLockRef.current = true

setIsStreaming(true)
dispatch({ type: "CLEAR_STREAMING" })

setIsStreaming(true)

try {
await sdkRef.current.session.prompt({
const result = await sdkRef.current.session.prompt({
sessionID: sessionId,
parts: [{ type: "text", text: content }],
})

if (result.error) {
throw new Error("Failed to send message")
}

currentAssistantMessageIdRef.current = result.data.info.id
} catch (err) {
currentAssistantMessageIdRef.current = null
setIsStreaming(false)
streamingLockRef.current = false
console.error("Failed to send message:", err)
throw err
}
},
[sessionId, dispatch],
[sessionId, dispatch, isStreaming],
)

return {
Expand All @@ -97,19 +121,27 @@ function handleEvent(
dispatch: Dispatch<Action>,
sessionId: string,
setIsStreaming: (value: boolean) => void,
currentAssistantMessageIdRef: { current: string | null },
streamingLockRef: { current: boolean },
) {
switch (event.type) {
case "message.part.updated": {
const part = event.properties.part
if (part.sessionID !== sessionId) return

// Handle text streaming
// Handle text streaming - only for assistant messages
if (part.type === "text") {
const delta = event.properties.delta
if (delta) {
// Skip if this is not the expected assistant message
if (currentAssistantMessageIdRef.current && part.messageID !== currentAssistantMessageIdRef.current) {
return
}

// Use delta for incremental updates, fallback to full text
const content = event.properties.delta ?? part.text
if (typeof content === "string" && content.length > 0) {
dispatch({
type: "STREAM_TEXT",
payload: delta,
payload: content,
})
}
}
Expand Down Expand Up @@ -178,6 +210,13 @@ function handleEvent(
case "message.updated": {
if (event.properties.info.sessionID !== sessionId) return
setIsStreaming(false)

// Clear the tracked message ID if this is the expected assistant message
if (currentAssistantMessageIdRef.current === event.properties.info.id) {
currentAssistantMessageIdRef.current = null
streamingLockRef.current = false
}

dispatch({
type: "MESSAGE_COMPLETE",
payload: { id: event.properties.info.id },
Expand All @@ -189,7 +228,15 @@ function handleEvent(
if (event.properties.sessionID !== sessionId) return
if (event.properties.status.type === "idle") {
setIsStreaming(false)
dispatch({ type: "CLEAR_STREAMING" })
streamingLockRef.current = false

// Fallback: If MESSAGE_COMPLETE never arrived (SDK crash, network error),
// clear streaming state to prevent memory leak
if (currentAssistantMessageIdRef.current !== null) {
currentAssistantMessageIdRef.current = null
dispatch({ type: "CLEAR_STREAMING" })
}
// Otherwise, MESSAGE_COMPLETE already moved content to messages array
}
break
}
Expand Down
9 changes: 8 additions & 1 deletion packages/opencode/src/cli/ink/index.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
/** @jsxImportSource react */

import { render } from "ink"
import App from "./App"

export function startInkTUI() {
const instance = render(<App />)
// Configure stdin/stdout for Ink to receive keyboard events
const instance = render(<App />, {
stdin: process.stdin,
stdout: process.stdout,
exitOnCtrlC: true, // Let Ink handle Ctrl+C exit
})

return instance.waitUntilExit()
}
4 changes: 2 additions & 2 deletions packages/opencode/test/cli/ink/hooks/event-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,10 @@ describe("SDK Event Handling Logic", () => {

if (sessionStatusEvent.type === "session.status") {
if (sessionStatusEvent.properties.status.type === "idle") {
dispatch({ type: "CLEAR_STREAMING" })
// Don't dispatch CLEAR_STREAMING - MESSAGE_COMPLETE handles it
}
}

expect(dispatch).toHaveBeenCalledWith({ type: "CLEAR_STREAMING" })
expect(dispatch).not.toHaveBeenCalled()
})
})
Loading