Skip to content

Commit a8addfe

Browse files
committed
Fix race conditions and deadlocks.
1 parent ab56545 commit a8addfe

File tree

1 file changed

+43
-50
lines changed

1 file changed

+43
-50
lines changed

main.go

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -46,73 +46,66 @@ func Process(inputChan <-chan *OrderedInput, wf WorkFunction, options *Options)
4646
// Go routine to print data in order
4747
go func() {
4848
var current uint64
49-
outputMap := make(map[uint64]*processInput)
50-
for {
51-
select {
52-
case item, ok := <-aggregatorChan:
53-
if ok {
54-
if item.order != current {
55-
outputMap[item.order] = item
56-
continue
57-
}
58-
for {
59-
if item == nil {
60-
break
61-
}
62-
outputChan <- &OrderedOutput{Value: item.value}
63-
item.wg.Done()
64-
delete(outputMap, current)
65-
current++
66-
item = outputMap[current]
67-
}
68-
} else {
69-
aggregatorChan = nil
70-
}
49+
outputMap := make(map[uint64]*processInput, options.PoolSize)
50+
defer func() {
51+
close(outputChan)
52+
doneSemaphoreChan <- true
53+
}()
54+
for item := range aggregatorChan {
55+
if item.order != current {
56+
outputMap[item.order] = item
57+
continue
7158
}
72-
if aggregatorChan == nil {
73-
close(outputChan)
74-
doneSemaphoreChan <- true
59+
for {
60+
if item == nil {
61+
break
62+
}
63+
outputChan <- &OrderedOutput{Value: item.value}
64+
item.wg.Done()
65+
delete(outputMap, current)
66+
current++
67+
item = outputMap[current]
7568
}
7669
}
7770
}()
7871

79-
inputClosedSemaphoreChan := make(chan bool)
72+
poolWg := sync.WaitGroup{}
73+
poolWg.Add(processors)
8074
// Create a goroutine pool
8175
for i := 0; i < processors; i++ {
82-
go func() {
76+
go func(worker int) {
77+
defer func() {
78+
poolWg.Done()
79+
}()
8380
for input := range processChan {
8481
wg.Add(1)
8582
input.value = wf(input.value)
8683
input.wg = &wg
8784
aggregatorChan <- input
88-
select {
89-
case <-inputClosedSemaphoreChan:
90-
wg.Wait()
91-
close(aggregatorChan)
92-
default:
93-
continue
94-
}
9585
}
96-
}()
86+
}(i)
9787
}
9888

99-
var order uint64
100-
for {
101-
select {
102-
case input, ok := <-inputChan:
103-
if ok {
104-
processChan <- &processInput{input.Value, order, nil}
105-
order++
106-
} else {
107-
inputChan = nil
89+
go func() {
90+
poolWg.Wait()
91+
close(aggregatorChan)
92+
}()
93+
94+
go func() {
95+
var order uint64
96+
for {
97+
select {
98+
case input, ok := <-inputChan:
99+
if ok {
100+
processChan <- &processInput{input.Value, order, nil}
101+
order++
102+
} else {
103+
close(processChan)
104+
return
105+
}
108106
}
109107
}
110-
if inputChan == nil {
111-
close(processChan)
112-
inputClosedSemaphoreChan <- true
113-
break
114-
}
115-
}
108+
}()
116109
<-doneSemaphoreChan
117110
}()
118111
return outputChan

0 commit comments

Comments
 (0)