◐ Shell
clean mode source ↗

worker: enable stdio · nodejs/node@6b1a887

@@ -5,6 +5,7 @@ const EventEmitter = require('events');

55

const assert = require('assert');

66

const path = require('path');

77

const util = require('util');

8+

const { Readable, Writable } = require('stream');

89

const {

910

ERR_INVALID_ARG_TYPE,

1011

ERR_WORKER_NEED_ABSOLUTE_PATH,

@@ -29,13 +30,20 @@ const isMainThread = threadId === 0;

29303031

const kOnMessageListener = Symbol('kOnMessageListener');

3132

const kHandle = Symbol('kHandle');

33+

const kName = Symbol('kName');

3234

const kPort = Symbol('kPort');

3335

const kPublicPort = Symbol('kPublicPort');

3436

const kDispose = Symbol('kDispose');

3537

const kOnExit = Symbol('kOnExit');

3638

const kOnMessage = Symbol('kOnMessage');

3739

const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');

3840

const kOnErrorMessage = Symbol('kOnErrorMessage');

41+

const kParentSideStdio = Symbol('kParentSideStdio');

42+

const kWritableCallbacks = Symbol('kWritableCallbacks');

43+

const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');

44+

const kStartedReading = Symbol('kStartedReading');

45+

const kWaitingStreams = Symbol('kWaitingStreams');

46+

const kIncrementsPortRef = Symbol('kIncrementsPortRef');

39474048

const debug = util.debuglog('worker');

4149

@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {

129137

}

130138131139140+

class ReadableWorkerStdio extends Readable {

141+

constructor(port, name) {

142+

super();

143+

this[kPort] = port;

144+

this[kName] = name;

145+

this[kIncrementsPortRef] = true;

146+

this[kStartedReading] = false;

147+

this.on('end', () => {

148+

if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)

149+

this[kPort].unref();

150+

});

151+

}

152+153+

_read() {

154+

if (!this[kStartedReading] && this[kIncrementsPortRef]) {

155+

this[kStartedReading] = true;

156+

if (this[kPort][kWaitingStreams]++ === 0)

157+

this[kPort].ref();

158+

}

159+160+

this[kPort].postMessage({

161+

type: 'stdioWantsMoreData',

162+

stream: this[kName]

163+

});

164+

}

165+

}

166+167+

class WritableWorkerStdio extends Writable {

168+

constructor(port, name) {

169+

super({ decodeStrings: false });

170+

this[kPort] = port;

171+

this[kName] = name;

172+

this[kWritableCallbacks] = [];

173+

}

174+175+

_write(chunk, encoding, cb) {

176+

this[kPort].postMessage({

177+

type: 'stdioPayload',

178+

stream: this[kName],

179+

chunk,

180+

encoding

181+

});

182+

this[kWritableCallbacks].push(cb);

183+

if (this[kPort][kWaitingStreams]++ === 0)

184+

this[kPort].ref();

185+

}

186+187+

_final(cb) {

188+

this[kPort].postMessage({

189+

type: 'stdioPayload',

190+

stream: this[kName],

191+

chunk: null

192+

});

193+

cb();

194+

}

195+196+

[kStdioWantsMoreDataCallback]() {

197+

const cbs = this[kWritableCallbacks];

198+

this[kWritableCallbacks] = [];

199+

for (const cb of cbs)

200+

cb();

201+

if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)

202+

this[kPort].unref();

203+

}

204+

}

205+132206

class Worker extends EventEmitter {

133207

constructor(filename, options = {}) {

134208

super();

@@ -154,8 +228,25 @@ class Worker extends EventEmitter {

154228

this[kPort].on('message', (data) => this[kOnMessage](data));

155229

this[kPort].start();

156230

this[kPort].unref();

231+

this[kPort][kWaitingStreams] = 0;

157232

debug(`[${threadId}] created Worker with ID ${this.threadId}`);

158233234+

let stdin = null;

235+

if (options.stdin)

236+

stdin = new WritableWorkerStdio(this[kPort], 'stdin');

237+

const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');

238+

if (!options.stdout) {

239+

stdout[kIncrementsPortRef] = false;

240+

pipeWithoutWarning(stdout, process.stdout);

241+

}

242+

const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');

243+

if (!options.stderr) {

244+

stderr[kIncrementsPortRef] = false;

245+

pipeWithoutWarning(stderr, process.stderr);

246+

}

247+248+

this[kParentSideStdio] = { stdin, stdout, stderr };

249+159250

const { port1, port2 } = new MessageChannel();

160251

this[kPublicPort] = port1;

161252

this[kPublicPort].on('message', (message) => this.emit('message', message));

@@ -165,7 +256,8 @@ class Worker extends EventEmitter {

165256

filename,

166257

doEval: !!options.eval,

167258

workerData: options.workerData,

168-

publicPort: port2

259+

publicPort: port2,

260+

hasStdin: !!options.stdin

169261

}, [port2]);

170262

// Actually start the new thread now that everything is in place.

171263

this[kHandle].startThread();

@@ -197,6 +289,16 @@ class Worker extends EventEmitter {

197289

return this[kOnCouldNotSerializeErr]();

198290

case 'errorMessage':

199291

return this[kOnErrorMessage](message.error);

292+

case 'stdioPayload':

293+

{

294+

const { stream, chunk, encoding } = message;

295+

return this[kParentSideStdio][stream].push(chunk, encoding);

296+

}

297+

case 'stdioWantsMoreData':

298+

{

299+

const { stream } = message;

300+

return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();

301+

}

200302

}

201303202304

assert.fail(`Unknown worker message type ${message.type}`);

@@ -207,6 +309,18 @@ class Worker extends EventEmitter {

207309

this[kHandle] = null;

208310

this[kPort] = null;

209311

this[kPublicPort] = null;

312+313+

const { stdout, stderr } = this[kParentSideStdio];

314+

this[kParentSideStdio] = null;

315+316+

if (!stdout._readableState.ended) {

317+

debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);

318+

stdout.push(null);

319+

}

320+

if (!stderr._readableState.ended) {

321+

debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);

322+

stderr.push(null);

323+

}

210324

}

211325212326

postMessage(...args) {

@@ -243,6 +357,27 @@ class Worker extends EventEmitter {

243357244358

return this[kHandle].threadId;

245359

}

360+361+

get stdin() {

362+

return this[kParentSideStdio].stdin;

363+

}

364+365+

get stdout() {

366+

return this[kParentSideStdio].stdout;

367+

}

368+369+

get stderr() {

370+

return this[kParentSideStdio].stderr;

371+

}

372+

}

373+374+

const workerStdio = {};

375+

if (!isMainThread) {

376+

const port = getEnvMessagePort();

377+

port[kWaitingStreams] = 0;

378+

workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');

379+

workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');

380+

workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');

246381

}

247382248383

let originalFatalException;

@@ -256,10 +391,14 @@ function setupChild(evalScript) {

256391257392

port.on('message', (message) => {

258393

if (message.type === 'loadScript') {

259-

const { filename, doEval, workerData, publicPort } = message;

394+

const { filename, doEval, workerData, publicPort, hasStdin } = message;

260395

publicWorker.parentPort = publicPort;

261396

setupPortReferencing(publicPort, publicPort, 'message');

262397

publicWorker.workerData = workerData;

398+399+

if (!hasStdin)

400+

workerStdio.stdin.push(null);

401+263402

debug(`[${threadId}] starts worker script ${filename} ` +

264403

`(eval = ${eval}) at cwd = ${process.cwd()}`);

265404

port.unref();

@@ -271,6 +410,14 @@ function setupChild(evalScript) {

271410

require('module').runMain();

272411

}

273412

return;

413+

} else if (message.type === 'stdioPayload') {

414+

const { stream, chunk, encoding } = message;

415+

workerStdio[stream].push(chunk, encoding);

416+

return;

417+

} else if (message.type === 'stdioWantsMoreData') {

418+

const { stream } = message;

419+

workerStdio[stream][kStdioWantsMoreDataCallback]();

420+

return;

274421

}

275422276423

assert.fail(`Unknown worker message type ${message.type}`);

@@ -317,11 +464,24 @@ function deserializeError(error) {

317464

error.byteLength).toString('utf8');

318465

}

319466467+

function pipeWithoutWarning(source, dest) {

468+

const sourceMaxListeners = source._maxListeners;

469+

const destMaxListeners = dest._maxListeners;

470+

source.setMaxListeners(Infinity);

471+

dest.setMaxListeners(Infinity);

472+473+

source.pipe(dest);

474+475+

source._maxListeners = sourceMaxListeners;

476+

dest._maxListeners = destMaxListeners;

477+

}

478+320479

module.exports = {

321480

MessagePort,

322481

MessageChannel,

323482

threadId,

324483

Worker,

325484

setupChild,

326-

isMainThread

485+

isMainThread,

486+

workerStdio

327487

};