Valkey as a Message Broker for Request-Reply
I needed to call an RPC service hosted behind a home NAT, with no public IP and no port forwarding. Using Valkey/Redis lists and blocking pops, I built a lightweight request-reply protocol that lets machines communicate across NATs without tunnels or VPNs.
I needed it because my service runs on a beefy machine with serious capacity. Renting equivalent hardware in the cloud would cost a lot of money, so I use my home workstation for some workloads.
Request-Reply brokered via Valkey
The protocol (I called it Valbridge) is simple. A caller sends a request by pushing a JSON message to a service queue. The message includes a unique reply channel ID (it's a UUIDv4). The caller then blocks on that reply channel, waiting for a response.
On the other side, a listener blocks on the service queue. When a request arrives, it processes it, then pushes the response to the reply channel specified in the request.
Caller Valkey Listener
| | |
|-- RPUSH req:service --> | |
|-- BLPOP reply:uuid ---> | |
| | <-- BLPOP req:service ---|
| | | (processing)
| | <-- RPUSH reply:uuid ----|
| <-- response --------- | |
That's the entire protocol. There is no subscription management, no topic routing, but just lists and blocking pops. But as we'll see, there are a few tricks to keep it efficient and avoid bloat.
Implementation details
The interesting bits (time-bucketed queues, self-cleaning reply channels) are explained after the following code.
Types and constants
const TIMEOUT_SECS = 10;
const BUCKET_TTL_SECS = 90; // TTL for time-bucketed request queues (90 seconds)
const CLEANUP_TTL_SECS = 30; // TTL for reply channels (30 seconds)
export interface RequestMessage {
date: number;
content_type?: string;
reply_to: string;
body: string;
}
export interface ReplyMessage {
date: number;
content_type?: string;
server_id: string;
think_time?: number;
body: string;
}
Caller side
async call(service: string, body: string): Promise<string | null> {
const replyChanId = `chan:${crypto.randomUUID()}`;
const requestMessage: RequestMessage = {
date: Date.now(),
reply_to: replyChanId,
body,
};
// Calculate time bucket (minutes since epoch)
const bucket = Math.floor(Date.now() / 60_000);
const queueKey = `req:${service}:${bucket}`;
// Send request to time-bucketed queue and wait for response in a single pipeline
// This achieves 1 network roundtrip instead of 2
const pipe = this.client.pipeline();
await pipe.rpush(queueKey, JSON.stringify(requestMessage));
await pipe.expire(queueKey, BUCKET_TTL_SECS);
await pipe.blpop(TIMEOUT_SECS, replyChanId); // BLPOP must be the last command in the pipeline
const pipeResult = await pipe.flush();
const resp = pipeResult[pipeResult.length - 1];
if (!resp) {
console.warn("no reply - timeout");
return null;
}
// Success: response received (BLPOP automatically deletes the reply channel)
const responseMessage: ReplyMessage = JSON.parse(resp[1]);
return responseMessage.body;
}
Listener side
type Handler = (msg: RequestMessage) => Promise;
async listen(service: string, handler: Handler) {
while (true) {
// Calculate current, previous, and next time buckets (minutes since epoch)
// Monitor 3 buckets (m-1, m, m+1) to avoid missing requests when
// the minute boundary changes during the BLPOP timeout
const currentBucket = Math.floor(Date.now() / 60_000);
const prevBucket = currentBucket - 1;
const nextBucket = currentBucket + 1;
// BLPOP checks all buckets atomically, prioritizing the oldest bucket first
const resp = await this.client.blpop(
20,
`req:${service}:${prevBucket}`,
`req:${service}:${currentBucket}`,
`req:${service}:${nextBucket}`,
);
if (!resp) {
continue;
}
const requestMessage: RequestMessage = JSON.parse(resp[1]);
console.log("Handling request from channel:", requestMessage.reply_to);
if (!requestMessage.reply_to) {
console.warn("Invalid request: missing reply_to", requestMessage);
continue;
}
const body = await handler(requestMessage);
const responseMessage: ReplyMessage = {
date: Date.now(),
server_id: this.serverId,
body,
};
// Send response to reply channel with TTL (1 network roundtrip)
const pipe = this.client.pipeline();
await pipe.rpush(
requestMessage.reply_to,
JSON.stringify(responseMessage),
);
await pipe.expire(requestMessage.reply_to, CLEANUP_TTL_SECS);
await pipe.flush();
console.log("Replied to:", requestMessage.reply_to);
}
}
Time-bucketed queues
One detail worth mentioning: request queues are time-bucketed by minute.
Instead of a single req:service key, requests go to
req:service:{bucket}
where the bucket is the current minute since epoch.
Without bucketing, the queue would accumulate stale requests if the listener goes offline. With bucketing, each key has a TTL of 90 seconds. Old requests expire automatically. No further cleanup job is needed.
The listener monitors three buckets at once (previous minute, current,
next) to handle the boundary between two minutes. BLPOP
accepts multiple keys and returns from whichever has data first,
prioritizing the oldest bucket.
Reply channels are self-cleaning
Each reply channel is a unique UUID-based list: chan:{uuid}.
The caller creates it implicitly by blocking on it. The listener pushes
the response and sets a short TTL (30 seconds).
If the caller receives the response, BLPOP
atomically pops and deletes it. If the caller times out and gives up, the
TTL handles cleanup. Either way, no garbage accumulates.
Performance
Both the call and listen paths use pipelines to batch commands into a
single network roundtrip. The caller's
RPUSH + EXPIRE + BLPOP
is one pipeline. The listener's RPUSH + EXPIRE
for the reply is one pipeline too.
Pipelines amortize network latency. In practice, Valbridge has been handling thousands of requests without any issue so far. The bottleneck is the handler function (CPU-bound), not the messaging layer anyway.
But why?
The typical solutions are reverse tunnels (Cloudflare Tunnel, ngrok, SSH
with -R), VPN meshes (Tailscale or DIY with WireGuard), or,
God forbid, ninja NAT hole-punching techniques.
I went with something simpler. Both machines connect outbound to a shared Valkey TLS-enabled instance on a cheap VPS. There is no port to open, no tunneling, and no VPN. There is no TLS setup on the service side either, since both machines are clients from Valkey's perspective. It may change in the future if I decide to enable mTLS.
This is a well-known pattern, specialized brokers like
NATS
and ZeroMQ have it. But I was already running Valkey
for other things, and the pattern maps cleanly onto Redis/Valkey
primitives RPUSH
and BLPOP, so I went with it.
Closing thoughts
Valbridge is not a message broker. It's a single TypeScript file (I have it for Python too), a dependency I already had, and that's all I needed.