tokio-statemap: Visualize state transitions across Tokio tasks and worker threads
Author: abhirag
License: MIT
Version: 0.0.1
Source: GitHub

tokio-statemap

Library for instrumenting Rust programs to collect Tokio’s async execution traces as statemaps.

Overview

Having worked with Async Rust in anger and at scale, I hated performance regressions more than anything else. I know the rule is:

Async code should never spend a long time without reaching an .await

but when things aren’t going that well in production land and you have more Tokio tasks than sense, figuring out where this invariant is being broken is nigh impossible. There were times when I just wanted to pry open the Tokio runtime to see:

  • Which tasks were blocked, stuck waiting instead of making progress
  • Whether the worker threads were busy or mostly parked while tasks sat blocked
  • Any rhyme or reason to this madness, patterns that were impossible to see in logs or traces

I’ve looked up to Bryan Cantrill and Eliza Weisman’s work for years, so when I came across how Oxide was using statemaps to visualize Tokio runtime behavior, it felt like the missing piece. But Oxide uses DTrace to collect the required events, which is not a viable option in most containerized environments I’ve worked in. I needed something that worked in scratch containers with zero dependencies, and this library is the result.

What follows is the complete library as a literate program:

Statemap

A software visualization in which time is on the X axis and timelines for discrete entities are stacked on the Y axis, with different states for the discrete entities rendered in different colors.

Generating a statemap consists of two steps: instrumentation and rendering. The result is an SVG that can be viewed with an SVG viewer (e.g., a web browser), allowing interaction.

Statemap of tokio tasks and worker threads

Data format

To generate data for statemap generation, instrumentation should create a file that consists of a stream of concatenated JSON. The expectation is that one JSON payload will consist of metadata, with many JSON payloads containing data.
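A minimal sketch of that stream shape, with hand-rolled JSON strings (the library itself serializes with serde_json, as shown later):

```rust
// Sketch of the concatenated-JSON stream: one metadata payload first,
// then one JSON object per datum, each on its own line.
fn concat_stream(metadata: &str, data: &[&str]) -> String {
    let mut stream = String::new();
    stream.push_str(metadata);
    stream.push('\n');
    for datum in data {
        stream.push_str(datum);
        stream.push('\n');
    }
    stream
}

fn main() {
    let stream = concat_stream(
        r#"{"start":[0,0],"states":{"running":{"value":1}}}"#,
        &[
            r#"{"time":"1000","entity":"task-1","state":1}"#,
            r#"{"time":"2000","entity":"task-2","state":1}"#,
        ],
    );
    // One metadata line plus two datum lines.
    assert_eq!(stream.lines().count(), 3);
}
```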

Metadata

The following metadata fields are required:

  • start A two-element array of integers consisting of the start time of the data in seconds (the 0th element) and nanoseconds within the second (the 1st element). The start time should be expressed in UTC

  • states An object in which each member is the name of a valid entity state. Each member object can contain the following:

    • value The value by which this state will be referred to in the data stream

    • color The color that should be used to render the state. If the color is not specified, a color will be selected at random during rendering

    #[derive(Serialize, Debug)]
    struct State {
        value: usize,
        #[serde(skip_serializing_if = "Option::is_none")]
        color: Option<String>,
    }
    

    For example, here is a valid states object:

    "states": {
       "running": {
           "value": 1,
           "color": "#DC267F"
       },
       "waiting": {
           "value": 2,
           "color": "#FE6100"
       },
       "completed": {
           "value": 3,
           "color": "#648FFF"
       },
       "spawned": {
           "value": 0,
           "color": "#FFB000"
       }
    }
    

In addition, the metadata can contain the following optional fields:

  • title The title of the statemap, such that it can meaningfully be in the clause “statemap of title activity”

  • host The host on which the data was gathered

    fn get_hostname() -> Option<String> {
        hostname::get().ok().and_then(|h| h.into_string().ok())
    }
    
  • entityKind The entity under observation

The final struct thus becomes:

#[derive(Serialize, Debug)]
#[allow(non_snake_case)]
pub(crate) struct Metadata {
    start: [u64; 2],
    #[serde(skip_serializing_if = "Option::is_none")]
    title: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    host: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entityKind: Option<String>,
    states: HashMap<String, State>,
}

For example, valid metadata JSON looks like:

{
    "start": [
        1765405854,
        712163000
    ],
    "title": "Tokio Task Execution",
    "host": "abhirags-MacBook-Air.local",
    "entityKind": "Task",
    "states": {
        "spawned": {
            "value": 0,
            "color": "#FFB000"
        },
        "waiting": {
            "value": 2,
            "color": "#FE6100"
        },
        "completed": {
            "value": 3,
            "color": "#648FFF"
        },
        "running": {
            "value": 1,
            "color": "#DC267F"
        }
    }
}

The Tokio task states we track are:

tokio task states

#[derive(Debug)]
#[repr(u32)]
pub(crate) enum TaskState {
    Spawned = 0,
    Running = 1,
    Waiting = 2,
    Completed = 3,
}

impl TaskState {
    pub(crate) fn as_u32(self) -> u32 {
        self as u32
    }
}
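A quick sanity check that these discriminants line up with the value fields in the states object shown earlier:

```rust
// Stand-alone copy of the enum, just to exercise the cast.
#[derive(Debug, Clone, Copy)]
#[repr(u32)]
enum TaskState {
    Spawned = 0,
    Running = 1,
    Waiting = 2,
    Completed = 3,
}

fn main() {
    // The `as u32` cast yields the explicit discriminant, which is what
    // ends up in each state's `value` field in the metadata.
    assert_eq!(TaskState::Spawned as u32, 0);
    assert_eq!(TaskState::Running as u32, 1);
    assert_eq!(TaskState::Waiting as u32, 2);
    assert_eq!(TaskState::Completed as u32, 3);
}
```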

Let’s create initializers for the metadata struct for tokio tasks:

impl Metadata {
    pub(crate) fn for_tasks(start: [u64; 2]) -> Self {
        let mut states = HashMap::new();
        states.insert(
            "spawned".to_string(),
            State {
                value: TaskState::Spawned as usize,
                color: Some("#FFB000".to_string()),
            },
        );
        states.insert(
            "running".to_string(),
            State {
                value: TaskState::Running as usize,
                color: Some("#DC267F".to_string()),
            },
        );
        states.insert(
            "waiting".to_string(),
            State {
                value: TaskState::Waiting as usize,
                color: Some("#FE6100".to_string()),
            },
        );
        states.insert(
            "completed".to_string(),
            State {
                value: TaskState::Completed as usize,
                color: Some("#648FFF".to_string()),
            },
        );

        Self {
            start,
            title: Some("Tokio Task Execution".to_string()),
            host: get_hostname(),
            entityKind: Some("Task".to_string()),
            states,
        }
    }
}

And for Tokio worker threads, given that the states we are tracking are:

worker thread states

#[derive(Debug)]
#[repr(u32)]
pub(crate) enum WorkerState {
    Running = 0,
    Parked = 1,
    Stopped = 2,
}

impl WorkerState {
    pub(crate) fn as_u32(self) -> u32 {
        self as u32
    }
}

the initializer for the metadata struct becomes:

impl Metadata {
    pub(crate) fn for_workers(start: [u64; 2]) -> Self {
        let mut states = HashMap::new();
        states.insert(
            "running".to_string(),
            State {
                value: WorkerState::Running as usize,
                color: Some("#DC267F".to_string()),
            },
        );
        states.insert(
            "parked".to_string(),
            State {
                value: WorkerState::Parked as usize,
                color: Some("#FE6100".to_string()),
            },
        );
        states.insert(
            "stopped".to_string(),
            State {
                value: WorkerState::Stopped as usize,
                color: Some("#648FFF".to_string()),
            },
        );

        Self {
            start,
            title: Some("Tokio Worker Threads".to_string()),
            host: get_hostname(),
            entityKind: Some("Worker".to_string()),
            states,
        }
    }
}

Data

The data for a statemap is provided following the metadata as concatenated JSON (that is, each JSON payload is a datum). Each datum is a JSON object that must contain the following members:

  • entity The name of the entity.

  • time The time of the datum, expressed as a nanosecond offset from the start member present in the metadata.

  • state The value of the state that begins at the time of the datum.

Each datum may also contain an additional member:

  • tag The tag for the state. See State tagging, below.

#[derive(Serialize, Debug)]
pub struct Datum {
    time: String,
    entity: String,
    state: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    tag: Option<String>,
}

impl Datum {
    pub(crate) fn new(time_ns: u64, entity: String, state: u32) -> Self {
        Self {
            time: time_ns.to_string(),
            entity,
            state,
            tag: None,
        }
    }

    pub(crate) fn with_tag(mut self, tag: String) -> Self {
        self.tag = Some(tag);
        self
    }
}

State tagging

It is often helpful to examine additional dimensionality within a particular state or states. For example, in understanding lifecycles of numerous tokio tasks, it is helpful to track the location in code they were spawned at. To facilitate this, statemaps support state tagging whereby an immutable tag is associated with a particular transition to a particular state. There can be an arbitrary number of such tags, but the expectation is that there are many more state transitions than there are tags. Tags are indicated by the tag member of the state datum payload. Elsewhere in the stream of data (though not necessarily before the tag is used), the tag should be defined with a tag-defining JSON payload that contains the following two members:

  • tag A string that is the tag that is being defined.

  • state The state that corresponds to this tag. Each state/tag tuple must have its own tag definition.

Beyond these two members, the tag definition can have any number of scalar members. Tags are immutable; if a tag is redefined, the last tag definition will apply to all uses of that tag. The tag should not contain member definitions that would cause it to be ambiguous with respect to data (namely, entity and time members).

#[derive(Serialize, Debug)]
pub struct TagDefinition {
    state: u32,
    tag: String,
    #[serde(flatten)]
    properties: HashMap<String, serde_json::Value>,
}

impl TagDefinition {
    pub(crate) fn new(state: u32, tag: String) -> Self {
        Self {
            state,
            tag,
            properties: HashMap::new(),
        }
    }

    pub(crate) fn with_property(
        mut self,
        key: String,
        value: impl Into<serde_json::Value>,
    ) -> Self {
        self.properties.insert(key, value.into());
        self
    }
}

As an example, here is a tag definition for tokio task spawned state that tracks the location in code the task was spawned at:

{"state":0,"tag":"examples/basic.rs:31:20","location":"examples/basic.rs:31:20","file":"examples/basic.rs"}

And here is an example of a tagged state datum:

{"time":"224000","entity":"task-5","state":0,"tag":"examples/basic.rs:31:20"}

This would indicate that at time 224000, entity task-5 went into state 0 (spawned) – and the tag for this state (in this case, the spawn location) was examples/basic.rs:31:20.
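A small sketch reproducing that tagged datum with plain string formatting (illustrative only; the library serializes Datum with serde, as shown above):

```rust
// Illustrative stand-in for the library's Datum; serialization here is
// hand-rolled so the sketch stays dependency-free.
struct Datum {
    time: String,
    entity: String,
    state: u32,
    tag: Option<String>,
}

impl Datum {
    fn to_json(&self) -> String {
        let mut json = format!(
            r#"{{"time":"{}","entity":"{}","state":{}"#,
            self.time, self.entity, self.state
        );
        if let Some(tag) = &self.tag {
            json.push_str(&format!(r#","tag":"{}""#, tag));
        }
        json.push('}');
        json
    }
}

fn main() {
    let datum = Datum {
        time: "224000".to_string(),
        entity: "task-5".to_string(),
        state: 0,
        tag: Some("examples/basic.rs:31:20".to_string()),
    };
    assert_eq!(
        datum.to_json(),
        r#"{"time":"224000","entity":"task-5","state":0,"tag":"examples/basic.rs:31:20"}"#
    );
}
```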

Collector

Architecture

The core event collection logic is:

  1. We tap into Tokio’s runtime hooks to collect lifecycle events

  2. Each thread writes its events to a single-producer single-consumer (SPSC) ring buffer it owns. Reading from and writing into this ring buffer is lock-free and wait-free

  3. Processing of events is done after collection stops. This separates the hot path (collection) from cold path (processing). This includes:

    • Draining all buffers
    • Sorting events by timestamp
    • Transforming events into statemap format

collector’s architecture
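The cold path (step 3) can be sketched with plain Vecs standing in for the per-thread ring buffers:

```rust
// Drain every per-thread buffer into one Vec, then sort by timestamp.
// (u64 timestamp, event label) pairs stand in for the real Event enum.
fn drain_and_sort(buffers: Vec<Vec<(u64, &'static str)>>) -> Vec<(u64, &'static str)> {
    let mut all: Vec<_> = buffers.into_iter().flatten().collect();
    all.sort_by_key(|(time_ns, _)| *time_ns);
    all
}

fn main() {
    let merged = drain_and_sort(vec![
        vec![(5, "task-1 poll-start"), (9, "task-1 poll-end")],
        vec![(3, "worker-1 start"), (7, "worker-1 park")],
    ]);
    let times: Vec<u64> = merged.iter().map(|(t, _)| *t).collect();
    assert_eq!(times, vec![3, 5, 7, 9]);
}
```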

Thread local state

Let’s get into the weeds then. First, we need to identify worker threads by a u64 ID, but the as_u64 method on ThreadId is yet to be stabilized, so we’ll assign these IDs ourselves. We keep track of the next ID to be assigned in:

static WORKER_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
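fetch_add hands each caller a distinct prior value, so IDs stay unique across threads without locking; a minimal sketch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

static WORKER_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

fn next_worker_id() -> u64 {
    // fetch_add returns the previous value, so the first caller gets 1.
    WORKER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}

fn main() {
    let handles: Vec<_> = (0..4).map(|_| thread::spawn(next_worker_id)).collect();
    let mut ids: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ids.sort_unstable();
    // Four threads, four distinct IDs.
    assert_eq!(ids, vec![1, 2, 3, 4]);
}
```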

We’ll keep some thread local state for every worker thread:

  • The producer side of rtrb::RingBuffer to push events
  • worker_id

struct ThreadLocalState {
    producer: RefCell<Option<Producer<Event>>>,
    worker_id: Cell<u64>,
}

impl ThreadLocalState {
    const fn new() -> Self {
        Self {
            producer: RefCell::new(None),
            worker_id: Cell::new(0),
        }
    }
}

thread_local! {
    static LOCAL_STATE: ThreadLocalState = ThreadLocalState::new();
}

Configuration

Now let’s get the configuration out of the way. The configurable collector params are:

  • trace_target Whether we want to track lifecycles of tokio tasks, worker threads, both or none

#[derive(Debug, Default)]
pub enum TraceTarget {
    Tasks,
    Workers,
    #[default]
    Both,
    None,
}

impl TraceTarget {
    pub fn tracks_tasks(&self) -> bool {
        matches!(self, TraceTarget::Tasks | TraceTarget::Both)
    }

    pub fn tracks_workers(&self) -> bool {
        matches!(self, TraceTarget::Workers | TraceTarget::Both)
    }
}

  • buffer_capacity Max capacity of each ring buffer (per thread)

  • max_events Max total number of events to collect (globally)

  • include_spawn_location Whether to capture location in code where each task was spawned

#[derive(Debug, Clone)]
pub struct SpawnLocation {
    file: String,
    line: u32,
    column: u32,
}

impl std::fmt::Display for SpawnLocation {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}:{}:{}", self.file, self.line, self.column)
    }
}

CollectorConfig thus becomes:

#[derive(Debug)]
pub struct CollectorConfig {
    pub buffer_capacity: usize,
    pub max_events: usize,
    pub trace_target: TraceTarget,
    pub include_spawn_location: bool,
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            buffer_capacity: 4096,
            max_events: 100000,
            trace_target: TraceTarget::Both,
            include_spawn_location: true,
        }
    }
}
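Callers can then override individual fields with struct update syntax; a usage sketch against a trimmed-down stand-in for the config:

```rust
// Trimmed stand-in for CollectorConfig, just to show the override pattern.
#[derive(Debug)]
struct CollectorConfig {
    buffer_capacity: usize,
    max_events: usize,
    include_spawn_location: bool,
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            buffer_capacity: 4096,
            max_events: 100_000,
            include_spawn_location: true,
        }
    }
}

fn main() {
    // Bump the global event cap, keep everything else at the defaults.
    let config = CollectorConfig {
        max_events: 1_000_000,
        ..Default::default()
    };
    assert_eq!(config.buffer_capacity, 4096);
    assert_eq!(config.max_events, 1_000_000);
    assert!(config.include_spawn_location);
}
```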

Events

We tap into Tokio’s runtime hooks to capture the following events:

  1. For tokio tasks: TaskSpawn, TaskPollStart, TaskPollEnd, TaskTerminate
  2. For worker threads: WorkerStart, WorkerStop, WorkerPark, WorkerUnpark

Every event includes:

  • time_ns Nanoseconds elapsed since collection started until the event occurred
  • u64 id
    • worker_id (introduced in Thread local state) for events corresponding to worker threads
    • task_id for events corresponding to tokio tasks

Additionally, the TaskSpawn event may also contain the task’s SpawnLocation.

#[derive(Debug)]
pub enum Event {
    TaskSpawn {
        task_id: u64,
        time_ns: u64,
        location: Option<SpawnLocation>,
    },
    TaskPollStart {
        task_id: u64,
        time_ns: u64,
    },
    TaskPollEnd {
        task_id: u64,
        time_ns: u64,
    },
    TaskTerminate {
        task_id: u64,
        time_ns: u64,
    },
    WorkerStart {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerStop {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerPark {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerUnpark {
        worker_id: u64,
        time_ns: u64,
    },
}

impl Event {
    pub fn time_ns(&self) -> u64 {
        match self {
            Event::TaskSpawn { time_ns, .. }
            | Event::TaskPollStart { time_ns, .. }
            | Event::TaskPollEnd { time_ns, .. }
            | Event::TaskTerminate { time_ns, .. }
            | Event::WorkerStart { time_ns, .. }
            | Event::WorkerStop { time_ns, .. }
            | Event::WorkerPark { time_ns, .. }
            | Event::WorkerUnpark { time_ns, .. } => *time_ns,
        }
    }
}

Tokio task id

When we tap into Tokio’s runtime hooks to capture task events, we are passed a TaskMeta, which contains a tokio::task::Id. Unfortunately, converting tokio::task::Id to u64 is still an open issue, so we use tokio-dtrace’s approach.

#[derive(Debug, thiserror::Error)]
#[error(
    "\
tokio-statemap: POTENTIALLY UNSOUND CAST DETECTED!\n \
  size_of::<tokio::task::Id>() = {id_size}\n       \
       size_of::<NonZeroU64>() = {nonzero_u64_size}\n \
 align_of::<tokio::task::Id>() = {id_align}\n      \
      align_of::<NonZeroU64>() = {nonzero_u64_align}\n\
"
)]
pub struct InvalidCasts {
    id_size: usize,
    nonzero_u64_size: usize,
    id_align: usize,
    nonzero_u64_align: usize,
}

/// Checks that unsafe casts performed by `tokio-statemap` are valid.
///
/// `tokio-statemap` relies on the ability to cast a [`tokio::task::Id`] to a
/// [`u64`] in order to pass a task ID as an integer.
/// This function checks that the sizes and alignments of the types are compatible. If
/// they are not, it returns an error, indicating that the casts are potentially
/// unsound. This may occur if Tokio has changed the representation of the
/// [`tokio::task::Id`] type, which is unlikely, but always possible.
///
/// If this function returns an error, `tokio-statemap`'s runtime hooks should not
/// be used. Registering hooks using the [`register_hooks`] function will call
/// this function prior to registering the runtime hooks, and will fail to do so
/// if casts are unsound.
/// reference: https://github.com/oxidecomputer/tokio-dtrace/blob/91f72de4510298aec4a3772d4eaef6426ec2f85e/src/lib.rs#L266
pub fn check_casts() -> Result<(), InvalidCasts> {
    use std::mem::{align_of, size_of};

    let id_size = size_of::<tokio::task::Id>();
    let nonzero_u64_size = size_of::<NonZeroU64>();
    let id_align = align_of::<tokio::task::Id>();
    let nonzero_u64_align = align_of::<NonZeroU64>();

    if id_size != nonzero_u64_size || id_align != nonzero_u64_align {
        Err(InvalidCasts {
            id_size,
            nonzero_u64_size,
            id_align,
            nonzero_u64_align,
        })
    } else {
        Ok(())
    }
}
// reference: https://github.com/oxidecomputer/tokio-dtrace/blob/91f72de4510298aec4a3772d4eaef6426ec2f85e/src/lib.rs#L461
#[cfg(tokio_unstable)]
fn task_id_to_u64(id: tokio::task::Id) -> u64 {
    unsafe {
        // SAFETY: `check_casts()` has verified that `tokio::task::Id` has the same
        // size and alignment as `NonZeroU64`. This is called before hooks are registered.
        union TrustMeOnThis {
            id: tokio::task::Id,
            int: NonZeroU64,
        }
        TrustMeOnThis { id }.int.get()
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn casts_are_valid() {
        crate::check_casts().unwrap();
    }
}
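The same size-and-alignment check can be exercised against a layout fact the standard library does document: NonZeroU64 (and its niche-optimized Option) match u64 exactly:

```rust
use std::mem::{align_of, size_of};
use std::num::NonZeroU64;

fn main() {
    // NonZeroU64 is guaranteed to have the same size and alignment as u64...
    assert_eq!(size_of::<NonZeroU64>(), size_of::<u64>());
    assert_eq!(align_of::<NonZeroU64>(), align_of::<u64>());
    // ...and Option<NonZeroU64> uses the zero niche, so it is u64-sized too.
    assert_eq!(size_of::<Option<NonZeroU64>>(), size_of::<u64>());
}
```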

Start time

Captured once when collection starts and used throughout the collection lifetime:

  • system_time Captures the wall-clock time (UTC) when collection started, used to populate the statemap metadata’s start field
  • instant Used for measuring elapsed time during collection. Unlike SystemTime, it is monotonically non-decreasing which makes it suitable for computing precise durations between events

We use fastant::Instant, a drop-in replacement for std::time::Instant with lower overhead on x86/x86_64 Linux due to use of TSC (Time Stamp Counter).

struct StartTime {
    system_time: SystemTime,
    instant: fastant::Instant,
}
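A sketch of the two clocks in action, with std::time::Instant standing in for fastant::Instant:

```rust
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

// Convert a wall-clock offset from the epoch into the statemap
// [seconds, nanoseconds-within-second] start array.
fn start_array(since_epoch: Duration) -> [u64; 2] {
    [since_epoch.as_secs(), since_epoch.subsec_nanos() as u64]
}

fn main() {
    let system_time = SystemTime::now();
    let instant = Instant::now();

    // Wall clock -> metadata `start` field.
    let since_epoch = system_time.duration_since(UNIX_EPOCH).unwrap_or_default();
    let start = start_array(since_epoch);
    assert!(start[1] < 1_000_000_000);

    // Monotonic clock -> per-event nanosecond offsets.
    std::thread::sleep(Duration::from_millis(5));
    assert!(instant.elapsed().as_nanos() as u64 >= 5_000_000);
}
```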

Collector definition

Event collection handle shared across all worker threads via Arc<Collector>:

  • config Collection parameters. See Configuration

  • active Whether collection is running, checked on every event push

  • total_events Global event counter. When it hits max_events, collection stops automatically

  • start_time Captured once when collection starts. See Start time

  • consumers Aggregates the consumer end of each thread’s ring buffer. Threads register dynamically during collection, when a thread first pushes an event it creates its ring buffer and adds the consumer here. Multiple threads can register concurrently, so the Vec needs synchronized access for pushing, hence the Mutex.

pub struct Collector {
    config: CollectorConfig,
    active: AtomicBool,
    total_events: AtomicUsize,
    start_time: OnceLock<StartTime>,
    consumers: Mutex<Vec<Consumer<Event>>>,
}

Collector implementation

Now let’s look at Collector’s methods in the order they’re used during the collection lifecycle:

impl Collector {

Construction

We construct the collector from the provided config: inactive, with total_events at 0, start_time behind a OnceLock so it can be initialized lazily when collection starts, and an empty vector to hold the consumer end of each thread’s ring buffer:

    pub(crate) fn new(config: CollectorConfig) -> Self {
        Self {
            config,
            active: AtomicBool::new(false),
            total_events: AtomicUsize::new(0),
            start_time: OnceLock::new(),
            consumers: Mutex::new(Vec::new()),
        }
    }

Starting and stopping collection

Collection is controlled through start and stop methods. Starting collection:

  • Initializes start_time if not already set (via OnceLock)

  • Resets the total_events counter

  • Sets the active flag to true

    pub fn start(&self) {
        self.start_time.get_or_init(|| StartTime {
            system_time: SystemTime::now(),
            instant: fastant::Instant::now(),
        });
        self.total_events.store(0, Ordering::Relaxed);
        self.active.store(true, Ordering::Relaxed);
    }

Stopping collection just sets the active flag to false:

    pub fn stop(&self) {
        self.active.store(false, Ordering::Relaxed);
    }

Checked before pushing every event:

    fn is_active(&self) -> bool {
        self.active.load(Ordering::Relaxed)
    }

Time helpers

Two methods handle time:

  • elapsed_ns computes nanoseconds since collection started (used for event timestamps)

  • start_time_array converts the wall-clock start time to the [seconds, nanoseconds] format required by statemap metadata.

    fn elapsed_ns(&self) -> u64 {
        self.start_time
            .get()
            .map(|t| t.instant.elapsed().as_nanos() as u64)
            .unwrap_or(0)
    }

    pub(crate) fn start_time_array(&self) -> [u64; 2] {
        self.start_time
            .get()
            .map(|t| {
                let duration = t
                    .system_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default();
                [duration.as_secs(), duration.subsec_nanos() as u64]
            })
            .unwrap_or([0, 0])
    }

Thread registration

When a thread first attempts to record an event, it must register itself. This creates the thread’s ring buffer, adds the consumer end to the shared consumers vector, and assigns a unique worker ID.

    fn ensure_thread_registered(&self) {
        LOCAL_STATE.with(|state| {
            let mut producer = state.producer.borrow_mut();
            if producer.is_none() {
                let (prod, cons) = RingBuffer::new(self.config.buffer_capacity);
                *producer = Some(prod);
                self.consumers.lock().unwrap().push(cons);

                let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
                state.worker_id.set(id);
            }
        });
    }

Event pushing

The try_push_event method is the hot path. It:

  1. Checks if collection is active (early exit if not)

  2. Fetches + increments the total_events counter atomically

  3. Stops collection if max_events is reached

  4. Pushes the event to the thread-local ring buffer

    fn try_push_event(&self, event: Event) -> bool {
        if !self.is_active() {
            return false;
        }

        let count = self.total_events.fetch_add(1, Ordering::Relaxed);
        if count >= self.config.max_events {
            self.stop();
            return false;
        }

        LOCAL_STATE.with(|state| {
            if let Some(producer) = state.producer.borrow_mut().as_mut() {
                let _ = producer.push(event);
            }
        });

        true
    }
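The counter logic in steps 2–3 can be sketched in isolation; Cap below is a hypothetical stand-in for the collector’s active flag and total_events pair:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Stand-in for the collector's cap logic: fetch_add returns the prior
// count, so the push that observes count >= max is dropped and the
// active flag is switched off for everyone else.
struct Cap {
    active: AtomicBool,
    total: AtomicUsize,
    max: usize,
}

impl Cap {
    fn new(max: usize) -> Self {
        Self {
            active: AtomicBool::new(true),
            total: AtomicUsize::new(0),
            max,
        }
    }

    fn try_push(&self) -> bool {
        if !self.active.load(Ordering::Relaxed) {
            return false;
        }
        let count = self.total.fetch_add(1, Ordering::Relaxed);
        if count >= self.max {
            self.active.store(false, Ordering::Relaxed);
            return false;
        }
        true
    }
}

fn main() {
    let cap = Cap::new(3);
    // Only the first `max` pushes are accepted; the rest are dropped.
    let accepted = (0..10).filter(|_| cap.try_push()).count();
    assert_eq!(accepted, 3);
}
```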

Hook handlers

These methods are called by Tokio’s runtime hooks. Each hook handler:

  1. Checks if the relevant trace target is enabled
  2. Ensures the thread is registered (for spawn/start events)
  3. Constructs and pushes the appropriate event

Task hooks:

    #[cfg(tokio_unstable)]
    pub(crate) fn on_task_spawn(&self, meta: &tokio::runtime::TaskMeta<'_>) {
        if !self.config.trace_target.tracks_tasks() {
            return;
        }

        self.ensure_thread_registered();

        let location = if self.config.include_spawn_location {
            let loc = meta.spawned_at();
            Some(SpawnLocation {
                file: loc.file().to_string(),
                line: loc.line(),
                column: loc.column(),
            })
        } else {
            None
        };

        self.try_push_event(Event::TaskSpawn {
            task_id: task_id_to_u64(meta.id()),
            time_ns: self.elapsed_ns(),
            location,
        });
    }

    #[cfg(tokio_unstable)]
    pub(crate) fn on_before_task_poll(&self, meta: &tokio::runtime::TaskMeta<'_>) {
        if !self.config.trace_target.tracks_tasks() {
            return;
        }
        self.try_push_event(Event::TaskPollStart {
            task_id: task_id_to_u64(meta.id()),
            time_ns: self.elapsed_ns(),
        });
    }

    #[cfg(tokio_unstable)]
    pub(crate) fn on_after_task_poll(&self, meta: &tokio::runtime::TaskMeta<'_>) {
        if !self.config.trace_target.tracks_tasks() {
            return;
        }
        self.try_push_event(Event::TaskPollEnd {
            task_id: task_id_to_u64(meta.id()),
            time_ns: self.elapsed_ns(),
        });
    }

    #[cfg(tokio_unstable)]
    pub(crate) fn on_task_terminate(&self, meta: &tokio::runtime::TaskMeta<'_>) {
        if !self.config.trace_target.tracks_tasks() {
            return;
        }
        self.try_push_event(Event::TaskTerminate {
            task_id: task_id_to_u64(meta.id()),
            time_ns: self.elapsed_ns(),
        });
    }

Worker thread hooks:

    fn get_worker_id(&self) -> u64 {
        LOCAL_STATE.with(|state| state.worker_id.get())
    }

    pub(crate) fn on_thread_start(&self) {
        self.ensure_thread_registered();
        if !self.config.trace_target.tracks_workers() {
            return;
        }
        self.try_push_event(Event::WorkerStart {
            worker_id: self.get_worker_id(),
            time_ns: self.elapsed_ns(),
        });
    }

    pub(crate) fn on_thread_stop(&self) {
        if !self.config.trace_target.tracks_workers() {
            return;
        }
        self.try_push_event(Event::WorkerStop {
            worker_id: self.get_worker_id(),
            time_ns: self.elapsed_ns(),
        });
    }

    pub(crate) fn on_thread_park(&self) {
        if !self.config.trace_target.tracks_workers() {
            return;
        }
        self.try_push_event(Event::WorkerPark {
            worker_id: self.get_worker_id(),
            time_ns: self.elapsed_ns(),
        });
    }

    pub(crate) fn on_thread_unpark(&self) {
        if !self.config.trace_target.tracks_workers() {
            return;
        }
        self.try_push_event(Event::WorkerUnpark {
            worker_id: self.get_worker_id(),
            time_ns: self.elapsed_ns(),
        });
    }

Draining events

After collection stops, drain_events aggregates all events from every thread’s ring buffer and sorts them by timestamp. This is the cold path, so we don’t mind the cost of locking and sorting here.

    pub fn drain_events(&self) -> Vec<Event> {
        let mut all_events = Vec::new();
        let mut consumers = self.consumers.lock().unwrap();

        for consumer in consumers.iter_mut() {
            while let Ok(event) = consumer.pop() {
                all_events.push(event);
            }
        }

        all_events.sort_by_key(|e| e.time_ns());
        all_events
    }

Event to data transformation

The events_to_data method transforms raw events into Datum and TagDefinition instances. This is where we handle the state machine logic. For task events the mapping is:

  • TaskSpawn -> Spawned (with optional spawn-location tag)

  • TaskPollStart -> Running

  • TaskPollEnd -> Waiting (unless already terminated)

  • TaskTerminate -> Completed

For worker events the mapping is:

  • WorkerStart / WorkerUnpark -> Running

  • WorkerPark -> Parked

  • WorkerStop -> Stopped

    pub fn events_to_data(&self, events: &[Event]) -> StatemapData {
        let mut task_data = Vec::new();
        let mut worker_data = Vec::new();
        let mut task_tags = Vec::new();
        let mut seen_locations = HashSet::new();
        let mut terminated_tasks = HashSet::new();

        for event in events {
            match event {
                Event::TaskSpawn {
                    task_id,
                    time_ns,
                    location,
                } => {
                    let mut datum = Datum::new(
                        *time_ns,
                        format!("task-{}", task_id),
                        TaskState::Spawned.as_u32(),
                    );
                    if let Some(loc) = location {
                        let loc_str = loc.to_string();
                        datum = datum.with_tag(loc_str.clone());
                        if seen_locations.insert(loc_str.clone()) {
                            let tag_def =
                                TagDefinition::new(TaskState::Spawned.as_u32(), loc_str.clone())
                                    .with_property("file".to_string(), loc.file.clone())
                                    .with_property("location".to_string(), loc_str);
                            task_tags.push(tag_def);
                        }
                    }
                    task_data.push(datum);
                }
                Event::TaskPollStart { task_id, time_ns } => {
                    task_data.push(Datum::new(
                        *time_ns,
                        format!("task-{}", task_id),
                        TaskState::Running.as_u32(),
                    ));
                }
                // If polling the task returned Poll::Ready, the task-terminate event will
                // fire before task-poll-end for that poll. Otherwise, if task-terminate
                // does not fire, the task is still pending.
                // We track terminated tasks to avoid emitting a Waiting state after Completed.
                Event::TaskPollEnd { task_id, time_ns } => {
                    if !terminated_tasks.contains(task_id) {
                        task_data.push(Datum::new(
                            *time_ns,
                            format!("task-{}", task_id),
                            TaskState::Waiting.as_u32(),
                        ));
                    }
                }
                Event::TaskTerminate { task_id, time_ns } => {
                    task_data.push(Datum::new(
                        *time_ns,
                        format!("task-{}", task_id),
                        TaskState::Completed.as_u32(),
                    ));
                    terminated_tasks.insert(*task_id);
                }
                Event::WorkerStart { worker_id, time_ns }
                | Event::WorkerUnpark { worker_id, time_ns } => {
                    worker_data.push(Datum::new(
                        *time_ns,
                        format!("worker-{}", worker_id),
                        WorkerState::Running.as_u32(),
                    ));
                }
                Event::WorkerStop { worker_id, time_ns } => {
                    worker_data.push(Datum::new(
                        *time_ns,
                        format!("worker-{}", worker_id),
                        WorkerState::Stopped.as_u32(),
                    ));
                }
                Event::WorkerPark { worker_id, time_ns } => {
                    worker_data.push(Datum::new(
                        *time_ns,
                        format!("worker-{}", worker_id),
                        WorkerState::Parked.as_u32(),
                    ));
                }
            }
        }

        StatemapData {
            task_data,
            worker_data,
            task_tags,
        }
    }
}
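The terminated-task guard deserves a closer look: a TaskPollEnd that arrives after TaskTerminate (i.e., a poll that returned Poll::Ready) must not push a Waiting datum on top of Completed. A minimal sketch of that fold:

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum State {
    Running,
    Waiting,
    Completed,
}

// Fold (event-kind, task-id) pairs into states, skipping the Waiting
// datum for any task that has already terminated.
fn fold_states(events: &[(&str, u64)]) -> Vec<State> {
    let mut terminated: HashSet<u64> = HashSet::new();
    let mut states = Vec::new();
    for &(kind, task_id) in events {
        match kind {
            "poll_start" => states.push(State::Running),
            "terminate" => {
                states.push(State::Completed);
                terminated.insert(task_id);
            }
            "poll_end" if !terminated.contains(&task_id) => states.push(State::Waiting),
            _ => {}
        }
    }
    states
}

fn main() {
    // Final poll of task 1 returned Ready: terminate fires before poll_end,
    // so no Waiting state follows Completed.
    let states = fold_states(&[("poll_start", 1), ("terminate", 1), ("poll_end", 1)]);
    assert_eq!(states, vec![State::Running, State::Completed]);
}
```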

Writing output

The events_to_data method returns a StatemapData struct that holds the transformed data:

pub struct StatemapData {
    task_data: Vec<Datum>,
    worker_data: Vec<Datum>,
    task_tags: Vec<TagDefinition>,
}

impl StatemapData {

StatemapData provides methods to write the collected data as concatenated JSON. Each method writes metadata first, followed by tag definitions (for tasks), then the data payloads:

    pub fn write_tasks<W: std::io::Write>(
        &self,
        collector: &Collector,
        writer: &mut W,
    ) -> Result<(), Error> {
        let metadata = Metadata::for_tasks(collector.start_time_array());
        serde_json::to_writer(&mut *writer, &metadata)?;
        writeln!(writer)?;

        for tag in &self.task_tags {
            serde_json::to_writer(&mut *writer, tag)?;
            writeln!(writer)?;
        }

        for datum in &self.task_data {
            serde_json::to_writer(&mut *writer, datum)?;
            writeln!(writer)?;
        }

        writer.flush()?;
        Ok(())
    }

    pub fn write_workers<W: std::io::Write>(
        &self,
        collector: &Collector,
        writer: &mut W,
    ) -> Result<(), Error> {
        let metadata = Metadata::for_workers(collector.start_time_array());
        serde_json::to_writer(&mut *writer, &metadata)?;
        writeln!(writer)?;

        for datum in &self.worker_data {
            serde_json::to_writer(&mut *writer, datum)?;
            writeln!(writer)?;
        }

        writer.flush()?;
        Ok(())
    }

Convenience methods return the output as byte vectors:

    pub fn tasks_to_bytes(&self, collector: &Collector) -> Result<Vec<u8>, Error> {
        let mut buf = Vec::new();
        self.write_tasks(collector, &mut buf)?;
        Ok(buf)
    }

    pub fn workers_to_bytes(&self, collector: &Collector) -> Result<Vec<u8>, Error> {
        let mut buf = Vec::new();
        self.write_workers(collector, &mut buf)?;
        Ok(buf)
    }
}

Tying it together

The errors we are tracking are:

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("tokio-statemap requires `RUSTFLAGS=\"--cfg tokio_unstable\"`")]
    UnstableFeaturesRequired,
    /// `tokio-statemap` hooks were not registered as the layout of Tokio's
    /// [`tokio::task::Id`] type has changed. See [`check_casts`] for details.
    #[error(transparent)]
    InvalidCasts(#[from] InvalidCasts),
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),
}

Finally, the register_hooks function ties everything together. It verifies the task ID cast is safe, creates the collector, and wires up all the Tokio runtime hooks.

pub fn register_hooks(
    builder: &mut tokio::runtime::Builder,
    config: CollectorConfig,
) -> Result<Arc<Collector>, Error> {
    #[cfg(tokio_unstable)]
    {
        check_casts()?;
        let c = Arc::new(Collector::new(config));

        let task_hook = |f: fn(&Collector, &tokio::runtime::TaskMeta)| {
            let c = c.clone();
            move |meta: &tokio::runtime::TaskMeta| f(&c, meta)
        };

        let thread_hook = |f: fn(&Collector)| {
            let c = c.clone();
            move || f(&c)
        };

        builder.on_task_spawn(task_hook(Collector::on_task_spawn));
        builder.on_before_task_poll(task_hook(Collector::on_before_task_poll));
        builder.on_after_task_poll(task_hook(Collector::on_after_task_poll));
        builder.on_task_terminate(task_hook(Collector::on_task_terminate));
        builder.on_thread_start(thread_hook(Collector::on_thread_start));
        builder.on_thread_stop(thread_hook(Collector::on_thread_stop));
        builder.on_thread_park(thread_hook(Collector::on_thread_park));
        builder.on_thread_unpark(thread_hook(Collector::on_thread_unpark));

        Ok(c)
    }
    #[cfg(not(tokio_unstable))]
    {
        Err(Error::UnstableFeaturesRequired)
    }
}