PubSub
Simple PubSub support was added in 0.8.0. A simple example can be found here.
A publisher can be created on either the server side or the client side using the publisher::<T: Topic>() method, and a subscriber can be created using the subscriber::<T: Topic>(cap: usize) method. Both take one type parameter T, which must implement the toy_rpc::pubsub::Topic trait. You can either use the provided derive macro #[derive(toy_rpc::macros::Topic)] to define a struct as the pubsub message, or manually implement the toy_rpc::pubsub::Topic trait on a type.
use toy_rpc::macros::Topic;
use serde::{Serialize, Deserialize};
#[derive(Topic, Serialize, Deserialize)]
pub struct Count(pub u32);
The message item type and topic name can also be customized using the #[topic] attribute. For example:
#[derive(Serialize, Deserialize, Topic)]
#[topic(rename = "C")] // This only changes the topic name to "C"; the message item type is still `Count`
pub struct Count(u32);
#[derive(Topic)]
#[topic(item = "u32")] // This only changes the message item type
pub struct Count { }
#[derive(Topic)]
#[topic(rename = "C", item = "u32")] // Or customize both the topic name and the item type
pub struct Count { }
Or manually implement the Topic trait:
#[derive(Serialize, Deserialize)]
pub struct Count(pub u32);
impl toy_rpc::pubsub::Topic for Count {
    type Item = Count; // The Item type must implement `Serialize` and `Deserialize`
    // A String identifier for the topic. The user must ensure it is unique
    fn topic() -> String {
        "Count".to_string()
    }
}
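To make the effect of rename and item concrete, the sketch below writes out by hand what #[topic(rename = "C", item = "u32")] is described as doing. The Topic trait here is a local stand-in reduced to the two members shown in the manual implementation above, so the block compiles without toy_rpc; describe is a hypothetical helper added for illustration, and the code the derive macro actually generates may differ.

```rust
// Local stand-in for `toy_rpc::pubsub::Topic`, reduced to the two members
// shown in the manual implementation. It is NOT the real trait.
trait Topic {
    type Item;
    fn topic() -> String;
}

// Marker type for the topic: the topic name is "C" and the item type is `u32`,
// mirroring `#[topic(rename = "C", item = "u32")]`.
struct Count;

impl Topic for Count {
    type Item = u32;

    fn topic() -> String {
        "C".to_string()
    }
}

// Hypothetical helper: formats a message item together with its topic name.
fn describe<T: Topic>(item: T::Item) -> String
where
    T::Item: std::fmt::Debug,
{
    format!("{}: {:?}", T::topic(), item)
}
```

With these definitions, describe::<Count>(7) produces a label that carries the renamed topic "C" while the payload is a plain u32 rather than a Count.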
A publisher can be created by specifying the topic in the type parameter.
let mut publisher = client.publisher::<Count>(); // on client side; `mut` is needed to call `send`
// let mut publisher = server.publisher::<Count>(); // on server side
The Publisher implements the futures::Sink<T> trait, where T is the type parameter representing the topic. In order to publish a message to the topic, the futures::SinkExt trait must be imported.
use futures::SinkExt;
publisher.send(Count(7)).await.unwrap();
A subscriber can be created by specifying the topic in the type parameter and the capacity of its local buffer. Here we create a subscriber on the client side listening to messages on the topic Count with a local capacity of 10.
let mut subscriber = client.subscriber::<Count>(10).unwrap(); // on the client side
// let mut subscriber = server.subscriber::<Count>(10).unwrap(); // on the server side (except for `actix-web`)
The Subscriber implements the futures::Stream<Item = Result<T, toy_rpc::Error>> trait, where T is the type parameter representing the topic. In order to process incoming messages, the futures::StreamExt trait must be imported.
use futures::StreamExt;
if let Some(result) = subscriber.next().await {
    let item = result.unwrap(); // There could be errors receiving incoming messages
    // do something with the item
}
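The roles of the topic name and the subscriber's buffer capacity can be illustrated with a small in-process model built only on the standard library. Broker, subscribe, publish, and broker_demo are hypothetical names, the item type is fixed to u32 for brevity, and skipping a subscriber whose buffer is full is an assumption of this sketch; it is not a description of how toy_rpc routes messages or handles a full buffer.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// Hypothetical in-process broker keyed by topic name; not toy_rpc's implementation.
struct Broker {
    subscribers: HashMap<String, Vec<SyncSender<u32>>>,
}

impl Broker {
    fn new() -> Self {
        Broker { subscribers: HashMap::new() }
    }

    // Registers a subscriber whose local buffer holds at most `cap` items.
    fn subscribe(&mut self, topic: &str, cap: usize) -> Receiver<u32> {
        let (tx, rx) = sync_channel(cap);
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    // Offers `item` to every subscriber of `topic` and returns how many accepted it.
    // Assumption of this sketch: a full buffer or a dropped receiver is skipped.
    fn publish(&self, topic: &str, item: u32) -> usize {
        match self.subscribers.get(topic) {
            Some(subs) => subs.iter().filter(|tx| tx.try_send(item).is_ok()).count(),
            None => 0,
        }
    }
}

// Publishes twice to a subscriber with capacity 1, then drains its buffer.
fn broker_demo() -> (usize, usize, Vec<u32>) {
    let mut broker = Broker::new();
    let rx = broker.subscribe("Count", 1);
    let first = broker.publish("Count", 7); // fits into the empty buffer
    let second = broker.publish("Count", 8); // buffer is full, so it is skipped
    let received: Vec<u32> = rx.try_iter().collect();
    (first, second, received)
}
```

In this model the second publish is not delivered because the single-slot buffer still holds the first item, which is why choosing the capacity matters when the subscriber is slower than the publisher.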
Example
Ack for Publish message delivery
As of version 0.8.0-beta.0, Ack is available for cases where explicit acknowledgement is needed. Ack only applies to acknowledging receipt of Publish messages and does NOT apply to any RPC requests/responses. There are three different AckModes:
- AckModeNone, which is the default mode for both the Server and the Client. This mode is available on both the Server and the Client. Under this mode, no Ack message will be required by the publisher or sent by the subscriber.
- AckModeAuto. This mode is available on both the Server and the Client. Under this mode, both the server and the client will automatically reply with an Ack to any Publish message they receive.
- AckModeManual. This mode is only available on the Client. Under this mode, the subscriber needs to manually call .ack() in order to get the published item. Please note that under the manual mode, the Publisher behaves the same as it does under AckModeAuto.
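The rules in the list above can be summarized as a few predicates. The enum and function names below are hypothetical and exist only to restate the text in checkable form; toy_rpc represents the modes with its own types, which this sketch does not reproduce.

```rust
// Hypothetical model of the three modes described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AckMode {
    None,
    Auto,
    Manual,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Side {
    Server,
    Client,
}

// Every mode is available on both sides, except manual mode on the server.
fn is_available(mode: AckMode, side: Side) -> bool {
    !(mode == AckMode::Manual && side == Side::Server)
}

// A publisher waits for an Ack in auto mode, and behaves the same way in manual mode.
fn publisher_expects_ack(mode: AckMode) -> bool {
    mode != AckMode::None
}

// A subscriber replies without user action only in auto mode;
// in manual mode the user must call `.ack()`.
fn subscriber_acks_automatically(mode: AckMode) -> bool {
    mode == AckMode::Auto
}
```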
The behavior of the publisher and subscriber in different scenarios is discussed below.
- Publisher on the Server with AckModeAuto
  When a Publisher is created on the server side, the server's pubsub handler will wait for ALL Acks from the subscribers, including those from Subscribers on the Server, in an asynchronous manner, meaning the publisher is able to continue publishing new messages even if some subscribers have not sent back an Ack yet. Upon reaching the timeout, the server's pubsub handler will try to resend the same Publish message (with the same sequence ID) to the Subscribers that have not sent back Ack messages. The server will stop retrying after the maximum number of retries is reached.
- Publisher on the Client with AckModeAuto or AckModeManual
  When a Publisher is created on the client side, the client will wait for only ONE Ack message from the Server in an asynchronous manner, meaning the Publisher is able to continue publishing new messages even if the Ack message from the Server has not arrived. If the Ack message from the Server does not arrive before the timeout expires, the client will attempt to publish the same message again (with the same message ID). The client (Publisher) will stop retrying after the maximum number of retries is reached.
  Once the Publish message is received by the Server, the message will be assigned a new sequence ID that is tracked only by the Server. The message will then be published to all subscribers under the topic, and the server will wait for ALL Ack messages from the subscribers in an asynchronous manner, meaning the server will be able to keep handling RPC requests or PubSub messages while waiting for Ack messages to come back. If not all Ack messages are sent back to the server before the timeout expires, the server will attempt to resend the same message with the same sequence ID to the subscribers whose Ack messages have not been received. The server will stop retrying after the maximum number of retries is reached.
- Subscriber on the Server side with AckModeAuto
  Please note that the Server side does NOT support AckModeManual. Upon receiving a published message, the subscriber will automatically send back an Ack message to the PubSub handler on the server.
- Subscriber on the Client side with AckModeAuto
  Upon receiving a published message, the Client will automatically send an Ack message back to the Server.
- Subscriber on the Client side with AckModeManual
  Instead of receiving the usual Result<Topic::Item, Error> from the Subscriber stream, the user will receive Result<Delivery<Topic::Item>, Error>. In order to get the usual Topic::Item, the user will need to call the .ack() method on the Delivery object (i.e. let item = delivery.ack()), which will send an Ack message back to the Server.
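The manual mode described in the last item can be pictured as a wrapper that only hands over the item once the acknowledgement has been sent. The sketch below is a standard-library model of that idea: ManualDelivery, its fields, the u16 sequence ID, and manual_ack_demo are all hypothetical, and an in-process channel stands in for the connection to the Server. It is not toy_rpc's Delivery type.

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-in for the `Delivery` wrapper described above.
struct ManualDelivery<T> {
    item: T,
    seq_id: u16,         // assumed ID type, for illustration only
    ack_tx: Sender<u16>, // stands in for the connection back to the server
}

impl<T> ManualDelivery<T> {
    // Consumes the delivery, sends the Ack, and returns the wrapped item.
    fn ack(self) -> T {
        // If the receiving end is gone there is nobody left to notify,
        // so the send error is ignored in this sketch.
        let _ = self.ack_tx.send(self.seq_id);
        self.item
    }
}

// Returns (Acks seen before `.ack()`, the item, Acks seen after `.ack()`).
fn manual_ack_demo() -> (usize, u32, Vec<u16>) {
    let (ack_tx, ack_rx) = channel();
    let delivery = ManualDelivery { item: 7u32, seq_id: 42u16, ack_tx };
    let acks_before = ack_rx.try_iter().count(); // nothing acknowledged yet
    let item = delivery.ack(); // sends the Ack and yields the item
    let acks_after: Vec<u16> = ack_rx.try_iter().collect();
    (acks_before, item, acks_after)
}
```

Because ack takes self by value, the item cannot be obtained without the acknowledgement being sent, which is the property the manual mode relies on.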
How to use AckMode
By default, every Server and Client starts with AckModeNone:
let server = Server::builder()
    .build(); // This will create a server with `AckModeNone`
let client = Client::dial(ADDR)
    .await.unwrap(); // This will create a client with `AckModeNone`
Ack can be enabled by setting the Server or Client into the corresponding mode using the corresponding builder.
let server = Server::builder() // This will start the builder with `AckModeNone`
    .set_ack_mode_auto() // This will set the ServerBuilder to `AckModeAuto`
    .build(); // This will build the server with `AckModeAuto`
let client = Client::builder()
    .set_ack_mode_auto() // This will set the ClientBuilder to `AckModeAuto`
    .dial(ADDR)
    .await.unwrap(); // This will create a Client with `AckModeAuto`
let client = Client::builder() // This will start the builder with `AckModeNone`
    .set_ack_mode_manual() // This will set the ClientBuilder to `AckModeManual`
    .dial(ADDR)
    .await.unwrap(); // This will create a Client with `AckModeManual`
The timeout and the maximum number of retries for the publisher can also be configured:
use std::time::Duration;
let server = Server::builder()
    // Must enable Ack first
    .set_ack_mode_auto()
    // Sets how long the server will wait for Ack messages.
    //
    // This also affects the Publish messages sent by
    // `Publisher`s from the client side
    .set_publisher_retry_timeout(Duration::from_secs(5))
    // Sets how many times the server will retry.
    //
    // This also affects the Publish messages sent by
    // `Publisher`s from the client side
    .set_publisher_max_num_retries(3)
    .build();
let client = Client::builder()
    // Must set the builder into some `Ack` mode
    .set_ack_mode_auto()
    // Sets how long the client publisher will wait for the Ack message from the Server.
    //
    // This does ***NOT*** affect how long the server will wait for
    // `Ack` from the subscribers
    .set_publisher_retry_timeout(Duration::from_secs(5))
    // Sets how many times the client publisher will retry sending the Publish
    // message to the server.
    //
    // This does ***NOT*** affect how many times the server will attempt to
    // resend the publish message to the subscribers
    .set_publisher_max_num_retries(3)
    .dial(ADDR)
    .await.unwrap();
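How the retry timeout and the maximum number of retries interact can be sketched as a deterministic simulation. simulate_retries and its parameters are hypothetical, and each timeout window is modeled as one loop iteration rather than real time. The sketch also assumes that the maximum number of retries counts resends after the initial publish, which the text above does not state explicitly.

```rust
use std::collections::BTreeSet;

// Hypothetical simulation of the resend loop; not toy_rpc's implementation.
//
// `acks_per_attempt[i]` lists the subscriber IDs whose Ack arrives before the
// timeout of attempt `i`, where attempt 0 is the initial publish.
// Returns (number of resends performed, subscribers that never acknowledged).
fn simulate_retries(
    subscribers: &[u32],
    acks_per_attempt: &[Vec<u32>],
    max_num_retries: usize,
) -> (usize, BTreeSet<u32>) {
    let mut pending: BTreeSet<u32> = subscribers.iter().copied().collect();
    let mut resends = 0;

    // Attempt 0 is the initial publish; attempts 1..=max_num_retries are resends.
    for attempt in 0..=max_num_retries {
        if attempt > 0 {
            // Same message and same sequence ID, sent only to `pending`.
            resends += 1;
        }
        // Record the Acks that arrive within this timeout window.
        if let Some(acks) = acks_per_attempt.get(attempt) {
            for id in acks {
                pending.remove(id);
            }
        }
        if pending.is_empty() {
            break; // everyone acknowledged, nothing left to resend
        }
    }
    (resends, pending)
}
```

For example, with subscribers 1, 2, and 3, where 1 acknowledges the initial publish, 2 acknowledges the first resend, and 3 never answers, a limit of 3 retries results in 3 resends and leaves subscriber 3 unacknowledged when the publisher gives up.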