/* Copyright 2016 Software Freedom Conservancy Inc.
 *
 * This software is licensed under the GNU Lesser General Public License
 * (version 2.1 or later).  See the COPYING file in this distribution.
 */

// TimedQueue is a specialized collection class.  It holds items in order, but rather than being
// manually dequeued, they are dequeued automatically after a specified amount of time has elapsed
// for that item.  As of today, it's possible the item will be dequeued a bit later than asked
// for, but it will never be early.  Future implementations might tighten up the lateness.
//
// The original design was to use a signal to notify when an item has been dequeued, but Vala has
// a bug with passing an unnamed type as a signal parameter:
// https://bugzilla.gnome.org/show_bug.cgi?id=628639
//
// The rate the items come off the queue can be spaced out.  Note that this can cause items to back
// up.  As of today, TimedQueue makes no effort to combat this.

public delegate void DequeuedCallback<G>(G item);

public class TimedQueue<G> {
    private class Element<G> {
        public G item;
        public ulong ready;
        
        public Element(G item, ulong ready) {
            this.item = item;
            this.ready = ready;
        }
        
        public static int64 comparator(void *a, void *b) {
            return (int64) ((Element *) a)->ready - (int64) ((Element *) b)->ready;
        }
    }
    
    private uint hold_msec;
    private unowned DequeuedCallback<G> callback;
    private Gee.EqualDataFunc<G> equal_func;
    private int priority;
    private uint timer_id = 0;
    private SortedList<Element<G>> queue;
    private uint dequeue_spacing_msec = 0;
    private ulong last_dequeue = 0;
    private bool paused_state = false;
    
    public virtual signal void paused(bool is_paused) {
    }
    
    // Initial design was to have a signal that passed the dequeued G, but bug in valac meant
    // finding a workaround, namely using a delegate:
    // https://bugzilla.gnome.org/show_bug.cgi?id=628639
    public TimedQueue(uint hold_msec, DequeuedCallback<G> callback,
        owned Gee.EqualDataFunc? equal_func = null, int priority = Priority.DEFAULT) {
        this.hold_msec = hold_msec;
        this.callback = callback;
        
        if (equal_func != null)
            this.equal_func = (owned) equal_func;
        else
            this.equal_func = (Gee.EqualDataFunc<G>) (Gee.Functions.get_equal_func_for(typeof(G)));
            
        this.priority = priority;
        
        queue = new SortedList<Element<G>>(Element.comparator);
        
        timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
    }
    
    ~TimedQueue() {
        if (timer_id != 0)
            Source.remove(timer_id);
    }
    
    public uint get_dequeue_spacing_msec() {
        return dequeue_spacing_msec;
    }
    
    public void set_dequeue_spacing_msec(uint msec) {
        if (msec == dequeue_spacing_msec)
            return;
        
        if (timer_id != 0)
            Source.remove(timer_id);
        
        dequeue_spacing_msec = msec;
        
        timer_id = Timeout.add(get_heartbeat_timeout(), on_heartbeat, priority);
    }
    
    private uint get_heartbeat_timeout() {
        return ((dequeue_spacing_msec == 0)
            ? (hold_msec / 10) 
            : (dequeue_spacing_msec / 2)).clamp(10, uint.MAX);
    }
    
    protected virtual void notify_dequeued(G item) {
        callback(item);
    }
    
    public bool is_paused() {
        return paused_state;
    }
    
    public void pause() {
        if (paused_state)
            return;
        
        paused_state = true;
        
        paused(true);
    }
    
    public void unpause() {
        if (!paused_state)
            return;
        
        paused_state = false;
        
        paused(false);
    }
    
    public virtual void clear() {
        queue.clear();
    }
    
    public virtual bool contains(G item) {
        foreach (Element<G> e in queue) {
            if (equal_func(item, e.item))
                return true;
        }
        
        return false;
    }
    
    public virtual bool enqueue(G item) {
        return queue.add(new Element<G>(item, calc_ready_time()));
    }
    
    public virtual bool enqueue_many(Gee.Collection<G> items) {
        ulong ready_time = calc_ready_time();
        
        Gee.ArrayList<Element<G>> elements = new Gee.ArrayList<Element<G>>();
        foreach (G item in items)
            elements.add(new Element<G>(item, ready_time));
        
        return queue.add_list(elements);
    }
    
    public virtual bool remove_first(G item) {
        Gee.Iterator<Element<G>> iter = queue.iterator();
        while (iter.next()) {
            Element<G> e = iter.get();
            if (equal_func(item, e.item)) {
                iter.remove();
                
                return true;
            }
        }
        
        return false;
    }
    
    public virtual int size {
        get {
            return queue.size;
        }
    }
    
    private ulong calc_ready_time() {
        return now_ms() + (ulong) hold_msec;
    }
    
    private bool on_heartbeat() {
        if (paused_state)
            return true;
        
        ulong now = 0;
        
        for (;;) {
            if (queue.size == 0)
                break;
            
            Element<G>? head = queue.get_at(0);
            assert(head != null);
            
            if (now == 0)
                now = now_ms();
            
            if (head.ready > now)
                break;
            
            // if a space of time is required between dequeues, check now
            if ((dequeue_spacing_msec != 0) && ((now - last_dequeue) < dequeue_spacing_msec))
                break;
            
            Element<G>? h = queue.remove_at(0);
            assert(head == h);
            
            notify_dequeued(head.item);
            last_dequeue = now;
            
            // if a dequeue spacing is in place, it's a lock that only one item is dequeued per
            // heartbeat
            if (dequeue_spacing_msec != 0)
                break;
        }
        
        return true;
    }
}

// HashTimedQueue uses a HashMap for quick lookups of elements via contains().

public class HashTimedQueue<G> : TimedQueue<G> {
    private Gee.HashMap<G, int> item_count;
    
    public HashTimedQueue(uint hold_msec, DequeuedCallback<G> callback,
        owned Gee.HashDataFunc<G>? hash_func = null, owned Gee.EqualDataFunc<G>? equal_func = null,
        int priority = Priority.DEFAULT) {
        base (hold_msec, callback, (owned) equal_func, priority);
        
        item_count = new Gee.HashMap<G, int>((owned) hash_func, (owned) equal_func);
    }
    
    protected override void notify_dequeued(G item) {
        removed(item);
        
        base.notify_dequeued(item);
    }
    
    public override void clear() {
        item_count.clear();
        
        base.clear();
    }
    
    public override bool contains(G item) {
        return item_count.has_key(item);
    }
    
    public override bool enqueue(G item) {
        if (!base.enqueue(item))
            return false;
        
        item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
        
        return true;
    }
    
    public override bool enqueue_many(Gee.Collection<G> items) {
        if (!base.enqueue_many(items))
            return false;
        
        foreach (G item in items)
            item_count.set(item, item_count.has_key(item) ? item_count.get(item) + 1 : 1);
        
        return true;
    }
    
    public override bool remove_first(G item) {
        if (!base.remove_first(item))
            return false;
        
        removed(item);
        
        return true;
    }
    
    private void removed(G item) {
        // item in question is either already removed
        // or was never added, safe to do nothing here
        if (!item_count.has_key(item))
            return;
        
        int count = item_count.get(item);
        assert(count > 0);
        
        if (--count == 0)
            item_count.unset(item);
        else
            item_count.set(item, count);
    }
}