◐ Shell
clean mode source ↗

stream: fix nested compose error propagation · nodejs/node@3527646

Original file line numberDiff line numberDiff line change

@@ -239,7 +239,17 @@ function fromAsyncGen(fn) {

239239

_resolve({ __proto__: null, done: true, cb });

240240

},

241241

destroy(err, cb) {

242-

ac.abort();

242+

ac.abort(err);

243+
244+

// If the source async iterator is waiting for the next write/final

245+

// signal, unblock it so the readable side can observe the abort and

246+

// finish destroying.

247+

if (resolve !== null) {

248+

const _resolve = resolve;

249+

resolve = null;

250+

_resolve({ __proto__: null, done: true, cb() {} });

251+

}

252+
243253

cb(err);

244254

},

245255

};

Original file line numberDiff line numberDiff line change

@@ -8,9 +8,12 @@ const {

88

const { Buffer } = require('buffer');

99
1010

const {

11-

ERR_INVALID_ARG_TYPE,

12-

ERR_STREAM_NULL_VALUES,

13-

} = require('internal/errors').codes;

11+

aggregateTwoErrors,

12+

codes: {

13+

ERR_INVALID_ARG_TYPE,

14+

ERR_STREAM_NULL_VALUES,

15+

},

16+

} = require('internal/errors');

1417
1518

function from(Readable, iterable, opts) {

1619

let iterator;

@@ -43,6 +46,7 @@ function from(Readable, iterable, opts) {

4346

// TODO(ronag): What options should be allowed?

4447

...opts,

4548

});

49+

const originalDestroy = readable._destroy;

4650
4751

// Flag to protect against _read

4852

// being called before last iteration completion.

@@ -64,11 +68,18 @@ function from(Readable, iterable, opts) {

6468

};

6569
6670

readable._destroy = function(error, cb) {

67-

PromisePrototypeThen(

68-

close(error),

69-

() => process.nextTick(cb, error), // nextTick is here in case cb throws

70-

(e) => process.nextTick(cb, e || error),

71-

);

71+

originalDestroy.call(this, error, (destroyError) => {

72+

const combinedError = destroyError || error;

73+

PromisePrototypeThen(

74+

close(combinedError),

75+

// nextTick is here in case cb throws

76+

() => process.nextTick(cb, combinedError),

77+

(closeError) => process.nextTick(

78+

cb,

79+

aggregateTwoErrors(combinedError, closeError),

80+

),

81+

);

82+

});

7283

};

7384
7485

async function close(error) {

Original file line numberDiff line numberDiff line change

@@ -116,6 +116,24 @@ const assert = require('assert');

116116

).then(common.mustCall());

117117

}

118118
119+

{

120+

// Errors from nested `.compose()` calls should propagate instead of hanging.

121+

const stream = Readable.from(['hello'])

122+

.compose(async function *(source) { // eslint-disable-line require-yield

123+

for await (const chunk of source) {

124+

throw new Error(`boom: ${chunk}`);

125+

}

126+

})

127+

.compose(async function *(source) {

128+

yield* source;

129+

});

130+
131+

assert.rejects(

132+

stream.toArray(),

133+

/boom: hello/,

134+

).then(common.mustCall());

135+

}

136+
119137

{

120138

// AbortSignal

121139

const ac = new AbortController();