diff --git a/examples/nodejs/workflows/workflows.ts b/examples/nodejs/workflows/workflows.ts index b257817..c99897f 100644 --- a/examples/nodejs/workflows/workflows.ts +++ b/examples/nodejs/workflows/workflows.ts @@ -1,85 +1,40 @@ -// Test script for the simplified proxy approach +// Experimental upcoming beta AI primitve. +// Please refer to the documentation for more information: https://langbase.com/docs for more information. + import 'dotenv/config'; -import {Langbase} from 'langbase'; +import {Langbase, Workflow} from 'langbase'; -// Create Langbase instance const langbase = new Langbase({ apiKey: process.env.LANGBASE_API_KEY!, }); async function main() { - // Create a workflow with debug mode enabled - const workflow = langbase.workflow({ - name: 'simplified-proxy-test', - debug: true, // Enable debug logging + const {step} = new Workflow({debug: true}); + + const result = await step({ + id: 'sumamrize', + run: async () => { + return langbase.llm.run({ + model: 'openai:gpt-4o-mini', + apiKey: process.env.OPENAI_API_KEY!, + messages: [ + { + role: 'system', + content: + 'You are an expert summarizer. Summarize the user input.', + }, + { + role: 'user', + content: + 'I am testing workflows. I just created an example of summarize workflow. Can you summarize this?', + }, + ], + stream: false, + }); + }, }); - try { - // STEP 1: Call langbase.agent.run but don't return its result directly - const step1Result = await workflow.step({ - id: 'call-but-return-custom', - run: async () => { - // Return custom result instead - return { - customField: 'Custom result from simplified proxy', - timestamp: new Date().toISOString(), - }; - }, - }); - - // STEP 2: Return agent.run result directly - const step2Result = await workflow.step({ - id: 'return-agent-run-directly', - run: async () => { - // Call Langbase API and return the result directly - return langbase.agent.run({ - model: 'openai:gpt-4o-mini', - apiKey: process.env.OPENAI_API_KEY!, - instructions: 'Be brief and concise.', - input: 'What is 2+2?', - stream: false, - }); - }, - }); - - // STEP 3: Make multiple Langbase calls in one step - const step3Result = await workflow.step({ - id: 'multiple-calls', - run: async () => { - // First call - const call1 = await langbase.agent.run({ - model: 'openai:gpt-4o-mini', - apiKey: process.env.OPENAI_API_KEY!, - instructions: 'Be brief.', - input: 'First proxy test', - stream: false, - }); - - // Second call with different method - const call2 = await langbase.agent.run({ - model: 'openai:gpt-4o-mini', - apiKey: process.env.OPENAI_API_KEY!, - instructions: 'Be brief.', - input: 'Second proxy test', - stream: false, - }); - - // Return combined result - return { - summary: 'Multiple calls completed with simplified proxy', - calls: 2, - firstOutput: call1.output, - secondOutput: call2.output, - }; - }, - }); - } catch (error) { - console.error('āŒ Workflow error:', error); - } finally { - // End the workflow to show trace report - workflow.end(); - } + console.log(result['completion']); } -// Run the test -main().catch(console.error); +main(); diff --git a/packages/langbase/src/common/request.ts b/packages/langbase/src/common/request.ts index b6a6a90..cf2d817 100644 --- a/packages/langbase/src/common/request.ts +++ b/packages/langbase/src/common/request.ts @@ -62,17 +62,6 @@ export class Request { const isLllmGenerationEndpoint = GENERATION_ENDPOINTS.includes(endpoint); - // All endpoints should return headers if rawResponse is true - if (!isLllmGenerationEndpoint && options.body?.rawResponse) { - const responseData = await response.json(); - return { - ...responseData, - rawResponse: { - headers: Object.fromEntries(response.headers.entries()), - }, - } as T; - } - if (isLllmGenerationEndpoint) { const threadId = response.headers.get('lb-thread-id'); diff --git a/packages/langbase/src/langbase/langbase.ts b/packages/langbase/src/langbase/langbase.ts index 501c18c..b2c01e5 100644 --- a/packages/langbase/src/langbase/langbase.ts +++ b/packages/langbase/src/langbase/langbase.ts @@ -1,6 +1,5 @@ import {convertDocToFormData} from '@/lib/utils/doc-to-formdata'; import {Request} from '../common/request'; -import {Workflow} from './workflows'; export type Role = 'user' | 'assistant' | 'system' | 'tool'; @@ -640,12 +639,6 @@ export class Langbase { }; }; - public workflow: (config: {debug?: boolean; name: string}) => Workflow; - - public traces: { - create: (trace: any) => Promise; - }; - constructor(options?: LangbaseOptions) { this.baseUrl = options?.baseUrl ?? 'https://api.langbase.com'; this.apiKey = options?.apiKey ?? ''; @@ -730,12 +723,6 @@ export class Langbase { this.agent = { run: this.runAgent.bind(this), }; - - this.workflow = config => new Workflow({...config, langbase: this}); - - this.traces = { - create: this.createTrace.bind(this), - }; } private async runPipe( @@ -1159,17 +1146,4 @@ export class Langbase { }, }); } - - /** - * Creates a new trace on Langbase. - * - * @param {any} trace - The trace data to send. - * @returns {Promise} A promise that resolves to the response of the trace creation. - */ - private async createTrace(trace: any): Promise { - return this.request.post({ - endpoint: '/v1/traces', - body: trace, - }); - } } diff --git a/packages/langbase/src/langbase/trace.ts b/packages/langbase/src/langbase/trace.ts deleted file mode 100644 index e97f47b..0000000 --- a/packages/langbase/src/langbase/trace.ts +++ /dev/null @@ -1,133 +0,0 @@ -export interface Trace { - name: string; - startTime: number; - endTime?: number; - duration?: number; - steps: StepTrace[]; - error?: string; -} - -export interface StepTrace { - name: string; - output: any; - error?: string; - traces: string[] | null; - duration: number; - startTime: number; - endTime: number; -} - -export type TraceType = - | 'workflow' - | 'agent' - | 'chunk' - | 'memory' - | 'parse' - | 'embed'; - -export type PrimitiveTrace = - | {chunk: any} - | {agent: any} - | {memory: any} - | {parse: any} - | {embed: any} - | {workflow: WorkflowTrace; entityAuthId: string}; - -type WorkflowTrace = { - createdAt: string; - id: string; - agentWorkflowId: string; - name: string; - startTime: number; - endTime?: number; - duration?: number; - steps: StepTrace[]; - error?: string; -}; - -export class TraceManager { - private traces: Map = new Map(); - - createTrace(type: TraceType, traceData: any = {}): string { - const traceId = crypto.randomUUID(); - let trace: PrimitiveTrace; - const createdAt = new Date().toISOString(); - if (type === 'workflow') { - trace = { - workflow: { - createdAt, - id: traceId, - agentWorkflowId: process.env.LANGBASE_AGENT_ID || '', - name: traceData.name || '', - startTime: Date.now(), - steps: [], - }, - entityAuthId: '', - }; - } else if (type === 'agent') { - trace = {agent: {...traceData, createdAt, id: traceId}}; - } else if (type === 'chunk') { - trace = {chunk: {...traceData, createdAt, id: traceId}}; - } else if (type === 'memory') { - trace = {memory: {...traceData, createdAt, id: traceId}}; - } else if (type === 'parse') { - trace = {parse: {...traceData, createdAt, id: traceId}}; - } else if (type === 'embed') { - trace = {embed: {...traceData, createdAt, id: traceId}}; - } else { - throw new Error('Unknown trace type'); - } - this.traces.set(traceId, trace); - return traceId; - } - - addStep(traceId: string, step: StepTrace) { - const trace = this.traces.get(traceId); - if (trace && 'workflow' in trace) { - trace.workflow.steps.push(step); - } - } - - endTrace(traceId: string) { - const trace = this.traces.get(traceId); - if (trace && 'workflow' in trace) { - trace.workflow.endTime = Date.now(); - trace.workflow.duration = - trace.workflow.endTime - trace.workflow.startTime; - } - } - - getTrace(traceId: string): PrimitiveTrace | undefined { - return this.traces.get(traceId); - } - - printTrace(traceId: string) { - const trace = this.traces.get(traceId); - if (!trace) return; - if ('workflow' in trace) { - const wf = trace.workflow; - const duration = wf.endTime - ? wf.endTime - wf.startTime - : Date.now() - wf.startTime; - console.log('\nšŸ“Š Workflow Trace:'); - console.log(`Name: ${wf.name}`); - console.log(`Duration: ${duration}ms`); - console.log(`Start Time: ${new Date(wf.startTime).toISOString()}`); - if (wf.endTime) { - console.log(`End Time: ${new Date(wf.endTime).toISOString()}`); - } - console.log('\nSteps:'); - wf.steps.forEach(step => { - console.log(`\n Step: ${step.name}`); - console.log(` Duration: ${step.duration}ms`); - if (step.traces && step.traces.length > 0) { - console.log(` Traces:`, step.traces); - } - console.log(` Output:`, step.output); - }); - } else { - console.log('\nšŸ“Š Primitive Trace:'); - console.dir(trace, {depth: 4}); - } - } -} diff --git a/packages/langbase/src/langbase/workflows.ts b/packages/langbase/src/langbase/workflows.ts index 7ce6032..63847c0 100644 --- a/packages/langbase/src/langbase/workflows.ts +++ b/packages/langbase/src/langbase/workflows.ts @@ -1,23 +1,3 @@ -import {TraceManager, StepTrace} from './trace'; -import {Langbase} from './langbase'; - -// Cross-platform global object -const _global: any = - typeof global !== 'undefined' - ? global - : typeof window !== 'undefined' - ? window - : typeof self !== 'undefined' - ? self - : typeof globalThis !== 'undefined' - ? globalThis - : {}; - -// Declare the global langbase instance -_global.langbase = _global.langbase || new Langbase(); -_global._activeTraceCollector = _global._activeTraceCollector || null; -_global._workflowDebugEnabled = _global._workflowDebugEnabled || false; - type WorkflowContext = { outputs: Record; }; @@ -35,12 +15,6 @@ type StepConfig = { run: () => Promise; }; -type WorkflowConfig = { - debug?: boolean; - name: string; - langbase: Langbase; -}; - class TimeoutError extends Error { constructor(stepId: string, timeout: number) { super(`Step "${stepId}" timed out after ${timeout}ms`); @@ -51,183 +25,15 @@ class TimeoutError extends Error { export class Workflow { private context: WorkflowContext; private debug: boolean; - private name: string; - private traceManager: TraceManager; - private traceId: string; - private langbase: Langbase; - - private originalMethods: Map = new Map(); public readonly step: (config: StepConfig) => Promise; - constructor(config: WorkflowConfig) { + constructor(config: {debug?: boolean} = {debug: false}) { this.context = {outputs: {}}; this.debug = config.debug ?? false; - this.name = config.name; - this.langbase = config.langbase; - this.traceManager = new TraceManager(); - this.traceId = this.traceManager.createTrace('workflow', { - name: this.name, - }); this.step = this._step.bind(this); - - // Set global debug flag - _global._workflowDebugEnabled = this.debug; - } - - /** - * Replace a method in the Langbase instance with a traced version - */ - private interceptMethod(obj: any, method: string, path: string = ''): void { - if (!obj || typeof obj[method] !== 'function') return; - - const fullPath = path ? `${path}.${method}` : method; - const originalMethod = obj[method]; - - // Only replace if not already intercepted - if (!this.originalMethods.has(fullPath)) { - this.originalMethods.set(fullPath, originalMethod); - - const debug = this.debug; - - // Replace with intercepted version - obj[method] = async function (...args: any[]) { - // Add custom arguments for tracing - // Add rawResponse to the options if it's an object - const lastArg = args[args.length - 1]; - const newArgs = [...args]; - - if (lastArg && typeof lastArg === 'object') { - newArgs[newArgs.length - 1] = { - ...lastArg, - rawResponse: true, - }; - } - // Append a new object if the last argument is not an object - else { - newArgs.push({rawResponse: true}); - } - - const result = await originalMethod.apply(this, newArgs); - console.log(`šŸ”„ Intercepted method: ${fullPath}`, result); - - // Process result for tracing if we have an active collector - if (_global._activeTraceCollector) { - // Extract or create traceId - let traceId: string | undefined; - - // Check if result is an object with response headers - if (result && typeof result === 'object') { - // Extract from response headers - if ('rawResponse' in result && result.rawResponse) { - // Check for lb-trace-id in headers - if (result.rawResponse.headers['lb-trace-id']) { - // Plain object headers - traceId = - result.rawResponse.headers['lb-trace-id']; - } - } - - // Notify collector if traceId was found - if (traceId && _global._activeTraceCollector) { - if (debug) - console.log( - `šŸ” Trace ID extracted: ${traceId}`, - ); - _global._activeTraceCollector(traceId); - } - } - } - - return result; - }; - } - } - - /** - * Restore all original methods that were intercepted - */ - private restoreOriginalMethods(): void { - this.originalMethods.forEach((originalMethod, path) => { - // Parse the path to find the object and method - const parts = path.split('.'); - const methodName = parts.pop()!; - let obj: any = this.langbase; - - // Navigate to the correct object - for (const part of parts) { - if (obj && typeof obj === 'object' && part in obj) { - obj = obj[part as keyof typeof obj]; // Type safe access - } else { - return; // Skip if path no longer exists - } - } - - // Restore original method - if ( - obj && - methodName in obj && - typeof obj[methodName] === 'function' - ) { - obj[methodName] = originalMethod; - } - }); - - // Clear the map - this.originalMethods.clear(); - } - - /** - * Intercept all important methods in the Langbase instance - */ - private setupMethodInterceptors(): void { - // Agent methods - this.interceptMethod(this.langbase.agent, 'run', 'agent'); - - // Pipes methods - this.interceptMethod(this.langbase.pipes, 'run', 'pipes'); - this.interceptMethod(this.langbase.pipe, 'run', 'pipe'); - - // Memory methods - if (this.langbase.memories) { - this.interceptMethod( - this.langbase.memories, - 'retrieve', - 'memories', - ); - } - if (this.langbase.memory) { - this.interceptMethod(this.langbase.memory, 'retrieve', 'memory'); - } - - // Tool methods - if (this.langbase.tools) { - this.interceptMethod(this.langbase.tools, 'webSearch', 'tools'); - this.interceptMethod(this.langbase.tools, 'crawl', 'tools'); - } - if (this.langbase.tool) { - this.interceptMethod(this.langbase.tool, 'webSearch', 'tool'); - this.interceptMethod(this.langbase.tool, 'crawl', 'tool'); - } - - // Top-level methods - this.interceptMethod(this.langbase, 'embed'); - this.interceptMethod(this.langbase, 'chunk'); - this.interceptMethod(this.langbase, 'parse'); } private async _step(config: StepConfig): Promise { - const stepStartTime = Date.now(); - // Initialize an array to collect trace IDs - const stepTraces: string[] = []; - - // Function to collect trace IDs - const collectTrace = (traceId: string) => { - if (this.debug) { - console.log(`šŸ“‹ Collected trace ID: ${traceId}`); - } - stepTraces.push(traceId); - }; - if (this.debug) { console.log(`\nšŸ”„ Starting step: ${config.id}`); console.time(`ā±ļø Step ${config.id}`); @@ -242,103 +48,60 @@ export class Workflow { ? config.retries.limit + 1 : 1; - // Set up method interceptors before running the step - this.setupMethodInterceptors(); - - // Set the global active trace collector - const previousTraceCollector = _global._activeTraceCollector; - _global._activeTraceCollector = collectTrace; + while (attempt <= maxAttempts) { + try { + let stepPromise = config.run(); - try { - // Execute the step function directly - let stepPromise: Promise = config.run(); - - // Apply timeout if configured - if (config.timeout) { - stepPromise = this.withTimeout({ - promise: stepPromise, - timeout: config.timeout, - stepId: config.id, - }); - } - - // Wait for the step to complete - const result = await stepPromise; - - // Store step result in context - this.context.outputs[config.id] = result; - - if (this.debug) { - console.timeEnd(`ā±ļø Step ${config.id}`); - console.log(`šŸ“¤ Output:`, result); - - if (stepTraces.length > 0) { - console.log( - `šŸ“‹ Trace IDs (${stepTraces.length}):`, - stepTraces, - ); - } else { - console.log(`šŸ” No trace IDs captured for this step`); + if (config.timeout) { + stepPromise = this.withTimeout({ + promise: stepPromise, + timeout: config.timeout, + stepId: config.id, + }); } - } - - // Create step trace - const stepEndTime = Date.now(); - const stepTrace: StepTrace = { - name: config.id, - output: result, - traces: stepTraces.length > 0 ? stepTraces : null, - duration: stepEndTime - stepStartTime, - startTime: stepStartTime, - endTime: stepEndTime, - }; - - // Add step to trace manager - this.traceManager.addStep(this.traceId, stepTrace); - // Restore original methods and trace collector - this.restoreOriginalMethods(); - _global._activeTraceCollector = previousTraceCollector; - - return result; - } catch (error) { - // Restore original methods and trace collector on error - this.restoreOriginalMethods(); - _global._activeTraceCollector = previousTraceCollector; - - // Store error for potential retry or final throw - lastError = error as Error; - - // If retries are configured, try again - if (attempt < maxAttempts) { - const delay = config.retries - ? this.calculateDelay( - config.retries.delay, - attempt, - config.retries.backoff, - ) - : 0; + const result = await stepPromise; + this.context.outputs[config.id] = result; if (this.debug) { - console.log( - `āš ļø Attempt ${attempt} failed, retrying in ${delay}ms...`, - ); - console.error(error); + console.timeEnd(`ā±ļø Step ${config.id}`); + console.log(`šŸ“¤ Output:`, result); + console.log(`āœ… Completed step: ${config.id}\n`); } - await this.sleep(delay); - attempt++; + return result; + } catch (error) { + lastError = error as Error; + + if (attempt < maxAttempts) { + const delay = config.retries + ? this.calculateDelay( + config.retries.delay, + attempt, + config.retries.backoff, + ) + : 0; + + if (this.debug) { + console.log( + `āš ļø Attempt ${attempt} failed, retrying in ${delay}ms...`, + ); + console.error(error); + } - // Try again with the next attempt - return this._step(config); - } else { - if (this.debug) { - console.timeEnd(`ā±ļø Step ${config.id}`); - console.error(`āŒ Failed step: ${config.id}`, error); + await this.sleep(delay); + attempt++; + } else { + if (this.debug) { + console.timeEnd(`ā±ļø Step ${config.id}`); + console.error(`āŒ Failed step: ${config.id}`, error); + } + throw lastError; } - throw lastError; } } + + throw lastError; } private async withTimeout({ @@ -378,32 +141,4 @@ export class Workflow { private async sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } - - public async end(): Promise { - // Finalise and grab the trace - this.traceManager.endTrace(this.traceId); - this.traceManager.printTrace(this.traceId); - const traceData = this.traceManager.getTrace(this.traceId); - - // --- send to LB API v1/traces/create using SDK method --- - try { - const res = await this.langbase.traces.create(traceData); - - if (!res || res.error) { - console.error( - `āŒ Trace upload failed: ${res?.status || ''} ${res?.statusText || ''}`, - ); - } else if (this.debug) { - console.log(`āœ… Trace ${this.traceId} sent to collector`); - } - } catch (err) { - console.error('āŒ Error while sending trace', err); - } - // ------------------------------------------------------------------------- - - if (this.debug) { - console.log('\nšŸ” DEBUG: Final trace data:'); - console.log(JSON.stringify(traceData, null, 2)); - } - } }