@@ -5,16 +5,34 @@ use lock::Lock;
5
5
/// A slot in memory intended to represent the communication channel between one
6
6
/// producer and one consumer.
7
7
///
8
- /// Each slot contains space for a piece of data, `T`, and space for callbacks.
9
- /// The callbacks can be run when the data is either full or empty.
8
+ /// Each slot contains space for a piece of data of type `T`, and can have callbacks
9
+ /// registered to run when the slot is either full or empty.
10
+ ///
11
+ /// # Registering callbacks
12
+ ///
13
+ /// [`on_empty`](#method.on_empty) registers a callback to run when the slot
14
+ /// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
15
+ /// becomes full. In both cases, the callback will run immediately if possible.
16
+ ///
17
+ /// At most one callback can be registered at any given time: it is an error to
18
+ /// attempt to register a callback with `on_full` if one is currently registered
19
+ /// via `on_empty`, or any other combination.
20
+ ///
21
+ /// # Cancellation
22
+ ///
23
+ /// Registering a callback returns a `Token` which can be used to
24
+ /// [`cancel`](#method.cancel) the callback. Cancellation is best-effort:
25
+ /// cancelling a callback that has already run is not an error, and does not
26
+ /// signal this to the caller of `cancel`.
10
27
pub struct Slot < T > {
11
28
state : AtomicUsize ,
12
29
slot : Lock < Option < T > > ,
13
30
on_full : Lock < Option < Box < FnBox < T > > > > ,
14
31
on_empty : Lock < Option < Box < FnBox < T > > > > ,
15
32
}
16
33
17
- /// Error value returned from erroneous calls to `try_produce`.
34
+ /// Error value returned from erroneous calls to `try_produce`, which contains
35
+ /// the value that was passed to `try_produce`.
18
36
#[ derive( Debug , PartialEq ) ]
19
37
pub struct TryProduceError < T > ( T ) ;
20
38
@@ -30,8 +48,17 @@ pub struct OnFullError(());
30
48
#[ derive( Debug , PartialEq ) ]
31
49
pub struct OnEmptyError ( ( ) ) ;
32
50
51
+ /// A `Token` represents a registered callback, and can be used to cancel the callback.
33
52
pub struct Token ( usize ) ;
34
53
54
+ /// Slot state: the lowest 3 bits are flags; the remaining bits are used to
55
+ /// store the `Token` for the currently registered callback. The special token
56
+ /// value 0 means no callback is registered.
57
+ ///
58
+ /// The flags are:
59
+ /// - `DATA`: the `Slot` contains a value
60
+ /// - `ON_FULL`: the `Slot` has an `on_full` callback registered
61
+ /// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered
35
62
struct State ( usize ) ;
36
63
37
64
fn _is_send < T : Send > ( ) { }
@@ -49,6 +76,9 @@ const STATE_BITS: usize = 3;
49
76
const STATE_MASK : usize = ( 1 << STATE_BITS ) - 1 ;
50
77
51
78
impl < T : ' static > Slot < T > {
79
+ /// Creates a new `Slot` containing `val`, which may be `None` to create an
80
+ /// empty `Slot`.
81
+ // TODO: Separate this into `new`, and `with_value` functions?
52
82
pub fn new ( val : Option < T > ) -> Slot < T > {
53
83
Slot {
54
84
state : AtomicUsize :: new ( if val. is_some ( ) { DATA } else { 0 } ) ,
@@ -59,6 +89,14 @@ impl<T: 'static> Slot<T> {
59
89
}
60
90
61
91
// PRODUCER
92
+ /// Attempts to store `t` in the slot.
93
+ ///
94
+ /// This method is to be called by the producer.
95
+ ///
96
+ /// # Errors
97
+ ///
98
+ /// Returns `Err` if the slot is already full. The value you attempted to
99
+ /// store is included in the error value.
62
100
pub fn try_produce ( & self , t : T ) -> Result < ( ) , TryProduceError < T > > {
63
101
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
64
102
assert ! ( !state. flag( ON_EMPTY ) ) ;
@@ -90,15 +128,27 @@ impl<T: 'static> Slot<T> {
90
128
}
91
129
92
130
// PRODUCER
131
+ /// Registers `f` as a callback to run when the slot becomes empty. The
132
+ /// callback will run immediately if the slot is already empty. Returns a
133
+ /// token that can be used to cancel the callback.
134
+ ///
135
+ /// This method is to be called by the producer.
136
+ ///
137
+ /// # Panics
138
+ ///
139
+ /// Panics if another callback was already registered via `on_empty` or
140
+ /// `on_full`.
93
141
pub fn on_empty < F > ( & self , f : F ) -> Token
94
142
where F : FnOnce ( & Slot < T > ) + Send + ' static
95
143
{
96
144
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
145
+ // TODO: return OnEmptyError instead of assert.
97
146
assert ! ( !state. flag( ON_EMPTY ) ) ;
98
147
if !state. flag ( DATA ) {
99
148
f ( self ) ;
100
149
return Token ( 0 )
101
150
}
151
+ // TODO: return OnEmptyError instead of assert?
102
152
assert ! ( !state. flag( ON_FULL ) ) ;
103
153
let mut slot = self . on_empty . try_lock ( ) . expect ( "on_empty interference" ) ;
104
154
assert ! ( slot. is_none( ) ) ;
@@ -129,6 +179,13 @@ impl<T: 'static> Slot<T> {
129
179
}
130
180
131
181
// CONSUMER
182
+ /// Attempts to consume the value stored in the slot.
183
+ ///
184
+ /// This method is to be called by the consumer.
185
+ ///
186
+ /// # Errors
187
+ ///
188
+ /// Returns `Err` if the slot is already empty.
132
189
pub fn try_consume ( & self ) -> Result < T , TryConsumeError > {
133
190
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
134
191
assert ! ( !state. flag( ON_FULL ) ) ;
@@ -160,15 +217,27 @@ impl<T: 'static> Slot<T> {
160
217
}
161
218
162
219
// CONSUMER
220
+ /// Registers `f` as a callback to run when the slot becomes full. The
221
+ /// callback will run immediately if the slot is already full. Returns a
222
+ /// token that can be used to cancel the callback.
223
+ ///
224
+ /// This method is to be called by the consumer.
225
+ ///
226
+ /// # Panics
227
+ ///
228
+ /// Panics if another callback was already registered via `on_empty` or
229
+ /// `on_full`.
163
230
pub fn on_full < F > ( & self , f : F ) -> Token
164
231
where F : FnOnce ( & Slot < T > ) + Send + ' static
165
232
{
166
233
let mut state = State ( self . state . load ( Ordering :: SeqCst ) ) ;
234
+ // TODO: return OnFullError instead of assert.
167
235
assert ! ( !state. flag( ON_FULL ) ) ;
168
236
if state. flag ( DATA ) {
169
237
f ( self ) ;
170
238
return Token ( 0 )
171
239
}
240
+ // TODO: return OnFullError instead of assert?
172
241
assert ! ( !state. flag( ON_EMPTY ) ) ;
173
242
let mut slot = self . on_full . try_lock ( ) . expect ( "on_full interference" ) ;
174
243
assert ! ( slot. is_none( ) ) ;
@@ -198,6 +267,8 @@ impl<T: 'static> Slot<T> {
198
267
}
199
268
}
200
269
270
+ /// Cancels the callback associated with `token`. Returns successfully even
271
+ /// if the callback had already run; see [Cancellation](#cancellation).
201
272
pub fn cancel ( & self , token : Token ) {
202
273
let token = token. 0 ;
203
274
if token == 0 {
@@ -241,6 +312,7 @@ impl<T: 'static> Slot<T> {
241
312
}
242
313
243
314
impl < T > TryProduceError < T > {
315
+ /// Extracts the value that was attempted to be produced.
244
316
pub fn into_inner ( self ) -> T {
245
317
self . 0
246
318
}
0 commit comments