worker: support MessagePort passing in messages · nodejs/node@f447acd
@@ -41,14 +41,27 @@ namespace {
4141// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
4242class DeserializerDelegate : public ValueDeserializer::Delegate {
4343public:
44-DeserializerDelegate(Message* m, Environment* env)
45- : env_(env), msg_(m) {}
44+DeserializerDelegate(Message* m,
45+ Environment* env,
46+const std::vector<MessagePort*>& message_ports)
47+ : env_(env), msg_(m), message_ports_(message_ports) {}
48+49+ MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
50+// Currently, only MessagePort hosts objects are supported, so identifying
51+// by the index in the message's MessagePort array is sufficient.
52+uint32_t id;
53+if (!deserializer->ReadUint32(&id))
54+return MaybeLocal<Object>();
55+CHECK_LE(id, message_ports_.size());
56+return message_ports_[id]->object();
57+ };
46584759 ValueDeserializer* deserializer = nullptr;
48604961private:
5062 Environment* env_;
5163 Message* msg_;
64+const std::vector<MessagePort*>& message_ports_;
5265};
53665467} // anonymous namespace
@@ -58,7 +71,23 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
5871 EscapableHandleScope handle_scope(env->isolate());
5972 Context::Scope context_scope(context);
607361- DeserializerDelegate delegate(this, env);
74+// Create all necessary MessagePort handles.
75+ std::vector<MessagePort*> ports(message_ports_.size());
76+for (uint32_t i = 0; i < message_ports_.size(); ++i) {
77+ ports[i] = MessagePort::New(env,
78+ context,
79+std::move(message_ports_[i]));
80+if (ports[i] == nullptr) {
81+for (MessagePort* port : ports) {
82+// This will eventually release the MessagePort object itself.
83+ port->Close();
84+ }
85+return MaybeLocal<Value>();
86+ }
87+ }
88+ message_ports_.clear();
89+90+ DeserializerDelegate delegate(this, env, ports);
6291 ValueDeserializer deserializer(
6392 env->isolate(),
6493reinterpret_cast<const uint8_t*>(main_message_buf_.data),
@@ -83,6 +112,10 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
83112 deserializer.ReadValue(context).FromMaybe(Local<Value>()));
84113}
85114115+void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
116+ message_ports_.emplace_back(std::move(data));
117+}
118+86119namespace {
8712088121// This tells V8 how to serialize objects that it does not understand
@@ -97,12 +130,43 @@ class SerializerDelegate : public ValueSerializer::Delegate {
97130 env_->isolate()->ThrowException(Exception::Error(message));
98131 }
99132133+ Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
134+if (env_->message_port_constructor_template()->HasInstance(object)) {
135+return WriteMessagePort(Unwrap<MessagePort>(object));
136+ }
137+138+THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
139+return Nothing<bool>();
140+ }
141+142+void Finish() {
143+// Only close the MessagePort handles and actually transfer them
144+// once we know that serialization succeeded.
145+for (MessagePort* port : ports_) {
146+ port->Close();
147+ msg_->AddMessagePort(port->Detach());
148+ }
149+ }
150+100151 ValueSerializer* serializer = nullptr;
101152102153private:
154+ Maybe<bool> WriteMessagePort(MessagePort* port) {
155+for (uint32_t i = 0; i < ports_.size(); i++) {
156+if (ports_[i] == port) {
157+ serializer->WriteUint32(i);
158+return Just(true);
159+ }
160+ }
161+162+THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
163+return Nothing<bool>();
164+ }
165+103166 Environment* env_;
104167 Local<Context> context_;
105168 Message* msg_;
169+ std::vector<MessagePort*> ports_;
106170107171friend class worker::Message;
108172};
@@ -131,7 +195,7 @@ Maybe<bool> Message::Serialize(Environment* env,
131195 Local<Value> entry;
132196if (!transfer_list->Get(context, i).ToLocal(&entry))
133197return Nothing<bool>();
134-// Currently, we support ArrayBuffers.
198+// Currently, we support ArrayBuffers and MessagePorts.
135199if (entry->IsArrayBuffer()) {
136200 Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
137201// If we cannot render the ArrayBuffer unusable in this Isolate and
@@ -144,6 +208,12 @@ Maybe<bool> Message::Serialize(Environment* env,
144208 array_buffers.push_back(ab);
145209 serializer.TransferArrayBuffer(id, ab);
146210continue;
211+ } else if (env->message_port_constructor_template()
212+ ->HasInstance(entry)) {
213+ MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
214+CHECK_NE(port, nullptr);
215+ delegate.ports_.push_back(port);
216+continue;
147217 }
148218149219THROW_ERR_INVALID_TRANSFER_OBJECT(env);
@@ -167,6 +237,8 @@ Maybe<bool> Message::Serialize(Environment* env,
167237 contents.ByteLength() });
168238 }
169239240+ delegate.Finish();
241+170242// The serializer gave us a buffer allocated using `malloc()`.
171243 std::pair<uint8_t*, size_t> data = serializer.Release();
172244 main_message_buf_ =