-
Notifications
You must be signed in to change notification settings - Fork 155
/
worker.go
163 lines (145 loc) · 4.79 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package gosms
import (
"github.com/haxpax/gosms/modem"
"log"
"strings"
"time"
)
//TODO: should be configurable
const SMSRetryLimit = 3
const (
SMSPending = iota // 0
SMSProcessed // 1
SMSError // 2
)
type SMS struct {
UUID string `json:"uuid"`
Mobile string `json:"mobile"`
Body string `json:"body"`
Status int `json:"status"`
Retries int `json:"retries"`
Device string `json:"device"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
var messages chan SMS
var wakeupMessageLoader chan bool
var bufferMaxSize int
var bufferLowCount int
var messageCountSinceLastWakeup int
var timeOfLastWakeup time.Time
var messageLoaderTimeout time.Duration
var messageLoaderCountout int
var messageLoaderLongTimeout time.Duration
func InitWorker(modems []*modem.GSMModem, bufferSize, bufferLow, loaderTimeout, countOut, loaderLongTimeout int) {
log.Println("--- InitWorker")
bufferMaxSize = bufferSize
bufferLowCount = bufferLow
messageLoaderTimeout = time.Duration(loaderTimeout) * time.Minute
messageLoaderCountout = countOut
messageLoaderLongTimeout = time.Duration(loaderLongTimeout) * time.Minute
messages = make(chan SMS, bufferMaxSize)
wakeupMessageLoader = make(chan bool, 1)
wakeupMessageLoader <- true
messageCountSinceLastWakeup = 0
timeOfLastWakeup = time.Now().Add((time.Duration(loaderTimeout) * -1) * time.Minute) //older time handles the cold start state of the system
// its important to init messages channel before starting modems because nil
// channel is non-blocking
for i := 0; i < len(modems); i++ {
modem := modems[i]
err := modem.Connect()
if err != nil {
log.Println("InitWorker: error connecting", modem.DeviceId, err)
continue
}
go processMessages(modem)
}
go messageLoader(bufferMaxSize, bufferLowCount)
}
func EnqueueMessage(message *SMS, insertToDB bool) {
log.Println("--- EnqueueMessage: ", message)
if insertToDB {
insertMessage(message)
}
//wakeup message loader and exit
go func() {
//notify the message loader only if its been to too long
//or too many messages since last notification
messageCountSinceLastWakeup++
if messageCountSinceLastWakeup > messageLoaderCountout || time.Now().Sub(timeOfLastWakeup) > messageLoaderTimeout {
log.Println("EnqueueMessage: ", "waking up message loader")
wakeupMessageLoader <- true
messageCountSinceLastWakeup = 0
timeOfLastWakeup = time.Now()
}
log.Println("EnqueueMessage - anon: count since last wakeup: ", messageCountSinceLastWakeup)
}()
}
func messageLoader(bufferSize, minFill int) {
// Load pending messages from database as needed
for {
/*
- set a fairly long timeout for wakeup
- if there are very few number of messages in the system and they failed at first go,
and there are no events happening to call EnqueueMessage, those messages might get
stalled in the system until someone knocks on the API door
- we can afford a really long polling in this case
*/
timeout := make(chan bool, 1)
go func() {
time.Sleep(messageLoaderLongTimeout)
timeout <- true
}()
log.Println("messageLoader: ", "waiting for wakeup call")
select {
case <-wakeupMessageLoader:
log.Println("messageLoader: woken up by channel call")
case <-timeout:
log.Println("messageLoader: woken up by timeout")
}
if len(messages) >= bufferLowCount {
//if we have sufficient number of messages to process,
//don't bother hitting the database
log.Println("messageLoader: ", "I have sufficient messages")
continue
}
countToFetch := bufferMaxSize - len(messages)
log.Println("messageLoader: ", "I need to fetch more messages", countToFetch)
pendingMsgs, err := getPendingMessages(countToFetch)
if err == nil {
log.Println("messageLoader: ", len(pendingMsgs), " pending messages found")
for _, msg := range pendingMsgs {
messages <- msg
}
}
}
}
func processMessages(modem *modem.GSMModem) {
defer func() {
log.Println("--- deferring ProcessMessage")
}()
//log.Println("--- ProcessMessage")
for {
message := <-messages
log.Println("processing: ", message.UUID, modem.DeviceId)
status := modem.SendSMS(message.Mobile, message.Body)
if strings.HasSuffix(status, "OK\r\n") {
message.Status = SMSProcessed
} else if strings.HasSuffix(status, "ERROR\r\n") {
message.Status = SMSError
} else {
message.Status = SMSPending
}
message.Device = modem.DeviceId
message.Retries++
updateMessageStatus(message)
if message.Status != SMSProcessed && message.Retries < SMSRetryLimit {
// push message back to queue until either it is sent successfully or
// retry count is reached
// I can't push it to channel directly. Doing so may cause the sms to be in
// the queue twice. I don't want that
EnqueueMessage(&message, false)
}
time.Sleep(5 * time.Microsecond)
}
}