A simple, easy, and lightweight Server-Sent Events (SSE) library for Node.js that simplifies managing SSE connections, broadcasting events, and maintaining reactive state. It works out of the box with any backend library that supports native IncomingMessage and ServerResponse objects for IO.
- Manage multiple SSE connections with centralized control.
- Broadcast events to channels (event names).
- Built-in support for connection and listener limits.
- Optional token-based authentication per connection.
- Heartbeat support to keep connections alive.
- Reactive state management with automatic broadcasting.
npm install evstream
The examples below use Express, but the library works with any backend library or framework that exposes Node's native IncomingMessage and ServerResponse objects.
import { Evstream } from 'evstream'
app.get('/', (req, res) => {
const stream = new Evstream(req, res, { heartbeat: 5000 })
stream.message({ event: 'connected', data: { userId: 'a-user-id' } })
setTimeout(() => {
stream.close()
}, 5000)
})
The client receives:
event:connected
data:{"userId":"a-user-id"}
event:heartbeat
data:
event:end
data:
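A client that reads the raw response body rather than using the browser's EventSource API could decode the frames above with a small parser. The following is a minimal sketch: `parseSseFrames` is a hypothetical helper, not part of evstream, and it assumes frames are separated by a blank line, as the SSE wire format specifies.

```javascript
// Hypothetical helper (not part of evstream): decode raw SSE text into events.
// Assumes frames are separated by a blank line, per the SSE wire format.
function parseSseFrames(text) {
  const events = []
  for (const frame of text.split(/\r?\n\r?\n/)) {
    const parsed = { event: 'message', data: '' }
    const dataLines = []
    for (const line of frame.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue // skip blanks and comments
      const idx = line.indexOf(':')
      const field = idx === -1 ? line : line.slice(0, idx)
      let value = idx === -1 ? '' : line.slice(idx + 1)
      if (value.startsWith(' ')) value = value.slice(1) // one optional leading space
      if (field === 'event') parsed.event = value
      else if (field === 'data') dataLines.push(value)
      else if (field === 'id') parsed.id = value
    }
    if (dataLines.length === 0) continue // frames without a data field are not dispatched
    parsed.data = dataLines.join('\n')
    events.push(parsed)
  }
  return events
}

const raw =
  'event:connected\ndata:{"userId":"a-user-id"}\n\n' +
  'event:heartbeat\ndata:\n\n' +
  'event:end\ndata:\n\n'

const frames = parseSseFrames(raw)
console.log(frames.map((f) => f.event)) // [ 'connected', 'heartbeat', 'end' ]
console.log(JSON.parse(frames[0].data).userId) // a-user-id
```

The heartbeat and end frames carry an empty data field, so they still produce events, each with an empty string as payload.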
app.get('/', async (req, res) => {
const stream = new Evstream(req, res, {
heartbeat: 5000,
authentication: {
method: 'query',
param: 'token',
verify: async (token) => false,
},
})
const isAuthenticated = await stream.authenticate()
if (!isAuthenticated) {
return
}
stream.message({ event: 'connected', data: { userId: 'a-user-id' } })
setTimeout(() => {
stream.close()
}, 5000)
})
To test this, request the URL /?token=<auth-token>.
The value of the query parameter is passed as the argument to the verify callback.
The callback can return either a boolean or an EvMessage.
evstream has built-in support for authenticating incoming requests. It currently offers query-based token verification, which is generally not recommended.
- Options:
  - method: Authentication method to use ("query").
  - param: Name of the query parameter that holds the authentication token.
  - verify: A callback function that checks the token. If it returns false, the request is closed.
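To illustrate how the param and verify options fit together, the sketch below reads a token from a request URL and hands it to a verify callback. It is not evstream's internal code: `checkQueryToken` and the `'valid_token'` value are hypothetical, and for simplicity it treats only a `true` result as success and ignores the EvMessage case.

```javascript
// Hypothetical helper (not evstream's internals): read a token from the query
// string of a request URL and pass it to a verify callback.
async function checkQueryToken(requestUrl, param, verify) {
  // req.url is usually a path such as "/?token=abc", so a placeholder base is needed.
  const url = new URL(requestUrl, 'http://localhost')
  const token = url.searchParams.get(param)
  if (token === null) return false // no token supplied
  const result = await verify(token)
  return result === true
}

// Example verify callback; the expected token is a placeholder value.
const verify = async (token) => token === 'valid_token'

async function main() {
  console.log(await checkQueryToken('/?token=valid_token', 'token', verify)) // true
  console.log(await checkQueryToken('/?token=wrong', 'token', verify)) // false
  console.log(await checkQueryToken('/?other=1', 'token', verify)) // false
}
main()
```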
By default, evstream does not authenticate requests. Call the authenticate() method of the Evstream class to run the verification. If it returns false, stop processing the request and return immediately.
const isAuthenticated = await stream.authenticate()
Using EvStreamManager, you can broadcast messages, create channels, and manage connections from one place.
const manager = new EvStreamManager()
app.get('/', (req, res) => {
const stream = manager.createStream(req, res)
const i = setInterval(() => {
stream.message({ data: { hello: 'hii' } })
}, 2000)
stream.message({ data: { why: 'hii' }, event: 'hello' })
setTimeout(() => {
clearInterval(i)
stream.close()
}, 10000)
})
Reactive states are values shared across multiple clients connected to the same server. Whenever a value is updated, every client listening to it is notified with an SSE message.
import { EvState, EvStreamManager } from 'evstream'
const manager = new EvStreamManager()
const userCount = new EvState({
  channel: 'user-count',
  initialValue: 0,
  manager: manager,
})
To create a reactive value, use the EvState class. It takes a channel, which connected clients listen to for updates.
- channel: A unique name that clients listen to for state changes.
- initialValue: Default value for the state.
- manager: Connection manager for the connected clients.
Getting the state value:
userCount.get()
Updating the state value:
userCount.set((prev) => prev + 1)
This updates the value and sends the new value to all clients listening for changes to the state.
import { EvState, EvStreamManager } from 'evstream'
const manager = new EvStreamManager()
const userCount = new EvState({
  channel: 'user-count',
  initialValue: 0,
  manager: manager,
})
app.get('/', (req, res) => {
  const stream = manager.createStream(req, res)
  stream.listen('user-count')
  userCount.set((user) => user + 1)
  const i = setInterval(() => {
    stream.message({ data: { hello: 'hii' } })
  }, 2000)
  stream.message({ data: { why: 'hii' }, event: 'hello' })
  setTimeout(() => {
    clearInterval(i)
    stream.close((channels) => {
      userCount.set((user) => user - 1)
      console.log(channels)
    })
  }, 10000)
})
The stream now listens for changes to userCount, and each update is pushed to every connected client listening for that state. Note that the channel of the state and the value passed to listen() must be the same.
When running multiple server instances, you can synchronize EvState across them using the built-in Redis adapter.
Install the peer dependency:
npm install ioredis
Use the adapter:
import { EvState, EvStreamManager } from 'evstream'
import { EvRedisAdapter } from 'evstream/adapter/redis'
const manager = new EvStreamManager()
const redisAdapter = new EvRedisAdapter('redis://localhost:6379')
const userCount = new EvState({
  channel: 'user-count',
  initialValue: 0,
  manager: manager,
  adapter: redisAdapter,
})
Updates to userCount will now be synchronized across all instances connected to the same Redis server.
To send data to a channel, use the send() method of the EvStreamManager class.
Example:
import { EvStreamManager } from 'evstream'
const manager = new EvStreamManager()
manager.send('<channel-name>', { event: 'custom-event', data: { foo: 'bar' } })
To listen for data from a channel, use the listen() method of the Evstream class.
client.listen('<channel-name>')
When running multiple server instances, you may want state creation and removal to stay in sync across all instances.
EvStateManager helps manage shared reactive states and keeps their lifecycle consistent using Pub/Sub.
import { EvStreamManager } from 'evstream'
import { EvRedisAdapter } from 'evstream/adapter/redis'
import { EvRedisPubSub } from 'evstream/adapter/pub-sub'
import { EvStateManager } from 'evstream/state-manager'
const streamManager = new EvStreamManager()
const adapter = new EvRedisAdapter('redis://localhost:6379')
const pubsub = new EvRedisPubSub({
subject: 'ev:states',
options: { host: 'localhost', port: 6379 },
})
const stateManager = new EvStateManager({
manager: streamManager,
adapter,
pubsub,
})
const userCount = stateManager.createState('user-count', 0)
Notes:
- States are identified by string-based keys
- State creation and removal are synchronized across instances
- State updates are still handled by EvState
Manages a Server-Sent Events (SSE) connection. Handles headers, heartbeat intervals, authentication, sending messages, and closing the stream.
new Evstream(req: IncomingMessage, res: ServerResponse, opts?: EvOptions)
- req: IncomingMessage - The incoming HTTP request.
- res: ServerResponse - The HTTP response to write SSE messages to.
- opts (optional): EvOptions - Configuration including heartbeat interval and authentication.
authenticate(): Performs optional token-based authentication if opts.authentication is provided.
- If authentication fails, sends an error message and closes the connection.
- Returns true if authenticated, false if rejected, or undefined if no authentication is configured.
message(): Sends an SSE message to the connected client.
- msg: EvMessage - Object containing event, data, and optionally id.
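As a rough illustration of what sending a message involves, the sketch below turns an object with the EvMessage shape into an SSE frame. `formatSseMessage` is a hypothetical helper based on the SSE wire format. It is not evstream's actual serializer, and the field order it uses is an assumption.

```javascript
// Hypothetical helper (not evstream's actual code): serialize a message object
// with the shape { event?, data, id? } into an SSE frame.
function formatSseMessage({ event, data, id }) {
  const payload = typeof data === 'string' ? data : JSON.stringify(data ?? '')
  let frame = ''
  if (id !== undefined) frame += `id:${id}\n`
  if (event !== undefined) frame += `event:${event}\n`
  // Multi-line payloads need one "data:" field per line.
  for (const line of payload.split('\n')) frame += `data:${line}\n`
  return frame + '\n' // a blank line terminates the frame
}

// Writes the "connected" frame from the usage example to stdout.
process.stdout.write(
  formatSseMessage({ event: 'connected', data: { userId: 'a-user-id' } })
)
```

Splitting the payload on newlines matters because a raw newline inside a single data field would otherwise end the field early.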
close(): Sends a final end event and closes the SSE connection.
const ev = new Evstream(req, res, {
heartbeat: 30000,
authentication: {
method: 'query',
param: 'token',
verify: async (token) => token === 'valid_token',
},
})
await ev.authenticate()
ev.message({ event: 'message', data: { text: 'Hello world' }, id: '1' })
ev.close()
Manages multiple Server-Sent Events (SSE) client streams. Supports connection tracking, message broadcasting, and channel-based listeners.
new EvStreamManager(opts?: EvManagerOptions)
- opts (optional): EvManagerOptions
  - maxConnection: Maximum allowed active connections (default: 5000)
  - maxListeners: Maximum listeners per channel (default: 5000)
  - id: Optional prefix for client IDs
createStream(req: IncomingMessage, res: ServerResponse, opts?: EvOptions): { authenticate, message, close, listen }
Creates and tracks a new SSE stream.
- req: IncomingMessage - Incoming HTTP request
- res: ServerResponse - HTTP response for the SSE connection
- opts (optional): EvOptions - Stream config (heartbeat, authentication, etc.)
Returns an object with methods:
- authenticate(): Promise<boolean | undefined> - Authenticates the stream (delegates to Evstream)
- message(msg: EvMessage): void - Sends a message to the stream
- close(onClose?: EvOnClose): void - Closes the stream and cleans up listeners
- listen(name: string): void - Subscribes the stream to a named channel
send(name, msg): Broadcasts a message to all clients listening on the specified name (channel).
- name: string - Channel name
- msg: EvMessage - The message to broadcast
Adds a client (by ID) to a named channel. Throws EvMaxListenerError if channel exceeds max listeners.
Removes a client from a channel. Deletes the channel if no listeners remain.
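The add and remove behaviour described above can be pictured as a map from channel names to sets of client IDs. The sketch below is an assumption about the bookkeeping, not evstream's implementation: `ChannelRegistry`, `addListener`, and `removeListener` are hypothetical names, and a plain Error stands in for EvMaxListenerError.

```javascript
// Hypothetical sketch of channel bookkeeping (not evstream's implementation).
class ChannelRegistry {
  constructor(maxListeners = 5000) {
    this.maxListeners = maxListeners
    this.channels = new Map() // channel name -> Set of client IDs
  }

  addListener(name, clientId) {
    const listeners = this.channels.get(name) ?? new Set()
    if (!listeners.has(clientId) && listeners.size >= this.maxListeners) {
      // evstream throws EvMaxListenerError here; a plain Error stands in for it.
      throw new Error(`Channel "${name}" reached ${this.maxListeners} listeners`)
    }
    listeners.add(clientId)
    this.channels.set(name, listeners)
  }

  removeListener(name, clientId) {
    const listeners = this.channels.get(name)
    if (!listeners) return
    listeners.delete(clientId)
    if (listeners.size === 0) this.channels.delete(name) // drop empty channels
  }
}

const registry = new ChannelRegistry(2)
registry.addListener('news', 'client-1')
registry.addListener('news', 'client-2')
registry.removeListener('news', 'client-1')
registry.removeListener('news', 'client-2')
console.log(registry.channels.has('news')) // false
```

Deleting a channel once its last listener leaves keeps the map from growing with names that no client is subscribed to.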
const manager = new EvStreamManager()
const stream = manager.createStream(req, res)
await stream.authenticate()
stream.listen('news')
stream.message({ event: 'hello', data: 'welcome' })
manager.send('news', { event: 'news', data: 'breaking update' })
Reactive state holder that broadcasts updates to a specified channel via an EvStreamManager. Designed for real-time state syncing over Server-Sent Events (SSE).
new EvState<T>({
channel,
initialValue,
manager,
key,
adapter
}: EvStateOptions<T>)
- channel: string - The name of the channel to broadcast updates to.
- initialValue: T - The initial state value.
- manager: EvStreamManager - The SSE manager instance used for broadcasting.
- key (optional): string - The key used in the broadcasted data object (default: 'value').
- adapter (optional): EvStateAdapter - Adapter for distributed state synchronization (e.g. EvRedisAdapter).
get(): Returns the current value of the state.
set(callback): Updates the internal state based on a callback function. If the new value differs from the current one (deep comparison), it broadcasts the updated value to the channel.
- callback: (val: T) => T - A function that receives the current state and returns the new state.
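The compare-then-broadcast behaviour of set() can be sketched as follows. This is a simplified illustration rather than evstream's implementation: `SketchState`, `deepEqual`, and the injected `send` function (standing in for a manager's send method) are hypothetical.

```javascript
// Hypothetical sketch (not evstream's implementation) of a state holder that
// broadcasts only when the value actually changes.
function deepEqual(a, b) {
  if (Object.is(a, b)) return true
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) return false
  if (Array.isArray(a) !== Array.isArray(b)) return false
  const keysA = Object.keys(a)
  const keysB = Object.keys(b)
  if (keysA.length !== keysB.length) return false
  return keysA.every(
    (k) => Object.prototype.hasOwnProperty.call(b, k) && deepEqual(a[k], b[k])
  )
}

class SketchState {
  // `send` stands in for a manager's send(channel, message) method.
  constructor({ channel, initialValue, key = 'value', send }) {
    this.channel = channel
    this.value = initialValue
    this.key = key
    this.send = send
  }

  get() {
    return this.value
  }

  set(callback) {
    const next = callback(this.value)
    if (deepEqual(next, this.value)) return // unchanged: nothing to broadcast
    this.value = next
    this.send(this.channel, { event: this.channel, data: { [this.key]: next } })
  }
}

const sent = []
const counter = new SketchState({
  channel: 'counter',
  initialValue: 0,
  key: 'count',
  send: (channel, msg) => sent.push({ channel, msg }),
})
counter.set((prev) => prev + 1) // broadcasts { event: 'counter', data: { count: 1 } }
counter.set((prev) => prev) // same value, no broadcast
console.log(counter.get(), sent.length) // 1 1
```

One consequence of comparing against the stored value: in this sketch, a callback that mutates an object in place and returns the same reference is seen as unchanged, so returning a new object is the safer pattern.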
const state = new EvState({
channel: 'counter',
initialValue: 0,
manager: evManager,
key: 'count',
})
state.set((prev) => prev + 1)
// Will broadcast: { event: 'counter', data: { count: 1 } }
const current = state.get()
// current === 1
Represents an error thrown when the number of active SSE connections exceeds the allowed maxConnection limit (default: 5000).
new EvMaxConnectionsError(connections: number)
- connections: number - The current number of active connections when the limit is exceeded.
const manager = new EvStreamManager({ maxConnection: 100 })
if (tooManyConnections) {
throw new EvMaxConnectionsError(100)
}
Adapter for synchronizing EvState across multiple instances using Redis Pub/Sub.
new EvRedisAdapter(options?: RedisOptions | string)
- options: RedisOptions | string - Configuration options for the Redis client (from ioredis), or a Redis connection URL.
Represents an error thrown when the number of listeners on a given channel exceeds the allowed maxListeners limit (default: 5000).
new EvMaxListenerError(listeners: number, channel: string)
- listeners: number - The current number of listeners on the channel.
- channel: string - The name of the channel that exceeded the listener limit.
if (tooManyListenersOnChannel) {
throw new EvMaxListenerError(5000, 'news')
}
type EvEventsType = 'data' | 'error' | 'end'
Represents built-in event types commonly used in Server-Sent Events.
interface EvMessage {
event?: string | EvEventsType
data: string | object
id?: string
}
Represents a message sent to the client via SSE.
- event (optional): Name of the event.
- data: The payload to send. Can be a string or an object.
- id (optional): Event ID for reconnection tracking.
interface EvAuthenticationOptions {
method: 'query'
param: string
verify: (token: string) => Promise<EvMessage> | undefined | null | boolean
}
Options for enabling query-based token authentication.
- method: Always 'query'
- param: Name of the query parameter containing the token.
- verify: Async verification function. Can return:
  - true (authenticated)
  - false (rejected)
  - EvMessage (custom response)
  - undefined / null (unauthenticated)
interface EvOptions {
authentication?: EvAuthenticationOptions
heartbeat?: number
}
Optional config for an individual SSE stream.
- authentication: Auth configuration (see EvAuthenticationOptions)
- heartbeat: Interval in milliseconds for sending heartbeat events
interface EvManagerOptions {
id?: string
maxConnection?: number
maxListeners?: number
}
Configuration for EvStreamManager.
- id: Optional prefix for client IDs
- maxConnection: Max allowed connections (default: 5000)
- maxListeners: Max listeners per channel (default: 5000)
interface EvStateOptions<T> {
initialValue: T
channel: string
manager: EvStreamManager
key?: string
adapter?: EvStateAdapter
}
Options for initializing a reactive state with EvState.
- initialValue: Initial state value
- channel: Channel name for broadcasting
- manager: Instance of EvStreamManager
- key (optional): Key for wrapping state in the broadcast (default: 'value')
- adapter (optional): Instance of EvStateAdapter (e.g., EvRedisAdapter) for distributed synchronization.
type EvOnClose = (channels: string[]) => Promise<void>
Callback triggered when a client connection is closed. Receives a list of channels the client was subscribed to.
Manages a collection of shared reactive states and synchronizes their creation and removal across multiple instances using Pub/Sub.
new EvStateManager<S>({
manager,
adapter?,
pubsub?
})
- manager: EvStreamManager - Stream manager used by all states.
- adapter (optional): EvRedisAdapter - Adapter used by EvState for distributed state updates.
- pubsub (optional): EvRedisPubSub - Pub/Sub instance used to synchronize state lifecycle (create/remove).
Creates a new state or returns an existing one.
- Creates the state locally
- Broadcasts creation to other instances (if Pub/Sub is enabled)
Returns an existing state if it exists.
Checks whether a state exists.
Removes a state locally and broadcasts the removal to other instances.
const state = stateManager.createState('user-count', 0)
state.set((v) => v + 1)
stateManager.removeState('user-count')
Lightweight Redis-based Pub/Sub utility used to synchronize events between server instances.
new EvRedisPubSub({
subject,
options,
onMessage?
})
- subject: string - Redis channel name used for Pub/Sub.
- options: RedisOptions - Redis connection options (ioredis).
- onMessage (optional): (message: any) => void - Callback invoked when a message is received.
Publishes a message to the configured Redis channel.
- Automatically filters out self-published messages.
Registers or replaces the message handler.
Closes Redis publisher and subscriber connections.
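One common way to filter out self-published messages is to tag each message with the ID of the publishing instance and drop it on receipt when the ID matches. The sketch below shows that idea with an in-memory bus in place of Redis. How EvRedisPubSub actually tags messages is not described here, so the envelope format, `InMemoryBus`, and `SketchPubSub` are all assumptions.

```javascript
// Hypothetical in-memory sketch (not EvRedisPubSub's implementation) showing how
// self-published messages can be filtered out with a per-instance ID.
class InMemoryBus {
  constructor() {
    this.subscribers = new Set()
  }
  publish(envelope) {
    for (const deliver of this.subscribers) deliver(envelope)
  }
}

class SketchPubSub {
  constructor(bus, instanceId) {
    this.bus = bus
    this.instanceId = instanceId
    this.handler = () => {}
    this.deliver = (envelope) => {
      if (envelope.origin === this.instanceId) return // ignore our own messages
      this.handler(envelope.payload)
    }
    bus.subscribers.add(this.deliver)
  }
  onMessage(handler) {
    this.handler = handler // replaces any earlier handler
  }
  send(payload) {
    this.bus.publish({ origin: this.instanceId, payload })
  }
  close() {
    this.bus.subscribers.delete(this.deliver)
  }
}

const bus = new InMemoryBus()
const a = new SketchPubSub(bus, 'instance-a')
const b = new SketchPubSub(bus, 'instance-b')
const seenByA = []
const seenByB = []
a.onMessage((msg) => seenByA.push(msg))
b.onMessage((msg) => seenByB.push(msg))
a.send({ type: 'create', channel: 'user-count' })
console.log(seenByA.length, seenByB.length) // 0 1
```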
const pubsub = new EvRedisPubSub({
subject: 'ev:states',
options: { host: 'localhost', port: 6379 },
})
pubsub.onMessage((msg) => {
console.log('received:', msg)
})
await pubsub.send({ type: 'create', channel: 'user-count' })
Contributions are welcome! Whether it's a bug fix, feature request, or improvement to documentation, your help is appreciated.
- Fork the repository.
- Create a branch for your feature or fix:
git checkout -b feature/your-feature-name
- Commit your changes with a clear message.
- Push to your fork:
git push origin feature/your-feature-name
- Open a Pull Request and describe your changes.
- Keep your code clean and consistent with the project's existing style.
- Include relevant tests and documentation updates.
- Make sure the project builds and passes all existing checks.
This project is licensed under the MIT License.
You are free to use, modify, distribute, and sublicense this software for both personal and commercial use, provided that the original license and copyright notice are included in all copies.
See the LICENSE file for full details.