summaryrefslogtreecommitdiff
path: root/src/TimedQueue.vala
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
committerJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
commit4ea2cc3bd4a7d9b1c54a9d33e6a1cf82e7c8c21d (patch)
treed2e54377d14d604356c86862a326f64ae64dadd6 /src/TimedQueue.vala
Imported Upstream version 0.18.1upstream/0.18.1
Diffstat (limited to 'src/TimedQueue.vala')
-rw-r--r--src/TimedQueue.vala284
1 files changed, 284 insertions, 0 deletions
diff --git a/src/TimedQueue.vala b/src/TimedQueue.vala
new file mode 100644
index 0000000..7001421
--- /dev/null
+++ b/src/TimedQueue.vala
@@ -0,0 +1,284 @@
+/* Copyright 2010-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+// TimedQueue is a specialized collection class. It holds items in order, but rather than being
+// manually dequeued, they are dequeued automatically after a specified amount of time has elapsed
+// for that item. As of today, it's possible the item will be dequeued a bit later than asked
+// for, but it will never be early. Future implementations might tighten up the lateness.
+//
+// The original design was to use a signal to notify when an item has been dequeued, but Vala has
+// a bug with passing an unnamed type as a signal parameter:
+// https://bugzilla.gnome.org/show_bug.cgi?id=628639
+//
+// The rate the items come off the queue can be spaced out. Note that this can cause items to back
+// up. As of today, TimedQueue makes no effort to combat this.
+
+public delegate void DequeuedCallback<G>(G item);
+
+public class TimedQueue<G> {
+ private class Element<G> {
+ public G item;
+ public ulong ready;
+
+ public Element(G item, ulong ready) {
+ this.item = item;
+ this.ready = ready;
+ }
+
+ public static int64 comparator(void *a, void *b) {
+ return (int64) ((Element *) a)->ready - (int64) ((Element *) b)->ready;
+ }
+ }
+
+ private uint hold_msec;
+ private unowned DequeuedCallback<G> callback;
+ private Gee.EqualDataFunc<G> equal_func;
+ private int priority;
+ private uint timer_id = 0;
+ private SortedList<Element<G>> queue;
+ private uint dequeue_spacing_msec = 0;
+ private ulong last_dequeue = 0;
+ private bool paused_state = false;
+
+ public virtual signal void paused(bool is_paused) {
+ }
+
+ // Initial design was to have a signal that passed the dequeued G, but bug in valac meant
+ // finding a workaround, namely using a delegate:
+ // https://bugzilla.gnome.org/show_bug.cgi?id=628639
+ public TimedQueue(uint hold_msec, DequeuedCallback<G> callback,
+ owned Gee.EqualDataFunc? equal_func = null, int priority = Priority.DEFAULT) {
+ this.hold_msec = hold_msec;
+ this.callback = callback;
+
+ if (equal_func != null)
+ this.equal_func = (owned) equal_func;
+ else
+ this.equal_func = (Gee.EqualDataFunc<G>) (Gee.Functions.get_equal_func_for(typeof(G)));
+
+ this.priority = priority;
+
+ queue = new SortedList<Element<G>>(Element.comparator);
+
+ timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
+ }
+
+ ~TimedQueue() {
+ if (timer_id != 0)
+ Source.remove(timer_id);
+ }
+
+ public uint get_dequeue_spacing_msec() {
+ return dequeue_spacing_msec;
+ }
+
+ public void set_dequeue_spacing_msec(uint msec) {
+ if (msec == dequeue_spacing_msec)
+ return;
+
+ if (timer_id != 0)
+ Source.remove(timer_id);
+
+ dequeue_spacing_msec = msec;
+
+ timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
+ }
+
+ private uint get_heartbeat_timeout() {
+ return ((dequeue_spacing_msec == 0)
+ ? (hold_msec / 10)
+ : (dequeue_spacing_msec / 2)).clamp(10, uint.MAX);
+ }
+
+ protected virtual void notify_dequeued(G item) {
+ callback(item);
+ }
+
+ public bool is_paused() {
+ return paused_state;
+ }
+
+ public void pause() {
+ if (paused_state)
+ return;
+
+ paused_state = true;
+
+ paused(true);
+ }
+
+ public void unpause() {
+ if (!paused_state)
+ return;
+
+ paused_state = false;
+
+ paused(false);
+ }
+
+ public virtual void clear() {
+ queue.clear();
+ }
+
+ public virtual bool contains(G item) {
+ foreach (Element<G> e in queue) {
+ if (equal_func(item, e.item))
+ return true;
+ }
+
+ return false;
+ }
+
+ public virtual bool enqueue(G item) {
+ return queue.add(new Element<G>(item, calc_ready_time()));
+ }
+
+ public virtual bool enqueue_many(Gee.Collection<G> items) {
+ ulong ready_time = calc_ready_time();
+
+ Gee.ArrayList<Element<G>> elements = new Gee.ArrayList<Element<G>>();
+ foreach (G item in items)
+ elements.add(new Element<G>(item, ready_time));
+
+ return queue.add_list(elements);
+ }
+
+ public virtual bool remove_first(G item) {
+ Gee.Iterator<Element<G>> iter = queue.iterator();
+ while (iter.next()) {
+ Element<G> e = iter.get();
+ if (equal_func(item, e.item)) {
+ iter.remove();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public virtual int size {
+ get {
+ return queue.size;
+ }
+ }
+
+ private ulong calc_ready_time() {
+ return now_ms() + (ulong) hold_msec;
+ }
+
+ private bool on_heartbeat() {
+ if (paused_state)
+ return true;
+
+ ulong now = 0;
+
+ for (;;) {
+ if (queue.size == 0)
+ break;
+
+ Element<G>? head = queue.get_at(0);
+ assert(head != null);
+
+ if (now == 0)
+ now = now_ms();
+
+ if (head.ready > now)
+ break;
+
+ // if a space of time is required between dequeues, check now
+ if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec))
+ break;
+
+ Element<G>? h = queue.remove_at(0);
+ assert(head == h);
+
+ notify_dequeued(head.item);
+ last_dequeue = now;
+
+ // if a dequeue spacing is in place, it's a lock that only one item is dequeued per
+ // heartbeat
+ if (dequeue_spacing_msec != 0)
+ break;
+ }
+
+ return true;
+ }
+}
+
+// HashTimedQueue uses a HashMap for quick lookups of elements via contains().
+
+public class HashTimedQueue<G> : TimedQueue<G> {
+ private Gee.HashMap<G, int> item_count;
+
+ public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback,
+ owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null,
+ int priority = Priority.DEFAULT) {
+ base (hold_msec, callback, (owned) equal_func, priority);
+
+ item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func);
+ }
+
+ protected override void notify_dequeued(G item) {
+ removed(item);
+
+ base.notify_dequeued(item);
+ }
+
+ public override void clear() {
+ item_count.clear();
+
+ base.clear();
+ }
+
+ public override bool contains(G item) {
+ return item_count.has_key(item);
+ }
+
+ public override bool enqueue(G item) {
+ if (!base.enqueue(item))
+ return false;
+
+ item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
+
+ return true;
+ }
+
+ public override bool enqueue_many(Gee.Collection<G> items) {
+ if (!base.enqueue_many(items))
+ return false;
+
+ foreach (G item in items)
+ item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
+
+ return true;
+ }
+
+ public override bool remove_first(G item) {
+ if (!base.remove_first(item))
+ return false;
+
+ removed(item);
+
+ return true;
+ }
+
+ private void removed(G item) {
+ // item in question is either already removed
+ // or was never added, safe to do nothing here
+ if (!item_count.has_key(item))
+ return;
+
+ int count = item_count.get(item);
+ assert(count > 0);
+
+ if (--count == 0)
+ item_count.unset(item);
+ else
+ item_count.set(item, count);
+ }
+}
+