Skip to content

Commit 58011f6

Browse files
committed
stream: add kStreamBase marker for internal pipe optimization
1 parent 6218d14 commit 58011f6

File tree

5 files changed

+270
-2
lines changed

5 files changed

+270
-2
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const fsp = require('fs/promises');
4+
const path = require('path');
5+
const os = require('os');
6+
const { pipeline } = require('stream/promises');
7+
const {
8+
ReadableStream,
9+
WritableStream,
10+
} = require('node:stream/web');
11+
12+
const bench = common.createBenchmark(main, {
13+
type: [
14+
'node-streams',
15+
'webstream-js',
16+
'webstream-file-read',
17+
],
18+
size: [1024, 16384, 65536],
19+
n: [1e4, 1e5],
20+
});
21+
22+
async function main({ type, size, n }) {
23+
const chunk = Buffer.alloc(size, 'x');
24+
const totalBytes = size * n;
25+
26+
switch (type) {
27+
case 'node-streams': {
28+
// Baseline: Node.js streams
29+
let received = 0;
30+
const readable = new (require('stream').Readable)({
31+
read() {
32+
for (let i = 0; i < 100 && received < n; i++) {
33+
this.push(chunk);
34+
received++;
35+
}
36+
if (received >= n) this.push(null);
37+
},
38+
});
39+
40+
const writable = new (require('stream').Writable)({
41+
write(data, enc, cb) { cb(); },
42+
});
43+
44+
bench.start();
45+
await pipeline(readable, writable);
46+
bench.end(totalBytes);
47+
break;
48+
}
49+
50+
case 'webstream-js': {
51+
// Web streams with pure JS source/sink
52+
let sent = 0;
53+
const rs = new ReadableStream({
54+
pull(controller) {
55+
if (sent++ < n) {
56+
controller.enqueue(chunk);
57+
} else {
58+
controller.close();
59+
}
60+
},
61+
});
62+
63+
const ws = new WritableStream({
64+
write() {},
65+
close() { bench.end(totalBytes); },
66+
});
67+
68+
bench.start();
69+
await rs.pipeTo(ws);
70+
break;
71+
}
72+
73+
case 'webstream-file-read': {
74+
// Create a temporary file with test data
75+
const tmpDir = os.tmpdir();
76+
const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`);
77+
78+
// Write test data to file
79+
const fd = await fsp.open(tmpFile, 'w');
80+
for (let i = 0; i < n; i++) {
81+
await fd.write(chunk);
82+
}
83+
await fd.close();
84+
85+
// Read using readableWebStream
86+
const readFd = await fsp.open(tmpFile, 'r');
87+
const rs = readFd.readableWebStream({ type: 'bytes' });
88+
89+
const ws = new WritableStream({
90+
write() {},
91+
close() {
92+
bench.end(totalBytes);
93+
// Cleanup
94+
readFd.close().then(() => fsp.unlink(tmpFile));
95+
},
96+
});
97+
98+
bench.start();
99+
await rs.pipeTo(ws);
100+
break;
101+
}
102+
103+
default:
104+
throw new Error(`Unknown type: ${type}`);
105+
}
106+
}

lib/internal/webstreams/adapters.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ const {
3535
ByteLengthQueuingStrategy,
3636
} = require('internal/webstreams/queuingstrategies');
3737

38+
const {
39+
kStreamBase,
40+
} = require('internal/webstreams/util');
41+
3842
const {
3943
Writable,
4044
Readable,
@@ -946,7 +950,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
946950
return promise.promise;
947951
}
948952

949-
return new WritableStream({
953+
const stream = new WritableStream({
950954
write(chunk, controller) {
951955
current = current !== undefined ?
952956
PromisePrototypeThen(
@@ -967,6 +971,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
967971
return promise.promise;
968972
},
969973
}, strategy);
974+
975+
stream[kStreamBase] = streamBase;
976+
977+
return stream;
970978
}
971979

972980
/**
@@ -1017,7 +1025,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
10171025
}
10181026
};
10191027

1020-
return new ReadableStream({
1028+
const stream = new ReadableStream({
10211029
start(c) { controller = c; },
10221030

10231031
pull() {
@@ -1040,6 +1048,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
10401048
return promise.promise;
10411049
},
10421050
}, strategy);
1051+
1052+
stream[kStreamBase] = streamBase;
1053+
1054+
return stream;
10431055
}
10441056

10451057
module.exports = {

lib/internal/webstreams/readablestream.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ const {
4545
DOMException,
4646
} = internalBinding('messaging');
4747

48+
const {
49+
StreamPipe,
50+
} = internalBinding('stream_pipe');
51+
4852
const {
4953
isArrayBufferView,
5054
isDataView,
@@ -114,6 +118,7 @@ const {
114118
iteratorNext,
115119
kType,
116120
kState,
121+
kStreamBase,
117122
} = require('internal/webstreams/util');
118123

119124
const {
@@ -1369,6 +1374,34 @@ function readableStreamPipeTo(
13691374
preventCancel,
13701375
signal) {
13711376

1377+
const sourceStreamBase = source[kStreamBase];
1378+
const destStreamBase = dest[kStreamBase];
1379+
1380+
if (sourceStreamBase !== undefined &&
1381+
destStreamBase !== undefined &&
1382+
signal === undefined &&
1383+
!preventClose &&
1384+
!preventAbort &&
1385+
!preventCancel) {
1386+
// Use native piping
1387+
const promise = PromiseWithResolvers();
1388+
1389+
source[kState].disturbed = true;
1390+
1391+
try {
1392+
const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
1393+
pipe.onunpipe = () => {
1394+
promise.resolve();
1395+
};
1396+
pipe.start();
1397+
} catch (error) {
1398+
return PromiseReject(error);
1399+
}
1400+
1401+
return promise.promise;
1402+
}
1403+
1404+
// Use JS-based piping
13721405
let reader;
13731406
let writer;
13741407
let disposable;

lib/internal/webstreams/util.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ const {
4949

5050
const kState = Symbol('kState');
5151
const kType = Symbol('kType');
52+
const kStreamBase = Symbol('kStreamBase');
5253

5354
const AsyncIterator = {
5455
__proto__: AsyncIteratorPrototype,
@@ -296,4 +297,5 @@ module.exports = {
296297
iteratorNext,
297298
kType,
298299
kState,
300+
kStreamBase,
299301
};
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Flags: --expose-internals --no-warnings
2+
'use strict';
3+
4+
// Tests for the internal StreamBase pipe optimization infrastructure
5+
// described in nodejs/performance#134
6+
//
7+
// Note(mertcanaltin): Full fast-path testing requires real StreamBase implementations
8+
// (like HTTP/2 streams or TCP sockets), not JSStream mocks.
9+
// These tests verify the marker attachment and fallback behavior.
10+
11+
const common = require('../common');
12+
13+
const assert = require('assert');
14+
15+
const {
16+
internalBinding,
17+
} = require('internal/test/binding');
18+
19+
const {
20+
newWritableStreamFromStreamBase,
21+
newReadableStreamFromStreamBase,
22+
} = require('internal/webstreams/adapters');
23+
24+
const {
25+
kStreamBase,
26+
} = require('internal/webstreams/util');
27+
28+
const {
29+
JSStream,
30+
} = internalBinding('js_stream');
31+
32+
// Test 1: kStreamBase marker is attached to ReadableStream
33+
{
34+
const stream = new JSStream();
35+
const readable = newReadableStreamFromStreamBase(stream);
36+
37+
assert.strictEqual(readable[kStreamBase], stream);
38+
39+
// Cleanup
40+
stream.emitEOF();
41+
}
42+
43+
// Test 2: kStreamBase marker is attached to WritableStream
44+
{
45+
const stream = new JSStream();
46+
stream.onwrite = common.mustNotCall();
47+
stream.onshutdown = (req) => req.oncomplete();
48+
49+
const writable = newWritableStreamFromStreamBase(stream);
50+
51+
assert.strictEqual(writable[kStreamBase], stream);
52+
53+
// Cleanup
54+
writable.close();
55+
}
56+
57+
// Test 3: Regular JS streams don't have kStreamBase
58+
{
59+
const { ReadableStream, WritableStream } = require('stream/web');
60+
61+
const rs = new ReadableStream({
62+
pull(controller) {
63+
controller.enqueue('chunk');
64+
controller.close();
65+
},
66+
});
67+
68+
const ws = new WritableStream({
69+
write() {},
70+
});
71+
72+
assert.strictEqual(rs[kStreamBase], undefined);
73+
assert.strictEqual(ws[kStreamBase], undefined);
74+
75+
// Pipe should still work (standard path)
76+
rs.pipeTo(ws).then(common.mustCall());
77+
}
78+
79+
// Test 4: Mixed streams (one internal, one JS) use standard path
80+
{
81+
const stream = new JSStream();
82+
stream.onshutdown = (req) => req.oncomplete();
83+
const readable = newReadableStreamFromStreamBase(stream);
84+
85+
const { WritableStream } = require('stream/web');
86+
const chunks = [];
87+
const ws = new WritableStream({
88+
write(chunk) {
89+
chunks.push(chunk);
90+
},
91+
});
92+
93+
// Readable has kStreamBase, ws does not - should use standard path
94+
assert.ok(readable[kStreamBase]);
95+
assert.strictEqual(ws[kStreamBase], undefined);
96+
97+
const pipePromise = readable.pipeTo(ws);
98+
99+
stream.readBuffer(Buffer.from('hello'));
100+
stream.emitEOF();
101+
102+
pipePromise.then(common.mustCall(() => {
103+
assert.strictEqual(chunks.length, 1);
104+
}));
105+
}
106+
107+
// Test 5: Verify kStreamBase is the correct symbol from util
108+
{
109+
const {
110+
kStreamBase: kStreamBase2,
111+
} = require('internal/webstreams/util');
112+
113+
// Should be the same symbol
114+
assert.strictEqual(kStreamBase, kStreamBase2);
115+
}

0 commit comments

Comments
 (0)