Description
Version
v24.5.0, and also the previous LTS line (v22)
Platform
Tried on both macOS (M4, arm64) and Windows 11 (Intel, x64)
Subsystem
node:zlib
What steps will reproduce the bug?
I am trying to use zlib inside a Transform stream that compresses multiple logical sub-streams with separate zlib instances, as given below:
type TSplitStreamContext = {
    redisTempKey: string,
    csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
    zipper?: Gzip,
    downStream?: Stream[]
};
const tagsStreamContext = new Map<string, TSplitStreamContext>();
const zipStream = new Transform({
    objectMode: true,
    transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
        const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
        let upstreamPressure = 0;
        let downstreamBackPressure = 0;
        for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
            const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
            if (existingContext.zipper == null || existingContext.zipper === undefined) {
                existingContext.zipper = createGzip();
                existingContext.zipper.on('data', (chunk: Buffer) => {
                    if (this.push({ [tagId]: chunk }) === false) {
                        downstreamBackPressure++;
                        existingContext.zipper!.pause();
                        this.once('drain', () => {
                            downstreamBackPressure--;
                            if (downstreamBackPressure === 0) {
                                existingContext.zipper!.resume();
                            }
                        });
                    }
                });
                tagsStreamContext.set(tagId, existingContext);
            }
            if (existingContext.zipper.write(csvChunk) === false) {
                upstreamPressure++;
                existingContext.zipper.once('drain', () => {
                    upstreamPressure--;
                    if (upstreamPressure === 0) {
                        callback(); // This controls upstream flow
                    }
                });
            }
        }
        if (upstreamPressure === 0) {
            callback(); // This controls upstream flow
        }
    },
    final(callback) {
        const promiseHandles = [];
        for (const [tagId, context] of tagsStreamContext) {
            if (context.zipper !== null && context.zipper !== undefined) {
                promiseHandles.push(new Promise<void>((resolve, reject) => {
                    context.zipper!.once('end', resolve);
                    context.zipper!.once('error', reject);
                    context.zipper!.end();
                }));
            }
        }
        Promise.all(promiseHandles)
            .then(() => { callback(); console.log('All zippers ended.'); })
            .catch(err => callback(err));
    }
});

I invoke this code with more streams that pass data into this transform stream (file reader, CSV parser, etc.), and downstream I have a file writer. Everything runs, but it only produces half of the data from the actual file; the other half is never written.
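For context, the wiring is roughly as follows (a simplified sketch of the full code included under Additional information below):

await pipeline(
    createReadStream(csvFilePath, { encoding: 'utf8' }),
    csvParser(),
    rowAccumulator,       // batches parsed rows
    tagSegregationStream, // groups rows per tag
    outputCSVStream,      // builds per-tag CSV chunks
    zipStream,            // the Transform shown above, one Gzip per tag
    fileWriter            // appends each gzipped chunk to ./zipped/<tagId>.gz
);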
How often does it reproduce? Is there a required condition?
This happens every time.
What is the expected behavior? Why is that the expected behavior?
It should compress all of the file data and behave just like a normal pipe operation.
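For a single output, the "normal pipe operation" I am comparing against would be roughly the following (a minimal sketch; the output path is just illustrative):

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';

// Baseline: pipeline() handles backpressure and flushes the Gzip stream on end,
// so the .gz file always contains the complete input.
await pipeline(
    createReadStream('./1.csv'),
    createGzip(),
    createWriteStream('./1.csv.gz')
);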
What do you see instead?
It only transforms half of the file.
The important part is that it works for small files (<100 MB) but fails for any bigger files. I assume this has something to do with zlib's internal buffers (speculation).
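I have not verified this; the kind of check I have in mind is something like the following (a diagnostic sketch only, placed inside zipStream's per-tag 'data' handling; writableLength, readableLength and isPaused() are standard stream properties):

// Assumption: if the internal buffers are the problem, these numbers should
// keep growing for large inputs while the stream reports itself as paused.
existingContext.zipper!.on('data', () => {
    console.log(
        tagId,
        'writableLength =', existingContext.zipper!.writableLength,
        'readableLength =', existingContext.zipper!.readableLength,
        'isPaused =', existingContext.zipper!.isPaused()
    );
});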
Rename the attached code file from .ts to .mts; for some reason GitHub is not allowing the .mts extension.
Additional information
The entire code:
import { appendFileSync, createReadStream, createWriteStream, mkdirSync } from 'node:fs';
import Stream, { PassThrough, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createGzip, Gzip } from 'node:zlib';
import csvParser from 'csv-parser';
import redis from 'redis';
import { randomInt } from 'node:crypto';
import { createObjectCsvStringifier } from 'csv-writer';

async function streamTodo(csvFilePath: string, expiryTimeInMilliseconds: number, redisClient: redis.RedisClientType, redisTempKeyPartA = `temp-stream-${randomInt(1e6)}-`, redisTempStreamingTimeout = 10000): Promise<void> {
    type TTagStreamObject = { timestamp: number, value: number, status: number, tag: string };
    type TSplitStreamContext = {
        redisTempKey: string,
        csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
        zipper?: Gzip,
        downStream?: Stream[]
    };
    const tagsStreamContext = new Map<string, TSplitStreamContext>();
    const rowAccumulatorMap = new WeakMap<Transform, Array<Record<string, string>>>();
    const inputFileStream = createReadStream(csvFilePath, { encoding: 'utf8' });
    const csvStream = csvParser();
    const rowAccumulator = new Transform({
        objectMode: true,
        highWaterMark: 100, // Adjust this value as needed for batch size. (sweet spot between memory and speed)
        transform(row: Record<string, string>, encoding, callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            acc.push(row);
            if (acc.length >= this.readableHighWaterMark) {
                callback(null, acc);
                acc.length = 0;
            } else {
                rowAccumulatorMap.set(this, acc);
                callback();
            }
        },
        flush(callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            callback(null, acc);
            acc.length = 0;
        }
    });
    const tagSegregationStream = new Transform({
        objectMode: true,
        transform(rows: Record<string, string>[], encoding, callback) {
            const tagWiseData: Record<string, TTagStreamObject[]> = {};
            for (const row of rows) {
                for (const [key, value] of Object.entries(row)) {
                    if (key !== 'timestamp' && !key.endsWith('_status')) {
                        const existingArray = tagWiseData[key] ?? [];
                        existingArray.push({
                            timestamp: parseFloat(row['timestamp']),
                            value: parseFloat(value),
                            status: parseFloat(row[`${key}_status`]),
                            tag: key
                        });
                        tagWiseData[key] = existingArray;
                    }
                }
            }
            callback(null, tagWiseData);
        }
    });
    const outputCSVStream = new Transform({
        objectMode: true,
        transform(tagWiseData: Record<string, TTagStreamObject[]>, encoding, callback) {
            const tagWiseCSVChunks: Record<string, string> = {};
            for (const [tagId, data] of Object.entries(tagWiseData)) {
                tagWiseCSVChunks[tagId] = "";
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext?.csvObjectStringifier == undefined || existingContext?.csvObjectStringifier == null) {
                    existingContext.csvObjectStringifier = createObjectCsvStringifier({
                        header: [
                            { id: 'timestamp', title: 'timestamp' },
                            { id: 'value', title: 'value' },
                            { id: 'status', title: 'status' }
                        ]
                    });
                    tagsStreamContext.set(tagId, existingContext);
                    tagWiseCSVChunks[tagId] = existingContext.csvObjectStringifier.getHeaderString() ?? "";
                }
                tagWiseCSVChunks[tagId] += existingContext.csvObjectStringifier.stringifyRecords(data);
            }
            callback(null, tagWiseCSVChunks);
        }
    });
    const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
            const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
            let upstreamPressure = 0;
            let downstreamBackPressure = 0;
            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext.zipper == null || existingContext.zipper === undefined) {
                    existingContext.zipper = createGzip();
                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        if (this.push({ [tagId]: chunk }) === false) {
                            downstreamBackPressure++;
                            existingContext.zipper!.pause();
                            this.once('drain', () => {
                                downstreamBackPressure--;
                                if (downstreamBackPressure === 0) {
                                    existingContext.zipper!.resume();
                                }
                            });
                        }
                    });
                    tagsStreamContext.set(tagId, existingContext);
                }
                if (existingContext.zipper.write(csvChunk) === false) {
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            callback(); // This controls upstream flow
                        }
                    });
                }
            }
            if (upstreamPressure === 0) {
                callback(); // This controls upstream flow
            }
        },
        final(callback) {
            const promiseHandles = [];
            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper !== null && context.zipper !== undefined) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }
            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });
    const redisWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            const promiseHandles = [];
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                const redisTempTagKey = `${redisTempKeyPartA}${tagId}`;
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext.redisTempKey == null || existingContext.redisTempKey == undefined) {
                    existingContext.redisTempKey = redisTempTagKey;
                    tagsStreamContext.set(tagId, existingContext);
                }
                promiseHandles.push(
                    redisClient.multi()
                        .append(redisTempTagKey, zippedBufferChunk)
                        .pExpire(redisTempTagKey, redisTempStreamingTimeout)
                        .exec()
                )
            }
            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },
        final(callback) {
            const promiseHandles = [];
            for (const [tagId, info] of tagsStreamContext) {
                promiseHandles.push(
                    redisClient.multi()
                        .rename(info.redisTempKey, tagId)
                        .pExpire(tagId, expiryTimeInMilliseconds)
                        .exec()
                )
            }
            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },
    });
    const fileWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                appendFileSync(`./zipped/${tagId}.gz`, zippedBufferChunk, { flag: 'a' });
            }
            callback();
        }
    });
    // const splitStreams = new Transform({
    //     objectMode: true,
    //     construct(callback) {
    //         callback();
    //     },
    //     transform(tagWiseCSVChunks: Record<string, Buffer>, encoding, callback) {
    //         const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
    //         let upstreamPressure = 0;
    //         let downstreamBackPressure = 0;
    //         for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
    //             const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
    //             if (Array.isArray(existingContext.downStream) === false || existingContext.downStream.length === 0) {
    //                 existingContext.downStream = [new PassThrough({ objectMode: true }), createGzip(), createWriteStream(`./zipped/${tagId}.gz`, { flags: 'a' })];
    //                 existingContext.zipper.on('data', (chunk: Buffer) => {
    //                     if (this.push({ [tagId]: chunk }) === false) {
    //                         downstreamBackPressure++;
    //                         existingContext.zipper!.pause();
    //                         this.once('drain', () => {
    //                             downstreamBackPressure--;
    //                             if (downstreamBackPressure === 0) {
    //                                 existingContext.zipper!.resume();
    //                             }
    //                         });
    //                     }
    //                 });
    //                 tagsStreamContext.set(tagId, existingContext);
    //             }
    //             if (existingContext.zipper.write(csvChunk) === false) {
    //                 upstreamPressure++;
    //                 existingContext.zipper.once('drain', () => {
    //                     upstreamPressure--;
    //                     if (upstreamPressure === 0) {
    //                         callback(); // This controls upstream flow
    //                     }
    //                 });
    //             }
    //         }
    //         if (upstreamPressure === 0) {
    //             callback(); // This controls upstream flow
    //         }
    //     },
    //     final(callback) {
    //         redisWriter.end();
    //         fileWriter.end();
    //         callback();
    //     }
    // });
    await pipeline(
        inputFileStream,
        csvStream,
        rowAccumulator,
        tagSegregationStream,
        outputCSVStream,
        zipStream,
        fileWriter
    );
    tagsStreamContext.clear();
}
async function mainModule() {
    mkdirSync('./zipped', { recursive: true })
    console.log('Computing...');
    await streamTodo('./1.csv', 3600000, null as any);
    // console.log('Unzipping...');
    // await unzipGzFiles('./zipped');
    // console.log('Comparing 03...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a003.gz', './zipped/diff_03.csv');
    // console.log('Comparing 10...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a010.gz', './zipped/diff_10.csv');
}

mainModule()
    .then(() => console.log('All operations completed successfully.'))
    .catch(err => console.error('Error during operations:', err));