From 6a8a843d30a2e533178e229556ed113743241543 Mon Sep 17 00:00:00 2001 From: Josh Thomas <> Date: Thu, 13 Nov 2025 11:37:53 +0900 Subject: [PATCH 1/3] feat(binding-websockets): implement WebSocket client with W3C WoT protocol support Implements complete WebSocket client binding with support for: - OAuth2 Bearer token authentication via custom headers - WebSocket subprotocol negotiation from Thing Description forms - Configurable protocol mode: W3C Web Thing Protocol or generic WebSocket - All CRUD operations: readResource, writeResource, invokeResource - Full subscription support: subscribeResource and unlinkResource - Proper field naming per W3C spec: input/output for actions, value for properties - Connection management with one persistent connection per Thing - Request/response correlation using message IDs - Timeout handling and error propagation - Multiple subscription handlers per resource This resolves the incomplete stub implementation in ws-client.ts and enables real-world usage of WebSocket bindings with authenticated Things using either W3C Web Thing Protocol or generic WebSocket communication. Signed-off-by: Josh Thomas <> Signed-off-by: Josh Thomas --- packages/binding-websockets/src/ws-client.ts | 684 ++++++++++++++++++- 1 file changed, 659 insertions(+), 25 deletions(-) diff --git a/packages/binding-websockets/src/ws-client.ts b/packages/binding-websockets/src/ws-client.ts index 1d0a0dede..84254c9d9 100644 --- a/packages/binding-websockets/src/ws-client.ts +++ b/packages/binding-websockets/src/ws-client.ts @@ -19,49 +19,268 @@ import { ProtocolClient, Content, Form, SecurityScheme, createLoggers } from "@node-wot/core"; import { Subscription } from "rxjs/Subscription"; +import WebSocket from "ws"; +import { Readable } from "stream"; -const { debug, warn } = createLoggers("binding-websockets", "ws-client"); +const { debug, info, warn, error } = createLoggers("binding-websockets", "ws-client"); + +/** + * Protocol mode for WebSocket communication + */ +type ProtocolMode = "wot" | "generic"; + +/** + * Handler for pending request responses + */ +interface ResponseHandler { + resolve: (value: unknown) => void; + reject: (reason: Error) => void; + timeoutId: NodeJS.Timeout; +} + +/** + * Handlers for active subscriptions + */ +interface SubscriptionHandlers { + next: (content: Content) => void; + error?: (error: Error) => void; + complete?: () => void; +} + +/** + * Stored security credentials + */ +interface StoredCredentials { + scheme: string; + token?: string; + username?: string; + password?: string; + [key: string]: unknown; +} export default class WebSocketClient implements ProtocolClient { + // Connection management + private connections: Map = new Map(); + private pendingRequests: Map = new Map(); + private subscriptions: Map> = new Map(); + private credentials: Map = new Map(); + private protocolMode: Map = new Map(); + private isStarted = false; + + // Configuration + private readonly defaultTimeout = 5000; + constructor() { - // TODO: implement and remove eslint-ignore-useless-constructor + debug("WebSocketClient created"); } public toString(): string { return `[WebSocketClient]`; } - public readResource(form: Form): Promise { - return new Promise((resolve, reject) => { - // TODO: implement - }); + public async readResource(form: Form): Promise { + debug(`readResource: ${form.href}`); + + const ws = await this.getOrCreateConnection(form); + const baseUrl = this.extractBaseUrl(form.href); + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + let response: unknown; + + if (mode === "wot") { + // Use W3C Web Thing Protocol + const thingId = this.extractThingId(form.href); + const resourceName = this.extractResourceName(form.href); + const request = this.buildWoTRequest("readproperty", thingId, resourceName); + response = await this.sendRequest(ws, request); + + // Extract value from W3C response + const value = (response as Record).value !== undefined ? (response as Record).value : response; + return new Content( + form.contentType ?? "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(value))) + ); + } else { + // Generic WebSocket: send simple request + const request = { + id: this.generateMessageId(), + action: "read", + href: form.href, + }; + response = await this.sendRequest(ws, request); + return new Content( + form.contentType ?? "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(response))) + ); + } } - public writeResource(form: Form, content: Content): Promise { - return new Promise((resolve, reject) => { - // TODO: implement - }); + public async writeResource(form: Form, content: Content): Promise { + debug(`writeResource: ${form.href}`); + + const ws = await this.getOrCreateConnection(form); + const baseUrl = this.extractBaseUrl(form.href); + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + // Parse content body + const buffer = await content.toBuffer(); + const data = JSON.parse(buffer.toString()); + + if (mode === "wot") { + // Use W3C Web Thing Protocol + const thingId = this.extractThingId(form.href); + const resourceName = this.extractResourceName(form.href); + const request = this.buildWoTRequest("writeproperty", thingId, resourceName, data); + await this.sendRequest(ws, request); + } else { + // Generic WebSocket + const request = { + id: this.generateMessageId(), + action: "write", + href: form.href, + value: data, + }; + await this.sendRequest(ws, request); + } } - public invokeResource(form: Form, content?: Content): Promise { - return new Promise((resolve, reject) => { - // TODO: implement - }); + public async invokeResource(form: Form, content?: Content): Promise { + debug(`invokeResource: ${form.href}`); + + const ws = await this.getOrCreateConnection(form); + const baseUrl = this.extractBaseUrl(form.href); + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + // Parse input parameters if provided + let inputData: unknown; + if (content != null) { + const buffer = await content.toBuffer(); + inputData = JSON.parse(buffer.toString()); + } + + let response: unknown; + + if (mode === "wot") { + // Use W3C Web Thing Protocol + const thingId = this.extractThingId(form.href); + const resourceName = this.extractResourceName(form.href); + const request = this.buildWoTRequest("invokeaction", thingId, resourceName, inputData); + response = await this.sendRequest(ws, request); + + // Extract output from W3C response + const output = (response as Record).output !== undefined ? (response as Record).output : response; + return new Content( + form.response?.contentType ?? form.contentType ?? "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(output))) + ); + } else { + // Generic WebSocket + const request = { + id: this.generateMessageId(), + action: "invoke", + href: form.href, + input: inputData, + }; + response = await this.sendRequest(ws, request); + return new Content( + form.response?.contentType ?? form.contentType ?? "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(response))) + ); + } } - public unlinkResource(form: Form): Promise { - return new Promise((resolve, reject) => { - // TODO: implement - }); + public async unlinkResource(form: Form): Promise { + debug(`unlinkResource: ${form.href}`); + + const baseUrl = this.extractBaseUrl(form.href); + const resourceKey = `${baseUrl}:${this.extractResourceName(form.href)}`; + + // Remove subscription handlers + this.subscriptions.delete(resourceKey); + + const ws = await this.getOrCreateConnection(form); + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + if (mode === "wot") { + // Send unsubscribe request + const thingId = this.extractThingId(form.href); + const resourceName = this.extractResourceName(form.href); + const request = { + messageType: "request", + thingID: thingId, + messageID: this.generateMessageId(), + operation: "unsubscribe", + name: resourceName, + }; + + try { + await this.sendRequest(ws, request); + } catch (err) { + // Ignore errors during unsubscribe + debug(`Unsubscribe error (ignored): ${err}`); + } + } } - public subscribeResource( + public async subscribeResource( form: Form, next: (content: Content) => void, error?: (error: Error) => void, complete?: () => void ): Promise { - throw new Error("Websocket client does not implement subscribeResource"); + debug(`subscribeResource: ${form.href}`); + + const ws = await this.getOrCreateConnection(form); + const baseUrl = this.extractBaseUrl(form.href); + const resourceName = this.extractResourceName(form.href); + const resourceKey = `${baseUrl}:${resourceName}`; + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + // Store subscription handlers + if (!this.subscriptions.has(resourceKey)) { + this.subscriptions.set(resourceKey, new Set()); + } + const handlers: SubscriptionHandlers = { next, error, complete }; + this.subscriptions.get(resourceKey)!.add(handlers); + + // Determine if this is an event or property subscription + const isEvent = form.op?.includes("subscribeevent") ?? form.op === "subscribeevent"; + + if (mode === "wot") { + // Send W3C Web Thing Protocol subscribe request + const thingId = this.extractThingId(form.href); + const operation = isEvent ? "subscribeevent" : "subscribeproperty"; + const request = { + messageType: "request", + thingID: thingId, + messageID: this.generateMessageId(), + operation: operation, + name: resourceName, + }; + + try { + await this.sendRequest(ws, request); + } catch (err) { + // Remove handler if subscription failed + this.subscriptions.get(resourceKey)?.delete(handlers); + throw err; + } + } + + // Return RxJS Subscription with unsubscribe function + return new Subscription(() => { + const subs = this.subscriptions.get(resourceKey); + if (subs != null) { + subs.delete(handlers); + if (subs.size === 0) { + this.subscriptions.delete(resourceKey); + // Attempt to unlink + this.unlinkResource(form).catch((err) => { + debug(`Error unlinking during unsubscribe: ${err}`); + }); + } + } + }); } /** @@ -72,11 +291,47 @@ export default class WebSocketClient implements ProtocolClient { } public async start(): Promise { - // do nothing + if (this.isStarted) { + warn("WebSocketClient already started"); + return; + } + info("WebSocketClient starting"); + this.isStarted = true; } public async stop(): Promise { - // do nothing + if (!this.isStarted) { + warn("WebSocketClient not started"); + return; + } + + info("WebSocketClient stopping"); + + // Close all connections + for (const [url, ws] of this.connections.entries()) { + debug(`Closing WebSocket connection to ${url}`); + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(); + } + } + this.connections.clear(); + + // Reject all pending requests + for (const [, handler] of this.pendingRequests.entries()) { + clearTimeout(handler.timeoutId); + handler.reject(new Error("WebSocketClient stopped")); + } + this.pendingRequests.clear(); + + // Clear subscriptions + this.subscriptions.clear(); + + // Clear credentials and protocol modes + this.credentials.clear(); + this.protocolMode.clear(); + + this.isStarted = false; + info("WebSocketClient stopped"); } public setSecurity(metadata: Array, credentials?: unknown): boolean { @@ -84,10 +339,389 @@ export default class WebSocketClient implements ProtocolClient { warn("WebSocketClient received empty security metadata"); return false; } - // TODO support for multiple security schemes (see http-client.ts) - const security: SecurityScheme = metadata[0]; - debug(`WebSocketClient using security scheme '${security.scheme}'`); + // Support multiple security schemes by storing all + for (const security of metadata) { + debug(`WebSocketClient processing security scheme '${security.scheme}'`); + + const stored: StoredCredentials = { + scheme: security.scheme, + }; + + // Extract credentials based on scheme type + if (security.scheme === "bearer" && credentials != null) { + const creds = credentials as Record; + stored.token = (creds.token ?? creds.bearer ?? creds) as string; + debug("Stored Bearer token for authentication"); + } else if (security.scheme === "oauth2" && credentials != null) { + const creds = credentials as Record; + stored.token = (creds.token ?? creds.access_token) as string; + debug("Stored OAuth2 token for authentication"); + } else if (security.scheme === "basic" && credentials != null) { + const creds = credentials as Record; + stored.username = (creds.username ?? creds.user) as string; + stored.password = (creds.password ?? creds.pass) as string; + debug("Stored Basic auth credentials"); + } + + // Store by security scheme name or use default key + const key = (security as Record).name ?? "default"; + this.credentials.set(key as string, stored); + } + return true; } + + /** + * Generate unique message ID for request/response correlation + */ + private generateMessageId(): string { + return `${Date.now()}-${Math.random().toString(36).substring(2, 11)}`; + } + + /** + * Convert Buffer to Readable stream + */ + private bufferToStream(buffer: Buffer): Readable { + const stream = new Readable(); + stream.push(buffer); + stream.push(null); + return stream; + } + + /** + * Extract base URL (ws://host:port) from form href + */ + private extractBaseUrl(href: string): string { + const url = new URL(href); + return `${url.protocol}//${url.host}`; + } + + /** + * Extract resource name from form href path + * Example: ws://host/thing/properties/temp -> temp + */ + private extractResourceName(href: string): string { + const url = new URL(href); + const parts = url.pathname.split("/").filter((p) => p.length > 0); + return parts.length > 0 ? parts[parts.length - 1] : ""; + } + + /** + * Extract Thing ID from form href + * For W3C Web Thing Protocol compatibility + */ + private extractThingId(href: string): string { + const url = new URL(href); + // Extract thing ID from path (typically after /things/) + const parts = url.pathname.split("/").filter((p) => p.length > 0); + const thingIndex = parts.findIndex((p) => p === "things" || p === "thing"); + if (thingIndex >= 0 && thingIndex < parts.length - 1) { + return parts[thingIndex + 1]; + } + // Fallback: use host as thing ID + return url.host; + } + + /** + * Extract WebSocket subprotocol from form if specified + */ + private extractSubprotocol(form: Form): string | undefined { + // Check various possible locations for subprotocol + if (form.subprotocol != null) { + return form.subprotocol; + } + if (form["wss:subprotocol"] != null) { + return form["wss:subprotocol"] as string; + } + if (form["ws:subprotocol"] != null) { + return form["ws:subprotocol"] as string; + } + return undefined; + } + + /** + * Detect protocol mode from form hints + * Returns 'wot' for W3C Web Thing Protocol, 'generic' otherwise + */ + private detectProtocolMode(form: Form): ProtocolMode { + // Check for explicit protocol hint + if (form["wot:protocol"] === "webthing") { + return "wot"; + } + + // Check if subprotocol indicates Web Thing Protocol + const subprotocol = this.extractSubprotocol(form); + if (subprotocol === "webthingprotocol" || subprotocol === "webthing") { + return "wot"; + } + + // Check for other hints in form + if (form["@type"] === "WebThing" || form.type === "WebThing") { + return "wot"; + } + + // Default to generic WebSocket protocol + return "generic"; + } + + /** + * Build W3C Web Thing Protocol request message + */ + private buildWoTRequest( + operation: string, + thingId: string, + resourceName: string, + data?: unknown + ): Record { + const request: Record = { + messageType: "request", + thingID: thingId, + messageID: this.generateMessageId(), + operation: operation, + name: resourceName, + }; + + // Add data based on operation type + if (data !== undefined) { + if (operation === "writeproperty") { + request.value = data; + } else if (operation === "invokeaction") { + request.input = data; + } else { + request.data = data; + } + } + + return request; + } + + /** + * Get or create WebSocket connection for the given form + */ + private async getOrCreateConnection(form: Form): Promise { + const baseUrl = this.extractBaseUrl(form.href); + + // Check if connection already exists and is open + const existing = this.connections.get(baseUrl); + if (existing != null && existing.readyState === WebSocket.OPEN) { + return existing; + } + + // Detect protocol mode + const mode = this.detectProtocolMode(form); + this.protocolMode.set(baseUrl, mode); + debug(`Using protocol mode '${mode}' for ${baseUrl}`); + + // Create new WebSocket connection + return new Promise((resolve, reject) => { + const wsOptions: WebSocket.ClientOptions = {}; + + // Add authentication headers if credentials available + const creds = this.credentials.get("default"); + if (creds != null && creds.token != null) { + wsOptions.headers = { + Authorization: `Bearer ${creds.token}`, + }; + debug(`Adding Bearer token to WebSocket connection`); + } else if (creds != null && creds.username != null && creds.password != null) { + const auth = Buffer.from(`${creds.username}:${creds.password}`).toString("base64"); + wsOptions.headers = { + Authorization: `Basic ${auth}`, + }; + debug(`Adding Basic auth to WebSocket connection`); + } + + // Extract subprotocol if specified + const subprotocol = this.extractSubprotocol(form); + const protocols = subprotocol ? [subprotocol] : undefined; + + debug(`Creating WebSocket connection to ${baseUrl}${protocols ? ` with subprotocol ${subprotocol}` : ""}`); + + const ws = new WebSocket(baseUrl, protocols, wsOptions); + + ws.on("open", () => { + info(`WebSocket connection established to ${baseUrl}`); + this.connections.set(baseUrl, ws); + resolve(ws); + }); + + ws.on("message", (data: WebSocket.Data) => { + this.handleWebSocketMessage(baseUrl, data); + }); + + ws.on("error", (err: Error) => { + error(`WebSocket error for ${baseUrl}: ${err.message}`); + // Reject pending requests + for (const [messageId, handler] of this.pendingRequests.entries()) { + handler.reject(err); + clearTimeout(handler.timeoutId); + this.pendingRequests.delete(messageId); + } + // Notify subscriptions + const subs = this.subscriptions.get(baseUrl); + if (subs != null) { + subs.forEach((handlers) => { + if (handlers.error != null) { + handlers.error(err); + } + }); + } + reject(err); + }); + + ws.on("close", (code: number, reason: string) => { + info(`WebSocket connection closed for ${baseUrl}: ${code} ${reason}`); + this.connections.delete(baseUrl); + // Complete subscriptions + const subs = this.subscriptions.get(baseUrl); + if (subs != null) { + subs.forEach((handlers) => { + if (handlers.complete != null) { + handlers.complete(); + } + }); + this.subscriptions.delete(baseUrl); + } + }); + + // Connection timeout + setTimeout(() => { + if (ws.readyState === WebSocket.CONNECTING) { + ws.close(); + reject(new Error(`WebSocket connection timeout for ${baseUrl}`)); + } + }, this.defaultTimeout); + }); + } + + /** + * Handle incoming WebSocket message + */ + private handleWebSocketMessage(baseUrl: string, data: WebSocket.Data): void { + try { + const message = JSON.parse(data.toString()); + const mode = this.protocolMode.get(baseUrl) ?? "generic"; + + if (mode === "wot") { + this.handleWoTMessage(baseUrl, message); + } else { + this.handleGenericMessage(baseUrl, message); + } + } catch (err) { + error(`Failed to parse WebSocket message: ${err}`); + } + } + + /** + * Handle W3C Web Thing Protocol message + */ + private handleWoTMessage(baseUrl: string, message: Record): void { + const messageType = message.messageType; + + if (messageType === "response" && message.correlationID != null) { + // Handle request/response correlation + const handler = this.pendingRequests.get(message.correlationID as string); + if (handler != null) { + clearTimeout(handler.timeoutId); + this.pendingRequests.delete(message.correlationID as string); + + if (message.error != null) { + const errorMsg = (message.error as Record).message; + handler.reject(new Error((errorMsg ?? "Request failed") as string)); + } else { + handler.resolve(message); + } + } + } else if (messageType === "event" || messageType === "propertyUpdate") { + // Handle subscription notifications + const resourceKey = `${baseUrl}:${message.name as string}`; + const subs = this.subscriptions.get(resourceKey); + if (subs != null) { + const value = message.data !== undefined ? message.data : message.value; + const content = new Content( + "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(value))) + ); + subs.forEach((handlers) => { + try { + handlers.next(content); + } catch (err) { + error(`Error in subscription handler: ${err}`); + } + }); + } + } + } + + /** + * Handle generic WebSocket message (non-WoT protocol) + */ + private handleGenericMessage(baseUrl: string, message: Record): void { + // For generic protocol, try to correlate by any ID field + const possibleIds = [message.id, message.messageId, message.requestId]; + + for (const id of possibleIds) { + if (id != null) { + const handler = this.pendingRequests.get(id as string); + if (handler != null) { + clearTimeout(handler.timeoutId); + this.pendingRequests.delete(id as string); + handler.resolve(message); + return; + } + } + } + + // If no correlation found, might be a subscription update + // Notify all subscriptions for this base URL + const subs = this.subscriptions.get(baseUrl); + if (subs != null) { + const content = new Content( + "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(message))) + ); + subs.forEach((handlers) => { + try { + handlers.next(content); + } catch (err) { + error(`Error in subscription handler: ${err}`); + } + }); + } + } + + /** + * Send request and wait for response with timeout + */ + private async sendRequest( + ws: WebSocket, + request: Record, + timeout: number = this.defaultTimeout + ): Promise { + return new Promise((resolve, reject) => { + const messageId = (request.messageID ?? request.id) as string; + + const timeoutId = setTimeout(() => { + this.pendingRequests.delete(messageId); + reject(new Error(`Request timeout after ${timeout}ms`)); + }, timeout); + + this.pendingRequests.set(messageId, { + resolve, + reject, + timeoutId, + }); + + const payload = JSON.stringify(request); + debug(`Sending WebSocket request: ${payload}`); + ws.send(payload, (err) => { + if (err != null) { + clearTimeout(timeoutId); + this.pendingRequests.delete(messageId); + reject(err); + } + }); + }); + } } From 2fa13ef8053a4c70649888eae4d006ed45b3f979 Mon Sep 17 00:00:00 2001 From: Josh Thomas Date: Wed, 4 Feb 2026 08:10:31 -0800 Subject: [PATCH 2/3] Feat: Adding websocket implementation" Signed-off-by: Josh Thomas --- packages/binding-websockets/src/ws-client.ts | 144 +++++++----- packages/binding-websockets/test/ws-tests.ts | 227 ++++++++++++++++++- 2 files changed, 319 insertions(+), 52 deletions(-) diff --git a/packages/binding-websockets/src/ws-client.ts b/packages/binding-websockets/src/ws-client.ts index 84254c9d9..9954f91ff 100644 --- a/packages/binding-websockets/src/ws-client.ts +++ b/packages/binding-websockets/src/ws-client.ts @@ -65,6 +65,7 @@ export default class WebSocketClient implements ProtocolClient { private subscriptions: Map> = new Map(); private credentials: Map = new Map(); private protocolMode: Map = new Map(); + private subscriptionTypes: Map = new Map(); private isStarted = false; // Configuration @@ -83,7 +84,7 @@ export default class WebSocketClient implements ProtocolClient { const ws = await this.getOrCreateConnection(form); const baseUrl = this.extractBaseUrl(form.href); - const mode = this.protocolMode.get(baseUrl) ?? "generic"; + const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic"; let response: unknown; @@ -120,7 +121,7 @@ export default class WebSocketClient implements ProtocolClient { const ws = await this.getOrCreateConnection(form); const baseUrl = this.extractBaseUrl(form.href); - const mode = this.protocolMode.get(baseUrl) ?? "generic"; + const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic"; // Parse content body const buffer = await content.toBuffer(); @@ -149,7 +150,7 @@ export default class WebSocketClient implements ProtocolClient { const ws = await this.getOrCreateConnection(form); const baseUrl = this.extractBaseUrl(form.href); - const mode = this.protocolMode.get(baseUrl) ?? "generic"; + const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic"; // Parse input parameters if provided let inputData: unknown; @@ -195,21 +196,26 @@ export default class WebSocketClient implements ProtocolClient { const baseUrl = this.extractBaseUrl(form.href); const resourceKey = `${baseUrl}:${this.extractResourceName(form.href)}`; + // Get subscription type to send correct unsubscribe verb + const subscriptionType = this.subscriptionTypes.get(resourceKey); + // Remove subscription handlers this.subscriptions.delete(resourceKey); + this.subscriptionTypes.delete(resourceKey); const ws = await this.getOrCreateConnection(form); - const mode = this.protocolMode.get(baseUrl) ?? "generic"; + const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic"; - if (mode === "wot") { - // Send unsubscribe request + if (mode === "wot" && subscriptionType != null) { + // Send unsubscribe request with correct WoT verb const thingId = this.extractThingId(form.href); const resourceName = this.extractResourceName(form.href); + const operation = subscriptionType === "event" ? "unsubscribeevent" : "unsubscribeproperty"; const request = { messageType: "request", thingID: thingId, messageID: this.generateMessageId(), - operation: "unsubscribe", + operation: operation, name: resourceName, }; @@ -245,6 +251,10 @@ export default class WebSocketClient implements ProtocolClient { // Determine if this is an event or property subscription const isEvent = form.op?.includes("subscribeevent") ?? form.op === "subscribeevent"; + const subscriptionType: "property" | "event" = isEvent ? "event" : "property"; + + // Store subscription type for later unsubscribe + this.subscriptionTypes.set(resourceKey, subscriptionType); if (mode === "wot") { // Send W3C Web Thing Protocol subscribe request @@ -263,6 +273,7 @@ export default class WebSocketClient implements ProtocolClient { } catch (err) { // Remove handler if subscription failed this.subscriptions.get(resourceKey)?.delete(handlers); + this.subscriptionTypes.delete(resourceKey); throw err; } } @@ -500,18 +511,20 @@ export default class WebSocketClient implements ProtocolClient { * Get or create WebSocket connection for the given form */ private async getOrCreateConnection(form: Form): Promise { + // Use full href as connection key to support multiple endpoints on same host + const connectionKey = form.href; const baseUrl = this.extractBaseUrl(form.href); // Check if connection already exists and is open - const existing = this.connections.get(baseUrl); + const existing = this.connections.get(connectionKey); if (existing != null && existing.readyState === WebSocket.OPEN) { return existing; } // Detect protocol mode const mode = this.detectProtocolMode(form); - this.protocolMode.set(baseUrl, mode); - debug(`Using protocol mode '${mode}' for ${baseUrl}`); + this.protocolMode.set(connectionKey, mode); + debug(`Using protocol mode '${mode}' for ${connectionKey}`); // Create new WebSocket connection return new Promise((resolve, reject) => { @@ -536,52 +549,55 @@ export default class WebSocketClient implements ProtocolClient { const subprotocol = this.extractSubprotocol(form); const protocols = subprotocol ? [subprotocol] : undefined; - debug(`Creating WebSocket connection to ${baseUrl}${protocols ? ` with subprotocol ${subprotocol}` : ""}`); + debug(`Creating WebSocket connection to ${form.href}${protocols ? ` with subprotocol ${subprotocol}` : ""}`); - const ws = new WebSocket(baseUrl, protocols, wsOptions); + // Connect to the full href, not just baseUrl + const ws = new WebSocket(form.href, protocols, wsOptions); ws.on("open", () => { - info(`WebSocket connection established to ${baseUrl}`); - this.connections.set(baseUrl, ws); + info(`WebSocket connection established to ${connectionKey}`); + this.connections.set(connectionKey, ws); resolve(ws); }); ws.on("message", (data: WebSocket.Data) => { - this.handleWebSocketMessage(baseUrl, data); + this.handleWebSocketMessage(connectionKey, data); }); ws.on("error", (err: Error) => { - error(`WebSocket error for ${baseUrl}: ${err.message}`); + error(`WebSocket error for ${connectionKey}: ${err.message}`); // Reject pending requests for (const [messageId, handler] of this.pendingRequests.entries()) { handler.reject(err); clearTimeout(handler.timeoutId); this.pendingRequests.delete(messageId); } - // Notify subscriptions - const subs = this.subscriptions.get(baseUrl); - if (subs != null) { - subs.forEach((handlers) => { - if (handlers.error != null) { - handlers.error(err); - } - }); + // Notify subscriptions based on baseUrl prefix + for (const [key, subs] of this.subscriptions.entries()) { + if (key.startsWith(baseUrl + ":")) { + subs.forEach((handlers) => { + if (handlers.error != null) { + handlers.error(err); + } + }); + } } reject(err); }); ws.on("close", (code: number, reason: string) => { - info(`WebSocket connection closed for ${baseUrl}: ${code} ${reason}`); - this.connections.delete(baseUrl); - // Complete subscriptions - const subs = this.subscriptions.get(baseUrl); - if (subs != null) { - subs.forEach((handlers) => { - if (handlers.complete != null) { - handlers.complete(); - } - }); - this.subscriptions.delete(baseUrl); + info(`WebSocket connection closed for ${connectionKey}: ${code} ${reason}`); + this.connections.delete(connectionKey); + // Complete subscriptions based on baseUrl prefix + for (const [key, subs] of this.subscriptions.entries()) { + if (key.startsWith(baseUrl + ":")) { + subs.forEach((handlers) => { + if (handlers.complete != null) { + handlers.complete(); + } + }); + this.subscriptions.delete(key); + } } }); @@ -589,7 +605,7 @@ export default class WebSocketClient implements ProtocolClient { setTimeout(() => { if (ws.readyState === WebSocket.CONNECTING) { ws.close(); - reject(new Error(`WebSocket connection timeout for ${baseUrl}`)); + reject(new Error(`WebSocket connection timeout for ${connectionKey}`)); } }, this.defaultTimeout); }); @@ -598,10 +614,12 @@ export default class WebSocketClient implements ProtocolClient { /** * Handle incoming WebSocket message */ - private handleWebSocketMessage(baseUrl: string, data: WebSocket.Data): void { + private handleWebSocketMessage(connectionKey: string, data: WebSocket.Data): void { try { const message = JSON.parse(data.toString()); - const mode = this.protocolMode.get(baseUrl) ?? "generic"; + const mode = this.protocolMode.get(connectionKey) ?? "generic"; + // Extract baseUrl from connectionKey for subscription lookup + const baseUrl = this.extractBaseUrl(connectionKey); if (mode === "wot") { this.handleWoTMessage(baseUrl, message); @@ -674,20 +692,44 @@ export default class WebSocketClient implements ProtocolClient { } // If no correlation found, might be a subscription update - // Notify all subscriptions for this base URL - const subs = this.subscriptions.get(baseUrl); - if (subs != null) { - const content = new Content( - "application/json", - this.bufferToStream(Buffer.from(JSON.stringify(message))) - ); - subs.forEach((handlers) => { - try { - handlers.next(content); - } catch (err) { - error(`Error in subscription handler: ${err}`); + // For generic websockets, try to extract resource name from message + // or dispatch to all subscriptions for this baseUrl + const resourceName = message.resource as string | undefined; + + if (resourceName) { + // If message has resource name, use specific key + const resourceKey = `${baseUrl}:${resourceName}`; + const subs = this.subscriptions.get(resourceKey); + if (subs != null) { + const content = new Content( + "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(message))) + ); + subs.forEach((handlers) => { + try { + handlers.next(content); + } catch (err) { + error(`Error in subscription handler: ${err}`); + } + }); + } + } else { + // Otherwise, broadcast to all subscriptions for this baseUrl + for (const [key, subs] of this.subscriptions.entries()) { + if (key.startsWith(baseUrl + ":")) { + const content = new Content( + "application/json", + this.bufferToStream(Buffer.from(JSON.stringify(message))) + ); + subs.forEach((handlers) => { + try { + handlers.next(content); + } catch (err) { + error(`Error in subscription handler: ${err}`); + } + }); } - }); + } } } diff --git a/packages/binding-websockets/test/ws-tests.ts b/packages/binding-websockets/test/ws-tests.ts index 5a132cbbd..00ef28563 100644 --- a/packages/binding-websockets/test/ws-tests.ts +++ b/packages/binding-websockets/test/ws-tests.ts @@ -17,10 +17,12 @@ * Protocol test suite to test protocol implementations */ -import Servient, { createLoggers } from "@node-wot/core"; +import Servient, { createLoggers, ExposedThing } from "@node-wot/core"; import { suite, test } from "@testdeck/mocha"; import { expect, should } from "chai"; import WebSocketServer from "../src/ws-server"; +import WebSocketClient from "../src/ws-client"; +import * as WebSocket from "ws"; const { info } = createLoggers("binding-websockets", "ws-tests"); @@ -41,4 +43,227 @@ class WebSocketsTest { await wsServer.stop(); expect(wsServer.getPort()).to.eq(-1); // from getPort() when not listening } + + @test async "should handle multiple endpoints on same host"() { + // Regression test for connection key issue + // Previously, connections were keyed by ws://host:port, preventing multiple endpoints + // Now they should be keyed by full href (ws://host:port/path) + + const testPort = 31081; + const mockServer = new WebSocket.Server({ port: testPort }); + const receivedConnections: string[] = []; + + mockServer.on("connection", (ws, req) => { + receivedConnections.push(req.url || "/"); + ws.on("message", (msg) => { + const data = JSON.parse(msg.toString()); + // Echo back with id + ws.send(JSON.stringify({ id: data.id, path: req.url, value: "test" })); + }); + }); + + const client = new WebSocketClient(); + await client.start(); + + try { + // Create forms for two different endpoints on same host + const form1 = { + href: `ws://localhost:${testPort}/endpoint1`, + contentType: "application/json", + }; + const form2 = { + href: `ws://localhost:${testPort}/endpoint2`, + contentType: "application/json", + }; + + // Both should succeed and connect to different paths + const result1 = await client.readResource(form1); + const result2 = await client.readResource(form2); + + expect(receivedConnections).to.include("/endpoint1"); + expect(receivedConnections).to.include("/endpoint2"); + } finally { + await client.stop(); + mockServer.close(); + } + } + + @test async "should align subscription bookkeeping between subscribe and message handling"() { + // Regression test for subscription key mismatch + // Previously, subscribeResource used "baseUrl:resourceName" but handleGenericMessage used just "baseUrl" + // This caused subscription updates to fail + + const testPort = 31082; + const mockServer = new WebSocket.Server({ port: testPort }); + + mockServer.on("connection", (ws) => { + // Simulate subscription update after a delay + setTimeout(() => { + ws.send(JSON.stringify({ resource: "temperature", value: 42 })); + }, 100); + }); + + const client = new WebSocketClient(); + await client.start(); + + try { + const form = { + href: `ws://localhost:${testPort}/properties/temperature`, + contentType: "application/json", + }; + + let updateReceived = false; + const subscription = await client.subscribeResource( + form, + (content) => { + updateReceived = true; + }, + undefined, + undefined + ); + + // Wait for subscription update + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(updateReceived).to.be.true; + + subscription.unsubscribe(); + } finally { + await client.stop(); + mockServer.close(); + } + } + + @test async "should use correct WoT unsubscribe verbs"() { + // Regression test for non-spec "operation: unsubscribe" + // Previously used generic "unsubscribe", now should use "unsubscribeproperty" or "unsubscribeevent" + + const testPort = 31083; + const mockServer = new WebSocket.Server({ port: testPort }); + let unsubscribeOperation: string | undefined; + + mockServer.on("connection", (ws) => { + ws.on("message", (msg) => { + const data = JSON.parse(msg.toString()); + + // Capture unsubscribe operation + if (data.operation?.startsWith("unsubscribe")) { + unsubscribeOperation = data.operation; + } + + // Send appropriate response + if (data.operation === "subscribeproperty") { + ws.send( + JSON.stringify({ + messageType: "response", + correlationID: data.messageID, + }) + ); + } else if (data.operation === "unsubscribeproperty") { + ws.send( + JSON.stringify({ + messageType: "response", + correlationID: data.messageID, + }) + ); + } + }); + }); + + const client = new WebSocketClient(); + await client.start(); + + try { + const form = { + href: `ws://localhost:${testPort}/things/test/properties/temp`, + contentType: "application/json", + "wot:protocol": "webthing", + op: "subscribeproperty", + }; + + const subscription = await client.subscribeResource( + form, + () => {}, + undefined, + undefined + ); + + // Unsubscribe should trigger the correct operation + await client.unlinkResource(form); + + // Give time for message to be sent + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(unsubscribeOperation).to.equal("unsubscribeproperty"); + } finally { + await client.stop(); + mockServer.close(); + } + } + + @test async "should use unsubscribeevent for event subscriptions"() { + // Verify that event subscriptions use the correct unsubscribe verb + + const testPort = 31084; + const mockServer = new WebSocket.Server({ port: testPort }); + let unsubscribeOperation: string | undefined; + + mockServer.on("connection", (ws) => { + ws.on("message", (msg) => { + const data = JSON.parse(msg.toString()); + + // Capture unsubscribe operation + if (data.operation?.startsWith("unsubscribe")) { + unsubscribeOperation = data.operation; + } + + // Send appropriate response + if (data.operation === "subscribeevent") { + ws.send( + JSON.stringify({ + messageType: "response", + correlationID: data.messageID, + }) + ); + } else if (data.operation === "unsubscribeevent") { + ws.send( + JSON.stringify({ + messageType: "response", + correlationID: data.messageID, + }) + ); + } + }); + }); + + const client = new WebSocketClient(); + await client.start(); + + try { + const form = { + href: `ws://localhost:${testPort}/things/test/events/changed`, + contentType: "application/json", + "wot:protocol": "webthing", + op: "subscribeevent", + }; + + const subscription = await client.subscribeResource( + form, + () => {}, + undefined, + undefined + ); + + // Unsubscribe should trigger the correct operation + await client.unlinkResource(form); + + // Give time for message to be sent + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(unsubscribeOperation).to.equal("unsubscribeevent"); + } finally { + await client.stop(); + mockServer.close(); + } + } } From ef0196d8cb950ebe6dff88a1c74473ae3e289f42 Mon Sep 17 00:00:00 2001 From: Josh Thomas Date: Wed, 4 Feb 2026 08:11:13 -0800 Subject: [PATCH 3/3] Chore: formatting Signed-off-by: Josh Thomas --- packages/binding-websockets/src/ws-client.ts | 14 ++++++++++--- packages/binding-websockets/test/ws-tests.ts | 22 ++++++-------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/packages/binding-websockets/src/ws-client.ts b/packages/binding-websockets/src/ws-client.ts index 9954f91ff..b5ad1bf84 100644 --- a/packages/binding-websockets/src/ws-client.ts +++ b/packages/binding-websockets/src/ws-client.ts @@ -96,7 +96,10 @@ export default class WebSocketClient implements ProtocolClient { response = await this.sendRequest(ws, request); // Extract value from W3C response - const value = (response as Record).value !== undefined ? (response as Record).value : response; + const value = + (response as Record).value !== undefined + ? (response as Record).value + : response; return new Content( form.contentType ?? "application/json", this.bufferToStream(Buffer.from(JSON.stringify(value))) @@ -169,7 +172,10 @@ export default class WebSocketClient implements ProtocolClient { response = await this.sendRequest(ws, request); // Extract output from W3C response - const output = (response as Record).output !== undefined ? (response as Record).output : response; + const output = + (response as Record).output !== undefined + ? (response as Record).output + : response; return new Content( form.response?.contentType ?? form.contentType ?? "application/json", this.bufferToStream(Buffer.from(JSON.stringify(output))) @@ -549,7 +555,9 @@ export default class WebSocketClient implements ProtocolClient { const subprotocol = this.extractSubprotocol(form); const protocols = subprotocol ? [subprotocol] : undefined; - debug(`Creating WebSocket connection to ${form.href}${protocols ? ` with subprotocol ${subprotocol}` : ""}`); + debug( + `Creating WebSocket connection to ${form.href}${protocols ? ` with subprotocol ${subprotocol}` : ""}` + ); // Connect to the full href, not just baseUrl const ws = new WebSocket(form.href, protocols, wsOptions); diff --git a/packages/binding-websockets/test/ws-tests.ts b/packages/binding-websockets/test/ws-tests.ts index 00ef28563..0c8e70ba3 100644 --- a/packages/binding-websockets/test/ws-tests.ts +++ b/packages/binding-websockets/test/ws-tests.ts @@ -17,7 +17,7 @@ * Protocol test suite to test protocol implementations */ -import Servient, { createLoggers, ExposedThing } from "@node-wot/core"; +import Servient, { createLoggers } from "@node-wot/core"; import { suite, test } from "@testdeck/mocha"; import { expect, should } from "chai"; import WebSocketServer from "../src/ws-server"; @@ -54,7 +54,7 @@ class WebSocketsTest { const receivedConnections: string[] = []; mockServer.on("connection", (ws, req) => { - receivedConnections.push(req.url || "/"); + receivedConnections.push(req.url ?? "/"); ws.on("message", (msg) => { const data = JSON.parse(msg.toString()); // Echo back with id @@ -77,8 +77,8 @@ class WebSocketsTest { }; // Both should succeed and connect to different paths - const result1 = await client.readResource(form1); - const result2 = await client.readResource(form2); + await client.readResource(form1); + await client.readResource(form2); expect(receivedConnections).to.include("/endpoint1"); expect(receivedConnections).to.include("/endpoint2"); @@ -181,12 +181,7 @@ class WebSocketsTest { op: "subscribeproperty", }; - const subscription = await client.subscribeResource( - form, - () => {}, - undefined, - undefined - ); + await client.subscribeResource(form, () => {}, undefined, undefined); // Unsubscribe should trigger the correct operation await client.unlinkResource(form); @@ -247,12 +242,7 @@ class WebSocketsTest { op: "subscribeevent", }; - const subscription = await client.subscribeResource( - form, - () => {}, - undefined, - undefined - ); + await client.subscribeResource(form, () => {}, undefined, undefined); // Unsubscribe should trigger the correct operation await client.unlinkResource(form);