@@ -24,252 +24,6 @@ use std::rt::task::{Task, BlockedTask, TaskOpts};
24
24
use std:: rt:: thread:: Thread ;
25
25
use std:: rt;
26
26
27
- use std:: task:: { TaskBuilder , Spawner } ;
28
-
29
- /// Creates a new Task which is ready to execute as a 1:1 task.
30
- pub fn new ( stack_bounds : ( uint , uint ) , stack_guard : uint ) -> Box < Task > {
31
- let mut task = box Task :: new ( ) ;
32
- let mut ops = ops ( ) ;
33
- ops. stack_bounds = stack_bounds;
34
- ops. stack_guard = stack_guard;
35
- task. put_runtime ( ops) ;
36
- return task;
37
- }
38
-
39
- fn ops ( ) -> Box < Ops > {
40
- box Ops {
41
- lock : unsafe { NativeMutex :: new ( ) } ,
42
- awoken : false ,
43
- // these *should* get overwritten
44
- stack_bounds : ( 0 , 0 ) ,
45
- stack_guard : 0
46
- }
47
- }
48
-
49
- /// A spawner for native tasks
50
- pub struct NativeSpawner ;
51
-
52
- impl Spawner for NativeSpawner {
53
- fn spawn ( self , opts : TaskOpts , f : proc ( ) : Send ) {
54
- let TaskOpts { name, stack_size, on_exit } = opts;
55
-
56
- let mut task = box Task :: new ( ) ;
57
- task. name = name;
58
- task. death . on_exit = on_exit;
59
-
60
- let stack = stack_size. unwrap_or ( rt:: min_stack ( ) ) ;
61
- let task = task;
62
- let ops = ops ( ) ;
63
-
64
- // Note that this increment must happen *before* the spawn in order to
65
- // guarantee that if this task exits it will always end up waiting for
66
- // the spawned task to exit.
67
- let token = bookkeeping:: increment ( ) ;
68
-
69
- // Spawning a new OS thread guarantees that __morestack will never get
70
- // triggered, but we must manually set up the actual stack bounds once
71
- // this function starts executing. This raises the lower limit by a bit
72
- // because by the time that this function is executing we've already
73
- // consumed at least a little bit of stack (we don't know the exact byte
74
- // address at which our stack started).
75
- Thread :: spawn_stack ( stack, proc ( ) {
76
- let something_around_the_top_of_the_stack = 1 ;
77
- let addr = & something_around_the_top_of_the_stack as * const int ;
78
- let my_stack = addr as uint ;
79
- unsafe {
80
- stack:: record_os_managed_stack_bounds ( my_stack - stack + 1024 ,
81
- my_stack) ;
82
- }
83
- let mut ops = ops;
84
- ops. stack_guard = rt:: thread:: current_guard_page ( ) ;
85
- ops. stack_bounds = ( my_stack - stack + 1024 , my_stack) ;
86
-
87
- let mut f = Some ( f) ;
88
- let mut task = task;
89
- task. put_runtime ( ops) ;
90
- drop ( task. run ( || { f. take ( ) . unwrap ( ) ( ) } ) . destroy ( ) ) ;
91
- drop ( token) ;
92
- } )
93
- }
94
- }
95
-
96
- /// An extension trait adding a `native` configuration method to `TaskBuilder`.
97
- pub trait NativeTaskBuilder {
98
- fn native ( self ) -> TaskBuilder < NativeSpawner > ;
99
- }
100
-
101
- impl < S : Spawner > NativeTaskBuilder for TaskBuilder < S > {
102
- fn native ( self ) -> TaskBuilder < NativeSpawner > {
103
- self . spawner ( NativeSpawner )
104
- }
105
- }
106
-
107
- // This structure is the glue between channels and the 1:1 scheduling mode. This
108
- // structure is allocated once per task.
109
- struct Ops {
110
- lock : NativeMutex , // native synchronization
111
- awoken : bool , // used to prevent spurious wakeups
112
-
113
- // This field holds the known bounds of the stack in (lo, hi) form. Not all
114
- // native tasks necessarily know their precise bounds, hence this is
115
- // optional.
116
- stack_bounds : ( uint , uint ) ,
117
-
118
- stack_guard : uint
119
- }
120
-
121
- impl rt:: Runtime for Ops {
122
- fn yield_now ( self : Box < Ops > , mut cur_task : Box < Task > ) {
123
- // put the task back in TLS and then invoke the OS thread yield
124
- cur_task. put_runtime ( self ) ;
125
- Local :: put ( cur_task) ;
126
- Thread :: yield_now ( ) ;
127
- }
128
-
129
- fn maybe_yield ( self : Box < Ops > , mut cur_task : Box < Task > ) {
130
- // just put the task back in TLS, on OS threads we never need to
131
- // opportunistically yield b/c the OS will do that for us (preemption)
132
- cur_task. put_runtime ( self ) ;
133
- Local :: put ( cur_task) ;
134
- }
135
-
136
- fn wrap ( self : Box < Ops > ) -> Box < Any +' static > {
137
- self as Box < Any +' static >
138
- }
139
-
140
- fn stack_bounds ( & self ) -> ( uint , uint ) { self . stack_bounds }
141
-
142
- fn stack_guard ( & self ) -> Option < uint > {
143
- if self . stack_guard != 0 {
144
- Some ( self . stack_guard )
145
- } else {
146
- None
147
- }
148
- }
149
-
150
- fn can_block ( & self ) -> bool { true }
151
-
152
- // This function gets a little interesting. There are a few safety and
153
- // ownership violations going on here, but this is all done in the name of
154
- // shared state. Additionally, all of the violations are protected with a
155
- // mutex, so in theory there are no races.
156
- //
157
- // The first thing we need to do is to get a pointer to the task's internal
158
- // mutex. This address will not be changing (because the task is allocated
159
- // on the heap). We must have this handle separately because the task will
160
- // have its ownership transferred to the given closure. We're guaranteed,
161
- // however, that this memory will remain valid because *this* is the current
162
- // task's execution thread.
163
- //
164
- // The next weird part is where ownership of the task actually goes. We
165
- // relinquish it to the `f` blocking function, but upon returning this
166
- // function needs to replace the task back in TLS. There is no communication
167
- // from the wakeup thread back to this thread about the task pointer, and
168
- // there's really no need to. In order to get around this, we cast the task
169
- // to a `uint` which is then used at the end of this function to cast back
170
- // to a `Box<Task>` object. Naturally, this looks like it violates
171
- // ownership semantics in that there may be two `Box<Task>` objects.
172
- //
173
- // The fun part is that the wakeup half of this implementation knows to
174
- // "forget" the task on the other end. This means that the awakening half of
175
- // things silently relinquishes ownership back to this thread, but not in a
176
- // way that the compiler can understand. The task's memory is always valid
177
- // for both tasks because these operations are all done inside of a mutex.
178
- //
179
- // You'll also find that if blocking fails (the `f` function hands the
180
- // BlockedTask back to us), we will `mem::forget` the handles. The
181
- // reasoning for this is the same logic as above in that the task silently
182
- // transfers ownership via the `uint`, not through normal compiler
183
- // semantics.
184
- //
185
- // On a mildly unrelated note, it should also be pointed out that OS
186
- // condition variables are susceptible to spurious wakeups, which we need to
187
- // be ready for. In order to accommodate for this fact, we have an extra
188
- // `awoken` field which indicates whether we were actually woken up via some
189
- // invocation of `reawaken`. This flag is only ever accessed inside the
190
- // lock, so there's no need to make it atomic.
191
- fn deschedule ( mut self : Box < Ops > ,
192
- times : uint ,
193
- mut cur_task : Box < Task > ,
194
- f: |BlockedTask | -> Result < ( ) , BlockedTask > ) {
195
- let me = & mut * self as * mut Ops ;
196
- cur_task. put_runtime ( self ) ;
197
-
198
- unsafe {
199
- let cur_task_dupe = & mut * cur_task as * mut Task ;
200
- let task = BlockedTask :: block ( cur_task) ;
201
-
202
- if times == 1 {
203
- let guard = ( * me) . lock . lock ( ) ;
204
- ( * me) . awoken = false ;
205
- match f ( task) {
206
- Ok ( ( ) ) => {
207
- while !( * me) . awoken {
208
- guard. wait ( ) ;
209
- }
210
- }
211
- Err ( task) => { mem:: forget ( task. wake ( ) ) ; }
212
- }
213
- } else {
214
- let iter = task. make_selectable ( times) ;
215
- let guard = ( * me) . lock . lock ( ) ;
216
- ( * me) . awoken = false ;
217
-
218
- // Apply the given closure to all of the "selectable tasks",
219
- // bailing on the first one that produces an error. Note that
220
- // care must be taken such that when an error is occurred, we
221
- // may not own the task, so we may still have to wait for the
222
- // task to become available. In other words, if task.wake()
223
- // returns `None`, then someone else has ownership and we must
224
- // wait for their signal.
225
- match iter. map ( f) . filter_map ( |a| a. err ( ) ) . next ( ) {
226
- None => { }
227
- Some ( task) => {
228
- match task. wake ( ) {
229
- Some ( task) => {
230
- mem:: forget ( task) ;
231
- ( * me) . awoken = true ;
232
- }
233
- None => { }
234
- }
235
- }
236
- }
237
- while !( * me) . awoken {
238
- guard. wait ( ) ;
239
- }
240
- }
241
- // re-acquire ownership of the task
242
- cur_task = mem:: transmute ( cur_task_dupe) ;
243
- }
244
-
245
- // put the task back in TLS, and everything is as it once was.
246
- Local :: put ( cur_task) ;
247
- }
248
-
249
- // See the comments on `deschedule` for why the task is forgotten here, and
250
- // why it's valid to do so.
251
- fn reawaken ( mut self : Box < Ops > , mut to_wake : Box < Task > ) {
252
- unsafe {
253
- let me = & mut * self as * mut Ops ;
254
- to_wake. put_runtime ( self ) ;
255
- mem:: forget ( to_wake) ;
256
- let guard = ( * me) . lock . lock ( ) ;
257
- ( * me) . awoken = true ;
258
- guard. signal ( ) ;
259
- }
260
- }
261
-
262
- fn spawn_sibling ( self : Box < Ops > ,
263
- mut cur_task : Box < Task > ,
264
- opts : TaskOpts ,
265
- f : proc ( ) : Send ) {
266
- cur_task. put_runtime ( self ) ;
267
- Local :: put ( cur_task) ;
268
-
269
- NativeSpawner . spawn ( opts, f) ;
270
- }
271
- }
272
-
273
27
#[ cfg( test) ]
274
28
mod tests {
275
29
use std:: rt:: local:: Local ;
0 commit comments