forked from viant/toolbox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
collections_async.go
175 lines (161 loc) · 4.47 KB
/
collections_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package toolbox
import (
"reflect"
"sync"
)
//TrueValueProvider is a function that returns true, it takes one parameters which ignores,
//This provider can be used to make map from slice like map[some type]bool
//ProcessSliceAsync iterates over any slice, it calls handler with each element asynchronously
func ProcessSliceAsync(slice interface{}, handler func(item interface{}) bool) {
//The common cases with reflection for speed
var wg sync.WaitGroup
if aSlice, ok := slice.([]interface{}); ok {
wg.Add(len(aSlice))
for _, item := range aSlice {
go func(item interface{}) {
defer wg.Done()
handler(item)
}(item)
}
wg.Wait()
return
}
if aSlice, ok := slice.([]map[string]interface{}); ok {
wg.Add(len(aSlice))
for _, item := range aSlice {
go func(item interface{}) {
defer wg.Done()
handler(item)
}(item)
}
wg.Wait()
return
}
//The common cases with reflection for speed
if aSlice, ok := slice.([]string); ok {
wg.Add(len(aSlice))
for _, item := range aSlice {
go func(item interface{}) {
defer wg.Done()
handler(item)
}(item)
}
wg.Wait()
return
}
//The common cases with reflection for speed
if aSlice, ok := slice.([]int); ok {
wg.Add(len(aSlice))
for _, item := range aSlice {
go func(item interface{}) {
defer wg.Done()
handler(item)
}(item)
}
wg.Wait()
return
}
sliceValue := DiscoverValueByKind(reflect.ValueOf(slice), reflect.Slice)
wg.Add(sliceValue.Len())
for i := 0; i < sliceValue.Len(); i++ {
go func(item interface{}) {
defer wg.Done()
handler(item)
}(sliceValue.Index(i).Interface())
}
wg.Wait()
}
//IndexSlice reads passed in slice and applies function that takes a slice item as argument to return a key value.
//passed in resulting map needs to match key type return by a key function, and accept slice item type as argument.
func IndexSliceAsync(slice, resultingMap, keyFunction interface{}) {
var lock = sync.RWMutex{}
mapValue := DiscoverValueByKind(resultingMap, reflect.Map)
ProcessSliceAsync(slice, func(item interface{}) bool {
result := CallFunction(keyFunction, item)
lock.Lock() //otherwise, fatal error: concurrent map writes
defer lock.Unlock()
mapValue.SetMapIndex(reflect.ValueOf(result[0]), reflect.ValueOf(item))
return true
})
}
//SliceToMap reads passed in slice to to apply the key and value function for each item. Result of these calls is placed in the resulting map.
func SliceToMapAsync(sourceSlice, targetMap, keyFunction, valueFunction interface{}) {
//optimized case
var wg sync.WaitGroup
var lock = sync.RWMutex{}
if stringBoolMap, ok := targetMap.(map[string]bool); ok {
if stringSlice, ok := sourceSlice.([]string); ok {
if valueFunction, ok := keyFunction.(func(string) bool); ok {
if keyFunction, ok := keyFunction.(func(string) string); ok {
wg.Add(len(stringSlice))
for _, item := range stringSlice {
go func(item string) {
defer wg.Done()
key := keyFunction(item)
value := valueFunction(item)
lock.Lock()
defer lock.Unlock()
stringBoolMap[key] = value
}(item)
}
wg.Wait()
return
}
}
}
}
mapValue := DiscoverValueByKind(targetMap, reflect.Map)
ProcessSliceAsync(sourceSlice, func(item interface{}) bool {
key := CallFunction(keyFunction, item)
value := CallFunction(valueFunction, item)
lock.Lock()
defer lock.Unlock()
mapValue.SetMapIndex(reflect.ValueOf(key[0]), reflect.ValueOf(value[0]))
return true
})
}
func ProcessSliceWithIndexAsync(slice interface{}, handler func(index int, item interface{}) bool) {
var wg sync.WaitGroup
if aSlice, ok := slice.([]interface{}); ok {
wg.Add(len(aSlice))
for i, item := range aSlice {
go func(i int, item interface{}) {
defer wg.Done()
handler(i, item)
}(i, item)
}
wg.Wait()
return
}
if aSlice, ok := slice.([]string); ok {
wg.Add(len(aSlice))
for i, item := range aSlice {
go func(i int, item interface{}) {
defer wg.Done()
handler(i, item)
}(i, item)
}
wg.Wait()
return
}
if aSlice, ok := slice.([]int); ok {
wg.Add(len(aSlice))
for i, item := range aSlice {
go func(i int, item interface{}) {
defer wg.Done()
handler(i, item)
}(i, item)
}
wg.Wait()
return
}
sliceValue := DiscoverValueByKind(reflect.ValueOf(slice), reflect.Slice)
wg.Add(sliceValue.Len())
for i := 0; i < sliceValue.Len(); i++ {
go func(i int, item interface{}) {
defer wg.Done()
handler(i, item)
}(i, sliceValue.Index(i).Interface())
}
wg.Wait()
}