-
Notifications
You must be signed in to change notification settings - Fork 6
/
cmap.go
236 lines (217 loc) · 5.13 KB
/
cmap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package cmap
import (
"bytes"
"encoding/binary"
"hash/fnv"
"sync"
)
const (
// DefaultPoolSize 提供分配共享池大小的默认值
DefaultPoolSize = 1 << 5
)
// ConcurrencyMap 并发的Map接口
type ConcurrencyMap interface {
// Get 获取给定键值对应的元素值。若没有对应的元素值则返回nil
Get(key interface{}) (interface{}, error)
// Set 给指定的键设置元素值。若该键值已存在,则替换
Set(key interface{}, elem interface{}) error
// SetIfAbsent 给指定的键设置元素值。
// 若该键值已存在,则不替换,并返回已经存在的值同时返回false
// 若改键值不存在,则增加该键值同时返回true
SetIfAbsent(key interface{}, elem interface{}) (interface{}, bool)
// Remove 删除给定键值对应的键值对,并返回旧的元素值。若没有旧元素的值则返回nil
Remove(key interface{}) (interface{}, error)
// Contains 判断是否包含给定的键值
Contains(key interface{}) (bool, error)
// Clear 清除所有的键值对
Clear()
// Len 获取键值对的数量
Len() int
// ToMap 获取已包含的键值对所组成的字典值
ToMap() map[interface{}]interface{}
// Elements 获取并发Map中的元素
Elements() <-chan ConcurrencyElement
// Keys 获取所有的键数据
Keys() []interface{}
// Values 获取所有的值数据
Values() []interface{}
}
// NewConcurrencyMap 创建并发的Map接口
// poolSize 分配共享池的大小,默认为32
func NewConcurrencyMap(poolSizes ...uint) ConcurrencyMap {
var size uint
if len(poolSizes) > 0 {
size = poolSizes[0]
} else {
size = DefaultPoolSize
}
pools := make([]*concurrencyItem, size)
for i := 0; i < int(size); i++ {
pools[i] = &concurrencyItem{
items: make(map[interface{}]interface{}),
}
}
return &concurrencyMap{
size: int(size),
pools: pools,
}
}
// ConcurrencyElement 存储的元素项
type ConcurrencyElement struct {
Key interface{}
Value interface{}
}
type concurrencyItem struct {
sync.RWMutex
items map[interface{}]interface{}
}
type concurrencyMap struct {
sync.RWMutex
size int
pools []*concurrencyItem
}
func (cm *concurrencyMap) getItem(key interface{}) (*concurrencyItem, error) {
var v interface{}
switch key.(type) {
case string:
v = []byte(key.(string))
case int:
v = int32(key.(int))
default:
v = key
}
buffer := new(bytes.Buffer)
err := binary.Write(buffer, binary.LittleEndian, v)
if err != nil {
return nil, err
}
defer buffer.Reset()
hasher := fnv.New32()
hasher.Write(buffer.Bytes())
return cm.pools[uint(hasher.Sum32())%uint(cm.size)], nil
}
func (cm *concurrencyMap) Get(key interface{}) (interface{}, error) {
item, err := cm.getItem(key)
if err != nil {
return nil, err
}
item.RLock()
v := item.items[key]
item.RUnlock()
return v, nil
}
func (cm *concurrencyMap) Set(key interface{}, elem interface{}) error {
item, err := cm.getItem(key)
if err != nil {
return err
}
item.Lock()
item.items[key] = elem
item.Unlock()
return nil
}
func (cm *concurrencyMap) SetIfAbsent(key interface{}, elem interface{}) (interface{}, bool) {
item, err := cm.getItem(key)
if err != nil {
return item, false
}
item.Lock()
_, ok := item.items[key]
if !ok {
item.items[key] = elem
}
item.Unlock()
return elem, true
}
func (cm *concurrencyMap) Remove(key interface{}) (interface{}, error) {
item, err := cm.getItem(key)
if err != nil {
return nil, err
}
item.Lock()
elem, ok := item.items[key]
if ok {
delete(item.items, key)
}
item.Unlock()
return elem, nil
}
func (cm *concurrencyMap) Contains(key interface{}) (bool, error) {
item, err := cm.getItem(key)
if err != nil {
return false, err
}
item.RLock()
_, ok := item.items[key]
item.RUnlock()
return ok, nil
}
func (cm *concurrencyMap) Clear() {
for i := 0; i < cm.size; i++ {
item := cm.pools[i]
item.Lock()
item.items = make(map[interface{}]interface{})
item.Unlock()
}
}
func (cm *concurrencyMap) Len() int {
var count int
for i := 0; i < int(cm.size); i++ {
item := cm.pools[i]
item.RLock()
count += len(item.items)
item.RUnlock()
}
return count
}
func (cm *concurrencyMap) ToMap() map[interface{}]interface{} {
data := make(map[interface{}]interface{})
for i := 0; i < cm.size; i++ {
item := cm.pools[i]
item.RLock()
for k, v := range item.items {
data[k] = v
}
item.RUnlock()
}
return data
}
func (cm *concurrencyMap) Elements() <-chan ConcurrencyElement {
chElement := make(chan ConcurrencyElement)
go func() {
for i := 0; i < cm.size; i++ {
item := cm.pools[i]
item.RLock()
for k, v := range item.items {
chElement <- ConcurrencyElement{Key: k, Value: v}
}
item.RUnlock()
}
close(chElement)
}()
return chElement
}
func (cm *concurrencyMap) Keys() []interface{} {
var keys []interface{}
for i := 0; i < cm.size; i++ {
item := cm.pools[i]
item.RLock()
for k := range item.items {
keys = append(keys, k)
}
item.RUnlock()
}
return keys
}
func (cm *concurrencyMap) Values() []interface{} {
var values []interface{}
for i := 0; i < cm.size; i++ {
item := cm.pools[i]
item.RLock()
for _, v := range item.items {
values = append(values, v)
}
item.RUnlock()
}
return values
}