Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Support #6833

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions base/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (dh *documentBackedListener) RemoveNode(ctx context.Context, nodeID string)

// Adds or removes a nodeID from the node list document
func (dh *documentBackedListener) updateNodeList(ctx context.Context, nodeID string, remove bool) error {

dh.lock.Lock()
defer dh.lock.Unlock()

Expand All @@ -435,19 +434,22 @@ func (dh *documentBackedListener) updateNodeList(ctx context.Context, nodeID str
}
}

var what string
if remove { // RemoveNode handling
if nodeIndex == -1 {
return nil // NodeID isn't part of set, doesn't need to be removed
}
dh.nodeIDs = append(dh.nodeIDs[:nodeIndex], dh.nodeIDs[nodeIndex+1:]...)
what = "removing"
} else { // AddNode handling
if nodeIndex > -1 {
return nil // NodeID is already part of set, doesn't need to be added
}
dh.nodeIDs = append(dh.nodeIDs, nodeID)
what = "adding"
}

InfofCtx(ctx, KeyCluster, "Updating nodeList document (%s) with node IDs: %v", dh.nodeListKey, dh.nodeIDs)
InfofCtx(ctx, KeyCluster, "Updating nodeList document (%s) with node IDs: %v after %s %q", dh.nodeListKey, dh.nodeIDs, what, nodeID)

casOut, err := dh.datastore.WriteCas(dh.nodeListKey, 0, dh.cas, dh.nodeIDs, 0)

Expand Down
29 changes: 19 additions & 10 deletions base/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,36 @@ const (
DefaultIdleTimeout = 90 * time.Second
)

// Creates a TLS config, loading the certificate and key from disk. Returns nil if certFile is empty.
func MakeTLSConfig(certFile, keyFile string, tlsMinVersion uint16) (*tls.Config, error) {
if certFile == "" {
return nil, nil
} else if cert, err := tls.LoadX509KeyPair(certFile, keyFile); err != nil {
return nil, err
} else {
return &tls.Config{
MinVersion: tlsMinVersion,
Certificates: []tls.Certificate{cert},
}, nil
}
}

// This is like a combination of http.ListenAndServe and http.ListenAndServeTLS, which also
// uses ThrottledListen to limit the number of open HTTP connections.
func ListenAndServeHTTP(ctx context.Context, addr string, connLimit uint, certFile, keyFile string, handler http.Handler,
readTimeout, writeTimeout, readHeaderTimeout, idleTimeout time.Duration, http2Enabled bool,
tlsMinVersion uint16) (serveFn func() error, listenerAddr net.Addr, server *http.Server, err error) {
var config *tls.Config
if certFile != "" {
config = &tls.Config{}
config.MinVersion = tlsMinVersion
config, err := MakeTLSConfig(certFile, keyFile, tlsMinVersion)
if err != nil {
return nil, nil, nil, err
}
if config != nil {
protocolsEnabled := []string{"http/1.1"}
if http2Enabled {
protocolsEnabled = []string{"h2", "http/1.1"}
}
config.NextProtos = protocolsEnabled
InfofCtx(ctx, KeyHTTP, "Protocols enabled: %v on %v", config.NextProtos, SD(addr))
config.Certificates = make([]tls.Certificate, 1)
var err error
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, nil, nil, err
}
}

// Callback that turns off TCP NODELAY option when a client transitions to a WebSocket:
Expand Down
2 changes: 2 additions & 0 deletions base/log_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
KeyImport
KeyJavascript
KeyMigrate
KeyMQTT
KeyQuery
KeyReplicate
KeySync
Expand Down Expand Up @@ -83,6 +84,7 @@ var (
KeyImport: "Import",
KeyJavascript: "Javascript",
KeyMigrate: "Migrate",
KeyMQTT: "MQTT",
KeyQuery: "Query",
KeyReplicate: "Replicate",
KeySync: "Sync",
Expand Down
201 changes: 201 additions & 0 deletions base/parse_long_duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright 2024-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package base

import (
"errors"
"fmt"
"time"
)

// NOTE: This file is adapted from portions of:
// https://cs.opensource.google/go/go/%20/refs/tags/go1.22.1:src/time/format.go

var unitMap = map[string]uint64{
"ns": uint64(time.Nanosecond),
"us": uint64(time.Microsecond),
"µs": uint64(time.Microsecond), // U+00B5 = micro symbol
"μs": uint64(time.Microsecond), // U+03BC = Greek letter mu
"ms": uint64(time.Millisecond),
"s": uint64(time.Second),
"min": uint64(time.Minute),
"h": uint64(time.Hour),
"d": uint64(time.Hour * 24),
"w": uint64(time.Hour * 24 * 7),
"m": uint64(time.Hour * 24 * 30),
"mon": uint64(time.Hour * 24 * 30),
"y": uint64(time.Hour * 24 * 365),
}

// ParseLongDuration is a variant of time.ParseDuration that supports longer time units.
// Valid units are "ns", "us" (or "µs"), "ms", "s", "m" (or "min"), "h",
// "d", "w", "m" (or "mon"), "y".
//
// NOTE: "m" is ambiguous. It is assumed to mean "months" unless preceded by a shorter unit;
// so for example "3m" = 3 months, but "2h30m" = 2 hours 30 minutes.
func ParseLongDuration(s string) (time.Duration, error) {
// [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
orig := s
var d uint64
neg := false
small_units := false

// Consume [-+]?
if s != "" {
c := s[0]
if c == '-' || c == '+' {
neg = c == '-'
s = s[1:]
}
}
// Special case: if all that is left is "0", this is zero.
if s == "0" {
return 0, nil
}
if s == "" {
return 0, fmt.Errorf("invalid duration %q", orig)
}
for s != "" {
var (
v, f uint64 // integers before, after decimal point
scale float64 = 1 // value = v + f/scale
)

var err error

// The next character must be [0-9.]
if !(s[0] == '.' || '0' <= s[0] && s[0] <= '9') {
return 0, fmt.Errorf("invalid duration %q", orig)
}
// Consume [0-9]*
pl := len(s)
v, s, err = leadingInt(s)
if err != nil {
return 0, fmt.Errorf("invalid duration %q", orig)
}
pre := pl != len(s) // whether we consumed anything before a period

// Consume (\.[0-9]*)?
post := false
if s != "" && s[0] == '.' {
s = s[1:]
pl := len(s)
f, scale, s = leadingFraction(s)
post = pl != len(s)
}
if !pre && !post {
// no digits (e.g. ".s" or "-.s")
return 0, fmt.Errorf("invalid duration %q", orig)
}

// Consume unit.
i := 0
for ; i < len(s); i++ {
c := s[i]
if c == '.' || '0' <= c && c <= '9' {
break
}
}
if i == 0 {
return 0, fmt.Errorf("missing unit in duration %q", orig)
}
u := s[:i]
s = s[i:]
unit, ok := unitMap[u]
if !ok {
return 0, fmt.Errorf("unknown unit %q in duration %q", u, orig)
}

if !small_units {
small_units = unit <= uint64(time.Hour)*24
} else if u == "m" {
unit = uint64(time.Minute)
}

if v > 1<<63/unit {
// overflow
return 0, fmt.Errorf("invalid duration %q", orig)
}
v *= unit
if f > 0 {
// float64 is needed to be nanosecond accurate for fractions of hours.
// v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit)
v += uint64(float64(f) * (float64(unit) / scale))
if v > 1<<63 {
// overflow
return 0, fmt.Errorf("invalid duration %q", orig)
}
}
d += v
if d > 1<<63 {
return 0, fmt.Errorf("invalid duration %q", orig)
}
}
if neg {
return -time.Duration(d), nil
}
if d > 1<<63-1 {
return 0, fmt.Errorf("invalid duration %q", orig)
}
return time.Duration(d), nil
}

var errLeadingInt = errors.New("time: bad [0-9]*") // never printed

// leadingInt consumes the leading [0-9]* from s.
func leadingInt[bytes []byte | string](s bytes) (x uint64, rem bytes, err error) {
i := 0
for ; i < len(s); i++ {
c := s[i]
if c < '0' || c > '9' {
break
}
if x > 1<<63/10 {
// overflow
return 0, rem, errLeadingInt
}
x = x*10 + uint64(c) - '0'
if x > 1<<63 {
// overflow
return 0, rem, errLeadingInt
}
}
return x, s[i:], nil
}

// leadingFraction consumes the leading [0-9]* from s.
// It is used only for fractions, so does not return an error on overflow,
// it just stops accumulating precision.
func leadingFraction(s string) (x uint64, scale float64, rem string) {
i := 0
scale = 1
overflow := false
for ; i < len(s); i++ {
c := s[i]
if c < '0' || c > '9' {
break
}
if overflow {
continue
}
if x > (1<<63-1)/10 {
// It's possible for overflow to give a positive number, so take care.
overflow = true
continue
}
y := x*10 + uint64(c) - '0'
if y > 1<<63 {
overflow = true
continue
}
x = y
scale *= 10
}
return x, scale, s[i:]
}
Loading
Loading