From 30ced95520bbd07b6d4736cada7a0e2178966bb0 Mon Sep 17 00:00:00 2001 From: Patrick Walton Date: Tue, 21 Jan 2014 11:52:20 -0800 Subject: [PATCH] libextra: Rewrite `MutexArc` --- src/libextra/arc.rs | 263 +++++++--------------- src/test/bench/msgsend-ring-mutex-arcs.rs | 119 ---------- src/test/compile-fail/mutex-arc-nested.rs | 26 --- 3 files changed, 85 insertions(+), 323 deletions(-) delete mode 100644 src/test/bench/msgsend-ring-mutex-arcs.rs delete mode 100644 src/test/compile-fail/mutex-arc-nested.rs diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index a411c4e9185b1..6cdfe8d13d6c2 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -48,6 +48,7 @@ use std::cast; use std::sync::arc::UnsafeArc; use std::task; use std::borrow; +use std::unstable::mutex; /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling. pub struct Condvar<'a> { @@ -144,129 +145,109 @@ impl Clone for Arc { } /**************************************************************************** - * Mutex protected Arc (unsafe) + * Mutex protected Arc ****************************************************************************/ #[doc(hidden)] -struct MutexArcInner { priv lock: Mutex, priv failed: bool, priv data: T } +struct MutexArcInner { + /// The lock. + lock: mutex::Mutex, + /// True if this mutex has been poisoned. + poisoned: bool, + /// The locked data. + data: T, +} -/// An Arc with mutable data protected by a blocking mutex. +/// Shared mutable data protected by a blocking mutex. This allows you to +/// share data between threads and write to it safely without data races. +/// +/// Beware that cycles of `MutexArc` objects will leak. You must break cycles +/// manually. +/// +/// If a task fails while the lock is held, the `MutexArc` will become +/// poisoned and any further attempts to lock it will fail. #[no_freeze] -pub struct MutexArc { priv x: UnsafeArc> } +pub struct MutexArc { + priv arc: UnsafeArc>, +} +/// A guard that represents a locked `MutexArc`. +pub struct MutexArcGuard<'a,T> { + priv mutex_arc: &'a MutexArc, +} impl Clone for MutexArc { - /// Duplicate a mutex-protected Arc. See arc::clone for more details. + /// Duplicates a mutex-protected Arc by incrementing the reference count. #[inline] fn clone(&self) -> MutexArc { - // NB: Cloning the underlying mutex is not necessary. Its reference - // count would be exactly the same as the shared state's. - MutexArc { x: self.x.clone() } + MutexArc { + arc: self.arc.clone(), + } } } impl MutexArc { - /// Create a mutex-protected Arc with the supplied data. - pub fn new(user_data: T) -> MutexArc { - MutexArc::new_with_condvars(user_data, 1) - } - - /** - * Create a mutex-protected Arc with the supplied data and a specified number - * of condvars (as sync::Mutex::new_with_condvars). - */ - pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc { - let data = MutexArcInner { - lock: Mutex::new_with_condvars(num_condvars), - failed: false, data: user_data - }; - MutexArc { x: UnsafeArc::new(data) } + /// Creates mutex-protected shared data from the supplied value. + pub fn new(value: T) -> MutexArc { + unsafe { + MutexArc { + arc: UnsafeArc::new(MutexArcInner { + lock: mutex::Mutex::new(), + poisoned: false, + data: value, + }) + } + } } - /** - * Access the underlying mutable data with mutual exclusion from other - * tasks. The argument closure will be run with the mutex locked; all - * other tasks wishing to access the data will block until the closure - * finishes running. - * - * The reason this function is 'unsafe' is because it is possible to - * construct a circular reference among multiple Arcs by mutating the - * underlying data. This creates potential for deadlock, but worse, this - * will guarantee a memory leak of all involved Arcs. Using MutexArcs - * inside of other Arcs is safe in absence of circular references. - * - * If you wish to nest MutexArcs, one strategy for ensuring safety at - * runtime is to add a "nesting level counter" inside the stored data, and - * when traversing the arcs, assert that they monotonically decrease. - * - * # Failure - * - * Failing while inside the Arc will unlock the Arc while unwinding, so - * that other tasks won't block forever. It will also poison the Arc: - * any tasks that subsequently try to access it (including those already - * blocked on the mutex) will also fail immediately. - */ + /// Takes the lock and hands out a scoped mutable reference. #[inline] - pub unsafe fn unsafe_access(&self, blk: |x: &mut T| -> U) -> U { - let state = self.x.get(); - // Borrowck would complain about this if the function were - // not already unsafe. See borrow_rwlock, far below. - (&(*state).lock).lock(|| { - check_poison(true, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data) - }) + pub fn lock<'a>(&'a self) -> MutexArcGuard<'a,T> { + unsafe { + (*self.arc.get()).lock.lock(); + if (*self.arc.get()).poisoned { + self.die_from_poison() + } + } + MutexArcGuard { + mutex_arc: self, + } } - /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond(). - #[inline] - pub unsafe fn unsafe_access_cond(&self, - blk: |x: &mut T, c: &Condvar| -> U) - -> U { - let state = self.x.get(); - (&(*state).lock).lock_cond(|cond| { - check_poison(true, (*state).failed); - let _z = PoisonOnFail::new(&mut (*state).failed); - blk(&mut (*state).data, - &Condvar {is_mutex: true, - failed: &(*state).failed, - cond: cond }) - }) + /// Called when the Arc is poisoned. This is out of line to guide + /// inlining. + fn die_from_poison(&self) -> ! { + (*self.arc.get()).lock.unlock(); + fail!("Arc was poisoned; another task failed inside!") } } -impl MutexArc { - - /** - * As unsafe_access. - * - * The difference between access and unsafe_access is that the former - * forbids mutexes to be nested. While unsafe_access can be used on - * MutexArcs without freezable interiors, this safe version of access - * requires the Freeze bound, which prohibits access on MutexArcs which - * might contain nested MutexArcs inside. - * - * The purpose of this is to offer a safe implementation of MutexArc to be - * used instead of RWArc in cases where no readers are needed and slightly - * better performance is required. - * - * Both methods have the same failure behaviour as unsafe_access and - * unsafe_access_cond. - */ - #[inline] - pub fn access(&self, blk: |x: &mut T| -> U) -> U { - unsafe { self.unsafe_access(blk) } +impl<'a,T:Send> MutexArcGuard<'a,T> { + /// Returns a reference to the value inside. + pub fn get<'b>(&'b mut self) -> &'b mut T { + unsafe { + &mut (*self.mutex_arc.arc.get()).data + } } +} - /// As unsafe_access_cond but safe and Freeze. - #[inline] - pub fn access_cond(&self, - blk: |x: &mut T, c: &Condvar| -> U) - -> U { - unsafe { self.unsafe_access_cond(blk) } +#[unsafe_destructor] +impl<'a,T:Send> Drop for MutexArcGuard<'a,T> { + fn drop(&mut self) { + unsafe { + if task::failing() { + (*self.mutex_arc.arc.get()).poisoned = true + } + (*self.mutex_arc.arc.get()).lock.unlock() + } } } + +/**************************************************************************** + * R/W lock protected Arc + ****************************************************************************/ + // Common code for {mutex.access,rwlock.write}{,_cond}. #[inline] #[doc(hidden)] @@ -280,7 +261,6 @@ fn check_poison(is_mutex: bool, failed: bool) { } } -#[doc(hidden)] struct PoisonOnFail { flag: *mut bool, failed: bool, @@ -307,10 +287,6 @@ impl PoisonOnFail { } } -/**************************************************************************** - * R/W lock protected Arc - ****************************************************************************/ - #[doc(hidden)] struct RWArcInner { priv lock: RWLock, priv failed: bool, priv data: T } /** @@ -582,64 +558,16 @@ mod tests { info!("{:?}", arc_v); } - #[test] - fn test_mutex_arc_condvar() { - let arc = ~MutexArc::new(false); - let arc2 = ~arc.clone(); - let (p,c) = Chan::new(); - do task::spawn { - // wait until parent gets in - p.recv(); - arc2.access_cond(|state, cond| { - *state = true; - cond.signal(); - }) - } - - arc.access_cond(|state, cond| { - c.send(()); - assert!(!*state); - while !*state { - cond.wait(); - } - }) - } - - #[test] #[should_fail] - fn test_arc_condvar_poison() { - let arc = ~MutexArc::new(1); - let arc2 = ~arc.clone(); - let (p, c) = Chan::new(); - - do spawn { - let _ = p.recv(); - arc2.access_cond(|one, cond| { - cond.signal(); - // Parent should fail when it wakes up. - assert_eq!(*one, 0); - }) - } - - arc.access_cond(|one, cond| { - c.send(()); - while *one == 1 { - cond.wait(); - } - }) - } - #[test] #[should_fail] fn test_mutex_arc_poison() { let arc = ~MutexArc::new(1); let arc2 = ~arc.clone(); do task::try || { - arc2.access(|one| { - assert_eq!(*one, 2); - }) + let lock = arc2.lock(); + assert_eq!(*lock.get(), 2); }; - arc.access(|one| { - assert_eq!(*one, 1); - }) + let lock = arc.lock(); + assert_eq!(*lock.get(), 1); } #[test] @@ -648,36 +576,15 @@ mod tests { // Tests nested mutexes and access // to underlaying data. let arc = ~MutexArc::new(1); - let arc2 = ~MutexArc::new(*arc); + let arc2 = ~MutexArc::new((*arc).clone()); do task::spawn || { - (*arc2).unsafe_access(|mutex| { - (*mutex).access(|one| { - assert!(*one == 1); - }) - }) + let lock2 = arc2.lock(); + let lock = arc.lock(); + assert!(*lock.get() == 1) }; } } - #[test] - fn test_mutex_arc_access_in_unwind() { - let arc = MutexArc::new(1i); - let arc2 = arc.clone(); - task::try::<()>(proc() { - struct Unwinder { - i: MutexArc - } - impl Drop for Unwinder { - fn drop(&mut self) { - self.i.access(|num| *num += 1); - } - } - let _u = Unwinder { i: arc2 }; - fail!(); - }); - assert_eq!(2, arc.access(|n| *n)); - } - #[test] #[should_fail] fn test_rw_arc_poison_wr() { let arc = RWArc::new(1); diff --git a/src/test/bench/msgsend-ring-mutex-arcs.rs b/src/test/bench/msgsend-ring-mutex-arcs.rs deleted file mode 100644 index b148df248334d..0000000000000 --- a/src/test/bench/msgsend-ring-mutex-arcs.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -// This test creates a bunch of tasks that simultaneously send to each -// other in a ring. The messages should all be basically -// independent. -// This is like msgsend-ring-pipes but adapted to use Arcs. - -// This also serves as a pipes test, because Arcs are implemented with pipes. - -extern mod extra; - -use extra::arc; -use extra::future::Future; -use extra::time; -use std::os; -use std::uint; - -// A poor man's pipe. -type pipe = arc::MutexArc<~[uint]>; - -fn send(p: &pipe, msg: uint) { - unsafe { - p.access_cond(|state, cond| { - state.push(msg); - cond.signal(); - }) - } -} -fn recv(p: &pipe) -> uint { - unsafe { - p.access_cond(|state, cond| { - while state.is_empty() { - cond.wait(); - } - state.pop() - }) - } -} - -fn init() -> (pipe,pipe) { - let m = arc::MutexArc::new(~[]); - ((&m).clone(), m) -} - - -fn thread_ring(i: uint, count: uint, num_chan: pipe, num_port: pipe) { - let mut num_chan = Some(num_chan); - let mut num_port = Some(num_port); - // Send/Receive lots of messages. - for j in range(0u, count) { - //error!("task %?, iter %?", i, j); - let num_chan2 = num_chan.take_unwrap(); - let num_port2 = num_port.take_unwrap(); - send(&num_chan2, i * j); - num_chan = Some(num_chan2); - let _n = recv(&num_port2); - //log(error, _n); - num_port = Some(num_port2); - }; -} - -fn main() { - let args = os::args(); - let args = if os::getenv("RUST_BENCH").is_some() { - ~[~"", ~"100", ~"10000"] - } else if args.len() <= 1u { - ~[~"", ~"10", ~"100"] - } else { - args.clone() - }; - - let num_tasks = from_str::(args[1]).unwrap(); - let msg_per_task = from_str::(args[2]).unwrap(); - - let (mut num_chan, num_port) = init(); - - let start = time::precise_time_s(); - - // create the ring - let mut futures = ~[]; - - for i in range(1u, num_tasks) { - //error!("spawning %?", i); - let (new_chan, num_port) = init(); - let num_chan_2 = num_chan.clone(); - let new_future = do Future::spawn() { - thread_ring(i, msg_per_task, num_chan_2, num_port) - }; - futures.push(new_future); - num_chan = new_chan; - }; - - // do our iteration - thread_ring(0, msg_per_task, num_chan, num_port); - - // synchronize - for f in futures.mut_iter() { - f.get() - } - - let stop = time::precise_time_s(); - - // all done, report stats. - let num_msgs = num_tasks * msg_per_task; - let elapsed = (stop - start); - let rate = (num_msgs as f64) / elapsed; - - println!("Sent {} messages in {} seconds", num_msgs, elapsed); - println!(" {} messages / second", rate); - println!(" {} μs / message", 1000000. / rate); -} diff --git a/src/test/compile-fail/mutex-arc-nested.rs b/src/test/compile-fail/mutex-arc-nested.rs deleted file mode 100644 index 476b890efcd6b..0000000000000 --- a/src/test/compile-fail/mutex-arc-nested.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -extern mod extra; - -use std::task; -use extra::arc::{MutexArc}; - -fn test_mutex_arc_nested() { - let arc = ~MutexArc::new(1); - let arc2 = ~MutexArc::new(*arc); - - do task::spawn || { - (*arc2).access(|mutex| { //~ ERROR instantiating a type parameter with an incompatible type - }) - }; -} - -fn main() {}