| tokio-statemap | Visualize state transitions across Tokio tasks and worker threads |
|---|---|
| Author | abhirag |
| License | MIT |
| Version | 0.0.1 |
| Source | GitHub |
tokio-statemap
Library for instrumenting Rust programs to collect Tokio’s async execution traces as statemaps.
Overview
Having worked with Async Rust in anger and at scale, I hated performance regressions more than anything else. I know the rule is:
> Async code should never spend a long time without reaching an `.await`

but when things aren’t going that well in production land and you have more Tokio tasks than sense, figuring out where this invariant is being broken is nigh impossible. There were times when I just wanted to pry open the Tokio runtime to see:
- Which tasks were blocked, stuck waiting instead of making progress
- Whether the worker threads were busy or mostly parked while tasks sat blocked
- Any rhyme or reason to this madness, patterns that were impossible to see in logs or traces
I’ve looked up to Bryan Cantrill and Eliza Weisman’s work for years, so when I came across how Oxide was using statemaps to visualize Tokio runtime behavior, it felt like the missing piece. But Oxide uses DTrace to collect the required events, which is not a viable option in most containerized environments I’ve worked in. I needed something that worked in scratch containers with zero dependencies, and this library is the result of that.
What follows is the complete library as a literate program:
Statemap
A software visualization in which time is on the X axis and timelines for discrete entities are stacked on the Y axis, with different states for the discrete entities rendered in different colors.
Generating a statemap consists of two steps: instrumentation and rendering. The result is an SVG that can be visualized with an SVG viewer (e.g., a web browser), allowing interaction.
Data format
To generate a statemap, instrumentation should create a file consisting of a stream of concatenated JSON. The expectation is that one JSON payload contains metadata, while many JSON payloads contain data.
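To make the shape of the stream concrete, here is a minimal std-only sketch (hand-rolled string formatting; the library itself serializes with serde) that emits one metadata payload followed by two data payloads, one JSON object per line. The values are illustrative:

```rust
// Sketch of the concatenated-JSON stream: one metadata payload first,
// then one JSON payload per datum.
fn concatenated_stream() -> String {
    let metadata = r#"{"start":[1765405854,712163000],"states":{"running":{"value":1}}}"#;
    let data = [("0", "task-1", 1u32), ("224000", "task-1", 2)];
    let mut out = String::from(metadata);
    out.push('\n');
    for (time, entity, state) in data {
        out.push_str(&format!(
            r#"{{"time":"{time}","entity":"{entity}","state":{state}}}"#
        ));
        out.push('\n');
    }
    out
}

fn main() {
    print!("{}", concatenated_stream());
}
```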
Metadata
The following metadata fields are required:
- `start`: A two-element array of integers consisting of the start time of the data in seconds (the 0th element) and nanoseconds within the second (the 1st element). The start time should be expressed in UTC
- `states`: An object in which each member is the name of a valid entity state. Each member object can contain the following:
  - `value`: The value by which this state will be referred to in the data stream
  - `color`: The color that should be used to render the state. If the color is not specified, a color will be selected at random during rendering

```rust
#[derive(Serialize, Debug)]
struct State {
    value: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    color: Option<String>,
}
```

For example, here is a valid states object:

```json
"states": {
    "running": {"value": 1, "color": "#DC267F"},
    "waiting": {"value": 2, "color": "#FE6100"},
    "completed": {"value": 3, "color": "#648FFF"},
    "spawned": {"value": 0, "color": "#FFB000"}
}
```
In addition, the metadata can contain the following optional fields:
- `title`: The title of the statemap, such that it can meaningfully be in the clause “statemap of *title* activity”
- `host`: The host on which the data was gathered, populated via:

```rust
fn get_hostname() -> Option<String> {
    hostname::get().ok().and_then(|h| h.into_string().ok())
}
```

- `entityKind`: The entity under observation
The final struct thus becomes:
```rust
#[derive(Serialize, Debug)]
#[allow(non_snake_case)]
pub(crate) struct Metadata {
    start: [u64; 2],
    #[serde(skip_serializing_if = "Option::is_none")]
    title: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    host: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    entityKind: Option<String>,
    states: HashMap<String, State>,
}
```
For example, valid metadata JSON looks like:
```json
{
    "start": [1765405854, 712163000],
    "title": "Tokio Task Execution",
    "host": "abhirags-MacBook-Air.local",
    "entityKind": "Task",
    "states": {
        "spawned": {"value": 0, "color": "#FFB000"},
        "waiting": {"value": 2, "color": "#FE6100"},
        "completed": {"value": 3, "color": "#648FFF"},
        "running": {"value": 1, "color": "#DC267F"}
    }
}
```
Given that the Tokio task states we are tracking are:
```rust
#[derive(Debug)]
#[repr(u32)]
pub(crate) enum TaskState {
    Spawned = 0,
    Running = 1,
    Waiting = 2,
    Completed = 3,
}

impl TaskState {
    pub(crate) fn as_u32(self) -> u32 {
        self as u32
    }
}
```
Let’s create an initializer for the metadata struct for Tokio tasks:
```rust
impl Metadata {
    pub(crate) fn for_tasks(start: [u64; 2]) -> Self {
        let mut states = HashMap::new();
        states.insert(
            "spawned".to_string(),
            State {
                value: TaskState::Spawned as usize,
                color: Some("#FFB000".to_string()),
            },
        );
        states.insert(
            "running".to_string(),
            State {
                value: TaskState::Running as usize,
                color: Some("#DC267F".to_string()),
            },
        );
        states.insert(
            "waiting".to_string(),
            State {
                value: TaskState::Waiting as usize,
                color: Some("#FE6100".to_string()),
            },
        );
        states.insert(
            "completed".to_string(),
            State {
                value: TaskState::Completed as usize,
                color: Some("#648FFF".to_string()),
            },
        );
        Self {
            start,
            title: Some("Tokio Task Execution".to_string()),
            host: get_hostname(),
            entityKind: Some("Task".to_string()),
            states,
        }
    }
}
```
And for Tokio worker threads, given that the states we are tracking are:
```rust
#[derive(Debug)]
#[repr(u32)]
pub(crate) enum WorkerState {
    Running = 0,
    Parked = 1,
    Stopped = 2,
}

impl WorkerState {
    pub(crate) fn as_u32(self) -> u32 {
        self as u32
    }
}
```
the initializer for the metadata struct becomes:
```rust
impl Metadata {
    pub(crate) fn for_workers(start: [u64; 2]) -> Self {
        let mut states = HashMap::new();
        states.insert(
            "running".to_string(),
            State {
                value: WorkerState::Running as usize,
                color: Some("#DC267F".to_string()),
            },
        );
        states.insert(
            "parked".to_string(),
            State {
                value: WorkerState::Parked as usize,
                color: Some("#FE6100".to_string()),
            },
        );
        states.insert(
            "stopped".to_string(),
            State {
                value: WorkerState::Stopped as usize,
                color: Some("#648FFF".to_string()),
            },
        );
        Self {
            start,
            title: Some("Tokio Worker Threads".to_string()),
            host: get_hostname(),
            entityKind: Some("Worker".to_string()),
            states,
        }
    }
}
```
Data
The data for a statemap is provided following the metadata as concatenated JSON (that is, each JSON payload is a datum). Each datum is a JSON object that must contain the following members:
- `entity`: The name of the entity
- `time`: The time of the datum, expressed as a nanosecond offset from the `start` member present in the metadata
- `state`: The value of the state that begins at the time of the datum

Each datum may also contain an additional member:

- `tag`: The tag for the state. See State tagging, below
```rust
#[derive(Serialize, Debug)]
pub struct Datum {
    time: String,
    entity: String,
    state: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    tag: Option<String>,
}

impl Datum {
    pub(crate) fn new(time_ns: u64, entity: String, state: u32) -> Self {
        Self {
            time: time_ns.to_string(),
            entity,
            state,
            tag: None,
        }
    }

    pub(crate) fn with_tag(mut self, tag: String) -> Self {
        self.tag = Some(tag);
        self
    }
}
```
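To make the wire format concrete, here is a std-only, illustrative reconstruction of the JSON a `Datum` serializes to (the library itself uses serde; field order follows the struct declaration: time, entity, state, then the optional tag):

```rust
// Illustrative hand-rolled formatter mirroring Datum's serde output.
fn datum_json(time_ns: u64, entity: &str, state: u32, tag: Option<&str>) -> String {
    match tag {
        Some(t) => format!(
            r#"{{"time":"{time_ns}","entity":"{entity}","state":{state},"tag":"{t}"}}"#
        ),
        None => format!(r#"{{"time":"{time_ns}","entity":"{entity}","state":{state}}}"#),
    }
}

fn main() {
    // A tagged task datum and an untagged worker datum.
    assert_eq!(
        datum_json(224000, "task-5", 0, Some("examples/basic.rs:31:20")),
        r#"{"time":"224000","entity":"task-5","state":0,"tag":"examples/basic.rs:31:20"}"#
    );
    assert_eq!(
        datum_json(500, "worker-1", 1, None),
        r#"{"time":"500","entity":"worker-1","state":1}"#
    );
}
```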
State tagging
It is often helpful to examine additional dimensionality within a particular state or states. For example, in understanding lifecycles of numerous tokio tasks, it is helpful to track the location in code they were spawned at. To facilitate this, statemaps support state tagging whereby an immutable tag is associated with a particular transition to a particular state. There can be an arbitrary number of such tags, but the expectation is that there are many more state transitions than there are tags. Tags are indicated by the tag member of the state datum payload. Elsewhere in the stream of data (though not necessarily before the tag is used), the tag should be defined with a tag-defining JSON payload that contains the following two members:
- `tag`: A string that is the tag that is being defined
- `state`: The state that corresponds to this tag. Each state/tag tuple must have its own tag definition
Beyond these two members, the tag definition can have any number of scalar members. Tags are immutable; if a tag is redefined, the last tag definition will apply to all uses of that tag.
The tag should not contain member definitions that would cause it to be ambiguous with respect to data (namely, entity and time members).
```rust
#[derive(Serialize, Debug)]
pub struct TagDefinition {
    state: u32,
    tag: String,
    #[serde(flatten)]
    properties: HashMap<String, serde_json::Value>,
}

impl TagDefinition {
    pub(crate) fn new(state: u32, tag: String) -> Self {
        Self {
            state,
            tag,
            properties: HashMap::new(),
        }
    }

    pub(crate) fn with_property(
        mut self,
        key: String,
        value: impl Into<serde_json::Value>,
    ) -> Self {
        self.properties.insert(key, value.into());
        self
    }
}
```
As an example, here is a tag definition for the Tokio task spawned state that tracks the location in code the task was spawned at:

```json
{"state": 0, "tag": "examples/basic.rs:31:20", "location": "examples/basic.rs:31:20", "file": "examples/basic.rs"}
```
And here is an example of a tagged state datum:
```json
{"time": "224000", "entity": "task-5", "state": 0, "tag": "examples/basic.rs:31:20"}
```
This would indicate that at time 224000, entity task-5 went into state 0 (spawned) – and the tag for this state (in this case, the spawn location) was examples/basic.rs:31:20.
Collector
Architecture
The core event collection logic is:
- We tap into Tokio’s runtime hooks to collect lifecycle events
- Each thread writes its events to a single-producer single-consumer (SPSC) ring buffer it owns. Reading from and writing to this ring buffer is lock-free and wait-free
- Processing of events is done after collection stops. This separates the hot path (collection) from the cold path (processing), and includes:
  - Draining all buffers
  - Sorting events by timestamp
  - Transforming events into statemap format
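The hot-path/cold-path split can be sketched with std-only primitives. In this sketch an `mpsc` channel per thread stands in for the `rtrb` SPSC ring buffer (mpsc is neither lock-free nor wait-free, but the ownership shape is the same: each producing thread owns its own queue, and all consumer ends are drained after collection stops):

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
struct Event {
    time_ns: u64,
}

// Hot path: each thread pushes only to the queue it owns.
// Cold path: after all threads finish, drain every consumer end and
// sort the merged events by timestamp.
fn collect_and_drain(threads: u64, events_per_thread: u64) -> Vec<Event> {
    let mut consumers = Vec::new();
    let mut handles = Vec::new();
    for id in 0..threads {
        let (tx, rx) = mpsc::channel::<Event>();
        consumers.push(rx);
        handles.push(thread::spawn(move || {
            for i in 0..events_per_thread {
                let _ = tx.send(Event { time_ns: id * events_per_thread + i });
            }
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    let mut all: Vec<Event> = consumers.iter().flat_map(|rx| rx.try_iter()).collect();
    all.sort_by_key(|e| e.time_ns);
    all
}

fn main() {
    let all = collect_and_drain(4, 10);
    assert_eq!(all.len(), 40);
    assert!(all.windows(2).all(|w| w[0].time_ns <= w[1].time_ns));
}
```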
Thread local state
Let’s get into the weeds then. First, we need to identify worker threads by a `u64` ID, but the `as_u64` method on `ThreadId` is yet to be stabilized, so we’ll assign these IDs ourselves. We keep track of the next ID to be assigned in:
```rust
static WORKER_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
```
We’ll keep some thread-local state for every worker thread:

- The producer side of an `rtrb::RingBuffer` to push events
- The thread’s `worker_id`
```rust
struct ThreadLocalState {
    producer: RefCell<Option<Producer<Event>>>,
    worker_id: Cell<u64>,
}

impl ThreadLocalState {
    const fn new() -> Self {
        Self {
            producer: RefCell::new(None),
            worker_id: Cell::new(0),
        }
    }
}

thread_local! {
    static LOCAL_STATE: ThreadLocalState = ThreadLocalState::new();
}
```
Configuration
Now let’s get the configuration out of the way. The configurable collector parameters are:
- `trace_target`: Whether we want to track lifecycles of Tokio tasks, worker threads, both, or none
```rust
#[derive(Debug, Default)]
pub enum TraceTarget {
    Tasks,
    Workers,
    #[default]
    Both,
    None,
}

impl TraceTarget {
    pub fn tracks_tasks(&self) -> bool {
        matches!(self, TraceTarget::Tasks | TraceTarget::Both)
    }

    pub fn tracks_workers(&self) -> bool {
        matches!(self, TraceTarget::Workers | TraceTarget::Both)
    }
}
```
- `buffer_capacity`: Max capacity of each ring buffer (per thread)
- `max_events`: Max total number of events to collect (globally)
- `include_spawn_location`: Whether to capture the location in code where each task was spawned
```rust
#[derive(Debug, Clone)]
pub struct SpawnLocation {
    file: String,
    line: u32,
    column: u32,
}

impl std::fmt::Display for SpawnLocation {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}:{}:{}", self.file, self.line, self.column)
    }
}
```
CollectorConfig thus becomes:
```rust
#[derive(Debug)]
pub struct CollectorConfig {
    pub buffer_capacity: usize,
    pub max_events: usize,
    pub trace_target: TraceTarget,
    pub include_spawn_location: bool,
}

impl Default for CollectorConfig {
    fn default() -> Self {
        Self {
            buffer_capacity: 4096,
            max_events: 100000,
            trace_target: TraceTarget::Both,
            include_spawn_location: true,
        }
    }
}
```
Events
We tap into Tokio’s runtime hooks to capture the following events:
- For Tokio tasks:
  - `TaskSpawn` using `on_task_spawn`
  - `TaskPollStart` using `on_before_task_poll`
  - `TaskPollEnd` using `on_after_task_poll`
  - `TaskTerminate` using `on_task_terminate`
- For worker threads:
  - `WorkerStart` using `on_thread_start`
  - `WorkerPark` using `on_thread_park`
  - `WorkerUnpark` using `on_thread_unpark`
  - `WorkerStop` using `on_thread_stop`
Every event includes:
- `time_ns`: Nanoseconds elapsed since collection started until the event occurred
- A `u64` id:
  - `worker_id` (introduced in Thread local state) for events corresponding to worker threads
  - `task_id` for events corresponding to Tokio tasks

Additionally, the `TaskSpawn` event may also contain the task’s `SpawnLocation`.
```rust
#[derive(Debug)]
pub enum Event {
    TaskSpawn {
        task_id: u64,
        time_ns: u64,
        location: Option<SpawnLocation>,
    },
    TaskPollStart {
        task_id: u64,
        time_ns: u64,
    },
    TaskPollEnd {
        task_id: u64,
        time_ns: u64,
    },
    TaskTerminate {
        task_id: u64,
        time_ns: u64,
    },
    WorkerStart {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerStop {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerPark {
        worker_id: u64,
        time_ns: u64,
    },
    WorkerUnpark {
        worker_id: u64,
        time_ns: u64,
    },
}

impl Event {
    pub fn time_ns(&self) -> u64 {
        match self {
            Event::TaskSpawn { time_ns, .. }
            | Event::TaskPollStart { time_ns, .. }
            | Event::TaskPollEnd { time_ns, .. }
            | Event::TaskTerminate { time_ns, .. }
            | Event::WorkerStart { time_ns, .. }
            | Event::WorkerStop { time_ns, .. }
            | Event::WorkerPark { time_ns, .. }
            | Event::WorkerUnpark { time_ns, .. } => *time_ns,
        }
    }
}
```
Tokio task id
When we tap into Tokio’s runtime hooks to capture events related to tasks, we are passed `TaskMeta`, which contains a `tokio::task::Id`. Unfortunately, converting a `tokio::task::Id` to `u64` is still an open issue, so we use tokio-dtrace’s approach.
```rust
#[derive(Debug, thiserror::Error)]
#[error(
    "\
    tokio-statemap: POTENTIALLY UNSOUND CAST DETECTED!\n \
    size_of::<tokio::task::Id>() = {id_size}\n \
    size_of::<NonZeroU64>() = {nonzero_u64_size}\n \
    align_of::<tokio::task::Id>() = {id_align}\n \
    align_of::<NonZeroU64>() = {nonzero_u64_align}\n\
    "
)]
pub struct InvalidCasts {
    id_size: usize,
    nonzero_u64_size: usize,
    id_align: usize,
    nonzero_u64_align: usize,
}

/// Checks that unsafe casts performed by `tokio-statemap` are valid.
///
/// `tokio-statemap` relies on the ability to cast a [`tokio::task::Id`] to a
/// [`u64`] in order to pass a task ID as an integer.
/// This function checks that the sizes and alignments of the types are compatible.
/// If they are not, it returns an error, indicating that the casts are potentially
/// unsound. This may occur if Tokio has changed the representation of the
/// [`tokio::task::Id`] type, which is unlikely, but always possible.
///
/// If this function returns an error, `tokio-statemap`'s runtime hooks should not
/// be used. Registering hooks using the [`register_hooks`] function will call
/// this function prior to registering the runtime hooks, and will fail to do so
/// if casts are unsound.
///
/// reference: <https://github.com/oxidecomputer/tokio-dtrace/blob/91f72de4510298aec4a3772d4eaef6426ec2f85e/src/lib.rs#L266>
pub fn check_casts() -> Result<(), InvalidCasts> {
    use std::mem::{align_of, size_of};
    let id_size = size_of::<tokio::task::Id>();
    let nonzero_u64_size = size_of::<NonZeroU64>();
    let id_align = align_of::<tokio::task::Id>();
    let nonzero_u64_align = align_of::<NonZeroU64>();
    if id_size != nonzero_u64_size || id_align != nonzero_u64_align {
        Err(InvalidCasts {
            id_size,
            nonzero_u64_size,
            id_align,
            nonzero_u64_align,
        })
    } else {
        Ok(())
    }
}

// reference: https://github.com/oxidecomputer/tokio-dtrace/blob/91f72de4510298aec4a3772d4eaef6426ec2f85e/src/lib.rs#L461
#[cfg(tokio_unstable)]
fn task_id_to_u64(id: tokio::task::Id) -> u64 {
    unsafe {
        // SAFETY: `check_casts()` has verified that `tokio::task::Id` has the same
        // size and alignment as `NonZeroU64`. This is called before hooks are registered.
        union TrustMeOnThis {
            id: tokio::task::Id,
            int: NonZeroU64,
        }
        TrustMeOnThis { id }.int.get()
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn casts_are_valid() {
        crate::check_casts().unwrap();
    }
}
```
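As an aside, the check-then-cast pattern can be exercised end to end on types whose layout is fully known. This std-only sketch applies the same union trick to `NonZeroU64` itself; here the layout check trivially passes, whereas for `tokio::task::Id` it is a genuine runtime guard:

```rust
use std::mem::{align_of, size_of};
use std::num::NonZeroU64;

// Verify two types share size and alignment, then reinterpret one as
// the other through a union -- the same pattern as task_id_to_u64.
fn nonzero_to_u64(n: NonZeroU64) -> u64 {
    assert_eq!(size_of::<NonZeroU64>(), size_of::<u64>());
    assert_eq!(align_of::<NonZeroU64>(), align_of::<u64>());
    union Cast {
        nz: NonZeroU64,
        int: u64,
    }
    // SAFETY: the asserts above established identical size and alignment,
    // and every valid NonZeroU64 bit pattern is a valid u64.
    unsafe { Cast { nz: n }.int }
}

fn main() {
    let id = NonZeroU64::new(42).unwrap();
    assert_eq!(nonzero_to_u64(id), 42);
}
```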
Start time
Captured once when collection starts and used throughout the collection lifetime:
- `system_time`: Captures the wall-clock time (UTC) when collection started; used to populate the statemap metadata’s `start` field
- `instant`: Used for measuring elapsed time during collection. Unlike `SystemTime`, it is monotonically non-decreasing, which makes it suitable for computing precise durations between events
We use `fastant::Instant`, a drop-in replacement for `std::time::Instant` with lower overhead on x86/x86_64 Linux due to its use of the TSC (Time Stamp Counter).
```rust
struct StartTime {
    system_time: SystemTime,
    instant: fastant::Instant,
}
```
Collector definition
Event collection handle shared across all worker threads via Arc<Collector>:
- `config`: Collection parameters. See Configuration
- `active`: Whether collection is running, checked on every event push
- `total_events`: Global event counter. When it hits `max_events`, collection stops automatically
- `start_time`: Captured once when collection starts. See Start time
- `consumers`: Aggregates the consumer end of each thread’s ring buffer. Threads register dynamically during collection: when a thread first pushes an event, it creates its ring buffer and adds the consumer here. Multiple threads can register concurrently, so the `Vec` needs synchronized access for pushing, hence the `Mutex`
```rust
pub struct Collector {
    config: CollectorConfig,
    active: AtomicBool,
    total_events: AtomicUsize,
    start_time: OnceLock<StartTime>,
    consumers: Mutex<Vec<Consumer<Event>>>,
}
```
Collector implementation
Now let’s look at Collector’s methods in the order they’re used during the collection lifecycle:
```rust
impl Collector {
```
Construction
We construct the collector from the provided config: collection starts out inactive, `total_events` is zero, `start_time` sits behind a `OnceLock` so that it can be initialized lazily when collection starts, and an empty vector holds the consumer end of each thread’s ring buffer:
```rust
pub(crate) fn new(config: CollectorConfig) -> Self {
    Self {
        config,
        active: AtomicBool::new(false),
        total_events: AtomicUsize::new(0),
        start_time: OnceLock::new(),
        consumers: Mutex::new(Vec::new()),
    }
}
```
Starting and stopping collection
Collection is controlled through start and stop methods. Starting collection:
- Initializes `start_time` if not already set (via `OnceLock`)
- Resets the `total_events` counter
- Sets the `active` flag to `true`
```rust
pub fn start(&self) {
    self.start_time.get_or_init(|| StartTime {
        system_time: SystemTime::now(),
        instant: fastant::Instant::now(),
    });
    self.total_events.store(0, Ordering::Relaxed);
    self.active.store(true, Ordering::Relaxed);
}
```
Stopping collection just sets the active flag to false:
```rust
pub fn stop(&self) {
    self.active.store(false, Ordering::Relaxed);
}
```
Checked before pushing every event:
```rust
fn is_active(&self) -> bool {
    self.active.load(Ordering::Relaxed)
}
```
Time helpers
Two methods handle time:
- `elapsed_ns` computes nanoseconds since collection started (used for event timestamps)
- `start_time_array` converts the wall-clock start time to the `[seconds, nanoseconds]` format required by statemap metadata
```rust
fn elapsed_ns(&self) -> u64 {
    self.start_time
        .get()
        .map(|t| t.instant.elapsed().as_nanos() as u64)
        .unwrap_or(0)
}

pub(crate) fn start_time_array(&self) -> [u64; 2] {
    self.start_time
        .get()
        .map(|t| {
            let duration = t
                .system_time
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default();
            [duration.as_secs(), duration.subsec_nanos() as u64]
        })
        .unwrap_or([0, 0])
}
```
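As a std-only illustration of both helpers (with `std::time::Instant` standing in for `fastant::Instant`), round-tripping the timestamp from the metadata example above:

```rust
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

// Split a wall-clock start into the statemap [seconds, nanoseconds]
// pair, mirroring start_time_array.
fn to_start_array(t: SystemTime) -> [u64; 2] {
    let d = t.duration_since(UNIX_EPOCH).unwrap_or_default();
    [d.as_secs(), d.subsec_nanos() as u64]
}

fn main() {
    // Round-trip the timestamp from the metadata example.
    let start = UNIX_EPOCH + Duration::new(1765405854, 712163000);
    assert_eq!(to_start_array(start), [1765405854, 712163000]);

    // Event timestamps come from a monotonic clock, mirroring elapsed_ns.
    let t0 = Instant::now();
    let _elapsed_ns = t0.elapsed().as_nanos() as u64;
}
```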
Thread registration
When a thread first attempts to record an event, it must register itself. This creates the thread’s ring buffer, adds the consumer end to the shared consumers vector, and assigns a unique worker ID.
```rust
fn ensure_thread_registered(&self) {
    LOCAL_STATE.with(|state| {
        let mut producer = state.producer.borrow_mut();
        if producer.is_none() {
            let (prod, cons) = RingBuffer::new(self.config.buffer_capacity);
            *producer = Some(prod);
            self.consumers.lock().unwrap().push(cons);
            let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
            state.worker_id.set(id);
        }
    });
}
```
Event pushing
The `try_push_event` method is the hot path. It:

- Checks if collection is active (early exit if not)
- Fetches and increments the `total_events` counter atomically
- Stops collection if `max_events` is reached
- Pushes the event to the thread-local ring buffer
```rust
fn try_push_event(&self, event: Event) -> bool {
    if !self.is_active() {
        return false;
    }
    let count = self.total_events.fetch_add(1, Ordering::Relaxed);
    if count >= self.config.max_events {
        self.stop();
        return false;
    }
    LOCAL_STATE.with(|state| {
        if let Some(producer) = state.producer.borrow_mut().as_mut() {
            let _ = producer.push(event);
        }
    });
    true
}
```
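The `max_events` cutoff can be isolated in a std-only sketch (the `Gate` type and its names are mine, not the library’s): the counter is fetched and incremented atomically, and the first push to observe a count at or beyond the cap flips the active flag off.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Minimal stand-in for the active/total_events/max_events trio.
struct Gate {
    active: AtomicBool,
    total: AtomicUsize,
    max: usize,
}

impl Gate {
    fn try_push(&self) -> bool {
        if !self.active.load(Ordering::Relaxed) {
            return false;
        }
        let count = self.total.fetch_add(1, Ordering::Relaxed);
        if count >= self.max {
            // First push past the cap turns collection off.
            self.active.store(false, Ordering::Relaxed);
            return false;
        }
        true
    }
}

fn main() {
    let gate = Gate {
        active: AtomicBool::new(true),
        total: AtomicUsize::new(0),
        max: 3,
    };
    // Counts 0, 1, 2 are accepted; the push observing count == 3 stops
    // collection, and everything after is rejected cheaply.
    let accepted = (0..10).filter(|_| gate.try_push()).count();
    assert_eq!(accepted, 3);
}
```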
Hook handlers
These methods are called by Tokio’s runtime hooks. Each hook handler:
- Checks if the relevant trace target is enabled
- Ensures the thread is registered (for spawn/start events)
- Constructs and pushes the appropriate event
Task hooks:
```rust
#[cfg(tokio_unstable)]
pub(crate) fn on_task_spawn(&self, meta: &tokio::runtime::TaskMeta<'_>) {
    if !self.config.trace_target.tracks_tasks() {
        return;
    }
    self.ensure_thread_registered();
    let location = if self.config.include_spawn_location {
        let loc = meta.spawned_at();
        Some(SpawnLocation {
            file: loc.file().to_string(),
            line: loc.line(),
            column: loc.column(),
        })
    } else {
        None
    };
    self.try_push_event(Event::TaskSpawn {
        task_id: task_id_to_u64(meta.id()),
        time_ns: self.elapsed_ns(),
        location,
    });
}

#[cfg(tokio_unstable)]
pub(crate) fn on_before_task_poll(&self, meta: &tokio::runtime::TaskMeta<'_>) {
    if !self.config.trace_target.tracks_tasks() {
        return;
    }
    self.try_push_event(Event::TaskPollStart {
        task_id: task_id_to_u64(meta.id()),
        time_ns: self.elapsed_ns(),
    });
}

#[cfg(tokio_unstable)]
pub(crate) fn on_after_task_poll(&self, meta: &tokio::runtime::TaskMeta<'_>) {
    if !self.config.trace_target.tracks_tasks() {
        return;
    }
    self.try_push_event(Event::TaskPollEnd {
        task_id: task_id_to_u64(meta.id()),
        time_ns: self.elapsed_ns(),
    });
}

#[cfg(tokio_unstable)]
pub(crate) fn on_task_terminate(&self, meta: &tokio::runtime::TaskMeta<'_>) {
    if !self.config.trace_target.tracks_tasks() {
        return;
    }
    self.try_push_event(Event::TaskTerminate {
        task_id: task_id_to_u64(meta.id()),
        time_ns: self.elapsed_ns(),
    });
}
```
Worker thread hooks:
```rust
fn get_worker_id(&self) -> u64 {
    LOCAL_STATE.with(|state| state.worker_id.get())
}

pub(crate) fn on_thread_start(&self) {
    self.ensure_thread_registered();
    if !self.config.trace_target.tracks_workers() {
        return;
    }
    self.try_push_event(Event::WorkerStart {
        worker_id: self.get_worker_id(),
        time_ns: self.elapsed_ns(),
    });
}

pub(crate) fn on_thread_stop(&self) {
    if !self.config.trace_target.tracks_workers() {
        return;
    }
    self.try_push_event(Event::WorkerStop {
        worker_id: self.get_worker_id(),
        time_ns: self.elapsed_ns(),
    });
}

pub(crate) fn on_thread_park(&self) {
    if !self.config.trace_target.tracks_workers() {
        return;
    }
    self.try_push_event(Event::WorkerPark {
        worker_id: self.get_worker_id(),
        time_ns: self.elapsed_ns(),
    });
}

pub(crate) fn on_thread_unpark(&self) {
    if !self.config.trace_target.tracks_workers() {
        return;
    }
    self.try_push_event(Event::WorkerUnpark {
        worker_id: self.get_worker_id(),
        time_ns: self.elapsed_ns(),
    });
}
```
Draining events
After collection stops, `drain_events` aggregates all events from every thread’s ring buffer and sorts them by timestamp. This is the cold path; we don’t mind the cost of locking and sorting here.
```rust
pub fn drain_events(&self) -> Vec<Event> {
    let mut all_events = Vec::new();
    let mut consumers = self.consumers.lock().unwrap();
    for consumer in consumers.iter_mut() {
        while let Ok(event) = consumer.pop() {
            all_events.push(event);
        }
    }
    all_events.sort_by_key(|e| e.time_ns());
    all_events
}
```
Event to data transformation
The events_to_data method transforms raw events into Datum and TagDefinition instances. This is where we handle the state machine logic. For task events the mapping is:
- `TaskSpawn` -> `Spawned` (with optional spawn-location tag)
- `TaskPollStart` -> `Running`
- `TaskPollEnd` -> `Waiting` (unless already terminated)
- `TaskTerminate` -> `Completed`

For worker events the mapping is:

- `WorkerStart` / `WorkerUnpark` -> `Running`
- `WorkerPark` -> `Parked`
- `WorkerStop` -> `Stopped`
```rust
pub fn events_to_data(&self, events: &[Event]) -> StatemapData {
    let mut task_data = Vec::new();
    let mut worker_data = Vec::new();
    let mut task_tags = Vec::new();
    let mut seen_locations = HashSet::new();
    let mut terminated_tasks = HashSet::new();
    for event in events {
        match event {
            Event::TaskSpawn {
                task_id,
                time_ns,
                location,
            } => {
                let mut datum = Datum::new(
                    *time_ns,
                    format!("task-{}", task_id),
                    TaskState::Spawned.as_u32(),
                );
                if let Some(loc) = location {
                    let loc_str = loc.to_string();
                    datum = datum.with_tag(loc_str.clone());
                    if seen_locations.insert(loc_str.clone()) {
                        let tag_def =
                            TagDefinition::new(TaskState::Spawned.as_u32(), loc_str.clone())
                                .with_property("file".to_string(), loc.file.clone())
                                .with_property("location".to_string(), loc_str);
                        task_tags.push(tag_def);
                    }
                }
                task_data.push(datum);
            }
            Event::TaskPollStart { task_id, time_ns } => {
                task_data.push(Datum::new(
                    *time_ns,
                    format!("task-{}", task_id),
                    TaskState::Running.as_u32(),
                ));
            }
            // If polling the task returned Poll::Ready, the task-terminate event will
            // fire before task-poll-end for that poll. Otherwise, if task-terminate
            // does not fire, the task is still pending.
            // We track terminated tasks to avoid emitting a Waiting state after Completed.
            Event::TaskPollEnd { task_id, time_ns } => {
                if !terminated_tasks.contains(task_id) {
                    task_data.push(Datum::new(
                        *time_ns,
                        format!("task-{}", task_id),
                        TaskState::Waiting.as_u32(),
                    ));
                }
            }
            Event::TaskTerminate { task_id, time_ns } => {
                task_data.push(Datum::new(
                    *time_ns,
                    format!("task-{}", task_id),
                    TaskState::Completed.as_u32(),
                ));
                terminated_tasks.insert(*task_id);
            }
            Event::WorkerStart { worker_id, time_ns }
            | Event::WorkerUnpark { worker_id, time_ns } => {
                worker_data.push(Datum::new(
                    *time_ns,
                    format!("worker-{}", worker_id),
                    WorkerState::Running.as_u32(),
                ));
            }
            Event::WorkerStop { worker_id, time_ns } => {
                worker_data.push(Datum::new(
                    *time_ns,
                    format!("worker-{}", worker_id),
                    WorkerState::Stopped.as_u32(),
                ));
            }
            Event::WorkerPark { worker_id, time_ns } => {
                worker_data.push(Datum::new(
                    *time_ns,
                    format!("worker-{}", worker_id),
                    WorkerState::Parked.as_u32(),
                ));
            }
        }
    }
    StatemapData {
        task_data,
        worker_data,
        task_tags,
    }
}
}
```
Writing output
The events_to_data method returns a StatemapData struct that holds the transformed data:
```rust
pub struct StatemapData {
    task_data: Vec<Datum>,
    worker_data: Vec<Datum>,
    task_tags: Vec<TagDefinition>,
}
```
```rust
impl StatemapData {
```
StatemapData provides methods to write the collected data as concatenated JSON. Each method writes metadata first, followed by tag definitions (for tasks),
then the data payloads:
```rust
pub fn write_tasks<W: std::io::Write>(
    &self,
    collector: &Collector,
    writer: &mut W,
) -> Result<(), Error> {
    let metadata = Metadata::for_tasks(collector.start_time_array());
    serde_json::to_writer(&mut *writer, &metadata)?;
    writeln!(writer)?;
    for tag in &self.task_tags {
        serde_json::to_writer(&mut *writer, tag)?;
        writeln!(writer)?;
    }
    for datum in &self.task_data {
        serde_json::to_writer(&mut *writer, datum)?;
        writeln!(writer)?;
    }
    writer.flush()?;
    Ok(())
}

pub fn write_workers<W: std::io::Write>(
    &self,
    collector: &Collector,
    writer: &mut W,
) -> Result<(), Error> {
    let metadata = Metadata::for_workers(collector.start_time_array());
    serde_json::to_writer(&mut *writer, &metadata)?;
    writeln!(writer)?;
    for datum in &self.worker_data {
        serde_json::to_writer(&mut *writer, datum)?;
        writeln!(writer)?;
    }
    writer.flush()?;
    Ok(())
}
```
Convenience methods return the output as byte vectors:
```rust
pub fn tasks_to_bytes(&self, collector: &Collector) -> Result<Vec<u8>, Error> {
    let mut buf = Vec::new();
    self.write_tasks(collector, &mut buf)?;
    Ok(buf)
}

pub fn workers_to_bytes(&self, collector: &Collector) -> Result<Vec<u8>, Error> {
    let mut buf = Vec::new();
    self.write_workers(collector, &mut buf)?;
    Ok(buf)
}
}
```
Tying it together
The errors we are tracking are:
```rust
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("tokio-statemap requires `RUSTFLAGS=\"--cfg tokio_unstable\"`")]
    UnstableFeaturesRequired,
    /// `tokio-statemap` hooks were not registered as the layout of Tokio's
    /// [`tokio::task::Id`] type has changed. See [`check_casts`] for details.
    #[error(transparent)]
    InvalidCasts(#[from] InvalidCasts),
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),
}
```
Finally, the `register_hooks` function ties everything together. It verifies the task ID cast is safe, creates the collector, and wires up all the Tokio runtime hooks.
```rust
pub fn register_hooks(
    builder: &mut tokio::runtime::Builder,
    config: CollectorConfig,
) -> Result<Arc<Collector>, Error> {
    #[cfg(tokio_unstable)]
    {
        check_casts()?;
        let c = Arc::new(Collector::new(config));
        let task_hook = |f: fn(&Collector, &tokio::runtime::TaskMeta)| {
            let c = c.clone();
            move |meta: &tokio::runtime::TaskMeta| f(&c, meta)
        };
        let thread_hook = |f: fn(&Collector)| {
            let c = c.clone();
            move || f(&c)
        };
        builder.on_task_spawn(task_hook(Collector::on_task_spawn));
        builder.on_before_task_poll(task_hook(Collector::on_before_task_poll));
        builder.on_after_task_poll(task_hook(Collector::on_after_task_poll));
        builder.on_task_terminate(task_hook(Collector::on_task_terminate));
        builder.on_thread_start(thread_hook(Collector::on_thread_start));
        builder.on_thread_stop(thread_hook(Collector::on_thread_stop));
        builder.on_thread_park(thread_hook(Collector::on_thread_park));
        builder.on_thread_unpark(thread_hook(Collector::on_thread_unpark));
        Ok(c)
    }
    #[cfg(not(tokio_unstable))]
    {
        Err(Error::UnstableFeaturesRequired)
    }
}
```
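Putting it all together, here is a sketch of end-to-end usage, assuming the public API exactly as presented above and a crate name of `tokio_statemap` (build with `RUSTFLAGS="--cfg tokio_unstable"`); the task workload and output filenames are illustrative, not part of the library:

```rust
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register hooks on the builder before the runtime is constructed.
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    let collector = tokio_statemap::register_hooks(&mut builder, Default::default())?;
    let runtime = builder.build()?;

    collector.start();
    runtime.block_on(async {
        // Illustrative workload: a handful of staggered sleeping tasks.
        let handles: Vec<_> = (0..8)
            .map(|i| {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_millis(i * 10)).await;
                })
            })
            .collect();
        for h in handles {
            let _ = h.await;
        }
    });
    collector.stop();

    // Cold path: drain, transform, and write one statemap file per entity kind.
    let events = collector.drain_events();
    let data = collector.events_to_data(&events);
    data.write_tasks(&collector, &mut File::create("tasks.out")?)?;
    data.write_workers(&collector, &mut File::create("workers.out")?)?;
    Ok(())
}
```

The resulting files are the concatenated-JSON streams described above, ready to be rendered into SVGs.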