Let's Make an Information Display Part 1: Backend
About this Project
Our house in Vancouver came with a TV pre-installed in the kitchen, right above the fridge. This was, originally, for monitoring all of the security cameras. But surveillance cameras show the exact same thing, day in and day out. What if, instead, I aggregated a bunch of different datums our house cares about?
A former housemate had a little web app that displayed the weather, and when the next trash day was. But I was feeling a little more ambitious than that, and I never miss an opportunity to over-engineer something.
There are two important things to notice here. One, planes are cool. Two, both the ADS-B and the Slack message want to be near-real-time[1], which means this can't just be a server-side application.
The First Architecture
Client Side Best Side
My first idea was to have everything happen on the client side. There's a half dozen or so APIs that we need to aggregate, but if this is only going to run in one or two places, why have a backend at all?
It turns out there are a couple of reasons. One is CORS, an annoying but necessary security feature that won't let a page read responses from any old URL you want. If you control the endpoint, you can add a couple of headers that say "Hey, whatever, do what you want." But I don't control a lot of the endpoints, so I was going to need a proxy anyway.
The other reason was that making requests from inside WebAssembly was just a little awkward.
For example, here's how reqwest, a common Rust library, makes a request:
```rust
let client = reqwest::Client::new();
let res = client
    .post("http://httpbin.org/post")
    .body("Some text")
    .send()
    .await
    .unwrap();
```
Easy, right?
But on the WebAssembly side, you can't just make arbitrary socket calls. You need to go through the browser's built-in XHR methods.
```rust
Request::get("/path")
    .send()
    .await
    .unwrap()
```
And while that interface is much the same, pretty much no library you'd want to use comes with built-in support for it. You lose a lot of the ecosystem advantages Rust usually gives you. Before I gave up, for example, I wrote my own Slack client, because none of the existing ones supported WASM.
You'd expect Rust to have a trait HttpClient with a bunch of implementations, but no such luck. Maybe once async traits stabilise.
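The following minimal std-only sketch makes that wish concrete. Every name in it (HttpClient, HttpResponse, StubClient, fetch_body) is hypothetical. It is also synchronous so that it needs no dependencies; a real version would need async methods, which is exactly the sticking point.

```rust
use std::collections::HashMap;

/// Hypothetical response type: just a status code and a body.
#[derive(Debug, Clone, PartialEq)]
pub struct HttpResponse {
    pub status: u16,
    pub body: String,
}

/// Hypothetical client abstraction. A socket-based native backend and a
/// browser-based WebAssembly backend could each implement it.
pub trait HttpClient {
    fn get(&self, url: &str) -> Result<HttpResponse, String>;
}

/// Canned responses, standing in for a real backend.
pub struct StubClient {
    pub responses: HashMap<String, HttpResponse>,
}

impl HttpClient for StubClient {
    fn get(&self, url: &str) -> Result<HttpResponse, String> {
        self.responses
            .get(url)
            .cloned()
            .ok_or_else(|| format!("no stubbed response for {url}"))
    }
}

/// Library code (say, a Slack client) only needs *some* HttpClient.
pub fn fetch_body<C: HttpClient>(client: &C, url: &str) -> Result<String, String> {
    let resp = client.get(url)?;
    if resp.status == 200 {
        Ok(resp.body)
    } else {
        Err(format!("unexpected status {}", resp.status))
    }
}
```

Code written against a trait like this wouldn't care whether the bytes travel over a socket or through the browser.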
The architecture I eventually settled on was more traditional, consisting of three crates:
- den-tail is the backend, running in a VM on my NAS
- den-head is the frontend, running in WebAssembly in a browser
- den-message represents the JSON-encoded wire format[2], transmitted over websockets from den-tail to den-head.
In this post we'll discuss the first one, and part 2 will introduce our frontend.
The Backend Situation
Here's the basic problem statement: the backend needs to fetch updates from a lot of different sources, on a lot of different schedules. Bus departures should be refreshed every minute or so, but the trash schedule needs updating once a day at most.
And all that data needed to flow back over the websocket to the web frontend. Are you thinking what I'm thinking?
Actor model!!
There's a neat actor model library called actix. And even better for my purposes, it's mostly a web framework that happens to use actors. I don't have to choose a web framework!
A basic actor looks like this:
```rust
use actix::prelude::*;
use std::rc::Rc;

pub struct UpdateActor {
    updater: Rc<dyn GaugeUpdater>,
    name: &'static str,
}

impl Actor for UpdateActor {
    type Context = Context<Self>;
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct RequestUpdate;

impl Handler<RequestUpdate> for UpdateActor {
    type Result = ();

    fn handle(&mut self, _msg: RequestUpdate, ctx: &mut Self::Context) -> Self::Result {
        todo!()
    }
}
```
An actor can receive several kinds of messages, which are just structs or enums. The compiler even checks that you're sending messages to actors that understand them!
```text
  --> den-tail/src/ws.rs:37:12
   |
37 |     a.send(MyStruct2);
   |       ---- ^^^^^^^^^ the trait `actix::Handler<MyStruct2>` is not implemented for `WsActor`
   |       |
   |       required by a bound introduced by this call
   |
   = help: the following other types implement trait `actix::Handler<M>`:
             <WsActor as actix::Handler<GaugeUpdateMessage>>
             <WsActor as actix::Handler<MyStruct>>
```
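That check is just trait bounds at work. Below is a synchronous, std-only sketch of the idea. These are not actix's real definitions: Counter, Increment, GetCount, Reset and the free send function are made up. The point is that send only compiles when the actor implements Handler for that particular message type.

```rust
/// Stripped-down Handler<M>: an actor opts in to each message type.
pub trait Handler<M> {
    type Result;
    fn handle(&mut self, msg: M) -> Self::Result;
}

pub struct Counter {
    pub count: u32,
}

pub struct Increment(pub u32);
pub struct GetCount;

impl Handler<Increment> for Counter {
    type Result = ();
    fn handle(&mut self, msg: Increment) -> Self::Result {
        self.count += msg.0;
    }
}

impl Handler<GetCount> for Counter {
    type Result = u32;
    fn handle(&mut self, _msg: GetCount) -> Self::Result {
        self.count
    }
}

/// Only compiles when `A: Handler<M>`, which is where errors like the
/// one above come from.
pub fn send<A: Handler<M>, M>(actor: &mut A, msg: M) -> A::Result {
    actor.handle(msg)
}

/// Counter has no Handler<Reset>, so `send(&mut counter, Reset)` would be
/// rejected at compile time rather than failing at runtime.
pub struct Reset;
```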
The Data Must Flow
Let's see how data actually moves through the system.
To do that, we'll take a look at the Bus Arrival. I call the individual displays "Gauges." Here's what the bus gauge looks like:
This means the 46 bus has arrivals in 9, 18, and 55 minutes. The 7's arrival is real-time; the others are just from the schedule.
First question: How do we get this information?
The API
I live in Vancouver, and TransLink has a real-time API we can use. I looked into using Transit's API, but it's limited to 1500 calls a month. That's plenty for an app used once a day, but it works out to about 2 an hour for an always-on application. No good!
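Here is the arithmetic behind that, assuming a 30-day month and the once-a-minute bus refresh from the problem statement above:

```latex
\frac{1500\ \text{calls}}{30 \times 24\ \text{h}} = \frac{1500}{720} \approx 2.08\ \text{calls/h},
\qquad
60\ \tfrac{\text{calls}}{\text{h}} \times 720\ \text{h} = 43{,}200\ \text{calls/month}.
```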
There wasn't a crate, but it's not too complicated an API:
```json
[
  {
    "RouteNo": "049",
    "RouteName": "METROTOWN STN/DUNBAR LOOP/UBC",
    "Direction": "WEST",
    "RouteMap": {
      "Href": "https://nb.translink.ca/geodata/049.kmz"
    },
    "Schedules": [
      {
        "Pattern": "WB1",
        "Destination": "UBC",
        "ExpectedLeaveTime": "9:49pm 2024-01-03",
        "ExpectedCountdown": 11,
        "ScheduleStatus": "-",
        "CancelledTrip": false,
        "CancelledStop": false,
        "AddedTrip": false,
        "AddedStop": false,
        "LastUpdate": "09:37:09 pm"
      }
    ]
  }
]
```
There's a lot, but we don't care about most of it. Just the route number, the expected leave time, and the schedule status.
Rust has a really excellent SERialize/DEserialize library called, appropriately, serde.
It's very simple to use! You just create a normal struct, add a few annotations where the JSON object doesn't quite match Rust's conventions, and derive Deserialize. Magic!
Here's our struct:
```rust
use serde::Deserialize;

#[derive(Deserialize, Debug, Clone)]
struct Route {
    #[serde(rename = "RouteNo")]
    route_no: String,
    #[serde(rename = "Schedules")]
    schedules: Vec<EstimatedArrival>,
}

#[derive(Deserialize, Debug, Clone)]
struct EstimatedArrival {
    #[serde(rename = "ExpectedLeaveTime")]
    expected_leave_time: String,
    #[serde(rename = "ScheduleStatus")]
    schedule_status: String,
}
```
Then we have to retrieve it. There are a lot of Rust HTTP clients, and I initially picked Reqwest. But after I settled on actix, I moved everything to awc.
```rust
async fn fetch(&self) -> Result<Vec<Route>> {
    let url = format!(
        "https://api.translink.ca/rttiapi/v1/stops/{}/estimates",
        self.stop
    );
    Ok(self
        .client
        .get(url)
        .query(&[("apikey", &self.api_token)])?
        .insert_header((ACCEPT, "application/json"))
        .send()
        .await?
        .json()
        .await?)
}
```
There's a neat trick here with the Rust type system: because we know that we're returning a Vec of Route structs, we don't need to tell json() what format to use! It'll automagically call Vec<Route>::deserialize.
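The standard library uses the same return-type-driven inference. As a std-only analogy (this is not awc code), str::parse picks its FromStr implementation from the function's declared return type, just as json() picks a Deserialize implementation. The names parse_countdown, RouteNo and parse_route are hypothetical, and so is the alphanumeric validation rule.

```rust
use std::num::ParseIntError;
use std::str::FromStr;

/// `parse` is generic over its output (`F: FromStr`); the return type alone
/// tells the compiler to use u32's FromStr impl. No turbofish needed.
pub fn parse_countdown(raw: &str) -> Result<u32, ParseIntError> {
    raw.trim().parse()
}

/// Hypothetical newtype: a custom type works the same way once it
/// implements the trait.
#[derive(Debug, PartialEq)]
pub struct RouteNo(pub String);

impl FromStr for RouteNo {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Illustrative rule only: non-empty and ASCII alphanumeric.
        if !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric()) {
            Ok(RouteNo(s.to_string()))
        } else {
            Err(format!("not a route number: {s:?}"))
        }
    }
}

pub fn parse_route(raw: &str) -> Result<RouteNo, String> {
    raw.parse() // RouteNo inferred from the return type
}
```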
There's a little more post-processing. We parse the date (annoyingly complicated, since the date format changes) and work out whether the arrival is live or not[3].
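That post-processing code isn't shown here, so what follows is only a std-only sketch of what it might involve. It parses the "9:49pm 2024-01-03" shape from the sample response into a 24-hour time, and it encodes the footnote's guess about which ScheduleStatus values mean "live". Treating the date as optional is an assumption on my part: the only thing known is that the format changes. LeaveTime, parse_leave_time and is_live are hypothetical names, and the real code produces a chrono::DateTime<Local> rather than this struct.

```rust
/// Hypothetical parsed form of an "ExpectedLeaveTime" value.
#[derive(Debug, PartialEq)]
pub struct LeaveTime {
    pub hour: u32,            // 0-23
    pub minute: u32,          // 0-59
    pub date: Option<String>, // "YYYY-MM-DD" when present
}

/// Parses values like "9:49pm 2024-01-03". ASSUMPTION: the date suffix is
/// treated as optional, since the exact format changes aren't documented.
pub fn parse_leave_time(raw: &str) -> Option<LeaveTime> {
    let mut parts = raw.split_whitespace();
    let clock = parts.next()?;
    let date = parts.next().map(str::to_string);
    if parts.next().is_some() {
        return None; // unexpected extra fields
    }

    let (hm, is_pm) = if let Some(t) = clock.strip_suffix("pm") {
        (t, true)
    } else if let Some(t) = clock.strip_suffix("am") {
        (t, false)
    } else {
        return None;
    };
    let (h, m) = hm.split_once(':')?;
    let (h12, minute): (u32, u32) = (h.parse().ok()?, m.parse().ok()?);
    if !(1..=12).contains(&h12) || minute > 59 {
        return None;
    }
    // 12am is midnight (0); 12pm is noon (12).
    let hour = match (h12, is_pm) {
        (12, false) => 0,
        (12, true) => 12,
        (h, false) => h,
        (h, true) => h + 12,
    };
    Some(LeaveTime { hour, minute, date })
}

/// Encodes the footnote's guess: "+", "-" and " " count as live.
pub fn is_live(schedule_status: &str) -> bool {
    matches!(schedule_status, "+" | "-" | " ")
}
```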
Here's the end result, from den-message. This is the wire format that will eventually be sent to the frontend:
```rust
use chrono::Local;
use serde::{Deserialize, Serialize};

// DateTime needs chrono's "serde" feature to (de)serialize.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct BusLine {
    pub bus_line: String,
    pub arrivals: Vec<BusArrival>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct BusArrival {
    pub arrival: chrono::DateTime<Local>,
    pub live: bool,
}
```
The Interface
Obviously this isn't the only gauge we need to update. But the logic for every one is the same: retrieve an update, do some parsing, return it. And when you need a lot of implementations of the same thing, you make a trait!
```rust
use async_trait::async_trait;

#[async_trait(?Send)]
pub trait GaugeUpdater {
    async fn update(&self) -> Result<GaugeUpdate>;
}
```
Note that while it currently uses the async_trait crate, it won't need to for long!
The ?Send is a consequence of Actix. Send is a Rust marker trait meaning "safe to send between threads." It has no runtime cost by itself, but making a type Send usually does: an Arc instead of an Rc, or a Mutex or some other synchronization primitive. Actix's own future wrapper, ActorFuture, doesn't require Send. And since awc is mainly for the actix system, it isn't Send either. Until I figured out the ?Send, I got a lot of gnarly error messages and stuck with Reqwest (which is Send). Eventually the async_trait docs gave me the answer.
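Roughly speaking, async_trait rewrites each async fn into a method that returns a boxed future, and ?Send drops the + Send bound from that box. Below is a hand-desugared, std-only sketch of that difference. It is not the macro's exact output: the trait names, RcUpdater and the tiny busy-polling block_on are made up for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Roughly what #[async_trait] produces for `async fn update(&self) -> u32`...
pub trait UpdaterSend {
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + Send + 'a>>;
}

// ...and what #[async_trait(?Send)] produces: the same, minus `+ Send`.
pub trait UpdaterLocal {
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + 'a>>;
}

// Rc is not Send, so a future that owns one is not Send either. Returning
// this same future from UpdaterSend::update would not compile.
pub struct RcUpdater {
    pub shared: Rc<u32>,
}

impl UpdaterLocal for RcUpdater {
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + 'a>> {
        let shared = Rc::clone(&self.shared);
        Box::pin(async move {
            std::future::ready(()).await; // stand-in for real async work
            *shared + 1
        })
    }
}

// Minimal executor so the sketch runs without a runtime. It busy-polls,
// so it's only suitable for futures that don't wait on external events.
fn noop_waker() -> Waker {
    fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```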
Anyway! GaugeUpdate is an enum, with variants for all the gauges' associated data. Every kind of data we collect can be turned into a GaugeUpdate:
```rust
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum GaugeUpdate {
    IpAddress(IpAddress),
    TrashDay(TrashDay),
    CalendarEvents(Vec<CalendarEvent>),
    SlackMessages(Vec<SlackMessage>),
    Weather(Weather),
    BusArrival(Vec<BusLine>),
}
```
There's our BusLine from above!
Updater
Since we're using an actor model, every updater is going to get its own actor. But since we've got a trait, we can use a single actor type for all of them.
```rust
pub struct UpdateActor {
    // Not having UpdateActor be typed makes it easier to construct collections
    updater: Rc<dyn GaugeUpdater>,
    name: &'static str,
}
```
Both the Rc and the dyn could probably be removed with some clever typing, but this works well enough.
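The following synchronous, std-only sketch shows what the trait object buys. UpdateActor stores an Rc<dyn GaugeUpdater> instead of being generic over the updater type, so actors wrapping completely different updaters can share one Vec. The trimmed-down GaugeUpdate, TrashUpdater, BusUpdater and run_all are hypothetical, and the stub data is made up.

```rust
use std::rc::Rc;

// Hypothetical, simplified stand-in for the real enum.
#[derive(Debug, Clone, PartialEq)]
pub enum GaugeUpdate {
    TrashDay(String),
    BusArrival(Vec<u32>), // minutes until each arrival
}

// Synchronous stand-in for the async GaugeUpdater trait.
pub trait GaugeUpdater {
    fn update(&self) -> Result<GaugeUpdate, String>;
}

pub struct TrashUpdater;
pub struct BusUpdater {
    pub stop: u32,
}

impl GaugeUpdater for TrashUpdater {
    fn update(&self) -> Result<GaugeUpdate, String> {
        Ok(GaugeUpdate::TrashDay("Thursday".to_string())) // stub data
    }
}

impl GaugeUpdater for BusUpdater {
    fn update(&self) -> Result<GaugeUpdate, String> {
        if self.stop == 0 {
            return Err("no stop configured".to_string());
        }
        Ok(GaugeUpdate::BusArrival(vec![9, 18, 55])) // stub data
    }
}

// One non-generic actor type wraps ANY updater.
pub struct UpdateActor {
    pub updater: Rc<dyn GaugeUpdater>,
    pub name: &'static str,
}

pub fn run_all(actors: &[UpdateActor]) -> Vec<(&'static str, Result<GaugeUpdate, String>)> {
    actors.iter().map(|a| (a.name, a.updater.update())).collect()
}
```

A generic UpdateActor<U: GaugeUpdater> would avoid the dynamic dispatch. The cost is that UpdateActor<TrashUpdater> and UpdateActor<BusUpdater> would be different types, so they could no longer sit in the same collection.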
Since actors are all about messages, we'll use an empty unit-like struct to indicate we want an update.
```rust
#[derive(Message)]
#[rtype(result = "()")]
pub struct RequestUpdate;
```
The actor could make its own schedule, but instead we use an UpdateSupervisor to periodically send RequestUpdate to every actor, using the run_interval method on an actor's context.
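Stripped of actix, the supervisor is a fan-out over mailboxes. The std-only sketch below uses threads and mpsc channels as stand-in actors. spawn_updater, UpdateSupervisor::tick and the handled-message count are illustrative. The timer is deliberately left out so the example is deterministic: in the real code, run_interval does the ticking.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Empty "please refresh" message.
#[derive(Debug, Clone, Copy)]
pub struct RequestUpdate;

/// Hypothetical updater "actor": a thread that drains its mailbox until every
/// sender is dropped, then reports how many RequestUpdates it handled.
pub fn spawn_updater() -> (Sender<RequestUpdate>, JoinHandle<usize>) {
    let (tx, rx): (Sender<RequestUpdate>, Receiver<RequestUpdate>) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        for _msg in rx {
            // A real actor would fetch and publish an update here.
            handled += 1;
        }
        handled
    });
    (tx, handle)
}

/// Hypothetical supervisor: holds every updater's mailbox and broadcasts
/// RequestUpdate on each tick. The caller drives `tick` instead of a timer.
pub struct UpdateSupervisor {
    pub mailboxes: Vec<Sender<RequestUpdate>>,
}

impl UpdateSupervisor {
    pub fn tick(&self) {
        for mailbox in &self.mailboxes {
            // A stopped actor just misses this tick.
            let _ = mailbox.send(RequestUpdate);
        }
    }
}
```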
The actors are all run from a Supervisor, so they'll be restarted in the face of any Err results. They can't do anything about panics, though, so we're extra diligent not to call unwrap or expect.
Cache Money
Let's go back to UpdateActor and see how it handles those RequestUpdate messages.
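```rust
// and_then / map_ok_or_else come from futures::TryFutureExt;
// into_actor comes from actix's WrapFuture.
impl Handler<RequestUpdate> for UpdateActor {
    type Result = ();

    fn handle(&mut self, _msg: RequestUpdate, ctx: &mut Self::Context) -> Self::Result {
        // Clone the Rc and copy the name so the spawned future doesn't borrow self
        let updater = self.updater.clone();
        let name = self.name;
        ctx.spawn(
            async move { updater.update().await }
                // On success, hand the update to the cache
                .and_then(GaugeCache::update_cache)
                // Nobody awaits this future, so log failures instead of returning them
                .map_ok_or_else(move |e| error!("Error running {}: {}", name, e), |_| ())
                .into_actor(self),
        );
    }
}
```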
There's some housekeeping to appease the almighty borrow checker, but in essence we call updater.update().await and pass the result to GaugeCache::update_cache.
What's GaugeCache::update_cache?
I don't know if this is a common pattern in Actix, but in Erlang it's not typical to send messages directly to actors. Instead, there will be functions that send the appropriate messages for you:
```erlang
-spec attach(Node) -> 'already_attached' | 'attached' when
      Node :: node().
attach(Node) ->
    gen_server:call({global, pool_master}, {attach, Node}).
```
GaugeCache::update_cache performs a similar task. It looks up the running cache instance from the supervisor (which will start it if necessary). From there, it sends the update in a format the cache can understand.
```rust
pub async fn update_cache(msg: GaugeUpdate) -> crate::Result<()> {
    Self::from_registry()
        .try_send(GaugeUpdateMessage(msg))
        .map_err(|_| crate::Error::ActixSendError)
}
```
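Here is a std-only sketch of the same pattern. A lazily started, process-wide "cache actor" (a thread plus a channel, created on first use, loosely like from_registry) sits behind plain functions, so callers never build messages themselves. CacheMsg, cache_addr, update_cache and latest are hypothetical, and gauges are keyed by string for brevity.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Mutex, OnceLock};
use std::thread;

// Hypothetical messages the cache "actor" understands.
enum CacheMsg {
    Update { kind: String, value: String },
    Get { kind: String, reply: Sender<Option<String>> },
}

// Returns the cache actor's address, starting the actor on first use.
fn cache_addr() -> Sender<CacheMsg> {
    static ADDR: OnceLock<Mutex<Sender<CacheMsg>>> = OnceLock::new();
    ADDR.get_or_init(|| {
        let (tx, rx) = mpsc::channel::<CacheMsg>();
        thread::spawn(move || {
            // The actor's private state: latest value per gauge kind.
            let mut gauges: HashMap<String, String> = HashMap::new();
            for msg in rx {
                match msg {
                    CacheMsg::Update { kind, value } => {
                        gauges.insert(kind, value);
                    }
                    CacheMsg::Get { kind, reply } => {
                        let _ = reply.send(gauges.get(&kind).cloned());
                    }
                }
            }
        });
        Mutex::new(tx)
    })
    .lock()
    .unwrap_or_else(|poisoned| poisoned.into_inner()) // no panicking on poison
    .clone()
}

// Erlang-style API functions: the message format stays an internal detail.
pub fn update_cache(kind: &str, value: &str) -> Result<(), String> {
    cache_addr()
        .send(CacheMsg::Update { kind: kind.to_string(), value: value.to_string() })
        .map_err(|_| "cache actor has stopped".to_string())
}

pub fn latest(kind: &str) -> Option<String> {
    let (reply, rx) = mpsc::channel();
    cache_addr()
        .send(CacheMsg::Get { kind: kind.to_string(), reply })
        .ok()?;
    rx.recv().ok()?
}
```

Because both calls go through the same channel, an update sent before a lookup is always processed first.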
Let's take a look at the cache actor. It's the brains of this whole operation.
```rust
pub struct GaugeCache {
    gauges: HashMap<UpdateKind, GaugeUpdate>,
    clients: HashSet<Recipient<GaugeUpdateMessage>>,
}
```
gauges stores the most recent update of every kind we've received. clients represents outgoing websocket connections, which we'll get to.
Let's see what happens when we receive a GaugeUpdateMessage:
```rust
#[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct GaugeUpdateMessage(pub GaugeUpdate);

impl Handler<GaugeUpdateMessage> for GaugeCache {
    type Result = ();

    fn handle(&mut self, msg: GaugeUpdateMessage, _ctx: &mut Self::Context) -> Self::Result {
        // Store our update
        self.receive(UpdateKind::of(&msg.0), msg.0.clone());
        // Send it to everyone subscribed
        self.clients.iter().for_each(|v| v.do_send(msg.clone()));
    }
}
```
Seems simple enough. But where did those clients come from? A Connect message, of course!
```rust
#[derive(Message)]
#[rtype(result = "()")]
struct Connect(Recipient<GaugeUpdateMessage>);
```
And what does that handler look like?
```rust
impl Handler<Connect> for GaugeCache {
    type Result = ();

    fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result {
        self.clients.insert(msg.0.clone());
        self.gauges
            .values()
            .cloned()
            .map(GaugeUpdateMessage)
            .for_each(|v| msg.0.do_send(v));
    }
}
```
First, we store our client in the client list so it can receive future updates. Then, we send a catch-up: the most recent messages of every kind. That way, a reconnecting client doesn't need to wait for fresh updates to come down the pipe. Some of the less time-critical updaters only run once a day, so they'd be waiting a while!
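Here is a std-only sketch of those two handlers, with mpsc channels standing in for Recipient<GaugeUpdateMessage>. The trimmed-down GaugeUpdate and UpdateKind, the method names and the sample data are hypothetical. One deliberate difference: this version prunes clients whose receivers are gone while broadcasting, instead of waiting for a separate disconnect.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

// Simplified stand-ins for the real types.
#[derive(Debug, Clone, PartialEq)]
pub enum GaugeUpdate {
    TrashDay(String),
    BusArrival(Vec<u32>),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UpdateKind {
    TrashDay,
    BusArrival,
}

impl UpdateKind {
    pub fn of(update: &GaugeUpdate) -> Self {
        match update {
            GaugeUpdate::TrashDay(_) => UpdateKind::TrashDay,
            GaugeUpdate::BusArrival(_) => UpdateKind::BusArrival,
        }
    }
}

#[derive(Default)]
pub struct GaugeCache {
    gauges: HashMap<UpdateKind, GaugeUpdate>,
    clients: Vec<Sender<GaugeUpdate>>,
}

impl GaugeCache {
    /// Like Handler<GaugeUpdateMessage>: remember the latest value per kind,
    /// then fan it out, dropping clients whose receivers have gone away.
    pub fn handle_update(&mut self, update: GaugeUpdate) {
        self.gauges.insert(UpdateKind::of(&update), update.clone());
        self.clients.retain(|c| c.send(update.clone()).is_ok());
    }

    /// Like Handler<Connect>: replay the latest value of every kind, then
    /// subscribe for future updates.
    pub fn handle_connect(&mut self) -> Receiver<GaugeUpdate> {
        let (tx, rx) = mpsc::channel();
        for update in self.gauges.values() {
            let _ = tx.send(update.clone());
        }
        self.clients.push(tx);
        rx
    }
}
```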
Web Sock It To Me
Actix handles websockets with the actix-web-actors crate. You'd think all of actix-web would be web-actors, but I guess not. Regardless, it plugs in nicely to our existing menagerie of actors.
The way this works is actually pretty interesting.
First of all, for incoming messages, we use a StreamHandler. Where a regular Handler handles a single message, StreamHandler works with a Stream of messages.
```rust
pub trait StreamHandler<I>
where
    Self: Actor,
{
    fn handle(&mut self, item: I, ctx: &mut Self::Context);
    // snip
}
```
For our implementation, there aren't many incoming messages we care about:
```rust
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match item {
            Ok(ws::Message::Ping(ping)) => ctx.pong(&ping),
            Ok(ws::Message::Close(c)) => {
                info!("Closing connection: {:?}", c);
                ctx.close(c)
            }
            // error handling omitted: everything else, including Err(_), is ignored here
            _ => {}
        }
    }
}
```
We close gracefully, respond to pings, and handle errors.
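The std-only sketch below shows the shape of that handler without actix. An iterator stands in for the incoming Stream, and handle is called once per item. WsMessage, ProtocolError, the outbox field and drive are simplified stand-ins, not the actix-web-actors types. Stopping on a protocol error is one possible policy, chosen here only because the error handling above is omitted.

```rust
// Simplified stand-ins for ws::Message and ws::ProtocolError.
#[derive(Debug, Clone, PartialEq)]
pub enum WsMessage {
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Text(String),
    Close(Option<String>),
}

#[derive(Debug, Clone, PartialEq)]
pub struct ProtocolError(pub String);

/// Hypothetical actor: collects outgoing frames instead of writing to a socket.
#[derive(Default)]
pub struct WsActor {
    pub outbox: Vec<WsMessage>,
    pub stopped: bool,
}

impl WsActor {
    /// One call per incoming stream item, like StreamHandler::handle.
    pub fn handle(&mut self, item: Result<WsMessage, ProtocolError>) {
        if self.stopped {
            return;
        }
        match item {
            Ok(WsMessage::Ping(payload)) => self.outbox.push(WsMessage::Pong(payload)),
            Ok(WsMessage::Close(reason)) => {
                self.outbox.push(WsMessage::Close(reason));
                self.stopped = true;
            }
            // The display never sends anything else worth acting on.
            Ok(_) => {}
            Err(_) => self.stopped = true,
        }
    }
}

/// Feeds a whole stream (an iterator here) through the handler.
pub fn drive<I>(actor: &mut WsActor, stream: I)
where
    I: IntoIterator<Item = Result<WsMessage, ProtocolError>>,
{
    for item in stream {
        actor.handle(item);
    }
}
```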
But what about sending messages back down the websocket?
The key is to use the WebsocketContext instead of the regular Actix Context.
```rust
impl Actor for WsActor {
    type Context = WebsocketContext<Self>;
    // snip
}
```
How does this work?
Let's take a look at WebsocketContext::create's signature:
```rust
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Result<Bytes, Error>>
where
    A: StreamHandler<Result<Message, ProtocolError>>,
    S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
```
We take an Actor (which must be a StreamHandler) and a Stream. The Stream is over Bytes, which will be decoded into the Message we expect. It's designed to take the web::Payload that actix-web uses.
But what's that return type? Normally we'd expect a Self to be returned, but instead we get a stream of Bytes. That stream is how outgoing messages get sent to the client. When you call WebsocketContext::text to send a message, behind the scenes it's enqueueing a message which ultimately ends up in that stream.
Here's how we use it to send GaugeUpdateMessages:
```rust
impl Handler<GaugeUpdateMessage> for WsActor {
    type Result = ();

    fn handle(&mut self, msg: GaugeUpdateMessage, ctx: &mut Self::Context) -> Self::Result {
        match serde_json::to_string(&DenMessage::Update(msg.0)) {
            Err(e) => error!("Failed to encode payload: {:?}", e),
            Ok(msg) => ctx.text(msg),
        };
    }
}
```
(We'll get to DenMessage in a later post, but for now it's just a wrapper enum around GaugeUpdate.)
The handle method is synchronous, so ctx.text can't wait for a message to be sent. Instead, the message gets added to WebsocketContext's internal queue, which is eventually sent back to the client at the whims of Actix's scheduler.
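A std-only way to picture that split: the context is a queue that the synchronous handler pushes into, and the stream returned from create is the other end of it. OutgoingQueue is hypothetical, an Iterator stands in for the async Stream, and real WebSocket framing is left out.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for WebsocketContext's outgoing side.
#[derive(Default)]
pub struct OutgoingQueue {
    frames: VecDeque<Vec<u8>>,
}

impl OutgoingQueue {
    /// Synchronous, like ctx.text: enqueue the payload bytes and return
    /// immediately (WebSocket framing omitted).
    pub fn text(&mut self, msg: &str) {
        self.frames.push_back(msg.as_bytes().to_vec());
    }
}

/// The "returned stream" half: whoever writes the HTTP response body pulls
/// frames out in the order they were queued.
impl Iterator for OutgoingQueue {
    type Item = Vec<u8>;
    fn next(&mut self) -> Option<Vec<u8>> {
        self.frames.pop_front()
    }
}
```

Handlers never block on the network. They only push, and the response body drains the queue whenever the runtime gets around to it.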
How does the cache know to send us GaugeUpdateMessages? We need to subscribe to the cache. If we look at the Actor trait, we can see that there are some lifecycle hooks we can use:
```rust
pub trait Actor: Sized + Unpin + 'static {
    type Context: ActorContext;

    fn started(&mut self, ctx: &mut Self::Context) { ... }
    fn stopped(&mut self, ctx: &mut Self::Context) { ... }
    // snip
}
```
So these just need to be hooked up to the GaugeCache. GaugeCache::connect's signature looks like this:
```rust
pub fn connect(r: Recipient<GaugeUpdateMessage>) -> crate::Result<()>
```
where Recipient means "any actor that can receive a GaugeUpdateMessage." In practice this will always be the same kind of actor, but there's no reason to hardcode that. This constrains us to only sending GaugeUpdateMessage, instead of every message we've got a Handler for, but that's okay.
Wiring it up gets us this:
```rust
fn started(&mut self, ctx: &mut Self::Context) {
    if let Err(e) = GaugeCache::connect(ctx.address().recipient()) {
        error!("Failed to connect to cache: {:?}", e);
        ctx.stop();
    }
}
```
There's a similar stopped method so we don't send messages to a client that isn't listening (there's a metaphor there, probably).
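Stripped down to std, the hook pattern is a trait whose started and stopped methods default to doing nothing, overridden here to subscribe and unsubscribe. In this sketch a shared HashSet of ids stands in for the cache's client set. The ids, Subscribers and the run driver are hypothetical; this is not actix's scheduler.

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::Rc;

/// Minimal actor trait with default no-op lifecycle hooks.
pub trait Actor {
    fn started(&mut self) {}
    fn stopped(&mut self) {}
}

/// Hypothetical stand-in for the cache's subscriber set.
pub type Subscribers = Rc<RefCell<HashSet<u32>>>;

pub struct WsActor {
    pub id: u32,
    pub cache: Subscribers,
}

impl Actor for WsActor {
    // Subscribe when the connection starts...
    fn started(&mut self) {
        self.cache.borrow_mut().insert(self.id);
    }
    // ...and unsubscribe when it stops, so nothing is sent to a dead socket.
    fn stopped(&mut self) {
        self.cache.borrow_mut().remove(&self.id);
    }
}

/// Hypothetical runtime: calls the hooks around the actor's lifetime.
/// `body` stands in for "handle messages until the connection closes".
pub fn run<A: Actor>(actor: &mut A, body: impl FnOnce(&mut A)) {
    actor.started();
    body(actor);
    actor.stopped();
}
```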
Now all that's left is to actually run actix! Their documentation is very good; this is all we need:
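```rust
use actix_web::{get, web, HttpRequest, HttpResponse, Result};
use actix_web_actors::ws;

#[get("/ws")]
async fn websocket(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse> {
    Ok(ws::start(WsActor, &req, stream)?)
}
```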
There's the incoming stream for upstream websocket messages. ws::start will call HttpResponseBuilder::streaming to stream the downstream half.
And that's the end of the backend!
Our bus arrival is traveling over the websocket connection. What does that wire format look like? I'm not actually sure! The beauty of using Rust on both ends is that we just need to make sure serialisation is reversible. The libraries will handle the rest.
Next Time
We'll build out the frontend and actually use this data we fired down the pipe!
Footnotes:
[1] I haven't actually implemented either of these. Whoops.
[2] I briefly considered using gRPC, but those same restrictions raised their heads: browsers don't provide low-enough-level socket access for gRPC to work without a weird proxy.
[3] This is a completely undocumented single character. I have assumed "+", "-", and " " are live, mostly based on guesses.