◐ Shell
clean mode source ↗

quic: fix a handful of bugs and missing functionality · nodejs/node@7f3a85e

@@ -443,63 +443,76 @@ function createBlobReaderStream(reader) {

443443

// There really should only be one read at a time so using an

444444

// array here is purely defensive.

445445

this.pendingPulls = [];

446+

// Register a wakeup callback that the C++ side can invoke

447+

// when new data is available after a STATUS_BLOCK.

448+

reader.setWakeup(() => {

449+

if (this.pendingPulls.length > 0) {

450+

this.readNext(c);

451+

}

452+

});

446453

},

447454

pull(c) {

448455

const { promise, resolve, reject } = PromiseWithResolvers();

449456

this.pendingPulls.push({ resolve, reject });

450-

const readNext = () => {

451-

reader.pull((status, buffer) => {

452-

// If pendingPulls is empty here, the stream had to have

453-

// been canceled, and we don't really care about the result.

454-

// We can simply exit.

455-

if (this.pendingPulls.length === 0) {

456-

return;

457-

}

458-

if (status === 0) {

459-

// EOS

460-

c.close();

461-

// This is to signal the end for byob readers

462-

// see https://streams.spec.whatwg.org/#example-rbs-pull

463-

c.byobRequest?.respond(0);

464-

const pending = this.pendingPulls.shift();

465-

pending.resolve();

466-

return;

467-

} else if (status < 0) {

468-

// The read could fail for many different reasons when reading

469-

// from a non-memory resident blob part (e.g. file-backed blob).

470-

// The error details the system error code.

471-

const error = lazyDOMException('The blob could not be read', 'NotReadableError');

472-

const pending = this.pendingPulls.shift();

473-

c.error(error);

474-

pending.reject(error);

457+

this.readNext(c);

458+

return promise;

459+

},

460+

readNext(c) {

461+

reader.pull((status, buffer) => {

462+

// If pendingPulls is empty here, the stream had to have

463+

// been canceled, and we don't really care about the result.

464+

// We can simply exit.

465+

if (this.pendingPulls.length === 0) {

466+

return;

467+

}

468+

if (status === 0) {

469+

// EOS

470+

c.close();

471+

// This is to signal the end for byob readers

472+

// see https://streams.spec.whatwg.org/#example-rbs-pull

473+

c.byobRequest?.respond(0);

474+

const pending = this.pendingPulls.shift();

475+

pending.resolve();

476+

return;

477+

} else if (status < 0) {

478+

// The read could fail for many different reasons when reading

479+

// from a non-memory resident blob part (e.g. file-backed blob).

480+

// The error details the system error code.

481+

const error =

482+

lazyDOMException('The blob could not be read',

483+

'NotReadableError');

484+

const pending = this.pendingPulls.shift();

485+

c.error(error);

486+

pending.reject(error);

487+

return;

488+

} else if (status === 2) {

489+

// STATUS_BLOCK: No data available yet. The wakeup callback

490+

// registered in start() will re-invoke readNext when data

491+

// arrives.

492+

return;

493+

}

494+

// ReadableByteStreamController.enqueue errors if we submit a

495+

// 0-length buffer. We need to check for that here.

496+

if (buffer !== undefined && buffer.byteLength !== 0) {

497+

c.enqueue(new Uint8Array(buffer));

498+

}

499+

// We keep reading until we either reach EOS, some error, or

500+

// we hit the flow rate of the stream (c.desiredSize).

501+

// We use setImmediate here because we have to allow the event

502+

// loop to turn in order to process any pending i/o. Using

503+

// queueMicrotask won't allow the event loop to turn.

504+

setImmediate(() => {

505+

if (c.desiredSize < 0) {

506+

// A manual backpressure check.

507+

if (this.pendingPulls.length !== 0) {

508+

const pending = this.pendingPulls.shift();

509+

pending.resolve();

510+

}

475511

return;

476512

}

477-

// ReadableByteStreamController.enqueue errors if we submit a 0-length

478-

// buffer. We need to check for that here.

479-

if (buffer !== undefined && buffer.byteLength !== 0) {

480-

c.enqueue(new Uint8Array(buffer));

481-

}

482-

// We keep reading until we either reach EOS, some error, or we

483-

// hit the flow rate of the stream (c.desiredSize).

484-

// We use set immediate here because we have to allow the event

485-

// loop to turn in order to process any pending i/o. Using

486-

// queueMicrotask won't allow the event loop to turn.

487-

setImmediate(() => {

488-

if (c.desiredSize < 0) {

489-

// A manual backpressure check.

490-

if (this.pendingPulls.length !== 0) {

491-

// A case of waiting pull finished (= not yet canceled)

492-

const pending = this.pendingPulls.shift();

493-

pending.resolve();

494-

}

495-

return;

496-

}

497-

readNext();

498-

});

513+

this.readNext(c);

499514

});

500-

};

501-

readNext();

502-

return promise;

515+

});

503516

},

504517

cancel(reason) {

505518

// Reject any currently pending pulls here.