quic: fix a handful of bugs and missing functionality · nodejs/node@7f3a85e
@@ -443,63 +443,76 @@ function createBlobReaderStream(reader) {
443443// There really should only be one read at a time so using an
444444// array here is purely defensive.
445445this.pendingPulls = [];
446+// Register a wakeup callback that the C++ side can invoke
447+// when new data is available after a STATUS_BLOCK.
448+reader.setWakeup(() => {
449+if (this.pendingPulls.length > 0) {
450+this.readNext(c);
451+}
452+});
446453},
447454pull(c) {
448455const { promise, resolve, reject } = PromiseWithResolvers();
449456this.pendingPulls.push({ resolve, reject });
450-const readNext = () => {
451-reader.pull((status, buffer) => {
452-// If pendingPulls is empty here, the stream had to have
453-// been canceled, and we don't really care about the result.
454-// We can simply exit.
455-if (this.pendingPulls.length === 0) {
456-return;
457-}
458-if (status === 0) {
459-// EOS
460-c.close();
461-// This is to signal the end for byob readers
462-// see https://streams.spec.whatwg.org/#example-rbs-pull
463-c.byobRequest?.respond(0);
464-const pending = this.pendingPulls.shift();
465-pending.resolve();
466-return;
467-} else if (status < 0) {
468-// The read could fail for many different reasons when reading
469-// from a non-memory resident blob part (e.g. file-backed blob).
470-// The error details the system error code.
471-const error = lazyDOMException('The blob could not be read', 'NotReadableError');
472-const pending = this.pendingPulls.shift();
473-c.error(error);
474-pending.reject(error);
457+this.readNext(c);
458+return promise;
459+},
460+readNext(c) {
461+reader.pull((status, buffer) => {
462+// If pendingPulls is empty here, the stream had to have
463+// been canceled, and we don't really care about the result.
464+// We can simply exit.
465+if (this.pendingPulls.length === 0) {
466+return;
467+}
468+if (status === 0) {
469+// EOS
470+c.close();
471+// This is to signal the end for byob readers
472+// see https://streams.spec.whatwg.org/#example-rbs-pull
473+c.byobRequest?.respond(0);
474+const pending = this.pendingPulls.shift();
475+pending.resolve();
476+return;
477+} else if (status < 0) {
478+// The read could fail for many different reasons when reading
479+// from a non-memory resident blob part (e.g. file-backed blob).
480+// The error details the system error code.
481+const error =
482+lazyDOMException('The blob could not be read',
483+'NotReadableError');
484+const pending = this.pendingPulls.shift();
485+c.error(error);
486+pending.reject(error);
487+return;
488+} else if (status === 2) {
489+// STATUS_BLOCK: No data available yet. The wakeup callback
490+// registered in start() will re-invoke readNext when data
491+// arrives.
492+return;
493+}
494+// ReadableByteStreamController.enqueue errors if we submit a
495+// 0-length buffer. We need to check for that here.
496+if (buffer !== undefined && buffer.byteLength !== 0) {
497+c.enqueue(new Uint8Array(buffer));
498+}
499+// We keep reading until we either reach EOS, some error, or
500+// we hit the flow rate of the stream (c.desiredSize).
501+// We use setImmediate here because we have to allow the event
502+// loop to turn in order to process any pending i/o. Using
503+// queueMicrotask won't allow the event loop to turn.
504+setImmediate(() => {
505+if (c.desiredSize < 0) {
506+// A manual backpressure check.
507+if (this.pendingPulls.length !== 0) {
508+const pending = this.pendingPulls.shift();
509+pending.resolve();
510+}
475511return;
476512}
477-// ReadableByteStreamController.enqueue errors if we submit a 0-length
478-// buffer. We need to check for that here.
479-if (buffer !== undefined && buffer.byteLength !== 0) {
480-c.enqueue(new Uint8Array(buffer));
481-}
482-// We keep reading until we either reach EOS, some error, or we
483-// hit the flow rate of the stream (c.desiredSize).
484-// We use set immediate here because we have to allow the event
485-// loop to turn in order to process any pending i/o. Using
486-// queueMicrotask won't allow the event loop to turn.
487-setImmediate(() => {
488-if (c.desiredSize < 0) {
489-// A manual backpressure check.
490-if (this.pendingPulls.length !== 0) {
491-// A case of waiting pull finished (= not yet canceled)
492-const pending = this.pendingPulls.shift();
493-pending.resolve();
494-}
495-return;
496-}
497-readNext();
498-});
513+this.readNext(c);
499514});
500-};
501-readNext();
502-return promise;
515+});
503516},
504517cancel(reason) {
505518// Reject any currently pending pulls here.