Billy Barrow hace 2 años
padre
commit
2f3cd3294f

+ 77 - 0
src/lib/Concrete/Fifo.vala

@@ -0,0 +1,77 @@
+namespace Invercargill {
+
+    public class Fifo<T> : Enumerable<T> {
+
+        public bool marked_complete { get; private set; }
+        public bool is_complete { get; private set; }
+
+        private FifoItem<T>? first_item = null;
+        private FifoItem<T>? last_item = null;
+
+        public void complete() {
+            mutex.lock();
+            marked_complete = true;
+            cond.broadcast();
+            mutex.unlock();
+        }
+
+        public void push(T item) {
+            mutex.lock();
+            var fifo_item = new FifoItem<T>() {
+                item = item,
+            };
+
+            if(first_item == null) {
+                first_item = fifo_item;
+                last_item = fifo_item;
+            }
+            else {
+                last_item.next_item = fifo_item;
+                last_item = fifo_item;
+            }
+
+            cond.broadcast();
+            mutex.unlock();
+        }
+
+        private bool has_next() {
+            mutex.lock();
+            while(first_item == null && !marked_complete){
+                cond.wait(mutex);
+            }
+
+            var state = first_item != null;
+            mutex.unlock();
+            return state;
+        }
+
+        private T get_next() {
+            mutex.lock();
+            var item = first_item;
+            first_item = item.next_item;
+
+            if(first_item == null && marked_complete) {
+                is_complete = true;
+            }
+            
+            mutex.unlock();
+            return item;
+        }
+
+        private Cond cond = Cond ();
+        private Mutex mutex = Mutex ();
+
+        public override Tracker<T> get_tracker () {
+
+            return new LambdaTracker<T>(
+                () => has_next(),
+                () => get_next());
+        }
+
+        private class FifoItem<T> {
+            public T item { get; set; }
+            public FifoItem<T>? next_item { get; set; }
+        }
+
+    }
+}

+ 27 - 0
src/lib/Concrete/GenericArrayEnumerable.vala

@@ -0,0 +1,27 @@
+
+namespace Invercargill {
+
+    internal class GenericArrayEnumerable<T> : Enumerable<T> {
+
+        private GenericArray<T> array;
+
+        public GenericArrayEnumerable(GenericArray<T> input) {
+            array = input;
+        }
+
+        public override Tracker<T> get_tracker() {
+            var i = 0;
+            return new LambdaTracker<T>(
+                () => {
+                    return i < array.length;
+                },
+                () => {
+                    var res = array[i];
+                    i++;
+                    return res;
+                });
+        }
+
+    }
+
+}

+ 8 - 0
src/lib/Enumerable.vala

@@ -99,6 +99,14 @@ namespace Invercargill {
             return new SkipQuery<T>(this, count);
         }
 
+        public virtual Enumerable<Tout> parallel<Tout>(owned TransformDelegate<T, Tout> transform, uint workers = 0) {
+            var actual_workers = workers;
+            if(actual_workers < 1) {
+                actual_workers = get_num_processors();
+            }
+            return new ParallelQuery<T, Tout>(this, (owned)transform, (int)actual_workers);
+        }
+
         public virtual Tout aggrigate<Tout>(Tout initial, AggrigateDelegate<Tout, T> aggrigate_func) {
             var aggrigate = initial;
             iterate(i => {

+ 4 - 0
src/lib/Invercargill.vala

@@ -12,6 +12,10 @@ namespace Invercargill {
         return new ArrayEnumerable<T>(input);
     }
 
+    public static Enumerable<T> gate<T>(GenericArray<T> input) {
+        return new GenericArrayEnumerable<T>(input);
+    }
+
     public static Enumerable<T> gte<T>(Gee.Iterable<T> input) {
         return new GeeEnumerable<T>(input);
     }

+ 79 - 0
src/lib/Queries/Parallel.vala

@@ -0,0 +1,79 @@
+namespace Invercargill {
+
+    private class ParallelQuery<Tin, Tout> : BaseQuery<Tin, Tout> {
+        private TransformDelegate<Tin, Tout> transform_func;
+        private int workers;
+
+        public ParallelQuery(Enumerable<Tin> input, owned TransformDelegate<Tin, Tout> transform, int workers) {
+            this.input = input;
+            this.workers = workers;
+            transform_func = (owned)transform;
+        }
+
+        public override Tracker<Tout> get_tracker() {
+            return new ParallelTracker<Tin, Tout>(i => transform_func(i), workers, input.get_tracker());
+        }
+    }
+
+    private class ParallelTracker<Tin, Tout> : Tracker<Tout> {
+
+        private Fifo<Tout> queue;
+        private Tracker<Tout> queue_tracker;
+        private TransformDelegate<Tin, Tout> transform_func;
+        private Tracker<Tin> input_tracker;
+
+        public ParallelTracker(owned TransformDelegate<Tin, Tout> transform, int workers, Tracker<Tin> input) {
+            input_tracker = input;
+            queue = new Fifo<Tout>();
+            queue_tracker = queue.get_tracker();
+            this.transform_func = (owned)transform;
+
+            for(int i = 0; i < workers; i++) {
+                new Thread<bool>(@"Invercargill Parallel Job Thread #$i", do_work);
+            }
+        }
+
+        private WorkerItem<Tin> next_for_worker() {
+            lock(input_tracker) {
+                var item = new WorkerItem<Tin>();
+                if(input_tracker.has_next()) {
+                    item.item = input_tracker.get_next();
+                    return item;
+                }
+                item.complete = true;
+                if(!queue.marked_complete) {
+                    queue.complete();
+                }
+
+                return item;
+            }
+        }
+
+        private bool do_work() {
+            var job = next_for_worker();
+            while(!job.complete) {
+                var output = transform_func(job.item);
+                queue.push(output);
+
+                job = next_for_worker();
+            }
+            return true;
+        }
+
+        public override bool has_next() {
+            return queue_tracker.has_next();
+        }
+        public override Tout get_next() {
+            return queue_tracker.get_next();
+        }
+
+        private class WorkerItem<Tin> {
+            public Tin? item { get; set; }
+            public bool complete { get; set; }
+        }
+
+
+
+    }
+
+}

+ 3 - 0
src/lib/meson.build

@@ -20,9 +20,11 @@ sources += files('Queries/Merge.vala')
 sources += files('Queries/Sort.vala')
 sources += files('Queries/Skip.vala')
 sources += files('Queries/Take.vala')
+sources += files('Queries/Parallel.vala')
 
 sources += files('Concrete/ArrayEnumerable.vala')
 sources += files('Concrete/GeeEnumerable.vala')
+sources += files('Concrete/GenericArrayEnumerable.vala')
 sources += files('Concrete/ListEnumerable.vala')
 sources += files('Concrete/RangeEnumerable.vala')
 sources += files('Concrete/ConcatEnumerable.vala')
@@ -31,6 +33,7 @@ sources += files('Concrete/DirEnumerable.vala')
 sources += files('Concrete/ZipperEnumerable.vala')
 sources += files('Concrete/EmptyEnumerable.vala')
 sources += files('Concrete/Sequence.vala')
+sources += files('Concrete/Fifo.vala')
 
 invercargill = shared_library('invercargill', sources,
     dependencies: dependencies,

+ 33 - 0
src/tests/Integration/Parallel.vala

@@ -0,0 +1,33 @@
+using Invercargill;
+
+void parallel_tests() {
+
+
+    Test.add_func("/invercargill/operator/parallel/task", () => {
+        var items = ate(new ParallelObj[] { 
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+            new ParallelObj(),
+        });
+
+        var result = items.parallel<ParallelObj>(o => o.big_process(), 4).to_sequence();
+        
+        assert_true(result.all(o => o.processed));
+    });
+
+}
+
+class ParallelObj {
+    public bool processed;
+
+    public ParallelObj big_process() {
+        GLib.Thread.usleep(1000000);
+        processed = true;
+        return this;
+    }
+}

+ 1 - 0
src/tests/TestRunner.vala

@@ -8,6 +8,7 @@ public static int main(string[] args) {
     select_many_tests();
     gee_tests();
     tracker_tests();
+    parallel_tests();
 
     Test.run();
 

+ 1 - 0
src/tests/meson.build

@@ -8,5 +8,6 @@ sources += files('Integration/Take.vala')
 sources += files('Integration/Sort.vala')
 sources += files('Integration/Gee.vala')
 sources += files('Integration/Tracker.vala')
+sources += files('Integration/Parallel.vala')
 
 executable('invercargill-test-suite', sources, dependencies: dependencies, install: true)