worker: enable stdio · nodejs/node@6b1a887
@@ -5,6 +5,7 @@ const EventEmitter = require('events');
55const assert = require('assert');
66const path = require('path');
77const util = require('util');
8+const { Readable, Writable } = require('stream');
89const {
910ERR_INVALID_ARG_TYPE,
1011ERR_WORKER_NEED_ABSOLUTE_PATH,
@@ -29,13 +30,20 @@ const isMainThread = threadId === 0;
29303031const kOnMessageListener = Symbol('kOnMessageListener');
3132const kHandle = Symbol('kHandle');
33+const kName = Symbol('kName');
3234const kPort = Symbol('kPort');
3335const kPublicPort = Symbol('kPublicPort');
3436const kDispose = Symbol('kDispose');
3537const kOnExit = Symbol('kOnExit');
3638const kOnMessage = Symbol('kOnMessage');
3739const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
3840const kOnErrorMessage = Symbol('kOnErrorMessage');
41+const kParentSideStdio = Symbol('kParentSideStdio');
42+const kWritableCallbacks = Symbol('kWritableCallbacks');
43+const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
44+const kStartedReading = Symbol('kStartedReading');
45+const kWaitingStreams = Symbol('kWaitingStreams');
46+const kIncrementsPortRef = Symbol('kIncrementsPortRef');
39474048const debug = util.debuglog('worker');
4149@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
129137}
130138131139140+class ReadableWorkerStdio extends Readable {
141+constructor(port, name) {
142+super();
143+this[kPort] = port;
144+this[kName] = name;
145+this[kIncrementsPortRef] = true;
146+this[kStartedReading] = false;
147+this.on('end', () => {
148+if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
149+this[kPort].unref();
150+});
151+}
152+153+_read() {
154+if (!this[kStartedReading] && this[kIncrementsPortRef]) {
155+this[kStartedReading] = true;
156+if (this[kPort][kWaitingStreams]++ === 0)
157+this[kPort].ref();
158+}
159+160+this[kPort].postMessage({
161+type: 'stdioWantsMoreData',
162+stream: this[kName]
163+});
164+}
165+}
166+167+class WritableWorkerStdio extends Writable {
168+constructor(port, name) {
169+super({ decodeStrings: false });
170+this[kPort] = port;
171+this[kName] = name;
172+this[kWritableCallbacks] = [];
173+}
174+175+_write(chunk, encoding, cb) {
176+this[kPort].postMessage({
177+type: 'stdioPayload',
178+stream: this[kName],
179+ chunk,
180+ encoding
181+});
182+this[kWritableCallbacks].push(cb);
183+if (this[kPort][kWaitingStreams]++ === 0)
184+this[kPort].ref();
185+}
186+187+_final(cb) {
188+this[kPort].postMessage({
189+type: 'stdioPayload',
190+stream: this[kName],
191+chunk: null
192+});
193+cb();
194+}
195+196+[kStdioWantsMoreDataCallback]() {
197+const cbs = this[kWritableCallbacks];
198+this[kWritableCallbacks] = [];
199+for (const cb of cbs)
200+cb();
201+if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
202+this[kPort].unref();
203+}
204+}
205+132206class Worker extends EventEmitter {
133207constructor(filename, options = {}) {
134208super();
@@ -154,8 +228,25 @@ class Worker extends EventEmitter {
154228this[kPort].on('message', (data) => this[kOnMessage](data));
155229this[kPort].start();
156230this[kPort].unref();
231+this[kPort][kWaitingStreams] = 0;
157232debug(`[${threadId}] created Worker with ID ${this.threadId}`);
158233234+let stdin = null;
235+if (options.stdin)
236+stdin = new WritableWorkerStdio(this[kPort], 'stdin');
237+const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
238+if (!options.stdout) {
239+stdout[kIncrementsPortRef] = false;
240+pipeWithoutWarning(stdout, process.stdout);
241+}
242+const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
243+if (!options.stderr) {
244+stderr[kIncrementsPortRef] = false;
245+pipeWithoutWarning(stderr, process.stderr);
246+}
247+248+this[kParentSideStdio] = { stdin, stdout, stderr };
249+159250const { port1, port2 } = new MessageChannel();
160251this[kPublicPort] = port1;
161252this[kPublicPort].on('message', (message) => this.emit('message', message));
@@ -165,7 +256,8 @@ class Worker extends EventEmitter {
165256 filename,
166257doEval: !!options.eval,
167258workerData: options.workerData,
168-publicPort: port2
259+publicPort: port2,
260+hasStdin: !!options.stdin
169261}, [port2]);
170262// Actually start the new thread now that everything is in place.
171263this[kHandle].startThread();
@@ -197,6 +289,16 @@ class Worker extends EventEmitter {
197289return this[kOnCouldNotSerializeErr]();
198290case 'errorMessage':
199291return this[kOnErrorMessage](message.error);
292+case 'stdioPayload':
293+{
294+const { stream, chunk, encoding } = message;
295+return this[kParentSideStdio][stream].push(chunk, encoding);
296+}
297+case 'stdioWantsMoreData':
298+{
299+const { stream } = message;
300+return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
301+}
200302}
201303202304assert.fail(`Unknown worker message type ${message.type}`);
@@ -207,6 +309,18 @@ class Worker extends EventEmitter {
207309this[kHandle] = null;
208310this[kPort] = null;
209311this[kPublicPort] = null;
312+313+const { stdout, stderr } = this[kParentSideStdio];
314+this[kParentSideStdio] = null;
315+316+if (!stdout._readableState.ended) {
317+debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
318+stdout.push(null);
319+}
320+if (!stderr._readableState.ended) {
321+debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
322+stderr.push(null);
323+}
210324}
211325212326postMessage(...args) {
@@ -243,6 +357,27 @@ class Worker extends EventEmitter {
243357244358return this[kHandle].threadId;
245359}
360+361+get stdin() {
362+return this[kParentSideStdio].stdin;
363+}
364+365+get stdout() {
366+return this[kParentSideStdio].stdout;
367+}
368+369+get stderr() {
370+return this[kParentSideStdio].stderr;
371+}
372+}
373+374+const workerStdio = {};
375+if (!isMainThread) {
376+const port = getEnvMessagePort();
377+port[kWaitingStreams] = 0;
378+workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
379+workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
380+workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
246381}
247382248383let originalFatalException;
@@ -256,10 +391,14 @@ function setupChild(evalScript) {
256391257392port.on('message', (message) => {
258393if (message.type === 'loadScript') {
259-const { filename, doEval, workerData, publicPort } = message;
394+const { filename, doEval, workerData, publicPort, hasStdin } = message;
260395publicWorker.parentPort = publicPort;
261396setupPortReferencing(publicPort, publicPort, 'message');
262397publicWorker.workerData = workerData;
398+399+if (!hasStdin)
400+workerStdio.stdin.push(null);
401+263402debug(`[${threadId}] starts worker script ${filename} ` +
264403`(eval = ${eval}) at cwd = ${process.cwd()}`);
265404port.unref();
@@ -271,6 +410,14 @@ function setupChild(evalScript) {
271410require('module').runMain();
272411}
273412return;
413+} else if (message.type === 'stdioPayload') {
414+const { stream, chunk, encoding } = message;
415+workerStdio[stream].push(chunk, encoding);
416+return;
417+} else if (message.type === 'stdioWantsMoreData') {
418+const { stream } = message;
419+workerStdio[stream][kStdioWantsMoreDataCallback]();
420+return;
274421}
275422276423assert.fail(`Unknown worker message type ${message.type}`);
@@ -317,11 +464,24 @@ function deserializeError(error) {
317464error.byteLength).toString('utf8');
318465}
319466467+function pipeWithoutWarning(source, dest) {
468+const sourceMaxListeners = source._maxListeners;
469+const destMaxListeners = dest._maxListeners;
470+source.setMaxListeners(Infinity);
471+dest.setMaxListeners(Infinity);
472+473+source.pipe(dest);
474+475+source._maxListeners = sourceMaxListeners;
476+dest._maxListeners = destMaxListeners;
477+}
478+320479module.exports = {
321480 MessagePort,
322481 MessageChannel,
323482 threadId,
324483 Worker,
325484 setupChild,
326- isMainThread
485+ isMainThread,
486+ workerStdio
327487};