summaryrefslogtreecommitdiff
path: root/src/TimedQueue.vala
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff.email>2024-11-11 12:28:23 +0100
committerJörg Frings-Fürst <debian@jff.email>2024-11-11 12:28:23 +0100
commit483b2e960f24c370d9e0260ab4ba8b3453408590 (patch)
treeec5e83ac39e4e5cec0ab9494da2d0edaafe72d89 /src/TimedQueue.vala
parent1bbf886bafc680c56ddd5e27ddd803b4e03685df (diff)
parentcb001bb8056869f98e9a62248bdd509a69d08faf (diff)
Merge branch 'feature/upstrem' into develop
Diffstat (limited to 'src/TimedQueue.vala')
-rw-r--r--src/TimedQueue.vala189
1 files changed, 83 insertions, 106 deletions
diff --git a/src/TimedQueue.vala b/src/TimedQueue.vala
index 4ea6a23..ac1aab6 100644
--- a/src/TimedQueue.vala
+++ b/src/TimedQueue.vala
@@ -18,7 +18,7 @@
public delegate void DequeuedCallback<G>(G item);
-public class TimedQueue<G> {
+public class HashTimedQueue<G> {
private class Element<G> {
public G item;
public ulong ready;
@@ -42,6 +42,7 @@ public class TimedQueue<G> {
private uint dequeue_spacing_msec = 0;
private ulong last_dequeue = 0;
private bool paused_state = false;
+ private Gee.HashMap<G, int> item_count;
public virtual signal void paused(bool is_paused) {
}
@@ -49,7 +50,8 @@ public class TimedQueue<G> {
// Initial design was to have a signal that passed the dequeued G, but bug in valac meant
// finding a workaround, namely using a delegate:
// https://bugzilla.gnome.org/show_bug.cgi?id=628639
- public TimedQueue(uint hold_msec, DequeuedCallback<G> callback,
+ public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback,
+ owned Gee.HashDataFunc<G>? hash_func = null,
owned Gee.EqualDataFunc<G>? equal_func = null, int priority = Priority.DEFAULT) {
this.hold_msec = hold_msec;
this.callback = callback;
@@ -64,9 +66,10 @@ public class TimedQueue<G> {
queue = new SortedList<Element<G>>(Element.comparator);
timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
+ item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func);
}
- ~TimedQueue() {
+ ~HashTimedQueue() {
if (timer_id != 0)
Source.remove(timer_id);
}
@@ -93,10 +96,6 @@ public class TimedQueue<G> {
: (dequeue_spacing_msec / 2)).clamp(10, uint.MAX);
}
- protected virtual void notify_dequeued(G item) {
- callback(item);
- }
-
public bool is_paused() {
return paused_state;
}
@@ -119,50 +118,80 @@ public class TimedQueue<G> {
paused(false);
}
- public virtual void clear() {
- queue.clear();
+ public void clear() {
+ lock(queue) {
+ item_count.clear();
+ queue.clear();
+ }
}
- public virtual bool contains(G item) {
- foreach (Element<G> e in queue) {
- if (equal_func(item, e.item))
- return true;
+ public bool contains(G item) {
+ lock(queue) {
+ return item_count.has_key(item);
}
-
- return false;
}
- public virtual bool enqueue(G item) {
- return queue.add(new Element<G>(item, calc_ready_time()));
+ public bool enqueue(G item) {
+ lock(queue) {
+ if (!queue.add(new Element<G>(item, calc_ready_time()))) {
+ return false;
+ }
+ item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
+
+ return true;
+ }
}
- public virtual bool enqueue_many(Gee.Collection<G> items) {
+ public bool enqueue_many(Gee.Collection<G> items) {
ulong ready_time = calc_ready_time();
Gee.ArrayList<Element<G>> elements = new Gee.ArrayList<Element<G>>();
foreach (G item in items)
elements.add(new Element<G>(item, ready_time));
- return queue.add_list(elements);
+ lock(queue) {
+ if (!queue.add_list(elements)) {
+ return false;
+ }
+
+ foreach (G item in items) {
+ item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
+ }
+ }
+
+ return true;
+
}
- public virtual bool remove_first(G item) {
- Gee.Iterator<Element<G>> iter = queue.iterator();
- while (iter.next()) {
- Element<G> e = iter.get();
- if (equal_func(item, e.item)) {
- iter.remove();
-
- return true;
+ public bool remove_first(G item) {
+ lock(queue) {
+ var item_removed = false;
+ var iter = queue.iterator();
+ while (iter.next()) {
+ Element<G> e = iter.get();
+ if (equal_func(item, e.item)) {
+ iter.remove();
+
+ item_removed = true;
+ break;
+ }
+ }
+
+ if (!item_removed) {
+ return false;
}
+
+ removed(item);
}
-
- return false;
+
+ return true;
}
- public virtual int size {
+ public int size {
get {
- return queue.size;
+ lock(queue) {
+ return queue.size;
+ }
}
}
@@ -180,23 +209,28 @@ public class TimedQueue<G> {
if (queue.size == 0)
break;
- Element<G>? head = queue.get_at(0);
- assert(head != null);
-
- if (now == 0)
- now = now_ms();
-
- if (head.ready > now)
- break;
-
- // if a space of time is required between dequeues, check now
- if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec))
- break;
-
- Element<G>? h = queue.remove_at(0);
- assert(head == h);
-
- notify_dequeued(head.item);
+ G? item = null;
+ lock(queue) {
+ Element<G>? head = queue.get_at(0);
+ assert(head != null);
+
+ if (now == 0)
+ now = now_ms();
+
+ if (head.ready > now)
+ break;
+
+ // if a space of time is required between dequeues, check now
+ if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec))
+ break;
+
+ Element<G>? h = queue.remove_at(0);
+ assert(head == h);
+
+ removed(head.item);
+ item = head.item;
+ }
+ callback(item);
last_dequeue = now;
// if a dequeue spacing is in place, it's a lock that only one item is dequeued per
@@ -207,65 +241,8 @@ public class TimedQueue<G> {
return true;
}
-}
-
-// HashTimedQueue uses a HashMap for quick lookups of elements via contains().
-public class HashTimedQueue<G> : TimedQueue<G> {
- private Gee.HashMap<G, int> item_count;
-
- public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback,
- owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null,
- int priority = Priority.DEFAULT) {
- base (hold_msec, callback, (owned) equal_func, priority);
-
- item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func);
- }
-
- protected override void notify_dequeued(G item) {
- removed(item);
-
- base.notify_dequeued(item);
- }
-
- public override void clear() {
- item_count.clear();
-
- base.clear();
- }
-
- public override bool contains(G item) {
- return item_count.has_key(item);
- }
-
- public override bool enqueue(G item) {
- if (!base.enqueue(item))
- return false;
-
- item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
-
- return true;
- }
-
- public override bool enqueue_many(Gee.Collection<G> items) {
- if (!base.enqueue_many(items))
- return false;
-
- foreach (G item in items)
- item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
-
- return true;
- }
-
- public override bool remove_first(G item) {
- if (!base.remove_first(item))
- return false;
-
- removed(item);
-
- return true;
- }
-
+ // Not locking. This is always called with the lock hold
private void removed(G item) {
// item in question is either already removed
// or was never added, safe to do nothing here