Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ES type setting, add ES client logging, improve plugin URL err #1265

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 93 additions & 92 deletions helm/botkube/README.md

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions helm/botkube/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,14 @@ communications:
# -- If true, skips the verification of TLS certificate of the Elastic nodes.
# It's useful for clusters with self-signed certificates.
skipTLSVerify: false
# -- Specify the log level for Elasticsearch client. Leave empty to disable logging.
## Possible values: "info", "error", "trace".
## - "info": Logs information level messages.
## - "error": Logs information and error level messages.
## - "trace": Logs information, error, and trace level messages.
## To disable logging, simply leave the logLevel empty or remove the line.
logLevel: ""

# -- Map of configured indices. The `indices` property name is an alias for a given configuration.
#
## Format: indices.{alias}
Expand Down
2 changes: 1 addition & 1 deletion internal/plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (m *Manager) loadRepositoriesMetadata(ctx context.Context, forceUpdate bool

err := m.fetchIndex(ctx, path, entry.URL)
if err != nil {
return fmt.Errorf("while fetching index for %q repository: %w", repo, err)
return fmt.Errorf("while fetching index for %q repository with URL %q: %w", repo, entry.URL, err)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ type Elasticsearch struct {
SkipTLSVerify bool `yaml:"skipTLSVerify"`
AWSSigning AWSSigning `yaml:"awsSigning"`
Indices map[string]ELSIndex `yaml:"indices" validate:"required_if=Enabled true,dive,omitempty,min=1"`
LogLevel string `yaml:"logLevel"`
}

// AWSSigning contains AWS configurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ communications:
bindings:
sources:
- k8s-events
logLevel: ""
analytics:
disable: true
settings:
Expand Down
37 changes: 22 additions & 15 deletions pkg/sink/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,25 @@ type Elasticsearch struct {
func NewElasticsearch(log logrus.FieldLogger, c config.Elasticsearch, reporter AnalyticsReporter) (*Elasticsearch, error) {
var elsClient *elastic.Client
var err error
var creds *credentials.Credentials

var elsOpts []elastic.ClientOptionFunc
switch c.LogLevel {
case "info":
elsOpts = append(elsOpts, elastic.SetInfoLog(log))
case "error":
elsOpts = append(elsOpts, elastic.SetInfoLog(log), elastic.SetErrorLog(log))
case "trace":
elsOpts = append(elsOpts, elastic.SetInfoLog(log), elastic.SetErrorLog(log), elastic.SetTraceLog(log))
}

if c.AWSSigning.Enabled {
// Get credentials from environment variables and create the AWS Signature Version 4 signer
sess := session.Must(session.NewSession())

// Use OIDC token to generate credentials if using IAM to Service Account
awsRoleARN := os.Getenv(awsRoleARNEnvName)
awsWebIdentityTokenFile := os.Getenv(awsWebIDTokenFileEnvName)
var creds *credentials.Credentials
if awsRoleARN != "" && awsWebIdentityTokenFile != "" {
svc := sts.New(sess)
p := stscreds.NewWebIdentityRoleProviderWithOptions(svc, awsRoleARN, "", stscreds.FetchTokenPath(awsWebIdentityTokenFile))
Expand All @@ -78,41 +89,37 @@ func NewElasticsearch(log logrus.FieldLogger, c config.Elasticsearch, reporter A
if err != nil {
return nil, fmt.Errorf("while creating new AWS Signing client: %w", err)
}
elsClient, err = elastic.NewClient(
elsOpts = append(elsOpts,
elastic.SetURL(c.Server),
elastic.SetScheme("https"),
elastic.SetHttpClient(awsClient),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetGzip(false),
)
if err != nil {
return nil, fmt.Errorf("while creating new Elastic client: %w", err)
}
} else {
elsClientParams := []elastic.ClientOptionFunc{
elsOpts = append(elsOpts,
elastic.SetURL(c.Server),
elastic.SetBasicAuth(c.Username, c.Password),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetGzip(true),
}
)

if c.SkipTLSVerify {
tr := &http.Transport{
// #nosec G402
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
httpClient := &http.Client{Transport: tr}
elsClientParams = append(elsClientParams, elastic.SetHttpClient(httpClient))
}
// create elasticsearch client
elsClient, err = elastic.NewClient(elsClientParams...)
if err != nil {
return nil, fmt.Errorf("while creating new Elastic client: %w", err)
elsOpts = append(elsOpts, elastic.SetHttpClient(httpClient))
}
}

elsClient, err = elastic.NewClient(elsOpts...)
if err != nil {
return nil, fmt.Errorf("while creating new Elastic client: %w", err)
}
pong, _, err := elsClient.Ping(c.Server).Do(context.Background())
if err != nil {
return nil, fmt.Errorf("while pinging cluster: %w", err)
Expand Down Expand Up @@ -177,10 +184,10 @@ func (e *Elasticsearch) flushIndex(ctx context.Context, indexCfg config.ELSIndex
if err != nil {
return fmt.Errorf("while getting cluster major version: %w", err)
}
if majorVersion <= 7 {
if majorVersion <= 7 && indexCfg.Type != "" {
// Only Elasticsearch <= 7.x supports Type parameter
// nolint:staticcheck
indexService.Type(e.clusterVersion)
indexService.Type(indexCfg.Type)
}
_, err = indexService.BodyJson(event).Do(ctx)
if err != nil {
Expand Down
Loading