Skip to content

Commit

Permalink
Merge pull request #45 from maier/master
Browse files Browse the repository at this point in the history
v0.0.39 [CIRC-7530]
  • Loading branch information
maier authored Jan 18, 2022
2 parents 45dcd79 + adab323 commit 69535ed
Show file tree
Hide file tree
Showing 13 changed files with 1,098 additions and 58 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v0.0.39

* add: input plugin to pull circonus httptrap stream tag formatted metrics [CIRC-7530]

# v0.0.38

* build(deps): bump github.com/shirou/gopsutil/v3 from 3.21.11 to 3.21.12
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/ceph"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/cgroup"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/chrony"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/circ_http_json"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/cisco_telemetry_mdt"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/clickhouse"
_ "github.com/circonus-labs/circonus-unified-agent/plugins/inputs/cloud_pubsub"
Expand Down
12 changes: 4 additions & 8 deletions plugins/inputs/circ_http_json/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Circonus HTTP JSON

This input plugin provides the ability to fetch [Circonus HTTPTrap stream tag and structured format metrics](https://docs.circonus.com/circonus/integrations/library/httptrap/#httptrap-json-format) and forward them to a Circonus Unified Agent check.
This input plugin provides the ability to fetch [Circonus HTTPTrap stream tag and structured format metrics](https://docs.circonus.com/circonus/integrations/library/json-push-httptrap/#stream-tags) and forward them to a Circonus Unified Agent check.

## Configuration

Expand All @@ -10,21 +10,17 @@ This input plugin provides the ability to fetch [Circonus HTTPTrap stream tag an
[[inputs.circ_http_json]]
instance_id = "idb_stats"
url = "http://127.0.0.1:8112/stats.json?format=tagged"

[[inputs.circ_http_json]]
instance_id = "idb_mtev"
url = "http://127.0.0.1:8112/mtev/stats.json?format=tagged"
```

Note the addition of `?format=tagged` use for these endpoints to ensure stream tagged, structured metric format.
Note the addition of `?format=tagged` query argument -- use for these ironDB endpoints to ensure stream tagged, structured metric format.

## Example Metric Format

```json
{
"foo|ST[env:prod,app:web]": { "_type": "n", "_value": 12 },
"foo|ST[env:qa,app:web]": { "_type": "n", "_value": 0 },
"foo|ST[b\"fihiYXIp\":b\"PHF1dXg+\"]": { "_type": "n", "_value": 3 }
"foo|ST[env:qa,app:web]": { "_type": "I", "_value": 0 },
"foo|ST[b\"fihiYXIp\":b\"PHF1dXg+\"]": { "_type": "L", "_value": 3 }
}
```

Expand Down
157 changes: 107 additions & 50 deletions plugins/inputs/circ_http_json/circ_http_json.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package circhttpjson

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net"
Expand All @@ -20,40 +20,18 @@ import (
"github.com/hashicorp/go-retryablehttp"
)

// Collect HTTPTrap JSON payloads and forward to circonus broker
//
// 1. Use HTTP to GET metrics in valid httptrap stream tagged, structured metric format.
// {
// "foo|ST[env:prod,app:web]": { "_type": "n", "_value": 12 },
// "foo|ST[env:qa,app:web]": { "_type": "n", "_value": 0 },
// "foo|ST[b\"fihiYXIp\":b\"PHF1dXg+\"]": { "_type": "n", "_value": 3 }
// }
// _type must be a valid httptrap (reconnoiter) metric type i=int,I=uint,l=int64,L=uint64,n=double,s=text,hH=histograms
// see: https://docs.circonus.com/circonus/integrations/library/httptrap/#httptrap-json-format for more
// information - note, metrics must use stream tag, structured formatting not arbitrary json formatting.
//
// 2. Verify metrics are formatted correctly (json.Marshal)
//
// 3. Forward to httptrap check
//
// Note: this input only supports direct metrics - they do NOT go through a regular output plugin

type Metric struct {
Value interface{} `json:"_value"`
Timestamp *uint64 `json:"_ts,omitempty"`
Type string `json:"_type"`
}

type Metrics map[string]Metric
// NOTE: this input only supports direct metrics - they do NOT go through a regular output plugin

type CHJ struct {
Log cua.Logger
dest *trapmetrics.TrapMetrics
tlsCfg *tls.Config
instLogger *Logshim
TLSCN string
InstanceID string `json:"instance_id"`
URL string
TLSCAFile string
TLSCN string
Debug bool
}

func (chj *CHJ) Init() error {
Expand Down Expand Up @@ -84,6 +62,12 @@ func (chj *CHJ) Init() error {

chj.dest = dest

// this is needed for retryablehttp to work...
chj.instLogger = &Logshim{
logh: chj.Log,
debug: chj.Debug,
}

return nil
}

Expand All @@ -96,6 +80,10 @@ func (*CHJ) SampleConfig() string {
instance_id = "" # required
url = "" # required
## optional, turn on debugging for the *metric fetch* phase of the plugin
## metric submission, to the broker, will output via regular agent debug setting.
debug = false
## Optional: tls ca cert file and common name to use
## pass if URL is https and not using a public ca
# tls_ca_cert_file = ""
Expand All @@ -104,27 +92,32 @@ url = "" # required
}

func (chj *CHJ) Gather(ctx context.Context, _ cua.Accumulator) error {
if chj.dest != nil {
data, err := chj.getURL(ctx)
if err != nil {
return err
}
if chj.dest == nil {
return fmt.Errorf("instance_id: %s -- no metric destination configured", chj.InstanceID)
}

if err := chj.verifyJSON(data); err != nil {
return err
}
data, err := chj.getURL(ctx)
if err != nil {
return fmt.Errorf("instance_id: %s -- fetching metrics from %s: %w", chj.InstanceID, chj.URL, err)
}

if _, err := chj.dest.FlushRawJSON(ctx, data); err != nil {
return err
}
if err := chj.hasStreamtags(data); err != nil {
return fmt.Errorf("instance_id: %s -- no streamtags found in metrics", chj.InstanceID)
}

// if err := chj.verifyJSON(data); err != nil {
// return fmt.Errorf("instance_id: %s -- invalid json from %s: %w", chj.InstanceID, chj.URL, err)
// }

if _, err := chj.dest.FlushRawJSON(ctx, data); err != nil {
return fmt.Errorf("instance_id: %s -- flushing metrics: %w", chj.InstanceID, err)
}

return nil
}

// getURL fetches the raw json from an endpoint, the JSON must:
// 1. use streamtag metric names
// 2. adhere to circonus httptrap formatting
//
// getURL fetches the raw json from an endpoint, the JSON must adhere to circonus httptrap formatting
// can handle tagged or un-tagged json formats -- the plugin just forwards the JSON it gets to the broker
func (chj *CHJ) getURL(ctx context.Context) ([]byte, error) {
var client *http.Client

Expand Down Expand Up @@ -175,7 +168,7 @@ func (chj *CHJ) getURL(ctx context.Context) ([]byte, error) {

retryClient := retryablehttp.NewClient()
retryClient.HTTPClient = client
retryClient.Logger = chj.Log
retryClient.Logger = chj.instLogger
defer retryClient.HTTPClient.CloseIdleConnections()

resp, err := retryClient.Do(req)
Expand All @@ -186,24 +179,33 @@ func (chj *CHJ) getURL(ctx context.Context) ([]byte, error) {
return nil, fmt.Errorf("making request: %w", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("response status: %d", resp.StatusCode)
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}

if len(body) == 0 {
return nil, fmt.Errorf("empty response body")
}

return body, nil
}

// verifyJSON simply unmarshals a []byte into a metrics struct (defined above)
// if it works it is considered valid
func (chj *CHJ) verifyJSON(data []byte) error {
// hasStreamtags return true if there is at least one tagged metric
func (chj *CHJ) hasStreamtags(data []byte) error {

if len(data) == 0 {
return fmt.Errorf("invalid JSON (empty)")
return fmt.Errorf("empty json")
}
var m Metrics
if err := json.Unmarshal(data, &m); err != nil {
return fmt.Errorf("json unmarshal: %w", err)

if !bytes.Contains(data, []byte("|ST[")) {
return fmt.Errorf("no streamtags found")
}

return nil
}

Expand Down Expand Up @@ -259,3 +261,58 @@ func init() {
return &CHJ{}
})
}

//
// use the simple validation in hasStreamtags, the method below is very expensive and should be used with caution
//

// type Metric struct {
// Value interface{} `json:"_value"`
// Timestamp *uint64 `json:"_ts,omitempty"`
// Type string `json:"_type"`
// }

// type Metrics map[string]Metric

// // verifyJSON simply unmarshals a []byte into a metrics struct (defined above)
// // if it works it is considered valid -- valid JSON formatting:
// // https://docs.circonus.com/circonus/integrations/library/json-push-httptrap/#stream-tags
// func (chj *CHJ) verifyJSON(data []byte) error {
// if len(data) == 0 {
// return fmt.Errorf("empty json")
// }

// // short circuit if a tagged metric found
// if bytes.Contains(data, []byte("|ST[")) {
// return nil
// }

// var d1 bytes.Buffer
// if err := json.Compact(&d1, data); err != nil {
// return fmt.Errorf("json compact: %w", err)
// }

// if d1.Len() == 0 {
// return fmt.Errorf("invalid JSON (empty)")
// }

// var m Metrics
// if err := json.Unmarshal(d1.Bytes(), &m); err != nil {
// return fmt.Errorf("json unmarshal: %w", err)
// }

// if len(m) == 0 {
// return fmt.Errorf("invalid JSON (no metrics)")
// }

// d2, err := json.Marshal(m)
// if err != nil {
// return fmt.Errorf("json marshal: %w", err)
// }

// if d1.Len() != len(d2) {
// return fmt.Errorf("json invalid parse len: d1:%d != d2:%d", d1.Len(), len(d2))
// }

// return nil
// }
39 changes: 39 additions & 0 deletions plugins/inputs/circ_http_json/circ_http_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package circhttpjson

import (
"os"
"path/filepath"
"testing"
)

func TestCHJ_hasStreamtags(t *testing.T) {
testDir := "testdata"
tests := []struct {
name string
file string
wantErr bool
}{
{"invalid json -- empty", filepath.Join(testDir, "invalid1.json"), true},
{"invalid json -- no metrics", filepath.Join(testDir, "invalid2.json"), true},
// the following format is valid for the broker, it is not valid for this plugin
{"invalid json -- non-streamtag", filepath.Join(testDir, "invalid3.json"), true},
{"invalid json -- non-streamtag idb sample", filepath.Join(testDir, "untagged-stats.json"), true},
{"valid", filepath.Join(testDir, "valid1.json"), false},
{"valid w/ts", filepath.Join(testDir, "valid2.json"), false},
{"valid -- idb sample", filepath.Join(testDir, "tagged-stats.json"), false},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
chj := &CHJ{}
data, err := os.ReadFile(tt.file)
if err != nil {
t.Fatalf("reading %s", err)
}
if err := chj.hasStreamtags(data); (err != nil) != tt.wantErr {
t.Errorf("CHJ.hasStreamtags() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
37 changes: 37 additions & 0 deletions plugins/inputs/circ_http_json/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package circhttpjson

import (
"strings"

"github.com/circonus-labs/circonus-unified-agent/cua"
)

// Logshim is for retryablehttp
type Logshim struct {
logh cua.Logger
prefix string
debug bool
}

func (l Logshim) Printf(fmt string, args ...interface{}) {
if strings.Contains(fmt, "[DEBUG]") {
// for retryablehttp (it only logs using Printf, and everything is DEBUG)
if l.debug {
l.logh.Infof(l.prefix+": "+fmt, args...)
}
} else {
l.logh.Infof(l.prefix+": "+fmt, args...)
}
}
func (l Logshim) Debugf(fmt string, args ...interface{}) {
l.logh.Debugf(l.prefix+": "+fmt, args...)
}
func (l Logshim) Infof(fmt string, args ...interface{}) {
l.logh.Infof(l.prefix+": "+fmt, args...)
}
func (l Logshim) Warnf(fmt string, args ...interface{}) {
l.logh.Warnf(l.prefix+": "+fmt, args...)
}
func (l Logshim) Errorf(fmt string, args ...interface{}) {
l.logh.Errorf(l.prefix+": "+fmt, args...)
}
Empty file.
1 change: 1 addition & 0 deletions plugins/inputs/circ_http_json/testdata/invalid2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
15 changes: 15 additions & 0 deletions plugins/inputs/circ_http_json/testdata/invalid3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"number": 1.23,
"bignum_as_string": "281474976710656",
"test": "a text string",
"container": {
"key1": 1234
},
"array": [
1234,
"string",
{
"crazy": "like a fox"
}
]
}
Loading

0 comments on commit 69535ed

Please sign in to comment.