Reconnecting Client Specification
Introduction
A ReconnectingClient wraps a roam ConnectionHandle and provides
transparent reconnection when the underlying transport fails. Callers
make RPC calls as normal; if the connection is lost, the client
automatically reconnects and retries the call.
Motivation
Roam clients currently fail permanently when the connection drops. This forces every consumer to implement their own reconnection logic at the wrong abstraction level. Reconnection belongs in roam-stream, at the transport layer.
API
Connector Trait
A Connector is a factory that creates new connections on demand.
It is called on initial connect and after each disconnect.
pub trait Connector: Send + Sync + 'static {
    type Transport: MessageTransport;

    /// Establish a new connection.
    fn connect(&self) -> impl Future<Output = io::Result<Self::Transport>> + Send;

    /// Hello parameters for the connection.
    fn hello(&self) -> Hello;
}
The Transport associated type MUST implement MessageTransport.
This allows any roam-compatible transport to be used.
The hello() method returns the Hello parameters to use when
establishing the connection. These are sent during the hello
exchange after transport connect succeeds.
ReconnectingClient
ReconnectingClient<C> wraps a Connector and provides automatic
reconnection with configurable retry policy.
pub struct ReconnectingClient<C: Connector> {
    connector: C,
    // ... internal state
}
Construction
ReconnectingClient::new() does NOT connect immediately. The first
call triggers the initial connection attempt.
impl<C: Connector> ReconnectingClient<C> {
    /// Create a new reconnecting client with default retry policy.
    pub fn new(connector: C) -> Self;

    /// Create with custom retry policy.
    pub fn with_policy(connector: C, policy: RetryPolicy) -> Self;
}
Retry Policy
RetryPolicy configures reconnection behavior with exponential backoff.
pub struct RetryPolicy {
    /// Maximum reconnection attempts before giving up.
    pub max_attempts: u32,
    /// Initial delay between reconnection attempts.
    pub initial_backoff: Duration,
    /// Maximum delay between reconnection attempts.
    pub max_backoff: Duration,
    /// Backoff multiplier.
    pub backoff_multiplier: f64,
}
Default retry policy values:
- max_attempts: 3
- initial_backoff: 100ms
- max_backoff: 5s
- backoff_multiplier: 2.0
The delay before attempt N (1-indexed) is:
min(initial_backoff * backoff_multiplier^(N-1), max_backoff)
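As a rough illustration of the formula above, the following sketch computes the delay for a given attempt number. The `delay_for_attempt` method and the `Default` impl are assumptions made for this example and are not part of the specified API; the default values are the ones listed above.

```rust
use std::time::Duration;

/// Local mirror of the spec's RetryPolicy so the sketch is self-contained.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    pub max_attempts: u32,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
    pub backoff_multiplier: f64,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        RetryPolicy {
            max_attempts: 3,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(5),
            backoff_multiplier: 2.0,
        }
    }
}

impl RetryPolicy {
    /// Hypothetical helper: delay before attempt `attempt` (1-indexed),
    /// i.e. min(initial_backoff * backoff_multiplier^(attempt - 1), max_backoff).
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        // Clamp the exponent so the cast to i32 cannot wrap for huge attempt numbers.
        let exponent = attempt.saturating_sub(1).min(i32::MAX as u32) as i32;
        let scaled = self.initial_backoff.as_secs_f64() * self.backoff_multiplier.powi(exponent);
        // Capping in floating point keeps the value finite before converting back.
        let capped = scaled.min(self.max_backoff.as_secs_f64());
        Duration::from_secs_f64(capped)
    }
}
```

With the default policy the delays grow as 100ms, 200ms, 400ms and so on, and are held at 5s once the scaled value exceeds `max_backoff`.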
Making Calls
The call() method makes an RPC call with automatic reconnection.
If the call fails due to a transport error, it reconnects and retries
according to the retry policy.
impl<C: Connector> ReconnectingClient<C> {
    /// Make an RPC call with automatic reconnection.
    pub async fn call<Req, Resp>(
        &self,
        method_id: u64,
        request: &Req,
    ) -> Result<Resp, ReconnectError>
    where
        Req: for<'a> Facet<'a>,
        Resp: for<'a> Facet<'a>;
}
The handle() method returns the current ConnectionHandle for
direct access. The handle may become invalid if the connection drops.
impl<C: Connector> ReconnectingClient<C> {
    /// Get a connection handle for making calls.
    ///
    /// Prefer using `call()` directly for automatic retry.
    pub async fn handle(&self) -> Result<ConnectionHandle, ReconnectError>;
}
Errors
ReconnectError distinguishes transport failures from RPC errors.
#[derive(Debug)]
pub enum ReconnectError {
    /// All retry attempts exhausted.
    RetriesExhausted {
        original: io::Error,
        attempts: u32,
    },
    /// Connection failed.
    ConnectFailed(io::Error),
    /// RPC error (no reconnection attempted).
    Rpc(CallError),
}
RetriesExhausted is returned when a transport error occurs and
all reconnection attempts fail. It contains the original error that
caused the disconnect and the number of attempts made.
ConnectFailed is returned when the initial connection or a
reconnection attempt fails with an error from the connector.
Rpc wraps call-level errors that are NOT transport failures.
These errors do not trigger reconnection because the connection
is still valid.
Behavior
Connection Lifecycle
Connection lifecycle:
- Lazy connection: No connection attempt until first call
- On first call: Call connector.connect(), perform hello exchange, spawn driver
- On success: Cache the ConnectionHandle, complete the call
- On transport error during call: Mark connection dead, trigger reconnection
Reconnection Triggers
Reconnection is triggered when a call fails with a transport error. Transport errors include:
- Broken pipe (EPIPE)
- Connection reset (ECONNRESET)
- Connection closed by peer
- Goodbye message received
- Any io::Error from the transport layer
Reconnection is NOT triggered for RPC-level errors:
- RoamError::UnknownMethod
- RoamError::InvalidPayload
- RoamError::Cancelled
- Application errors (RoamError::User)
- Serialization errors
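The two lists above amount to a single classification step. The sketch below shows one way it could look; `CallFailure`, `RpcFailure`, and `triggers_reconnect` are hypothetical stand-ins invented for this example and do not correspond to actual roam types.

```rust
use std::io;

/// Hypothetical RPC-level failures, mirroring the second list above.
#[derive(Debug)]
pub enum RpcFailure {
    UnknownMethod,
    InvalidPayload,
    Cancelled,
    User,
    Serialization,
}

/// Hypothetical union of everything a call can fail with.
#[derive(Debug)]
pub enum CallFailure {
    /// Any io::Error from the transport layer (EPIPE, ECONNRESET, ...).
    Io(io::Error),
    /// Connection closed by peer.
    PeerClosed,
    /// Goodbye message received.
    Goodbye,
    /// The connection is still valid; only this call failed.
    Rpc(RpcFailure),
}

/// Returns true when the failure means the connection itself is unusable.
pub fn triggers_reconnect(failure: &CallFailure) -> bool {
    match failure {
        CallFailure::Io(_) | CallFailure::PeerClosed | CallFailure::Goodbye => true,
        CallFailure::Rpc(_) => false,
    }
}
```

Keeping the decision in one exhaustive `match` means that adding a new failure variant forces an explicit choice about whether it should trigger reconnection.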
Reconnection Flow
When a transport error occurs:
1. Mark the current connection as dead
2. Set attempt counter to 1
3. Call connector.connect()
4. On connect success: perform hello exchange, spawn driver, retry original call
5. On connect failure: wait backoff duration, increment attempt counter
6. If attempts < max_attempts: go to step 3
7. Otherwise: return RetriesExhausted
Call fails with transport error
           |
           v
  Mark connection dead
           |
           v
   +---------------+
   |  Attempt = 1  |
   +-------+-------+
           |
           v
+---------------------+
| connector.connect() |
+----------+----------+
           |
     +-----+-----+
     |           |
  Success     Failure
     |           |
     v           v
   Hello   Wait backoff
 exchange        |
     |           |
     |     Attempt += 1
     |           |
     |     +-----+-----+
     |     |           |
     |   < max      >= max
     |     |           |
     |     v           v
     |   Retry  RetriesExhausted
     |     |
     v     |
   Spawn   |
  driver   |
     |     |
     v     |
  Retry <--+
 original
   call
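The flow above can be sketched as a plain loop. This is a simplified, synchronous illustration and not the actual implementation: the `reconnect` function and its parameters are hypothetical, the hello exchange and driver spawn are left out, and the local `ReconnectError` keeps only the variant the loop needs. It also makes two interpretive assumptions: `max_attempts` is treated as the total number of `connect` calls, and no wait happens after the final failed attempt.

```rust
use std::io;

/// Reduced local copy of the spec's error type, enough for this sketch.
#[derive(Debug)]
pub enum ReconnectError {
    RetriesExhausted { original: io::Error, attempts: u32 },
}

/// Hypothetical reconnection loop.
///
/// `original` is the transport error that caused the disconnect.
/// `connect` stands in for `connector.connect()`.
/// `wait` receives the number of the attempt that just failed and is expected
/// to sleep for the corresponding backoff; injecting it keeps the loop testable.
pub fn reconnect<T>(
    original: io::Error,
    max_attempts: u32,
    mut connect: impl FnMut() -> io::Result<T>,
    mut wait: impl FnMut(u32),
) -> Result<T, ReconnectError> {
    for attempt in 1..=max_attempts {
        // On success the caller would perform the hello exchange, spawn the
        // driver, and retry the original call.
        if let Ok(transport) = connect() {
            return Ok(transport);
        }
        // Assumption: back off only if another attempt will follow.
        if attempt < max_attempts {
            wait(attempt);
        }
    }
    Err(ReconnectError::RetriesExhausted {
        original,
        attempts: max_attempts,
    })
}
```

How the attempt number passed to `wait` maps onto the backoff formula is left to the caller in this sketch.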
Concurrency
Only one reconnection attempt runs at a time. If multiple calls fail simultaneously, they share the reconnection attempt.
Callers blocked during reconnection wait for it to complete. After reconnection succeeds, all waiting callers proceed with the new connection.
Implementation note: Use a Mutex<State> or similar to serialize
reconnection attempts while allowing concurrent calls on a healthy
connection.
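A minimal sketch of the `Mutex<State>` idea follows, using `std::sync::Mutex` and threads for brevity; an async implementation would more likely use an async-aware lock. `Conn`, `State`, `Shared`, and their methods are hypothetical names for this example, and connecting is simulated by bumping a counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Stand-in for a live connection; `generation` counts (re)connects.
pub struct Conn {
    pub generation: usize,
}

enum State {
    Disconnected,
    Connected(Arc<Conn>),
}

pub struct Shared {
    state: Mutex<State>,
    connects: AtomicUsize,
}

impl Shared {
    pub fn new() -> Self {
        Shared {
            state: Mutex::new(State::Disconnected),
            connects: AtomicUsize::new(0),
        }
    }

    /// Returns the cached connection, or connects while holding the lock so
    /// that concurrent callers wait and then share the same new connection.
    pub fn handle(&self) -> Arc<Conn> {
        let mut state = self.state.lock().unwrap();
        if let State::Connected(conn) = &*state {
            return Arc::clone(conn);
        }
        // Simulated connect: a real client would call the connector here.
        let generation = self.connects.fetch_add(1, Ordering::SeqCst) + 1;
        let conn = Arc::new(Conn { generation });
        *state = State::Connected(Arc::clone(&conn));
        conn
    }

    /// Marks `failed` dead only if it is still the current connection, so a
    /// late report about an old connection cannot tear down its replacement.
    pub fn mark_dead(&self, failed: &Arc<Conn>) {
        let mut state = self.state.lock().unwrap();
        let is_current = matches!(&*state, State::Connected(c) if Arc::ptr_eq(c, failed));
        if is_current {
            *state = State::Disconnected;
        }
    }

    /// Number of connects performed so far.
    pub fn connect_count(&self) -> usize {
        self.connects.load(Ordering::SeqCst)
    }
}
```

The lock is held only while looking up or replacing the connection. Calls on a healthy connection run on their own `Arc<Conn>` clone and do not contend with each other.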
Driver Lifecycle
Each connection spawns a driver task that processes incoming messages.
On disconnect, the driver task exits (or is aborted). A new connection spawns a new driver. The driver handle is stored internally for cleanup.
Integration
Generated Clients
Generated clients (e.g., FooClient) wrap a ConnectionHandle.
To use with ReconnectingClient:
// Option A: Get handle and construct client (handle may become stale)
let handle = reconnecting.handle().await?;
let client = FooClient::new(handle);

// Option B: Use call() directly
let response: StatusResponse = reconnecting
    .call(foo_method_id::status(), &())
    .await?;
Example Usage
// Define connector
struct DaemonConnector {
    socket_path: PathBuf,
}

impl Connector for DaemonConnector {
    type Transport = CobsFramed<UnixStream>;

    async fn connect(&self) -> io::Result<Self::Transport> {
        let stream = UnixStream::connect(&self.socket_path).await?;
        Ok(CobsFramed::new(stream))
    }

    fn hello(&self) -> Hello {
        Hello::V1 {
            max_payload_size: 1024 * 1024,
            initial_channel_credit: 64 * 1024,
        }
    }
}

// Use it
let connector = DaemonConnector { socket_path };
let client = ReconnectingClient::new(connector);

// Calls reconnect transparently
let status: StatusResponse = client
    .call(daemon_method_id::status(), &())
    .await?;
Test Requirements
Test: Connection drops mid-call, reconnects, call succeeds.
Test: Server stays down, returns RetriesExhausted after max attempts.
Test: Verify exponential backoff timing between attempts.
Test: Multiple tasks calling during reconnection all succeed after.
Test: RoamError::UnknownMethod is not treated as transport error.
Test: No connection until first call.
Test: Server sends Goodbye, client reconnects.