//! @module Processing

//! Processing implements a simple supervisor/worker pattern with a thread-safe,
//! dual-priority queue.

/*
 * A Task pairs a payload with two levels of priority so that tasks can be
 * ordered on a heap: the primary priority is compared first and the secondary
 * priority only breaks ties.
 */
static class Task {
  // These are public on purpose: the comparison operators below read them
  // from the *other* operand with `->`, and protected (static) members are
  // not reachable through object indexing.
  public int pri, pri2;
  public mixed data;

  /*
   * priority            primary sort key
   * secondary_priority  tie-breaker used when primary keys are equal
   * value               payload handed to the worker
   */
  public void create(int priority, int secondary_priority, mixed value) {
    pri = priority;
    pri2 = secondary_priority;
    data = value;
  }

  /*
   * Two tasks are equal only when BOTH priority levels match. Anything that
   * is not an object compares unequal.
   */
  public int(0..1) `==(mixed o) {
    if (!objectp(o))
      return 0;
    return pri == o->pri && pri2 == o->pri2;
  }

  public int(0..1) `>(object o) {
    return pri > o->pri || (pri == o->pri && pri2 > o->pri2);
  }

  public int(0..1) `<(object o) {
    return pri < o->pri || (pri == o->pri && pri2 < o->pri2);
  }
}

/*
 * A dual-priority queue that is thread-safe and may be joined.
 *
 * Locking rules: heap, waiting and state are only touched while holding
 * heap_lock; incomplete is only touched while holding join_lock. The only
 * nesting is heap_lock -> join_lock (in put), never the reverse.
 */
static class TaskQueue {
  public enum STATE { OPEN, CLOSED };

  static ADT.Heap heap;               // Holds Tasks
  static Thread.Mutex heap_lock;      // Locks access to heap, waiting and state
  static Thread.Condition heap_cond;  // Notifies threads waiting on heap to populate
  static Thread.Mutex join_lock;      // Locks incomplete and join_cond
  static Thread.Condition join_cond;  // Awakens threads waiting on all tasks to be complete
  static int incomplete;              // Tracks tasks that have been put but not completed
  static int waiting;                 // Tracks the number of items waiting in the queue
  static STATE state;                 // Flags the queue as closed and stops push ops

  public void create() {
    // Initialize members
    heap = ADT.Heap();
    heap_lock = Thread.Mutex();
    heap_cond = Thread.Condition();
    join_lock = Thread.Mutex();
    join_cond = Thread.Condition();
    incomplete = 0;
    waiting = 0;
    state = OPEN;
  }

  /*
   * Closes the queue and prevents new tasks from being added. Every thread
   * blocked in get() is woken so that it can observe the closed state.
   */
  public void close() {
    Thread.MutexKey key = heap_lock->lock();

    // Changing state under the lock keeps it consistent with the checks in
    // put() and get().
    state = CLOSED;
    heap_cond->broadcast();
    key = 0;
  }

  /*
   * Returns 1 if this queue has been closed.
   */
  public int(0..1) is_closed() {
    return state == CLOSED;
  }

  /*
   * Blocks until all tasks in the queue have been completed. Returns
   * immediately when there is nothing outstanding.
   */
  public void join() {
    Thread.MutexKey key = join_lock->lock();

    // Re-check the counter in a loop: the last task may already have been
    // completed before we got here, and condition waits may wake spuriously.
    while (incomplete > 0)
      join_cond->wait(key);
    key = 0;
  }

  /*
   * Adds a new task to the heap. Signals one thread waiting on the heap to
   * have a value and increments waiting and incomplete. Throws an error if
   * the queue has been closed.
   */
  public void put(int priority1, int priority2, mixed value) {
    Thread.MutexKey key = heap_lock->lock();

    if (state == CLOSED) {
      key = 0;
      error("TaskQueue is closed\n");
    }

    // Count the task as outstanding before it becomes visible to workers,
    // using the same lock that task_complete() and join() use.
    Thread.MutexKey join_key = join_lock->lock();
    ++incomplete;
    join_key = 0;

    heap->push(Task(priority1, priority2, value));
    ++waiting;
    heap_cond->signal();
    key = 0;
  }

  /*
   * Pops the highest-ranked task off the heap and decrements waiting.
   *
   * If block is 1, waits until a task is available or the queue is closed.
   * If block is 0 (or omitted), does not wait.
   *
   * Throws "queue empty" when no task is available: immediately in
   * non-blocking mode, or once the queue has been closed and drained in
   * blocking mode.
   */
  public Task get(void|int(0..1) block) {
    Thread.MutexKey key = heap_lock->lock();

    if (block)
      while (heap->size() == 0 && state == OPEN)
        heap_cond->wait(key);

    // The emptiness check must happen under the lock; otherwise another
    // consumer could take the last item between the check and the pop.
    if (heap->size() == 0) {
      key = 0;
      error("queue empty\n");
    }

    Task task = heap->pop();
    --waiting;
    key = 0;
    return task;
  }

  /*
   * Registers that a task is complete and signals joining threads if this
   * brings the number of incomplete tasks to 0.
   */
  public void task_complete(Task task) {
    Thread.MutexKey key = join_lock->lock();

    // Never let the counter go negative on a stray extra call.
    if (incomplete > 0 && --incomplete == 0)
      join_cond->broadcast();
    key = 0;
  }
}

/*
 * A Worker is a thread that polls a TaskQueue and applies a callback to the
 * data of every task it receives.
 */
static class Worker {
  inherit Thread.Thread : thread;

  static TaskQueue queue;
  static function(mixed,mixed...:void) process_task;
  static array(mixed) args;
  static int stopped;

  /*
   * q         queue to poll for tasks
   * fn        called as fn(task->data, @arg_list) for every task
   * arg_list  optional extra arguments passed to fn on every call
   *
   * The thread is started immediately.
   */
  public void create(TaskQueue q, function(mixed,mixed...:void) fn, void|array(mixed) arg_list) {
    queue = q;
    process_task = fn;
    args = arg_list ? arg_list : ({});
    thread::create(loop);
  }

  /*
   * Asks the worker to exit the next time it finds the queue empty. Tasks
   * still in the queue are processed first.
   */
  public void stop() {
    stopped = 1;
  }

  public void loop() {
    /*
     * sleepiness is used to change the poll duration so that polling does
     * not overburden the cpu. When receiving many items, the poll rate
     * increases up to 20x/second. When receiving items only intermittently,
     * polling may slow down to ~3x/second. Polling starts initially at 10
     * per second. The change delta is the sleepiness_min, which must be
     * >0. It cannot take more than 6 polls to move between max and min
     * ((0.35 - 0.05) / 0.05 = 6), so changes in the rate of task receipt
     * are quickly realized in the poll rate.
     */
    float sleepiness = 0.1;
    float sleepiness_max = 0.35;
    float sleepiness_min = 0.05;

    while (1) {
      Task task;
      mixed caught = catch { task = queue->get(0); };

      if (caught == 0) {
        // Clamp instead of comparing-then-subtracting so that float rounding
        // can never push the interval below the minimum.
        sleepiness = max(sleepiness - sleepiness_min, sleepiness_min);

        // A failing callback must not kill the worker or leave the task
        // counted as incomplete, otherwise join() would never return.
        mixed err = catch { process_task(task->data, @args); };
        queue->task_complete(task);
        if (err)
          master()->handle_error(err);
      }
      else if (stopped == 1)
        break;
      else {
        sleepiness = min(sleepiness + sleepiness_min, sleepiness_max);
        sleep(sleepiness);
      }
    }
  }
}

//! Implements a worker/supervisor pattern, permitting multiple work groups and
//! distinct workers within each group.
public class Supervisor {
  static mapping(string:TaskQueue) queues;
  static mapping(string:array(Worker)) groups;

  //! Creates a new Supervisor.
  public void create() {
    groups = ([]);
    queues = ([]);
  }

  // Returns the queue registered for group_name, or throws a descriptive
  // error when no such group has been added.
  static TaskQueue lookup_queue(string group_name) {
    TaskQueue queue = queues[group_name];
    if (!queue)
      error("Unknown work group: %s\n", group_name);
    return queue;
  }

  //! Adds a new work group identified by [group_name]. The same name must
  //! be used when adding workers and tasks. An existing group with the same
  //! name is replaced.
  //!
  //! Returns [add_task] with [group_name] already bound.
  public function(int,int,mixed:int(0..1)) add_group(string group_name) {
    TaskQueue queue = TaskQueue();
    queues[group_name] = queue;
    groups[group_name] = ({});
    return Function.curry(add_task)(group_name);
  }

  //! Adds a new worker to [group_name]. The worker will repeatedly apply
  //! [process_job] to tasks received in the queue. The parameters for
  //! [process_job] are ({ mixed task, @[args] }). A worker may have its own
  //! state using [args].
  //!
  //! Throws an error if [group_name] has not been added.
  public int(0..1) add_worker(string group_name, function process_job, void|array(mixed) args) {
    // Validate first: a worker started on a missing queue would spin forever.
    TaskQueue queue = lookup_queue(group_name);
    groups[group_name] += ({ Worker(queue, process_job, args) });
    return 1;
  }

  //! Adds a task to the queue. The first available worker will pick it up.
  //! A task may have 2 levels of priority.
  //!
  //! Throws an error if [group_name] has not been added or is closed.
  public int(0..1) add_task(string group_name, int pri1, int pri2, mixed data) {
    lookup_queue(group_name)->put(pri1, pri2, data);
    return 1;
  }

  //! Closes [group_name]'s queue and stops each worker.
  public void close_group(string group_name) {
    lookup_queue(group_name)->close();
    foreach(groups[group_name], Worker w)
      w->stop();
  }

  //! Blocks until all tasks in [group_name]'s queue have been processed and
  //! its running workers have exited.
  //! [close_group] *must* be called before [join_group]; otherwise an error
  //! is thrown.
  public void join_group(string group_name) {
    TaskQueue queue = lookup_queue(group_name);

    if (!queue->is_closed())
      error("%s has not been closed.\n", group_name);

    queue->join();
    foreach(groups[group_name], Worker w)
      if (w->status() == Thread.THREAD_RUNNING)
        w->wait();
  }
}

//! @endmodule Processing