refactor: Use u64 task IDs #75

Merged · 5 commits · Aug 3, 2021
16 changes: 16 additions & 0 deletions console-api/src/tasks.rs
@@ -1 +1,17 @@
tonic::include_proto!("rs.tokio.console.tasks");

// === IDs ===

impl From<u64> for TaskId {
fn from(id: u64) -> Self {
TaskId { id }
}
}

impl From<TaskId> for u64 {
fn from(id: TaskId) -> Self {
id.id
}
}

impl Copy for TaskId {}
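
A quick sketch of how the new impls compose (illustrative only, not part of the diff; TaskId is the prost-generated message):

// Round-trip between a raw u64 and the generated TaskId message.
let proto_id = TaskId::from(42u64);
let raw: u64 = proto_id.into(); // TaskId is Copy, so proto_id stays usable
assert_eq!(raw, 42);
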
82 changes: 54 additions & 28 deletions console-subscriber/src/aggregator.rs
@@ -6,7 +6,7 @@ use tokio::sync::{mpsc, Notify};

use futures::FutureExt;
use std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
convert::TryInto,
ops::{Deref, DerefMut},
sync::{
@@ -22,6 +22,8 @@ use hdrhistogram::{
Histogram,
};

pub type TaskId = u64;

pub(crate) struct Aggregator {
/// Channel of incoming events emitted by `TaskLayer`s.
events: mpsc::Receiver<Event>,
@@ -42,7 +44,7 @@ pub(crate) struct Aggregator {
watchers: Vec<Watch<proto::tasks::TaskUpdate>>,

/// Currently active RPCs streaming task details events, by task ID.
details_watchers: HashMap<span::Id, Vec<Watch<proto::tasks::TaskDetails>>>,
details_watchers: HashMap<TaskId, Vec<Watch<proto::tasks::TaskDetails>>>,

/// *All* metadata for task spans and user-defined spans that we care about.
///
@@ -59,6 +61,12 @@ pub(crate) struct Aggregator {

/// Map of task IDs to task stats.
stats: TaskData<Stats>,

/// A counter for the pretty task IDs.
task_id_counter: TaskId,

/// A table that contains the span ID to pretty task ID mappings.
task_id_mappings: HashMap<span::Id, TaskId>,

Member commented:

a possible optimization for this is adding a custom Hasher implementation that always just calls span::Id::into_u64 and returns that value as the hash, rather than actually hashing them. but, we can do that in a follow-up...especially because I don't really think the performance impact is particularly significant.
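
A minimal sketch of what that pass-through hasher could look like (hypothetical, not part of this PR; it assumes keys are hashed with a single write_u64 call, as u64 keys are):

use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

// Hypothetical: store the u64 key verbatim instead of hashing it.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("IdHasher only supports u64 keys");
    }

    fn write_u64(&mut self, id: u64) {
        self.0 = id;
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

type IdMap<V> = HashMap<u64, V, BuildHasherDefault<IdHasher>>;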

}

#[derive(Debug)]
@@ -89,7 +97,7 @@ struct Stats {

#[derive(Default)]
struct TaskData<T> {
data: HashMap<span::Id, (T, bool)>,
data: HashMap<TaskId, (T, bool)>,
}

struct Task {
@@ -139,9 +147,11 @@ impl Aggregator {
all_metadata: Vec::new(),
new_metadata: Vec::new(),
tasks: TaskData {
data: HashMap::<span::Id, (Task, bool)>::new(),
data: HashMap::<TaskId, (Task, bool)>::new(),
},
stats: TaskData::default(),
task_id_counter: 0,
task_id_mappings: HashMap::new(),
}
}

@@ -223,13 +233,13 @@ impl Aggregator {
let new_tasks = self
.tasks
.all()
.map(|(id, task)| task.to_proto(id.clone()))
.map(|(&id, task)| task.to_proto(id))
.collect();
let now = SystemTime::now();
let stats_update = self
.stats
.all()
.map(|(id, stats)| (id.into_u64(), stats.to_proto()))
.map(|(&id, stats)| (id, stats.to_proto()))
.collect();
// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(&proto::tasks::TaskUpdate {
@@ -256,22 +266,21 @@
buffer,
} = watch_request;
tracing::debug!(id = ?id, "new task details subscription");
let task_id: span::Id = id.into();
if let Some(stats) = self.stats.get(&task_id) {
if let Some(stats) = self.stats.get(&id) {
let (tx, rx) = mpsc::channel(buffer);
let subscription = Watch(tx);
let now = SystemTime::now();
// Send back the stream receiver.
// Then send the initial state --- if this fails, the subscription is already dead.
if stream_sender.send(rx).is_ok()
&& subscription.update(&proto::tasks::TaskDetails {
task_id: Some(task_id.clone().into()),
task_id: Some(id.into()),
now: Some(now.into()),
poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(),
})
{
self.details_watchers
.entry(task_id)
.entry(id)
.or_insert_with(Vec::new)
.push(subscription);
}
@@ -294,13 +303,13 @@
let new_tasks = self
.tasks
.since_last_update()
.map(|(id, task)| task.to_proto(id.clone()))
.map(|(&id, task)| task.to_proto(id))
.collect();
let now = SystemTime::now();
let stats_update = self
.stats
.since_last_update()
.map(|(id, stats)| (id.into_u64(), stats.to_proto()))
.map(|(&id, stats)| (id, stats.to_proto()))
.collect();

let update = proto::tasks::TaskUpdate {
@@ -315,10 +324,10 @@
let stats = &self.stats;
// Assuming there are much fewer task details subscribers than there are
// stats updates, iterate over `details_watchers` and compact the map.
self.details_watchers.retain(|id, watchers| {
if let Some(task_stats) = stats.get(id) {
self.details_watchers.retain(|&id, watchers| {
if let Some(task_stats) = stats.get(&id) {
let details = proto::tasks::TaskDetails {
task_id: Some(id.clone().into()),
task_id: Some(id.into()),
now: Some(now.into()),
poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram)
.ok(),
@@ -345,16 +354,17 @@
at,
fields,
} => {
let task_id = self.get_or_insert_task_id(id);
self.tasks.insert(
id.clone(),
task_id,
Task {
metadata,
fields,
// TODO: parents
},
);
self.stats.insert(
id,
task_id,
Stats {
polls: 0,
created_at: Some(at),
@@ -363,7 +373,8 @@
);
}
Event::Enter { id, at } => {
let mut stats = self.stats.update_or_default(id);
let task_id = self.get_or_insert_task_id(id);
let mut stats = self.stats.update_or_default(task_id);
if stats.current_polls == 0 {
stats.last_poll_started = Some(at);
if stats.first_poll == None {
@@ -375,7 +386,8 @@
}

Event::Exit { id, at } => {
let mut stats = self.stats.update_or_default(id);
let task_id = self.get_or_insert_task_id(id);
let mut stats = self.stats.update_or_default(task_id);
stats.current_polls -= 1;
if stats.current_polls == 0 {
if let Some(last_poll_started) = stats.last_poll_started {
@@ -391,17 +403,19 @@
}

Event::Close { id, at } => {
self.stats.update_or_default(id).closed_at = Some(at);
let task_id = self.get_or_insert_task_id(id);
self.stats.update_or_default(task_id).closed_at = Some(at);
}

Event::Waker { id, op, at } => {
let task_id = self.get_or_insert_task_id(id);
// It's possible for wakers to exist long after a task has
// finished. We don't want those cases to create a "new"
// task that isn't closed, just to insert some waker stats.
//
// It may be useful to eventually be able to report about
// "wasted" waker ops, but we'll leave that for another time.
if let Some(mut stats) = self.stats.update(&id) {
if let Some(mut stats) = self.stats.update(&task_id) {
match op {
WakeOp::Wake | WakeOp::WakeByRef => {
stats.wakes += 1;
@@ -431,6 +445,18 @@
}
}

fn get_or_insert_task_id(&mut self, span_id: span::Id) -> TaskId {
match self.task_id_mappings.entry(span_id) {
Entry::Occupied(entry) => *entry.get(),
Entry::Vacant(entry) => {
let task_id = self.task_id_counter;
entry.insert(task_id);
self.task_id_counter = self.task_id_counter.wrapping_add(1);
task_id
}
}
Member commented on lines +449 to +457:

nit/TIOLI: I think this match could be replaced with Entry::or_insert_with

Suggested change:

// before
match self.task_id_mappings.entry(span_id) {
    Entry::Occupied(entry) => *entry.get(),
    Entry::Vacant(entry) => {
        let task_id = self.task_id_counter;
        entry.insert(task_id);
        self.task_id_counter = self.task_id_counter.wrapping_add(1);
        task_id
    }
}

// after
*self.task_id_mappings
    .entry(span_id)
    .or_insert_with(|| {
        let task_id = self.task_id_counter;
        self.task_id_counter = self.task_id_counter.wrapping_add(1);
        task_id
    })

@oguzbilgener (Contributor, Author) commented on Aug 3, 2021:

This was actually the first thing I tried, but unfortunately the borrow checker is not happy about the mutation inside the closure 😕 I'm going to do a bit more reading to see if there are any other idiomatic ways.

error[E0500]: closure requires unique access to `self` but it is already borrowed
   --> console-subscriber/src/aggregator.rs:449:62
    |
449 |         *self.task_id_mappings.entry(span_id).or_insert_with(|| {
    |          ---------------------                -------------- ^^ closure construction occurs here
    |          |                                    |
    |          |                                    first borrow later used by call
    |          borrow occurs here
450 |             let task_id = self.task_id_counter;
451 |             self.task_id_counter = self.task_id_counter.wrapping_add(1);
    |             -------------------- second borrow occurs due to use of `self` in closure

Member replied:

oh, huh, never mind — the current approach is fine, then, if the borrow checker doesn't like this.

I think we could "split" the borrows, like this:

Suggested change:

// before
match self.task_id_mappings.entry(span_id) {
    Entry::Occupied(entry) => *entry.get(),
    Entry::Vacant(entry) => {
        let task_id = self.task_id_counter;
        entry.insert(task_id);
        self.task_id_counter = self.task_id_counter.wrapping_add(1);
        task_id
    }
}

// after
let task_id_mappings = &mut self.task_id_mappings;
let task_id_counter = &mut self.task_id_counter;
*task_id_mappings
    .entry(span_id)
    .or_insert_with(|| {
        let task_id = *task_id_counter;
        *task_id_counter = task_id_counter.wrapping_add(1);
        task_id
    })

but this is the same number of lines as the match version, and i'm not sure if it's meaningfully clearer. so, it may not be worth bothering...
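
One more variant that sidesteps the double borrow entirely (a hypothetical sketch, not from this thread): read the counter before taking the entry, so the closure captures nothing from self.

// Hypothetical alternative: copy the counter out first and use the
// plain-value or_insert. Assumes IDs are assigned monotonically, so an
// existing entry can only equal next_id after the counter wraps around.
fn get_or_insert_task_id(&mut self, span_id: span::Id) -> TaskId {
    let next_id = self.task_id_counter;
    let id = *self.task_id_mappings.entry(span_id).or_insert(next_id);
    if id == next_id {
        // The entry was vacant and was just assigned next_id.
        self.task_id_counter = self.task_id_counter.wrapping_add(1);
    }
    id
}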

}

fn drop_closed_tasks(&mut self) {
let tasks = &mut self.tasks;
let stats = &mut self.stats;
@@ -510,22 +536,22 @@ impl Flush {
}

impl<T> TaskData<T> {
fn update_or_default(&mut self, id: span::Id) -> Updating<'_, T>
fn update_or_default(&mut self, id: TaskId) -> Updating<'_, T>
where
T: Default,
{
Updating(self.data.entry(id).or_default())
}

fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
fn update(&mut self, id: &TaskId) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

fn insert(&mut self, id: span::Id, data: T) {
fn insert(&mut self, id: TaskId, data: T) {
self.data.insert(id, (data, true));
}

fn since_last_update(&mut self) -> impl Iterator<Item = (&span::Id, &mut T)> {
fn since_last_update(&mut self) -> impl Iterator<Item = (&TaskId, &mut T)> {
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
if *dirty {
*dirty = false;
@@ -536,11 +562,11 @@ impl<T> TaskData<T> {
})
}

fn all(&self) -> impl Iterator<Item = (&span::Id, &T)> {
fn all(&self) -> impl Iterator<Item = (&TaskId, &T)> {
self.data.iter().map(|(id, (data, _))| (id, data))
}

fn get(&self, id: &span::Id) -> Option<&T> {
fn get(&self, id: &TaskId) -> Option<&T> {
self.data.get(id).map(|(data, _)| data)
}
}
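
The dirty bit means since_last_update reports each entry once per mutation; a short sketch of that contract (illustrative, not in the diff):

// Illustrative only: entries are yielded by since_last_update once per
// mutation, then stay hidden until they are marked dirty again.
let mut polls: TaskData<u64> = TaskData::default();
polls.insert(1, 0); // insert stores (data, true), i.e. dirty
assert_eq!(polls.since_last_update().count(), 1); // yielded, flag cleared
assert_eq!(polls.since_last_update().count(), 0); // clean until next change
polls.insert(1, 1); // overwriting marks the entry dirty again
assert_eq!(polls.since_last_update().count(), 1);
assert_eq!(polls.all().count(), 1); // all() ignores the dirty flag
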
@@ -603,7 +629,7 @@ impl Stats {
}

impl Task {
fn to_proto(&self, id: span::Id) -> proto::tasks::Task {
fn to_proto(&self, id: u64) -> proto::tasks::Task {
proto::tasks::Task {
id: Some(id.into()),
// TODO: more kinds of tasks...
7 changes: 4 additions & 3 deletions console-subscriber/src/lib.rs
@@ -1,5 +1,4 @@
use console_api as proto;
use proto::SpanId;
use tokio::sync::{mpsc, oneshot};

use std::{
@@ -26,6 +25,8 @@ use callsites::Callsites;

pub use init::{build, init};

use crate::aggregator::TaskId;

pub struct TasksLayer {
tx: mpsc::Sender<Event>,
flush: Arc<aggregator::Flush>,
@@ -58,7 +59,7 @@ enum WatchKind {
}

struct WatchRequest<T> {
id: SpanId,
id: TaskId,
stream_sender: oneshot::Sender<mpsc::Receiver<Result<T, tonic::Status>>>,
buffer: usize,
}
@@ -368,7 +369,7 @@ impl proto::tasks::tasks_server::Tasks for Server {
// Check with the aggregator task to request a stream if the task exists.
let (stream_sender, stream_recv) = oneshot::channel();
permit.send(WatchKind::TaskDetail(WatchRequest {
id: task_id.clone(),
id: task_id.into(),
stream_sender,
buffer: self.client_buffer,
}));
6 changes: 0 additions & 6 deletions console/src/tasks.rs
@@ -38,7 +38,6 @@ pub(crate) type DetailsRef = Rc<RefCell<Option<Details>>>;
#[derive(Debug)]
pub(crate) struct Task {
id: u64,
id_hex: String,
fields: Vec<Field>,
formatted_fields: Vec<Vec<Span<'static>>>,
kind: &'static str,
@@ -191,7 +190,6 @@ impl State {
let stats = stats_update.remove(&id)?.into();
let mut task = Task {
id,
id_hex: format!("{:x}", id),
fields,
formatted_fields,
kind,
@@ -260,10 +258,6 @@ impl Task {
self.id
}

pub(crate) fn id_hex(&self) -> &str {
&self.id_hex
}

pub(crate) fn target(&self) -> &str {
&self.target
}
2 changes: 1 addition & 1 deletion console/src/view/task.rs
@@ -99,7 +99,7 @@ impl TaskView {
Span::raw(" = quit"),
]);

let attrs = Spans::from(vec![bold("ID: "), Span::raw(task.id_hex())]);
let attrs = Spans::from(vec![bold("ID: "), Span::raw(task.id().to_string())]);
let target = Spans::from(vec![bold("Target: "), Span::raw(task.target())]);

let mut total = vec![bold("Total Time: "), dur(task.total(now))];
2 changes: 1 addition & 1 deletion console/src/view/tasks.rs
@@ -108,7 +108,7 @@ impl List {
let task = task.borrow();

let mut row = Row::new(vec![
Cell::from(id_width.update_str(task.id_hex()).to_string()),
Cell::from(id_width.update_str(task.id().to_string())),
// TODO(eliza): is there a way to write a `fmt::Debug` impl
// directly to tui without doing an allocation?
Cell::from(task.kind().to_string()),
10 changes: 7 additions & 3 deletions proto/tasks.proto
@@ -11,11 +11,15 @@ service Tasks {
rpc WatchTaskDetails(DetailsRequest) returns (stream TaskDetails) {}
}

message TaskId {
uint64 id = 1;
}

message TasksRequest {
}

message DetailsRequest {
common.SpanId id = 1;
TaskId id = 1;
}

// A task state update.
@@ -52,7 +56,7 @@
// A task details update
message TaskDetails {
// The task's ID which the details belong to.
common.SpanId task_id = 1;
TaskId task_id = 1;

google.protobuf.Timestamp now = 2;

@@ -69,7 +73,7 @@ message Task {
// identified by this ID; if the client requires additional information
// included in the `Task` message, it should store that data and access it
// by ID.
common.SpanId id = 1;
TaskId id = 1;
// The numeric ID of the task's `Metadata`.
//
// This identifies the `Metadata` that describes the `tracing` span