Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ kj::Promise<void> ServiceWorkerGlobalScope::connect(kj::String host,
// We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to
// provide a more descriptive error message for HTTP, but this is not relevant on the TCP server
// side.
jsg::Ref<Socket> jsSocket = setupSocket(js, kj::mv(ownConnection), kj::none, kj::none,
kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none);
jsg::Ref<Socket> jsSocket =
setupSocket(js, kj::mv(ownConnection), kj::none /* remoteAddress */, kj::mv(host), kj::none,
kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none);
// handleProxyStatus() is required to indicate that the socket was opened properly. Since the
// connection is already open at this point, exception handling is not required.
jsSocket->handleProxyStatus(js, kj::Promise<kj::Maybe<kj::Exception>>(kj::none));
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/hyperdrive.c++
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ jsg::Ref<Socket> Hyperdrive::connect(jsg::Lock& js) {
// TODO(someday): Support TLS? It's not at all necessary since we're connecting locally, but
// some users may want it anyway.
auto nullTlsStarter = kj::heap<kj::TlsStarterCallback>();
auto sock = setupSocket(js, kj::mv(conn), kj::str(getHost(), ":", getPort()), kj::none,
kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::str(this->randomHost), false,
kj::none /* maybeOpenedPrPair */);
auto sock = setupSocket(js, kj::mv(conn), kj::str(getHost(), ":", getPort()),
kj::none /* localAddress */, kj::none, kj::mv(nullTlsStarter), SecureTransportKind::OFF,
kj::str(this->randomHost), false, kj::none /* maybeOpenedPrPair */);
sock->handleProxyStatus(js, kj::mv(paf.promise));
return sock;
}
Expand Down
40 changes: 23 additions & 17 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class StreamWorkerInterface;
jsg::Ref<Socket> setupSocket(jsg::Lock& js,
kj::Own<kj::AsyncIoStream> connection,
kj::Maybe<kj::String> remoteAddress,
kj::Maybe<kj::String> localAddress,
jsg::Optional<SocketOptions> options,
kj::Own<kj::TlsStarterCallback> tlsStarter,
SecureTransportKind secureTransport,
Expand Down Expand Up @@ -173,9 +174,9 @@ jsg::Ref<Socket> setupSocket(jsg::Lock& js,
getWritableHighWaterMark(options), openedPrPair.promise.whenResolved(js));

auto result = js.alloc<Socket>(js, ioContext, kj::mv(refcountedConnection), kj::mv(remoteAddress),
kj::mv(readable), kj::mv(writable), kj::mv(closedPrPair), kj::mv(watchForDisconnectTask),
kj::mv(options), kj::mv(tlsStarter), secureTransport, kj::mv(domain), isDefaultFetchPort,
kj::mv(openedPrPair));
kj::mv(localAddress), kj::mv(readable), kj::mv(writable), kj::mv(closedPrPair),
kj::mv(watchForDisconnectTask), kj::mv(options), kj::mv(tlsStarter), secureTransport,
kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair));

KJ_IF_SOME(p, eofPromise) {
result->handleReadableEof(js, kj::mv(p));
Expand Down Expand Up @@ -272,9 +273,9 @@ jsg::Ref<Socket> connectImplNoOutputLock(jsg::Lock& js,
auto request = httpClient->connect(addressStr, *headers, httpConnectSettings);
request.connection = request.connection.attach(kj::mv(httpClient));

auto result = setupSocket(js, kj::mv(request.connection), kj::mv(addressStr), kj::mv(options),
kj::mv(tlsStarter), secureTransport, kj::mv(domain), isDefaultFetchPort,
kj::none /* maybeOpenedPrPair */);
auto result = setupSocket(js, kj::mv(request.connection), kj::mv(addressStr),
kj::none /* localAddress */, kj::mv(options), kj::mv(tlsStarter), secureTransport,
kj::mv(domain), isDefaultFetchPort, kj::none /* maybeOpenedPrPair */);
// `handleProxyStatus` needs an initialized refcount to use `JSG_THIS`, hence it cannot be
// called in Socket's constructor. Also it's only necessary when creating a Socket as a result of
// a `connect`.
Expand Down Expand Up @@ -363,7 +364,8 @@ jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOp
JSG_VISITABLE_LAMBDA((self = JSG_THIS, domain = kj::heapString(KJ_ASSERT_NONNULL(domain)),
tlsOptions = kj::mv(tlsOptions),
openedResolver = openedPrPair.resolver.addRef(js),
remoteAddress = mapCopyString(remoteAddress)),
remoteAddress = mapCopyString(remoteAddress),
localAddress = mapCopyString(localAddress)),
(self, openedResolver), (jsg::Lock & js) mutable {
auto& context = IoContext::current();

Expand Down Expand Up @@ -402,11 +404,12 @@ jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOp

openedResolver.resolve(js,
context.awaitIo(js, forkedPromise.addBranch(),
[remoteAddress = kj::mv(remoteAddress)](
[remoteAddress = kj::mv(remoteAddress),
localAddress = kj::mv(localAddress)](
jsg::Lock& js) mutable -> SocketInfo {
return SocketInfo{
.remoteAddress = kj::mv(remoteAddress),
.localAddress = kj::none,
.localAddress = kj::mv(localAddress),
};
}));

Expand All @@ -425,8 +428,9 @@ jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOp
// to `setupSocket`.
auto newTlsStarter = kj::heap<kj::TlsStarterCallback>();
return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)),
mapCopyString(remoteAddress), kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON,
kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair));
mapCopyString(remoteAddress), mapCopyString(localAddress), kj::mv(options),
kj::mv(newTlsStarter), SecureTransportKind::ON, kj::mv(domain), isDefaultFetchPort,
kj::mv(openedPrPair));
}

void Socket::handleProxyStatus(
Expand Down Expand Up @@ -460,12 +464,13 @@ void Socket::handleProxyStatus(
}
handleProxyError(js, JSG_KJ_EXCEPTION(FAILED, Error, msg));
} else {
// In our implementation we do not expose the local address at all simply
// because there's no useful value we can provide.
// For outbound sockets we have no useful local address to expose. Inbound sockets (produced
// by the `connect()` handler dispatch path) populate `localAddress` with the CONNECT
// authority that the peer targeted.
openedResolver.resolve(js,
SocketInfo{
.remoteAddress = mapCopyString(remoteAddress),
.localAddress = kj::none,
.localAddress = mapCopyString(localAddress),
});
}
};
Expand All @@ -488,12 +493,13 @@ void Socket::handleProxyStatus(jsg::Lock& js, kj::Promise<kj::Maybe<kj::Exceptio
if (result != kj::none) {
handleProxyError(js, JSG_KJ_EXCEPTION(FAILED, Error, "connection attempt failed"));
} else {
// In our implementation we do not expose the local address at all simply
// because there's no useful value we can provide.
// For outbound sockets we have no useful local address to expose. Inbound sockets (produced
// by the `connect()` handler dispatch path) populate `localAddress` with the CONNECT
// authority that the peer targeted.
openedResolver.resolve(js,
SocketInfo{
.remoteAddress = mapCopyString(remoteAddress),
.localAddress = kj::none,
.localAddress = mapCopyString(localAddress),
});
}
};
Expand Down
11 changes: 9 additions & 2 deletions src/workerd/api/sockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ struct SocketAddress {
struct SocketInfo {
jsg::Optional<kj::String> remoteAddress;

// The local address is specified by the spec but we don't implement it.
// It will always remain empty.
// The local address — i.e. the address on this side of the socket. For outbound sockets created
// via `connect()`, we don't have a useful value to provide and leave it empty. For inbound
// sockets delivered to a worker's `connect(socket)` handler, this is populated with the CONNECT
// authority (the "host:port" string the caller passed to `fetcher.connect(...)`), since from the
// handler's perspective that is the address the peer asked to connect to on this end.
jsg::Optional<kj::String> localAddress;
JSG_STRUCT(remoteAddress, localAddress);
};
Expand Down Expand Up @@ -61,6 +64,7 @@ class Socket: public jsg::Object {
IoContext& context,
kj::Own<kj::RefcountedWrapper<kj::Own<kj::AsyncIoStream>>> connectionStream,
kj::Maybe<kj::String> remoteAddress,
kj::Maybe<kj::String> localAddress,
jsg::Ref<ReadableStream> readableParam,
jsg::Ref<WritableStream> writable,
jsg::PromiseResolverPair<void> closedPrPair,
Expand All @@ -80,6 +84,7 @@ class Socket: public jsg::Object {
closedPromise(kj::mv(closedPrPair.promise)),
options(kj::mv(options)),
remoteAddress(kj::mv(remoteAddress)),
localAddress(kj::mv(localAddress)),
secureTransport(secureTransport),
domain(kj::mv(domain)),
isDefaultFetchPort(isDefaultFetchPort),
Expand Down Expand Up @@ -201,6 +206,7 @@ class Socket: public jsg::Object {
jsg::MemoizedIdentity<jsg::Promise<void>> closedPromise;
jsg::Optional<SocketOptions> options;
kj::Maybe<kj::String> remoteAddress;
kj::Maybe<kj::String> localAddress;
// Set to true when the socket is upgraded to a secure one.
bool upgraded = false;
SecureTransportKind secureTransport;
Expand Down Expand Up @@ -246,6 +252,7 @@ class Socket: public jsg::Object {
jsg::Ref<Socket> setupSocket(jsg::Lock& js,
kj::Own<kj::AsyncIoStream> connection,
kj::Maybe<kj::String> remoteAddress,
kj::Maybe<kj::String> localAddress,
jsg::Optional<SocketOptions> options,
kj::Own<kj::TlsStarterCallback> tlsStarter,
SecureTransportKind secureTransport,
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ wd_test(
tags = ["exclusive"],
)

wd_test(
src = "connect-handler-local-address-test.wd-test",
args = ["--experimental"],
data = [
"connect-handler-local-address-test.js",
"connect-handler-local-address-test-endpoint.js",
],
)

wd_test(
src = "actor-alarms-test.wd-test",
args = ["--experimental"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
import { WorkerEntrypoint } from 'cloudflare:workers';

const EXPECTED_AUTHORITY = 'example.com:1234';

// Server-side CONNECT handler: awaits socket.opened and asserts that
// info.localAddress matches the authority string the caller supplied to
// fetcher.connect(...). From the handler's perspective, the CONNECT authority
// is the *local* address (the address on this side that the peer asked to
// connect to), not the remote one. Writes 'OK' on match, 'BAD:<actual>' on
// mismatch so the client's assertion failure points directly at the observed
// value.
export class LocalAddressEndpoint extends WorkerEntrypoint {
async connect(socket) {
const info = await socket.opened;
const enc = new TextEncoder();
const writer = socket.writable.getWriter();
const payload =
info.localAddress === EXPECTED_AUTHORITY
? 'OK'
: `BAD:${info.localAddress}`;
await writer.write(enc.encode(payload));
await writer.close();
}
}
25 changes: 25 additions & 0 deletions src/workerd/api/tests/connect-handler-local-address-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0
import { strictEqual } from 'node:assert';

const EXPECTED_AUTHORITY = 'example.com:1234';

// Client worker: invokes the CONNECT handler on a service binding and
// asserts that the authority string flows through to
// socket.opened.remoteAddress on the server side. The endpoint writes back
// either 'OK' (match) or 'BAD:<actual>' (mismatch) so failures are legible.
export default {
async test(ctrl, env) {
const socket = env.TARGET.connect(EXPECTED_AUTHORITY);
await socket.opened;
const dec = new TextDecoder();
let result = '';
for await (const chunk of socket.readable) {
result += dec.decode(chunk, { stream: true });
}
result += dec.decode();
strictEqual(result, 'OK');
await socket.closed;
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "connect-handler-local-address-test",
worker = (
modules = [
(name = "worker", esModule = embed "connect-handler-local-address-test.js"),
],
compatibilityFlags = ["nodejs_compat_v2", "experimental"],
bindings = [
(name = "TARGET", service = (
name = "connect-handler-local-address-endpoint",
entrypoint = "LocalAddressEndpoint")),
],
)
),
( name = "connect-handler-local-address-endpoint",
worker = (
modules = [
(name = "worker", esModule = embed "connect-handler-local-address-test-endpoint.js"),
],
compatibilityFlags = ["nodejs_compat_v2", "experimental"],
)
),
]
);
Loading