@@ -5,16 +5,35 @@ use lock::Lock;
5
5
/// A slot in memory intended to represent the communication channel between one
6
6
/// producer and one consumer.
7
7
///
8
- /// Each slot contains space for a piece of data, `T`, and space for callbacks.
9
- /// The callbacks can be run when the data is either full or empty.
8
+ /// Each slot contains space for a piece of data of type `T`, and can have callbacks
9
+ /// registered to run when the slot is either full or empty.
10
+ ///
11
+ /// # Registering callbacks
12
+ ///
13
+ /// [`on_empty`](#method.on_empty) registers a callback to run when the slot
14
+ /// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
15
+ /// becomes full. In both cases, the callback will run immediately if possible.
16
+ ///
17
+ /// At most one callback can be registered at any given time: it is an error to
18
+ /// attempt to register a callback with `on_full` if one is currently registered
19
+ /// via `on_empty`, or any other combination.
20
+ ///
21
+ /// # Cancellation
22
+ ///
23
+ /// Registering a callback returns a `Token` which can be used to
24
+ /// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet
25
+ /// started running can be canceled. Canceling a callback that has already run
26
+ /// is not an error, and `cancel` does not signal whether or not the callback
27
+ /// was actually canceled to the caller.
10
28
pub struct Slot < T > {
11
29
state : AtomicUsize ,
12
30
slot : Lock < Option < T > > ,
13
31
on_full : Lock < Option < Box < FnBox < T > > > > ,
14
32
on_empty : Lock < Option < Box < FnBox < T > > > > ,
15
33
}
16
34
17
- /// Error value returned from erroneous calls to `try_produce`.
35
+ /// Error value returned from erroneous calls to `try_produce`, which contains
36
+ /// the value that was passed to `try_produce`.
18
37
#[ derive( Debug , PartialEq ) ]
19
38
pub struct TryProduceError < T > ( T ) ;
20
39
@@ -30,8 +49,17 @@ pub struct OnFullError(());
30
49
#[ derive( Debug , PartialEq ) ]
31
50
pub struct OnEmptyError ( ( ) ) ;
32
51
52
+ /// A `Token` represents a registered callback, and can be used to cancel the callback.
33
53
pub struct Token ( usize ) ;
34
54
55
+ /// Slot state: the lowest 3 bits are flags; the remaining bits are used to
56
+ /// store the `Token` for the currently registered callback. The special token
57
+ /// value 0 means no callback is registered.
58
+ ///
59
+ /// The flags are:
60
+ /// - `DATA`: the `Slot` contains a value
61
+ /// - `ON_FULL`: the `Slot` has an `on_full` callback registered
62
+ /// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered
35
63
struct State ( usize ) ;
36
64
37
65
fn _is_send < T : Send > ( ) { }
@@ -49,6 +77,9 @@ const STATE_BITS: usize = 3;
49
77
const STATE_MASK : usize = ( 1 << STATE_BITS ) - 1 ;
50
78
51
79
impl < T : ' static > Slot < T > {
80
+ /// Creates a new `Slot` containing `val`, which may be `None` to create an
81
+ /// empty `Slot`.
82
+ // TODO: Separate this into `new`, and `with_value` functions?
52
83
pub fn new ( val : Option < T > ) -> Slot < T > {
53
84
Slot {
54
85
state : AtomicUsize :: new ( if val. is_some ( ) { DATA } else { 0 } ) ,
@@ -59,6 +90,14 @@ impl<T: 'static> Slot<T> {
59
90
}
60
91
61
92
// PRODUCER
93
+ /// Attempts to store `t` in the slot.
94
+ ///
95
+ /// This method is to be called by the producer.
96
+ ///
97
+ /// # Errors
98
+ ///
99
+ /// Returns `Err` if the slot is already full. The value you attempted to
100
+ /// store is included in the error value.
62
101
pub fn try_produce ( & self , t : T ) -> Result < ( ) , TryProduceError < T > > {
63
102
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
64
103
assert ! ( !state. flag( ON_EMPTY ) ) ;
@@ -90,15 +129,27 @@ impl<T: 'static> Slot<T> {
90
129
}
91
130
92
131
// PRODUCER
132
+ /// Registers `f` as a callback to run when the slot becomes empty. The
133
+ /// callback will run immediately if the slot is already empty. Returns a
134
+ /// token that can be used to cancel the callback.
135
+ ///
136
+ /// This method is to be called by the producer.
137
+ ///
138
+ /// # Panics
139
+ ///
140
+ /// Panics if another callback was already registered via `on_empty` or
141
+ /// `on_full`.
93
142
pub fn on_empty < F > ( & self , f : F ) -> Token
94
143
where F : FnOnce ( & Slot < T > ) + Send + ' static
95
144
{
96
145
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
146
+ // TODO: return OnEmptyError instead of assert.
97
147
assert ! ( !state. flag( ON_EMPTY ) ) ;
98
148
if !state. flag ( DATA ) {
99
149
f ( self ) ;
100
150
return Token ( 0 )
101
151
}
152
+ // TODO: return OnEmptyError instead of assert?
102
153
assert ! ( !state. flag( ON_FULL ) ) ;
103
154
let mut slot = self . on_empty . try_lock ( ) . expect ( "on_empty interference" ) ;
104
155
assert ! ( slot. is_none( ) ) ;
@@ -129,6 +180,13 @@ impl<T: 'static> Slot<T> {
129
180
}
130
181
131
182
// CONSUMER
183
+ /// Attempts to consume the value stored in the slot.
184
+ ///
185
+ /// This method is to be called by the consumer.
186
+ ///
187
+ /// # Errors
188
+ ///
189
+ /// Returns `Err` if the slot is already empty.
132
190
pub fn try_consume ( & self ) -> Result < T , TryConsumeError > {
133
191
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
134
192
assert ! ( !state. flag( ON_FULL ) ) ;
@@ -160,15 +218,27 @@ impl<T: 'static> Slot<T> {
160
218
}
161
219
162
220
// CONSUMER
221
+ /// Registers `f` as a callback to run when the slot becomes full. The
222
+ /// callback will run immediately if the slot is already full. Returns a
223
+ /// token that can be used to cancel the callback.
224
+ ///
225
+ /// This method is to be called by the consumer.
226
+ ///
227
+ /// # Panics
228
+ ///
229
+ /// Panics if another callback was already registered via `on_empty` or
230
+ /// `on_full`.
163
231
pub fn on_full < F > ( & self , f : F ) -> Token
164
232
where F : FnOnce ( & Slot < T > ) + Send + ' static
165
233
{
166
234
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
235
+ // TODO: return OnFullError instead of assert.
167
236
assert ! ( !state. flag( ON_FULL ) ) ;
168
237
if state. flag ( DATA ) {
169
238
f ( self ) ;
170
239
return Token ( 0 )
171
240
}
241
+ // TODO: return OnFullError instead of assert?
172
242
assert ! ( !state. flag( ON_EMPTY ) ) ;
173
243
let mut slot = self . on_full . try_lock ( ) . expect ( "on_full interference" ) ;
174
244
assert ! ( slot. is_none( ) ) ;
@@ -198,6 +268,10 @@ impl<T: 'static> Slot<T> {
198
268
}
199
269
}
200
270
271
+ /// Cancels the callback associated with `token`.
272
+ ///
273
+ /// Canceling a callback that has already started running, or has already
274
+ /// run will do nothing, and is not an error. See [Cancellation](#cancellation).
201
275
pub fn cancel ( & self , token : Token ) {
202
276
let token = token. 0 ;
203
277
if token == 0 {
@@ -241,6 +315,7 @@ impl<T: 'static> Slot<T> {
241
315
}
242
316
243
317
impl < T > TryProduceError < T > {
318
+ /// Extracts the value that was attempted to be produced.
244
319
pub fn into_inner ( self ) -> T {
245
320
self . 0
246
321
}
0 commit comments