Skip to content

Use a queue to keep track of running and blocked tasks in rust_sched_loop #2100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mk/rt.mk
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ RUNTIME_CS_$(1) := \
rt/rust_builtin.cpp \
rt/rust_run_program.cpp \
rt/rust_env.cpp \
rt/rust_task_queue.cpp \
rt/rust_sched_loop.cpp \
rt/rust_sched_launcher.cpp \
rt/rust_sched_driver.cpp \
Expand Down
2 changes: 0 additions & 2 deletions src/rt/rust_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

#include "rust_globals.h"
#include "util/array_list.h"
#include "util/indexed_list.h"
#include "util/synchronized_indexed_list.h"
#include "util/hash_map.h"
#include "sync/sync.h"
#include "sync/lock_and_signal.h"
Expand Down
6 changes: 4 additions & 2 deletions src/rt/rust_port_selector.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "rust_port.h"
#include "rust_port_selector.h"
#include "rust_util.h"

rust_port_selector::rust_port_selector()
rust_port_selector::rust_port_selector(rust_kernel * kernel)
: ports(NULL), n_ports(0) {
isaac_init(kernel, &rctx);
}

void
Expand All @@ -27,7 +29,7 @@ rust_port_selector::select(rust_task *task, rust_port **dptr,
// message.

// Start looking for ports from a different index each time.
size_t j = isaac_rand(&task->sched_loop->rctx);
size_t j = isaac_rand(&rctx);
for (size_t i = 0; i < n_ports; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
Expand Down
3 changes: 2 additions & 1 deletion src/rt/rust_port_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ class rust_port_selector : public rust_cond {
rust_port **ports;
size_t n_ports;
lock_and_signal rendezvous_lock;
randctx rctx;

public:
rust_port_selector();
rust_port_selector(rust_kernel *kernel);

void select(rust_task *task,
rust_port **dptr,
Expand Down
92 changes: 40 additions & 52 deletions src/rt/rust_sched_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rust_sched_loop::rust_sched_loop(rust_scheduler *sched,
id(id),
should_exit(false),
cached_c_stack(NULL),
extra_c_stack(NULL),
dead_task(NULL),
pump_signal(NULL),
kernel(sched->kernel),
Expand All @@ -36,7 +37,6 @@ rust_sched_loop::rust_sched_loop(rust_scheduler *sched,
name("main")
{
LOGPTR(this, "new dom", (uintptr_t)this);
isaac_init(kernel, &rctx);

if (!tls_initialized)
init_tls();
Expand Down Expand Up @@ -73,35 +73,27 @@ rust_sched_loop::fail() {
kernel->fail();
}

static void
kill_task(rust_task * task)
{
task->unsupervise();
task->kill();
}

void
rust_sched_loop::kill_all_tasks() {
std::vector<rust_task*> all_tasks;

{
scoped_lock with(lock);

for (size_t i = 0; i < running_tasks.length(); i++) {
all_tasks.push_back(running_tasks[i]);
}

for (size_t i = 0; i < blocked_tasks.length(); i++) {
all_tasks.push_back(blocked_tasks[i]);
}
}
rust_task_iterator it = running_tasks.iterator();
while (it.hasNext())
kill_task(it.next());

while (!all_tasks.empty()) {
rust_task *task = all_tasks.back();
all_tasks.pop_back();
// We don't want the failure of these tasks to propagate back
// to the kernel again since we're already failing everything
task->unsupervise();
task->kill();
}
it = blocked_tasks.iterator();
while (it.hasNext())
kill_task(it.next());
}

size_t
rust_sched_loop::number_of_live_tasks() {
return running_tasks.length() + blocked_tasks.length();
return running_tasks.size() + blocked_tasks.size();
}

/**
Expand Down Expand Up @@ -152,38 +144,34 @@ rust_sched_loop::schedule_task() {
I(this, this);
// FIXME: in the face of failing tasks, this is not always right.
// I(this, n_live_tasks() > 0);
if (running_tasks.length() > 0) {
size_t k = isaac_rand(&rctx);
// Look around for a runnable task, starting at k.
for(size_t j = 0; j < running_tasks.length(); ++j) {
size_t i = (j + k) % running_tasks.length();
return (rust_task *)running_tasks[i];
}
}
return NULL;
return running_tasks.next();
}

void
rust_sched_loop::log_state() {
if (log_rt_task < log_debug) return;

if (!running_tasks.is_empty()) {
if (running_tasks.size() != 0) {
log(NULL, log_debug, "running tasks:");
for (size_t i = 0; i < running_tasks.length(); i++) {
rust_task_iterator it = running_tasks.iterator();
while (it.hasNext()) {
rust_task * task = it.next();
log(NULL, log_debug, "\t task: %s @0x%" PRIxPTR,
running_tasks[i]->name,
running_tasks[i]);
task->name,
task);
}
}

if (!blocked_tasks.is_empty()) {
if (blocked_tasks.size() != 0) {
log(NULL, log_debug, "blocked tasks:");
for (size_t i = 0; i < blocked_tasks.length(); i++) {
rust_task_iterator it = blocked_tasks.iterator();
while (it.hasNext()) {
rust_task * task = it.next();
log(NULL, log_debug, "\t task: %s @0x%" PRIxPTR
", blocked on: 0x%" PRIxPTR " '%s'",
blocked_tasks[i]->name, blocked_tasks[i],
blocked_tasks[i]->get_cond(),
blocked_tasks[i]->get_cond_name());
task->name, task,
task->get_cond(),
task->get_cond_name());
}
}
}
Expand Down Expand Up @@ -252,12 +240,11 @@ rust_sched_loop::run_single_turn() {
id);

reap_dead_tasks();

lock.unlock();
return sched_loop_state_keep_going;
} else {
A(this, running_tasks.is_empty(), "Should have no running tasks");
A(this, blocked_tasks.is_empty(), "Should have no blocked tasks");
A(this, running_tasks.size() == 0, "Should have no running tasks");
A(this, blocked_tasks.size() == 0, "Should have no blocked tasks");
A(this, dead_task == NULL, "Should have no dead tasks");

DLOG(this, dom, "finished main-loop %d", id);
Expand Down Expand Up @@ -288,7 +275,7 @@ rust_sched_loop::create_task(rust_task *spawner, const char *name) {
return task;
}

rust_task_list *
rust_task_queue *
rust_sched_loop::state_list(rust_task_state state) {
switch (state) {
case task_state_running:
Expand Down Expand Up @@ -327,14 +314,15 @@ rust_sched_loop::transition(rust_task *task,
name, (uintptr_t)this, state_name(src), state_name(dst),
state_name(task->get_state()));
I(this, task->get_state() == src);
rust_task_list *src_list = state_list(src);
if (src_list) {

rust_task_queue *src_list = state_list(src);
if (src_list)
src_list->remove(task);
}
rust_task_list *dst_list = state_list(dst);
if (dst_list) {
dst_list->append(task);
}

rust_task_queue *dst_list = state_list(dst);
if (dst_list)
dst_list->insert(task);

if (dst == task_state_dead) {
I(this, dead_task == NULL);
dead_task = task;
Expand Down
12 changes: 5 additions & 7 deletions src/rt/rust_sched_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "rust_stack.h"
#include "rust_signal.h"
#include "context.h"
#include "rust_task_queue.h"

enum rust_task_state {
task_state_newborn,
Expand All @@ -25,8 +26,6 @@ enum rust_sched_loop_state {

struct rust_task;

typedef indexed_list<rust_task> rust_task_list;

struct rust_sched_loop
{
private:
Expand All @@ -52,16 +51,17 @@ struct rust_sched_loop
stk_seg *cached_c_stack;
stk_seg *extra_c_stack;

rust_task_list running_tasks;
rust_task_list blocked_tasks;
rust_task_queue running_tasks;
rust_task_queue blocked_tasks;

rust_task *dead_task;

rust_signal *pump_signal;

void prepare_c_stack(rust_task *task);
void unprepare_c_stack();

rust_task_list *state_list(rust_task_state state);
rust_task_queue *state_list(rust_task_state state);
const char *state_name(rust_task_state state);

void pump_loop();
Expand All @@ -82,8 +82,6 @@ struct rust_sched_loop
size_t min_stack_size;
rust_env *env;

randctx rctx;

// FIXME: Neither of these are used
int32_t list_index;
const char *const name;
Expand Down
4 changes: 3 additions & 1 deletion src/rt/rust_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
sched_loop(sched_loop),
kernel(sched_loop->kernel),
name(name),
list_index(-1),
rendezvous_ptr(0),
local_region(&sched_loop->srv->local_region),
boxed(&local_region),
unwinding(false),
propagate_failure(true),
cc_counter(0),
total_stack_sz(0),
next(NULL),
prev(NULL),
state(state),
cond(NULL),
cond_name("none"),
Expand All @@ -41,6 +42,7 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
c_stack(NULL),
next_c_sp(0),
next_rust_sp(0),
port_selector(kernel),
supervisor(spawner)
{
LOGPTR(sched_loop, "new task", (uintptr_t)this);
Expand Down
5 changes: 4 additions & 1 deletion src/rt/rust_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ rust_task : public kernel_owned<rust_task>, rust_cond
// Fields known only to the runtime.
rust_kernel *kernel;
const char *const name;
int32_t list_index;

// Rendezvous pointer for receiving data when blocked on a port. If we're
// trying to read data and no data is available on any incoming channel,
Expand All @@ -143,6 +142,10 @@ rust_task : public kernel_owned<rust_task>, rust_cond
// The amount of stack we're using, excluding red zones
size_t total_stack_sz;

// for linked list in rust_task_queue
rust_task * next;
rust_task * prev;

private:

// Protects state, cond, cond_name
Expand Down
59 changes: 59 additions & 0 deletions src/rt/rust_task_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

#include "rust_task_queue.h"
#include "rust_internal.h"

rust_task_iterator::rust_task_iterator(rust_task * h, size_t m) :
count(0), cur(h), max(m)
{
}

rust_task *
rust_task_iterator::next() {
count++;
rust_task *ret = cur;
cur = cur->next;
return ret;
}

rust_task_queue::rust_task_queue() : head(NULL), sz(0)
{
}

rust_task *
rust_task_queue::next() {
if (sz == 0)
return NULL;
rust_task *ret = head;
head = head->next;
return ret;
}

void
rust_task_queue::insert(rust_task *elem) {
if (++sz == 1) {
head = elem;
head->next = head;
head->prev = head;
} else {
elem->prev = head->prev;
elem->next = head;
head->prev->next = elem;
head->prev = elem;
}
}

void
rust_task_queue::remove(rust_task *elem) {
if (sz == 0 || elem->next == NULL || elem->prev == NULL)
return;
if (--sz == 0)
head = NULL;
else {
if (elem == head)
head = elem->next;
elem->next->prev = elem->prev;
elem->prev->next = elem->next;
}
elem->next = NULL;
elem->prev = NULL;
}
Loading