118
118
119
119
use crate :: loom:: cell:: UnsafeCell ;
120
120
use crate :: loom:: sync:: atomic:: { AtomicBool , AtomicUsize } ;
121
- use crate :: loom:: sync:: { Arc , Mutex , MutexGuard , RwLock , RwLockReadGuard } ;
121
+ use crate :: loom:: sync:: { Arc , Mutex , MutexGuard } ;
122
122
use crate :: task:: coop:: cooperative;
123
123
use crate :: util:: linked_list:: { self , GuardedLinkedList , LinkedList } ;
124
124
use crate :: util:: WakeList ;
@@ -340,7 +340,7 @@ use super::Notify;
340
340
/// Data shared between senders and receivers.
341
341
struct Shared < T > {
342
342
/// slots in the channel.
343
- buffer : Box < [ RwLock < Slot < T > > ] > ,
343
+ buffer : Box < [ Mutex < Slot < T > > ] > ,
344
344
345
345
/// Mask a position -> index.
346
346
mask : usize ,
@@ -390,7 +390,7 @@ struct Slot<T> {
390
390
///
391
391
/// The value is set by `send` when the write lock is held. When a reader
392
392
/// drops, `rem` is decremented. When it hits zero, the value is dropped.
393
- val : UnsafeCell < Option < T > > ,
393
+ val : Option < T > ,
394
394
}
395
395
396
396
/// An entry in the wait queue.
@@ -428,7 +428,7 @@ generate_addr_of_methods! {
428
428
}
429
429
430
430
struct RecvGuard < ' a , T > {
431
- slot : RwLockReadGuard < ' a , Slot < T > > ,
431
+ slot : MutexGuard < ' a , Slot < T > > ,
432
432
}
433
433
434
434
/// Receive a value future.
@@ -437,11 +437,15 @@ struct Recv<'a, T> {
437
437
receiver : & ' a mut Receiver < T > ,
438
438
439
439
/// Entry in the waiter `LinkedList`.
440
- waiter : UnsafeCell < Waiter > ,
440
+ waiter : WaiterCell ,
441
441
}
442
442
443
- unsafe impl < ' a , T : Send > Send for Recv < ' a , T > { }
444
- unsafe impl < ' a , T : Send > Sync for Recv < ' a , T > { }
443
+ // The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
444
+ // from `Recv`.
445
+ struct WaiterCell ( UnsafeCell < Waiter > ) ;
446
+
447
+ unsafe impl Send for WaiterCell { }
448
+ unsafe impl Sync for WaiterCell { }
445
449
446
450
/// Max number of receivers. Reserve space to lock.
447
451
const MAX_RECEIVERS : usize = usize:: MAX >> 2 ;
@@ -509,15 +513,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
509
513
( tx, rx)
510
514
}
511
515
512
- unsafe impl < T : Send > Send for Sender < T > { }
513
- unsafe impl < T : Send > Sync for Sender < T > { }
514
-
515
- unsafe impl < T : Send > Send for WeakSender < T > { }
516
- unsafe impl < T : Send > Sync for WeakSender < T > { }
517
-
518
- unsafe impl < T : Send > Send for Receiver < T > { }
519
- unsafe impl < T : Send > Sync for Receiver < T > { }
520
-
521
516
impl < T > Sender < T > {
522
517
/// Creates the sending-half of the [`broadcast`] channel.
523
518
///
@@ -556,10 +551,10 @@ impl<T> Sender<T> {
556
551
let mut buffer = Vec :: with_capacity ( capacity) ;
557
552
558
553
for i in 0 ..capacity {
559
- buffer. push ( RwLock :: new ( Slot {
554
+ buffer. push ( Mutex :: new ( Slot {
560
555
rem : AtomicUsize :: new ( 0 ) ,
561
556
pos : ( i as u64 ) . wrapping_sub ( capacity as u64 ) ,
562
- val : UnsafeCell :: new ( None ) ,
557
+ val : None ,
563
558
} ) ) ;
564
559
}
565
560
@@ -647,7 +642,7 @@ impl<T> Sender<T> {
647
642
tail. pos = tail. pos . wrapping_add ( 1 ) ;
648
643
649
644
// Get the slot
650
- let mut slot = self . shared . buffer [ idx] . write ( ) ;
645
+ let mut slot = self . shared . buffer [ idx] . lock ( ) ;
651
646
652
647
// Track the position
653
648
slot. pos = pos;
@@ -656,7 +651,7 @@ impl<T> Sender<T> {
656
651
slot. rem . with_mut ( |v| * v = rem) ;
657
652
658
653
// Write the value
659
- slot. val = UnsafeCell :: new ( Some ( value) ) ;
654
+ slot. val = Some ( value) ;
660
655
661
656
// Release the slot lock before notifying the receivers.
662
657
drop ( slot) ;
@@ -755,7 +750,7 @@ impl<T> Sender<T> {
755
750
while low < high {
756
751
let mid = low + ( high - low) / 2 ;
757
752
let idx = base_idx. wrapping_add ( mid) & self . shared . mask ;
758
- if self . shared . buffer [ idx] . read ( ) . rem . load ( SeqCst ) == 0 {
753
+ if self . shared . buffer [ idx] . lock ( ) . rem . load ( SeqCst ) == 0 {
759
754
low = mid + 1 ;
760
755
} else {
761
756
high = mid;
@@ -797,7 +792,7 @@ impl<T> Sender<T> {
797
792
let tail = self . shared . tail . lock ( ) ;
798
793
799
794
let idx = ( tail. pos . wrapping_sub ( 1 ) & self . shared . mask as u64 ) as usize ;
800
- self . shared . buffer [ idx] . read ( ) . rem . load ( SeqCst ) == 0
795
+ self . shared . buffer [ idx] . lock ( ) . rem . load ( SeqCst ) == 0
801
796
}
802
797
803
798
/// Returns the number of active receivers.
@@ -1230,7 +1225,7 @@ impl<T> Receiver<T> {
1230
1225
let idx = ( self . next & self . shared . mask as u64 ) as usize ;
1231
1226
1232
1227
// The slot holding the next value to read
1233
- let mut slot = self . shared . buffer [ idx] . read ( ) ;
1228
+ let mut slot = self . shared . buffer [ idx] . lock ( ) ;
1234
1229
1235
1230
if slot. pos != self . next {
1236
1231
// Release the `slot` lock before attempting to acquire the `tail`
@@ -1247,7 +1242,7 @@ impl<T> Receiver<T> {
1247
1242
let mut tail = self . shared . tail . lock ( ) ;
1248
1243
1249
1244
// Acquire slot lock again
1250
- slot = self . shared . buffer [ idx] . read ( ) ;
1245
+ slot = self . shared . buffer [ idx] . lock ( ) ;
1251
1246
1252
1247
// Make sure the position did not change. This could happen in the
1253
1248
// unlikely event that the buffer is wrapped between dropping the
@@ -1581,12 +1576,12 @@ impl<'a, T> Recv<'a, T> {
1581
1576
fn new ( receiver : & ' a mut Receiver < T > ) -> Recv < ' a , T > {
1582
1577
Recv {
1583
1578
receiver,
1584
- waiter : UnsafeCell :: new ( Waiter {
1579
+ waiter : WaiterCell ( UnsafeCell :: new ( Waiter {
1585
1580
queued : AtomicBool :: new ( false ) ,
1586
1581
waker : None ,
1587
1582
pointers : linked_list:: Pointers :: new ( ) ,
1588
1583
_p : PhantomPinned ,
1589
- } ) ,
1584
+ } ) ) ,
1590
1585
}
1591
1586
}
1592
1587
@@ -1598,7 +1593,7 @@ impl<'a, T> Recv<'a, T> {
1598
1593
is_unpin :: < & mut Receiver < T > > ( ) ;
1599
1594
1600
1595
let me = self . get_unchecked_mut ( ) ;
1601
- ( me. receiver , & me. waiter )
1596
+ ( me. receiver , & me. waiter . 0 )
1602
1597
}
1603
1598
}
1604
1599
}
@@ -1632,6 +1627,7 @@ impl<'a, T> Drop for Recv<'a, T> {
1632
1627
// `Shared::notify_rx` before we drop the object.
1633
1628
let queued = self
1634
1629
. waiter
1630
+ . 0
1635
1631
. with ( |ptr| unsafe { ( * ptr) . queued . load ( Acquire ) } ) ;
1636
1632
1637
1633
// If the waiter is queued, we need to unlink it from the waiters list.
@@ -1646,6 +1642,7 @@ impl<'a, T> Drop for Recv<'a, T> {
1646
1642
// `Relaxed` order suffices because we hold the tail lock.
1647
1643
let queued = self
1648
1644
. waiter
1645
+ . 0
1649
1646
. with_mut ( |ptr| unsafe { ( * ptr) . queued . load ( Relaxed ) } ) ;
1650
1647
1651
1648
if queued {
@@ -1654,7 +1651,7 @@ impl<'a, T> Drop for Recv<'a, T> {
1654
1651
// safety: tail lock is held and the wait node is verified to be in
1655
1652
// the list.
1656
1653
unsafe {
1657
- self . waiter . with_mut ( |ptr| {
1654
+ self . waiter . 0 . with_mut ( |ptr| {
1658
1655
tail. waiters . remove ( ( & mut * ptr) . into ( ) ) ;
1659
1656
} ) ;
1660
1657
}
@@ -1706,16 +1703,15 @@ impl<'a, T> RecvGuard<'a, T> {
1706
1703
where
1707
1704
T : Clone ,
1708
1705
{
1709
- self . slot . val . with ( |ptr| unsafe { ( * ptr ) . clone ( ) } )
1706
+ self . slot . val . clone ( )
1710
1707
}
1711
1708
}
1712
1709
1713
1710
impl < ' a , T > Drop for RecvGuard < ' a , T > {
1714
1711
fn drop ( & mut self ) {
1715
1712
// Decrement the remaining counter
1716
1713
if 1 == self . slot . rem . fetch_sub ( 1 , SeqCst ) {
1717
- // Safety: Last receiver, drop the value
1718
- self . slot . val . with_mut ( |ptr| unsafe { * ptr = None } ) ;
1714
+ self . slot . val = None ;
1719
1715
}
1720
1716
}
1721
1717
}
0 commit comments