This post describes my journey in implementing Future from scratch for use with tokio to write asynchronous tasks.

tl;dr: It is possible, but not in the way I want it.

Goal

I have a simple application, redshift-rs, that runs in the background and should be able to perform some simple tasks concurrently:

  • Listen for Crtl+C interrupt (SIGINT and SIGTERM preferably) and begin shutting down once received

  • Every five seconds, check to see if display settings need to be adjusted. When transitioning from day to night or night to day, this should be checked every 100ms.

The application already does this, but using threads and channels; one thread listens for signals (using the chan and chan-signal crates), another thread is a timer thread that sleeps for either five seconds or 100ms and sends a message. The main thread then selects over the channels provided to each of the background threads.

If this were Go, I’d probably stick with something like the current implementation because the Go runtime provides green threading and the three tasks outlined above would probably not use three OS threads. But this is Rust and there is (no longer) green threading.

None of the tasks are computation heavy, so it’d make more sense to have one task handling all the details: checking signals at the right time or adjusting display settings. Instead of writing this by hand, I figure this is something that Tokio and the futures library should be able to do.

Overall design

We probably want to model timeouts as a Stream and not a future, because we want a occurrences every x ms.

First steps

To get started, I started looking at just implementing a Future that expires after a given Duration.

struct Sleep {
    when: Instant,
}

impl Sleep {
    fn new(dur: Duration) -> Sleep {
        Sleep {
            when: Instant::now() + dur,
        }
    }

    fn is_expired(&self) -> bool {
        Instant::new() >= self.when
    }
}

It is created at some point in time and should expire after some indicated duration. We are not worried about precision.

Next we want to implement Future for Sleep. The following code could be our first attempt:

impl Future for Sleep {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<()>, ()> {
        if self.is_expired() {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

Seems simple, right? It doesn’t quite work though. To see that let us tie everything together.

fn main() {
    let sleep = Sleep::new(Duration::from_secs(5));
    current_thread::run(|_| {
        current_thread::spawn(sleep);
        println!("Started!");
    });
    println!("Finished!");
}

This program prints “Started!” and then just hangs. On closer inspection (aka throwing println!() in various places) tells us that poll() is called once, before it is expired and returns Async::NotReady. So why isn’t it called again?

Learning more about Async::NotReady

Returning Async::NotReady is special in Tokio. It is the responsibility of the executor handling your task to make sure that poll() is called, ideally at the right time. But how is the executor supposed to know when it is the right time?

At this point, I was a little frustrated with the documentation on Tokio. Most (if not all) the implementations of Future and Stream depend on other things that already implement those traits and the example implementations rely on the inner implementations to do the “right thing”.

From the section on futures there is a better hint:

[…] when a task returns NotReady, once it transitioned to the ready state the executor is notified. […] When a function returns Async::NotReady, it is critical that the executor is notified when the state transitions to “ready”. Otherwise, the task will hang infinitely, never getting run again.

Well, this explains why our first implementation didn’t work.

Innermost futures, sometimes called “resources”, are the ones responsible for notifying the executor. This is done by calling notify on the task returned by task::current(). […] Before an executor calls poll on a task, it sets the task context to a thread-local variable. The inner most future then accesses the context from the thread-local so that it is able to notify the task once its readiness state changes.

So to have our Sleep future polled again we just need to notify the current task? Let us try this then:

impl Future for Sleep {
    ...
    fn poll(&mut self) -> Result<Async<()>, ()> {
        if self.is_expired() {
            Ok(Async::Ready(()))
        } else {
            let task = task.current();
            task.notify();
            Ok(Async::NotReady)
        }
    }
}

This works! Our program now prints

$ cargo run
Started!
Finished!

with five seconds in between the two lines. But this is not the “correct way” to do it. Whenever our task is polled it notifies the executor that it is ready to be polled again, which is not true for a whole five seconds. If we count the number of times our task is polled, we get something like this:

$ cargo run
Started!
Polled 2915047 times
Finished!

Polled almost three million times (and that is not even --release)! We are essentially just busy-waiting here, which we certainly do not want to do unless necessary.

An aside: tokio-timer

I found a library called tokio-timer that implements timing functionality for Tokio-based applications, which sounds like exactly what we are looking to do ourselves. But it is implemented with a background std::thread which is exactly what we are trying to avoid.

Conclusion

A little under two months later

I never finished writing this up. My conclusion at the time was that although it is possible to implement your own futures from scratch without relying on internal future polling, I couldn’t really come up with any other method than busy-waiting that also does not employ another thread.

For my stated goal of reducing the number of threads in redshift-rs, I ended up experimenting with mio which actually allowed me to implement a single-threaded signal handling. The result can be found in the mio-signal branch