summaryrefslogtreecommitdiff
path: root/src/threads
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
committerJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
commit4ea2cc3bd4a7d9b1c54a9d33e6a1cf82e7c8c21d (patch)
treed2e54377d14d604356c86862a326f64ae64dadd6 /src/threads
Imported Upstream version 0.18.1upstream/0.18.1
Diffstat (limited to 'src/threads')
-rw-r--r--src/threads/BackgroundJob.vala243
-rw-r--r--src/threads/Semaphore.vala160
-rw-r--r--src/threads/Threads.vala14
-rw-r--r--src/threads/Workers.vala104
-rw-r--r--src/threads/mk/threads.mk30
5 files changed, 551 insertions, 0 deletions
diff --git a/src/threads/BackgroundJob.vala b/src/threads/BackgroundJob.vala
new file mode 100644
index 0000000..178211e
--- /dev/null
+++ b/src/threads/BackgroundJob.vala
@@ -0,0 +1,243 @@
+/* Copyright 2011-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU LGPL (version 2.1 or later).
+ * See the COPYING file in this distribution.
+ */
+
+// This callback is executed when an associated BackgroundJob completes. It is called from within
+// the Gtk event loop, *not* the background thread's context.
+public delegate void CompletionCallback(BackgroundJob job);
+
+// This callback is executed when an associated BackgroundJob has been cancelled (via its
+// Cancellable). Note that it's *possible* the BackgroundJob performed some or all of its work
+// prior to executing this delegate.
+public delegate void CancellationCallback(BackgroundJob job);
+
+// This callback is executed by the BackgroundJob when a unit of work is completed, but not the
+// entire job. It is called from within the Gtk event loop, *not* the background thread's
+// context.
+//
+// Note that there does not seem to be any guarantees of order in the Idle queue documentation,
+// and this it's possible (and, depending on assigned priorities, likely) that notifications could
+// arrive in different orders, and even after the CompletionCallback. Thus, no guarantee of
+// ordering is made here.
+//
+// NOTE: Would like Value to be nullable, but can't due to this bug:
+// https://bugzilla.gnome.org/show_bug.cgi?id=607098
+//
+// NOTE: There will be a memory leak using NotificationCallbacks due to this bug:
+// https://bugzilla.gnome.org/show_bug.cgi?id=571264
+//
+// NOTE: Because of these two bugs, using an abstract base class rather than Value. When both are
+// fixed (at least the second), may consider going back to Value.
+
+public abstract class NotificationObject {
+}
+
+public abstract class InterlockedNotificationObject : NotificationObject {
+ private Semaphore semaphore = new Semaphore();
+
+ // Only called by BackgroundJob; no need for users or subclasses to use
+ public void internal_wait_for_completion() {
+ semaphore.wait();
+ }
+
+ // Only called by BackgroundJob; no need for users or subclasses to use
+ public void internal_completed() {
+ semaphore.notify();
+ }
+}
+
+public delegate void NotificationCallback(BackgroundJob job, NotificationObject? user);
+
+// This abstract class represents a unit of work that can be executed within a background thread's
+// context. If specified, the job may be cancellable (which can be checked by execute() and the
+// worker thread prior to calling execute()). The BackgroundJob may also specify a
+// CompletionCallback and/or a CancellationCallback to be executed within Gtk's event loop.
+// A BackgroundJob may also emit NotificationCallbacks, all of which are also executed within
+// Gtk's event loop.
+//
+// The BackgroundJob may be constructed with a reference to its "owner". This is not used directly
+// by BackgroundJob or Worker, but merely exists to hold a reference to the Object that is receiving
+// the various callbacks from BackgroundJob. Without this, it's possible for the object creating
+// BackgroundJobs to be freed before all the callbacks have been received, or even during a callback,
+// which is an unstable situation.
+public abstract class BackgroundJob {
+ public enum JobPriority {
+ HIGHEST = 100,
+ HIGH = 75,
+ NORMAL = 50,
+ LOW = 25,
+ LOWEST = 0;
+
+ // Returns negative if this is higher, zero if equal, positive if this is lower
+ public int compare(JobPriority other) {
+ return (int) other - (int) this;
+ }
+
+ public static int compare_func(void *a, void *b) {
+ return (int) b - (int) a;
+ }
+ }
+
+ private class NotificationJob {
+ public unowned NotificationCallback callback;
+ public BackgroundJob background_job;
+ public NotificationObject? user;
+
+ public NotificationJob(NotificationCallback callback, BackgroundJob background_job,
+ NotificationObject? user) {
+ this.callback = callback;
+ this.background_job = background_job;
+ this.user = user;
+ }
+ }
+
+ private static Gee.ArrayList<NotificationJob> notify_queue = new Gee.ArrayList<NotificationJob>();
+
+ private Object owner;
+ private unowned CompletionCallback callback;
+ private Cancellable cancellable;
+ private unowned CancellationCallback cancellation;
+ private BackgroundJob self = null;
+ private AbstractSemaphore semaphore = null;
+
+ // The thinking here is that there is exactly one CompletionCallback per job, and the caller
+ // probably wants to know that to set off UI and other events in response. There are several
+ // (possibly hundreds or thousands) or notifications, and thus should arrive in a more
+ // controlled way (to avoid locking up the UI, for example). This has ramifications about
+ // the order in which completion and notifications arrive (see above note).
+ private int completion_priority = Priority.HIGH;
+ private int notification_priority = Priority.DEFAULT_IDLE;
+
+ public BackgroundJob(Object? owner = null, CompletionCallback? callback = null,
+ Cancellable? cancellable = null, CancellationCallback? cancellation = null,
+ AbstractSemaphore? completion_semaphore = null) {
+ this.owner = owner;
+ this.callback = callback;
+ this.cancellable = cancellable;
+ this.cancellation = cancellation;
+ this.semaphore = completion_semaphore;
+ }
+
+ public abstract void execute();
+
+ public virtual JobPriority get_priority() {
+ return JobPriority.NORMAL;
+ }
+
+ // For the CompareFunc delegate, according to JobPriority.
+ public static int priority_compare_func(BackgroundJob a, BackgroundJob b) {
+ return a.get_priority().compare(b.get_priority());
+ }
+
+ // For the Comparator delegate, according to JobPriority.
+ public static int64 priority_comparator(void *a, void *b) {
+ return priority_compare_func((BackgroundJob) a, (BackgroundJob) b);
+ }
+
+ // This method is not thread-safe. Best to set priority before the job is enqueued.
+ public void set_completion_priority(int priority) {
+ completion_priority = priority;
+ }
+
+ // This method is not thread-safe. Best to set priority before the job is enqueued.
+ public void set_notification_priority(int priority) {
+ notification_priority = priority;
+ }
+
+ // This method is thread-safe, but only waits if a completion semaphore has been set, otherwise
+ // exits immediately. Note that blocking for a semaphore does NOT spin the event loop, so a
+ // thread relying on it to continue should not use this.
+ public void wait_for_completion() {
+ if (semaphore != null)
+ semaphore.wait();
+ }
+
+ public Cancellable? get_cancellable() {
+ return cancellable;
+ }
+
+ public bool is_cancelled() {
+ return (cancellable != null) ? cancellable.is_cancelled() : false;
+ }
+
+ public void cancel() {
+ if (cancellable != null)
+ cancellable.cancel();
+ }
+
+ // This should only be called by Workers. Beware to all who fail to heed.
+ public void internal_notify_completion() {
+ if (semaphore != null)
+ semaphore.notify();
+
+ if (callback == null && cancellation == null)
+ return;
+
+ if (is_cancelled() && cancellation == null)
+ return;
+
+ // Because Idle doesn't maintain a ref count of the job, and it's going to be dropped by
+ // the worker thread soon, need to maintain a ref until the completion callback is made
+ self = this;
+
+ Idle.add_full(completion_priority, on_notify_completion);
+ }
+
+ private bool on_notify_completion() {
+ // it's still possible the caller cancelled this operation during or after the execute()
+ // method was called ... since the completion work can be costly for a job that was
+ // already cancelled, and the caller might've dropped all references to the job by now,
+ // only notify completion in this context if not cancelled
+ if (is_cancelled()) {
+ if (cancellation != null)
+ cancellation(this);
+ } else {
+ if (callback != null)
+ callback(this);
+ }
+
+ // drop the ref so this object can be freed ... must not touch "this" after this point
+ self = null;
+
+ return false;
+ }
+
+ // This call may be executed by the child class during execute() to inform of a unit of
+ // work being completed
+ protected void notify(NotificationCallback callback, NotificationObject? user) {
+ lock (notify_queue) {
+ notify_queue.add(new NotificationJob(callback, this, user));
+ }
+
+ Idle.add_full(notification_priority, on_notification_ready);
+
+ // If an interlocked notification, block until the main thread completes the notification
+ // callback
+ InterlockedNotificationObject? interlocked = user as InterlockedNotificationObject;
+ if (interlocked != null)
+ interlocked.internal_wait_for_completion();
+ }
+
+ private bool on_notification_ready() {
+ // this is called once for every notification added, so there should always be something
+ // waiting for us
+ NotificationJob? notification_job = null;
+ lock (notify_queue) {
+ if (notify_queue.size > 0)
+ notification_job = notify_queue.remove_at(0);
+ }
+ assert(notification_job != null);
+
+ notification_job.callback(notification_job.background_job, notification_job.user);
+
+ // Release the blocked thread waiting for this notification to complete
+ InterlockedNotificationObject? interlocked = notification_job.user as InterlockedNotificationObject;
+ if (interlocked != null)
+ interlocked.internal_completed();
+
+ return false;
+ }
+}
+
diff --git a/src/threads/Semaphore.vala b/src/threads/Semaphore.vala
new file mode 100644
index 0000000..dfb0a2f
--- /dev/null
+++ b/src/threads/Semaphore.vala
@@ -0,0 +1,160 @@
+/* Copyright 2011-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU LGPL (version 2.1 or later).
+ * See the COPYING file in this distribution.
+ */
+
+// Semaphores may be used to be notified when a job is completed. This provides an alternate
+// mechanism (essentially, a blocking mechanism) to the system of callbacks that BackgroundJob
+// offers. They can also be used for other job-dependent notification mechanisms.
+public abstract class AbstractSemaphore {
+ public enum Type {
+ SERIAL,
+ BROADCAST
+ }
+
+ protected enum NotifyAction {
+ NONE,
+ SIGNAL
+ }
+
+ protected enum WaitAction {
+ SLEEP,
+ READY
+ }
+
+ private Type type;
+ private Mutex mutex = Mutex();
+ private Cond monitor = Cond();
+
+ public AbstractSemaphore(Type type) {
+ assert(type == Type.SERIAL || type == Type.BROADCAST);
+
+ this.type = type;
+ }
+
+ private void trigger() {
+ if (type == Type.SERIAL)
+ monitor.signal();
+ else
+ monitor.broadcast();
+ }
+
+ public void notify() {
+ mutex.lock();
+
+ NotifyAction action = do_notify();
+ switch (action) {
+ case NotifyAction.NONE:
+ // do nothing
+ break;
+
+ case NotifyAction.SIGNAL:
+ trigger();
+ break;
+
+ default:
+ error("Unknown semaphore action: %s", action.to_string());
+ }
+
+ mutex.unlock();
+ }
+
+ // This method is called by notify() with the semaphore's mutex locked.
+ protected abstract NotifyAction do_notify();
+
+ public void wait() {
+ mutex.lock();
+
+ while (do_wait() == WaitAction.SLEEP)
+ monitor.wait(mutex);
+
+ mutex.unlock();
+ }
+
+ // This method is called by wait() with the semaphore's mutex locked.
+ protected abstract WaitAction do_wait();
+
+ // Returns true if the semaphore is reset, false otherwise.
+ public bool reset() {
+ mutex.lock();
+ bool is_reset = do_reset();
+ mutex.unlock();
+
+ return is_reset;
+ }
+
+ // This method is called by reset() with the semaphore's mutex locked. Returns true if reset,
+ // false if not supported.
+ protected virtual bool do_reset() {
+ return false;
+ }
+}
+
+public class Semaphore : AbstractSemaphore {
+ bool passed = false;
+
+ public Semaphore() {
+ base (AbstractSemaphore.Type.BROADCAST);
+ }
+
+ protected override AbstractSemaphore.NotifyAction do_notify() {
+ if (passed)
+ return NotifyAction.NONE;
+
+ passed = true;
+
+ return NotifyAction.SIGNAL;
+ }
+
+ protected override AbstractSemaphore.WaitAction do_wait() {
+ return passed ? WaitAction.READY : WaitAction.SLEEP;
+ }
+}
+
+public class CountdownSemaphore : AbstractSemaphore {
+ private int total;
+ private int passed = 0;
+
+ public CountdownSemaphore(int total) {
+ base (AbstractSemaphore.Type.BROADCAST);
+
+ this.total = total;
+ }
+
+ protected override AbstractSemaphore.NotifyAction do_notify() {
+ if (passed >= total)
+ critical("CountdownSemaphore overrun: %d/%d", passed + 1, total);
+
+ return (++passed >= total) ? NotifyAction.SIGNAL : NotifyAction.NONE;
+ }
+
+ protected override AbstractSemaphore.WaitAction do_wait() {
+ return (passed < total) ? WaitAction.SLEEP : WaitAction.READY;
+ }
+}
+
+public class EventSemaphore : AbstractSemaphore {
+ bool fired = false;
+
+ public EventSemaphore() {
+ base (AbstractSemaphore.Type.BROADCAST);
+ }
+
+ protected override AbstractSemaphore.NotifyAction do_notify() {
+ fired = true;
+
+ return NotifyAction.SIGNAL;
+ }
+
+ protected override AbstractSemaphore.WaitAction do_wait() {
+ return fired ? WaitAction.READY : WaitAction.SLEEP;
+ }
+
+ protected override bool do_reset() {
+ fired = false;
+
+ return true;
+ }
+}
+
diff --git a/src/threads/Threads.vala b/src/threads/Threads.vala
new file mode 100644
index 0000000..0e1b1ec
--- /dev/null
+++ b/src/threads/Threads.vala
@@ -0,0 +1,14 @@
+/* Copyright 2011-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+namespace Threads {
+ public void init() throws Error {
+ }
+
+ public void terminate() {
+ }
+}
+
diff --git a/src/threads/Workers.vala b/src/threads/Workers.vala
new file mode 100644
index 0000000..756eb01
--- /dev/null
+++ b/src/threads/Workers.vala
@@ -0,0 +1,104 @@
+/* Copyright 2009-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU LGPL (version 2.1 or later).
+ * See the COPYING file in this distribution.
+ */
+
+
+public class BackgroundJobBatch : SortedList<BackgroundJob> {
+ public BackgroundJobBatch() {
+ base (BackgroundJob.priority_comparator);
+ }
+}
+
+// Workers wraps some of ThreadPool's oddities up into an interface that emphasizes BackgroundJobs.
+public class Workers {
+ public const int UNLIMITED_THREADS = -1;
+
+ private ThreadPool<void *> thread_pool;
+ private AsyncQueue<BackgroundJob> queue = new AsyncQueue<BackgroundJob>();
+ private EventSemaphore empty_event = new EventSemaphore();
+ private int enqueued = 0;
+
+ public Workers(int max_threads, bool exclusive) {
+ if (max_threads <= 0 && max_threads != UNLIMITED_THREADS)
+ max_threads = 1;
+
+ // event starts as set because queue is empty
+ empty_event.notify();
+
+ try {
+ thread_pool = new ThreadPool<void *>.with_owned_data(thread_start, max_threads, exclusive);
+ } catch (ThreadError err) {
+ error("Unable to create thread pool: %s", err.message);
+ }
+ }
+
+ public static int threads_per_cpu(int per = 1, int max = -1) requires (per > 0) ensures (result > 0) {
+ int count = number_of_processors() * per;
+
+ return (max < 0) ? count : count.clamp(0, max);
+ }
+
+ // This is useful when the intent is for the worker threads to use all the CPUs minus one for
+ // the main/UI thread. (No guarantees, of course.)
+ public static int thread_per_cpu_minus_one() ensures (result > 0) {
+ return (number_of_processors() - 1).clamp(1, int.MAX);
+ }
+
+ // Enqueues a BackgroundJob for work in a thread context. BackgroundJob.execute() is called
+ // within the thread's context, while its CompletionCallback is called within the Gtk event loop.
+ public void enqueue(BackgroundJob job) {
+ empty_event.reset();
+
+ lock (queue) {
+ queue.push_sorted(job, BackgroundJob.priority_compare_func);
+ enqueued++;
+ }
+
+ try {
+ thread_pool.add(job);
+ } catch (ThreadError err) {
+ // error should only occur when a thread could not be created, in which case, the
+ // BackgroundJob is queued up
+ warning("Unable to create worker thread: %s", err.message);
+ }
+ }
+
+ public void enqueue_many(BackgroundJobBatch batch) {
+ foreach (BackgroundJob job in batch)
+ enqueue(job);
+ }
+
+ public void wait_for_empty_queue() {
+ empty_event.wait();
+ }
+
+ // Returns the number of BackgroundJobs on the queue, not including active jobs.
+ public int get_pending_job_count() {
+ lock (queue) {
+ return enqueued;
+ }
+ }
+
+ private void thread_start(void *ignored) {
+ BackgroundJob? job;
+ bool empty;
+ lock (queue) {
+ job = queue.try_pop();
+ assert(job != null);
+
+ assert(enqueued > 0);
+ empty = (--enqueued == 0);
+ }
+
+ if (!job.is_cancelled())
+ job.execute();
+
+ job.internal_notify_completion();
+
+ if (empty)
+ empty_event.notify();
+ }
+}
+
diff --git a/src/threads/mk/threads.mk b/src/threads/mk/threads.mk
new file mode 100644
index 0000000..83afc47
--- /dev/null
+++ b/src/threads/mk/threads.mk
@@ -0,0 +1,30 @@
+
+# UNIT_NAME is the Vala namespace. A file named UNIT_NAME.vala must be in this directory with
+# a init() and terminate() function declared in the namespace.
+UNIT_NAME := Threads
+
+# UNIT_DIR should match the subdirectory the files are located in. Generally UNIT_NAME in all
+# lowercase. The name of this file should be UNIT_DIR.mk.
+UNIT_DIR := threads
+
+# All Vala files in the unit should be listed here with no subdirectory prefix.
+#
+# NOTE: Do *not* include the unit's master file, i.e. UNIT_NAME.vala.
+UNIT_FILES := \
+ Workers.vala \
+ BackgroundJob.vala \
+ Semaphore.vala
+
+# Any unit this unit relies upon (and should be initialized before it's initialized) should
+# be listed here using its Vala namespace.
+#
+# NOTE: All units are assumed to rely upon the unit-unit. Do not include that here.
+UNIT_USES :=
+
+# List any additional files that are used in the build process as a part of this unit that should
+# be packaged in the tarball. File names should be relative to the unit's home directory.
+UNIT_RC :=
+
+# unitize.mk must be called at the end of each UNIT_DIR.mk file.
+include unitize.mk
+