Zlib Stream API corrupts data. #61202

@LRagji

Description

Version

v24.5.0; it also reproduces on the previous LTS line (v22).

Platform

Tried on both macOS (Apple M4, arm64) and Windows 11 (Intel, x64).

Subsystem

node:zlib

What steps will reproduce the bug?

I am using zlib inside a Transform stream that compresses multiple logical streams with separate zlib instances, like the code given below:

type TSplitStreamContext = {
    redisTempKey: string,
    csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
    zipper?: Gzip,
    downStream?: Stream[]
};

const tagsStreamContext = new Map<string, TSplitStreamContext>();

const zipStream = new Transform({
    objectMode: true,
    transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
        const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
        let upstreamPressure = 0;
        let downstreamBackPressure = 0;

        for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
            const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

            if (existingContext.zipper == null || existingContext.zipper === undefined) {
                existingContext.zipper = createGzip();

                existingContext.zipper.on('data', (chunk: Buffer) => {
                    if (this.push({ [tagId]: chunk }) === false) {
                        downstreamBackPressure++;
                        existingContext.zipper!.pause();
                        this.once('drain', () => {
                            downstreamBackPressure--;
                            if (downstreamBackPressure === 0) {
                                existingContext.zipper!.resume();
                            }
                        });
                    }
                });

                tagsStreamContext.set(tagId, existingContext);
            }

            if (existingContext.zipper.write(csvChunk) === false) {
                upstreamPressure++;
                existingContext.zipper.once('drain', () => {
                    upstreamPressure--;
                    if (upstreamPressure === 0) {
                        callback();//This controls upstream flow
                    }
                });
            }
        }
        if (upstreamPressure === 0) {
            callback();//This controls upstream flow
        }

    },

    final(callback) {
        const promiseHandles = [];

        for (const [tagId, context] of tagsStreamContext) {
            if (context.zipper !== null && context.zipper !== undefined) {
                promiseHandles.push(new Promise<void>((resolve, reject) => {
                    context.zipper!.once('end', resolve);
                    context.zipper!.once('error', reject);
                    context.zipper!.end();
                }));
            }
        }

        Promise.all(promiseHandles)
            .then(() => { callback(); console.log('All zippers ended.'); })
            .catch(err => callback(err));
    }
});

I drive this transform with other streams that feed data into it (a file reader, a CSV parser, etc.), and downstream there is a file writer. With this setup, only about half of the data from the input file is written out; the other half is lost.

How often does it reproduce? Is there a required condition?

This happens every time.

What is the expected behavior? Why is that the expected behavior?

It should compress all of the file data and behave just like a normal pipe operation.
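
By "a normal pipe operation" I mean the single gzip instance case, which does compress the whole file correctly. A minimal sketch of that baseline (file names are placeholders, not part of the repro):

import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';

// Baseline: one gzip instance driven by pipeline(); all input ends up in the output file.
await pipeline(
    createReadStream('./1.csv'),
    createGzip(),
    createWriteStream('./1.csv.gz')
);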

What do you see instead?

It only transforms half of the file.

The important part is that it works for small files (< 100 MB) but fails for anything bigger. I assume this has something to do with the internal buffers (speculation on my part).
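
If it is a buffering issue, one way to check (a diagnostic sketch I have not wired into the repro) is to watch the standard writable-buffer counters on a Gzip instance after each write:

import { createGzip } from 'node:zlib';

// Standalone sketch: observe how full a single Gzip instance's write buffer gets.
// writableLength and writableHighWaterMark are standard stream.Writable properties.
const zipper = createGzip();
zipper.on('data', () => { /* discard compressed output for this check */ });

const accepted = zipper.write('a,b,c\n1,2,3\n');
console.log(`buffered ${zipper.writableLength}/${zipper.writableHighWaterMark} bytes, write accepted: ${accepted}`);
zipper.end();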

To run the attached code file, rename it from .ts to .mts; for some reason GitHub does not allow the .mts extension on attachments.

Additional information

The entire code:

import { appendFileSync, createReadStream, createWriteStream, mkdirSync } from 'node:fs';
import Stream, { PassThrough, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createGzip, Gzip } from 'node:zlib';
import csvParser from 'csv-parser';
import redis from 'redis';
import { randomInt } from 'node:crypto';
import { createObjectCsvStringifier } from 'csv-writer';

async function streamTodo(csvFilePath: string, expiryTimeInMilliseconds: number, redisClient: redis.RedisClientType, redisTempKeyPartA = `temp-stream-${randomInt(1e6)}-`, redisTempStreamingTimeout = 10000): Promise<void> {
    type TTagStreamObject = { timestamp: number, value: number, status: number, tag: string };
    type TSplitStreamContext = {
        redisTempKey: string,
        csvObjectStringifier?: ReturnType<typeof createObjectCsvStringifier>,
        zipper?: Gzip,
        downStream?: Stream[]
    };

    const tagsStreamContext = new Map<string, TSplitStreamContext>();
    const rowAccumulatorMap = new WeakMap<Transform, Array<Record<string, string>>>();

    const inputFileStream = createReadStream(csvFilePath, { encoding: 'utf8' });

    const csvStream = csvParser();

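    // rowAccumulator: batches parsed CSV rows into arrays of up to highWaterMark rows before passing a batch downstream.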
    const rowAccumulator = new Transform({
        objectMode: true,
        highWaterMark: 100, // Adjust this value as needed for batch size (sweet spot between memory and speed).
        transform(row: Record<string, string>, encoding, callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            acc.push(row);
            if (acc.length >= this.readableHighWaterMark) {
                callback(null, acc);
                acc.length = 0;
            } else {
                rowAccumulatorMap.set(this, acc);
                callback();
            }
        },
        flush(callback) {
            const acc = rowAccumulatorMap.get(this) ?? new Array<Record<string, string>>();
            callback(null, acc);
            acc.length = 0;
        }
    });

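    // tagSegregationStream: regroups each batch of rows into per-tag arrays of { timestamp, value, status, tag } objects.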
    const tagSegregationStream = new Transform({
        objectMode: true,
        transform(rows: Record<string, string>[], encoding, callback) {
            const tagWiseData: Record<string, TTagStreamObject[]> = {};
            for (const row of rows) {
                for (const [key, value] of Object.entries(row)) {
                    if (key !== 'timestamp' && !key.endsWith('_status')) {
                        const existingArray = tagWiseData[key] ?? [];
                        existingArray.push({
                            timestamp: parseFloat(row['timestamp']),
                            value: parseFloat(value),
                            status: parseFloat(row[`${key}_status`]),
                            tag: key
                        });
                        tagWiseData[key] = existingArray;
                    }
                }
            }
            callback(null, tagWiseData);
        }
    });

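    // outputCSVStream: turns each tag's objects back into a CSV string chunk, prepending the header the first time a tag is seen.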
    const outputCSVStream = new Transform({
        objectMode: true,
        transform(tagWiseData: Record<string, TTagStreamObject[]>, encoding, callback) {

            const tagWiseCSVChunks: Record<string, string> = {};
            for (const [tagId, data] of Object.entries(tagWiseData)) {
                tagWiseCSVChunks[tagId] = "";
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext?.csvObjectStringifier == undefined || existingContext?.csvObjectStringifier == null) {

                    existingContext.csvObjectStringifier = createObjectCsvStringifier({
                        header: [
                            { id: 'timestamp', title: 'timestamp' },
                            { id: 'value', title: 'value' },
                            { id: 'status', title: 'status' }
                        ]
                    });
                    tagsStreamContext.set(tagId, existingContext);
                    tagWiseCSVChunks[tagId] = existingContext.csvObjectStringifier.getHeaderString() ?? "";
                }

                tagWiseCSVChunks[tagId] += existingContext.csvObjectStringifier.stringifyRecords(data);
            }

            callback(null, tagWiseCSVChunks);
        }
    });

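    // zipStream: routes each tag's CSV chunk to its own Gzip instance and re-emits the compressed output keyed by tag; backpressure in both directions is tracked manually.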
    const zipStream = new Transform({
        objectMode: true,
        transform(tagWiseCSVChunks: Record<string, string>, encoding, callback) {
            const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
            let upstreamPressure = 0;
            let downstreamBackPressure = 0;

            for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

                if (existingContext.zipper == null || existingContext.zipper === undefined) {
                    existingContext.zipper = createGzip();

                    existingContext.zipper.on('data', (chunk: Buffer) => {
                        if (this.push({ [tagId]: chunk }) === false) {
                            downstreamBackPressure++;
                            existingContext.zipper!.pause();
                            this.once('drain', () => {
                                downstreamBackPressure--;
                                if (downstreamBackPressure === 0) {
                                    existingContext.zipper!.resume();
                                }
                            });
                        }
                    });

                    tagsStreamContext.set(tagId, existingContext);
                }

                if (existingContext.zipper.write(csvChunk) === false) {
                    upstreamPressure++;
                    existingContext.zipper.once('drain', () => {
                        upstreamPressure--;
                        if (upstreamPressure === 0) {
                            callback();//This controls upstream flow
                        }
                    });
                }
            }
            if (upstreamPressure === 0) {
                callback();//This controls upstream flow
            }

        },

        final(callback) {
            const promiseHandles = [];

            for (const [tagId, context] of tagsStreamContext) {
                if (context.zipper !== null && context.zipper !== undefined) {
                    promiseHandles.push(new Promise<void>((resolve, reject) => {
                        context.zipper!.once('end', resolve);
                        context.zipper!.once('error', reject);
                        context.zipper!.end();
                    }));
                }
            }

            Promise.all(promiseHandles)
                .then(() => { callback(); console.log('All zippers ended.'); })
                .catch(err => callback(err));
        }
    });

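    // redisWriter: appends compressed chunks to temporary Redis keys, then renames them on final (not used in the pipeline below; fileWriter is used instead).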
    const redisWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            const promiseHandles = [];
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                const redisTempTagKey = `${redisTempKeyPartA}${tagId}`;

                const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;
                if (existingContext.redisTempKey == null || existingContext.redisTempKey == undefined) {
                    existingContext.redisTempKey = redisTempTagKey;
                    tagsStreamContext.set(tagId, existingContext);
                }

                promiseHandles.push(
                    redisClient.multi()
                        .append(redisTempTagKey, zippedBufferChunk)
                        .pExpire(redisTempTagKey, redisTempStreamingTimeout)
                        .exec()
                )
            }

            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },

        final(callback) {
            const promiseHandles = [];
            for (const [tagId, info] of tagsStreamContext) {
                promiseHandles.push(
                    redisClient.multi()
                        .rename(info.redisTempKey, tagId)
                        .pExpire(tagId, expiryTimeInMilliseconds)
                        .exec()
                )
            }

            Promise.allSettled(promiseHandles)
                .then(() => callback())
                .catch(err => callback(err));
        },
    });

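    // fileWriter: appends each tag's compressed chunks to ./zipped/<tagId>.gz.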
    const fileWriter = new Transform({
        objectMode: true,
        transform(chunk: Record<string, Buffer>, encoding, callback) {
            for (const [tagId, zippedBufferChunk] of Object.entries(chunk)) {
                appendFileSync(`./zipped/${tagId}.gz`, zippedBufferChunk, { flag: 'a' });
            }
            callback();
        }
    });

    // const splitStreams = new Transform({
    //     objectMode: true,
    //     construct(callback) {
    //         callback();
    //     },
    //     transform(tagWiseCSVChunks: Record<string, Buffer>, encoding, callback) {

    //         const tagIdVSChunkRecords = Object.entries(tagWiseCSVChunks);
    //         let upstreamPressure = 0;
    //         let downstreamBackPressure = 0;

    //         for (const [tagId, csvChunk] of tagIdVSChunkRecords) {
    //             const existingContext = tagsStreamContext.get(tagId) ?? {} as TSplitStreamContext;

    //             if (Array.isArray(existingContext.downStream) === false || existingContext.downStream.length === 0) {
    //                 existingContext.downStream = [new PassThrough({ objectMode: true }), createGzip(), createWriteStream(`./zipped/${tagId}.gz`, { flags: 'a' })];

    //                 existingContext.zipper.on('data', (chunk: Buffer) => {
    //                     if (this.push({ [tagId]: chunk }) === false) {
    //                         downstreamBackPressure++;
    //                         existingContext.zipper!.pause();
    //                         this.once('drain', () => {
    //                             downstreamBackPressure--;
    //                             if (downstreamBackPressure === 0) {
    //                                 existingContext.zipper!.resume();
    //                             }
    //                         });
    //                     }
    //                 });

    //                 tagsStreamContext.set(tagId, existingContext);
    //             }

    //             if (existingContext.zipper.write(csvChunk) === false) {
    //                 upstreamPressure++;
    //                 existingContext.zipper.once('drain', () => {
    //                     upstreamPressure--;
    //                     if (upstreamPressure === 0) {
    //                         callback();//This controls upstream flow
    //                     }
    //                 });
    //             }
    //         }
    //         if (upstreamPressure === 0) {
    //             callback();//This controls upstream flow
    //         }
    //     },
    //     final(callback) {
    //         redisWriter.end();
    //         fileWriter.end();
    //         callback();
    //     }
    // });

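    // Repro pipeline: CSV file -> csv-parser -> batch rows -> group by tag -> per-tag CSV chunks -> per-tag gzip -> per-tag .gz files.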
    await pipeline(
        inputFileStream,
        csvStream,
        rowAccumulator,
        tagSegregationStream,
        outputCSVStream,
        zipStream,
        fileWriter
    );

    tagsStreamContext.clear();
}

async function mainModule() {
    mkdirSync('./zipped', { recursive: true })
    console.log('Computing...');
    await streamTodo('./1.csv', 3600000, null as any);
    // console.log('Unzipping...');
    // await unzipGzFiles('./zipped');
    // console.log('Comparing 03...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a003.gz', './zipped/diff_03.csv');
    // console.log('Comparing 10...');
    // await compareCsvFiles('./1.csv', './zipped/0c6ad90b-e90d-40ac-a277-169c8024a010.gz', './zipped/diff_10.csv');
}

mainModule()
    .then(() => console.log('All operations completed successfully.'))
    .catch(err => console.error('Error during operations:', err));
