kv_store.go

package hermeskv

import (
	"log"
	"sync"
	"time"
)

// ValMetaDeta holds a cached value together with its tombstone flag.
type ValMetaDeta struct {
	value       interface{}
	isTombStone bool
}

// Store is an in-memory key-value cache with FIFO eviction.
type Store struct {
	sync.RWMutex
	wg       *sync.WaitGroup
	shutdown chan struct{}
	// globalState stores the key-value pairs; each map value holds a reference
	// to the corresponding node in the FIFO doubly linked list
	globalState map[string]*ValMetaDeta
	FIFO        *DoublyLinkedList
	capacity    int
}

//go:generate mockgen -destination=./mocks/mock_store.go -package=mocks hermeskv StoreIface
type StoreIface interface {
	Set(key string, value interface{}) error
	Get(key string) (interface{}, error)
	Delete(key string) error
	Close()
}

// compile-time check that Store satisfies StoreIface
var _ StoreIface = (*Store)(nil)
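
// The mock for StoreIface can be regenerated with `go generate ./...`.
// A rough sketch of how the generated mock might be used in a test
// (assuming the standard mockgen/gomock output in ./mocks; names such as
// NewMockStoreIface come from that generated code, not from this file):
//
//	ctrl := gomock.NewController(t)
//	defer ctrl.Finish()
//	m := mocks.NewMockStoreIface(ctrl)
//	m.EXPECT().Get("k1").Return("v1", nil)
//	v, err := m.Get("k1") // v == "v1", err == nil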

func GetStore(capacity int) *Store {
	if capacity <= 0 {
		panic("cache capacity must be positive")
	}
	// globalState maps key (string) -> node reference stored in the DLL
	s := &Store{
		globalState: make(map[string]*ValMetaDeta),
		FIFO:        getDLL(),
		capacity:    capacity,
		RWMutex:     sync.RWMutex{},
		shutdown:    make(chan struct{}),
		wg:          &sync.WaitGroup{},
	}
	// start the background purger
	s.wg.Add(1)
	interval := 5 * time.Second
	go s.purger(interval)
	return s
}

// purger cleans up old entries at regular intervals by checking whether the capacity has been breached.
// It is possible to end up with more entries than the configured capacity if the purger hasn't run yet.
// Relying on that eventual cleanup alone is a problem, though: the in-memory operations (Set, Get, Delete)
// are fast, and there is no guarantee of when (or whether) the purger goroutine has evicted the head node,
// which can lead to inconsistent results. This is similar to Redis's background eviction of entries
// according to the configured policy; Redis's only hard limit is the underlying RAM, so it may temporarily
// use more memory than intended but will eventually clean up the old entries.
func (s *Store) purger(interval time.Duration) {
	newTicker := time.NewTicker(interval)
	for {
		select {
		case <-newTicker.C:
			// hold the writer lock for the whole eviction pass; a deferred unlock inside
			// the loop would not run until the goroutine exits and would deadlock here
			s.RWMutex.Lock()
			for s.FIFO.capacity > s.capacity {
				log.Println("capacity breached. deleting the head node")
				node := s.FIFO.deleteHead()
				delete(s.globalState, node.key)
			}
			s.RWMutex.Unlock()
		case <-s.shutdown:
			newTicker.Stop()
			s.wg.Done()
			return
		}
	}
}

func (s *Store) Set(key string, value interface{}) error {
	// take the writer lock while adding to the cache
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()
	// if the current length has reached the capacity, evict head nodes until there is room
	for s.FIFO.capacity >= s.capacity {
		// evict the head node and update the capacity
		node := s.FIFO.deleteHead()
		delete(s.globalState, node.key)
	}
	// add the node to the DLL
	node := getNode(key, value, nil, nil)
	newNode := s.FIFO.addNode(node)
	s.globalState[key] = &ValMetaDeta{
		// for globalState, value is the node reference in the doubly linked list
		value:       newNode,
		isTombStone: false,
	}
	return nil
}

func (s *Store) Get(key string) (interface{}, error) {
	// take a shared reader lock while reading from the cache
	s.RWMutex.RLock()
	defer s.RWMutex.RUnlock()
	valMeta, ok := s.globalState[key]
	if !ok || valMeta.isTombStone {
		return nil, ErrNoKey
	}
	node := valMeta.value.(*Node)
	return node.val, nil
}

func (s *Store) Delete(key string) error {
	// take the writer lock while deleting from the cache
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()
	valMeta, ok := s.globalState[key]
	if !ok || valMeta.isTombStone {
		return ErrNoKey
	}
	node := valMeta.value.(*Node)
	// the globalState entry is removed outright rather than being marked as a tombstone
	delete(s.globalState, key)
	s.FIFO.deleteNode(node)
	return nil
}

// Close stops the background purger and waits for it to exit.
// It is the equivalent of closing a connection and is useful for graceful shutdowns.
func (s *Store) Close() {
	close(s.shutdown)
	s.wg.Wait()
}
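
// A minimal usage sketch of the Store API (illustrative only; error handling omitted,
// and the keys/values below are made up for the example):
//
//	s := GetStore(100)
//	defer s.Close()
//	_ = s.Set("user:1", "alice")
//	if v, err := s.Get("user:1"); err == nil {
//		log.Println(v) // "alice"
//	}
//	_ = s.Delete("user:1")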