Skip to content

Commit

Permalink
Implemented time-series rollover, and sub-second precision
Browse files Browse the repository at this point in the history
  • Loading branch information
snej committed May 21, 2024
1 parent f96a4c6 commit e57c966
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 131 deletions.
16 changes: 8 additions & 8 deletions mqtt/auth_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (h *authHook) OnConnectAuthenticate(client *mochi.Client, pk packets.Packet
defer base.FatalPanicHandler()
dbc, username := h.server.clientDatabaseContext(client)
if dbc == nil {
base.WarnfCtx(h.ctx, "MQTT connection attempt failed with username %q -- does not begin with a database name", client.Properties.Username)
base.WarnfCtx(h.ctx, "MQTT connection attempt failed with username %q -- does not begin with a database name", base.UD(client.Properties.Username))
return false
}

user, _ := dbc.Authenticator(h.ctx).AuthenticateUser(username, string(pk.Connect.Password))
if user == nil {
base.WarnfCtx(h.ctx, "MQTT auth failure for username %q", client.Properties.Username)
base.WarnfCtx(h.ctx, "MQTT auth failure for username %q", base.UD(client.Properties.Username))
return false
}

Expand All @@ -59,11 +59,11 @@ func (h *authHook) OnConnectAuthenticate(client *mochi.Client, pk packets.Packet
// If we continue, the broker will take over that session, which is a possible
// security issue: this client would inherit its topic subscriptions.
base.WarnfCtx(h.ctx, "MQTT auth failure for username %q: reusing session ID %q already belonging to user %q",
client.Properties.Username, pk.Connect.ClientIdentifier, existing.Properties.Username)
base.UD(client.Properties.Username), pk.Connect.ClientIdentifier, base.UD(existing.Properties.Username))
return false
}

base.InfofCtx(h.ctx, base.KeyMQTT, "Client connection by user %q to db %q (session ID %q)", username, dbc.Name, client.ID)
base.InfofCtx(h.ctx, base.KeyMQTT, "Client connection by user %q to db %q (session ID %q)", base.UD(username), base.UD(dbc.Name), client.ID)
return true
}

Expand All @@ -74,21 +74,21 @@ func (h *authHook) OnACLCheck(client *mochi.Client, topic string, write bool) (a
dbc, username := h.server.clientDatabaseContext(client)
topic, ok := stripDbNameFromTopic(dbc, topic)
if !ok {
base.WarnfCtx(h.ctx, "MQTT: DB %s user %q tried to access topic %q not in that DB", dbc.Name, username, topic)
base.WarnfCtx(h.ctx, "MQTT: DB %s user %q tried to access topic %q not in that DB", base.UD(dbc.Name), base.UD(username), base.UD(topic))
return false
}

user, err := dbc.Authenticator(h.ctx).GetUser(username)
if err != nil {
base.WarnfCtx(h.ctx, "MQTT: OnACLCheck: Can't find DB user for MQTT client username %q", username)
base.WarnfCtx(h.ctx, "MQTT: OnACLCheck: Can't find DB user for MQTT client username %q", base.UD(username))
return false
}

allowed = dbcSettings(dbc).Authorize(user, topic, write)
if allowed {
base.InfofCtx(h.ctx, base.KeyMQTT, "DB %s user %q accessing topic %q, write=%v", dbc.Name, username, topic, write)
base.InfofCtx(h.ctx, base.KeyMQTT, "DB %s user %q accessing topic %q, write=%v", base.UD(dbc.Name), base.UD(username), base.UD(topic), write)
} else {
base.InfofCtx(h.ctx, base.KeyMQTT, "DB %s user %q blocked from accessing topic %q, write=%v", dbc.Name, username, topic, write)
base.InfofCtx(h.ctx, base.KeyMQTT, "DB %s user %q blocked from accessing topic %q, write=%v", base.UD(dbc.Name), base.UD(username), base.UD(topic), write)
}
return
}
4 changes: 2 additions & 2 deletions mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (client *Client) onPublishReceived(pub paho.PublishReceived) (bool, error)
exp = base.SecondsToCbsExpiry(int(*msgExp))
}

err := IngestMessage(client.ctx, *match, pub.Packet.Payload, sub, client.database, exp)
err := ingestMessage(client.ctx, *match, pub.Packet.Payload, sub, client.database, exp)
if err != nil {
base.WarnfCtx(client.ctx, "MQTT Client %q failed to save message from topic %q: %v", client.config.Broker.ClientID, pub.Packet.Topic, err)
base.WarnfCtx(client.ctx, "MQTT Client %q failed to save message from topic %q: %v", client.config.Broker.ClientID, base.UD(pub.Packet.Topic), err)
}
return (err == nil), err
}
Expand Down
4 changes: 2 additions & 2 deletions mqtt/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (agent *clusterAgent) broadcastPublish(packet *packets.Packet) error {
return err
}
b := &packetBroadcast{data: buf.Bytes()}
base.InfofCtx(agent.ctx, base.KeyMQTT, "Broadcasting Publish packet for topic %q", packet.TopicName)
base.InfofCtx(agent.ctx, base.KeyMQTT, "Broadcasting Publish packet for topic %q", base.UD(packet.TopicName))
agent.broadcastQueue.QueueBroadcast(b)
}
return nil
Expand Down Expand Up @@ -230,7 +230,7 @@ func (agent *clusterAgent) NotifyMsg(message []byte) {
case 'P':
// This is a Publish packet
if packet, err := decodePacket(message[2:], message[1]); err == nil {
base.InfofCtx(agent.ctx, base.KeyMQTT, "Relaying PUBLISH packet from peer for topic %q (%d bytes)", packet.TopicName, len(packet.Payload))
base.InfofCtx(agent.ctx, base.KeyMQTT, "Relaying PUBLISH packet from peer for topic %q (%d bytes)", base.UD(packet.TopicName), len(packet.Payload))
err = agent.broker.Publish(packet.TopicName, packet.Payload, packet.FixedHeader.Retain, packet.FixedHeader.Qos)
if err != nil {
base.ErrorfCtx(agent.ctx, "MQTT cluster error publishing message forwarded from peer: %v", err)
Expand Down
130 changes: 81 additions & 49 deletions mqtt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ type IngestConfig struct {
Scope string `json:"scope,omitempty"` // Scope to save to
Collection string `json:"collection,omitempty"` // Collection to save to
Encoding *string `json:"payload_encoding,omitempty"` // How to parse payload (default "string")
Model *string `json:"model,omitempty"` // Save mode: "state" (default) or "time_series"
Model *string `json:"model,omitempty"` // Save mode: "state" (default), "time_series", "space_time_series"
StateTemplate Body `json:"state,omitempty"` // Document properties template
TimeSeries *TimeSeriesConfig `json:"time_series,omitempty"`
SpaceTimeSeries *SpaceTimeSeriesConfig `json:"space_time_series,omitempty"`
QoS *int `json:"qos,omitempty"` // QoS of subscription, client-side only (default: 2)
Channels []string `json:"channels,omitempty"` // Channel access of doc
QoS *int `json:"qos,omitempty"` // QoS of subscription, client-side only (default: 2)
}

type IngestMap map[string]*IngestConfig
Expand Down Expand Up @@ -140,14 +139,14 @@ func (bc *BrokerConfig) Validate() error {
return err
}
}
return validateSubscriptions(bc.Ingest)
return validateIngestMap(bc.Ingest)
}

func (config *ClientConfig) Validate() error {
if url, _ := url.Parse(config.Broker.URL); url == nil {
return fmt.Errorf("invalid broker URL `%s`", config.Broker.URL)
}
return validateSubscriptions(config.Ingest)
return validateIngestMap(config.Ingest)
}

func (config *TimeSeriesConfig) Validate() error {
Expand Down Expand Up @@ -192,59 +191,92 @@ func (config *TimeSeriesConfig) validateRotation() error {
return nil
}

func validateSubscriptions(subs IngestMap) error {
for topic, sub := range subs {
if sub.Scope == "" {
sub.Scope = base.DefaultScopeAndCollectionName().Scope
}
if sub.Collection == "" {
sub.Collection = base.DefaultScopeAndCollectionName().Collection
// Validates a map as a template for the "state" model.
func validateStateTemplate(template Body) error {
if template != nil {
tmpl := templater{
payload: Body{},
timestamp: time.Now(),
allowMissingProperties: true}
tmpl.apply(template)
if tmpl.err != nil {
return tmpl.err
}
if !sgbucket.IsValidDataStoreName(sub.Scope, sub.Collection) {
return fmt.Errorf("invalid scope/collection names %q, %q in subscription %q",
sub.Scope, sub.Collection, topic)
}
return nil
}

func (cfg *IngestConfig) Validate(topic string) error {
if cfg.Scope == "" {
cfg.Scope = base.DefaultScopeAndCollectionName().Scope
}
if cfg.Collection == "" {
cfg.Collection = base.DefaultScopeAndCollectionName().Collection
}
if !sgbucket.IsValidDataStoreName(cfg.Scope, cfg.Collection) {
return fmt.Errorf("invalid scope/collection names %q, %q in ingest config %q",
cfg.Scope, cfg.Collection, topic)
}

if _, err := MakeTopicFilter(topic); err != nil {
return err
}
if cfg.QoS != nil && (*cfg.QoS < 0 || *cfg.QoS > 2) {
return fmt.Errorf("invalid `qos` value %v in ingest config %q", *cfg.QoS, topic)
}
if xform := cfg.Encoding; xform != nil {
if *xform != EncodingString && *xform != EncodingBase64 && *xform != EncodingJSON {
return fmt.Errorf("invalid `transform` option %q in ingest config %q", *xform, topic)
}
}

if _, err := MakeTopicFilter(topic); err != nil {
// Check the `StateTemplate`, `TimeSeries`, `SpaceTimeSeries` properties:
inferredModel := ModelState
modelProperties := 0
if cfg.StateTemplate != nil {
if err := validateStateTemplate(cfg.StateTemplate); err != nil {
return err
}
if sub.QoS != nil && (*sub.QoS < 0 || *sub.QoS > 2) {
return fmt.Errorf("invalid `qos` value %v in subscription %q", *sub.QoS, topic)
modelProperties += 1
}
if cfg.TimeSeries != nil {
if err := cfg.TimeSeries.Validate(); err != nil {
return err
}
if xform := sub.Encoding; xform != nil {
if *xform != EncodingString && *xform != EncodingBase64 && *xform != EncodingJSON {
return fmt.Errorf("invalid `transform` option %q in subscription %q", *xform, topic)
}
modelProperties += 1
inferredModel = ModelTimeSeries
}
if cfg.SpaceTimeSeries != nil {
if err := cfg.SpaceTimeSeries.Validate(); err != nil {
return err
}
modelProperties += 1
inferredModel = ModelSpaceTimeSeries
}
if modelProperties > 1 {
return fmt.Errorf("multiple model properties in ingest config %q", topic)
}

if sub.Model != nil {
switch *sub.Model {
case ModelState:
if err := validateStateTemplate(sub.StateTemplate); err != nil {
return err
} else if sub.TimeSeries != nil || sub.SpaceTimeSeries != nil {
return fmt.Errorf("multiple model properties in subscription %q", topic)
}
case ModelTimeSeries:
if err := sub.TimeSeries.Validate(); err != nil {
return err
} else if sub.StateTemplate != nil || sub.SpaceTimeSeries != nil {
return fmt.Errorf("multiple model properties in subscription %q", topic)
}
case ModelSpaceTimeSeries:
if err := sub.SpaceTimeSeries.Validate(); err != nil {
return err
} else if sub.StateTemplate != nil || sub.TimeSeries != nil {
return fmt.Errorf("multiple model properties in subscription %q", topic)
}
default:
return fmt.Errorf("invalid `model` %q in subscription %q", *sub.Model, topic)
}
} else if sub.TimeSeries == nil && sub.StateTemplate == nil {
return fmt.Errorf("missing `model` subscription %q", topic)
} else if sub.TimeSeries != nil && sub.StateTemplate != nil {
return fmt.Errorf("cannot have both `state` and `time_series` in subscription %q", topic)
// Infer the `Model` property, or if given check that it matches:
if cfg.Model == nil {
cfg.Model = base.StringPtr(inferredModel)
} else if *cfg.Model != inferredModel {
switch *cfg.Model {
case ModelState, ModelTimeSeries, ModelSpaceTimeSeries:
return fmt.Errorf("ingest config %q has a %q property but \"model\": %q", topic, inferredModel, *cfg.Model)
default:
return fmt.Errorf("invalid \"model\": %q in ingest config %q", *cfg.Model, topic)
}
}
return nil
}

func validateIngestMap(subs IngestMap) error {
var err error
for topic, sub := range subs {
if err = sub.Validate(topic); err != nil {
break
}
}
return err
}
Loading

0 comments on commit e57c966

Please sign in to comment.