◐ Shell
clean mode source ↗

stream: implement `min` option for `ReadableStreamBYOBReader.read` · nodejs/node@4a3ecbf

@@ -23,6 +23,7 @@ const {

2323

SymbolAsyncIterator,

2424

SymbolDispose,

2525

SymbolToStringTag,

26+

TypedArrayPrototypeGetLength,

2627

Uint8Array,

2728

} = primordials;

2829

@@ -34,6 +35,7 @@ const {

3435

ERR_INVALID_ARG_TYPE,

3536

ERR_INVALID_STATE,

3637

ERR_INVALID_THIS,

38+

ERR_OUT_OF_RANGE,

3739

},

3840

} = require('internal/errors');

3941

@@ -59,8 +61,8 @@ const {

5961

validateAbortSignal,

6062

validateBuffer,

6163

validateObject,

62-

kValidateObjectAllowNullable,

63-

kValidateObjectAllowFunction,

64+

kValidateObjectAllowObjects,

65+

kValidateObjectAllowObjectsAndNull,

6466

} = require('internal/validators');

65676668

const {

@@ -247,9 +249,9 @@ class ReadableStream {

247249

* @param {UnderlyingSource} [source]

248250

* @param {QueuingStrategy} [strategy]

249251

*/

250-

constructor(source = {}, strategy = kEmptyObject) {

251-

if (source === null)

252-

throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);

252+

constructor(source = kEmptyObject, strategy = kEmptyObject) {

253+

validateObject(source, 'source', kValidateObjectAllowObjects);

254+

validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);

253255

this[kState] = createReadableStreamState();

254256255257

this[kIsClosedPromise] = createDeferredPromise();

@@ -335,7 +337,7 @@ class ReadableStream {

335337

getReader(options = kEmptyObject) {

336338

if (!isReadableStream(this))

337339

throw new ERR_INVALID_THIS('ReadableStream');

338-

validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);

340+

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

339341

const mode = options?.mode;

340342341343

if (mode === undefined)

@@ -373,6 +375,7 @@ class ReadableStream {

373375374376

// The web platform tests require that these be handled one at a

375377

// time and in a specific order. options can be null or undefined.

378+

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

376379

const preventAbort = options?.preventAbort;

377380

const preventCancel = options?.preventCancel;

378381

const preventClose = options?.preventClose;

@@ -415,6 +418,7 @@ class ReadableStream {

415418

destination);

416419

}

417420421+

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

418422

const preventAbort = options?.preventAbort;

419423

const preventCancel = options?.preventCancel;

420424

const preventClose = options?.preventClose;

@@ -459,10 +463,8 @@ class ReadableStream {

459463

values(options = kEmptyObject) {

460464

if (!isReadableStream(this))

461465

throw new ERR_INVALID_THIS('ReadableStream');

462-

validateObject(options, 'options');

463-

const {

464-

preventCancel = false,

465-

} = options;

466+

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

467+

const preventCancel = !!(options?.preventCancel);

466468467469

// eslint-disable-next-line no-use-before-define

468470

const reader = new ReadableStreamDefaultReader(this);

@@ -926,47 +928,62 @@ class ReadableStreamBYOBReader {

926928927929

/**

928930

* @param {ArrayBufferView} view

931+

* @param {{

932+

* min? : number

933+

* }} [options]

929934

* @returns {Promise<{

930-

* view : ArrayBufferView,

935+

* value : ArrayBufferView,

931936

* done : boolean,

932937

* }>}

933938

*/

934-

read(view) {

939+

async read(view, options = kEmptyObject) {

935940

if (!isReadableStreamBYOBReader(this))

936-

return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));

941+

throw new ERR_INVALID_THIS('ReadableStreamBYOBReader');

937942

if (!isArrayBufferView(view)) {

938-

return PromiseReject(

939-

new ERR_INVALID_ARG_TYPE(

940-

'view',

941-

[

942-

'Buffer',

943-

'TypedArray',

944-

'DataView',

945-

],

946-

view));

943+

throw new ERR_INVALID_ARG_TYPE(

944+

'view',

945+

[

946+

'Buffer',

947+

'TypedArray',

948+

'DataView',

949+

],

950+

view,

951+

);

947952

}

953+

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

948954949955

const viewByteLength = ArrayBufferViewGetByteLength(view);

950956

const viewBuffer = ArrayBufferViewGetBuffer(view);

951957

const viewBufferByteLength = ArrayBufferPrototypeGetByteLength(viewBuffer);

952958953959

if (viewByteLength === 0 || viewBufferByteLength === 0) {

954-

return PromiseReject(

955-

new ERR_INVALID_STATE.TypeError(

956-

'View or Viewed ArrayBuffer is zero-length or detached',

957-

),

958-

);

960+

throw new ERR_INVALID_STATE.TypeError(

961+

'View or Viewed ArrayBuffer is zero-length or detached');

959962

}

960963961964

// Supposed to assert here that the view's buffer is not

962965

// detached, but there's no API available to use to check that.

966+967+

const min = options?.min ?? 1;

968+

if (typeof min !== 'number')

969+

throw new ERR_INVALID_ARG_TYPE('options.min', 'number', min);

970+

if (!NumberIsInteger(min))

971+

throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer');

972+

if (min <= 0)

973+

throw new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0');

974+

if (!isDataView(view)) {

975+

if (min > TypedArrayPrototypeGetLength(view)) {

976+

throw new ERR_OUT_OF_RANGE('options.min', '<= view.length', min);

977+

}

978+

} else if (min > viewByteLength) {

979+

throw new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min);

980+

}

981+963982

if (this[kState].stream === undefined) {

964-

return PromiseReject(

965-

new ERR_INVALID_STATE.TypeError(

966-

'The reader is not attached to a stream'));

983+

throw new ERR_INVALID_STATE.TypeError('The reader is not attached to a stream');

967984

}

968985

const readIntoRequest = new ReadIntoRequest();

969-

readableStreamBYOBReaderRead(this, view, readIntoRequest);

986+

readableStreamBYOBReaderRead(this, view, min, readIntoRequest);

970987

return readIntoRequest.promise;

971988

}

972989

@@ -1880,7 +1897,7 @@ function readableByteStreamTee(stream) {

18801897

reading = false;

18811898

},

18821899

};

1883-

readableStreamBYOBReaderRead(reader, view, readIntoRequest);

1900+

readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);

18841901

}

1885190218861903

function pull1Algorithm() {

@@ -2207,7 +2224,7 @@ function readableStreamReaderGenericRelease(reader) {

22072224

reader[kState].stream = undefined;

22082225

}

220922262210-

function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {

2227+

function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {

22112228

const {

22122229

stream,

22132230

} = reader[kState];

@@ -2220,6 +2237,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {

22202237

readableByteStreamControllerPullInto(

22212238

stream[kState].controller,

22222239

view,

2240+

min,

22232241

readIntoRequest);

22242242

}

22252243

@@ -2492,7 +2510,7 @@ function readableByteStreamControllerClose(controller) {

2492251024932511

if (pendingPullIntos.length) {

24942512

const firstPendingPullInto = pendingPullIntos[0];

2495-

if (firstPendingPullInto.bytesFilled > 0) {

2513+

if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {

24962514

const error = new ERR_INVALID_STATE.TypeError('Partial read');

24972515

readableByteStreamControllerError(controller, error);

24982516

throw error;

@@ -2509,7 +2527,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {

2509252725102528

let done = false;

25112529

if (stream[kState].state === 'closed') {

2512-

desc.bytesFilled = 0;

2530+

assert(desc.bytesFilled % desc.elementSize === 0);

25132531

done = true;

25142532

}

25152533

@@ -2598,6 +2616,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {

25982616

function readableByteStreamControllerPullInto(

25992617

controller,

26002618

view,

2619+

min,

26012620

readIntoRequest) {

26022621

const {

26032622

closeRequested,

@@ -2610,6 +2629,11 @@ function readableByteStreamControllerPullInto(

26102629

elementSize = view.constructor.BYTES_PER_ELEMENT;

26112630

ctor = view.constructor;

26122631

}

2632+2633+

const minimumFill = min * elementSize;

2634+

assert(minimumFill >= elementSize && minimumFill <= view.byteLength);

2635+

assert(minimumFill % elementSize === 0);

2636+26132637

const buffer = ArrayBufferViewGetBuffer(view);

26142638

const byteOffset = ArrayBufferViewGetByteOffset(view);

26152639

const byteLength = ArrayBufferViewGetByteLength(view);

@@ -2628,6 +2652,7 @@ function readableByteStreamControllerPullInto(

26282652

byteOffset,

26292653

byteLength,

26302654

bytesFilled: 0,

2655+

minimumFill,

26312656

elementSize,

26322657

ctor,

26332658

type: 'byob',

@@ -2715,7 +2740,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {

27152740

}

2716274127172742

function readableByteStreamControllerRespondInClosedState(controller, desc) {

2718-

assert(!desc.bytesFilled);

2743+

assert(desc.bytesFilled % desc.elementSize === 0);

27192744

if (desc.type === 'none') {

27202745

readableByteStreamControllerShiftPendingPullInto(controller);

27212746

}

@@ -2892,17 +2917,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(

28922917

byteLength,

28932918

byteOffset,

28942919

bytesFilled,

2920+

minimumFill,

28952921

elementSize,

28962922

} = desc;

2897-

const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);

28982923

const maxBytesToCopy = MathMin(

28992924

controller[kState].queueTotalSize,

29002925

byteLength - bytesFilled);

29012926

const maxBytesFilled = bytesFilled + maxBytesToCopy;

29022927

const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);

29032928

let totalBytesToCopyRemaining = maxBytesToCopy;

29042929

let ready = false;

2905-

if (maxAlignedBytes > currentAlignedBytes) {

2930+

assert(bytesFilled < minimumFill);

2931+

if (maxAlignedBytes >= minimumFill) {

29062932

totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;

29072933

ready = true;

29082934

}

@@ -2945,7 +2971,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(

29452971

if (!ready) {

29462972

assert(!controller[kState].queueTotalSize);

29472973

assert(desc.bytesFilled > 0);

2948-

assert(desc.bytesFilled < elementSize);

2974+

assert(desc.bytesFilled < minimumFill);

29492975

}

29502976

return ready;

29512977

}

@@ -3001,7 +3027,7 @@ function readableByteStreamControllerRespondInReadableState(

30013027

return;

30023028

}

300330293004-

if (desc.bytesFilled < desc.elementSize)

3030+

if (desc.bytesFilled < desc.minimumFill)

30053031

return;

3006303230073033

readableByteStreamControllerShiftPendingPullInto(controller);

@@ -3186,6 +3212,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {

31863212

byteOffset: 0,

31873213

byteLength: autoAllocateChunkSize,

31883214

bytesFilled: 0,

3215+

minimumFill: 1,

31893216

elementSize: 1,

31903217

ctor: Uint8Array,

31913218

type: 'default',