diff --git a/src/libcore/priv.rs b/src/libcore/priv.rs index b8e0848dc8753..90fc9239e14db 100644 --- a/src/libcore/priv.rs +++ b/src/libcore/priv.rs @@ -1,6 +1,6 @@ #[doc(hidden)]; -export chan_from_global_ptr; +export chan_from_global_ptr, weaken_task; import compare_and_swap = rustrt::rust_compare_and_swap_ptr; @@ -32,8 +32,11 @@ unsafe fn chan_from_global_ptr( abort } + log(debug,"ENTERING chan_from_global_ptr, before is_prob_zero check"); let is_probably_zero = *global == 0u; + log(debug,"after is_prob_zero check"); if is_probably_zero { + log(debug,"is probably zero..."); // There's no global channel. We must make it let setup_po = comm::port(); @@ -51,14 +54,17 @@ unsafe fn chan_from_global_ptr( } }; + log(debug,"before setup recv.."); // This is the proposed global channel let ch = comm::recv(setup_po); // 0 is our sentinal value. It is not a valid channel assert unsafe::reinterpret_cast(ch) != 0u; // Install the channel + log(debug,"BEFORE COMPARE AND SWAP"); let swapped = compare_and_swap( global, 0u, unsafe::reinterpret_cast(ch)); + log(debug,#fmt("AFTER .. swapped? %?", swapped)); if swapped { // Success! 
@@ -70,6 +76,7 @@ unsafe fn chan_from_global_ptr( unsafe::reinterpret_cast(*global) } } else { + log(debug, "global != 0"); unsafe::reinterpret_cast(*global) } } diff --git a/src/libstd/std.rc b/src/libstd/std.rc index ab7128f4768a8..d1b578f3d8703 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -14,7 +14,7 @@ use core(vers = "0.2"); import core::*; export net, uv; -export c_vec, util; +export c_vec, util, timer; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind; export rope, arena; export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint; @@ -28,12 +28,14 @@ mod net; mod uv; mod uv_ll; mod uv_hl; +mod uv_global_loop; // Utility modules mod c_vec; mod util; +mod timer; // Collections diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs new file mode 100644 index 0000000000000..b1d01424fe39b --- /dev/null +++ b/src/libstd/timer.rs @@ -0,0 +1,183 @@ +#[doc =" +Utilities that leverage libuv's `uv_timer_*` API +"]; + +import uv = uv; +export delayed_send, sleep, recv_timeout; + +#[doc = " +Wait for timeout period then send provided value over a channel + +This call returns immediately. Useful as the building block for a number +of higher-level timer functions. + +Is not guaranteed to wait for exactly the specified time, but will wait +for *at least* that period of time. 
+ +# Arguments + +* msecs - a timeout period, in milliseconds, to wait +* ch - a channel of type T to send a `val` on +* val - a value of type T to send over the provided `ch` +"] +fn delayed_send(msecs: uint, ch: comm::chan, val: T) { + task::spawn() {|| + unsafe { + let timer_done_po = comm::port::<()>(); + let timer_done_ch = comm::chan(timer_done_po); + let timer_done_ch_ptr = ptr::addr_of(timer_done_ch); + let timer = uv::ll::timer_t(); + let timer_ptr = ptr::addr_of(timer); + let hl_loop = uv::global_loop::get(); + uv::hl::interact(hl_loop) {|loop_ptr| + uv::hl::ref(hl_loop, timer_ptr); + let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); + if (init_result == 0i32) { + let start_result = uv::ll::timer_start( + timer_ptr, delayed_send_cb, msecs, 0u); + if (start_result == 0i32) { + uv::ll::set_data_for_uv_handle( + timer_ptr, + timer_done_ch_ptr as *libc::c_void); + } + else { + let error_msg = uv::ll::get_last_err_info(loop_ptr); + fail "timer::delayed_send() start failed: "+error_msg; + } + } + else { + let error_msg = uv::ll::get_last_err_info(loop_ptr); + fail "timer::delayed_send() init failed: "+error_msg; + } + }; + // delayed_send_cb has been processed by libuv + comm::recv(timer_done_po); + // notify the caller immediately + comm::send(ch, copy(val)); + // then clean up our handle + uv::hl::unref_and_close(hl_loop, timer_ptr, + delayed_send_close_cb); + // uv_close for this timer has been processed + comm::recv(timer_done_po); + } + }; +} + +#[doc = " +Blocks the current task for (at least) the specified time period. + +Is not guaranteed to sleep for exactly the specified time, but will sleep +for *at least* that period of time. 
+ +# Arguments + +* msecs - an amount of time, in milliseconds, for the current task to block +"] +fn sleep(msecs: uint) { + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + delayed_send(msecs, exit_ch, ()); + comm::recv(exit_po); +} + +#[doc = " +Receive on a port for (up to) a specified time, then return an `option` + +This call will block to receive on the provided port for up to the specified +timeout. Depending on whether the provided port receives in that time period, +`recv_timeout` will return an `option` representing the result. + +# Arguments + +* msecs - an mount of time, in milliseconds, to wait to receive +* wait_port - a `comm::port` to receive on + +# Returns + +An `option` representing the outcome of the call. If the call `recv`'d on +the provided port in the allotted timeout period, then the result will be a +`some(T)`. If not, then `none` will be returned. +"] +fn recv_timeout(msecs: uint, wait_po: comm::port) -> option { + let timeout_po = comm::port::<()>(); + let timeout_ch = comm::chan(timeout_po); + delayed_send(msecs, timeout_ch, ()); + either::either( + {|left_val| + log(debug, #fmt("recv_time .. left_val %?", + left_val)); + none + }, {|right_val| + some(right_val) + }, comm::select2(timeout_po, wait_po) + ) +} + +// INTERNAL API +crust fn delayed_send_cb(handle: *uv::ll::uv_timer_t, + status: libc::c_int) unsafe { + log(debug, #fmt("delayed_send_cb handle %? 
status %?", handle, status)); + let timer_done_ch = + *(uv::ll::get_data_for_uv_handle(handle) as *comm::chan<()>); + let stop_result = uv::ll::timer_stop(handle); + if (stop_result == 0i32) { + comm::send(timer_done_ch, ()); + } + else { + let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); + let error_msg = uv::ll::get_last_err_info(loop_ptr); + fail "timer::sleep() init failed: "+error_msg; + } +} + +crust fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe { + log(debug, #fmt("delayed_send_close_cb handle %?", handle)); + let timer_done_ch = + *(uv::ll::get_data_for_uv_handle(handle) as *comm::chan<()>); + comm::send(timer_done_ch, ()); +} + +#[cfg(test)] +mod test { + #[test] + fn test_timer_simple_sleep_test() { + sleep(1u); + } + + #[test] + fn test_timer_recv_timeout_before_time_passes() { + let expected = rand::rng().gen_str(16u); + let test_po = comm::port::(); + let test_ch = comm::chan(test_po); + + task::spawn() {|| + delayed_send(1u, test_ch, expected); + }; + + let actual = alt recv_timeout(1000u, test_po) { + some(val) { val } + _ { fail "test_timer_recv_timeout_before_time_passes:"+ + " didn't receive result before timeout"; } + }; + assert actual == expected; + } + + #[test] + fn test_timer_recv_timeout_after_time_passes() { + let expected = rand::rng().gen_str(16u); + let fail_msg = rand::rng().gen_str(16u); + let test_po = comm::port::(); + let test_ch = comm::chan(test_po); + + task::spawn() {|| + delayed_send(1000u, test_ch, expected); + }; + + let actual = alt recv_timeout(1u, test_po) { + none { fail_msg } + _ { fail "test_timer_recv_timeout_before_time_passes:"+ + " didn't receive result before timeout"; } + }; + assert actual == fail_msg; + } +} diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index d0e8c66d4b08e..29eb90a1a6801 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -23,1231 +23,11 @@ rust-idiomatic abstractions for utilizes libuv's asynchronous IO facilities. 
"]; -import map::hashmap; -export loop_new, loop_delete, run, close, run_in_bg; -export async_init, async_send; -export timer_init, timer_start, timer_stop; - import ll = uv_ll; export ll; import hl = uv_hl; export hl; -#[nolink] -native mod rustrt { - fn rust_uv_loop_new() -> *libc::c_void; - fn rust_uv_loop_delete(lp: *libc::c_void); - fn rust_uv_loop_set_data( - lp: *libc::c_void, - data: *uv_loop_data); - fn rust_uv_bind_op_cb(lp: *libc::c_void, cb: *u8) - -> *libc::c_void; - fn rust_uv_stop_op_cb(handle: *libc::c_void); - fn rust_uv_run(loop_handle: *libc::c_void); - fn rust_uv_hilvl_close(handle: *libc::c_void, cb: *u8); - fn rust_uv_hilvl_close_async(handle: *libc::c_void); - fn rust_uv_hilvl_close_timer(handle: *libc::c_void); - fn rust_uv_async_send(handle: *ll::uv_async_t); - fn rust_uv_hilvl_async_init( - loop_handle: *libc::c_void, - cb: *u8, - id: *u8) -> *libc::c_void; - fn rust_uv_timer_init( - loop_handle: *libc::c_void, - cb: *u8, - id: *u8) -> *libc::c_void; - fn rust_uv_timer_start( - timer_handle: *libc::c_void, - timeout: libc::c_uint, - repeat: libc::c_uint); - fn rust_uv_timer_stop(handle: *libc::c_void); - fn rust_uv_free(ptr: *libc::c_void); - // sizeof testing helpers - fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint; - fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint; - fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint; - fn rust_uv_helper_uv_write_t_size() -> libc::c_uint; - fn rust_uv_helper_uv_err_t_size() -> libc::c_uint; - fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint; - fn rust_uv_helper_uv_async_t_size() -> libc::c_uint; -} - - -// these are processed solely in the -// process_operation() crust fn below -enum uv_operation { - op_async_init([u8]), - op_close(uv_handle, *libc::c_void), - op_timer_init([u8]), - op_timer_start([u8], *libc::c_void, u32, u32), - op_timer_stop([u8], *libc::c_void, fn~(uv_handle)), - op_teardown(*libc::c_void) -} - -enum uv_handle { - uv_async([u8], uv_loop), - uv_timer([u8], uv_loop), -} - 
-enum uv_msg { - // requests from library users - msg_run(comm::chan), - msg_run_in_bg(), - msg_async_init(fn~(uv_handle), fn~(uv_handle)), - msg_async_send([u8]), - msg_close(uv_handle, fn~()), - msg_timer_init(fn~(uv_handle)), - msg_timer_start([u8], u32, u32, fn~(uv_handle)), - msg_timer_stop([u8], fn~(uv_handle)), - - // dispatches from libuv - uv_async_init([u8], *libc::c_void), - uv_async_send([u8]), - uv_close([u8]), - uv_timer_init([u8], *libc::c_void), - uv_timer_call([u8]), - uv_timer_stop([u8], fn~(uv_handle)), - uv_end(), - uv_teardown_check() -} - -type uv_loop_data = { - operation_port: comm::port, - rust_loop_chan: comm::chan -}; - -enum uv_loop { - uv_loop_new(comm::chan, *libc::c_void) -} - -// public functions -fn loop_new() -> uv_loop unsafe { - let ret_recv_port: comm::port = - comm::port(); - let ret_recv_chan: comm::chan = - comm::chan(ret_recv_port); - - task::spawn_sched(task::manual_threads(1u)) {|| - // our beloved uv_loop_t ptr - let loop_handle = rustrt:: - rust_uv_loop_new(); - - // this port/chan pair are used to send messages to - // libuv. libuv processes any pending messages on the - // port (via crust) after receiving an async "wakeup" - // on a special uv_async_t handle created below - let operation_port = comm::port::(); - let operation_chan = comm::chan::( - operation_port); - - // this port/chan pair as used in the while() loop - // below. 
It takes dispatches, originating from libuv - // callbacks, to invoke handles registered by the - // user - let rust_loop_port = comm::port::(); - let rust_loop_chan = - comm::chan::(rust_loop_port); - // let the task-spawner return - let user_uv_loop = uv_loop_new(rust_loop_chan, loop_handle); - comm::send(ret_recv_chan, copy(user_uv_loop)); - - // create our "special" async handle that will - // allow all operations against libuv to be - // "buffered" in the operation_port, for processing - // from the thread that libuv runs on - let loop_data: uv_loop_data = { - operation_port: operation_port, - rust_loop_chan: rust_loop_chan - }; - rustrt::rust_uv_loop_set_data( - loop_handle, - ptr::addr_of(loop_data)); // pass an opaque C-ptr - // to libuv, this will be - // in the process_operation - // crust fn - let op_handle = rustrt::rust_uv_bind_op_cb( - loop_handle, - process_operation); - - // all state goes here - let handles: map::hashmap<[u8], *libc::c_void> = - map::bytes_hash(); - let id_to_handle: map::hashmap<[u8], uv_handle> = - map::bytes_hash(); - let after_cbs: map::hashmap<[u8], fn~(uv_handle)> = - map::bytes_hash(); - let close_callbacks: map::hashmap<[u8], fn~()> = - map::bytes_hash(); - let async_cbs: map::hashmap<[u8], fn~(uv_handle)> = - map::bytes_hash(); - let timer_cbs: map::hashmap<[u8], fn~(uv_handle)> = - map::bytes_hash(); - - // the main loop that this task blocks on. - // should have the same lifetime as the C libuv - // event loop. 
- let mut keep_going = true; - while keep_going { - alt comm::recv(rust_loop_port) { - msg_run(end_chan) { - // start the libuv event loop - // we'll also do a uv_async_send with - // the operation handle to have the - // loop process any pending operations - // once its up and running - task::spawn_sched(task::manual_threads(1u)) {|| - // make sure we didn't start the loop - // without the user registering handles - comm::send(rust_loop_chan, uv_teardown_check); - // this call blocks - rustrt::rust_uv_run(loop_handle); - // when we're done, msg the - // end chan - comm::send(end_chan, true); - comm::send(rust_loop_chan, uv_end); - }; - } - - msg_run_in_bg { - task::spawn_sched(task::manual_threads(1u)) {|| - // see note above - comm::send(rust_loop_chan, uv_teardown_check); - // this call blocks - rustrt::rust_uv_run(loop_handle); - }; - } - - msg_close(handle, cb) { - let id = get_id_from_handle(handle); - close_callbacks.insert(id, cb); - let handle_ptr = handles.get(id); - let op = op_close(handle, handle_ptr); - - pass_to_libuv(op_handle, operation_chan, op); - } - uv_close(id) { - handles.remove(id); - let handle = id_to_handle.get(id); - id_to_handle.remove(id); - alt handle { - uv_async(id, _) { - async_cbs.remove(id); - } - uv_timer(id, _) { - timer_cbs.remove(id); - } - _ { - fail "unknown form of uv_handle encountered " - + "in uv_close handler"; - } - } - let cb = close_callbacks.get(id); - close_callbacks.remove(id); - task::spawn {|| - cb(); - }; - // ask the rust loop to check and see if there - // are no more user-registered handles - comm::send(rust_loop_chan, uv_teardown_check); - } - - msg_async_init(callback, after_cb) { - // create a new async handle - // with the id as the handle's - // data and save the callback for - // invocation on msg_async_send - let id = gen_handle_id(); - handles.insert(id, ptr::null()); - async_cbs.insert(id, callback); - after_cbs.insert(id, after_cb); - let op = op_async_init(id); - pass_to_libuv(op_handle, 
operation_chan, op); - } - uv_async_init(id, async_handle) { - // libuv created a handle, which is - // passed back to us. save it and - // then invoke the supplied callback - // for after completion - handles.insert(id, async_handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let async = uv_async(id, user_uv_loop); - id_to_handle.insert(id, copy(async)); - task::spawn {|| - after_cb(async); - }; - } - - msg_async_send(id) { - let async_handle = handles.get(id); - do_send(async_handle); - } - uv_async_send(id) { - let async_cb = async_cbs.get(id); - task::spawn {|| - let the_loop = user_uv_loop; - async_cb(uv_async(id, the_loop)); - }; - } - - msg_timer_init(after_cb) { - let id = gen_handle_id(); - handles.insert(id, ptr::null()); - after_cbs.insert(id, after_cb); - let op = op_timer_init(id); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_init(id, handle) { - handles.insert(id, handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let new_timer = uv_timer(id, user_uv_loop); - id_to_handle.insert(id, copy(new_timer)); - task::spawn {|| - after_cb(new_timer); - }; - } - - uv_timer_call(id) { - let cb = timer_cbs.get(id); - let the_timer = id_to_handle.get(id); - task::spawn {|| - cb(the_timer); - }; - } - - msg_timer_start(id, timeout, repeat, timer_call_cb) { - timer_cbs.insert(id, timer_call_cb); - let handle = handles.get(id); - let op = op_timer_start(id, handle, timeout, - repeat); - pass_to_libuv(op_handle, operation_chan, op); - } - - msg_timer_stop(id, after_cb) { - let handle = handles.get(id); - let op = op_timer_stop(id, handle, after_cb); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_stop(id, after_cb) { - let the_timer = id_to_handle.get(id); - after_cb(the_timer); - } - - uv_teardown_check() { - // here we're checking if there are no user-registered - // handles (and the loop is running), if this is the - // case, then we need to unregister the op_handle via - // a uv_close() call, 
thus allowing libuv to return - // on its own. - if (handles.size() == 0u) { - let op = op_teardown(op_handle); - pass_to_libuv(op_handle, operation_chan, op); - } - } - - uv_end() { - keep_going = false; - } - - _ { fail "unknown form of uv_msg received"; } - } - } - }; - ret comm::recv(ret_recv_port); -} - -fn loop_delete(lp: uv_loop) { - let loop_ptr = get_loop_ptr_from_uv_loop(lp); - rustrt::rust_uv_loop_delete(loop_ptr); -} - -fn run(lp: uv_loop) { - let end_port = comm::port::(); - let end_chan = comm::chan::(end_port); - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg_run(end_chan)); - comm::recv(end_port); -} - -fn run_in_bg(lp: uv_loop) { - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg_run_in_bg); -} - -fn async_init ( - lp: uv_loop, - async_cb: fn~(uv_handle), - after_cb: fn~(uv_handle)) { - let msg = msg_async_init(async_cb, after_cb); - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg); -} - -fn async_send(async: uv_handle) { - alt async { - uv_async(id, lp) { - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg_async_send(id)); - } - _ { - fail "attempting to call async_send() with a" + - " uv_async uv_handle"; - } - } -} - -fn close(h: uv_handle, cb: fn~()) { - let loop_chan = get_loop_chan_from_handle(h); - comm::send(loop_chan, msg_close(h, cb)); -} - -fn timer_init(lp: uv_loop, after_cb: fn~(uv_handle)) { - let msg = msg_timer_init(after_cb); - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg); -} - -fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, - timer_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, lp) { - let msg = msg_timer_start(id, timeout, repeat, timer_cb); - let loop_chan = get_loop_chan_from_uv_loop(lp); - comm::send(loop_chan, msg); - } - _ { - fail "can only pass a uv_timer form of uv_handle to "+ - " uv::timer_start()"; - } - } -} - -fn timer_stop(the_timer: uv_handle, 
after_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, lp) { - let loop_chan = get_loop_chan_from_uv_loop(lp); - let msg = msg_timer_stop(id, after_cb); - comm::send(loop_chan, msg); - } - _ { - fail "only uv_timer form is allowed in calls to "+ - " uv::timer_stop()"; - } - } -} - -// internal functions -fn pass_to_libuv( - op_handle: *libc::c_void, - operation_chan: comm::chan, - op: uv_operation) unsafe { - comm::send(operation_chan, copy(op)); - do_send(op_handle); -} -fn do_send(h: *libc::c_void) { - rustrt::rust_uv_async_send(h as *ll::uv_async_t); -} -fn gen_handle_id() -> [u8] { - ret rand::rng().gen_bytes(16u); -} -fn get_handle_id_from(buf: *u8) -> [u8] unsafe { - ret vec::unsafe::from_buf(buf, 16u); -} - -fn get_loop_chan_from_data(data: *uv_loop_data) - -> comm::chan unsafe { - ret (*data).rust_loop_chan; -} - -fn get_loop_chan_from_handle(handle: uv_handle) - -> comm::chan { - alt handle { - uv_async(id,lp) | uv_timer(id,lp) { - let loop_chan = get_loop_chan_from_uv_loop(lp); - ret loop_chan; - } - _ { - fail "unknown form of uv_handle for get_loop_chan_from " - + " handle"; - } - } -} - -fn get_loop_ptr_from_uv_loop(lp: uv_loop) -> *libc::c_void { - alt lp { - uv_loop_new(loop_chan, loop_ptr) { - ret loop_ptr; - } - } -} -fn get_loop_chan_from_uv_loop(lp: uv_loop) -> comm::chan { - alt lp { - uv_loop_new(loop_chan, loop_ptr) { - ret loop_chan; - } - } -} - -fn get_id_from_handle(handle: uv_handle) -> [u8] { - alt handle { - uv_async(id,lp) | uv_timer(id,lp) { - ret id; - } - _ { - fail "unknown form of uv_handle for get_id_from handle"; - } - } -} - -// crust -crust fn process_operation( - lp: *libc::c_void, - data: *uv_loop_data) unsafe { - let op_port = (*data).operation_port; - let loop_chan = get_loop_chan_from_data(data); - let mut op_pending = comm::peek(op_port); - while(op_pending) { - alt comm::recv(op_port) { - op_async_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let async_handle = rustrt::rust_uv_hilvl_async_init( - lp, - 
process_async_send, - id_ptr); - comm::send(loop_chan, uv_async_init( - id, - async_handle)); - } - op_close(handle, handle_ptr) { - handle_op_close(handle, handle_ptr); - } - op_timer_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let timer_handle = rustrt::rust_uv_timer_init( - lp, - process_timer_call, - id_ptr); - comm::send(loop_chan, uv_timer_init( - id, - timer_handle)); - } - op_timer_start(id, handle, timeout, repeat) { - rustrt::rust_uv_timer_start(handle, timeout, - repeat); - } - op_timer_stop(id, handle, after_cb) { - rustrt::rust_uv_timer_stop(handle); - comm::send(loop_chan, uv_timer_stop(id, after_cb)); - } - op_teardown(op_handle) { - // this is the last msg that'll be processed by - // this fn, in the current lifetime of the handle's - // uv_loop_t - rustrt::rust_uv_stop_op_cb(op_handle); - } - _ { fail "unknown form of uv_operation received"; } - } - op_pending = comm::peek(op_port); - } -} - -fn handle_op_close(handle: uv_handle, handle_ptr: *libc::c_void) { - // it's just like im doing C - alt handle { - uv_async(id, lp) { - let cb = process_close_async; - rustrt::rust_uv_hilvl_close( - handle_ptr, cb); - } - uv_timer(id, lp) { - let cb = process_close_timer; - rustrt::rust_uv_hilvl_close( - handle_ptr, cb); - } - _ { - fail "unknown form of uv_handle encountered " + - "in process_operation/op_close"; - } - } -} - -crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_async_send(handle_id)); -} - -crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_timer_call(handle_id)); -} - -fn process_close_common(id: [u8], data: *uv_loop_data) - unsafe { - // notify the rust loop that their handle is closed, then - // the caller will invoke a per-handle-type c++ func to - 
// free allocated memory - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_close(id)); -} - -crust fn process_close_async( - id_buf: *u8, - handle_ptr: *libc::c_void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uv_hilvl_close_async(handle_ptr); - // at this point, the handle and its data has been - // released. notify the rust loop to remove the - // handle and its data and call the user-supplied - // close cb - process_close_common(id, data); -} - -crust fn process_close_timer( - id_buf: *u8, - handle_ptr: *libc::c_void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uv_hilvl_close_timer(handle_ptr); - process_close_common(id, data); -} - - -#[test] -fn test_uv_new_loop_no_handles() { - let test_loop = uv::loop_new(); - uv::run(test_loop); // this should return immediately - // since there aren't any handles.. - uv::loop_delete(test_loop); -} - -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_simple_async() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::async_init(test_loop, {|new_async| - uv::close(new_async) {|| - comm::send(exit_chan, true); - }; - }, {|new_async| - uv::async_send(new_async); - }); - uv::run(test_loop); - let result = comm::recv(exit_port); - assert result; - uv::loop_delete(test_loop); -} - -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_timer() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::timer_init(test_loop) {|new_timer| - uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| - uv::timer_stop(started_timer) {|stopped_timer| - uv::close(stopped_timer) {|| - comm::send(exit_chan, true); - }; - }; - }; - }; - uv::run(test_loop); - assert comm::recv(exit_port); - uv::loop_delete(test_loop); -} - -enum tcp_read_data { - tcp_read_eof, - tcp_read_more([u8]), - 
tcp_read_error -} - -type request_wrapper = { - write_req: *ll::uv_write_t, - req_buf: *[ll::uv_buf_t], - read_chan: *comm::chan -}; - -crust fn after_close_cb(handle: *libc::c_void) { - log(debug, #fmt("after uv_close! handle ptr: %?", - handle)); -} - -crust fn on_alloc_cb(handle: *libc::c_void, - ++suggested_size: libc::size_t) - -> ll::uv_buf_t unsafe { - log(debug, "on_alloc_cb!"); - let char_ptr = ll::malloc_buf_base_of(suggested_size); - log(debug, #fmt("on_alloc_cb h: %? char_ptr: %u sugsize: %u", - handle, - char_ptr as uint, - suggested_size as uint)); - ret ll::buf_init(char_ptr, suggested_size); -} - -crust fn on_read_cb(stream: *ll::uv_stream_t, - nread: libc::ssize_t, - ++buf: ll::uv_buf_t) unsafe { - log(debug, #fmt("CLIENT entering on_read_cb nred: %d", nread)); - if (nread > 0) { - // we have data - log(debug, #fmt("CLIENT read: data! nread: %d", nread)); - ll::read_stop(stream); - let client_data = ll:: - get_data_for_uv_handle(stream as *libc::c_void) - as *request_wrapper; - let buf_base = ll::get_base_from_buf(buf); - let buf_len = ll::get_len_from_buf(buf); - let bytes = vec::unsafe::from_buf(buf_base, buf_len); - let read_chan = *((*client_data).read_chan); - let msg_from_server = str::from_bytes(bytes); - comm::send(read_chan, msg_from_server); - ll::close(stream as *libc::c_void, after_close_cb) - } - else if (nread == -1) { - // err .. possibly EOF - log(debug, "read: eof!"); - } - else { - // nread == 0 .. 
do nothing, just free buf as below - log(debug, "read: do nothing!"); - } - // when we're done - ll::free_base_of_buf(buf); - log(debug, "CLIENT exiting on_read_cb"); -} - -crust fn on_write_complete_cb(write_req: *ll::uv_write_t, - status: libc::c_int) unsafe { - log(debug, #fmt("CLIENT beginning on_write_complete_cb status: %d", - status as int)); - let stream = ll::get_stream_handle_from_write_req(write_req); - log(debug, #fmt("CLIENT on_write_complete_cb: tcp:%d write_handle:%d", - stream as int, write_req as int)); - let result = ll::read_start(stream, on_alloc_cb, on_read_cb); - log(debug, #fmt("CLIENT ending on_write_complete_cb .. status: %d", - result as int)); -} - -crust fn on_connect_cb(connect_req_ptr: *ll::uv_connect_t, - status: libc::c_int) unsafe { - log(debug, #fmt("beginning on_connect_cb .. status: %d", - status as int)); - let stream = - ll::get_stream_handle_from_connect_req(connect_req_ptr); - if (status == 0i32) { - log(debug, "on_connect_cb: in status=0 if.."); - let client_data = ll::get_data_for_req( - connect_req_ptr as *libc::c_void) - as *request_wrapper; - let write_handle = (*client_data).write_req as *libc::c_void; - log(debug, #fmt("on_connect_cb: tcp stream: %d write_handle addr %d", - stream as int, write_handle as int)); - let write_result = ll::write(write_handle, - stream as *libc::c_void, - (*client_data).req_buf, - on_write_complete_cb); - log(debug, #fmt("on_connect_cb: ll::write() status: %d", - write_result as int)); - } - else { - let test_loop = ll::get_loop_for_uv_handle( - stream as *libc::c_void); - let err_msg = ll::get_last_err_info(test_loop); - log(debug, err_msg); - assert false; - } - log(debug, "finishing on_connect_cb"); -} - -fn impl_uv_tcp_request(ip: str, port: int, req_str: str, - client_chan: *comm::chan) unsafe { - let test_loop = ll::loop_new(); - let tcp_handle = ll::tcp_t(); - let tcp_handle_ptr = ptr::addr_of(tcp_handle); - let connect_handle = ll::connect_t(); - let connect_req_ptr = 
ptr::addr_of(connect_handle); - - // this is the persistent payload of data that we - // need to pass around to get this example to work. - // In C, this would be a malloc'd or stack-allocated - // struct that we'd cast to a void* and store as the - // data field in our uv_connect_t struct - let req_str_bytes = str::bytes(req_str); - let req_msg_ptr: *u8 = vec::unsafe::to_ptr(req_str_bytes); - log(debug, #fmt("req_msg ptr: %u", req_msg_ptr as uint)); - let req_msg = [ - ll::buf_init(req_msg_ptr, vec::len(req_str_bytes)) - ]; - // this is the enclosing record, we'll pass a ptr to - // this to C.. - let write_handle = ll::write_t(); - let write_handle_ptr = ptr::addr_of(write_handle); - log(debug, #fmt("tcp req: tcp stream: %d write_handle: %d", - tcp_handle_ptr as int, - write_handle_ptr as int)); - let client_data = { writer_handle: write_handle_ptr, - req_buf: ptr::addr_of(req_msg), - read_chan: client_chan }; - - let tcp_init_result = ll::tcp_init( - test_loop as *libc::c_void, tcp_handle_ptr); - if (tcp_init_result == 0i32) { - log(debug, "sucessful tcp_init_result"); - - log(debug, "building addr..."); - let addr = ll::ip4_addr(ip, port); - // FIXME ref #2064 - let addr_ptr = ptr::addr_of(addr); - log(debug, #fmt("after build addr in rust. port: %u", - addr.sin_port as uint)); - - // this should set up the connection request.. - log(debug, #fmt("before calling tcp_connect .. 
connect cb ptr: %u ", - on_connect_cb as uint)); - let tcp_connect_result = ll::tcp_connect( - connect_req_ptr, tcp_handle_ptr, - addr_ptr, on_connect_cb); - if (tcp_connect_result == 0i32) { - // not set the data on the connect_req - // until its initialized - ll::set_data_for_req( - connect_req_ptr as *libc::c_void, - ptr::addr_of(client_data) as *libc::c_void); - ll::set_data_for_uv_handle( - tcp_handle_ptr as *libc::c_void, - ptr::addr_of(client_data) as *libc::c_void); - log(debug, "before run tcp req loop"); - ll::run(test_loop); - log(debug, "after run tcp req loop"); - } - else { - log(debug, "ll::tcp_connect() failure"); - assert false; - } - } - else { - log(debug, "ll::tcp_init() failure"); - assert false; - } - ll::loop_delete(test_loop); - -} - -crust fn server_after_close_cb(handle: *libc::c_void) unsafe { - log(debug, #fmt("SERVER server stream closed, should exit.. h: %?", - handle)); -} - -crust fn client_stream_after_close_cb(handle: *libc::c_void) - unsafe { - log(debug, "SERVER: closed client stream, now closing server stream"); - let client_data = ll::get_data_for_uv_handle( - handle) as - *tcp_server_data; - ll::close((*client_data).server as *libc::c_void, - server_after_close_cb); -} - -crust fn after_server_resp_write(req: *ll::uv_write_t) unsafe { - let client_stream_ptr = - ll::get_stream_handle_from_write_req(req); - log(debug, "SERVER: resp sent... closing client stream"); - ll::close(client_stream_ptr as *libc::c_void, - client_stream_after_close_cb) -} - -crust fn on_server_read_cb(client_stream_ptr: *ll::uv_stream_t, - nread: libc::ssize_t, - ++buf: ll::uv_buf_t) unsafe { - if (nread > 0) { - // we have data - log(debug, #fmt("SERVER read: data! 
nread: %d", nread)); - - // pull out the contents of the write from the client - let buf_base = ll::get_base_from_buf(buf); - let buf_len = ll::get_len_from_buf(buf); - log(debug, #fmt("SERVER buf base: %u, len: %u, nread: %d", - buf_base as uint, - buf_len as uint, - nread)); - let bytes = vec::unsafe::from_buf(buf_base, buf_len); - let request_str = str::from_bytes(bytes); - - let client_data = ll::get_data_for_uv_handle( - client_stream_ptr as *libc::c_void) as *tcp_server_data; - - let server_kill_msg = (*client_data).server_kill_msg; - let write_req = (*client_data).server_write_req; - if (str::contains(request_str, server_kill_msg)) { - log(debug, "SERVER: client request contains server_kill_msg!"); - log(debug, "SERVER: sending response to client"); - ll::read_stop(client_stream_ptr); - let server_chan = *((*client_data).server_chan); - comm::send(server_chan, request_str); - let write_result = ll::write( - write_req as *libc::c_void, - client_stream_ptr as *libc::c_void, - (*client_data).server_resp_buf, - after_server_resp_write); - log(debug, #fmt("SERVER: resp write result: %d", - write_result as int)); - if (write_result != 0i32) { - log(debug, "bad result for server resp ll::write()"); - log(debug, ll::get_last_err_info( - ll::get_loop_for_uv_handle(client_stream_ptr - as *libc::c_void))); - assert false; - } - } - else { - log(debug, "SERVER: client req DOESNT contain server_kill_msg!"); - } - } - else if (nread == -1) { - // err .. possibly EOF - log(debug, "read: eof!"); - } - else { - // nread == 0 .. 
do nothing, just free buf as below - log(debug, "read: do nothing!"); - } - // when we're done - ll::free_base_of_buf(buf); - log(debug, "SERVER exiting on_read_cb"); -} - -crust fn server_connection_cb(server_stream_ptr: - *ll::uv_stream_t, - status: libc::c_int) unsafe { - log(debug, "client connecting!"); - let test_loop = ll::get_loop_for_uv_handle( - server_stream_ptr as *libc::c_void); - if status != 0i32 { - let err_msg = ll::get_last_err_info(test_loop); - log(debug, #fmt("server_connect_cb: non-zero status: %?", - err_msg)); - ret; - } - let server_data = ll::get_data_for_uv_handle( - server_stream_ptr as *libc::c_void) as *tcp_server_data; - let client_stream_ptr = (*server_data).client; - let client_init_result = ll::tcp_init(test_loop, - client_stream_ptr); - ll::set_data_for_uv_handle( - client_stream_ptr as *libc::c_void, - server_data as *libc::c_void); - if (client_init_result == 0i32) { - log(debug, "successfully initialized client stream"); - let accept_result = ll::accept(server_stream_ptr as - *libc::c_void, - client_stream_ptr as - *libc::c_void); - if (accept_result == 0i32) { - // start reading - let read_result = ll::read_start( - client_stream_ptr as *ll::uv_stream_t, - on_alloc_cb, - on_server_read_cb); - if (read_result == 0i32) { - log(debug, "successful server read start"); - } - else { - log(debug, #fmt("server_connection_cb: bad read:%d", - read_result as int)); - assert false; - } - } - else { - log(debug, #fmt("server_connection_cb: bad accept: %d", - accept_result as int)); - assert false; - } - } - else { - log(debug, #fmt("server_connection_cb: bad client init: %d", - client_init_result as int)); - assert false; - } -} - -type tcp_server_data = { - client: *ll::uv_tcp_t, - server: *ll::uv_tcp_t, - server_kill_msg: str, - server_resp_buf: *[ll::uv_buf_t], - server_chan: *comm::chan, - server_write_req: *ll::uv_write_t -}; - -type async_handle_data = { - continue_chan: *comm::chan -}; - -crust fn async_close_cb(handle: 
*libc::c_void) { - log(debug, #fmt("SERVER: closing async cb... h: %?", - handle)); -} - -crust fn continue_async_cb(async_handle: *ll::uv_async_t, - status: libc::c_int) unsafe { - // once we're in the body of this callback, - // the tcp server's loop is set up, so we - // can continue on to let the tcp client - // do its thang - let data = ll::get_data_for_uv_handle( - async_handle as *libc::c_void) as *async_handle_data; - let continue_chan = *((*data).continue_chan); - let should_continue = status == 0i32; - comm::send(continue_chan, should_continue); - ll::close(async_handle as *libc::c_void, async_close_cb); -} - -fn impl_uv_tcp_server(server_ip: str, - server_port: int, - kill_server_msg: str, - server_resp_msg: str, - server_chan: *comm::chan, - continue_chan: *comm::chan) unsafe { - let test_loop = ll::loop_new(); - let tcp_server = ll::tcp_t(); - let tcp_server_ptr = ptr::addr_of(tcp_server); - - let tcp_client = ll::tcp_t(); - let tcp_client_ptr = ptr::addr_of(tcp_client); - - let server_write_req = ll::write_t(); - let server_write_req_ptr = ptr::addr_of(server_write_req); - - let resp_str_bytes = str::bytes(server_resp_msg); - let resp_msg_ptr: *u8 = vec::unsafe::to_ptr(resp_str_bytes); - log(debug, #fmt("resp_msg ptr: %u", resp_msg_ptr as uint)); - let resp_msg = [ - ll::buf_init(resp_msg_ptr, vec::len(resp_str_bytes)) - ]; - - let continue_async_handle = ll::async_t(); - let continue_async_handle_ptr = - ptr::addr_of(continue_async_handle); - let async_data = - { continue_chan: continue_chan }; - let async_data_ptr = ptr::addr_of(async_data); - - let server_data: tcp_server_data = { - client: tcp_client_ptr, - server: tcp_server_ptr, - server_kill_msg: kill_server_msg, - server_resp_buf: ptr::addr_of(resp_msg), - server_chan: server_chan, - server_write_req: server_write_req_ptr - }; - let server_data_ptr = ptr::addr_of(server_data); - ll::set_data_for_uv_handle(tcp_server_ptr as *libc::c_void, - server_data_ptr as *libc::c_void); - - // 
uv_tcp_init() - let tcp_init_result = ll::tcp_init( - test_loop as *libc::c_void, tcp_server_ptr); - if (tcp_init_result == 0i32) { - let server_addr = ll::ip4_addr(server_ip, server_port); - // FIXME ref #2064 - let server_addr_ptr = ptr::addr_of(server_addr); - - // uv_tcp_bind() - let bind_result = ll::tcp_bind(tcp_server_ptr, - server_addr_ptr); - if (bind_result == 0i32) { - log(debug, "successful uv_tcp_bind, listening"); - - // uv_listen() - let listen_result = ll::listen(tcp_server_ptr as - *libc::c_void, - 128i32, - server_connection_cb); - if (listen_result == 0i32) { - // let the test know it can set up the tcp server, - // now.. this may still present a race, not sure.. - let async_result = ll::async_init(test_loop, - continue_async_handle_ptr, - continue_async_cb); - if (async_result == 0i32) { - ll::set_data_for_uv_handle( - continue_async_handle_ptr as *libc::c_void, - async_data_ptr as *libc::c_void); - ll::async_send(continue_async_handle_ptr); - // uv_run() - ll::run(test_loop); - log(debug, "server uv::run() has returned"); - } - else { - log(debug, #fmt("uv_async_init failure: %d", - async_result as int)); - assert false; - } - } - else { - log(debug, #fmt("non-zero result on uv_listen: %d", - listen_result as int)); - assert false; - } - } - else { - log(debug, #fmt("non-zero result on uv_tcp_bind: %d", - bind_result as int)); - assert false; - } - } - else { - log(debug, #fmt("non-zero result on uv_tcp_init: %d", - tcp_init_result as int)); - assert false; - } - ll::loop_delete(test_loop); -} - -// this is the impl for a test that is (maybe) ran on a -// per-platform/arch basis below -fn impl_uv_tcp_server_and_request() unsafe { - let bind_ip = "0.0.0.0"; - let request_ip = "127.0.0.1"; - let port = 8888; - let kill_server_msg = "does a dog have buddha nature?"; - let server_resp_msg = "mu!"; - let client_port = comm::port::(); - let client_chan = comm::chan::(client_port); - let server_port = comm::port::(); - let server_chan = 
comm::chan::(server_port); - - let continue_port = comm::port::(); - let continue_chan = comm::chan::(continue_port); - let continue_chan_ptr = ptr::addr_of(continue_chan); - - task::spawn_sched(task::manual_threads(1u)) {|| - impl_uv_tcp_server(bind_ip, port, - kill_server_msg, - server_resp_msg, - ptr::addr_of(server_chan), - continue_chan_ptr); - }; - - // block until the server up is.. possibly a race? - log(debug, "before receiving on server continue_port"); - comm::recv(continue_port); - log(debug, "received on continue port, set up tcp client"); - - task::spawn_sched(task::manual_threads(1u)) {|| - impl_uv_tcp_request(request_ip, port, - kill_server_msg, - ptr::addr_of(client_chan)); - }; - - let msg_from_client = comm::recv(server_port); - let msg_from_server = comm::recv(client_port); - - assert str::contains(msg_from_client, kill_server_msg); - assert str::contains(msg_from_server, server_resp_msg); -} - -// don't run this test on fbsd or 32bit linux -#[cfg(target_os="win32")] -#[cfg(target_os="darwin")] -#[cfg(target_os="linux")] -mod tcp_and_server_client_test { - #[cfg(target_arch="x86_64")] - mod impl64 { - #[test] - fn test_uv_tcp_server_and_request() unsafe { - impl_uv_tcp_server_and_request(); - } - } - #[cfg(target_arch="x86")] - mod impl32 { - #[test] - #[ignore(cfg(target_os = "linux"))] - fn test_uv_tcp_server_and_request() unsafe { - impl_uv_tcp_server_and_request(); - } - } -} - -// struct size tests -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_uv_tcp_t() { - let native_handle_size = rustrt::rust_uv_helper_uv_tcp_t_size(); - let rust_handle_size = sys::size_of::(); - let output = #fmt("uv_tcp_t -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_uv_connect_t() { - let native_handle_size = - rustrt::rust_uv_helper_uv_connect_t_size(); - let 
rust_handle_size = sys::size_of::(); - let output = #fmt("uv_connect_t -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_uv_buf_t() { - let native_handle_size = - rustrt::rust_uv_helper_uv_buf_t_size(); - let rust_handle_size = sys::size_of::(); - let output = #fmt("uv_buf_t -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_uv_write_t() { - let native_handle_size = - rustrt::rust_uv_helper_uv_write_t_size(); - let rust_handle_size = sys::size_of::(); - let output = #fmt("uv_write_t -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} - -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_sockaddr_in() { - let native_handle_size = - rustrt::rust_uv_helper_sockaddr_in_size(); - let rust_handle_size = sys::size_of::(); - let output = #fmt("sockaddr_in -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} - -#[test] -#[ignore(cfg(target_os = "freebsd"))] -fn test_uv_struct_size_uv_async_t() { - let native_handle_size = - rustrt::rust_uv_helper_uv_async_t_size(); - let rust_handle_size = sys::size_of::(); - let output = #fmt("uv_async_t -- native: %u rust: %u", - native_handle_size as uint, rust_handle_size); - log(debug, output); - assert native_handle_size as uint == rust_handle_size; -} - +import global_loop = uv_global_loop; +export global_loop; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs new file mode 100644 index 0000000000000..fbb4ee0a1557b --- /dev/null +++ 
b/src/libstd/uv_global_loop.rs @@ -0,0 +1,376 @@ +#[doc=" +A process-wide libuv event loop for library use. +"]; + +import ll = uv_ll; +import hl = uv_hl; +import get_gl = get; + +export get, get_single_task_gl, get_monitor_task_gl; + +native mod rustrt { + fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; + fn rust_uv_get_kernel_monitor_global_chan_ptr() -> *libc::uintptr_t; + fn rust_uv_get_kernel_global_async_handle() -> *libc::uintptr_t; + fn rust_compare_and_swap_ptr(address: *libc::uintptr_t, + oldval: libc::uintptr_t, + newval: libc::uintptr_t) -> bool; +} + +#[doc =" +Race-free helper to get access to a global task where a libuv +loop is running. + +Use `uv::hl::interact`, `uv::hl::ref`, `uv::hl::unref` and +uv `uv::hl::unref_and_close` to do operations against the global +loop that this function returns. + +# Return + +* A `hl::high_level_loop` that encapsulates communication with the global +loop. +"] +fn get() -> hl::high_level_loop { + ret get_monitor_task_gl(); +} + +// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. +#[doc(hidden)] +fn get_monitor_task_gl() -> hl::high_level_loop { + let monitor_loop_chan = + rustrt::rust_uv_get_kernel_monitor_global_chan_ptr(); + ret spawn_global_weak_task( + monitor_loop_chan, + {|weak_exit_po, msg_po, loop_ptr, first_msg| + log(debug, "monitor gl: entering inner loop"); + unsafe { + monitor_task_loop_body(weak_exit_po, msg_po, loop_ptr, + copy(first_msg)) + } + }, + {|msg_ch| + hl::monitor_task_loop({op_chan: msg_ch}) + }); +} + +// WARNING: USE ONLY ONE get_*_task_gl fn in the scope of a process lifetime. 
+#[doc(hidden)] +fn get_single_task_gl() -> hl::high_level_loop { + let global_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); + ret spawn_global_weak_task( + global_loop_chan_ptr, + {|weak_exit_po, msg_po, loop_ptr, first_msg| + log(debug, "single-task gl: about to enter inner loop"); + unsafe { + single_task_loop_body(weak_exit_po, msg_po, loop_ptr, + copy(first_msg)) + } + }, + {|msg_ch| + log(debug, "after priv::chan_from_global_ptr"); + unsafe { + let handle = get_global_async_handle_native_representation() + as **ll::uv_async_t; + hl::single_task_loop( + { async_handle: handle, op_chan: msg_ch }) + } + } + ); +} + +// INTERNAL API + +fn spawn_global_weak_task( + global_loop_chan_ptr: *libc::uintptr_t, + weak_task_body_cb: fn~( + comm::port<()>, + comm::port, + *libc::c_void, + hl::high_level_msg) -> bool, + after_task_spawn_cb: fn~(comm::chan) + -> hl::high_level_loop) -> hl::high_level_loop { + log(debug, #fmt("ENTERING global_loop::get() loop chan: %?", + global_loop_chan_ptr)); + + let builder_fn = {|| + let builder = task::builder(); + let opts = { + supervise: false, + notify_chan: none, + sched: + some({mode: task::manual_threads(1u), + native_stack_size: none }) + }; + task::set_opts(builder, opts); + builder + }; + unsafe { + log(debug, "before priv::chan_from_global_ptr"); + let msg_ch = priv::chan_from_global_ptr::( + global_loop_chan_ptr, + builder_fn) {|port| + + // the actual body of our global loop lives here + log(debug, "initialized global port task!"); + log(debug, "GLOBAL initialized global port task!"); + outer_global_loop_body(port, weak_task_body_cb); + }; + ret after_task_spawn_cb(msg_ch); + } +} + +unsafe fn outer_global_loop_body( + msg_po: comm::port, + weak_task_body_cb: fn~( + comm::port<()>, + comm::port, + *libc::c_void, + hl::high_level_msg) -> bool) { + // we're going to use a single libuv-generated loop ptr + // for the duration of the process + let loop_ptr = ll::loop_new(); + + // data structure for loop goes 
here.. + + // immediately weaken the task this is running in. + priv::weaken_task() {|weak_exit_po| + // when we first enter this loop, we're going + // to wait on stand-by to receive a request to + // fire-up the libuv loop + let mut continue = true; + while continue { + log(debug, "in outer_loop..."); + continue = either::either( + {|left_val| + // bail out.. + // if we catch this msg at this point, + // we should just be able to exit because + // the loop isn't active + log(debug, #fmt("weak_exit_po recv'd msg: %?", + left_val)); + false + }, {|right_val| + weak_task_body_cb(weak_exit_po, msg_po, loop_ptr, + right_val) + }, comm::select2(weak_exit_po, msg_po)); + log(debug,#fmt("GLOBAL LOOP EXITED, WAITING TO RESTART? %?", + continue)); + } + }; + + ll::loop_delete(loop_ptr); +} + +unsafe fn monitor_task_loop_body(weak_exit_po_in: comm::port<()>, + msg_po_in: comm::port, + loop_ptr: *libc::c_void, + -first_interaction: hl::high_level_msg) -> bool { + // resend the msg to be handled in the select2 loop below.. + comm::send(comm::chan(msg_po_in), first_interaction); + + // our async_handle + let async_handle_po = comm::port::<*ll::uv_async_t>(); + let async_handle_ch = comm::chan(async_handle_po); + + // the msg_po that libuv will be receiving on.. + let loop_msg_po = comm::port::(); + let loop_msg_po_ptr = ptr::addr_of(loop_msg_po); + let loop_msg_ch = comm::chan(loop_msg_po); + + // the question of whether unsupervising this will even do any + // good is there.. but since this'll go into blocking in libuv with + // a quickness.. any errors that occur (including inside crust) will + // be segfaults.. so yeah. + task::spawn_sched(task::manual_threads(1u)) {|| + let loop_msg_po_in = *loop_msg_po_ptr; + hl::run_high_level_loop( + loop_ptr, + loop_msg_po_in, // here the loop gets handed a different message + // port, as we'll be receiving all of the messages + // initially and then passing them on.. 
+ // before_run + {|async_handle| + log(debug,#fmt("monitor gl: before_run: async_handle %?", + async_handle)); + // when this is ran, our async_handle is set up, so let's + // do an async_send with it.. letting the loop know, once it + // starts, that is has work + ll::async_send(async_handle); + comm::send(async_handle_ch, copy(async_handle)); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("monitor gl: b4_msg_drain: async_handle %?", + async_handle)); + true + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("monitor gl: b4_tear_down: async_handle %?", + async_handle)); + }); + }; + + // our loop is set up, so let's emit the handle back out to our users.. + let async_handle = comm::recv(async_handle_po); + // supposed to return a bool to indicate to the enclosing loop whether + // it should continue or not.. + let mut continue_inner_loop = true; + let mut didnt_get_hl_bailout = true; + while continue_inner_loop { + log(debug, "monitor task inner loop.. about to block on select2"); + continue_inner_loop = either::either( + {|left_val| + // bail out.. + log(debug, #fmt("monitor inner weak_exit_po recv'd msg: %?", + left_val)); + // TODO: make loop bail out + didnt_get_hl_bailout = false; + false + }, {|right_val| + // wake up our inner loop and pass it a msg.. 
+ comm::send(loop_msg_ch, copy(right_val)); + ll::async_send(async_handle); + true + }, comm::select2(weak_exit_po_in, msg_po_in) + ) + } + didnt_get_hl_bailout +} + +unsafe fn single_task_loop_body(weak_exit_po_in: comm::port<()>, + msg_po_in: comm::port, + loop_ptr: *libc::c_void, + -first_interaction: hl::high_level_msg) -> bool { + // resend the msg + comm::send(comm::chan(msg_po_in), first_interaction); + + // black magic + let weak_exit_po_ptr = ptr::addr_of(weak_exit_po_in); + hl::run_high_level_loop( + loop_ptr, + msg_po_in, + // before_run + {|async_handle| + log(debug,#fmt("global_loop before_run: async_handle %?", + async_handle)); + // set the handle as the global + set_global_async_handle(0u as *ll::uv_async_t, + async_handle); + // when this is ran, our async_handle is set up, so let's + // do an async_send with it + ll::async_send(async_handle); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("global_loop before_msg_drain: async_handle %?", + async_handle)); + let weak_exit_po = *weak_exit_po_ptr; + if(comm::peek(weak_exit_po)) { + // if this is true, immediately bail and return false, causing + // the libuv loop to start tearing down + log(debug,"got weak_exit meg inside libuv loop"); + comm::recv(weak_exit_po); + false + } + // if no weak_exit_po msg is received, then we'll let the + // loop continue + else { + true + } + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("global_loop before_tear_down: async_handle %?", + async_handle)); + set_global_async_handle(async_handle, + 0 as *ll::uv_async_t); + }); + // supposed to return a bool to indicate to the enclosing loop whether + // it should continue or not.. 
+ ret true; +} + +unsafe fn get_global_async_handle_native_representation() + -> *libc::uintptr_t { + ret rustrt::rust_uv_get_kernel_global_async_handle(); +} + +unsafe fn get_global_async_handle() -> *ll::uv_async_t { + ret (*get_global_async_handle_native_representation()) as *ll::uv_async_t; +} + +unsafe fn set_global_async_handle(old: *ll::uv_async_t, + new_ptr: *ll::uv_async_t) { + rustrt::rust_compare_and_swap_ptr( + get_global_async_handle_native_representation(), + old as libc::uintptr_t, + new_ptr as libc::uintptr_t); +} + +#[cfg(test)] +mod test { + crust fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) unsafe { + let exit_ch_ptr = ll::get_data_for_uv_handle( + timer_ptr as *libc::c_void) as *comm::chan; + let exit_ch = *exit_ch_ptr; + comm::send(exit_ch, true); + log(debug, #fmt("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?", + exit_ch_ptr)); + } + crust fn simple_timer_cb(timer_ptr: *ll::uv_timer_t, + status: libc::c_int) unsafe { + log(debug, "in simple timer cb"); + ll::timer_stop(timer_ptr); + let hl_loop = get_gl(); + hl::interact(hl_loop) {|loop_ptr| + log(debug, "closing timer"); + //ll::close(timer_ptr as *libc::c_void, simple_timer_close_cb); + hl::unref_and_close(hl_loop, timer_ptr, simple_timer_close_cb); + log(debug, "about to deref exit_ch_ptr"); + log(debug, "after msg sent on deref'd exit_ch"); + }; + log(debug, "exiting simple timer cb"); + } + + fn impl_uv_hl_simple_timer(hl_loop: hl::high_level_loop) unsafe { + let exit_po = comm::port::(); + let exit_ch = comm::chan(exit_po); + let exit_ch_ptr = ptr::addr_of(exit_ch); + log(debug, #fmt("EXIT_CH_PTR newly created exit_ch_ptr: %?", + exit_ch_ptr)); + let timer_handle = ll::timer_t(); + let timer_ptr = ptr::addr_of(timer_handle); + hl::interact(hl_loop) {|loop_ptr| + log(debug, "user code inside interact loop!!!"); + let init_status = ll::timer_init(loop_ptr, timer_ptr); + if(init_status == 0i32) { + hl::ref(hl_loop, timer_ptr); + ll::set_data_for_uv_handle( + timer_ptr as 
*libc::c_void, + exit_ch_ptr as *libc::c_void); + let start_status = ll::timer_start(timer_ptr, simple_timer_cb, + 1u, 0u); + if(start_status == 0i32) { + } + else { + fail "failure on ll::timer_start()"; + } + } + else { + fail "failure on ll::timer_init()"; + } + }; + comm::recv(exit_po); + log(debug, "global_loop timer test: msg recv on exit_po, done.."); + } + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_global_loop_high_level_global_timer() unsafe { + let hl_loop = get_gl(); + task::spawn_sched(task::manual_threads(1u), {|| + impl_uv_hl_simple_timer(hl_loop); + }); + impl_uv_hl_simple_timer(hl_loop); + } +} \ No newline at end of file diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_hl.rs index c716f7a306af4..8787b650b2515 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_hl.rs @@ -6,98 +6,506 @@ provide a high-level, abstracted interface to some set of libuv functionality. "]; -import ll = uv_ll; +export high_level_loop, hl_loop_ext, high_level_msg; +export run_high_level_loop, interact, ref, unref, unref_and_close; -export high_level_loop; -export interact, prepare_loop; +import ll = uv_ll; #[doc = " Used to abstract-away direct interaction with a libuv loop. 
-# Fields +# Arguments -* async_handle - a pointer to a uv_async_t struct used to 'poke' +* async_handle - a pointer to a pointer to a uv_async_t struct used to 'poke' the C uv loop to process any pending callbacks * op_chan - a channel used to send function callbacks to be processed by the C uv loop "] -type high_level_loop = { - async_handle: *ll::uv_async_t, - op_chan: comm::chan -}; +enum high_level_loop { + simple_task_loop({ + async_handle: *ll::uv_async_t, + op_chan: comm::chan + }), + single_task_loop({ + async_handle: **ll::uv_async_t, + op_chan: comm::chan + }), + monitor_task_loop({ + op_chan: comm::chan + }) +} + +impl hl_loop_ext for high_level_loop { + fn async_handle() -> **ll::uv_async_t { + alt self { + single_task_loop({async_handle, op_chan}) { + ret async_handle; + } + _ { + fail "variant of hl::high_level_loop that doesn't include" + + "an async_handle field"; + } + } + } + fn op_chan() -> comm::chan { + alt self { + single_task_loop({async_handle, op_chan}) { + ret op_chan; + } + monitor_task_loop({op_chan}) { + ret op_chan; + } + simple_task_loop({async_handle, op_chan}) { + ret op_chan; + } + } + } +} + +#[doc=" +Represents the range of interactions with a `high_level_loop` +"] +enum high_level_msg { + interaction (fn~(*libc::c_void)), + ref_handle (*libc::c_void), + manual_unref_handle (*libc::c_void, option<*u8>), + tear_down +} #[doc = " -Pass in a callback to be processed on the running libuv loop's thread +Given a vanilla `uv_loop_t*` + +# Arguments + +* loop_ptr - a pointer to a currently unused libuv loop. 
Its `data` field +will be overwritten before the loop begins +must be a pointer to a clean rust `uv_async_t` record +* msg_po - an active port that receives `high_level_msg`s +* before_run - a unique closure that is invoked after `uv_async_init` is +called on the `async_handle` passed into this callback, just before `uv_run` +is called on the provided `loop_ptr` +* before_msg_drain - a unique closure that is invoked every time the loop is +awoken, but before the port pointed to in the `msg_po` argument is drained +* before_tear_down - called just before the loop invokes `uv_close()` on the +provided `async_handle`. `uv_run` should return shortly after +"] +unsafe fn run_high_level_loop(loop_ptr: *libc::c_void, + msg_po: comm::port, + before_run: fn~(*ll::uv_async_t), + before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_tear_down: fn~(*ll::uv_async_t)) { + // set up the special async handle we'll use to allow multi-task + // communication with this loop + let async = ll::async_t(); + let async_handle = ptr::addr_of(async); + // associate the async handle with the loop + ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb); -# Fields + // initialize our loop data and store it in the loop + let data: hl_loop_data = default_gl_data({ + async_handle: async_handle, + mut active: true, + before_msg_drain: before_msg_drain, + before_tear_down: before_tear_down, + msg_po_ptr: ptr::addr_of(msg_po), + mut refd_handles: [mut], + mut unrefd_handles: [mut] + }); + let data_ptr = ptr::addr_of(data); + ll::set_data_for_uv_handle(async_handle, data_ptr); -* a_loop - a high_level_loop record that represents a channel of -communication with an active libuv loop running on a thread -somwhere in the current process + // call before_run + before_run(async_handle); + + log(debug, "about to run high level loop"); + // enter the loop... this blocks until the loop is done.. 
+ ll::run(loop_ptr); + log(debug, "high-level loop ended"); +} + +#[doc = " +Provide a callback to be processed by `a_loop` +The primary way to do operations again a running `high_level_loop` that +doesn't involve creating a uv handle via `safe_handle` + +# Arguments + +* a_loop - a `high_level_loop` that you want to do operations against * cb - a function callback to be processed on the running loop's thread. The only parameter is an opaque pointer to the running -uv_loop_t. You can use this pointer to initiate or continue any -operations against the loop +uv_loop_t. In the context of this callback, it is safe to use this pointer +to do various uv_* API calls. _DO NOT_ send this pointer out via ports/chans "] unsafe fn interact(a_loop: high_level_loop, -cb: fn~(*libc::c_void)) { - comm::send(a_loop.op_chan, cb); - ll::async_send(a_loop.async_handle); + send_high_level_msg(a_loop, interaction(cb)); } -#[doc = " -Prepares a clean, inactive uv_loop_t* to be used with any of the -functions in the `uv::hl` module. - -Library developers can use this function to prepare a given -`uv_loop_t*`, whose lifecycle they manage, to be used, ran -and controlled with the tools in this module. +iface uv_handle_manager { + fn init() -> T; +} -After this is ran against a loop, a library developer can run -the loop in its own thread and then use the returned -`high_level_loop` to interact with it. +type safe_handle_fields = { + hl_loop: high_level_loop, + handle: T, + close_cb: *u8 +}; -# Fields +/*fn safe_handle(a_loop: high_level_loop, + handle_val: T, + handle_init_cb: fn~(*libc::c_void, *T), + close_cb: *u8) { -* loop_ptr - a pointer to a newly created `uv_loop_t*` with no -handles registered (this will interfere with the internal lifecycle -management this module provides). 
Ideally, this should be called -immediately after using `uv::ll::loop_new()` +resource safe_handle_container(handle_fields: safe_handle_fields) { +} +}*/ -# Returns -A `high_level_loop` record that can be used to interact with the -loop (after you use `uv::ll::run()` on the `uv_loop_t*`, of course +#[doc=" +Needs to be encapsulated within `safe_handle` +"] +fn ref(hl_loop: high_level_loop, handle: *T) unsafe { + send_high_level_msg(hl_loop, ref_handle(handle as *libc::c_void)); +} +#[doc=" +Needs to be encapsulated within `safe_handle` "] -unsafe fn prepare_loop(loop_ptr: *libc::c_void) - -> high_level_loop { - // will probably need to stake out a data record - // here, as well, to keep whatever state we want to - // use with the loop +fn unref(hl_loop: high_level_loop, handle: *T) unsafe { + send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, + none)); +} +fn unref_and_close(hl_loop: high_level_loop, handle: *T, cb: *u8) unsafe { + send_high_level_msg(hl_loop, manual_unref_handle(handle as *libc::c_void, + some(cb))); +} - // move this into a malloc - let async = ll::async_t(); - let async_ptr = ptr::addr_of(async); - let op_port = comm::port::(); - let async_result = ll::async_init(loop_ptr, - async_ptr, - interact_poke); - if (async_result != 0i32) { - fail ll::get_last_err_info(loop_ptr); - } - // need to store the port and async_ptr in the top-level - // of the provided loop .. 
- ret { async_handle: async_ptr, - op_chan: comm::chan::(op_port) - }; +// INTERNAL API + +// data that lives for the lifetime of the high-evel oo +enum hl_loop_data { + default_gl_data({ + async_handle: *ll::uv_async_t, + mut active: bool, + before_msg_drain: fn~(*ll::uv_async_t) -> bool, + before_tear_down: fn~(*ll::uv_async_t), + msg_po_ptr: *comm::port, + mut refd_handles: [mut *libc::c_void], + mut unrefd_handles: [mut *libc::c_void]}) +} + +unsafe fn send_high_level_msg(hl_loop: high_level_loop, + -msg: high_level_msg) unsafe { + comm::send(hl_loop.op_chan(), msg); + + // if the global async handle == 0, then that means + // the loop isn't active, so we don't need to wake it up, + // (the loop's enclosing task should be blocking on a message + // receive on this port) + alt hl_loop { + single_task_loop({async_handle, op_chan}) { + if ((*async_handle) != 0 as *ll::uv_async_t) { + log(debug,"global async handle != 0, waking up loop.."); + ll::async_send((*async_handle)); + } + else { + log(debug,"GLOBAL ASYNC handle == 0"); + } + } + simple_task_loop({async_handle, op_chan}) { + log(debug,"simple async handle != 0, waking up loop.."); + ll::async_send((async_handle)); + } + _ {} + } } -// this will be invoked by a called to uv::hl::interact(), so -// we'll drain the port of pending callbacks, processing each -crust fn interact_poke(async_handle: *libc::c_void) { +// this will be invoked by a call to uv::hl::interact() with +// the high_level_loop corresponding to this async_handle. We +// simply check if the loop is active and, if so, invoke the +// user-supplied on_wake callback that is stored in the loop's +// data member +crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, + status: int) unsafe { // nothing here, yet. - log(debug, #fmt("interact_poke crust.. handle: %?", - async_handle)); -} \ No newline at end of file + log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? 
status: %?", + async_handle, status)); + let loop_ptr = ll::get_loop_for_uv_handle(async_handle); + let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data; + // we check to see if the loop is "active" (the loop is set to + // active = false the first time we realize we need to 'tear down', + // set subsequent calls to the global async handle may be triggered + // before all of the uv_close() calls are processed and loop exits + // on its own. So if the loop isn't active, we won't run the user's + // on_wake callback (and, consequently, let messages pile up, probably + // in the loops msg_po) + if (*data).active { + log(debug, "before on_wake"); + let mut do_msg_drain = (*data).before_msg_drain(async_handle); + let mut continue = true; + if do_msg_drain { + let msg_po = *((*data).msg_po_ptr); + if comm::peek(msg_po) { + // if this is true, we'll iterate over the + // msgs waiting in msg_po until there's no more + log(debug,"got msg_po"); + while(continue) { + log(debug,"before alt'ing on high_level_msg"); + alt comm::recv(msg_po) { + interaction(cb) { + log(debug,"got interaction, before cb.."); + // call it.. + cb(loop_ptr); + log(debug,"after calling cb"); + } + ref_handle(handle) { + high_level_ref(data, handle); + } + manual_unref_handle(handle, user_close_cb) { + high_level_unref(data, handle, true, user_close_cb); + } + tear_down { + log(debug,"incoming hl_msg: got tear_down"); + } + } + continue = comm::peek(msg_po); + } + } + else { + log(debug, "in hl wake_cb, no pending messages"); + } + } + log(debug, #fmt("after on_wake, continue? 
%?", continue)); + if !do_msg_drain { + high_level_tear_down(data); + } + } +} + +crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe { + log(debug, #fmt("tear_down_close_cb called, closing handle at %?", + handle)); + let data = ll::get_data_for_uv_handle(handle) as *hl_loop_data; + if vec::len((*data).refd_handles) > 0u { + fail "Didn't unref all high-level handles"; + } +} + +fn high_level_tear_down(data: *hl_loop_data) unsafe { + log(debug, "high_level_tear_down() called, close async_handle"); + // call user-suppled before_tear_down cb + let async_handle = (*data).async_handle; + (*data).before_tear_down(async_handle); + ll::close(async_handle as *libc::c_void, tear_down_close_cb); +} + +unsafe fn high_level_ref(data: *hl_loop_data, handle: *libc::c_void) { + log(debug,"incoming hl_msg: got ..ref_handle"); + let mut refd_handles = (*data).refd_handles; + let mut unrefd_handles = (*data).unrefd_handles; + let handle_already_refd = refd_handles.contains(handle); + if handle_already_refd { + fail "attempt to do a high-level ref an already ref'd handle"; + } + let handle_already_unrefd = unrefd_handles.contains(handle); + // if we are ref'ing a handle (by ptr) that was already unref'd, + // probably + if handle_already_unrefd { + let last_idx = vec::len(unrefd_handles) - 1u; + let handle_idx = vec::position_elem(unrefd_handles, handle); + alt handle_idx { + none { + fail "trying to remove handle that isn't in unrefd_handles"; + } + some(idx) { + unrefd_handles[idx] <-> unrefd_handles[last_idx]; + vec::pop(unrefd_handles); + } + } + (*data).unrefd_handles = unrefd_handles; + } + refd_handles += [handle]; + (*data).refd_handles = refd_handles; +} + +unsafe fn high_level_unref(data: *hl_loop_data, handle: *libc::c_void, + manual_unref: bool, user_close_cb: option<*u8>) { + log(debug,"incoming hl_msg: got auto_unref_handle"); + let mut refd_handles = (*data).refd_handles; + let mut unrefd_handles = (*data).unrefd_handles; + log(debug, #fmt("refs: %?, unrefs %? 
handle %?", vec::len(refd_handles), + vec::len(unrefd_handles), handle)); + let handle_already_refd = refd_handles.contains(handle); + if !handle_already_refd { + fail "attempting to high-level unref an untracked handle"; + } + let double_unref = unrefd_handles.contains(handle); + if double_unref { + log(debug, "double unref encountered"); + if manual_unref { + // will allow a user to manual unref, but only signal + // a fail when a double-unref is caused by a user + fail "attempting to high-level unref an unrefd handle"; + } + else { + log(debug, "not failing..."); + } + } + else { + log(debug, "attempting to unref handle"); + alt user_close_cb { + some(cb) { + ll::close(handle, cb); + } + none { } + } + let last_idx = vec::len(refd_handles) - 1u; + let handle_idx = vec::position_elem(refd_handles, handle); + alt handle_idx { + none { + fail "trying to remove handle that isn't in refd_handles"; + } + some(idx) { + refd_handles[idx] <-> refd_handles[last_idx]; + vec::pop(refd_handles); + } + } + (*data).refd_handles = refd_handles; + unrefd_handles += [handle]; + (*data).unrefd_handles = unrefd_handles; + if vec::len(refd_handles) == 0u { + log(debug, "0 referenced handles, start loop teardown"); + high_level_tear_down(data); + } + else { + log(debug, "more than 0 referenced handles"); + } + } + +} +#[cfg(test)] +mod test { + crust fn async_close_cb(handle: *ll::uv_async_t) unsafe { + log(debug, #fmt("async_close_cb handle %?", handle)); + let exit_ch = (*(ll::get_data_for_uv_handle(handle) + as *ah_data)).exit_ch; + comm::send(exit_ch, ()); + } + crust fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) + unsafe { + log(debug, #fmt("async_handle_cb handle %? 
status %?",handle,status)); + let hl_loop = (*(ll::get_data_for_uv_handle(handle) + as *ah_data)).hl_loop; + unref_and_close(hl_loop, handle, async_close_cb); + } + type ah_data = { + hl_loop: high_level_loop, + exit_ch: comm::chan<()> + }; + fn impl_uv_hl_async(hl_loop: high_level_loop) unsafe { + let async_handle = ll::async_t(); + let ah_ptr = ptr::addr_of(async_handle); + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + let ah_data = { + hl_loop: hl_loop, + exit_ch: exit_ch + }; + let ah_data_ptr = ptr::addr_of(ah_data); + interact(hl_loop) {|loop_ptr| + ref(hl_loop, ah_ptr); + ll::async_init(loop_ptr, ah_ptr, async_handle_cb); + ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); + ll::async_send(ah_ptr); + }; + comm::recv(exit_po); + } + + // this fn documents the bear minimum neccesary to roll your own + // high_level_loop + unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> high_level_loop { + let hl_loop_port = comm::port::(); + let hl_loop_ch = comm::chan(hl_loop_port); + task::spawn_sched(task::manual_threads(1u)) {|| + let loop_ptr = ll::loop_new(); + let msg_po = comm::port::(); + let msg_ch = comm::chan(msg_po); + run_high_level_loop( + loop_ptr, + msg_po, + // before_run + {|async_handle| + log(debug,#fmt("hltest before_run: async_handle %?", + async_handle)); + // do an async_send with it + ll::async_send(async_handle); + comm::send(hl_loop_ch, simple_task_loop({ + async_handle: async_handle, + op_chan: msg_ch + })); + }, + // before_msg_drain + {|async_handle| + log(debug,#fmt("hltest before_msg_drain: async_handle %?", + async_handle)); + true + }, + // before_tear_down + {|async_handle| + log(debug,#fmt("hl test_loop b4_tear_down: async %?", + async_handle)); + }); + ll::loop_delete(loop_ptr); + comm::send(exit_ch, ()); + }; + ret comm::recv(hl_loop_port); + } + + crust fn lifetime_handle_close(handle: *libc::c_void) unsafe { + log(debug, #fmt("lifetime_handle_close ptr %?", handle)); + } + + crust fn 
lifetime_async_callback(handle: *libc::c_void, + status: libc::c_int) { + log(debug, #fmt("lifetime_handle_close ptr %? status %?", + handle, status)); + } + + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_hl_async() unsafe { + let exit_po = comm::port::<()>(); + let exit_ch = comm::chan(exit_po); + let hl_loop = spawn_test_loop(exit_ch); + + // using this handle to manage the lifetime of the high_level_loop, + // as it will exit the first time one of the impl_uv_hl_async() is + // cleaned up with no one ref'd handles on the loop (Which can happen + // under race-condition type situations.. this ensures that the loop + // lives until, at least, all of the impl_uv_hl_async() runs have been + // called, at least. + let lifetime_handle = ll::async_t(); + let lifetime_handle_ptr = ptr::addr_of(lifetime_handle); + interact(hl_loop) {|loop_ptr| + ref(hl_loop, lifetime_handle_ptr); + ll::async_init(loop_ptr, lifetime_handle_ptr, + lifetime_async_callback); + }; + + iter::repeat(7u) {|| + task::spawn_sched(task::manual_threads(1u), {|| + impl_uv_hl_async(hl_loop); + }); + }; + impl_uv_hl_async(hl_loop); + impl_uv_hl_async(hl_loop); + impl_uv_hl_async(hl_loop); + interact(hl_loop) {|loop_ptr| + ll::close(lifetime_handle_ptr, lifetime_handle_close); + unref(hl_loop, lifetime_handle_ptr); + log(debug, "close and unref lifetime handle"); + }; + comm::recv(exit_po); + } +} diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index 85da16823c0ef..ff5c618cdffc7 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -201,6 +201,36 @@ type uv_async_t = { a12: *u8 }; +// 64bit unix size: 128 +// 32bit unix size: 84 +#[cfg(target_os = "linux")] +#[cfg(target_os = "macos")] +#[cfg(target_os = "freebsd")] +type uv_timer_t = { + fields: uv_handle_fields, + a00: *u8, a01: *u8, a02: *u8, a03: *u8, + a04: *u8, a05: *u8, a06: *u8, a07: *u8, + a08: *u8, a09: *u8, + a11: uv_timer_t_32bit_unix_riders +}; +#[cfg(target_arch="x86_64")] +type uv_timer_t_32bit_unix_riders = 
{ + a10: *u8, a11: *u8 +}; +#[cfg(target_arch="x86")] +type uv_timer_t_32bit_unix_riders = { + a10: *u8, a11: *u8, a12: *u8, a13: *u8, + a14: *u8, a15: *u8, a16: *u8 +}; +// win32 size: 64 +#[cfg(target_os = "win32")] +type uv_timer_t = { + fields: uv_handle_fields, + a00: *u8, a01: *u8, a02: *u8, a03: *u8, + a04: *u8, a05: *u8, a06: *u8, a07: *u8, + a08: *u8, a09: *u8, a10: *u8, a11: *u8 +}; + // unix size: 16 #[cfg(target_os = "linux")] #[cfg(target_os = "macos")] @@ -379,6 +409,58 @@ mod uv_ll_struct_stubgen { #[cfg(target_os = "linux")] #[cfg(target_os = "macos")] #[cfg(target_os = "freebsd")] + fn gen_stub_uv_timer_t() -> uv_timer_t { + ret gen_stub_arch(); + #[cfg(target_arch = "x86_64")] + fn gen_stub_arch() -> uv_timer_t { + ret { fields: { loop_handle: ptr::null(), type_: 0u32, + close_cb: ptr::null(), + mut data: ptr::null() }, + a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, + a03: 0 as *u8, + a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, + a07: 0 as *u8, + a08: 0 as *u8, a09: 0 as *u8, + a11: { + a10: 0 as *u8, a11: 0 as *u8 + } + }; + } + #[cfg(target_arch = "x86")] + fn gen_stub_arch() -> uv_timer_t { + ret { fields: { loop_handle: ptr::null(), type_: 0u32, + close_cb: ptr::null(), + mut data: ptr::null() }, + a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, + a03: 0 as *u8, + a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, + a07: 0 as *u8, + a08: 0 as *u8, a09: 0 as *u8, + a11: { + a10: 0 as *u8, a11: 0 as *u8, + a12: 0 as *u8, a13: 0 as *u8, + a14: 0 as *u8, a15: 0 as *u8, + a16: 0 as *u8 + } + }; + } + } + #[cfg(target_os = "win32")] + fn gen_stub_uv_timer_t() -> uv_timer_t { + ret { fields: { loop_handle: ptr::null(), type_: 0u32, + close_cb: ptr::null(), + mut data: ptr::null() }, + a00: 0 as *u8, a01: 0 as *u8, a02: 0 as *u8, + a03: 0 as *u8, + a04: 0 as *u8, a05: 0 as *u8, a06: 0 as *u8, + a07: 0 as *u8, + a08: 0 as *u8, a09: 0 as *u8, a10: 0 as *u8, + a11: 0 as *u8 + }; + } + #[cfg(target_os = "linux")] + #[cfg(target_os = "macos")] + #[cfg(target_os = 
"freebsd")] fn gen_stub_uv_write_t() -> uv_write_t { ret gen_stub_arch(); #[cfg(target_arch="x86_64")] @@ -467,10 +549,18 @@ native mod rustrt { fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8, on_read: *u8) -> libc::c_int; fn rust_uv_read_stop(stream: *libc::c_void) -> libc::c_int; + fn rust_uv_timer_init(loop_handle: *libc::c_void, + timer_handle: *uv_timer_t) -> libc::c_int; + fn rust_uv_timer_start( + timer_handle: *uv_timer_t, + cb: *u8, + timeout: libc::c_uint, + repeat: libc::c_uint) -> libc::c_int; + fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int; + + // data accessors/helpers for rust-mapped uv structs fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8; fn rust_uv_free_base_of_buf(++buf: uv_buf_t); - - // data accessors for rust-mapped uv structs fn rust_uv_get_stream_handle_from_connect_req( connect_req: *uv_connect_t) -> *uv_stream_t; @@ -479,6 +569,9 @@ native mod rustrt { -> *uv_stream_t; fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void) -> *libc::c_void; + fn rust_uv_get_data_for_uv_loop(loop_ptr: *libc::c_void) -> *libc::c_void; + fn rust_uv_set_data_for_uv_loop(loop_ptr: *libc::c_void, + data: *libc::c_void); fn rust_uv_get_data_for_uv_handle(handle: *libc::c_void) -> *libc::c_void; fn rust_uv_set_data_for_uv_handle(handle: *libc::c_void, @@ -488,6 +581,16 @@ native mod rustrt { data: *libc::c_void); fn rust_uv_get_base_from_buf(++buf: uv_buf_t) -> *u8; fn rust_uv_get_len_from_buf(++buf: uv_buf_t) -> libc::size_t; + + // sizeof testing helpers + fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint; + fn rust_uv_helper_uv_connect_t_size() -> libc::c_uint; + fn rust_uv_helper_uv_buf_t_size() -> libc::c_uint; + fn rust_uv_helper_uv_write_t_size() -> libc::c_uint; + fn rust_uv_helper_uv_err_t_size() -> libc::c_uint; + fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint; + fn rust_uv_helper_uv_async_t_size() -> libc::c_uint; + fn rust_uv_helper_uv_timer_t_size() -> libc::c_uint; } unsafe fn loop_new() -> *libc::c_void 
{ @@ -502,8 +605,8 @@ unsafe fn run(loop_handle: *libc::c_void) { rustrt::rust_uv_run(loop_handle); } -unsafe fn close(handle: *libc::c_void, cb: *u8) { - rustrt::rust_uv_close(handle, cb); +unsafe fn close(handle: *T, cb: *u8) { + rustrt::rust_uv_close(handle as *libc::c_void, cb); } unsafe fn tcp_init(loop_handle: *libc::c_void, handle: *uv_tcp_t) @@ -577,6 +680,50 @@ unsafe fn async_init(loop_handle: *libc::c_void, unsafe fn async_send(async_handle: *uv_async_t) { ret rustrt::rust_uv_async_send(async_handle); } +unsafe fn buf_init(++input: *u8, len: uint) -> uv_buf_t { + let out_buf = { base: ptr::null(), len: 0 as libc::size_t }; + let out_buf_ptr = ptr::addr_of(out_buf); + log(debug, #fmt("buf_init - input %u len %u out_buf: %u", + input as uint, + len as uint, + out_buf_ptr as uint)); + // yuck :/ + rustrt::rust_uv_buf_init(out_buf_ptr, input, len); + //let result = rustrt::rust_uv_buf_init_2(input, len); + log(debug, "after rust_uv_buf_init"); + let res_base = get_base_from_buf(out_buf); + let res_len = get_len_from_buf(out_buf); + //let res_base = get_base_from_buf(result); + log(debug, #fmt("buf_init - result %u len %u", + res_base as uint, + res_len as uint)); + ret out_buf; + //ret result; +} +unsafe fn ip4_addr(ip: str, port: int) +-> sockaddr_in { + let mut addr_vec = str::bytes(ip); + addr_vec += [0u8]; // add null terminator + let addr_vec_ptr = vec::unsafe::to_ptr(addr_vec); + let ip_back = str::from_bytes(addr_vec); + log(debug, #fmt("vec val: '%s' length: %u", + ip_back, vec::len(addr_vec))); + ret rustrt::rust_uv_ip4_addr(addr_vec_ptr, + port as libc::c_int); +} + +unsafe fn timer_init(loop_ptr: *libc::c_void, + timer_ptr: *uv_timer_t) -> libc::c_int { + ret rustrt::rust_uv_timer_init(loop_ptr, timer_ptr); +} +unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint, + repeat: uint) -> libc::c_int { + ret rustrt::rust_uv_timer_start(timer_ptr, cb, timeout as libc::c_uint, + repeat as libc::c_uint); +} +unsafe fn timer_stop(timer_ptr: 
*uv_timer_t) -> libc::c_int { + ret rustrt::rust_uv_timer_stop(timer_ptr); +} // libuv struct initializers unsafe fn tcp_t() -> uv_tcp_t { @@ -591,9 +738,14 @@ unsafe fn write_t() -> uv_write_t { unsafe fn async_t() -> uv_async_t { ret uv_ll_struct_stubgen::gen_stub_uv_async_t(); } -unsafe fn get_loop_for_uv_handle(handle: *libc::c_void) +unsafe fn timer_t() -> uv_timer_t { + ret uv_ll_struct_stubgen::gen_stub_uv_timer_t(); +} + +// data access helpers +unsafe fn get_loop_for_uv_handle(handle: *T) -> *libc::c_void { - ret rustrt::rust_uv_get_loop_for_uv_handle(handle); + ret rustrt::rust_uv_get_loop_for_uv_handle(handle as *libc::c_void); } unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t { @@ -606,13 +758,19 @@ unsafe fn get_stream_handle_from_write_req( ret rustrt::rust_uv_get_stream_handle_from_write_req( write_req); } - -unsafe fn get_data_for_uv_handle(handle: *libc::c_void) -> *libc::c_void { - ret rustrt::rust_uv_get_data_for_uv_handle(handle); +unsafe fn get_data_for_uv_loop(loop_ptr: *libc::c_void) -> *libc::c_void { + rustrt::rust_uv_get_data_for_uv_loop(loop_ptr) } -unsafe fn set_data_for_uv_handle(handle: *libc::c_void, - data: *libc::c_void) { - rustrt::rust_uv_set_data_for_uv_handle(handle, data); +unsafe fn set_data_for_uv_loop(loop_ptr: *libc::c_void, data: *libc::c_void) { + rustrt::rust_uv_set_data_for_uv_loop(loop_ptr, data); +} +unsafe fn get_data_for_uv_handle(handle: *T) -> *libc::c_void { + ret rustrt::rust_uv_get_data_for_uv_handle(handle as *libc::c_void); +} +unsafe fn set_data_for_uv_handle(handle: *T, + data: *U) { + rustrt::rust_uv_set_data_for_uv_handle(handle as *libc::c_void, + data as *libc::c_void); } unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void { ret rustrt::rust_uv_get_data_for_req(req); @@ -627,37 +785,6 @@ unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 { unsafe fn get_len_from_buf(buf: uv_buf_t) -> libc::size_t { ret rustrt::rust_uv_get_len_from_buf(buf); } -unsafe fn 
buf_init(++input: *u8, len: uint) -> uv_buf_t { - let out_buf = { base: ptr::null(), len: 0 as libc::size_t }; - let out_buf_ptr = ptr::addr_of(out_buf); - log(debug, #fmt("ll::buf_init - input %u len %u out_buf: %u", - input as uint, - len as uint, - out_buf_ptr as uint)); - // yuck :/ - rustrt::rust_uv_buf_init(out_buf_ptr, input, len); - //let result = rustrt::rust_uv_buf_init_2(input, len); - log(debug, "after rust_uv_buf_init"); - let res_base = get_base_from_buf(out_buf); - let res_len = get_len_from_buf(out_buf); - //let res_base = get_base_from_buf(result); - log(debug, #fmt("ll::buf_init - result %u len %u", - res_base as uint, - res_len as uint)); - ret out_buf; - //ret result; -} -unsafe fn ip4_addr(ip: str, port: int) --> sockaddr_in { - let mut addr_vec = str::bytes(ip); - addr_vec += [0u8]; // add null terminator - let addr_vec_ptr = vec::unsafe::to_ptr(addr_vec); - let ip_back = str::from_bytes(addr_vec); - log(debug, #fmt("vec val: '%s' length: %u", - ip_back, vec::len(addr_vec))); - ret rustrt::rust_uv_ip4_addr(addr_vec_ptr, - port as libc::c_int); -} unsafe fn malloc_buf_base_of(suggested_size: libc::size_t) -> *u8 { ret rustrt::rust_uv_malloc_buf_base_of(suggested_size); @@ -673,4 +800,602 @@ unsafe fn get_last_err_info(uv_loop: *libc::c_void) -> str { let err_msg = str::unsafe::from_c_str(strerror(err_ptr)); ret #fmt("LIBUV ERROR: name: %s msg: %s", err_name, err_msg); +} + +#[cfg(test)] +mod test { + enum tcp_read_data { + tcp_read_eof, + tcp_read_more([u8]), + tcp_read_error + } + + type request_wrapper = { + write_req: *uv_write_t, + req_buf: *[uv_buf_t], + read_chan: *comm::chan + }; + + crust fn after_close_cb(handle: *libc::c_void) { + log(debug, #fmt("after uv_close! handle ptr: %?", + handle)); + } + + crust fn on_alloc_cb(handle: *libc::c_void, + ++suggested_size: libc::size_t) + -> uv_buf_t unsafe { + log(debug, "on_alloc_cb!"); + let char_ptr = malloc_buf_base_of(suggested_size); + log(debug, #fmt("on_alloc_cb h: %? 
char_ptr: %u sugsize: %u", + handle, + char_ptr as uint, + suggested_size as uint)); + ret buf_init(char_ptr, suggested_size); + } + + crust fn on_read_cb(stream: *uv_stream_t, + nread: libc::ssize_t, + ++buf: uv_buf_t) unsafe { + log(debug, #fmt("CLIENT entering on_read_cb nred: %d", nread)); + if (nread > 0) { + // we have data + log(debug, #fmt("CLIENT read: data! nread: %d", nread)); + read_stop(stream); + let client_data = + get_data_for_uv_handle(stream as *libc::c_void) + as *request_wrapper; + let buf_base = get_base_from_buf(buf); + let buf_len = get_len_from_buf(buf); + let bytes = vec::unsafe::from_buf(buf_base, buf_len); + let read_chan = *((*client_data).read_chan); + let msg_from_server = str::from_bytes(bytes); + comm::send(read_chan, msg_from_server); + close(stream as *libc::c_void, after_close_cb) + } + else if (nread == -1) { + // err .. possibly EOF + log(debug, "read: eof!"); + } + else { + // nread == 0 .. do nothing, just free buf as below + log(debug, "read: do nothing!"); + } + // when we're done + free_base_of_buf(buf); + log(debug, "CLIENT exiting on_read_cb"); + } + + crust fn on_write_complete_cb(write_req: *uv_write_t, + status: libc::c_int) unsafe { + log(debug, #fmt("CLIENT beginning on_write_complete_cb status: %d", + status as int)); + let stream = get_stream_handle_from_write_req(write_req); + log(debug, #fmt("CLIENT on_write_complete_cb: tcp:%d write_handle:%d", + stream as int, write_req as int)); + let result = read_start(stream, on_alloc_cb, on_read_cb); + log(debug, #fmt("CLIENT ending on_write_complete_cb .. status: %d", + result as int)); + } + + crust fn on_connect_cb(connect_req_ptr: *uv_connect_t, + status: libc::c_int) unsafe { + log(debug, #fmt("beginning on_connect_cb .. 
status: %d", + status as int)); + let stream = + get_stream_handle_from_connect_req(connect_req_ptr); + if (status == 0i32) { + log(debug, "on_connect_cb: in status=0 if.."); + let client_data = get_data_for_req( + connect_req_ptr as *libc::c_void) + as *request_wrapper; + let write_handle = (*client_data).write_req as *libc::c_void; + log(debug, #fmt("on_connect_cb: tcp: %d write_hdl: %d", + stream as int, write_handle as int)); + let write_result = write(write_handle, + stream as *libc::c_void, + (*client_data).req_buf, + on_write_complete_cb); + log(debug, #fmt("on_connect_cb: write() status: %d", + write_result as int)); + } + else { + let test_loop = get_loop_for_uv_handle( + stream as *libc::c_void); + let err_msg = get_last_err_info(test_loop); + log(debug, err_msg); + assert false; + } + log(debug, "finishing on_connect_cb"); + } + + fn impl_uv_tcp_request(ip: str, port: int, req_str: str, + client_chan: *comm::chan) unsafe { + let test_loop = loop_new(); + let tcp_handle = tcp_t(); + let tcp_handle_ptr = ptr::addr_of(tcp_handle); + let connect_handle = connect_t(); + let connect_req_ptr = ptr::addr_of(connect_handle); + + // this is the persistent payload of data that we + // need to pass around to get this example to work. + // In C, this would be a malloc'd or stack-allocated + // struct that we'd cast to a void* and store as the + // data field in our uv_connect_t struct + let req_str_bytes = str::bytes(req_str); + let req_msg_ptr: *u8 = vec::unsafe::to_ptr(req_str_bytes); + log(debug, #fmt("req_msg ptr: %u", req_msg_ptr as uint)); + let req_msg = [ + buf_init(req_msg_ptr, vec::len(req_str_bytes)) + ]; + // this is the enclosing record, we'll pass a ptr to + // this to C.. 
+ let write_handle = write_t(); + let write_handle_ptr = ptr::addr_of(write_handle); + log(debug, #fmt("tcp req: tcp stream: %d write_handle: %d", + tcp_handle_ptr as int, + write_handle_ptr as int)); + let client_data = { writer_handle: write_handle_ptr, + req_buf: ptr::addr_of(req_msg), + read_chan: client_chan }; + + let tcp_init_result = tcp_init( + test_loop as *libc::c_void, tcp_handle_ptr); + if (tcp_init_result == 0i32) { + log(debug, "sucessful tcp_init_result"); + + log(debug, "building addr..."); + let addr = ip4_addr(ip, port); + // FIXME ref #2064 + let addr_ptr = ptr::addr_of(addr); + log(debug, #fmt("after build addr in rust. port: %u", + addr.sin_port as uint)); + + // this should set up the connection request.. + log(debug, #fmt("b4 call tcp_connect connect cb: %u ", + on_connect_cb as uint)); + let tcp_connect_result = tcp_connect( + connect_req_ptr, tcp_handle_ptr, + addr_ptr, on_connect_cb); + if (tcp_connect_result == 0i32) { + // not set the data on the connect_req + // until its initialized + set_data_for_req( + connect_req_ptr as *libc::c_void, + ptr::addr_of(client_data) as *libc::c_void); + set_data_for_uv_handle( + tcp_handle_ptr as *libc::c_void, + ptr::addr_of(client_data) as *libc::c_void); + log(debug, "before run tcp req loop"); + run(test_loop); + log(debug, "after run tcp req loop"); + } + else { + log(debug, "tcp_connect() failure"); + assert false; + } + } + else { + log(debug, "tcp_init() failure"); + assert false; + } + loop_delete(test_loop); + + } + + crust fn server_after_close_cb(handle: *libc::c_void) unsafe { + log(debug, #fmt("SERVER server stream closed, should exit.. 
h: %?", + handle)); + } + + crust fn client_stream_after_close_cb(handle: *libc::c_void) + unsafe { + log(debug, "SERVER: closed client stream, now closing server stream"); + let client_data = get_data_for_uv_handle( + handle) as + *tcp_server_data; + close((*client_data).server as *libc::c_void, + server_after_close_cb); + } + + crust fn after_server_resp_write(req: *uv_write_t) unsafe { + let client_stream_ptr = + get_stream_handle_from_write_req(req); + log(debug, "SERVER: resp sent... closing client stream"); + close(client_stream_ptr as *libc::c_void, + client_stream_after_close_cb) + } + + crust fn on_server_read_cb(client_stream_ptr: *uv_stream_t, + nread: libc::ssize_t, + ++buf: uv_buf_t) unsafe { + if (nread > 0) { + // we have data + log(debug, #fmt("SERVER read: data! nread: %d", nread)); + + // pull out the contents of the write from the client + let buf_base = get_base_from_buf(buf); + let buf_len = get_len_from_buf(buf); + log(debug, #fmt("SERVER buf base: %u, len: %u, nread: %d", + buf_base as uint, + buf_len as uint, + nread)); + let bytes = vec::unsafe::from_buf(buf_base, buf_len); + let request_str = str::from_bytes(bytes); + + let client_data = get_data_for_uv_handle( + client_stream_ptr as *libc::c_void) as *tcp_server_data; + + let server_kill_msg = (*client_data).server_kill_msg; + let write_req = (*client_data).server_write_req; + if (str::contains(request_str, server_kill_msg)) { + log(debug, "SERVER: client req contains kill_msg!"); + log(debug, "SERVER: sending response to client"); + read_stop(client_stream_ptr); + let server_chan = *((*client_data).server_chan); + comm::send(server_chan, request_str); + let write_result = write( + write_req as *libc::c_void, + client_stream_ptr as *libc::c_void, + (*client_data).server_resp_buf, + after_server_resp_write); + log(debug, #fmt("SERVER: resp write result: %d", + write_result as int)); + if (write_result != 0i32) { + log(debug, "bad result for server resp write()"); + log(debug, 
get_last_err_info( + get_loop_for_uv_handle(client_stream_ptr + as *libc::c_void))); + assert false; + } + } + else { + log(debug, "SERVER: client req !contain kill_msg!"); + } + } + else if (nread == -1) { + // err .. possibly EOF + log(debug, "read: eof!"); + } + else { + // nread == 0 .. do nothing, just free buf as below + log(debug, "read: do nothing!"); + } + // when we're done + free_base_of_buf(buf); + log(debug, "SERVER exiting on_read_cb"); + } + + crust fn server_connection_cb(server_stream_ptr: + *uv_stream_t, + status: libc::c_int) unsafe { + log(debug, "client connecting!"); + let test_loop = get_loop_for_uv_handle( + server_stream_ptr as *libc::c_void); + if status != 0i32 { + let err_msg = get_last_err_info(test_loop); + log(debug, #fmt("server_connect_cb: non-zero status: %?", + err_msg)); + ret; + } + let server_data = get_data_for_uv_handle( + server_stream_ptr as *libc::c_void) as *tcp_server_data; + let client_stream_ptr = (*server_data).client; + let client_init_result = tcp_init(test_loop, + client_stream_ptr); + set_data_for_uv_handle( + client_stream_ptr as *libc::c_void, + server_data as *libc::c_void); + if (client_init_result == 0i32) { + log(debug, "successfully initialized client stream"); + let accept_result = accept(server_stream_ptr as + *libc::c_void, + client_stream_ptr as + *libc::c_void); + if (accept_result == 0i32) { + // start reading + let read_result = read_start( + client_stream_ptr as *uv_stream_t, + on_alloc_cb, + on_server_read_cb); + if (read_result == 0i32) { + log(debug, "successful server read start"); + } + else { + log(debug, #fmt("server_connection_cb: bad read:%d", + read_result as int)); + assert false; + } + } + else { + log(debug, #fmt("server_connection_cb: bad accept: %d", + accept_result as int)); + assert false; + } + } + else { + log(debug, #fmt("server_connection_cb: bad client init: %d", + client_init_result as int)); + assert false; + } + } + + type tcp_server_data = { + client: *uv_tcp_t, + server: 
*uv_tcp_t, + server_kill_msg: str, + server_resp_buf: *[uv_buf_t], + server_chan: *comm::chan, + server_write_req: *uv_write_t + }; + + type async_handle_data = { + continue_chan: *comm::chan + }; + + crust fn async_close_cb(handle: *libc::c_void) { + log(debug, #fmt("SERVER: closing async cb... h: %?", + handle)); + } + + crust fn continue_async_cb(async_handle: *uv_async_t, + status: libc::c_int) unsafe { + // once we're in the body of this callback, + // the tcp server's loop is set up, so we + // can continue on to let the tcp client + // do its thang + let data = get_data_for_uv_handle( + async_handle as *libc::c_void) as *async_handle_data; + let continue_chan = *((*data).continue_chan); + let should_continue = status == 0i32; + comm::send(continue_chan, should_continue); + close(async_handle as *libc::c_void, async_close_cb); + } + + fn impl_uv_tcp_server(server_ip: str, + server_port: int, + kill_server_msg: str, + server_resp_msg: str, + server_chan: *comm::chan, + continue_chan: *comm::chan) unsafe { + let test_loop = loop_new(); + let tcp_server = tcp_t(); + let tcp_server_ptr = ptr::addr_of(tcp_server); + + let tcp_client = tcp_t(); + let tcp_client_ptr = ptr::addr_of(tcp_client); + + let server_write_req = write_t(); + let server_write_req_ptr = ptr::addr_of(server_write_req); + + let resp_str_bytes = str::bytes(server_resp_msg); + let resp_msg_ptr: *u8 = vec::unsafe::to_ptr(resp_str_bytes); + log(debug, #fmt("resp_msg ptr: %u", resp_msg_ptr as uint)); + let resp_msg = [ + buf_init(resp_msg_ptr, vec::len(resp_str_bytes)) + ]; + + let continue_async_handle = async_t(); + let continue_async_handle_ptr = + ptr::addr_of(continue_async_handle); + let async_data = + { continue_chan: continue_chan }; + let async_data_ptr = ptr::addr_of(async_data); + + let server_data: tcp_server_data = { + client: tcp_client_ptr, + server: tcp_server_ptr, + server_kill_msg: kill_server_msg, + server_resp_buf: ptr::addr_of(resp_msg), + server_chan: server_chan, + 
server_write_req: server_write_req_ptr + }; + let server_data_ptr = ptr::addr_of(server_data); + set_data_for_uv_handle(tcp_server_ptr as *libc::c_void, + server_data_ptr as *libc::c_void); + + // uv_tcp_init() + let tcp_init_result = tcp_init( + test_loop as *libc::c_void, tcp_server_ptr); + if (tcp_init_result == 0i32) { + let server_addr = ip4_addr(server_ip, server_port); + // FIXME ref #2064 + let server_addr_ptr = ptr::addr_of(server_addr); + + // uv_tcp_bind() + let bind_result = tcp_bind(tcp_server_ptr, + server_addr_ptr); + if (bind_result == 0i32) { + log(debug, "successful uv_tcp_bind, listening"); + + // uv_listen() + let listen_result = listen(tcp_server_ptr as + *libc::c_void, + 128i32, + server_connection_cb); + if (listen_result == 0i32) { + // let the test know it can set up the tcp server, + // now.. this may still present a race, not sure.. + let async_result = async_init(test_loop, + continue_async_handle_ptr, + continue_async_cb); + if (async_result == 0i32) { + set_data_for_uv_handle( + continue_async_handle_ptr as *libc::c_void, + async_data_ptr as *libc::c_void); + async_send(continue_async_handle_ptr); + // uv_run() + run(test_loop); + log(debug, "server uv::run() has returned"); + } + else { + log(debug, #fmt("uv_async_init failure: %d", + async_result as int)); + assert false; + } + } + else { + log(debug, #fmt("non-zero result on uv_listen: %d", + listen_result as int)); + assert false; + } + } + else { + log(debug, #fmt("non-zero result on uv_tcp_bind: %d", + bind_result as int)); + assert false; + } + } + else { + log(debug, #fmt("non-zero result on uv_tcp_init: %d", + tcp_init_result as int)); + assert false; + } + loop_delete(test_loop); + } + + // this is the impl for a test that is (maybe) ran on a + // per-platform/arch basis below + fn impl_uv_tcp_server_and_request() unsafe { + let bind_ip = "0.0.0.0"; + let request_ip = "127.0.0.1"; + let port = 8888; + let kill_server_msg = "does a dog have buddha nature?"; + let 
server_resp_msg = "mu!"; + let client_port = comm::port::(); + let client_chan = comm::chan::(client_port); + let server_port = comm::port::(); + let server_chan = comm::chan::(server_port); + + let continue_port = comm::port::(); + let continue_chan = comm::chan::(continue_port); + let continue_chan_ptr = ptr::addr_of(continue_chan); + + task::spawn_sched(task::manual_threads(1u)) {|| + impl_uv_tcp_server(bind_ip, port, + kill_server_msg, + server_resp_msg, + ptr::addr_of(server_chan), + continue_chan_ptr); + }; + + // block until the server up is.. possibly a race? + log(debug, "before receiving on server continue_port"); + comm::recv(continue_port); + log(debug, "received on continue port, set up tcp client"); + + task::spawn_sched(task::manual_threads(1u)) {|| + impl_uv_tcp_request(request_ip, port, + kill_server_msg, + ptr::addr_of(client_chan)); + }; + + let msg_from_client = comm::recv(server_port); + let msg_from_server = comm::recv(client_port); + + assert str::contains(msg_from_client, kill_server_msg); + assert str::contains(msg_from_server, server_resp_msg); + } + + // don't run this test on fbsd or 32bit linux + #[cfg(target_os="win32")] + #[cfg(target_os="darwin")] + #[cfg(target_os="linux")] + mod tcp_and_server_client_test { + #[cfg(target_arch="x86_64")] + mod impl64 { + #[test] + fn test_uv_ll_tcp_server_and_request() unsafe { + impl_uv_tcp_server_and_request(); + } + } + #[cfg(target_arch="x86")] + mod impl32 { + #[test] + #[ignore(cfg(target_os = "linux"))] + fn test_uv_ll_tcp_server_and_request() unsafe { + impl_uv_tcp_server_and_request(); + } + } + } + + // struct size tests + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_tcp_t() { + let native_handle_size = rustrt::rust_uv_helper_uv_tcp_t_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("uv_tcp_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == 
rust_handle_size; + } + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_connect_t() { + let native_handle_size = + rustrt::rust_uv_helper_uv_connect_t_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("uv_connect_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == rust_handle_size; + } + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_buf_t() { + let native_handle_size = + rustrt::rust_uv_helper_uv_buf_t_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("uv_buf_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == rust_handle_size; + } + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_write_t() { + let native_handle_size = + rustrt::rust_uv_helper_uv_write_t_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("uv_write_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == rust_handle_size; + } + + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_sockaddr_in() { + let native_handle_size = + rustrt::rust_uv_helper_sockaddr_in_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("sockaddr_in -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == rust_handle_size; + } + + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_async_t() { + let native_handle_size = + rustrt::rust_uv_helper_uv_async_t_size(); + let rust_handle_size = sys::size_of::(); + let output = #fmt("uv_async_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == 
rust_handle_size; + } + + #[test] + #[ignore(cfg(target_os = "freebsd"))] + fn test_uv_ll_struct_size_uv_timer_t() { + let native_handle_size = + rustrt::rust_uv_helper_uv_timer_t_size(); + let rust_handle_size = sys::size_of::<uv_timer_t>(); + let output = #fmt("uv_timer_t -- native: %u rust: %u", + native_handle_size as uint, rust_handle_size); + log(debug, output); + assert native_handle_size as uint == rust_handle_size; + } } \ No newline at end of file diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 21422646f0d2d..732d849eaf23f 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -24,6 +24,13 @@ rust_kernel::rust_kernel(rust_env *env) : non_weak_tasks(0), env(env) { + // set up storage of pointers needed to + // access the global loop. + global_loop_chan = 0; + async_handle_inner = (uintptr_t)0; + global_async_handle = &async_handle_inner; + *global_async_handle = (uintptr_t)0; + // Create the single threaded scheduler that will run on the platform's // main thread rust_manual_sched_launcher_factory launchfac; diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 1ccd423928d9a..58739b2ada0d5 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -73,6 +73,11 @@ class rust_kernel { rust_scheduler* get_scheduler_by_id_nolock(rust_sched_id id); void end_weak_tasks(); + // Used to communicate with the process-side, global libuv loop + uintptr_t global_loop_chan; + uintptr_t async_handle_inner; + uintptr_t* global_async_handle; + public: struct rust_env *env; @@ -117,6 +122,9 @@ class rust_kernel { void unweaken_task(rust_port_id chan); bool send_to_port(rust_port_id chan, void *sptr); + + uintptr_t* get_global_loop() { return &global_loop_chan; } + uintptr_t* get_global_async_handle() { return global_async_handle; } }; template <typename T> struct kernel_owned { diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 2b1e5d1ca3c57..7c0602f0a025b 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -5,6 +5,7 @@ #include 
"rust_globals.h" #include "rust_task.h" +#include "rust_log.h" #include "uv.h" // crust fn pointers @@ -188,7 +189,7 @@ rust_uv_hilvl_async_init(uv_loop_t* loop, crust_simple_cb cb, } extern "C" void* -rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, +rust_uv_hilvl_timer_init(uv_loop_t* loop, crust_simple_cb cb, uint8_t* buf) { uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( sizeof(uv_timer_t), @@ -201,14 +202,25 @@ rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, } extern "C" void -rust_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, +rust_uv_hilvl_timer_start(uv_timer_t* the_timer, uint32_t timeout, uint32_t repeat) { uv_timer_start(the_timer, native_timer_cb, timeout, repeat); } -extern "C" void +extern "C" int +rust_uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) { + return uv_timer_init(loop, timer); +} + +extern "C" int +rust_uv_timer_start(uv_timer_t* the_timer, uv_timer_cb cb, + uint32_t timeout, uint32_t repeat) { + return uv_timer_start(the_timer, cb, timeout, repeat); +} + +extern "C" int rust_uv_timer_stop(uv_timer_t* the_timer) { - uv_timer_stop(the_timer); + return uv_timer_stop(the_timer); } extern "C" int @@ -221,14 +233,18 @@ rust_uv_tcp_connect(uv_connect_t* connect_ptr, uv_tcp_t* tcp_ptr, uv_connect_cb cb, sockaddr_in* addr_ptr) { - printf("inside rust_uv_tcp_connect\n"); + rust_task* task = rust_get_current_task(); + LOG(task, stdlib, "inside rust_uv_tcp_connect"); // FIXME ref #2064 sockaddr_in addr = *addr_ptr; - printf("before tcp_connect .. port: %d\n", addr.sin_port); - printf("before tcp_connect.. tcp stream: %lu cb ptr: %lu\n", - (unsigned long int)tcp_ptr, (unsigned long int)cb); + LOG(task, stdlib, "before tcp_connect .. port: %d", + addr.sin_port); + LOG(task, stdlib, "before tcp_connect.. tcp stream:" \ + "%lu cb ptr: %lu", + (unsigned long int)tcp_ptr, (unsigned long int)cb); int result = uv_tcp_connect(connect_ptr, tcp_ptr, addr, cb); - printf ("leaving rust_uv_tcp_connect.. 
and result: %d\n", + LOG(task, stdlib, "leaving rust_uv_tcp_connect.." \ + "and result: %d", result); return result; } @@ -236,8 +252,10 @@ rust_uv_tcp_connect(uv_connect_t* connect_ptr, extern "C" int rust_uv_tcp_bind(uv_tcp_t* tcp_server, sockaddr_in* addr_ptr) { // FIXME ref #2064 + rust_task* task = rust_get_current_task(); sockaddr_in addr = *addr_ptr; - printf("before uv_tcp_bind .. tcp_server: %lu port: %d\n", + LOG(task, stdlib, "before uv_tcp_bind .. tcp_server:" \ + "%lu port: %d", (unsigned long int)tcp_server, addr.sin_port); return uv_tcp_bind(tcp_server, addr); } @@ -281,6 +299,10 @@ extern "C" size_t rust_uv_helper_uv_async_t_size() { return sizeof(uv_async_t); } +extern "C" size_t +rust_uv_helper_uv_timer_t_size() { + return sizeof(uv_timer_t); +} extern "C" uv_stream_t* rust_uv_get_stream_handle_from_connect_req(uv_connect_t* connect) { @@ -302,11 +324,14 @@ current_kernel_malloc_alloc_cb(uv_handle_t* handle, extern "C" void rust_uv_buf_init(uv_buf_t* out_buf, char* base, size_t len) { - printf("rust_uv_buf_init: base: %lu len: %lu\n", - (long unsigned int)base, - (long unsigned int)len); + rust_task* task = rust_get_current_task(); + LOG(task, stdlib,"rust_uv_buf_init: base: %lu" \ + "len: %lu", + (unsigned long int)base, + (unsigned long int)len); *out_buf = uv_buf_init(base, len); - printf("rust_uv_buf_init: after: result->base: %lu len: %lu\n", + LOG(task, stdlib, "rust_uv_buf_init: after: " + "result->base: %" PRIxPTR " len: %" PRIxPTR, (unsigned long int)(*out_buf).base, (unsigned long int)(*out_buf).len); } @@ -316,6 +341,17 @@ rust_uv_get_loop_for_uv_handle(uv_handle_t* handle) { return handle->loop; } +extern "C" void* +rust_uv_get_data_for_uv_loop(uv_loop_t* loop) { + return loop->data; +} + +extern "C" void +rust_uv_set_data_for_uv_loop(uv_loop_t* loop, + void* data) { + loop->data = data; +} + extern "C" void* rust_uv_get_data_for_uv_handle(uv_handle_t* handle) { return handle->data; @@ -393,8 +429,29 @@ 
rust_uv_free_base_of_buf(uv_buf_t buf) { extern "C" struct sockaddr_in rust_uv_ip4_addr(const char* ip, int port) { - printf("before creating addr_ptr.. ip %s port %d\n", ip, port); + rust_task* task = rust_get_current_task(); + LOG(task, stdlib, "before creating addr_ptr.. ip %s" \ + "port %d", ip, port); struct sockaddr_in addr = uv_ip4_addr(ip, port); - printf("after creating .. port: %d\n", addr.sin_port); + LOG(task, stdlib, "after creating .. port: %d", addr.sin_port); return addr; } + +extern "C" uintptr_t* +rust_uv_get_kernel_global_chan_ptr() { + uintptr_t* result = rust_get_current_task()->kernel->get_global_loop(); + rust_task* task = rust_get_current_task(); + LOG(task, stdlib, "global loop: %lu", (unsigned long int)result); + LOG(task, stdlib,"global loop val: %lu", (unsigned long int)*result); + return result; +} + +extern "C" uintptr_t* +rust_uv_get_kernel_monitor_global_chan_ptr() { + return rust_uv_get_kernel_global_chan_ptr(); +} + +extern "C" uintptr_t* +rust_uv_get_kernel_global_async_handle() { + return rust_get_current_task()->kernel->get_global_async_handle(); +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index aa0082a82be09..545c3581425e7 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -97,6 +97,8 @@ rust_uv_hilvl_close_timer rust_uv_async_send rust_uv_async_init rust_uv_hilvl_async_init +rust_uv_hilvl_timer_init +rust_uv_hilvl_timer_start rust_uv_timer_init rust_uv_timer_start rust_uv_timer_stop @@ -123,15 +125,21 @@ rust_uv_helper_uv_write_t_size rust_uv_helper_uv_err_t_size rust_uv_helper_sockaddr_in_size rust_uv_helper_uv_async_t_size +rust_uv_helper_uv_timer_t_size rust_uv_get_stream_handle_from_connect_req rust_uv_get_stream_handle_from_write_req rust_uv_get_loop_for_uv_handle +rust_uv_get_data_for_uv_loop +rust_uv_set_data_for_uv_loop rust_uv_get_data_for_uv_handle rust_uv_set_data_for_uv_handle rust_uv_get_data_for_req rust_uv_set_data_for_req rust_uv_get_base_from_buf rust_uv_get_len_from_buf 
+rust_uv_get_kernel_global_chan_ptr +rust_uv_get_kernel_monitor_global_chan_ptr +rust_uv_get_kernel_global_async_handle rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock