Omar Abid

Implementing the Tower Service Trait

A few years back, the Tokio project introduced Tower, a library that aims to provide a set of reusable traits, structs, and functions for building network services. For the rationale behind it, which I highly recommend reading first, see: Inventing the Service trait.

However, there is limited documentation or guidance on implementing the trait itself. First, let's recap a scenario where such an implementation is necessary:

  1. You have a library that accepts a client compatible with Tower's Service trait.
  2. There isn't an existing implementation of such a client. This can be the case on less common platforms, like WebAssembly.
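To make the first point concrete, here is a minimal sketch of what "a client compatible with Tower's Service trait" amounts to. The trait below is a local stand-in that mirrors the shape of tower::Service so the example compiles with the standard library alone; EchoClient and fetch_with are hypothetical names, not part of Tower or Octocrab.

```rust
use std::future::{ready, Future, Ready};
use std::task::{Context, Poll};

// Local stand-in mirroring the shape of `tower::Service` (assumption: this is
// NOT the real trait, only a copy of its signature for illustration).
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // Reports whether the service can accept a request right now.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    // Turns a request into a future that resolves to the response.
    fn call(&mut self, req: Request) -> Self::Future;
}

// Hypothetical client that answers every request locally.
pub struct EchoClient;

impl Service<String> for EchoClient {
    type Response = String;
    type Error = std::convert::Infallible;
    type Future = Ready<Result<String, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready: there is no connection pool or queue to wait on.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        ready(Ok(format!("echo: {req}")))
    }
}

// A "library" entry point that is generic over any compatible client.
pub fn fetch_with<S>(client: &mut S, path: &str) -> S::Future
where
    S: Service<String>,
{
    client.call(path.to_owned())
}
```

Because fetch_with only depends on the trait, swapping EchoClient for a platform-specific client requires no change on the library side, which is the property the rest of this post relies on.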

The Library

The library in question is Octocrab. Octocrab includes networking support by default via Tokio, which does not support WebAssembly. It does, however, allow the use of a custom client.

Full source: custom_client.rs

    let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
        .build(connector);
    let octocrab = OctocrabBuilder::new_empty()
        .with_service(client)
        .with_layer(&BaseUriLayer::new(Uri::from_static(
            "https://api.github.com",
        )))
        .with_layer(&ExtraHeadersLayer::new(Arc::new(vec![(
            USER_AGENT,
            "octocrab".parse().unwrap(),
        )])))
        .with_auth(AuthState::None)
        .build()
        .unwrap();

Now, we just need to use a different client that supports our target platform.

The Platform

The platform is Cloudflare Workers, which uses WebAssembly to execute compiled Rust code. Where the Wasm code runs matters here: Cloudflare Workers run on a customized V8 engine and do not support WASI (which is not yet a finalized standard). As such, your code won't have access to regular networking APIs.

HTTP requests here are made using the Fetch API, which is a JavaScript API for making network requests. This API is exposed to Rust via the wasm-bindgen crate. Cloudflare offers a worker crate, which exposes some functionality for making HTTP requests.

The implementation

The trait could be implemented for any struct, but since there isn't an actual "client" in the worker crate, I made a dedicated struct for it. Most of the implementation lives in the call method, and most of that work is converting between the http crate's types and their JavaScript counterparts.

use std::convert::TryInto;
use std::task::Context;
use std::{future::Future, pin::Pin, task::Poll};

use crate::send::SendFuture;
use crate::{Body, Error, Fetch, HttpResponse, Request};

/// A Tower-compatible Service implementation for Cloudflare Workers.
///
/// This struct implements the `tower::Service` trait, allowing it to be used
/// as a service in the Tower middleware ecosystem.
#[derive(Debug, Default, Clone, Copy)]
pub struct Service;

impl<B: http_body::Body<Data = bytes::Bytes> + Clone + 'static> tower::Service<http::Request<B>>
    for Service
{
    type Response = http::Response<Body>;
    type Error = Error;
    type Future =
        Pin<Box<dyn Future<Output = std::result::Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: http::Request<B>) -> Self::Future {
        // Convert a http Request to a worker Request
        let worker_request: Request = req.try_into().unwrap();

        // Send the request with Fetch and receive a Future
        let fut = Fetch::Request(worker_request).send();

        // Convert the Future output to a HttpResponse
        let http_response =
            async { Ok(TryInto::<HttpResponse>::try_into(fut.await.unwrap()).unwrap()) };

        // Wrap the Future in a SendFuture to make it Send
        let wrapped = SendFuture::new(http_response);

        Box::pin(wrapped)
    }
}

Full Pull Request: Tower Service Trait implementation
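The call method above uses unwrap for brevity, which panics if a conversion or the fetch itself fails. As a hedged sketch of one alternative, the same flow can surface failures through the returned future instead. Everything here is a stand-in: HttpRequest, WorkerRequest, HttpResponse, Error, fetch, and block_on are hypothetical simplifications rather than the http or worker crate types, and the future is not wrapped in SendFuture because nothing in this example requires Send.

```rust
use std::convert::TryFrom;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the http and worker request/response types.
pub struct HttpRequest { pub url: String }
pub struct WorkerRequest { pub url: String }
#[derive(Debug, PartialEq)]
pub struct HttpResponse { pub status: u16 }

#[derive(Debug, PartialEq)]
pub enum Error {
    InvalidUrl(String),
    Fetch(String),
}

impl TryFrom<HttpRequest> for WorkerRequest {
    type Error = Error;

    fn try_from(req: HttpRequest) -> Result<Self, Error> {
        if req.url.starts_with("https://") {
            Ok(WorkerRequest { url: req.url })
        } else {
            Err(Error::InvalidUrl(req.url))
        }
    }
}

// Hypothetical fetch: every host except "down.example" answers with 200.
async fn fetch(req: WorkerRequest) -> Result<HttpResponse, Error> {
    if req.url.contains("down.example") {
        Err(Error::Fetch(req.url))
    } else {
        Ok(HttpResponse { status: 200 })
    }
}

type BoxFuture = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

// Same shape as `call`, but each failure becomes an `Err` instead of a panic.
pub fn call(req: HttpRequest) -> BoxFuture {
    Box::pin(async move {
        let worker_request = WorkerRequest::try_from(req)?;
        fetch(worker_request).await
    })
}

// Waker that does nothing; enough to drive futures that never park.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal driver so the sketch runs without a runtime. It busy-polls, which is
// acceptable here only because the example futures finish on the first poll.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With this shape, a caller of block_on(call(request)) receives Err(Error::InvalidUrl(..)) or Err(Error::Fetch(..)) and can decide how to react, rather than the whole Worker invocation aborting.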

Custom executors and Buffer

While the call method is responsible for receiving requests and sending responses, it still needs a runtime to execute on. This is a non-issue if the environment supports Tokio; otherwise, you need to provide the runtime.

The method Buffer::pair provides a way to run the service's background worker on your own runtime.

The Tower documentation describes it as follows: "Creates a new Buffer wrapping service, but returns the background worker. This is useful if you do not want to spawn directly onto the tokio runtime but instead want to use your own executor. This will return the Buffer and the background Worker that you can then spawn."
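The handle/worker split can be illustrated with a toy model. This is only a sketch of the idea, not Tower's implementation: pair, Handle, Worker, and drive_once are hypothetical names, requests are plain strings, and "handling" a request just records it. The point it tries to show is that nothing is processed until someone polls the returned worker.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// State shared between the handle and the background worker.
#[derive(Default)]
struct Shared {
    pending: VecDeque<String>,
    handled: Vec<String>,
    worker_waker: Option<Waker>,
}

// Handle that only enqueues requests (toy analogue of the `Buffer` half).
pub struct Handle {
    shared: Rc<RefCell<Shared>>,
}

impl Handle {
    pub fn send(&self, request: &str) {
        let waker = {
            let mut shared = self.shared.borrow_mut();
            shared.pending.push_back(request.to_owned());
            shared.worker_waker.take()
        };
        // Wake the worker after releasing the borrow.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    pub fn handled(&self) -> Vec<String> {
        self.shared.borrow().handled.clone()
    }
}

pub type Worker = Pin<Box<dyn Future<Output = ()>>>;

// Toy analogue of `Buffer::pair`: nothing is spawned here; the worker future
// is handed back so the caller can run it on an executor of their choice.
pub fn pair() -> (Handle, Worker) {
    let shared = Rc::new(RefCell::new(Shared::default()));
    let worker_state = Rc::clone(&shared);
    let worker = poll_fn(move |cx: &mut Context<'_>| {
        let mut state = worker_state.borrow_mut();
        while let Some(request) = state.pending.pop_front() {
            state.handled.push(format!("handled {request}"));
        }
        // Park until the handle enqueues more work. This toy worker never
        // finishes, unlike a real one that stops once all handles are gone.
        state.worker_waker = Some(cx.waker().clone());
        Poll::<()>::Pending
    });
    (Handle { shared }, Box::pin(worker))
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Stand-in for "your own executor": polls the worker a single time.
pub fn drive_once(worker: &mut Worker) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let _ = worker.as_mut().poll(&mut cx);
}
```

Calling handle.send("GET /user") alone leaves handled() empty; only after drive_once(&mut worker) does the request show up as handled. In the real setup, the executor you provide takes over the role of drive_once.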

The runtime in our case is a simple executor. It is a closure that accepts the background worker returned from Buffer::pair as an argument.

executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) -> ()>
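As a rough illustration of what can sit behind that type, the sketch below builds such a closure on top of a simple task queue. TaskQueue, executor, and run_once are hypothetical names chosen for this example, and the no-op waker means it only suits futures that make progress whenever they are polled; a real host runtime would schedule re-polls properly.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;
// Same shape as the executor type shown above.
type Executor = Box<dyn Fn(BoxedTask) -> ()>;

// Hypothetical queue-backed runtime: the executor closure only stores tasks,
// and `run_once` is what actually polls them.
#[derive(Default)]
pub struct TaskQueue {
    tasks: Rc<RefCell<Vec<BoxedTask>>>,
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

impl TaskQueue {
    pub fn new() -> Self {
        Self::default()
    }

    // Builds a closure matching the boxed `Fn` type; it shares the queue.
    pub fn executor(&self) -> Executor {
        let tasks = Rc::clone(&self.tasks);
        Box::new(move |task: BoxedTask| {
            tasks.borrow_mut().push(task);
        })
    }

    // Polls every queued task once and returns how many are still pending.
    pub fn run_once(&self) -> usize {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        // Take the batch out first so a task may enqueue more work while polled.
        let batch = std::mem::take(&mut *self.tasks.borrow_mut());
        for mut task in batch {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.borrow_mut().push(task);
            }
        }
        self.tasks.borrow().len()
    }
}
```

The closure returned by executor() accepts any Pin<Box<dyn Future<Output = ()>>>, so a background worker can be handed to it as executor(Box::pin(worker)); the task then runs whenever the host calls run_once.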

Since Octocrab didn't support that, a few changes had to be made: mainly, using Buffer::pair instead of Buffer::new, accepting the executor closure, and passing the worker to it.

/// Creates a new `Octocrab` with a custom executor
fn new_with_executor<S>(
	service: S,
	auth_state: AuthState,
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) -> ()>,
) -> Self
where
	S: Service<Request<OctoBody>, Response = Response<BoxBody<Bytes, crate::Error>>>
		+ Send
		+ 'static,
	S::Future: Send + 'static,
	S::Error: Into<BoxError>,
{
	// Use Buffer pair to return the background worker
	let (service, worker) = Buffer::pair(BoxService::new(service.map_err(Into::into)), 1024);

	// Execute the background worker with the custom executor
	executor(Box::pin(worker));

	Self {
		client: service,
		auth_state,
	}
}

Full Pull Request: Custom executor for Wasm Support

Usage with workers

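Finally, we get to the part where we use this with Workers. First, we need to create the executor. It must capture the Worker's context, so make sure the context is available at that point. The closure then passes the background worker to the wait_until function. wait_until does not start a new thread; it tells the Workers runtime to keep the current invocation alive until the given future completes, which lets the background worker run alongside your request handler.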

let my_executor = move |worker: Pin<Box<dyn Future<Output = ()>>>| {
	context.wait_until(worker);
};

Then pass both the service and the executor to OctocrabBuilder.

let octocrab = octocrab::OctocrabBuilder::new_empty()
	.with_service(service)
	.with_executor(Box::new(my_executor))
	...
