stream: implement `min` option for `ReadableStreamBYOBReader.read` · nodejs/node@4a3ecbf
@@ -23,6 +23,7 @@ const {
2323 SymbolAsyncIterator,
2424 SymbolDispose,
2525 SymbolToStringTag,
26+ TypedArrayPrototypeGetLength,
2627 Uint8Array,
2728} = primordials;
2829@@ -34,6 +35,7 @@ const {
3435ERR_INVALID_ARG_TYPE,
3536ERR_INVALID_STATE,
3637ERR_INVALID_THIS,
38+ERR_OUT_OF_RANGE,
3739},
3840} = require('internal/errors');
3941@@ -59,8 +61,8 @@ const {
5961 validateAbortSignal,
6062 validateBuffer,
6163 validateObject,
62-kValidateObjectAllowNullable,
63-kValidateObjectAllowFunction,
64+kValidateObjectAllowObjects,
65+kValidateObjectAllowObjectsAndNull,
6466} = require('internal/validators');
65676668const {
@@ -247,9 +249,9 @@ class ReadableStream {
247249 * @param {UnderlyingSource} [source]
248250 * @param {QueuingStrategy} [strategy]
249251 */
250-constructor(source = {}, strategy = kEmptyObject) {
251-if (source === null)
252- throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
252+constructor(source = kEmptyObject, strategy = kEmptyObject) {
253+validateObject(source, 'source', kValidateObjectAllowObjects);
254+validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
253255this[kState] = createReadableStreamState();
254256255257this[kIsClosedPromise] = createDeferredPromise();
@@ -335,7 +337,7 @@ class ReadableStream {
335337getReader(options = kEmptyObject) {
336338if (!isReadableStream(this))
337339throw new ERR_INVALID_THIS('ReadableStream');
338-validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
340+validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
339341const mode = options?.mode;
340342341343if (mode === undefined)
@@ -373,6 +375,7 @@ class ReadableStream {
373375374376// The web platform tests require that these be handled one at a
375377// time and in a specific order. options can be null or undefined.
378+validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
376379const preventAbort = options?.preventAbort;
377380const preventCancel = options?.preventCancel;
378381const preventClose = options?.preventClose;
@@ -415,6 +418,7 @@ class ReadableStream {
415418destination);
416419}
417420421+validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
418422const preventAbort = options?.preventAbort;
419423const preventCancel = options?.preventCancel;
420424const preventClose = options?.preventClose;
@@ -459,10 +463,8 @@ class ReadableStream {
459463values(options = kEmptyObject) {
460464if (!isReadableStream(this))
461465throw new ERR_INVALID_THIS('ReadableStream');
462-validateObject(options, 'options');
463-const {
464- preventCancel = false,
465-} = options;
466+validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
467+const preventCancel = !!(options?.preventCancel);
466468467469// eslint-disable-next-line no-use-before-define
468470const reader = new ReadableStreamDefaultReader(this);
@@ -926,47 +928,62 @@ class ReadableStreamBYOBReader {
926928927929/**
928930 * @param {ArrayBufferView} view
931+ * @param {{
932+ * min? : number
933+ * }} [options]
929934 * @returns {Promise<{
930- * view : ArrayBufferView,
935+ * value : ArrayBufferView,
931936 * done : boolean,
932937 * }>}
933938 */
934-read(view) {
939+async read(view, options = kEmptyObject) {
935940if (!isReadableStreamBYOBReader(this))
936-return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
941+throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');
937942if (!isArrayBufferView(view)) {
938-return PromiseReject(
939-new ERR_INVALID_ARG_TYPE(
940- 'view',
941-[
942- 'Buffer',
943- 'TypedArray',
944- 'DataView',
945- ],
946- view));
943+throw new ERR_INVALID_ARG_TYPE(
944+'view',
945+[
946+'Buffer',
947+'TypedArray',
948+'DataView',
949+],
950+view,
951+);
947952}
953+validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
948954949955const viewByteLength = ArrayBufferViewGetByteLength(view);
950956const viewBuffer = ArrayBufferViewGetBuffer(view);
951957const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);
952958953959if (viewByteLength === 0 || viewBufferByteLength === 0) {
954-return PromiseReject(
955-new ERR_INVALID_STATE.TypeError(
956-'View or Viewed ArrayBuffer is zero-length or detached',
957-),
958-);
960+throw new ERR_INVALID_STATE.TypeError(
961+'View or Viewed ArrayBuffer is zero-length or detached');
959962}
960963961964// Supposed to assert here that the view's buffer is not
962965// detached, but there's no API available to use to check that.
966+967+const min = options?.min ?? 1;
968+if (typeof min !== 'number')
969+throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);
970+if (!NumberIsInteger(min))
971+throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');
972+if (min <= 0)
973+throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');
974+if (!isDataView(view)) {
975+if (min > TypedArrayPrototypeGetLength(view)) {
976+throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);
977+}
978+} else if (min > viewByteLength) {
979+throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);
980+}
981+963982if (this[kState].stream === undefined) {
964-return PromiseReject(
965-new ERR_INVALID_STATE.TypeError(
966-'The reader is not attached to a stream'));
983+throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');
967984}
968985const readIntoRequest = new ReadIntoRequest();
969-readableStreamBYOBReaderRead(this, view, readIntoRequest);
986+readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
970987return readIntoRequest.promise;
971988}
972989@@ -1880,7 +1897,7 @@ function readableByteStreamTee(stream) {
18801897reading = false;
18811898},
18821899};
1883-readableStreamBYOBReaderRead(reader, view, readIntoRequest);
1900+readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
18841901}
1885190218861903function pull1Algorithm() {
@@ -2207,7 +2224,7 @@ function readableStreamReaderGenericRelease(reader) {
22072224reader[kState].stream = undefined;
22082225}
220922262210-function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
2227+function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
22112228const {
22122229 stream,
22132230} = reader[kState];
@@ -2220,6 +2237,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
22202237readableByteStreamControllerPullInto(
22212238stream[kState].controller,
22222239view,
2240+min,
22232241readIntoRequest);
22242242}
22252243@@ -2492,7 +2510,7 @@ function readableByteStreamControllerClose(controller) {
2492251024932511if (pendingPullIntos.length) {
24942512const firstPendingPullInto = pendingPullIntos[0];
2495-if (firstPendingPullInto.bytesFilled > 0) {
2513+if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
24962514const error = new ERR_INVALID_STATE.TypeError('Partial read');
24972515readableByteStreamControllerError(controller, error);
24982516throw error;
@@ -2509,7 +2527,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
2509252725102528let done = false;
25112529if (stream[kState].state === 'closed') {
2512-desc.bytesFilled = 0;
2530+assert(desc.bytesFilled % desc.elementSize === 0);
25132531done = true;
25142532}
25152533@@ -2598,6 +2616,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
25982616function readableByteStreamControllerPullInto(
25992617controller,
26002618view,
2619+min,
26012620readIntoRequest) {
26022621const {
26032622 closeRequested,
@@ -2610,6 +2629,11 @@ function readableByteStreamControllerPullInto(
26102629elementSize = view.constructor.BYTES_PER_ELEMENT;
26112630ctor = view.constructor;
26122631}
2632+2633+const minimumFill = min * elementSize;
2634+assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
2635+assert(minimumFill % elementSize === 0);
2636+26132637const buffer = ArrayBufferViewGetBuffer(view);
26142638const byteOffset = ArrayBufferViewGetByteOffset(view);
26152639const byteLength = ArrayBufferViewGetByteLength(view);
@@ -2628,6 +2652,7 @@ function readableByteStreamControllerPullInto(
26282652 byteOffset,
26292653 byteLength,
26302654bytesFilled: 0,
2655+ minimumFill,
26312656 elementSize,
26322657 ctor,
26332658type: 'byob',
@@ -2715,7 +2740,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
27152740}
2716274127172742function readableByteStreamControllerRespondInClosedState(controller, desc) {
2718-assert(!desc.bytesFilled);
2743+assert(desc.bytesFilled % desc.elementSize === 0);
27192744if (desc.type === 'none') {
27202745readableByteStreamControllerShiftPendingPullInto(controller);
27212746}
@@ -2892,17 +2917,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
28922917 byteLength,
28932918 byteOffset,
28942919 bytesFilled,
2920+ minimumFill,
28952921 elementSize,
28962922} = desc;
2897-const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
28982923const maxBytesToCopy = MathMin(
28992924controller[kState].queueTotalSize,
29002925byteLength - bytesFilled);
29012926const maxBytesFilled = bytesFilled + maxBytesToCopy;
29022927const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
29032928let totalBytesToCopyRemaining = maxBytesToCopy;
29042929let ready = false;
2905-if (maxAlignedBytes > currentAlignedBytes) {
2930+assert(bytesFilled < minimumFill);
2931+if (maxAlignedBytes >= minimumFill) {
29062932totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
29072933ready = true;
29082934}
@@ -2945,7 +2971,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
29452971if (!ready) {
29462972assert(!controller[kState].queueTotalSize);
29472973assert(desc.bytesFilled > 0);
2948-assert(desc.bytesFilled < elementSize);
2974+assert(desc.bytesFilled < minimumFill);
29492975}
29502976return ready;
29512977}
@@ -3001,7 +3027,7 @@ function readableByteStreamControllerRespondInReadableState(
30013027return;
30023028}
300330293004-if (desc.bytesFilled < desc.elementSize)
3030+if (desc.bytesFilled < desc.minimumFill)
30053031return;
3006303230073033readableByteStreamControllerShiftPendingPullInto(controller);
@@ -3186,6 +3212,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
31863212byteOffset: 0,
31873213byteLength: autoAllocateChunkSize,
31883214bytesFilled: 0,
3215+minimumFill: 1,
31893216elementSize: 1,
31903217ctor: Uint8Array,
31913218type: 'default',