Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracesexporter: Allow multiple datasources in configuration, and round-robin load-balance amongst them #190

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {

configClickHouse := cfg.(*Config)

f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster)
f := ClickHouseNewFactory(configClickHouse.Migrations, configClickHouse.Datasources, configClickHouse.DockerMultiNodeCluster)

err := f.Initialize(logger)
conn, err := f.Initialize(logger)
if err != nil {
return nil, err
}
err = initFeatures(f.db, f.Options)
err = initFeatures(conn, f.Options)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +57,7 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {
}

collector := usage.NewUsageCollector(
f.db,
conn,
usage.Options{ReportingInterval: usage.DefaultCollectionInterval},
"signoz_traces",
UsageExporter,
Expand Down
91 changes: 50 additions & 41 deletions exporter/clickhousetracesexporter/clickhouse_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ import (

// Factory implements storage.Factory for Clickhouse backend.
type Factory struct {
logger *zap.Logger
Options *Options
db clickhouse.Conn
archive clickhouse.Conn
datasource string
makeWriter writerMaker
logger *zap.Logger
Options *Options
conns []clickhouse.Conn
archive_conns []clickhouse.Conn
makeWriter writerMaker
}

// Writer writes spans to storage.
Expand All @@ -55,7 +54,7 @@ var (
)

// NewFactory creates a new Factory.
func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeCluster bool) *Factory {
func ClickHouseNewFactory(migrations string, datasources []string, dockerMultiNodeCluster bool) *Factory {
writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000)

writeLatencyView := &view.View{
Expand All @@ -68,7 +67,7 @@ func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeC

view.Register(writeLatencyView)
return &Factory{
Options: NewOptions(migrations, datasource, dockerMultiNodeCluster, primaryNamespace, archiveNamespace),
Options: NewOptions(migrations, datasources, dockerMultiNodeCluster, primaryNamespace, archiveNamespace),
// makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) {
// return store.NewTraceReader(db, operationsTable, indexTable, spansTable), nil
// },
Expand All @@ -78,54 +77,64 @@ func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeC
}
}

// Round-robin connection-selection strategy.
func (f *Factory) selectConn() clickhouse.Conn {
if len(f.conns) > 1 {
f.conns = append(f.conns[1:], f.conns[0])
}
return f.conns[0]
}

Comment on lines +80 to +87
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is, currently, not thread-safe. I'm not sure whether parallelism is utilized in signoz-otel-collector; should I include a mutex against the list of collectors or something like that?

// Initialize implements storage.Factory
func (f *Factory) Initialize(logger *zap.Logger) error {
func (f *Factory) Initialize(logger *zap.Logger) (clickhouse.Conn, error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a clear way in Go to indicate what this return-value is, semantically? (I name the return-value init_conn below, but idk if folks are going to reflexively check the source …)

I'd rather indicate in the type/signature that the user shouldn't use the returned conn for anything other than initialization …

f.logger = logger

db, err := f.connect(f.Options.getPrimary())
conns, err := f.connect(f.Options.getPrimary())
if err != nil {
return fmt.Errorf("error connecting to primary db: %v", err)
return nil, fmt.Errorf("error connecting to primary db: %v", err)
}

f.db = db
f.conns = conns

archiveConfig := f.Options.others[archiveNamespace]
if archiveConfig.Enabled {
archive, err := f.connect(archiveConfig)
archive_conns, err := f.connect(archiveConfig)
if err != nil {
return fmt.Errorf("error connecting to archive db: %v", err)
return nil, fmt.Errorf("error connecting to archive db: %v", err)
}

f.archive = archive
f.archive_conns = archive_conns
}

err = patchGroupByParenInMV(db, f)
init_conn := f.selectConn()

err = patchGroupByParenInMV(init_conn, f)
if err != nil {
return err
return nil, err
}

// drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes
if f.Options.primary.DockerMultiNodeCluster {
err = dropSchemaMigrationsTable(db, f)
err = dropSchemaMigrationsTable(init_conn, f)
if err != nil {
return err
return nil, err
}
}

f.logger.Info("Running migrations from path: ", zap.Any("test", f.Options.primary.Migrations))
clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasource, f.Options.primary.Cluster)
clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasources[0], f.Options.primary.Cluster)
if err != nil {
return fmt.Errorf("Failed to build Clickhouse migrate URL, error: %s", err)
return nil, fmt.Errorf("failed to build Clickhouse migrate URL, error: %s", err)
}
m, err := migrate.New(
"file://"+f.Options.primary.Migrations,
clickhouseUrl)
if err != nil {
return fmt.Errorf("Clickhouse Migrate failed to run, error: %s", err)
return nil, fmt.Errorf("clickhouse migrate failed to run, error: %s", err)
}
err = m.Up()
f.logger.Info("Clickhouse Migrate finished", zap.Error(err))
return nil
return init_conn, nil
}

func patchGroupByParenInMV(db clickhouse.Conn, f *Factory) error {
Expand Down Expand Up @@ -232,7 +241,7 @@ func buildClickhouseMigrateURL(datasource string, cluster string) (string, error
}
host := parsedURL.Host
if host == "" {
return "", fmt.Errorf("Unable to parse host")
return "", fmt.Errorf("unable to parse host")

}
paramMap, err := url.ParseQuery(parsedURL.RawQuery)
Expand All @@ -250,12 +259,12 @@ func buildClickhouseMigrateURL(datasource string, cluster string) (string, error
return clickhouseUrl, nil
}

func (f *Factory) connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
func (f *Factory) connect(cfg *namespaceConfig) ([]clickhouse.Conn, error) {
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
}

return cfg.Connector(cfg)
return cfg.Connectors(cfg)
}

// AddFlags implements plugin.Configurable
Expand All @@ -273,7 +282,7 @@ func (f *Factory) CreateSpanWriter() (Writer, error) {
cfg := f.Options.getPrimary()
return f.makeWriter(WriterOptions{
logger: f.logger,
db: f.db,
conns: f.conns,
traceDatabase: cfg.TraceDatabase,
spansTable: cfg.SpansTable,
indexTable: cfg.IndexTable,
Expand All @@ -286,13 +295,13 @@ func (f *Factory) CreateSpanWriter() (Writer, error) {

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (Writer, error) {
if f.archive == nil {
if f.archive_conns == nil {
return nil, nil
}
cfg := f.Options.others[archiveNamespace]
return f.makeWriter(WriterOptions{
logger: f.logger,
db: f.archive,
conns: f.archive_conns,
traceDatabase: cfg.TraceDatabase,
spansTable: cfg.SpansTable,
indexTable: cfg.IndexTable,
Expand All @@ -305,22 +314,22 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) {

// Close Implements io.Closer and closes the underlying storage
func (f *Factory) Close() error {
if f.db != nil {
err := f.db.Close()
if err != nil {
return err
if len(f.conns) > 0 {
for _, conn := range f.conns {
if err := conn.Close(); err != nil {
return err
}
}

f.db = nil
f.conns = nil
}

if f.archive != nil {
err := f.archive.Close()
if err != nil {
return err
if len(f.archive_conns) > 0 {
for _, conn := range f.archive_conns {
if err := conn.Close(); err != nil {
return err
}
}

f.archive = nil
f.archive_conns = nil
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions exporter/clickhousetracesexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

// Config defines configuration for tracing exporter.
type Config struct {
Options `mapstructure:",squash"`
Datasource string `mapstructure:"datasource"`
Migrations string `mapstructure:"migrations"`
Options `mapstructure:",squash"`
Datasources []string `mapstructure:"datasources"`
Migrations string `mapstructure:"migrations"`
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster"`
// LowCardinalExceptionGrouping is a flag to enable exception grouping by serviceName + exceptionType. Default is false.
Expand Down
50 changes: 50 additions & 0 deletions exporter/clickhousetracesexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhousetracesexporter

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
)

func TestLoadConfig(t *testing.T) {
factories, err := otelcoltest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Exporters), 2)

defaultCfg := factory.CreateDefaultConfig()
defaultCfg.(*Config).Datasources = []string{"tcp://127.0.0.1:9000"}
r0 := cfg.Exporters[component.NewID(typeStr)]
assert.Equal(t, r0, defaultCfg)

r1 := cfg.Exporters[component.NewIDWithName(typeStr, "full")].(*Config)
assert.Equal(t, r1, &Config{
Datasources: []string{"tcp://clickhouse:9000/?database=signoz_traces"},
DockerMultiNodeCluster: true,
LowCardinalExceptionGrouping: true,
})
}
Loading