Billy Barrow 2 年 前
コミット
85f1a56275
3 ファイル変更20 行追加7 行削除
  1. 1 1
      src/lib/Concrete/Fifo.vala
  2. 9 1
      src/lib/Enumerable.vala
  3. 10 5
      src/lib/Queries/Parallel.vala

+ 1 - 1
src/lib/Concrete/Fifo.vala

@@ -15,7 +15,7 @@ namespace Invercargill {
             mutex.unlock();
         }
 
-        public void push(T item) {
+        public void push(owned T item) {
             mutex.lock();
             var fifo_item = new FifoItem<T>() {
                 item = item,

+ 9 - 1
src/lib/Enumerable.vala

@@ -99,7 +99,7 @@ namespace Invercargill {
             return new SkipQuery<T>(this, count);
         }
 
-        public virtual Enumerable<Tout> parallel<Tout>(owned TransformDelegate<T, Tout> transform, uint workers = 0) {
+        public virtual Enumerable<Tout> parallel_select<Tout>(owned TransformDelegate<T, Tout> transform, uint workers = 0) {
             var actual_workers = workers;
             if(actual_workers < 1) {
                 actual_workers = get_num_processors();
@@ -107,6 +107,14 @@ namespace Invercargill {
             return new ParallelQuery<T, Tout>(this, (owned)transform, (int)actual_workers);
         }
 
+        public virtual int parallel_iterate(ItemDelegate<T> handler, uint workers = 0) {
+            return parallel_select<T>(i => {
+                handler(i);
+                return i;
+            }, workers)
+            .count();
+        }
+
         public virtual Tout aggrigate<Tout>(Tout initial, AggrigateDelegate<Tout, T> aggrigate_func) {
             var aggrigate = initial;
             iterate(i => {

+ 10 - 5
src/lib/Queries/Parallel.vala

@@ -21,6 +21,7 @@ namespace Invercargill {
         private Tracker<Tout> queue_tracker;
         private TransformDelegate<Tin, Tout> transform_func;
         private Tracker<Tin> input_tracker;
+        private int remaining_workers;
 
         public ParallelTracker(owned TransformDelegate<Tin, Tout> transform, int workers, Tracker<Tin> input) {
             input_tracker = input;
@@ -28,6 +29,7 @@ namespace Invercargill {
             queue_tracker = queue.get_tracker();
             this.transform_func = (owned)transform;
 
+            remaining_workers = workers;
             for(int i = 0; i < workers; i++) {
                 new Thread<bool>(@"Invercargill Parallel Job Thread #$i", do_work);
             }
@@ -41,10 +43,6 @@ namespace Invercargill {
                     return item;
                 }
                 item.complete = true;
-                if(!queue.marked_complete) {
-                    queue.complete();
-                }
-
                 return item;
             }
         }
@@ -53,10 +51,17 @@ namespace Invercargill {
             var job = next_for_worker();
             while(!job.complete) {
                 var output = transform_func(job.item);
-                queue.push(output);
+                queue.push((owned)output);
 
                 job = next_for_worker();
             }
+
+            lock(remaining_workers) {
+                remaining_workers--;
+                if(remaining_workers == 0) {
+                    queue.complete();
+                }
+            }
             return true;
         }