diff --git a/index.js b/index.js index 857bd24..422b9ef 100644 --- a/index.js +++ b/index.js @@ -5,11 +5,13 @@ const MessageJoiner = require('./lib/message-joiner'); const NodeRewriter = require('./lib/node-rewriter'); const NodeStreamer = require('./lib/node-streamer'); const Headers = require('./lib/headers'); +const ChunkedPassthrough = require('./lib/chunked-passthrough'); module.exports = { Splitter: MessageSplitter, Joiner: MessageJoiner, Rewriter: NodeRewriter, Streamer: NodeStreamer, + ChunkedPassthrough, Headers }; diff --git a/lib/chunked-passthrough.js b/lib/chunked-passthrough.js new file mode 100644 index 0000000..c2c1e04 --- /dev/null +++ b/lib/chunked-passthrough.js @@ -0,0 +1,37 @@ +'use strict'; + +const { Transform } = require('stream'); + +class ChunkedPassthrough extends Transform { + constructor(options = {}) { + let config = { + readableObjectMode: true, + writableObjectMode: false + }; + super(config); + this.chunkSize = options.chunkSize || 64 * 1024; // 64KB default + this.buffer = Buffer.alloc(0); + } + + _transform(chunk, encoding, callback) { + this.buffer = Buffer.concat([this.buffer, chunk]); + + if (this.buffer.length >= this.chunkSize) { + this.push(this.buffer); + this.buffer = Buffer.alloc(0); + } + + callback(); + } + + _flush(callback) { + // Send remaining data + if (this.buffer.length > 0) { + this.push(this.buffer); + this.buffer = Buffer.alloc(0); + } + callback(); + } +} + +module.exports = ChunkedPassthrough; diff --git a/test/message-splitter-test.js b/test/message-splitter-test.js index 1e350ae..d7f4182 100644 --- a/test/message-splitter-test.js +++ b/test/message-splitter-test.js @@ -5,6 +5,7 @@ const crypto = require('crypto'); const MessageSplitter = require('../lib/message-splitter'); const MessageJoiner = require('../lib/message-joiner'); const { Readable, Transform } = require('stream'); +const ChunkedPassthrough = require('../lib/chunked-passthrough'); module.exports['Split simple message'] = test => { let splitter = new MessageSplitter(); @@ -506,7 +507,7 @@ module.exports['handles line break lines split into 2-byte chunks'] = test => { // Pipe through our chunker const chunker = new TwoByteChunker(); - source.pipe(chunker).pipe(splitter); + source.pipe(chunker).pipe(new ChunkedPassthrough()).pipe(splitter); test.expect(0); test.done();