forked from couchbase/gocbcore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
agentgroup.go
244 lines (206 loc) · 7.69 KB
/
agentgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package gocbcore
import (
"errors"
"sync"
"time"
)
// AgentGroup represents a collection of agents that can be used for performing operations
// against a cluster. It holds an internal special agent type which does not create its own
// memcached connections but registers itself for cluster config updates on all agents that
// are created through it.
type AgentGroup struct {
// agentsLock guards boundAgents. CreateAgentGroup fills the map before the group is
// returned to the caller and therefore does so without taking the lock.
agentsLock sync.Mutex
// boundAgents maps a bucket name to the agent that was opened for that bucket.
boundAgents map[string]*Agent
// clusterAgent holds no memcached connections but can be used for cluster level (i.e. http) operations.
// It sets its own internal state by listening to cluster config updates on underlying agents.
clusterAgent *clusterAgent
// config is the base configuration supplied to CreateAgentGroup; OpenBucket derives the
// agent config for each newly opened bucket from it.
config *AgentGroupConfig
}
// CreateAgentGroup will return a new AgentGroup with a base config of the config provided.
//
// An agent is created from the config and stored in the group under config.BucketName.
// A cluster agent is then created and registered with that agent's config manager and dialer.
// If creating the cluster agent fails, the agent that was already created is closed before
// the error is returned so that it is not leaked.
//
// Volatile: AgentGroup is subject to change or removal.
func CreateAgentGroup(config *AgentGroupConfig) (*AgentGroup, error) {
	logInfof("SDK Version: gocbcore/%s", goCbCoreVersionStr)
	logInfof("Creating new agent group: %+v", config)

	c := config.toAgentConfig()
	agent, err := CreateAgent(c)
	if err != nil {
		return nil, err
	}

	ag := &AgentGroup{
		config:      config,
		boundAgents: make(map[string]*Agent),
	}

	ag.clusterAgent, err = createClusterAgent(&clusterAgentConfig{
		UserAgent:            config.UserAgent,
		SeedConfig:           config.SeedConfig,
		SecurityConfig:       config.SecurityConfig,
		HTTPConfig:           config.HTTPConfig,
		TracerConfig:         config.TracerConfig,
		MeterConfig:          config.MeterConfig,
		DefaultRetryStrategy: config.DefaultRetryStrategy,
		CircuitBreakerConfig: config.CircuitBreakerConfig,
	})
	if err != nil {
		// The agent has not been handed to anyone yet, so nothing else will ever close it.
		// The original creation error is the one reported to the caller; a close failure
		// is only logged.
		if closeErr := agent.Close(); closeErr != nil {
			logInfof("Failed to close agent: %s", closeErr)
		}
		return nil, err
	}
	ag.clusterAgent.RegisterWith(agent.cfgManager, agent.dialer)

	ag.boundAgents[config.BucketName] = agent

	return ag, nil
}
// OpenBucket will attempt to open a new bucket against the cluster.
// If an agent using the specified bucket name already exists then this will not open a new connection.
//
// An invalid-argument error is returned when bucketName is empty, and any error from creating
// the agent is returned as is. On success the new agent is registered with the cluster agent,
// stored under bucketName, and the global level agent (if still present) is shut down.
//
// Concurrent calls for the same bucket are tolerated: the agent is created outside the lock,
// and the presence check is repeated under the lock before the agent is published. A call that
// loses that race closes its own redundant agent and reports success, since the bucket is open.
func (ag *AgentGroup) OpenBucket(bucketName string) error {
	if bucketName == "" {
		return wrapError(errInvalidArgument, "bucket name cannot be empty")
	}

	// Fast path: nothing to do when the bucket is already open.
	if ag.GetAgent(bucketName) != nil {
		return nil
	}

	config := ag.config.toAgentConfig()
	config.BucketName = bucketName

	agent, err := CreateAgent(config)
	if err != nil {
		return err
	}

	ag.agentsLock.Lock()
	if ag.boundAgents[bucketName] != nil {
		// Another caller opened this bucket between the check above and taking the lock.
		// Overwriting the map entry would leak that caller's agent, so keep it and discard ours.
		ag.agentsLock.Unlock()

		// The redundant agent was never registered or published, so a failure to close it
		// does not affect the bucket that is already open; the error is deliberately dropped.
		_ = agent.Close()
		return nil
	}

	ag.clusterAgent.RegisterWith(agent.cfgManager, agent.dialer)
	ag.boundAgents[bucketName] = agent
	ag.maybeCloseGlobalAgent()
	ag.agentsLock.Unlock()

	return nil
}
// GetAgent looks up the agent bound to bucketName.
// It returns nil when bucketName is empty or when no agent is bound under that name.
func (ag *AgentGroup) GetAgent(bucketName string) *Agent {
	// The global level agent (empty bucket name) is never handed out: OpenBucket closes it,
	// and returning an agent that is later closed underneath the caller would only lead to pain.
	if bucketName == "" {
		return nil
	}

	ag.agentsLock.Lock()
	defer ag.agentsLock.Unlock()

	// A missing key yields the nil *Agent zero value, which is exactly the "not found" result.
	return ag.boundAgents[bucketName]
}
// Close shuts down every bound agent and then the cluster agent.
// Each bound agent is unregistered from the cluster agent before it is closed. Closing
// continues even if an agent fails to close; the first error encountered is returned,
// or nil when everything closed cleanly.
func (ag *AgentGroup) Close() error {
	var result error
	// keepFirst remembers only the earliest non-nil error.
	keepFirst := func(err error) {
		if result == nil && err != nil {
			result = err
		}
	}

	ag.agentsLock.Lock()
	for _, bound := range ag.boundAgents {
		ag.clusterAgent.UnregisterWith(bound.cfgManager, bound.dialer)
		keepFirst(bound.Close())
	}
	ag.agentsLock.Unlock()

	// The cluster agent is closed last, after the lock has been released.
	keepFirst(ag.clusterAgent.Close())

	return result
}
// N1QLQuery executes a N1QL query by handing opts and cb to the group's cluster agent.
// The pending operation and error produced by the cluster agent are returned unchanged.
func (ag *AgentGroup) N1QLQuery(opts N1QLQueryOptions, cb N1QLQueryCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.N1QLQuery(opts, cb)
	return pendingOp, err
}
// PreparedN1QLQuery executes a prepared N1QL query by handing opts and cb to the group's
// cluster agent. The pending operation and error produced by the cluster agent are returned
// unchanged.
func (ag *AgentGroup) PreparedN1QLQuery(opts N1QLQueryOptions, cb N1QLQueryCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.PreparedN1QLQuery(opts, cb)
	return pendingOp, err
}
// AnalyticsQuery executes an analytics query by handing opts and cb to the group's cluster
// agent. The pending operation and error produced by the cluster agent are returned unchanged.
func (ag *AgentGroup) AnalyticsQuery(opts AnalyticsQueryOptions, cb AnalyticsQueryCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.AnalyticsQuery(opts, cb)
	return pendingOp, err
}
// SearchQuery executes a Search query by handing opts and cb to the group's cluster agent.
// The pending operation and error produced by the cluster agent are returned unchanged.
func (ag *AgentGroup) SearchQuery(opts SearchQueryOptions, cb SearchQueryCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.SearchQuery(opts, cb)
	return pendingOp, err
}
// DoHTTPRequest will perform an HTTP request against one of the HTTP services which are
// available within the SDK. The request and callback are handed to the group's cluster agent,
// and the pending operation and error it produces are returned unchanged.
func (ag *AgentGroup) DoHTTPRequest(req *HTTPRequest, cb DoHTTPRequestCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.DoHTTPRequest(req, cb)
	return pendingOp, err
}
// WaitUntilReady returns whether or not the AgentGroup can ping the requested services.
// The call is forwarded, with deadline, opts and cb, to the group's cluster agent.
//
// This can only be used when no bucket has been opened. Once a bucket has been opened you
// *must* use the agent belonging to that bucket instead.
func (ag *AgentGroup) WaitUntilReady(deadline time.Time, opts WaitUntilReadyOptions,
	cb WaitUntilReadyCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.WaitUntilReady(deadline, opts, cb)
	return pendingOp, err
}
// Ping pings all of the servers we are connected to and returns a report regarding the pings
// that were performed. The call is forwarded to the group's cluster agent, and the pending
// operation and error it produces are returned unchanged.
func (ag *AgentGroup) Ping(opts PingOptions, cb PingCallback) (PendingOp, error) {
	pendingOp, err := ag.clusterAgent.Ping(opts, cb)
	return pendingOp, err
}
// Diagnostics returns diagnostics information about the client.
// Mainly containing a list of open connections and their current
// states.
//
// A report is gathered from every bound agent and the reports are merged: the memcached
// connection lists are concatenated, the highest config revision is kept, and the overall
// state is online when every gathered report is online, degraded when only some are, and
// offline when none are. Agents whose report could not be gathered are skipped.
//
// An error is returned when no agents are bound, or when no report could be gathered at all,
// in which case it is the first error encountered while gathering.
func (ag *AgentGroup) Diagnostics(opts DiagnosticsOptions) (*DiagnosticInfo, error) {
	// Snapshot the agents under the lock so that the (potentially slower) diagnostics calls
	// run without holding it.
	// There's no point in trying to get diagnostics from clusterAgent as it has no kv connections.
	// In fact it doesn't even expose a Diagnostics function.
	ag.agentsLock.Lock()
	agents := make([]*Agent, 0, len(ag.boundAgents))
	for _, agent := range ag.boundAgents {
		agents = append(agents, agent)
	}
	ag.agentsLock.Unlock()

	if len(agents) == 0 {
		return nil, errors.New("no agents available")
	}

	var firstError error
	diags := make([]*DiagnosticInfo, 0, len(agents))
	for _, agent := range agents {
		report, err := agent.diagnostics.Diagnostics(opts)
		if err != nil {
			// Every failed report must be skipped, not just the first one; otherwise its
			// report would be appended and dereferenced while merging below.
			if firstError == nil {
				firstError = err
			}
			continue
		}

		diags = append(diags, report)
	}

	if len(diags) == 0 {
		return nil, firstError
	}

	var overallReport DiagnosticInfo
	var connected int
	var expected int
	for _, report := range diags {
		expected++
		overallReport.MemdConns = append(overallReport.MemdConns, report.MemdConns...)
		if report.State == ClusterStateOnline {
			connected++
		}
		if report.ConfigRev > overallReport.ConfigRev {
			overallReport.ConfigRev = report.ConfigRev
		}
	}

	switch {
	case connected == expected:
		overallReport.State = ClusterStateOnline
	case connected > 0:
		overallReport.State = ClusterStateDegraded
	default:
		overallReport.State = ClusterStateOffline
	}

	return &overallReport, nil
}
// maybeCloseGlobalAgent removes the agent bound under the empty bucket name, if there is one,
// and shuts it down in the background. It does no locking of its own even though it modifies
// boundAgents; OpenBucket calls it with agentsLock held.
func (ag *AgentGroup) maybeCloseGlobalAgent() {
	globalAgent, found := ag.boundAgents[""]
	if !found || globalAgent == nil {
		return
	}

	logDebugf("Shutting down global level agent")
	delete(ag.boundAgents, "")

	// Unregistering and closing run on their own goroutine, so this function returns
	// without waiting for the agent to finish shutting down.
	go func(a *Agent) {
		ag.clusterAgent.UnregisterWith(a.cfgManager, a.dialer)
		if err := a.Close(); err != nil {
			logDebugf("Failed to close agent: %s", err)
		}
	}(globalAgent)
}