Skip to content

Commit 7a1d97e

Browse files
committed
Enhance timers to create ports
In addition to being able to sleep the current task, timers should be able to create ports which get notified after a period of time. Closes #10014
1 parent baeed88 commit 7a1d97e

File tree

3 files changed

+170
-6
lines changed

3 files changed

+170
-6
lines changed

src/libstd/rt/io/timer.rs

+131-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,37 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11+
/*!
12+
13+
Synchronous Timers
14+
15+
This module exposes the functionality to create timers, block the current task,
16+
and create ports which will receive notifications after a period of time.
17+
18+
# Example
19+
20+
```rust
21+
22+
use std::rt::io::Timer;
23+
24+
let mut timer = Timer::new().unwrap();
25+
timer.sleep(10); // block the task for awhile
26+
27+
let timeout = timer.oneshot(10);
28+
// do some work
29+
timeout.recv(); // wait for the timeout to expire
30+
31+
let periodic = timer.periodic(10);
32+
loop {
33+
periodic.recv();
34+
// this loop is only executed once every 10ms
35+
}
36+
37+
```
38+
39+
*/
40+
41+
use comm::{Port, PortOne};
1142
use option::{Option, Some, None};
1243
use result::{Ok, Err};
1344
use rt::io::io_error;
@@ -25,9 +56,9 @@ pub fn sleep(msecs: u64) {
2556
}
2657

2758
impl Timer {
28-
2959
/// Creates a new timer which can be used to put the current task to sleep
30-
/// for a number of milliseconds.
60+
/// for a number of milliseconds, or to possibly create channels which will
61+
/// get notified after an amount of time has passed.
3162
pub fn new() -> Option<Timer> {
3263
do with_local_io |io| {
3364
match io.timer_init() {
@@ -42,20 +73,116 @@ impl Timer {
4273
}
4374
}
4475

76+
/// Blocks the current task for `msecs` milliseconds.
77+
///
78+
/// Note that this function will cause any other ports for this timer to be
79+
/// invalidated (the other end will be closed).
4580
pub fn sleep(&mut self, msecs: u64) {
4681
self.obj.sleep(msecs);
4782
}
83+
84+
/// Creates a oneshot port which will have a notification sent when `msecs`
85+
/// milliseconds has elapsed. This does *not* block the current task, but
86+
/// instead returns immediately.
87+
///
88+
/// Note that this invalidates any previous port which has been created by
89+
/// this timer, and that the returned port will be invalidated once the
90+
/// timer is destroyed (when it falls out of scope).
91+
pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
92+
self.obj.oneshot(msecs)
93+
}
94+
95+
/// Creates a port which will have a continuous stream of notifications
96+
/// being sent every `msecs` milliseconds. This does *not* block the
97+
/// current task, but instead returns immediately. The first notification
98+
/// will not be received immediately, but rather after `msec` milliseconds
99+
/// have passed.
100+
///
101+
/// Note that this invalidates any previous port which has been created by
102+
/// this timer, and that the returned port will be invalidated once the
103+
/// timer is destroyed (when it falls out of scope).
104+
pub fn periodic(&mut self, msecs: u64) -> Port<()> {
105+
self.obj.period(msecs)
106+
}
48107
}
49108

50109
#[cfg(test)]
51110
mod test {
52111
use super::*;
53112
use rt::test::*;
113+
use cell::Cell;
114+
use task;
115+
54116
#[test]
55117
fn test_io_timer_sleep_simple() {
56118
do run_in_mt_newsched_task {
57-
let timer = Timer::new();
58-
do timer.map |mut t| { t.sleep(1) };
119+
let mut timer = Timer::new().unwrap();
120+
timer.sleep(1);
121+
}
122+
}
123+
124+
#[test]
125+
fn test_io_timer_sleep_oneshot() {
126+
do run_in_mt_newsched_task {
127+
let mut timer = Timer::new().unwrap();
128+
timer.oneshot(1).recv();
129+
}
130+
}
131+
132+
#[test]
133+
fn test_io_timer_sleep_oneshot_forget() {
134+
do run_in_mt_newsched_task {
135+
let mut timer = Timer::new().unwrap();
136+
timer.oneshot(100000000000);
137+
}
138+
}
139+
140+
#[test]
141+
fn oneshot_twice() {
142+
do run_in_mt_newsched_task {
143+
let mut timer = Timer::new().unwrap();
144+
let port1 = timer.oneshot(100000000000);
145+
let port = timer.oneshot(1);
146+
port.recv();
147+
let port1 = Cell::new(port1);
148+
let ret = do task::try {
149+
port1.take().recv();
150+
};
151+
assert!(ret.is_err());
152+
}
153+
}
154+
155+
#[test]
156+
fn test_io_timer_oneshot_then_sleep() {
157+
do run_in_mt_newsched_task {
158+
let mut timer = Timer::new().unwrap();
159+
let port = timer.oneshot(100000000000);
160+
timer.sleep(1); // this should invalidate the port
161+
162+
let port = Cell::new(port);
163+
let ret = do task::try {
164+
port.take().recv();
165+
};
166+
assert!(ret.is_err());
167+
}
168+
}
169+
170+
#[test]
171+
fn test_io_timer_sleep_periodic() {
172+
do run_in_mt_newsched_task {
173+
let mut timer = Timer::new().unwrap();
174+
let port = timer.periodic(1);
175+
port.recv();
176+
port.recv();
177+
port.recv();
178+
}
179+
}
180+
181+
#[test]
182+
fn test_io_timer_sleep_periodic_forget() {
183+
do run_in_mt_newsched_task {
184+
let mut timer = Timer::new().unwrap();
185+
timer.periodic(100000000000);
59186
}
60187
}
61188

src/libstd/rt/rtio.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
use libc;
1212
use option::*;
1313
use result::*;
14-
use comm::SharedChan;
14+
use comm::{SharedChan, PortOne, Port};
1515
use libc::c_int;
1616
use c_str::CString;
1717

@@ -162,6 +162,8 @@ pub trait RtioUdpSocket : RtioSocket {
162162

163163
pub trait RtioTimer {
164164
fn sleep(&mut self, msecs: u64);
165+
fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
166+
fn period(&mut self, msecs: u64) -> Port<()>;
165167
}
166168

167169
pub trait RtioFileStream {

src/libstd/rt/uv/uvio.rs

+36-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use cast::transmute;
1313
use cast;
1414
use cell::Cell;
1515
use clone::Clone;
16-
use comm::{SendDeferred, SharedChan};
16+
use comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
1717
use libc::{c_int, c_uint, c_void, pid_t};
1818
use ops::Drop;
1919
use option::*;
@@ -1468,6 +1468,41 @@ impl RtioTimer for UvTimer {
14681468
self_.watcher.stop();
14691469
}
14701470
}
1471+
1472+
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
1473+
use comm::oneshot;
1474+
1475+
let (port, chan) = oneshot();
1476+
let chan = Cell::new(chan);
1477+
do self.home_for_io |self_| {
1478+
let chan = Cell::new(chan.take());
1479+
do self_.watcher.start(msecs, 0) |_, status| {
1480+
assert!(status.is_none());
1481+
assert!(!chan.is_empty());
1482+
chan.take().send_deferred(());
1483+
}
1484+
}
1485+
1486+
return port;
1487+
}
1488+
1489+
fn period(&mut self, msecs: u64) -> Port<()> {
1490+
use comm::stream;
1491+
1492+
let (port, chan) = stream();
1493+
let chan = Cell::new(chan);
1494+
do self.home_for_io |self_| {
1495+
let chan = Cell::new(chan.take());
1496+
do self_.watcher.start(msecs, msecs) |_, status| {
1497+
assert!(status.is_none());
1498+
do chan.with_ref |chan| {
1499+
chan.send_deferred(());
1500+
}
1501+
}
1502+
}
1503+
1504+
return port;
1505+
}
14711506
}
14721507

14731508
pub struct UvFileStream {

0 commit comments

Comments
 (0)