◐ Shell
clean mode source ↗

http2: add diagnostics channels for client stream request body · nodejs/node@a38e2f5

Original file line numberDiff line numberDiff line change

@@ -1243,6 +1243,23 @@ Emitted when an error occurs during the processing of a stream on the client.

12431243
12441244

Emitted when a stream is received on the client.

12451245
1246+

##### Event: `'http2.client.stream.bodyChunkSent'`

1247+
1248+

* `stream` {ClientHttp2Stream}

1249+

* `writev` {boolean}

1250+

* `data` {Buffer | string | Buffer\[] | Object\[]}

1251+

* `chunk` {Buffer|string}

1252+

* `encoding` {string}

1253+

* `encoding` {string}

1254+
1255+

Emitted when a chunk of the client stream body is being sent.

1256+
1257+

##### Event: `'http2.client.stream.bodySent'`

1258+
1259+

* `stream` {ClientHttp2Stream}

1260+
1261+

Emitted after the client stream body has been fully sent.

1262+
12461263

##### Event: `'http2.client.stream.close'`

12471264
12481265

* `stream` {ClientHttp2Stream}

Original file line numberDiff line numberDiff line change

@@ -190,6 +190,8 @@ const dc = require('diagnostics_channel');

190190

const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created');

191191

const onClientStreamStartChannel = dc.channel('http2.client.stream.start');

192192

const onClientStreamErrorChannel = dc.channel('http2.client.stream.error');

193+

const onClientStreamBodyChunkSentChannel = dc.channel('http2.client.stream.bodyChunkSent');

194+

const onClientStreamBodySentChannel = dc.channel('http2.client.stream.bodySent');

193195

const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish');

194196

const onClientStreamCloseChannel = dc.channel('http2.client.stream.close');

195197

const onServerStreamCreatedChannel = dc.channel('http2.server.stream.created');

@@ -2300,6 +2302,15 @@ class Http2Stream extends Duplex {

23002302

req = writeGeneric(this, data, encoding, writeCallback);

23012303
23022304

trackWriteState(this, req.bytes);

2305+
2306+

if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodyChunkSentChannel.hasSubscribers) {

2307+

onClientStreamBodyChunkSentChannel.publish({

2308+

stream: this,

2309+

writev,

2310+

data,

2311+

encoding,

2312+

});

2313+

}

23032314

}

23042315
23052316

_write(data, encoding, cb) {

@@ -2317,6 +2328,10 @@ class Http2Stream extends Duplex {

23172328

}

23182329

debugStreamObj(this, 'shutting down writable on _final');

23192330

ReflectApply(shutdownWritable, this, [cb]);

2331+
2332+

if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodySentChannel.hasSubscribers) {

2333+

onClientStreamBodySentChannel.publish({ stream: this });

2334+

}

23202335

}

23212336
23222337

_read(nread) {

Original file line numberDiff line numberDiff line change

@@ -0,0 +1,73 @@

1+

'use strict';

2+
3+

const common = require('../common');

4+

if (!common.hasCrypto)

5+

common.skip('missing crypto');

6+
7+

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting

8+

// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and

9+

// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are

10+

// being sent with multiple Buffers and strings.

11+
12+

const assert = require('assert');

13+

const dc = require('diagnostics_channel');

14+

const http2 = require('http2');

15+

const { Duplex } = require('stream');

16+
17+

let bodyChunkSent = false;

18+
19+

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {

20+

// Since ClientHttp2Stream is not exported from any module, this just checks

21+

// if the stream is an instance of Duplex.

22+

assert.ok(stream instanceof Duplex);

23+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

24+
25+

assert.strictEqual(writev, true);

26+
27+

assert.ok(Array.isArray(data));

28+

assert.strictEqual(data.length, 3);

29+
30+

assert.strictEqual(data[0].chunk, 'héllo');

31+

assert.strictEqual(data[0].encoding, 'latin1');

32+
33+

assert.ok(Buffer.from('foo').equals(data[1].chunk));

34+

assert.strictEqual(data[1].encoding, 'buffer');

35+
36+

assert.ok(Buffer.from('bar').equals(data[2].chunk));

37+

assert.strictEqual(data[2].encoding, 'buffer');

38+
39+

assert.strictEqual(encoding, '');

40+
41+

bodyChunkSent = true;

42+

}));

43+
44+

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {

45+

// 'http2.client.stream.bodyChunkSent' must run first.

46+

assert.ok(bodyChunkSent);

47+
48+

// Since ClientHttp2Stream is not exported from any module, this just checks

49+

// if the stream is an instance of Duplex.

50+

assert.ok(stream instanceof Duplex);

51+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

52+

}));

53+
54+

const server = http2.createServer();

55+

server.on('stream', common.mustCall((stream) => {

56+

stream.respond({}, { endStream: true });

57+

}));

58+
59+

server.listen(0, common.mustCall(() => {

60+

const port = server.address().port;

61+

const client = http2.connect(`http://localhost:${port}`);

62+
63+

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });

64+

stream.write('héllo', 'latin1');

65+

stream.write(Buffer.from('foo'));

66+

stream.write(new TextEncoder().encode('bar'));

67+

stream.end();

68+
69+

stream.on('response', common.mustCall(() => {

70+

client.close();

71+

server.close();

72+

}));

73+

}, 1));

Original file line numberDiff line numberDiff line change

@@ -0,0 +1,66 @@

1+

'use strict';

2+
3+

const common = require('../common');

4+

if (!common.hasCrypto)

5+

common.skip('missing crypto');

6+
7+

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting

8+

// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and

9+

// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are

10+

// being sent with multiple Buffers.

11+
12+

const assert = require('assert');

13+

const dc = require('diagnostics_channel');

14+

const http2 = require('http2');

15+

const { Duplex } = require('stream');

16+
17+

let bodyChunkSent = false;

18+
19+

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {

20+

// Since ClientHttp2Stream is not exported from any module, this just checks

21+

// if the stream is an instance of Duplex.

22+

assert.ok(stream instanceof Duplex);

23+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

24+
25+

assert.strictEqual(writev, true);

26+
27+

assert.ok(Array.isArray(data));

28+

assert.strictEqual(data.length, 2);

29+
30+

assert.ok(Buffer.from('foo').equals(data[0]));

31+

assert.ok(Buffer.from('bar').equals(data[1]));

32+
33+

assert.strictEqual(encoding, '');

34+
35+

bodyChunkSent = true;

36+

}));

37+
38+

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {

39+

// 'http2.client.stream.bodyChunkSent' must run first.

40+

assert.ok(bodyChunkSent);

41+
42+

// Since ClientHttp2Stream is not exported from any module, this just checks

43+

// if the stream is an instance of Duplex.

44+

assert.ok(stream instanceof Duplex);

45+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

46+

}));

47+
48+

const server = http2.createServer();

49+

server.on('stream', common.mustCall((stream) => {

50+

stream.respond({}, { endStream: true });

51+

}));

52+
53+

server.listen(0, common.mustCall(() => {

54+

const port = server.address().port;

55+

const client = http2.connect(`http://localhost:${port}`);

56+
57+

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });

58+

stream.write(Buffer.from('foo'));

59+

stream.write(Buffer.from('bar'));

60+

stream.end();

61+
62+

stream.on('response', common.mustCall(() => {

63+

client.close();

64+

server.close();

65+

}));

66+

}, 1));

Original file line numberDiff line numberDiff line change

@@ -0,0 +1,42 @@

1+

'use strict';

2+
3+

const common = require('../common');

4+

if (!common.hasCrypto)

5+

common.skip('missing crypto');

6+
7+

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting

8+

// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and

9+

// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are

10+

// being sent with no chunks.

11+
12+

const assert = require('assert');

13+

const dc = require('diagnostics_channel');

14+

const http2 = require('http2');

15+

const { Duplex } = require('stream');

16+
17+

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustNotCall());

18+
19+

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {

20+

// Since ClientHttp2Stream is not exported from any module, this just checks

21+

// if the stream is an instance of Duplex.

22+

assert.ok(stream instanceof Duplex);

23+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

24+

}));

25+
26+

const server = http2.createServer();

27+

server.on('stream', common.mustCall((stream) => {

28+

stream.respond({}, { endStream: true });

29+

}));

30+
31+

server.listen(0, common.mustCall(() => {

32+

const port = server.address().port;

33+

const client = http2.connect(`http://localhost:${port}`);

34+
35+

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });

36+

stream.end();

37+
38+

stream.on('response', common.mustCall(() => {

39+

client.close();

40+

server.close();

41+

}));

42+

}, 1));

Original file line numberDiff line numberDiff line change

@@ -0,0 +1,59 @@

1+

'use strict';

2+
3+

const common = require('../common');

4+

if (!common.hasCrypto)

5+

common.skip('missing crypto');

6+
7+

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting

8+

// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and

9+

// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are

10+

// being sent with a single Buffer.

11+
12+

const assert = require('assert');

13+

const dc = require('diagnostics_channel');

14+

const http2 = require('http2');

15+

const { Duplex } = require('stream');

16+
17+

let bodyChunkSent = false;

18+
19+

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {

20+

// Since ClientHttp2Stream is not exported from any module, this just checks

21+

// if the stream is an instance of Duplex.

22+

assert.ok(stream instanceof Duplex);

23+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

24+
25+

assert.strictEqual(writev, false);

26+

assert.ok(Buffer.from('foo').equals(data));

27+

assert.strictEqual(encoding, 'buffer');

28+
29+

bodyChunkSent = true;

30+

}));

31+
32+

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {

33+

// 'http2.client.stream.bodyChunkSent' must run first.

34+

assert.ok(bodyChunkSent);

35+
36+

// Since ClientHttp2Stream is not exported from any module, this just checks

37+

// if the stream is an instance of Duplex.

38+

assert.ok(stream instanceof Duplex);

39+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

40+

}));

41+
42+

const server = http2.createServer();

43+

server.on('stream', common.mustCall((stream) => {

44+

stream.respond({}, { endStream: true });

45+

}));

46+
47+

server.listen(0, common.mustCall(() => {

48+

const port = server.address().port;

49+

const client = http2.connect(`http://localhost:${port}`);

50+
51+

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });

52+

stream.write(Buffer.from('foo'));

53+

stream.end();

54+
55+

stream.on('response', common.mustCall(() => {

56+

client.close();

57+

server.close();

58+

}));

59+

}, 1));

Original file line numberDiff line numberDiff line change

@@ -0,0 +1,59 @@

1+

'use strict';

2+
3+

const common = require('../common');

4+

if (!common.hasCrypto)

5+

common.skip('missing crypto');

6+
7+

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting

8+

// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and

9+

// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are

10+

// being sent with a single string.

11+
12+

const assert = require('assert');

13+

const dc = require('diagnostics_channel');

14+

const http2 = require('http2');

15+

const { Duplex } = require('stream');

16+
17+

let bodyChunkSent = false;

18+
19+

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {

20+

// Since ClientHttp2Stream is not exported from any module, this just checks

21+

// if the stream is an instance of Duplex.

22+

assert.ok(stream instanceof Duplex);

23+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

24+
25+

assert.strictEqual(writev, false);

26+

assert.strictEqual(data, 'foo');

27+

assert.strictEqual(encoding, 'utf8');

28+
29+

bodyChunkSent = true;

30+

}));

31+
32+

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {

33+

// 'http2.client.stream.bodyChunkSent' must run first.

34+

assert.ok(bodyChunkSent);

35+
36+

// Since ClientHttp2Stream is not exported from any module, this just checks

37+

// if the stream is an instance of Duplex.

38+

assert.ok(stream instanceof Duplex);

39+

assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

40+

}));

41+
42+

const server = http2.createServer();

43+

server.on('stream', common.mustCall((stream) => {

44+

stream.respond({}, { endStream: true });

45+

}));

46+
47+

server.listen(0, common.mustCall(() => {

48+

const port = server.address().port;

49+

const client = http2.connect(`http://localhost:${port}`);

50+
51+

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });

52+

stream.write('foo');

53+

stream.end();

54+
55+

stream.on('response', common.mustCall(() => {

56+

client.close();

57+

server.close();

58+

}));

59+

}, 1));