stream: commit pull-into descriptors after filling from queue · nodejs/node@d352b04
@@ -94,6 +94,7 @@ const {
9494 ArrayBufferViewGetByteLength,
9595 ArrayBufferViewGetByteOffset,
9696 AsyncIterator,
97+ canCopyArrayBuffer,
9798 cloneAsUint8Array,
9899 copyArrayBuffer,
99100 createPromiseCallback,
@@ -2552,6 +2553,15 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
25522553}
25532554}
255425552556+function readableByteStreamControllerCommitPullIntoDescriptors(stream, descriptors) {
2557+for (let i = 0; i < descriptors.length; ++i) {
2558+readableByteStreamControllerCommitPullIntoDescriptor(
2559+stream,
2560+descriptors[i],
2561+);
2562+}
2563+}
2564+25552565function readableByteStreamControllerInvalidateBYOBRequest(controller) {
25562566if (controller[kState].byobRequest === null)
25572567return;
@@ -2758,11 +2768,11 @@ function readableByteStreamControllerRespondInClosedState(controller, desc) {
27582768 stream,
27592769} = controller[kState];
27602770if (readableStreamHasBYOBReader(stream)) {
2761-while (readableStreamGetNumReadIntoRequests(stream) > 0) {
2762-readableByteStreamControllerCommitPullIntoDescriptor(
2763-stream,
2764-readableByteStreamControllerShiftPendingPullInto(controller));
2771+const filledPullIntos = [];
2772+for (let i = 0; i < readableStreamGetNumReadIntoRequests(stream); ++i) {
2773+ArrayPrototypePush(filledPullIntos, readableByteStreamControllerShiftPendingPullInto(controller));
27652774}
2775+readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
27662776}
27672777}
27682778@@ -2843,8 +2853,9 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
28432853transferredBuffer,
28442854byteOffset,
28452855byteLength);
2846-readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
2856+const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
28472857controller);
2858+readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
28482859} else {
28492860assert(!isReadableStreamLocked(stream));
28502861readableByteStreamControllerEnqueueChunkToQueue(
@@ -2937,6 +2948,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29372948const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29382949let totalBytesToCopyRemaining = maxBytesToCopy;
29392950let ready = false;
2951+assert(!ArrayBufferPrototypeGetDetached(buffer));
29402952assert(bytesFilled < minimumFill);
29412953if (maxAlignedBytes >= minimumFill) {
29422954totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
@@ -2952,12 +2964,12 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29522964totalBytesToCopyRemaining,
29532965headOfQueue.byteLength);
29542966const destStart = byteOffset + desc.bytesFilled;
2955-const arrayBufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);
2956-if (arrayBufferByteLength - destStart < bytesToCopy) {
2957-throw new ERR_INVALID_STATE.RangeError(
2958- 'view ArrayBuffer size is invalid');
2959-}
2960-assert(arrayBufferByteLength - destStart >= bytesToCopy);
2967+assert(canCopyArrayBuffer(
2968+ buffer,
2969+destStart,
2970+headOfQueue.buffer,
2971+ headOfQueue.byteOffset,
2972+ bytesToCopy));
29612973copyArrayBuffer(
29622974buffer,
29632975destStart,
@@ -2991,26 +3003,30 @@ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
29913003const {
29923004 closeRequested,
29933005 pendingPullIntos,
2994- stream,
29953006} = controller[kState];
29963007assert(!closeRequested);
3008+const filledPullIntos = [];
29973009while (pendingPullIntos.length) {
29983010if (!controller[kState].queueTotalSize)
2999-return;
3011+break;
30003012const desc = pendingPullIntos[0];
30013013if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(
30023014controller,
30033015desc)) {
30043016readableByteStreamControllerShiftPendingPullInto(controller);
3005-readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
3017+ArrayPrototypePush(filledPullIntos, desc);
30063018}
30073019}
3020+return filledPullIntos;
30083021}
3009302230103023function readableByteStreamControllerRespondInReadableState(
30113024controller,
30123025bytesWritten,
30133026desc) {
3027+const {
3028+ stream,
3029+} = controller[kState];
30143030const {
30153031 buffer,
30163032 bytesFilled,
@@ -3031,9 +3047,10 @@ function readableByteStreamControllerRespondInReadableState(
30313047controller,
30323048desc,
30333049);
3034-readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
3050+const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
30353051controller,
30363052);
3053+readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
30373054return;
30383055}
30393056@@ -3059,10 +3076,10 @@ function readableByteStreamControllerRespondInReadableState(
30593076ArrayBufferPrototypeGetByteLength(remainder));
30603077}
30613078desc.bytesFilled -= remainderSize;
3062-readableByteStreamControllerCommitPullIntoDescriptor(
3063- controller[kState].stream,
3064- desc);
3065-readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
3079+const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
3080+3081+readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
3082+readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
30663083}
3067308430683085function readableByteStreamControllerRespondWithNewView(controller, view) {