Переглянути джерело

Add more parallel functions

Billy Barrow 1 тиждень тому
батько
коміт
2fb74a8c14

+ 13 - 1
src/lib/Enumerable.vala

@@ -214,7 +214,7 @@ namespace Invercargill {
             if(actual_workers < 1) {
                 actual_workers = get_num_processors();
             }
-            return new Parallel<T, TOut>(this, (owned)transform, (int)actual_workers);
+            return new ParallelTransform<T, TOut>(this, (owned)transform, (int)actual_workers);
         }
 
         public virtual void parallel_iterate(ItemDelegate<T> handler, uint workers = 0) {
@@ -225,6 +225,18 @@ namespace Invercargill {
             .iterate();
         }
 
+        public virtual Enumerable<T> parallel_where<T>(owned PredicateDelegate<T> predicate, uint workers = 0) {
+            var actual_workers = workers;
+            if(actual_workers < 1) {
+                actual_workers = get_num_processors();
+            }
+            return new ParallelFilter<T>(this, (owned)predicate, (int)actual_workers);
+        }
+
+        public virtual Enumerable<SelectionContext<T, TOut>> parallel_contextualised_select<TOut>(owned TransformDelegate<T, TOut> transform) {
+            return parallel_select<SelectionContext<T, TOut>>((i) => new SelectionContext<T, TOut>(i, transform(i)));
+        }
+
         public virtual Enumerable<SelectionContext<T, TOut>> contextualised_select<TOut>(owned TransformDelegate<T, TOut> transform) {
             return select<SelectionContext<T, TOut>>((i) => new SelectionContext<T, TOut>(i, transform(i)));
         }

+ 0 - 0
src/lib/Modifiers/Histogram.vala


+ 92 - 0
src/lib/Modifiers/ParallelFilter.vala

@@ -0,0 +1,92 @@
+using Invercargill.DataStructures;
+namespace Invercargill.Modifiers {
+
+    public class ParallelFilter<T> : Enumerable<T>{
+        private PredicateDelegate<T> predicate;
+        private Enumerable<T> input;
+        private int workers;
+
+        public ParallelFilter(Enumerable<T> input, owned PredicateDelegate<T> predicate, int workers) {
+            this.input = input;
+            this.workers = workers;
+            this.predicate = (owned)predicate;
+        }
+
+        public override int? peek_count() {
+            return input.peek_count();
+        }
+
+        public override EnumerableInfo get_info() {
+            return new EnumerableInfo.infer_single(this, EnumerableCategory.COMPUTED, input);
+        }
+
+        public override Tracker<T> get_tracker() {
+            return new ParallelFilterTracker<T>(i => predicate(i), workers, input.get_tracker());
+        }
+    }
+
+    private class ParallelFilterTracker<T> : Tracker<T> {
+
+        private Fifo<T> queue;
+        private Tracker<T> queue_tracker;
+        private PredicateDelegate<T> predicate;
+        private Tracker<T> input_tracker;
+        private int remaining_workers;
+
+        public ParallelFilterTracker(owned PredicateDelegate<T> predicate, int workers, Tracker<T> input) {
+            input_tracker = input;
+            queue = new Fifo<T>();
+            queue_tracker = queue.get_tracker();
+            this.predicate = (owned)predicate;
+
+            remaining_workers = workers;
+            for(int i = 0; i < workers; i++) {
+                new Thread<bool>(@"Invercargill Parallel Job Thread #$i", do_work);
+            }
+        }
+
+        private WorkerItem<T> next_for_worker() {
+            lock(input_tracker) {
+                var item = new WorkerItem<T>();
+                if(input_tracker.has_next()) {
+                    item.item = input_tracker.get_next();
+                    return item;
+                }
+                item.complete = true;
+                return item;
+            }
+        }
+
+        private bool do_work() {
+            var job = next_for_worker();
+            while(!job.complete) {
+                if(predicate(job.item)) {
+                    queue.push(job.item);
+                }
+                job = next_for_worker();
+            }
+
+            lock(remaining_workers) {
+                remaining_workers--;
+                if(remaining_workers == 0) {
+                    queue.unblock();
+                }
+            }
+            return true;
+        }
+
+        public override bool has_next() {
+            return queue_tracker.has_next();
+        }
+        public override T get_next() {
+            return queue_tracker.get_next();
+        }
+
+        private class WorkerItem<TIn> {
+            public TIn? item { get; set; }
+            public bool complete { get; set; }
+        }
+
+    }
+
+}

+ 5 - 5
src/lib/Modifiers/Parallel.vala → src/lib/Modifiers/ParallelTransform.vala

@@ -1,12 +1,12 @@
 using Invercargill.DataStructures;
 namespace Invercargill.Modifiers {
 
-    public class Parallel<TIn, TOut> : Enumerable<TOut>{
+    public class ParallelTransform<TIn, TOut> : Enumerable<TOut>{
         private TransformDelegate<TIn, TOut> transform_func;
         private Enumerable<TIn> input;
         private int workers;
 
-        public Parallel(Enumerable<TIn> input, owned TransformDelegate<TIn, TOut> transform, int workers) {
+        public ParallelTransform(Enumerable<TIn> input, owned TransformDelegate<TIn, TOut> transform, int workers) {
             this.input = input;
             this.workers = workers;
             transform_func = (owned)transform;
@@ -21,11 +21,11 @@ namespace Invercargill.Modifiers {
         }
 
         public override Tracker<TOut> get_tracker() {
-            return new ParallelTracker<TIn, TOut>(i => transform_func(i), workers, input.get_tracker());
+            return new ParallelTransformTracker<TIn, TOut>(i => transform_func(i), workers, input.get_tracker());
         }
     }
 
-    private class ParallelTracker<TIn, TOut> : Tracker<TOut> {
+    private class ParallelTransformTracker<TIn, TOut> : Tracker<TOut> {
 
         private Fifo<TOut> queue;
         private Tracker<TOut> queue_tracker;
@@ -33,7 +33,7 @@ namespace Invercargill.Modifiers {
         private Tracker<TIn> input_tracker;
         private int remaining_workers;
 
-        public ParallelTracker(owned TransformDelegate<TIn, TOut> transform, int workers, Tracker<TIn> input) {
+        public ParallelTransformTracker(owned TransformDelegate<TIn, TOut> transform, int workers, Tracker<TIn> input) {
             input_tracker = input;
             queue = new Fifo<TOut>();
             queue_tracker = queue.get_tracker();

+ 2 - 2
src/lib/meson.build

@@ -41,7 +41,8 @@ sources += files('Modifiers/Merge.vala')
 sources += files('Modifiers/Sort.vala')
 sources += files('Modifiers/Skip.vala')
 sources += files('Modifiers/Take.vala')
-sources += files('Modifiers/Parallel.vala')
+sources += files('Modifiers/ParallelTransform.vala')
+sources += files('Modifiers/ParallelFilter.vala')
 sources += files('Modifiers/Unique.vala')
 sources += files('Modifiers/Position.vala')
 sources += files('Modifiers/FilterTransform.vala')
@@ -60,7 +61,6 @@ sources += files('Modifiers/SymmetricDifference.vala')
 sources += files('Modifiers/Chunk.vala')
 sources += files('Modifiers/Cycle.vala')
 sources += files('Modifiers/GroupSequential.vala')
-sources += files('Modifiers/Histogram.vala')
 sources += files('Modifiers/Padding.vala')
 sources += files('Modifiers/Reverse.vala')
 sources += files('Modifiers/Scanner.vala')