◐ Shell
clean mode source ↗

worker: support MessagePort passing in messages · nodejs/node@f447acd

@@ -41,14 +41,27 @@ namespace {

4141

// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.

4242

class DeserializerDelegate : public ValueDeserializer::Delegate {

4343

public:

44-

DeserializerDelegate(Message* m, Environment* env)

45-

: env_(env), msg_(m) {}

44+

DeserializerDelegate(Message* m,

45+

Environment* env,

46+

const std::vector<MessagePort*>& message_ports)

47+

: env_(env), msg_(m), message_ports_(message_ports) {}

48+49+

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {

50+

// Currently, only MessagePort hosts objects are supported, so identifying

51+

// by the index in the message's MessagePort array is sufficient.

52+

uint32_t id;

53+

if (!deserializer->ReadUint32(&id))

54+

return MaybeLocal<Object>();

55+

CHECK_LE(id, message_ports_.size());

56+

return message_ports_[id]->object();

57+

};

46584759

ValueDeserializer* deserializer = nullptr;

48604961

private:

5062

Environment* env_;

5163

Message* msg_;

64+

const std::vector<MessagePort*>& message_ports_;

5265

};

53665467

} // anonymous namespace

@@ -58,7 +71,23 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,

5871

EscapableHandleScope handle_scope(env->isolate());

5972

Context::Scope context_scope(context);

607361-

DeserializerDelegate delegate(this, env);

74+

// Create all necessary MessagePort handles.

75+

std::vector<MessagePort*> ports(message_ports_.size());

76+

for (uint32_t i = 0; i < message_ports_.size(); ++i) {

77+

ports[i] = MessagePort::New(env,

78+

context,

79+

std::move(message_ports_[i]));

80+

if (ports[i] == nullptr) {

81+

for (MessagePort* port : ports) {

82+

// This will eventually release the MessagePort object itself.

83+

port->Close();

84+

}

85+

return MaybeLocal<Value>();

86+

}

87+

}

88+

message_ports_.clear();

89+90+

DeserializerDelegate delegate(this, env, ports);

6291

ValueDeserializer deserializer(

6392

env->isolate(),

6493

reinterpret_cast<const uint8_t*>(main_message_buf_.data),

@@ -83,6 +112,10 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,

83112

deserializer.ReadValue(context).FromMaybe(Local<Value>()));

84113

}

85114115+

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {

116+

message_ports_.emplace_back(std::move(data));

117+

}

118+86119

namespace {

8712088121

// This tells V8 how to serialize objects that it does not understand

@@ -97,12 +130,43 @@ class SerializerDelegate : public ValueSerializer::Delegate {

97130

env_->isolate()->ThrowException(Exception::Error(message));

98131

}

99132133+

Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {

134+

if (env_->message_port_constructor_template()->HasInstance(object)) {

135+

return WriteMessagePort(Unwrap<MessagePort>(object));

136+

}

137+138+

THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);

139+

return Nothing<bool>();

140+

}

141+142+

void Finish() {

143+

// Only close the MessagePort handles and actually transfer them

144+

// once we know that serialization succeeded.

145+

for (MessagePort* port : ports_) {

146+

port->Close();

147+

msg_->AddMessagePort(port->Detach());

148+

}

149+

}

150+100151

ValueSerializer* serializer = nullptr;

101152102153

private:

154+

Maybe<bool> WriteMessagePort(MessagePort* port) {

155+

for (uint32_t i = 0; i < ports_.size(); i++) {

156+

if (ports_[i] == port) {

157+

serializer->WriteUint32(i);

158+

return Just(true);

159+

}

160+

}

161+162+

THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);

163+

return Nothing<bool>();

164+

}

165+103166

Environment* env_;

104167

Local<Context> context_;

105168

Message* msg_;

169+

std::vector<MessagePort*> ports_;

106170107171

friend class worker::Message;

108172

};

@@ -131,7 +195,7 @@ Maybe<bool> Message::Serialize(Environment* env,

131195

Local<Value> entry;

132196

if (!transfer_list->Get(context, i).ToLocal(&entry))

133197

return Nothing<bool>();

134-

// Currently, we support ArrayBuffers.

198+

// Currently, we support ArrayBuffers and MessagePorts.

135199

if (entry->IsArrayBuffer()) {

136200

Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();

137201

// If we cannot render the ArrayBuffer unusable in this Isolate and

@@ -144,6 +208,12 @@ Maybe<bool> Message::Serialize(Environment* env,

144208

array_buffers.push_back(ab);

145209

serializer.TransferArrayBuffer(id, ab);

146210

continue;

211+

} else if (env->message_port_constructor_template()

212+

->HasInstance(entry)) {

213+

MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());

214+

CHECK_NE(port, nullptr);

215+

delegate.ports_.push_back(port);

216+

continue;

147217

}

148218149219

THROW_ERR_INVALID_TRANSFER_OBJECT(env);

@@ -167,6 +237,8 @@ Maybe<bool> Message::Serialize(Environment* env,

167237

contents.ByteLength() });

168238

}

169239240+

delegate.Finish();

241+170242

// The serializer gave us a buffer allocated using `malloc()`.

171243

std::pair<uint8_t*, size_t> data = serializer.Release();

172244

main_message_buf_ =