diff options
Diffstat (limited to 'components/util/deque/mod.rs')
-rw-r--r-- | components/util/deque/mod.rs | 268 |
1 files changed, 9 insertions, 259 deletions
diff --git a/components/util/deque/mod.rs b/components/util/deque/mod.rs index b98c872cf0f..505b09ab021 100644 --- a/components/util/deque/mod.rs +++ b/components/util/deque/mod.rs @@ -54,12 +54,13 @@ pub use self::Stolen::{Empty, Abort, Data}; use alloc::arc::Arc; use alloc::heap::{allocate, deallocate}; -use std::kinds::marker; +use std::marker; use std::mem::{forget, min_align_of, size_of, transmute}; use std::ptr; use std::sync::Mutex; -use std::sync::atomic::{AtomicInt, AtomicPtr, SeqCst}; +use std::sync::atomic::{AtomicInt, AtomicPtr}; +use std::sync::atomic::Ordering::SeqCst; // Once the queue is less than 1/K full, then it will be downsized. Note that // the deque requires that this number be less than 2. @@ -97,7 +98,7 @@ pub struct Stealer<T> { } /// When stealing some data, this is an enumeration of the possible outcomes. -#[deriving(PartialEq, Show)] +#[derive(PartialEq, Show)] pub enum Stolen<T> { /// The deque was empty at the time of stealing Empty, @@ -141,6 +142,8 @@ struct Buffer<T> { log_size: uint, } +unsafe impl<T: 'static> Send for Buffer<T> { } + impl<T: Send> BufferPool<T> { /// Allocates a new buffer pool which in turn can be used to allocate new /// deques. @@ -159,16 +162,16 @@ impl<T: Send> BufferPool<T> { fn alloc(&mut self, bits: uint) -> Box<Buffer<T>> { unsafe { - let mut pool = self.pool.lock(); + let mut pool = self.pool.lock().unwrap(); match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i).unwrap(), + Some(i) => pool.remove(i), None => box Buffer::new(bits) } } } fn free(&self, buf: Box<Buffer<T>>) { - let mut pool = self.pool.lock(); + let mut pool = self.pool.lock().unwrap(); match pool.iter().position(|v| v.size() > buf.size()) { Some(i) => pool.insert(i, buf), None => pool.push(buf), @@ -403,256 +406,3 @@ impl<T: Send> Drop for Buffer<T> { unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) } } } - -#[cfg(test)] -mod tests { - use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; - - use std::mem; - use rustrt::thread::Thread; - use std::rand; - use std::rand::Rng; - use std::sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, - AtomicUint, INIT_ATOMIC_UINT}; - use std::vec; - - #[test] - fn smoke() { - let pool = BufferPool::new(); - let (w, s) = pool.deque(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - w.push(1i); - assert_eq!(w.pop(), Some(1)); - w.push(1); - assert_eq!(s.steal(), Data(1)); - w.push(1); - assert_eq!(s.clone().steal(), Data(1)); - } - - #[test] - fn stealpush() { - static AMT: int = 100000; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data(i) => { - assert_eq!(i, 1); - left -= 1; - } - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push(1); - } - - t.join(); - } - - #[test] - fn stealpush_large() { - static AMT: int = 100000; - let pool = BufferPool::<(int, int)>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data((1, 10)) => { left -= 1; } - Data(..) => panic!(), - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push((1, 10)); - } - - t.join(); - } - - fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>, - nthreads: int, amt: uint) { - for _ in range(0, amt) { - w.push(box 20); - } - let mut remaining = AtomicUint::new(amt); - let unsafe_remaining: *mut AtomicUint = &mut remaining; - - let threads = range(0, nthreads).map(|_| { - let s = s.clone(); - Thread::start(proc() { - unsafe { - while (*unsafe_remaining).load(SeqCst) > 0 { - match s.steal() { - Data(box 20) => { - (*unsafe_remaining).fetch_sub(1, SeqCst); - } - Data(..) => panic!(), - Abort | Empty => {} - } - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - while remaining.load(SeqCst) > 0 { - match w.pop() { - Some(box 20) => { remaining.fetch_sub(1, SeqCst); } - Some(..) => panic!(), - None => {} - } - } - - for thread in threads.into_iter() { - thread.join(); - } - } - - #[test] - fn run_stampede() { - let pool = BufferPool::<Box<int>>::new(); - let (w, s) = pool.deque(); - stampede(w, s, 8, 10000); - } - - #[test] - fn many_stampede() { - static AMT: uint = 4; - let pool = BufferPool::<Box<int>>::new(); - let threads = range(0, AMT).map(|_| { - let (w, s) = pool.deque(); - Thread::start(proc() { - stampede(w, s, 4, 10000); - }) - }).collect::<Vec<Thread<()>>>(); - - for thread in threads.into_iter() { - thread.join(); - } - } - - #[test] - fn stress() { - static AMT: int = 100000; - static NTHREADS: int = 8; - static DONE: AtomicBool = INIT_ATOMIC_BOOL; - static HITS: AtomicUint = INIT_ATOMIC_UINT; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - - let threads = range(0, NTHREADS).map(|_| { - let s = s.clone(); - Thread::start(proc() { - loop { - match s.steal() { - Data(2) => { HITS.fetch_add(1, SeqCst); } - Data(..) => panic!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - let mut rng = rand::task_rng(); - let mut expected = 0; - while expected < AMT { - if rng.gen_range(0i, 3) == 2 { - match w.pop() { - None => {} - Some(2) => { HITS.fetch_add(1, SeqCst); }, - Some(_) => panic!(), - } - } else { - expected += 1; - w.push(2); - } - } - - while HITS.load(SeqCst) < AMT as uint { - match w.pop() { - None => {} - Some(2) => { HITS.fetch_add(1, SeqCst); }, - Some(_) => panic!(), - } - } - DONE.store(true, SeqCst); - - for thread in threads.into_iter() { - thread.join(); - } - - assert_eq!(HITS.load(SeqCst), expected as uint); - } - - #[test] - #[cfg_attr(windows, ignore)] // apparently windows scheduling is weird? - fn no_starvation() { - static AMT: int = 10000; - static NTHREADS: int = 4; - static DONE: AtomicBool = INIT_ATOMIC_BOOL; - let pool = BufferPool::<(int, uint)>::new(); - let (w, s) = pool.deque(); - - let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { - let s = s.clone(); - let unique_box = box AtomicUint::new(0); - let thread_box = unsafe { - *mem::transmute::<&Box<AtomicUint>, - *const *mut AtomicUint>(&unique_box) - }; - (Thread::start(proc() { - unsafe { - loop { - match s.steal() { - Data((1, 2)) => { - (*thread_box).fetch_add(1, SeqCst); - } - Data(..) => panic!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }), unique_box) - })); - - let mut rng = rand::task_rng(); - let mut myhit = false; - 'outer: loop { - for _ in range(0, rng.gen_range(0, AMT)) { - if !myhit && rng.gen_range(0i, 3) == 2 { - match w.pop() { - None => {} - Some((1, 2)) => myhit = true, - Some(_) => panic!(), - } - } else { - w.push((1, 2)); - } - } - - for slot in hits.iter() { - let amt = slot.load(SeqCst); - if amt == 0 { continue 'outer; } - } - if myhit { - break - } - } - - DONE.store(true, SeqCst); - - for thread in threads.into_iter() { - thread.join(); - } - } -} |