diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index fdc26124303..2f705fe7bbb 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -179,8 +179,9 @@ kj::Promise ServiceWorkerGlobalScope::connect(kj::String host, // We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to // provide a more descriptive error message for HTTP, but this is not relevant on the TCP server // side. - jsg::Ref jsSocket = setupSocket(js, kj::mv(ownConnection), kj::none, kj::none, - kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none); + jsg::Ref jsSocket = + setupSocket(js, kj::mv(ownConnection), kj::none /* remoteAddress */, kj::mv(host), kj::none, + kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none); // handleProxyStatus() is required to indicate that the socket was opened properly. Since the // connection is already open at this point, exception handling is not required. jsSocket->handleProxyStatus(js, kj::Promise>(kj::none)); diff --git a/src/workerd/api/hyperdrive.c++ b/src/workerd/api/hyperdrive.c++ index 4b4c501fa11..7792faadb3f 100644 --- a/src/workerd/api/hyperdrive.c++ +++ b/src/workerd/api/hyperdrive.c++ @@ -44,9 +44,9 @@ jsg::Ref Hyperdrive::connect(jsg::Lock& js) { // TODO(someday): Support TLS? It's not at all necessary since we're connecting locally, but // some users may want it anyway. auto nullTlsStarter = kj::heap(); - auto sock = setupSocket(js, kj::mv(conn), kj::str(getHost(), ":", getPort()), kj::none, - kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::str(this->randomHost), false, - kj::none /* maybeOpenedPrPair */); + auto sock = setupSocket(js, kj::mv(conn), kj::str(getHost(), ":", getPort()), + kj::none /* localAddress */, kj::none, kj::mv(nullTlsStarter), SecureTransportKind::OFF, + kj::str(this->randomHost), false, kj::none /* maybeOpenedPrPair */); sock->handleProxyStatus(js, kj::mv(paf.promise)); return sock; } diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index 62c8b3aaf7b..12f676d389f 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -87,6 +87,7 @@ class StreamWorkerInterface; jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, kj::Maybe remoteAddress, + kj::Maybe localAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, @@ -173,9 +174,9 @@ jsg::Ref setupSocket(jsg::Lock& js, getWritableHighWaterMark(options), openedPrPair.promise.whenResolved(js)); auto result = js.alloc(js, ioContext, kj::mv(refcountedConnection), kj::mv(remoteAddress), - kj::mv(readable), kj::mv(writable), kj::mv(closedPrPair), kj::mv(watchForDisconnectTask), - kj::mv(options), kj::mv(tlsStarter), secureTransport, kj::mv(domain), isDefaultFetchPort, - kj::mv(openedPrPair)); + kj::mv(localAddress), kj::mv(readable), kj::mv(writable), kj::mv(closedPrPair), + kj::mv(watchForDisconnectTask), kj::mv(options), kj::mv(tlsStarter), secureTransport, + kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair)); KJ_IF_SOME(p, eofPromise) { result->handleReadableEof(js, kj::mv(p)); @@ -272,9 +273,9 @@ jsg::Ref connectImplNoOutputLock(jsg::Lock& js, auto request = httpClient->connect(addressStr, *headers, httpConnectSettings); request.connection = request.connection.attach(kj::mv(httpClient)); - auto result = setupSocket(js, kj::mv(request.connection), kj::mv(addressStr), kj::mv(options), - kj::mv(tlsStarter), secureTransport, kj::mv(domain), isDefaultFetchPort, - kj::none /* maybeOpenedPrPair */); + auto result = setupSocket(js, kj::mv(request.connection), kj::mv(addressStr), + kj::none /* localAddress */, kj::mv(options), kj::mv(tlsStarter), secureTransport, + kj::mv(domain), isDefaultFetchPort, kj::none /* maybeOpenedPrPair */); // `handleProxyStatus` needs an initialized refcount to use `JSG_THIS`, hence it cannot be // called in Socket's constructor. Also it's only necessary when creating a Socket as a result of // a `connect`. @@ -363,7 +364,8 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp JSG_VISITABLE_LAMBDA((self = JSG_THIS, domain = kj::heapString(KJ_ASSERT_NONNULL(domain)), tlsOptions = kj::mv(tlsOptions), openedResolver = openedPrPair.resolver.addRef(js), - remoteAddress = mapCopyString(remoteAddress)), + remoteAddress = mapCopyString(remoteAddress), + localAddress = mapCopyString(localAddress)), (self, openedResolver), (jsg::Lock & js) mutable { auto& context = IoContext::current(); @@ -402,11 +404,12 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp openedResolver.resolve(js, context.awaitIo(js, forkedPromise.addBranch(), - [remoteAddress = kj::mv(remoteAddress)]( + [remoteAddress = kj::mv(remoteAddress), + localAddress = kj::mv(localAddress)]( jsg::Lock& js) mutable -> SocketInfo { return SocketInfo{ .remoteAddress = kj::mv(remoteAddress), - .localAddress = kj::none, + .localAddress = kj::mv(localAddress), }; })); @@ -425,8 +428,9 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // to `setupSocket`. auto newTlsStarter = kj::heap(); return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), - mapCopyString(remoteAddress), kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON, - kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair)); + mapCopyString(remoteAddress), mapCopyString(localAddress), kj::mv(options), + kj::mv(newTlsStarter), SecureTransportKind::ON, kj::mv(domain), isDefaultFetchPort, + kj::mv(openedPrPair)); } void Socket::handleProxyStatus( @@ -460,12 +464,13 @@ void Socket::handleProxyStatus( } handleProxyError(js, JSG_KJ_EXCEPTION(FAILED, Error, msg)); } else { - // In our implementation we do not expose the local address at all simply - // because there's no useful value we can provide. + // For outbound sockets we have no useful local address to expose. Inbound sockets (produced + // by the `connect()` handler dispatch path) populate `localAddress` with the CONNECT + // authority that the peer targeted. openedResolver.resolve(js, SocketInfo{ .remoteAddress = mapCopyString(remoteAddress), - .localAddress = kj::none, + .localAddress = mapCopyString(localAddress), }); } }; @@ -488,12 +493,13 @@ void Socket::handleProxyStatus(jsg::Lock& js, kj::Promise remoteAddress; - // The local address is specified by the spec but we don't implement it. - // It will always remain empty. + // The local address — i.e. the address on this side of the socket. For outbound sockets created + // via `connect()`, we don't have a useful value to provide and leave it empty. For inbound + // sockets delivered to a worker's `connect(socket)` handler, this is populated with the CONNECT + // authority (the "host:port" string the caller passed to `fetcher.connect(...)`), since from the + // handler's perspective that is the address the peer asked to connect to on this end. jsg::Optional localAddress; JSG_STRUCT(remoteAddress, localAddress); }; @@ -61,6 +64,7 @@ class Socket: public jsg::Object { IoContext& context, kj::Own>> connectionStream, kj::Maybe remoteAddress, + kj::Maybe localAddress, jsg::Ref readableParam, jsg::Ref writable, jsg::PromiseResolverPair closedPrPair, @@ -80,6 +84,7 @@ class Socket: public jsg::Object { closedPromise(kj::mv(closedPrPair.promise)), options(kj::mv(options)), remoteAddress(kj::mv(remoteAddress)), + localAddress(kj::mv(localAddress)), secureTransport(secureTransport), domain(kj::mv(domain)), isDefaultFetchPort(isDefaultFetchPort), @@ -201,6 +206,7 @@ class Socket: public jsg::Object { jsg::MemoizedIdentity> closedPromise; jsg::Optional options; kj::Maybe remoteAddress; + kj::Maybe localAddress; // Set to true when the socket is upgraded to a secure one. bool upgraded = false; SecureTransportKind secureTransport; @@ -246,6 +252,7 @@ class Socket: public jsg::Object { jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, kj::Maybe remoteAddress, + kj::Maybe localAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, diff --git a/src/workerd/api/tests/connect-handler-test-proxy.js b/src/workerd/api/tests/connect-handler-test-proxy.js index af23f67b995..a32bc9a2376 100644 --- a/src/workerd/api/tests/connect-handler-test-proxy.js +++ b/src/workerd/api/tests/connect-handler-test-proxy.js @@ -23,3 +23,16 @@ export class ConnectEndpoint extends WorkerEntrypoint { await writer.close(); } } + +// Reached via a service binding from connect-handler-test.js. Awaits socket.opened and echoes back +// the observed localAddress, which on the service-binding path is the verbatim authority string +// the caller passed to fetcher.connect(...). The client test asserts strict equality. +export class LocalAddressEndpoint extends WorkerEntrypoint { + async connect(socket) { + const { localAddress } = await socket.opened; + const enc = new TextEncoder(); + const writer = socket.writable.getWriter(); + await writer.write(enc.encode(`OK:${localAddress}`)); + await writer.close(); + } +} diff --git a/src/workerd/api/tests/connect-handler-test.js b/src/workerd/api/tests/connect-handler-test.js index f21a2930144..7764cf35397 100644 --- a/src/workerd/api/tests/connect-handler-test.js +++ b/src/workerd/api/tests/connect-handler-test.js @@ -38,6 +38,27 @@ export const connectHandlerProxy = { }, }; +// Exercises the service-binding path: fetcher.connect("host:port") -> WorkerEntrypoint::connect on +// the target worker. Unlike the TCP listener path (which presents the listener's bound address as +// the authority), the service-binding path forwards the caller's authority string verbatim, so we +// can assert strict equality between what the client passed and what the server observes as +// socket.opened.localAddress. +export const localAddressViaServiceBinding = { + async test(ctrl, env) { + const AUTHORITY = 'example.com:1234'; + const socket = env.TARGET.connect(AUTHORITY); + await socket.opened; + const dec = new TextDecoder(); + let result = ''; + for await (const chunk of socket.readable) { + result += dec.decode(chunk, { stream: true }); + } + result += dec.decode(); + strictEqual(result, `OK:${AUTHORITY}`); + await socket.closed; + }, +}; + export default { async connect(socket) { const enc = new TextEncoder(); diff --git a/src/workerd/api/tests/connect-handler-test.wd-test b/src/workerd/api/tests/connect-handler-test.wd-test index 98b517a861a..c5cea0ba591 100644 --- a/src/workerd/api/tests/connect-handler-test.wd-test +++ b/src/workerd/api/tests/connect-handler-test.wd-test @@ -8,6 +8,19 @@ const unitTests :Workerd.Config = ( (name = "worker", esModule = embed "connect-handler-test.js"), ], compatibilityFlags = ["nodejs_compat_v2", "experimental"], + bindings = [ + (name = "TARGET", service = ( + name = "connect-handler-test-local-address-endpoint", + entrypoint = "LocalAddressEndpoint")), + ], + ) + ), + ( name = "connect-handler-test-local-address-endpoint", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], ) ), ( name = "connect-handler-test-proxy", diff --git a/src/workerd/api/tests/tail-worker-test-receiver.js b/src/workerd/api/tests/tail-worker-test-receiver.js index 7cbf5082685..d7625dedbd3 100644 --- a/src/workerd/api/tests/tail-worker-test-receiver.js +++ b/src/workerd/api/tests/tail-worker-test-receiver.js @@ -32,7 +32,7 @@ export const test = { // The shared tail worker we configured only produces onset and outcome events, so every trace is identical here. // Number of traces based on how often main tail worker is invoked from previous tests - let numTraces = 32; + let numTraces = 33; let basicTrace = '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}'; assert.deepStrictEqual( diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index de36e83798a..bace4473cf4 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -193,6 +193,8 @@ const E = { '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandler","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', connectHandlerProxy: '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandlerProxy","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', + localAddressViaServiceBinding: + '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"localAddressViaServiceBinding","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', connectTarget: '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"connect"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', @@ -250,6 +252,7 @@ const expectedFlat = [ n(E.cacheMode), n(E.connectHandler), n(E.connectHandlerProxy), + n(E.localAddressViaServiceBinding), n(E.jsrpcGetCounter), n(E.jsrpcNonFunction), n(E.connectTarget), @@ -291,9 +294,11 @@ const expectedWithPropagation = [ // cacheMode: standalone n(E.cacheMode), - // connect: handler and proxy are separate top-level tests, target is independent + // connect: handler, proxy, and service-binding test are separate top-level tests, target is + // independent n(E.connectHandler), n(E.connectHandlerProxy), + n(E.localAddressViaServiceBinding), n(E.connectTarget), // jsrpc DO subrequest test: caller has children (MyService + MyActor DO calls) diff --git a/src/workerd/api/tests/tail-worker-test.wd-test b/src/workerd/api/tests/tail-worker-test.wd-test index 1cf53d675d7..3cdd7880dd1 100644 --- a/src/workerd/api/tests/tail-worker-test.wd-test +++ b/src/workerd/api/tests/tail-worker-test.wd-test @@ -38,9 +38,22 @@ const unitTests :Workerd.Config = ( (name = "worker", esModule = embed "connect-handler-test.js"), ], compatibilityFlags = ["nodejs_compat_v2", "experimental"], + bindings = [ + (name = "TARGET", service = ( + name = "connect-handler-test-local-address-endpoint", + entrypoint = "LocalAddressEndpoint")), + ], streamingTails = ["log"], ) ), + ( name = "connect-handler-test-local-address-endpoint", + worker = ( + modules = [ + (name = "worker", esModule = embed "connect-handler-test-proxy.js"), + ], + compatibilityFlags = ["nodejs_compat_v2", "experimental"], + ) + ), ( name = "connect-handler-test-proxy", worker = ( modules = [