diff options
author | Jörg Frings-Fürst <debian@jff.email> | 2024-11-11 12:28:23 +0100 |
---|---|---|
committer | Jörg Frings-Fürst <debian@jff.email> | 2024-11-11 12:28:23 +0100 |
commit | 483b2e960f24c370d9e0260ab4ba8b3453408590 (patch) | |
tree | ec5e83ac39e4e5cec0ab9494da2d0edaafe72d89 /src/TimedQueue.vala | |
parent | 1bbf886bafc680c56ddd5e27ddd803b4e03685df (diff) | |
parent | cb001bb8056869f98e9a62248bdd509a69d08faf (diff) |
Merge branch 'feature/upstrem' into develop
Diffstat (limited to 'src/TimedQueue.vala')
-rw-r--r-- | src/TimedQueue.vala | 189 |
1 files changed, 83 insertions, 106 deletions
diff --git a/src/TimedQueue.vala b/src/TimedQueue.vala index 4ea6a23..ac1aab6 100644 --- a/src/TimedQueue.vala +++ b/src/TimedQueue.vala @@ -18,7 +18,7 @@ public delegate void DequeuedCallback<G>(G item); -public class TimedQueue<G> { +public class HashTimedQueue<G> { private class Element<G> { public G item; public ulong ready; @@ -42,6 +42,7 @@ public class TimedQueue<G> { private uint dequeue_spacing_msec = 0; private ulong last_dequeue = 0; private bool paused_state = false; + private Gee.HashMap<G, int> item_count; public virtual signal void paused(bool is_paused) { } @@ -49,7 +50,8 @@ public class TimedQueue<G> { // Initial design was to have a signal that passed the dequeued G, but bug in valac meant // finding a workaround, namely using a delegate: // https://bugzilla.gnome.org/show_bug.cgi?id=628639 - public TimedQueue(uint hold_msec, DequeuedCallback<G> callback, + public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback, + owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null, int priority = Priority.DEFAULT) { this.hold_msec = hold_msec; this.callback = callback; @@ -64,9 +66,10 @@ public class TimedQueue<G> { queue = new SortedList<Element<G>>(Element.comparator); timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority); + item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func); } - ~TimedQueue() { + ~HashTimedQueue() { if (timer_id != 0) Source.remove(timer_id); } @@ -93,10 +96,6 @@ public class TimedQueue<G> { : (dequeue_spacing_msec / 2)).clamp(10, uint.MAX); } - protected virtual void notify_dequeued(G item) { - callback(item); - } - public bool is_paused() { return paused_state; } @@ -119,50 +118,80 @@ public class TimedQueue<G> { paused(false); } - public virtual void clear() { - queue.clear(); + public void clear() { + lock(queue) { + item_count.clear(); + queue.clear(); + } } - public virtual bool contains(G item) { - foreach (Element<G> e in queue) { - if (equal_func(item, e.item)) - return true; + public bool contains(G item) { + lock(queue) { + return item_count.has_key(item); } - - return false; } - public virtual bool enqueue(G item) { - return queue.add(new Element<G>(item, calc_ready_time())); + public bool enqueue(G item) { + lock(queue) { + if (!queue.add(new Element<G>(item, calc_ready_time()))) { + return false; + } + item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1); + + return true; + } } - public virtual bool enqueue_many(Gee.Collection<G> items) { + public bool enqueue_many(Gee.Collection<G> items) { ulong ready_time = calc_ready_time(); Gee.ArrayList<Element<G>> elements = new Gee.ArrayList<Element<G>>(); foreach (G item in items) elements.add(new Element<G>(item, ready_time)); - return queue.add_list(elements); + lock(queue) { + if (!queue.add_list(elements)) { + return false; + } + + foreach (G item in items) { + item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1); + } + } + + return true; + } - public virtual bool remove_first(G item) { - Gee.Iterator<Element<G>> iter = queue.iterator(); - while (iter.next()) { - Element<G> e = iter.get(); - if (equal_func(item, e.item)) { - iter.remove(); - - return true; + public bool remove_first(G item) { + lock(queue) { + var item_removed = false; + var iter = queue.iterator(); + while (iter.next()) { + Element<G> e = iter.get(); + if (equal_func(item, e.item)) { + iter.remove(); + + item_removed = true; + break; + } + } + + if (!item_removed) { + return false; } + + removed(item); } - - return false; + + return true; } - public virtual int size { + public int size { get { - return queue.size; + lock(queue) { + return queue.size; + } } } @@ -180,23 +209,28 @@ public class TimedQueue<G> { if (queue.size == 0) break; - Element<G>? head = queue.get_at(0); - assert(head != null); - - if (now == 0) - now = now_ms(); - - if (head.ready > now) - break; - - // if a space of time is required between dequeues, check now - if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec)) - break; - - Element<G>? h = queue.remove_at(0); - assert(head == h); - - notify_dequeued(head.item); + G? item = null; + lock(queue) { + Element<G>? head = queue.get_at(0); + assert(head != null); + + if (now == 0) + now = now_ms(); + + if (head.ready > now) + break; + + // if a space of time is required between dequeues, check now + if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec)) + break; + + Element<G>? h = queue.remove_at(0); + assert(head == h); + + removed(head.item); + item = head.item; + } + callback(item); last_dequeue = now; // if a dequeue spacing is in place, it's a lock that only one item is dequeued per @@ -207,65 +241,8 @@ public class TimedQueue<G> { return true; } -} - -// HashTimedQueue uses a HashMap for quick lookups of elements via contains(). -public class HashTimedQueue<G> : TimedQueue<G> { - private Gee.HashMap<G, int> item_count; - - public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback, - owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null, - int priority = Priority.DEFAULT) { - base (hold_msec, callback, (owned) equal_func, priority); - - item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func); - } - - protected override void notify_dequeued(G item) { - removed(item); - - base.notify_dequeued(item); - } - - public override void clear() { - item_count.clear(); - - base.clear(); - } - - public override bool contains(G item) { - return item_count.has_key(item); - } - - public override bool enqueue(G item) { - if (!base.enqueue(item)) - return false; - - item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1); - - return true; - } - - public override bool enqueue_many(Gee.Collection<G> items) { - if (!base.enqueue_many(items)) - return false; - - foreach (G item in items) - item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1); - - return true; - } - - public override bool remove_first(G item) { - if (!base.remove_first(item)) - return false; - - removed(item); - - return true; - } - + // Not locking. This is always called with the lock hold private void removed(G item) { // item in question is either already removed // or was never added, safe to do nothing here |