Skip to content

Commit

Permalink
Merge pull request #1307 from jehiah/ctx_1307
Browse files Browse the repository at this point in the history
*: remove *Context
  • Loading branch information
jehiah authored Nov 28, 2020
2 parents c3a47b9 + c19adf3 commit 8adb229
Show file tree
Hide file tree
Showing 21 changed files with 379 additions and 399 deletions.
5 changes: 0 additions & 5 deletions nsqadmin/context.go

This file was deleted.

192 changes: 96 additions & 96 deletions nsqadmin/http.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions nsqadmin/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func basicAuthUser(req *http.Request) string {
}

func (s *httpServer) notifyAdminAction(action, topic, channel, node string, req *http.Request) {
if s.ctx.nsqadmin.getOpts().NotificationHTTPEndpoint == "" {
if s.nsqadmin.getOpts().NotificationHTTPEndpoint == "" {
return
}
via, _ := os.Hostname()
Expand Down Expand Up @@ -67,5 +67,5 @@ func (s *httpServer) notifyAdminAction(action, topic, channel, node string, req
Via: via,
}
// Perform all work in a new goroutine so this never blocks
go func() { s.ctx.nsqadmin.notifications <- a }()
go func() { s.nsqadmin.notifications <- a }()
}
2 changes: 1 addition & 1 deletion nsqadmin/nsqadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (n *NSQAdmin) Main() error {
})
}

httpServer := NewHTTPServer(&Context{n})
httpServer := NewHTTPServer(n)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf))
})
Expand Down
56 changes: 28 additions & 28 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Channel struct {

topicName string
name string
ctx *context
nsqd *NSQD

backend BackendQueue

Expand Down Expand Up @@ -71,7 +71,7 @@ type Channel struct {
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string, ctx *context,
func NewChannel(topicName string, channelName string, nsqd *NSQD,
deleteCallback func(*Channel)) *Channel {

c := &Channel{
Expand All @@ -80,16 +80,16 @@ func NewChannel(topicName string, channelName string, ctx *context,
memoryMsgChan: nil,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
ctx: ctx,
nsqd: nsqd,
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if ctx.nsqd.getOpts().MemQueueSize > 0 {
c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
if nsqd.getOpts().MemQueueSize > 0 {
c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
c.e2eProcessingLatencyStream = quantile.New(
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles,
nsqd.getOpts().E2EProcessingLatencyWindowTime,
nsqd.getOpts().E2EProcessingLatencyPercentiles,
)
}

Expand All @@ -100,30 +100,30 @@ func NewChannel(topicName string, channelName string, ctx *context,
c.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := ctx.nsqd.getOpts()
opts := nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
}
// backend names, for uniqueness, automatically include the topic...
backendName := getBackendName(topicName, channelName)
c.backend = diskqueue.New(
backendName,
ctx.nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile,
nsqd.getOpts().DataPath,
nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout,
int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
nsqd.getOpts().SyncEvery,
nsqd.getOpts().SyncTimeout,
dqLogf,
)
}

c.ctx.nsqd.Notify(c)
c.nsqd.Notify(c)

return c
}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))
pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10))

c.inFlightMutex.Lock()
c.inFlightMessages = make(map[MessageID]*Message)
Expand Down Expand Up @@ -160,13 +160,13 @@ func (c *Channel) exit(deleted bool) error {
}

if deleted {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)

// since we are explicitly deleting a channel (not just at system exit time)
// de-register this from the lookupd
c.ctx.nsqd.Notify(c)
c.nsqd.Notify(c)
} else {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
}

// this forceably closes client connections
Expand Down Expand Up @@ -212,7 +212,7 @@ finish:
// it does not drain inflight/deferred because it is only called in Close()
func (c *Channel) flush() error {
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}

Expand All @@ -221,7 +221,7 @@ func (c *Channel) flush() error {
case msg := <-c.memoryMsgChan:
err := writeMessageToBackend(msg, c.backend)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
}
default:
goto finish
Expand All @@ -233,7 +233,7 @@ finish:
for _, msg := range c.inFlightMessages {
err := writeMessageToBackend(msg, c.backend)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
}
}
c.inFlightMutex.Unlock()
Expand All @@ -243,7 +243,7 @@ finish:
msg := item.Value.(*Message)
err := writeMessageToBackend(msg, c.backend)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
}
}
c.deferredMutex.Unlock()
Expand Down Expand Up @@ -306,9 +306,9 @@ func (c *Channel) put(m *Message) error {
case c.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, c.backend)
c.ctx.nsqd.SetHealth(err)
c.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
Expand All @@ -331,9 +331,9 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti

newTimeout := time.Now().Add(clientMsgTimeout)
if newTimeout.Sub(msg.deliveryTS) >=
c.ctx.nsqd.getOpts().MaxMsgTimeout {
c.nsqd.getOpts().MaxMsgTimeout {
// we would have gone over, set to the max
newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout)
newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout)
}

msg.pri = newTimeout.UnixNano()
Expand Down Expand Up @@ -398,7 +398,7 @@ func (c *Channel) AddClient(clientID int64, client Consumer) error {
return nil
}

maxChannelConsumers := c.ctx.nsqd.getOpts().MaxChannelConsumers
maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers
if maxChannelConsumers != 0 && len(c.clients) >= maxChannelConsumers {
return errors.New("E_TOO_MANY_CHANNEL_CONSUMERS")
}
Expand Down
6 changes: 3 additions & 3 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := newClientV2(0, conn, &context{nsqd})
client := newClientV2(0, conn, nsqd)
client.SetReadyCount(25)
err := channel.AddClient(client.ID, client)
test.Equal(t, err, nil)
Expand Down Expand Up @@ -189,12 +189,12 @@ func TestMaxChannelConsumers(t *testing.T) {
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")

client1 := newClientV2(1, conn, &context{nsqd})
client1 := newClientV2(1, conn, nsqd)
client1.SetReadyCount(25)
err := channel.AddClient(client1.ID, client1)
test.Equal(t, err, nil)

client2 := newClientV2(2, conn, &context{nsqd})
client2 := newClientV2(2, conn, nsqd)
client2.SetReadyCount(25)
err = channel.AddClient(client2.ID, client2)
test.NotEqual(t, err, nil)
Expand Down
36 changes: 18 additions & 18 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type clientV2 struct {
metaLock sync.RWMutex

ID int64
ctx *context
nsqd *NSQD
UserAgent string

// original connection
Expand Down Expand Up @@ -108,25 +108,25 @@ type clientV2 struct {
AuthState *auth.State
}

func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {
func newClientV2(id int64, conn net.Conn, nsqd *NSQD) *clientV2 {
var identifier string
if conn != nil {
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
}

c := &clientV2{
ID: id,
ctx: ctx,
ID: id,
nsqd: nsqd,

Conn: conn,

Reader: bufio.NewReaderSize(conn, defaultBufferSize),
Writer: bufio.NewWriterSize(conn, defaultBufferSize),

OutputBufferSize: defaultBufferSize,
OutputBufferTimeout: ctx.nsqd.getOpts().OutputBufferTimeout,
OutputBufferTimeout: nsqd.getOpts().OutputBufferTimeout,

MsgTimeout: ctx.nsqd.getOpts().MsgTimeout,
MsgTimeout: nsqd.getOpts().MsgTimeout,

// ReadyStateChan has a buffer of 1 to guarantee that in the event
// there is a race the state update is not lost
Expand All @@ -142,7 +142,7 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {
IdentifyEventChan: make(chan identifyEvent, 1),

// heartbeats are client configurable but default to 30s
HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2,
HeartbeatInterval: nsqd.getOpts().ClientTimeout / 2,

pubCounts: make(map[string]uint64),
}
Expand All @@ -155,7 +155,7 @@ func (c *clientV2) String() string {
}

func (c *clientV2) Identify(data identifyDataV2) error {
c.ctx.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data)
c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data)

c.metaLock.Lock()
c.ClientID = data.ClientID
Expand Down Expand Up @@ -317,7 +317,7 @@ func (c *clientV2) IsReadyForMessages() bool {
readyCount := atomic.LoadInt64(&c.ReadyCount)
inFlightCount := atomic.LoadInt64(&c.InFlightCount)

c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)
c.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)

if inFlightCount >= readyCount || readyCount <= 0 {
return false
Expand Down Expand Up @@ -402,7 +402,7 @@ func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error {
case desiredInterval == 0:
// do nothing (use default)
case desiredInterval >= 1000 &&
desiredInterval <= int(c.ctx.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond):
desiredInterval <= int(c.nsqd.getOpts().MaxHeartbeatInterval/time.Millisecond):
c.HeartbeatInterval = time.Duration(desiredInterval) * time.Millisecond
default:
return fmt.Errorf("heartbeat interval (%d) is invalid", desiredInterval)
Expand All @@ -421,8 +421,8 @@ func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error {
case desiredTimeout == 0:
// do nothing (use default)
case true &&
desiredTimeout >= int(c.ctx.nsqd.getOpts().MinOutputBufferTimeout/time.Millisecond) &&
desiredTimeout <= int(c.ctx.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond):
desiredTimeout >= int(c.nsqd.getOpts().MinOutputBufferTimeout/time.Millisecond) &&
desiredTimeout <= int(c.nsqd.getOpts().MaxOutputBufferTimeout/time.Millisecond):

c.OutputBufferTimeout = time.Duration(desiredTimeout) * time.Millisecond
default:
Expand All @@ -436,7 +436,7 @@ func (c *clientV2) SetOutputBuffer(desiredSize int, desiredTimeout int) error {
c.OutputBufferTimeout = 0
case desiredSize == 0:
// do nothing (use default)
case desiredSize >= 64 && desiredSize <= int(c.ctx.nsqd.getOpts().MaxOutputBufferSize):
case desiredSize >= 64 && desiredSize <= int(c.nsqd.getOpts().MaxOutputBufferSize):
c.OutputBufferSize = desiredSize
default:
return fmt.Errorf("output buffer size (%d) is invalid", desiredSize)
Expand Down Expand Up @@ -469,7 +469,7 @@ func (c *clientV2) SetMsgTimeout(msgTimeout int) error {
case msgTimeout == 0:
// do nothing (use default)
case msgTimeout >= 1000 &&
msgTimeout <= int(c.ctx.nsqd.getOpts().MaxMsgTimeout/time.Millisecond):
msgTimeout <= int(c.nsqd.getOpts().MaxMsgTimeout/time.Millisecond):
c.MsgTimeout = time.Duration(msgTimeout) * time.Millisecond
default:
return fmt.Errorf("msg timeout (%d) is invalid", msgTimeout)
Expand All @@ -482,7 +482,7 @@ func (c *clientV2) UpgradeTLS() error {
c.writeLock.Lock()
defer c.writeLock.Unlock()

tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig)
tlsConn := tls.Server(c.Conn, c.nsqd.tlsConfig)
tlsConn.SetDeadline(time.Now().Add(5 * time.Second))
err := tlsConn.Handshake()
if err != nil {
Expand Down Expand Up @@ -570,10 +570,10 @@ func (c *clientV2) QueryAuthd() error {
}
}

authState, err := auth.QueryAnyAuthd(c.ctx.nsqd.getOpts().AuthHTTPAddresses,
authState, err := auth.QueryAnyAuthd(c.nsqd.getOpts().AuthHTTPAddresses,
remoteIP, tlsEnabled, commonName, c.AuthSecret,
c.ctx.nsqd.getOpts().HTTPClientConnectTimeout,
c.ctx.nsqd.getOpts().HTTPClientRequestTimeout)
c.nsqd.getOpts().HTTPClientConnectTimeout,
c.nsqd.getOpts().HTTPClientRequestTimeout)
if err != nil {
return err
}
Expand Down
5 changes: 0 additions & 5 deletions nsqd/context.go

This file was deleted.

Loading

0 comments on commit 8adb229

Please sign in to comment.