-
Notifications
You must be signed in to change notification settings - Fork 24
/
fan_out_test.go
138 lines (110 loc) · 2.48 KB
/
fan_out_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package fanout
import (
"fmt"
"sync"
"testing"
)
//多工作者,重复分发
func TestFanOutDuplicateMultiWorkers(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split2(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
// TestFanOutDuplicate covers the "single worker, duplicate dispatch" case:
// a single input stream is passed through sq, fanned out with Split into 3
// channels, and each returned channel is drained by its own goroutine, which
// prints the sum of the values it received.
//
// NOTE(review): "single worker" presumably describes how Split dispatches
// internally; confirm against Split's implementation.
func TestFanOutDuplicate(t *testing.T) {
	// A single input source.
	numbers := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}

	// Emit the integers on a channel.
	source := gen(numbers...)
	// Feed that channel to the sq stage.
	processed := sq(source)

	// Split the stream into 3 channels (duplicate dispatch).
	branches := Split(processed, 3)
	count := len(branches)
	t.Log("length of out channel:", count)

	var wg sync.WaitGroup
	wg.Add(count)

	// consume drains one branch and reports the total of what it saw.
	consume := func(in <-chan int, index int) {
		defer wg.Done()
		acc := 0
		for n := range in {
			acc += n
		}
		fmt.Println("worker:", index, acc)
	}

	for k := 0; k < count; k++ {
		go consume(branches[k], k)
	}

	// Block until every consumer has finished printing.
	wg.Wait()
}
//随机分发
// worker: 2 11245
// worker: 0 14988
// worker: 1 10117
func TestFanOutRandom(t *testing.T) {
//一路输入源
dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}
//generator integer stream
inputChan := gen(dataStreams...)
// transfer to
ch := sq(inputChan)
// split it to 3 channel
// 重复分发
outArray := Split3(ch, 3)
length := len(outArray)
t.Log("length of out channel:", length)
var wg sync.WaitGroup
wg.Add(length)
for i := 0; i < length; i++ {
go func(in <-chan int, index int) {
sum := 0
for item := range in {
sum += item
}
fmt.Println("worker:", index, sum)
wg.Done()
}(outArray[i], i)
}
wg.Wait()
}
// TestManualFanOutNumbersSeq builds two independent gen -> sq pipelines over
// the same input values and prints everything read from the first pipeline,
// then everything read from the second. It only prints; it makes no
// assertions.
func TestManualFanOutNumbersSeq(t *testing.T) {
	// A single input source, reused for both pipelines.
	dataStreams := []int{13, 44, 56, 99, 9, 45, 67, 90, 78, 23}

	// Each gen call produces its own channel carrying the inputs.
	inputChan1 := gen(dataStreams...)
	inputChan2 := gen(dataStreams...)

	// Manual fan-out: one sq stage per input channel.
	c1 := sq(inputChan1)
	c2 := sq(inputChan2)

	// Drain c1 completely before reading c2 so the output is grouped by queue.
	fmt.Print("c1 queue: ")
	for n := range c1 {
		fmt.Print(n, " ")
	}
	fmt.Println()

	fmt.Print("c2 queue: ")
	for n := range c2 {
		fmt.Print(n, " ")
	}
	fmt.Println()
}