-
Notifications
You must be signed in to change notification settings - Fork 22
/
parallel.go
141 lines (119 loc) · 3.65 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*Package fanout provides a simple way to run work concurrently, for people who find it difficult to
write concurrent programs that combine channels, goroutines and sync.WaitGroup. That combination is often hard to get right.
More documentation and examples at: https://github.com/sunfmin/fanout
*/
package fanout
import (
"errors"
"sync"
)
// feedInputs spawns a goroutine that pushes every element of inputs, in order,
// onto an unbuffered channel, and returns that channel together with a channel
// that reports how the feeding ended.
//
// The value channel is closed once the goroutine stops. The status channel
// receives exactly one value: nil when every input was delivered, or a
// "loop canceled" error when done was closed before feeding completed.
func feedInputs(done <-chan int, inputs []interface{}) (<-chan interface{}, <-chan error) {
	out := make(chan interface{})
	// Buffered with capacity one so the single status send below never blocks,
	// even if the caller never reads it.
	status := make(chan error, 1)

	go func() {
		// Runs after the status has been queued, telling readers no more
		// values are coming.
		defer close(out)

		var err error
	feed:
		for i := range inputs {
			select {
			case out <- inputs[i]:
			case <-done:
				err = errors.New("loop canceled")
				break feed
			}
		}
		status <- err
	}()

	return out, status
}
// resultWithError bundles the two return values of a single Worker call so
// they can travel together over one channel from a worker goroutine to the
// collecting loop.
type resultWithError struct {
// result is the first value returned by the Worker call.
result interface{}
// err is the error returned by the Worker call; nil on success.
err error
}
// work is the body of one worker goroutine. It keeps receiving values from
// inputs, calls w on each of them, and forwards the outcome (value and error
// together) on c.
//
// It returns when inputs is closed and drained, or when done is closed while
// it is waiting to hand over an outcome.
func work(done <-chan int, inputs <-chan interface{}, c chan<- resultWithError, w Worker) {
	for {
		item, open := <-inputs
		if !open {
			return
		}

		value, err := w(item)
		outcome := resultWithError{result: value, err: err}

		// Either the collector takes the outcome, or the run was abandoned
		// and nobody will ever read it.
		select {
		case <-done:
			return
		case c <- outcome:
		}
	}
}
// Worker is the function type callers supply to process a single input.
// If the work needs several parameters, wrap them in one struct and pass it as
// the input; likewise, wrap multiple results in one result struct.
// When a Worker call returns a non-nil error, the run is abandoned: that error
// is returned to the caller, and the other workers stop once their current
// call has finished. To ignore a failure for some inputs, return a nil error
// from the Worker func for them.
type Worker func(input interface{}) (interface{}, error)
// ParallelRun calls w on every element of inputs using workerNum concurrent
// goroutines and blocks until all of them have finished.
//
// On success it returns one result per input. Results are appended in the
// order the workers deliver them, which is not necessarily the order of
// inputs. The returned slice is non-nil even when inputs is empty.
//
// If any call to w returns a non-nil error, ParallelRun returns (nil, err)
// as soon as that outcome is received and signals the feeder and the
// remaining workers to stop.
//
// A workerNum below 1 is treated as 1. With zero workers nothing would ever
// drain the inputs and the call would block forever; a negative count would
// make sync.WaitGroup.Add panic.
func ParallelRun(workerNum int, w Worker, inputs []interface{}) ([]interface{}, error) {
	if workerNum < 1 {
		workerNum = 1
	}

	// done is closed when ParallelRun returns, which may happen before all
	// values from c and errc have been received (early return on error).
	// Closing it unblocks the feeder and any worker waiting to send.
	done := make(chan int)
	defer close(done)

	inputsc, errc := feedInputs(done, inputs)

	// Start a fixed number of goroutines to do the work.
	c := make(chan resultWithError)
	var wg sync.WaitGroup
	wg.Add(workerNum)
	for i := 0; i < workerNum; i++ {
		go func() {
			defer wg.Done()
			work(done, inputsc, c, w)
		}()
	}

	// Close c once every worker has returned so the collecting loop ends.
	go func() {
		wg.Wait()
		close(c)
	}()

	results := make([]interface{}, 0, len(inputs))
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		results = append(results, r.result)
	}

	// Check whether feeding the inputs failed.
	if err := <-errc; err != nil {
		return nil, err
	}
	return results, nil
}
// ParallelRunCh is the channel-fed variant of ParallelRun: it starts
// workerNum goroutines that receive values from inputsc, call w on each of
// them, and collects the results.
//
// It blocks until inputsc has been closed and drained, so the caller is
// responsible for closing inputsc; otherwise the call never returns. Results
// are appended in the order the workers deliver them. The returned slice is
// non-nil even when no input was received.
//
// If any call to w returns a non-nil error, ParallelRunCh returns (nil, err)
// as soon as that outcome is received and signals the remaining workers to
// stop. Values still pending on inputsc are then left unread, so a producer
// that keeps sending on it may block.
//
// A workerNum below 1 is treated as 1. With zero workers the inputs would be
// silently ignored, and a negative count would make sync.WaitGroup.Add panic.
func ParallelRunCh(workerNum int, w Worker, inputsc chan interface{}) ([]interface{}, error) {
	if workerNum < 1 {
		workerNum = 1
	}

	// done is closed when ParallelRunCh returns; on an early error return it
	// releases workers that are waiting to hand over an outcome.
	done := make(chan int)
	defer close(done)

	// Start a fixed number of goroutines to do the work.
	c := make(chan resultWithError)
	var wg sync.WaitGroup
	wg.Add(workerNum)
	for i := 0; i < workerNum; i++ {
		go func() {
			defer wg.Done()
			work(done, inputsc, c, w)
		}()
	}

	// Close c once every worker has returned so the collecting loop ends.
	go func() {
		wg.Wait()
		close(c)
	}()

	results := []interface{}{}
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		results = append(results, r.result)
	}
	return results, nil
}