Let's Make an Information Display Part 1: Backend

About this Project

Our house in Vancouver came with a TV pre-installed in the kitchen, right above the fridge. This was, originally, for monitoring all of the security cameras. But surveillance cameras show the exact same thing, day in and day out. What if, instead, I aggregated a bunch of different datums our house cares about?

A former housemate had a little web app that displayed the weather, and when the next trash day was. But I was feeling a little more ambitious than that, and I never miss an opportunity to over-engineer something.

A mockup of an information display, showing weather, trash pickup, planes, and more
Figure 1: the initial mockup

There are two important things to notice here. One, planes are cool. Two, both the ADSB and the Slack message want to be near-real-time[1], which means this can't just be a server-side application.

The First Architecture

Client Side Best Side

My first idea was to have everything happen on the client side. There's a half dozen or so APIs that we need to aggregate, but if this is only going to run in one or two places, why have a backend at all?

It turns out there's a couple reasons. One is CORS, an annoying but necessary security feature that won't let you just call any old URL you want. If you control the endpoint you can add a couple headers that say "Hey, whatever, do what you want." But I don't control a lot of the endpoints, so I was going to need a proxy anyway.

The other reason was that making requests from inside WebAssembly was just a little awkward.

For example, here's how reqwest, a common Rust library, makes a request:

let client = reqwest::Client::new();
let res = client.post("http://httpbin.org/post")
    .body("Some text")
    .send()
    .await
    .unwrap();

Easy, right?

But on the WebAssembly side, you can't just make arbitrary socket calls. You need to use the browser's built-in XHR methods.

Request::get("/path")
    .send()
    .await
    .unwrap();

And while that interface looks much the same, pretty much no library you'd want to use comes with built-in support for it. You lose access to a lot of the ecosystem advantages Rust usually gives you. Before I gave up, for example, I wrote my own Slack client because none of the existing ones supported WASM.

You'd expect Rust would have a… trait HttpClient that had a bunch of implementations, but no such luck. Maybe once AsyncTraits stabilise.
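
To make that wish concrete, here's a rough sketch of what such a trait could look like. Everything in it is hypothetical: HttpClient, CannedClient, and fetch_len are names I made up for illustration, and it's synchronous to stay inside the standard library, where a real version would be async.

```rust
use std::collections::HashMap;

/// Hypothetical transport-agnostic client trait. Nothing like this is
/// standard in the ecosystem; it only illustrates the idea.
pub trait HttpClient {
    fn get(&self, url: &str) -> Result<String, String>;
}

/// Stand-in "transport" that serves canned bodies from memory. A native
/// build might wrap sockets here, a WASM build the browser's request API.
pub struct CannedClient {
    responses: HashMap<String, String>,
}

impl CannedClient {
    pub fn new() -> Self {
        Self { responses: HashMap::new() }
    }

    /// Register the body to return for `url`.
    pub fn with(mut self, url: &str, body: &str) -> Self {
        self.responses.insert(url.to_string(), body.to_string());
        self
    }
}

impl HttpClient for CannedClient {
    fn get(&self, url: &str) -> Result<String, String> {
        self.responses
            .get(url)
            .cloned()
            .ok_or_else(|| format!("no response for {}", url))
    }
}

/// Library code written against the trait works with any transport.
pub fn fetch_len(client: &dyn HttpClient, url: &str) -> Result<usize, String> {
    client.get(url).map(|body| body.len())
}
```

A Slack client written against something like fetch_len's signature wouldn't care whether it was running natively or in the browser, which is the whole appeal.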

The architecture I eventually settled on was more traditional, consisting of three crates:

  • den-tail is the backend, running in a VM on my NAS
  • den-head is the frontend, running in WebAssembly in a browser
  • den-message represents the JSON-encoded wire format[2], transmitted over websockets from den-tail to den-head.

In this post we'll discuss the first one, and part 2 will introduce our frontend.

The Backend Situation

Here's the basic problem statement: the backend needs to fetch updates from a lot of different sources, on a lot of different schedules. Bus departures should refresh every minute or so, but the trash schedule needs checking once a day at most.

And all that data needed to flow back over the websocket to the web frontend. Are you thinking what I'm thinking?

Actor model!!

A directed graph showing data flow, from GaugeUpdaters to GaugeCache to WsActor to den-tail
Figure 2: the datums must flow

There's a neat actor model library called actix. And even better for my purposes, it's mostly a web framework that happens to use actors. I don't have to choose a web framework!

A basic actor looks like this:

pub struct UpdateActor {
    updater: Rc<dyn GaugeUpdater>,
    name: &'static str,
}

impl Actor for UpdateActor {
    type Context = Context<Self>;
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct RequestUpdate;

impl Handler<RequestUpdate> for UpdateActor {
    type Result = ();

    fn handle(&mut self, _msg: RequestUpdate, ctx: &mut Self::Context) -> Self::Result {
        todo!()
    }
}

An actor can receive several kinds of messages, which are just structs or enums. The compiler even checks that you're sending messages to actors that understand them!

     --> den-tail/src/ws.rs:37:12
    |
37  |     a.send(MyStruct2);
    |       ---- ^^^^^^^^^ the trait `actix::Handler<MyStruct2>` is not implemented for `WsActor`
    |       |
    |       required by a bound introduced by this call
    |
    = help: the following other types implement trait `actix::Handler<M>`:
              <WsActor as actix::Handler<GaugeUpdateMessage>>
              <WsActor as actix::Handler<MyStruct>>
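
The mechanism behind that error can be sketched without Actix at all. This is a simplified stand-in, not Actix's real trait (which also involves a context and a result type): the Handler trait, the Counter actor, and the send function below are all made up for illustration.

```rust
/// Minimal stand-in for an actor framework's handler trait.
pub trait Handler<M> {
    fn handle(&mut self, msg: M);
}

// Two message types our toy actor understands.
pub struct Ping;
pub struct Add(pub i32);

#[derive(Default)]
pub struct Counter {
    pub pings: u32,
    pub total: i32,
}

impl Handler<Ping> for Counter {
    fn handle(&mut self, _msg: Ping) {
        self.pings += 1;
    }
}

impl Handler<Add> for Counter {
    fn handle(&mut self, msg: Add) {
        self.total += msg.0;
    }
}

/// Only compiles when `A` implements a handler for `M`.
/// `send(&mut counter, "hello")` would be rejected at compile time,
/// because there is no `impl Handler<&str> for Counter`.
pub fn send<A, M>(actor: &mut A, msg: M)
where
    A: Handler<M>,
{
    actor.handle(msg);
}
```

The trait bound on send is doing the same job as the "required by a bound introduced by this call" line in the error above.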

The Data Must Flow

Let's see how data actually moves through the system.

To do that, we'll take a look at the Bus Arrival. I call the individual displays "Gauges." Here's what the bus gauge looks like:

A title (46) with several numbers below it (9, 18, 55). The 9 has a wifi icon next to it
Figure 3: The Bus Gauge (with fake data)

This means the 46 bus has arrivals in 9, 18, and 55 minutes. The 9's arrival is real-time; the others are just from the schedule.

First question: How do we get this information?

The API

I live in Vancouver, and TransLink has a real-time API we can use. I looked into using Transit's API, but it's limited to 1500 calls a month. That's plenty for an app used once a day, but it works out to about 2 an hour for an always-on application. No good!
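
For the skeptical, here's the back-of-the-envelope math as a tiny sketch. The function names are mine, and a 30-day month is assumed.

```rust
/// Average calls per hour allowed by a monthly quota,
/// assuming a 30-day month (720 hours).
pub fn calls_per_hour(monthly_quota: u32) -> f64 {
    const HOURS_PER_MONTH: f64 = 30.0 * 24.0;
    f64::from(monthly_quota) / HOURS_PER_MONTH
}

/// Calls per month needed to poll once every `interval_secs` seconds,
/// again assuming a 30-day month.
pub fn calls_needed_per_month(interval_secs: u32) -> u32 {
    const SECS_PER_MONTH: u32 = 30 * 24 * 60 * 60;
    SECS_PER_MONTH / interval_secs
}
```

A 1500-call quota comes out to roughly 2.08 calls an hour, while polling once a minute needs 43,200 calls a month.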

There wasn't a crate, but it's not too complicated an API:

[
  {
    "RouteNo": "049",
    "RouteName": "METROTOWN STN/DUNBAR LOOP/UBC",
    "Direction": "WEST",
    "RouteMap": {
      "Href": "https://nb.translink.ca/geodata/049.kmz"
    },
    "Schedules": [
      {
        "Pattern": "WB1",
        "Destination": "UBC",
        "ExpectedLeaveTime": "9:49pm 2024-01-03",
        "ExpectedCountdown": 11,
        "ScheduleStatus": "-",
        "CancelledTrip": false,
        "CancelledStop": false,
        "AddedTrip": false,
        "AddedStop": false,
        "LastUpdate": "09:37:09 pm"
      }
    ]
  }
]

There's a lot, but we don't care about most of it. Just the route number, the expected leave time, and the schedule status.

Rust has a really excellent SERialize/DEserialize library called, appropriately, serde.

It's very simple to use! You just create a normal struct, add a few annotations when the JSON object doesn't quite match Rust's conventions, and derive Deserialize. Magic!

Here's our struct:

#[derive(Deserialize, Debug, Clone)]
struct Route {
    #[serde(rename = "RouteNo")]
    route_no: String,

    #[serde(rename = "Schedules")]
    schedules: Vec<EstimatedArrival>,
}

#[derive(Deserialize, Debug, Clone)]
struct EstimatedArrival {
    #[serde(rename = "ExpectedLeaveTime")]
    expected_leave_time: String,
    #[serde(rename = "ScheduleStatus")]
    schedule_status: String,
}

Then we have to retrieve it. There's a lot of Rust HTTP clients, and I initially picked reqwest. But after I settled on actix I moved everything to awc.

async fn fetch(&self) -> Result<Vec<Route>> {
    let url = format!(
        "https://api.translink.ca/rttiapi/v1/stops/{}/estimates",
        self.stop
    );
    Ok(self
        .client
        .get(url)
        .query(&[("apikey", &self.api_token)])?
        .insert_header((ACCEPT, "application/json"))
        .send()
        .await?
        .json()
        .await?)
}

There's a neat trick here with the Rust type system: because we know that we're returning a Vec of Route structs, we don't need to tell json() what format to use! It'll automagically call Vec<Route>::deserialize.
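
The same return-type-driven inference shows up in the standard library, which makes for a dependency-free illustration. This isn't serde, just an analogous sketch: countdowns is a made-up function, and both parse and collect work out what to produce from the signature alone.

```rust
use std::num::ParseIntError;

/// Nothing in the body names `u32` or `Vec`: the declared return type
/// tells `parse` which integer type to produce and tells `collect`
/// to gather the results into a `Result<Vec<u32>, _>`.
pub fn countdowns(csv: &str) -> Result<Vec<u32>, ParseIntError> {
    csv.split(',').map(|field| field.trim().parse()).collect()
}
```

Change the return type to Result<Vec<i64>, ParseIntError> and the body compiles unchanged, just like swapping Vec<Route> for another Deserialize type would in fetch.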

There's a little more post-processing. We parse the date (annoyingly complicated, since the date format changes) and whether it's live or not[3].
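
Here's a rough, standard-library-only sketch of that post-processing. It is not the real den-tail code (which produces a chrono DateTime): LeaveTime, parse_leave_time, and is_live are hypothetical names, and I'm assuming the varying format means the date part is sometimes missing. The status characters treated as live are the guesses from footnote 3.

```rust
#[derive(Debug, PartialEq)]
pub struct LeaveTime {
    pub hour: u32, // 24-hour clock
    pub minute: u32,
    pub date: Option<String>, // e.g. "2024-01-03", when present
}

/// Parse strings like "9:49pm 2024-01-03" or (assumed) just "9:49pm".
pub fn parse_leave_time(raw: &str) -> Option<LeaveTime> {
    let mut parts = raw.split_whitespace();
    let time = parts.next()?.to_ascii_lowercase();
    let date = parts.next().map(str::to_string);

    // Split off the am/pm suffix; anything else is rejected.
    let (clock, is_pm) = if let Some(clock) = time.strip_suffix("pm") {
        (clock, true)
    } else {
        (time.strip_suffix("am")?, false)
    };

    let (h, m) = clock.split_once(':')?;
    let hour12: u32 = h.parse().ok()?;
    let minute: u32 = m.parse().ok()?;
    if !(1..=12).contains(&hour12) || minute > 59 {
        return None;
    }

    // Convert the 12-hour clock to a 24-hour one.
    let hour = match (hour12, is_pm) {
        (12, false) => 0,
        (12, true) => 12,
        (n, false) => n,
        (n, true) => n + 12,
    };
    Some(LeaveTime { hour, minute, date })
}

/// Guess from the post: "+", "-", and " " mean a live estimate.
pub fn is_live(schedule_status: &str) -> bool {
    matches!(schedule_status, "+" | "-" | " ")
}
```

With the sample response above, "9:49pm 2024-01-03" becomes hour 21, minute 49, and a status of "-" counts as live.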

Here's the end result, from den-message. This is the wire format that will eventually be sent to the frontend:

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct BusLine {
    pub bus_line: String,
    pub arrivals: Vec<BusArrival>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct BusArrival {
    pub arrival: chrono::DateTime<Local>,
    pub live: bool,
}

The Interface

Obviously this isn't the only gauge we need to update. But the logic for every one is the same: retrieve an update, do some parsing, return it. And when you need a lot of implementations of the same thing, you make a trait!

#[async_trait(?Send)]
pub trait GaugeUpdater {
    async fn update(&self) -> Result<GaugeUpdate>;
}

Note that while it currently uses the async_trait crate, it won't need to for long!

The ?Send is a consequence of Actix. Send is a Rust trait meaning "safe to send between threads." Making a type Send usually comes with some overhead, involving a mutex or some other synchronization primitive. Actix's own future wrapper, ActorFuture, doesn't require Send, and since awc is mainly for the Actix system, it isn't Send either. Until I figured out the ?Send, I got a lot of gnarly error messages, and I stuck with reqwest (which is Send). Eventually the async_trait docs gave me the answer.

Anyway! GaugeUpdate is an enum, with variants for all the gauges' associated data. Every kind of data we collect can be turned into a GaugeUpdate:

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum GaugeUpdate {
    IpAddress(IpAddress),
    TrashDay(TrashDay),
    CalendarEvents(Vec<CalendarEvent>),
    SlackMessages(Vec<SlackMessage>),
    Weather(Weather),
    BusArrival(Vec<BusLine>),
}

There's our BusLine from above!

Updater

Since we're using an actor model, every updater is going to get its own actor. But since we've got a trait, we can use a single actor type for all of them.

pub struct UpdateActor {
    // Not having UpdateActor be typed makes it easier to construct collections
    updater: Rc<dyn GaugeUpdater>,
    name: &'static str,
}

Probably both the Rc and the dyn could be removed with some clever typings, but this works well enough.

Since actors are all about messages, we'll use an empty unit-like struct to indicate we want an update.

#[derive(Message)]
#[rtype(result = "()")]
pub struct RequestUpdate;

The actor could make its own schedule, but instead we use an UpdateSupervisor to periodically send RequestUpdate to every actor using the run_interval method on an actor's context.

The actors are all run from a Supervisor, so they'll be restarted in the face of any Err results. They can't do anything about panics, so we are extra diligent to not call unwrap or expect.
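
The supervisor code itself isn't shown here, so as a framework-free sketch of the scheduling idea: each updater gets an interval, and on every tick we work out who is due. Scheduled and tick are hypothetical names; the real thing leans on Actix's run_interval rather than tracking time by hand.

```rust
use std::time::Duration;

/// One updater and how often it wants a RequestUpdate-style poke.
pub struct Scheduled {
    pub name: &'static str,
    pub every: Duration,
    next_due: Duration, // measured from startup
}

impl Scheduled {
    pub fn new(name: &'static str, every: Duration) -> Self {
        // Due immediately, so every gauge gets data at startup.
        Self { name, every, next_due: Duration::ZERO }
    }
}

/// Returns the names of updaters due at `now` (time since startup)
/// and reschedules each of them one interval into the future.
pub fn tick(entries: &mut [Scheduled], now: Duration) -> Vec<&'static str> {
    let mut due = Vec::new();
    for entry in entries.iter_mut() {
        if now >= entry.next_due {
            due.push(entry.name);
            entry.next_due = now + entry.every;
        }
    }
    due
}
```

With a one-minute bus updater and a once-a-day trash updater, both fire at startup, and after that only the bus shows up again for the rest of the day.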

Cache Money

Let's go back to UpdateActor and see how it handles those RequestUpdate messages.

impl Handler<RequestUpdate> for UpdateActor {
    type Result = ();

    fn handle(&mut self, _msg: RequestUpdate, ctx: &mut Self::Context) -> Self::Result {
        let updater = self.updater.clone();
        let name = self.name;
        ctx.spawn(
            async move { updater.update().await }
                .and_then(GaugeCache::update_cache)
                .map_ok_or_else(move |e| error!("Error running {}: {}", name, e), |_| ())
                .into_actor(self),
        );
    }
}

There's some housekeeping to appease the almighty borrow checker, but in essence we call updater.update().await and pass the result to GaugeCache::update_cache.

What's GaugeCache::update_cache?

I don't know if this is a common pattern in Actix, but in Erlang it's not typical to send messages directly to actors. Instead, there will be functions that send the appropriate messages for you:

-spec attach(Node) -> 'already_attached' | 'attached' when
      Node :: node().
attach(Node) ->
    gen_server:call({global, pool_master}, {attach, Node}).

GaugeCache::update_cache performs a similar task. It looks up the running cache instance from Actix's registry (which will start it if necessary). From there, it sends the update in a format the cache can understand.

pub async fn update_cache(msg: GaugeUpdate) -> crate::Result<()> {
    Self::from_registry()
        .try_send(GaugeUpdateMessage(msg))
        .map_err(|_| crate::Error::ActixSendError)
}
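
Stripped of Actix, the pattern is just "hide the mailbox behind a function." Here's a small sketch using a bounded standard-library channel as the mailbox. CacheHandle, CacheMessage, and SendError are stand-ins I invented, not den-tail types.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// The message type the cache understands. Callers never see it.
#[derive(Debug, PartialEq)]
pub enum CacheMessage {
    Update(String),
}

#[derive(Debug, PartialEq)]
pub struct SendError;

/// Handle to the cache's mailbox.
#[derive(Clone)]
pub struct CacheHandle {
    mailbox: SyncSender<CacheMessage>,
}

impl CacheHandle {
    /// Create a handle plus the receiving end the cache would read from.
    pub fn new(capacity: usize) -> (Self, Receiver<CacheMessage>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { mailbox: tx }, rx)
    }

    /// Wrap the payload in the right message and send it without
    /// blocking; a full or closed mailbox becomes an error.
    pub fn update_cache(&self, update: &str) -> Result<(), SendError> {
        self.mailbox
            .try_send(CacheMessage::Update(update.to_string()))
            .map_err(|_| SendError)
    }
}
```

Like the Actix version, callers get a plain function and a Result, and the message type stays an implementation detail of the cache.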

Let's take a look at the cache actor. It's the brains of this whole operation.

pub struct GaugeCache {
    gauges: HashMap<UpdateKind, GaugeUpdate>,
    clients: HashSet<Recipient<GaugeUpdateMessage>>,
}

gauges stores the most recent update of every kind we've received. clients represents outgoing websocket connections, which we'll get to.
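
UpdateKind isn't shown in this post, so here's a guess at its shape: a fieldless twin of GaugeUpdate that can be used as a map key. The payload types below are simplified stand-ins, and receive is written as a free function purely to keep the sketch short.

```rust
use std::collections::HashMap;

/// Cut-down version of the wire enum, with placeholder payloads.
#[derive(Debug, Clone, PartialEq)]
pub enum GaugeUpdate {
    TrashDay(String),
    BusArrival(Vec<u32>),
}

/// Fieldless twin of `GaugeUpdate`, cheap to hash and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UpdateKind {
    TrashDay,
    BusArrival,
}

impl UpdateKind {
    /// Which kind of update is this, ignoring its payload?
    pub fn of(update: &GaugeUpdate) -> Self {
        match update {
            GaugeUpdate::TrashDay(_) => UpdateKind::TrashDay,
            GaugeUpdate::BusArrival(_) => UpdateKind::BusArrival,
        }
    }
}

/// Keep only the newest update of each kind.
pub fn receive(gauges: &mut HashMap<UpdateKind, GaugeUpdate>, update: GaugeUpdate) {
    gauges.insert(UpdateKind::of(&update), update);
}
```

Because insert overwrites, a second bus update replaces the first while the trash update is left alone. std::mem::discriminant could serve as the key instead, at the cost of a less readable map.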

Let's see what happens when we receive a GaugeUpdateMessage:

#[derive(Message, Clone)]
#[rtype(result = "()")]
pub struct GaugeUpdateMessage(pub GaugeUpdate);

impl Handler<GaugeUpdateMessage> for GaugeCache {
    type Result = ();

    fn handle(&mut self, msg: GaugeUpdateMessage, _ctx: &mut Self::Context) -> Self::Result {
        // Store our update
        self.receive(UpdateKind::of(&msg.0), msg.0.clone());
        // Send it to everyone subscribed
        self.clients.iter().for_each(|v| v.do_send(msg.clone()));
    }
}

Seems simple enough. But where did those clients come from? A Connect message, of course!

#[derive(Message)]
#[rtype(result = "()")]
struct Connect(Recipient<GaugeUpdateMessage>);

And what's that handler look like?

impl Handler<Connect> for GaugeCache {
    type Result = ();

    fn handle(&mut self, msg: Connect, _ctx: &mut Self::Context) -> Self::Result {
        self.clients.insert(msg.0.clone());
        self.gauges
            .values()
            .cloned()
            .map(GaugeUpdateMessage)
            .for_each(|v| msg.0.do_send(v));
    }
}

First, we store our client in the client list so it can receive future updates. Then, we send a catch-up: the most recent messages of every kind. That way, a reconnecting client doesn't need to wait for fresh updates to come down the pipe. Some of the less time-critical updaters only run once a day, so they'd be waiting a while!
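
The subscribe-then-catch-up behaviour is easy to try out without any actors. In this sketch standard-library channels play the part of websocket clients, and the Cache type and its string payloads are simplified stand-ins. One deliberate difference: it drops clients whose channel has closed during the fan-out, where the real code unsubscribes from a lifecycle hook.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Default)]
pub struct Cache {
    latest: HashMap<&'static str, String>, // kind -> newest payload
    clients: Vec<Sender<String>>,
}

impl Cache {
    /// Register a client and immediately replay the newest value of
    /// every kind, so it doesn't wait for the next scheduled update.
    pub fn connect(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        for payload in self.latest.values() {
            // Cannot fail here: we still hold the receiving end.
            let _ = tx.send(payload.clone());
        }
        self.clients.push(tx);
        rx
    }

    /// Store the update, then fan it out to every subscriber,
    /// forgetting any whose receiving end has gone away.
    pub fn update(&mut self, kind: &'static str, payload: &str) {
        self.latest.insert(kind, payload.to_string());
        self.clients
            .retain(|client| client.send(payload.to_string()).is_ok());
    }
}
```

A client that connects after the daily trash update still sees it straight away, then receives bus updates as they arrive.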

Web Sock It To Me

Actix handles websockets with the actix-web-actors crate. You'd think all of actix-web would be web-actors, but I guess not. Regardless, it plugs in nicely to our existing menagerie of actors.

The way this works is actually pretty interesting.

First of all, for incoming messages, we use a StreamHandler. Where a regular Handler handles a single message, StreamHandler works with a Stream of messages.

pub trait StreamHandler<I>
where
    Self: Actor,
{
    fn handle(&mut self, item: I, ctx: &mut Self::Context);

    //snip
}

For our implementation, there aren't many incoming messages we care about:

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor {
    fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match item {
            Ok(ws::Message::Ping(ping)) => ctx.pong(&ping),
            Ok(ws::Message::Close(c)) => {
                info!("Closing connection: {:?}", c);
                ctx.close(c)
            }
            // error handling omitted
        }
    }
}

We close gracefully, respond to pings, and handle errors.

But what about sending messages back down the websocket?

The key is to use the WebsocketContext instead of the regular Actix Context.

impl Actor for WsActor {
    type Context = WebsocketContext<Self>;

    // snip
}

How does this work? Let's take a look at WebsocketContext::create's signature:

pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Result<Bytes, Error>>
where
    A: StreamHandler<Result<Message, ProtocolError>>,
    S: Stream<Item = Result<Bytes, PayloadError>> + 'static,

We take an Actor (which must be a StreamHandler) and a Stream. The Stream is over Bytes, which will be decoded into the Message we expect. It's designed to take the web::Payload that actix-web uses.

But what's that return type? Normally we'd expect a Self to be returned, but instead we get a stream of Bytes.

That stream is how outgoing messages get sent to the client. When you call WebsocketContext::text to send a message, behind the scenes it's enqueueing a message which ultimately ends up in that stream.

Here's how we use it to send GaugeUpdateMessages:

impl Handler<GaugeUpdateMessage> for WsActor {
    type Result = ();

    fn handle(&mut self, msg: GaugeUpdateMessage, ctx: &mut Self::Context) -> Self::Result {
        match serde_json::to_string(&DenMessage::Update(msg.0)) {
            Err(e) => error!("Failed to encode payload: {:?}", e),
            Ok(msg) => ctx.text(msg),
        };
    }
}

(We'll get to DenMessage in a later post, but for now it's just a wrapper enum around GaugeUpdate.)

handle is synchronous, so ctx.text can't wait for a message to be sent. Instead, it gets added to WebsocketContext's internal queue that's eventually sent back to the client by the whims of Actix's scheduler.
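
I haven't read WebsocketContext's internals closely, so take this as the shape of the idea rather than how Actix does it: a synchronous text call that only enqueues, and a separate drain step that something else runs later. Outbox and both method names are made up.

```rust
use std::collections::VecDeque;

/// Toy outgoing queue: writers enqueue, the connection driver drains.
#[derive(Default)]
pub struct Outbox {
    queue: VecDeque<String>,
}

impl Outbox {
    /// Synchronous and non-blocking: just remember the frame.
    pub fn text(&mut self, msg: impl Into<String>) {
        self.queue.push_back(msg.into());
    }

    /// Called later by whatever drives the connection; hands back
    /// the queued frames in the order they were written.
    pub fn drain(&mut self) -> Vec<String> {
        self.queue.drain(..).collect()
    }
}
```

The handler returns as soon as the frame is queued, and ordering is preserved when the frames finally go out.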

How does the cache know to send us GaugeUpdateMessage? We need to subscribe to the cache. If we look at the Actor trait, we can see that there's some lifecycle hooks we can use:

pub trait Actor: Sized + Unpin + 'static {
  type Context: ActorContext;

  fn started(&mut self, ctx: &mut Self::Context) { ... }
  fn stopped(&mut self, ctx: &mut Self::Context) { ... }

  // snip
}

So these just need to be hooked up to the GaugeCache.

The signature looks like this:

pub fn connect(r: Recipient<GaugeUpdateMessage>) -> crate::Result<()>

where Recipient<GaugeUpdateMessage> means "any actor that can receive a GaugeUpdateMessage." In practice this will always be the same kind of actor, but there's no reason to hardcode that. This constrains us to only sending GaugeUpdateMessage, instead of every message we've got a Handler for, but that's okay.
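
To show what that kind of type erasure buys, here's a toy Recipient built from a boxed closure. It is not Actix's implementation, and Logger, Counter, and the two helper functions are hypothetical. The point is that two unrelated "actors" end up behind the same type, which can only be sent one kind of message.

```rust
use std::cell::RefCell;
use std::rc::Rc;

pub struct GaugeUpdateMessage(pub String);

/// Type-erased handle that accepts exactly one message type.
pub struct Recipient<M> {
    deliver: Box<dyn Fn(M)>,
}

impl<M> Recipient<M> {
    pub fn new(deliver: impl Fn(M) + 'static) -> Self {
        Self { deliver: Box::new(deliver) }
    }

    pub fn do_send(&self, msg: M) {
        (self.deliver)(msg)
    }
}

// Two unrelated "actors" that both accept GaugeUpdateMessage.
#[derive(Default)]
pub struct Logger {
    pub lines: RefCell<Vec<String>>,
}

#[derive(Default)]
pub struct Counter {
    pub seen: RefCell<usize>,
}

pub fn logger_recipient(logger: Rc<Logger>) -> Recipient<GaugeUpdateMessage> {
    Recipient::new(move |msg: GaugeUpdateMessage| logger.lines.borrow_mut().push(msg.0))
}

pub fn counter_recipient(counter: Rc<Counter>) -> Recipient<GaugeUpdateMessage> {
    Recipient::new(move |_msg: GaugeUpdateMessage| *counter.seen.borrow_mut() += 1)
}
```

Both recipients fit in one Vec<Recipient<GaugeUpdateMessage>>, which is exactly what a subscriber list wants.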

Wiring it up gets us this:

fn started(&mut self, ctx: &mut Self::Context) {
    if let Err(e) = GaugeCache::connect(ctx.address().recipient()) {
        error!("Failed to connect to cache: {:?}", e);
        ctx.stop();
    }
}

There's a similar stopped method so we don't send messages to a client not listening (there's a metaphor there, probably).

Now all that's left is to actually run actix! Their documentation is very good, and this is all we need:

#[get("/ws")]
async fn websocket(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse> {
    Ok(ws::start(WsActor, &req, stream)?)
}

There's the incoming stream for upstream Websocket messages. ws::start will call HttpResponseBuilder::streaming to stream the downstream half. And that's the end of the backend!

A diagram of message flows between different actors in the system
Figure 4: this chart isn't useful but i like graphs

Our bus arrival is traveling over the websocket connection. What does that wire format look like? I'm not actually sure! The beauty of using Rust on both ends is that we just need to make sure serialisation is reversible. The libraries will handle the rest.

Next Time

We'll build out the frontend and actually use this data we fired down the pipe!

Part 2

Footnotes:

[1] I haven't actually implemented either of these. Whoops.

[2] I briefly considered using gRPC, but those same restrictions raised their heads: browsers don't provide low-enough level socket access for gRPC to work without a weird proxy.

[3] This is a completely undocumented single character. I have assumed "+", "-", and " " are live, mostly based on guesses.