Skip to content

Commit 9f287c2

Browse files
committed
Refcount tasks in packets to avoid races.
Revert "Once again, revert "Use pipes in compiletest"" Fixes #3098
1 parent 86947e4 commit 9f287c2

File tree

5 files changed

+61
-39
lines changed

5 files changed

+61
-39
lines changed

src/compiletest/compiletest.rs

-5
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@ import task;
88
import core::result;
99
import result::{ok, err};
1010

11-
import comm::port;
12-
import comm::chan;
13-
import comm::send;
14-
import comm::recv;
15-
1611
import common::config;
1712
import common::mode_run_pass;
1813
import common::mode_run_fail;

src/compiletest/procsrv.rs

+15-12
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import run::spawn_process;
22
import io::{writer_util, reader_util};
33
import libc::{c_int, pid_t};
44

5+
import pipes::chan;
6+
57
export run;
68

79
#[cfg(target_os = "win32")]
@@ -58,29 +60,30 @@ fn run(lib_path: ~str,
5860

5961

6062
writeclose(pipe_in.out, input);
61-
let p = comm::port();
62-
let ch = comm::chan(p);
63+
let p = pipes::port_set();
64+
let ch = p.chan();
6365
do task::spawn_sched(task::single_threaded) {
6466
let errput = readclose(pipe_err.in);
65-
comm::send(ch, (2, errput));
67+
ch.send((2, errput));
6668
}
69+
let ch = p.chan();
6770
do task::spawn_sched(task::single_threaded) {
6871
let output = readclose(pipe_out.in);
69-
comm::send(ch, (1, output));
72+
ch.send((1, output));
7073
}
7174
let status = run::waitpid(pid);
7275
let mut errs = ~"";
7376
let mut outs = ~"";
7477
let mut count = 2;
7578
while count > 0 {
76-
let stream = comm::recv(p);
77-
alt check stream {
78-
(1, s) => {
79-
outs = s;
80-
}
81-
(2, s) => {
82-
errs = s;
83-
}
79+
alt p.recv() {
80+
(1, s) => {
81+
outs = s;
82+
}
83+
(2, s) => {
84+
errs = s;
85+
}
86+
_ { fail }
8487
};
8588
count -= 1;
8689
};

src/libcore/pipes.rs

+46-17
Original file line numberDiff line numberDiff line change
@@ -113,27 +113,29 @@ type buffer<T: send> = {
113113

114114
struct packet_header {
115115
let mut state: state;
116-
let mut blocked_task: option<*rust_task>;
116+
let mut blocked_task: *rust_task;
117117

118118
// This is a reinterpret_cast of a ~buffer, that can also be cast
119119
// to a buffer_header if need be.
120120
let mut buffer: *libc::c_void;
121121

122122
new() {
123123
self.state = empty;
124-
self.blocked_task = none;
124+
self.blocked_task = ptr::null();
125125
self.buffer = ptr::null();
126126
}
127127

128128
// Returns the old state.
129129
unsafe fn mark_blocked(this: *rust_task) -> state {
130-
self.blocked_task = some(this);
130+
rustrt::rust_task_ref(this);
131+
let old_task = swap_task(self.blocked_task, this);
132+
assert old_task.is_null();
131133
swap_state_acq(self.state, blocked)
132134
}
133135

134136
unsafe fn unblock() {
135-
assert self.state != blocked || self.blocked_task != none;
136-
self.blocked_task = none;
137+
let old_task = swap_task(self.blocked_task, ptr::null());
138+
if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
137139
alt swap_state_acq(self.state, empty) {
138140
empty | blocked => (),
139141
terminated => self.state = terminated,
@@ -240,12 +242,26 @@ fn atomic_sub_rel(&dst: int, src: int) -> int {
240242
rusti::atomic_sub_rel(dst, src)
241243
}
242244

245+
#[doc(hidden)]
246+
fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
247+
// It might be worth making both acquire and release versions of
248+
// this.
249+
unsafe {
250+
reinterpret_cast(rusti::atomic_xchng(
251+
*(ptr::mut_addr_of(dst) as *mut int),
252+
src as int))
253+
}
254+
}
255+
243256
#[doc(hidden)]
244257
type rust_task = libc::c_void;
245258

246259
extern mod rustrt {
247260
#[rust_stack]
248261
fn rust_get_task() -> *rust_task;
262+
#[rust_stack]
263+
fn rust_task_ref(task: *rust_task);
264+
fn rust_task_deref(task: *rust_task);
249265

250266
#[rust_stack]
251267
fn task_clear_event_reject(task: *rust_task);
@@ -334,10 +350,11 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
334350
full => fail ~"duplicate send",
335351
blocked => {
336352
debug!{"waking up task for %?", p_};
337-
alt p.header.blocked_task {
338-
some(task) => rustrt::task_signal_event(
339-
task, ptr::addr_of(p.header) as *libc::c_void),
340-
none => debug!{"just kidding!"}
353+
let old_task = swap_task(p.header.blocked_task, ptr::null());
354+
if !old_task.is_null() {
355+
rustrt::task_signal_event(
356+
old_task, ptr::addr_of(p.header) as *libc::c_void);
357+
rustrt::rust_task_deref(old_task);
341358
}
342359

343360
// The receiver will eventually clean this up.
@@ -372,7 +389,9 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
372389
let p = unsafe { &*p_ };
373390
let this = rustrt::rust_get_task();
374391
rustrt::task_clear_event_reject(this);
375-
p.header.blocked_task = some(this);
392+
rustrt::rust_task_ref(this);
393+
let old_task = swap_task(p.header.blocked_task, this);
394+
assert old_task.is_null();
376395
let mut first = true;
377396
let mut count = SPIN_COUNT;
378397
loop {
@@ -402,14 +421,22 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
402421
full => {
403422
let mut payload = none;
404423
payload <-> p.payload;
405-
p.header.blocked_task = none;
424+
let old_task = swap_task(p.header.blocked_task, ptr::null());
425+
if !old_task.is_null() {
426+
rustrt::rust_task_deref(old_task);
427+
}
406428
p.header.state = empty;
407429
return some(option::unwrap(payload))
408430
}
409431
terminated => {
410432
// This assert detects when we've accidentally unsafely
411433
// casted too big of a number to a state.
412434
assert old_state == terminated;
435+
436+
let old_task = swap_task(p.header.blocked_task, ptr::null());
437+
if !old_task.is_null() {
438+
rustrt::rust_task_deref(old_task);
439+
}
413440
return none;
414441
}
415442
}
@@ -437,17 +464,18 @@ fn sender_terminate<T: send>(p: *packet<T>) {
437464
let p = unsafe { &*p };
438465
alt swap_state_rel(p.header.state, terminated) {
439466
empty => {
467+
assert p.header.blocked_task.is_null();
440468
// The receiver will eventually clean up.
441469
//unsafe { forget(p) }
442470
}
443471
blocked => {
444472
// wake up the target
445-
alt p.header.blocked_task {
446-
some(target) =>
473+
let old_task = swap_task(p.header.blocked_task, ptr::null());
474+
if !old_task.is_null() {
447475
rustrt::task_signal_event(
448-
target,
449-
ptr::addr_of(p.header) as *libc::c_void),
450-
none => { debug!{"receiver is already shutting down"} }
476+
old_task,
477+
ptr::addr_of(p.header) as *libc::c_void);
478+
rustrt::rust_task_deref(old_task);
451479
}
452480
// The receiver will eventually clean up.
453481
//unsafe { forget(p) }
@@ -457,6 +485,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
457485
fail ~"you dun goofed"
458486
}
459487
terminated => {
488+
assert p.header.blocked_task.is_null();
460489
// I have to clean up, use drop_glue
461490
}
462491
}
@@ -465,7 +494,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
465494
#[doc(hidden)]
466495
fn receiver_terminate<T: send>(p: *packet<T>) {
467496
let p = unsafe { &*p };
468-
assert p.header.blocked_task == none;
497+
assert p.header.blocked_task.is_null();
469498
alt swap_state_rel(p.header.state, terminated) {
470499
empty => {
471500
// the sender will clean up

src/rt/rust_task.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -678,9 +678,6 @@ MUST_CHECK bool rust_task::wait_event(void **result) {
678678

679679
void
680680
rust_task::signal_event(void *event) {
681-
assert(task_state_blocked == state ||
682-
task_state_running == state);
683-
684681
scoped_lock with(lifecycle_lock);
685682

686683
this->event = event;

src/test/bench/shootout-pfib.rs

-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// -*- rust -*-
22
// xfail-pretty
33

4-
// xfail-test
5-
64
/*
75
A parallel version of fibonacci numbers.
86

0 commit comments

Comments
 (0)