-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tracesexporter: Allow multiple datasources
in configuration, and round-robin load-balance amongst them
#190
base: main
Are you sure you want to change the base?
Changes from all commits
c9b2219
d11d017
9ab8cc4
aca13fe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,12 +33,11 @@ import ( | |
|
||
// Factory implements storage.Factory for Clickhouse backend. | ||
type Factory struct { | ||
logger *zap.Logger | ||
Options *Options | ||
db clickhouse.Conn | ||
archive clickhouse.Conn | ||
datasource string | ||
makeWriter writerMaker | ||
logger *zap.Logger | ||
Options *Options | ||
conns []clickhouse.Conn | ||
archive_conns []clickhouse.Conn | ||
makeWriter writerMaker | ||
} | ||
|
||
// Writer writes spans to storage. | ||
|
@@ -55,7 +54,7 @@ var ( | |
) | ||
|
||
// NewFactory creates a new Factory. | ||
func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeCluster bool) *Factory { | ||
func ClickHouseNewFactory(migrations string, datasources []string, dockerMultiNodeCluster bool) *Factory { | ||
writeLatencyDistribution := view.Distribution(100, 250, 500, 750, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000) | ||
|
||
writeLatencyView := &view.View{ | ||
|
@@ -68,7 +67,7 @@ func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeC | |
|
||
view.Register(writeLatencyView) | ||
return &Factory{ | ||
Options: NewOptions(migrations, datasource, dockerMultiNodeCluster, primaryNamespace, archiveNamespace), | ||
Options: NewOptions(migrations, datasources, dockerMultiNodeCluster, primaryNamespace, archiveNamespace), | ||
// makeReader: func(db *clickhouse.Conn, operationsTable, indexTable, spansTable string) (spanstore.Reader, error) { | ||
// return store.NewTraceReader(db, operationsTable, indexTable, spansTable), nil | ||
// }, | ||
|
@@ -78,54 +77,64 @@ func ClickHouseNewFactory(migrations string, datasource string, dockerMultiNodeC | |
} | ||
} | ||
|
||
// Round-robin connection-selection strategy. | ||
func (f *Factory) selectConn() clickhouse.Conn { | ||
if len(f.conns) > 1 { | ||
f.conns = append(f.conns[1:], f.conns[0]) | ||
} | ||
return f.conns[0] | ||
} | ||
|
||
// Initialize implements storage.Factory | ||
func (f *Factory) Initialize(logger *zap.Logger) error { | ||
func (f *Factory) Initialize(logger *zap.Logger) (clickhouse.Conn, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a clear way in Go to indicate what this return-value is, semantically? (I name the return-value I'd rather indicate in the type/signature that the user shouldn't use the returned |
||
f.logger = logger | ||
|
||
db, err := f.connect(f.Options.getPrimary()) | ||
conns, err := f.connect(f.Options.getPrimary()) | ||
if err != nil { | ||
return fmt.Errorf("error connecting to primary db: %v", err) | ||
return nil, fmt.Errorf("error connecting to primary db: %v", err) | ||
} | ||
|
||
f.db = db | ||
f.conns = conns | ||
|
||
archiveConfig := f.Options.others[archiveNamespace] | ||
if archiveConfig.Enabled { | ||
archive, err := f.connect(archiveConfig) | ||
archive_conns, err := f.connect(archiveConfig) | ||
if err != nil { | ||
return fmt.Errorf("error connecting to archive db: %v", err) | ||
return nil, fmt.Errorf("error connecting to archive db: %v", err) | ||
} | ||
|
||
f.archive = archive | ||
f.archive_conns = archive_conns | ||
} | ||
|
||
err = patchGroupByParenInMV(db, f) | ||
init_conn := f.selectConn() | ||
|
||
err = patchGroupByParenInMV(init_conn, f) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
|
||
// drop schema migrations table if running in docker multi node cluster mode so that migrations are run on new nodes | ||
if f.Options.primary.DockerMultiNodeCluster { | ||
err = dropSchemaMigrationsTable(db, f) | ||
err = dropSchemaMigrationsTable(init_conn, f) | ||
if err != nil { | ||
return err | ||
return nil, err | ||
} | ||
} | ||
|
||
f.logger.Info("Running migrations from path: ", zap.Any("test", f.Options.primary.Migrations)) | ||
clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasource, f.Options.primary.Cluster) | ||
clickhouseUrl, err := buildClickhouseMigrateURL(f.Options.primary.Datasources[0], f.Options.primary.Cluster) | ||
if err != nil { | ||
return fmt.Errorf("Failed to build Clickhouse migrate URL, error: %s", err) | ||
return nil, fmt.Errorf("failed to build Clickhouse migrate URL, error: %s", err) | ||
} | ||
m, err := migrate.New( | ||
"file://"+f.Options.primary.Migrations, | ||
clickhouseUrl) | ||
if err != nil { | ||
return fmt.Errorf("Clickhouse Migrate failed to run, error: %s", err) | ||
return nil, fmt.Errorf("clickhouse migrate failed to run, error: %s", err) | ||
} | ||
err = m.Up() | ||
f.logger.Info("Clickhouse Migrate finished", zap.Error(err)) | ||
return nil | ||
return init_conn, nil | ||
} | ||
|
||
func patchGroupByParenInMV(db clickhouse.Conn, f *Factory) error { | ||
|
@@ -232,7 +241,7 @@ func buildClickhouseMigrateURL(datasource string, cluster string) (string, error | |
} | ||
host := parsedURL.Host | ||
if host == "" { | ||
return "", fmt.Errorf("Unable to parse host") | ||
return "", fmt.Errorf("unable to parse host") | ||
|
||
} | ||
paramMap, err := url.ParseQuery(parsedURL.RawQuery) | ||
|
@@ -250,12 +259,12 @@ func buildClickhouseMigrateURL(datasource string, cluster string) (string, error | |
return clickhouseUrl, nil | ||
} | ||
|
||
func (f *Factory) connect(cfg *namespaceConfig) (clickhouse.Conn, error) { | ||
func (f *Factory) connect(cfg *namespaceConfig) ([]clickhouse.Conn, error) { | ||
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto { | ||
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto) | ||
} | ||
|
||
return cfg.Connector(cfg) | ||
return cfg.Connectors(cfg) | ||
} | ||
|
||
// AddFlags implements plugin.Configurable | ||
|
@@ -273,7 +282,7 @@ func (f *Factory) CreateSpanWriter() (Writer, error) { | |
cfg := f.Options.getPrimary() | ||
return f.makeWriter(WriterOptions{ | ||
logger: f.logger, | ||
db: f.db, | ||
conns: f.conns, | ||
traceDatabase: cfg.TraceDatabase, | ||
spansTable: cfg.SpansTable, | ||
indexTable: cfg.IndexTable, | ||
|
@@ -286,13 +295,13 @@ func (f *Factory) CreateSpanWriter() (Writer, error) { | |
|
||
// CreateArchiveSpanWriter implements storage.ArchiveFactory | ||
func (f *Factory) CreateArchiveSpanWriter() (Writer, error) { | ||
if f.archive == nil { | ||
if f.archive_conns == nil { | ||
return nil, nil | ||
} | ||
cfg := f.Options.others[archiveNamespace] | ||
return f.makeWriter(WriterOptions{ | ||
logger: f.logger, | ||
db: f.archive, | ||
conns: f.archive_conns, | ||
traceDatabase: cfg.TraceDatabase, | ||
spansTable: cfg.SpansTable, | ||
indexTable: cfg.IndexTable, | ||
|
@@ -305,22 +314,22 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) { | |
|
||
// Close Implements io.Closer and closes the underlying storage | ||
func (f *Factory) Close() error { | ||
if f.db != nil { | ||
err := f.db.Close() | ||
if err != nil { | ||
return err | ||
if len(f.conns) > 0 { | ||
for _, conn := range f.conns { | ||
if err := conn.Close(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
f.db = nil | ||
f.conns = nil | ||
} | ||
|
||
if f.archive != nil { | ||
err := f.archive.Close() | ||
if err != nil { | ||
return err | ||
if len(f.archive_conns) > 0 { | ||
for _, conn := range f.archive_conns { | ||
if err := conn.Close(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
f.archive = nil | ||
f.archive_conns = nil | ||
} | ||
|
||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package clickhousetracesexporter | ||
|
||
import ( | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/otelcol/otelcoltest" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := otelcoltest.NopFactories() | ||
require.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
factories.Exporters[typeStr] = factory | ||
cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories) | ||
require.NoError(t, err) | ||
require.NotNil(t, cfg) | ||
|
||
assert.Equal(t, len(cfg.Exporters), 2) | ||
|
||
defaultCfg := factory.CreateDefaultConfig() | ||
defaultCfg.(*Config).Datasources = []string{"tcp://127.0.0.1:9000"} | ||
r0 := cfg.Exporters[component.NewID(typeStr)] | ||
assert.Equal(t, r0, defaultCfg) | ||
|
||
r1 := cfg.Exporters[component.NewIDWithName(typeStr, "full")].(*Config) | ||
assert.Equal(t, r1, &Config{ | ||
Datasources: []string{"tcp://clickhouse:9000/?database=signoz_traces"}, | ||
DockerMultiNodeCluster: true, | ||
LowCardinalExceptionGrouping: true, | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is, currently, not thread-safe: selectConn mutates the connection slice without synchronization. I'm not sure whether
signoz-otel-collector
invokes the exporter concurrently; should I guard the list of connections with a mutex or something like that?