Skip to content

Commit 2eae474

Browse files
committed
Auto merge of #2197 - RalfJung:round-robin, r=RalfJung
make Miri's scheduler proper round-robin When thread N blocks or yields, we activate thread N+1 next, rather than always activating thread 0. This should guarantee that as long as all threads regularly yield, each thread eventually takes a step again. Fixes the "multiple loops that yield playing ping-pong" part of #1388. `@cbeuw` I hope this doesn't screw up the scheduler-dependent tests you are adding in your PR.
2 parents 0fbe710 + 34b359b commit 2eae474

File tree

3 files changed

+102
-8
lines changed

3 files changed

+102
-8
lines changed

src/thread.rs

+18-8
Original file line numberDiff line numberDiff line change
@@ -518,16 +518,26 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
518518
return Ok(SchedulingAction::ExecuteTimeoutCallback);
519519
}
520520
// No callbacks scheduled, pick a regular thread to execute.
521-
// We need to pick a new thread for execution.
522-
for (id, thread) in self.threads.iter_enumerated() {
521+
// The active thread blocked or yielded. So we go search for another enabled thread.
522+
// Curcially, we start searching at the current active thread ID, rather than at 0, since we
523+
// want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
524+
//
525+
// `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
526+
// the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
527+
// active thread.
528+
let threads = self
529+
.threads
530+
.iter_enumerated()
531+
.skip(self.active_thread.index() + 1)
532+
.chain(self.threads.iter_enumerated().take(self.active_thread.index()));
533+
for (id, thread) in threads {
534+
debug_assert_ne!(self.active_thread, id);
523535
if thread.state == ThreadState::Enabled {
524-
if !self.yield_active_thread || id != self.active_thread {
525-
self.active_thread = id;
526-
if let Some(data_race) = data_race {
527-
data_race.thread_set_active(self.active_thread);
528-
}
529-
break;
536+
self.active_thread = id;
537+
if let Some(data_race) = data_race {
538+
data_race.thread_set_active(self.active_thread);
530539
}
540+
break;
531541
}
532542
}
533543
self.yield_active_thread = false;

tests/pass/concurrency/spin_loops.rs

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// ignore-windows: Concurrency on Windows is not supported yet.
2+
3+
use std::thread;
4+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5+
use std::sync::mpsc;
6+
use std::cell::Cell;
7+
8+
/// When a thread yields, Miri's scheduler used to pick the thread with the lowest ID
9+
/// that can run. IDs are assigned in thread creation order.
10+
/// This means we could make 2 threads infinitely ping-pong with each other while
11+
/// really there is a 3rd thread that we should schedule to make progress.
12+
fn two_player_ping_pong() {
13+
static FLAG: AtomicUsize = AtomicUsize::new(0);
14+
15+
let waiter1 = thread::spawn(|| {
16+
while FLAG.load(Ordering::Acquire) == 0 {
17+
// spin and wait
18+
thread::yield_now();
19+
}
20+
});
21+
let waiter2 = thread::spawn(|| {
22+
while FLAG.load(Ordering::Acquire) == 0 {
23+
// spin and wait
24+
thread::yield_now();
25+
}
26+
});
27+
let progress = thread::spawn(|| {
28+
FLAG.store(1, Ordering::Release);
29+
});
30+
// The first `join` blocks the main thread and thus takes it out of the equation.
31+
waiter1.join().unwrap();
32+
waiter2.join().unwrap();
33+
progress.join().unwrap();
34+
}
35+
36+
/// Based on a test by @jethrogb.
37+
fn launcher() {
38+
static THREAD2_LAUNCHED: AtomicBool = AtomicBool::new(false);
39+
40+
for _ in 0..10 {
41+
let (tx, rx) = mpsc::sync_channel(0);
42+
THREAD2_LAUNCHED.store(false, Ordering::SeqCst);
43+
44+
let jh = thread::spawn(move || {
45+
struct RecvOnDrop(Cell<Option<mpsc::Receiver<()>>>);
46+
47+
impl Drop for RecvOnDrop {
48+
fn drop(&mut self) {
49+
let rx = self.0.take().unwrap();
50+
while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
51+
thread::yield_now();
52+
}
53+
rx.recv().unwrap();
54+
}
55+
}
56+
57+
let tl_rx: RecvOnDrop = RecvOnDrop(Cell::new(None));
58+
tl_rx.0.set(Some(rx));
59+
});
60+
61+
let tx_clone = tx.clone();
62+
let jh2 = thread::spawn(move || {
63+
THREAD2_LAUNCHED.store(true, Ordering::SeqCst);
64+
jh.join().unwrap();
65+
tx_clone.send(()).expect_err(
66+
"Expecting channel to be closed because thread 1 TLS destructors must've run",
67+
);
68+
});
69+
70+
while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
71+
thread::yield_now();
72+
}
73+
thread::yield_now();
74+
tx.send(()).expect("Expecting channel to be live because thread 2 must block on join");
75+
jh2.join().unwrap();
76+
}
77+
}
78+
79+
fn main() {
80+
two_player_ping_pong();
81+
launcher();
82+
}
+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
warning: thread support is experimental and incomplete: weak memory effects are not emulated.
2+

0 commit comments

Comments
 (0)