Skip to content

Commit

Permalink
Merge pull request #76 from ksensehq/PMF-22-destination-reloading
Browse files Browse the repository at this point in the history
made destinations - reloadable
  • Loading branch information
Sergey Burykin authored Oct 8, 2020
2 parents acc1471 + ef7bb9c commit 49e2e9a
Show file tree
Hide file tree
Showing 42 changed files with 1,358 additions and 469 deletions.
24 changes: 12 additions & 12 deletions adapters/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,26 @@ var (

//ClickHouseConfig dto for deserialized clickhouse config
type ClickHouseConfig struct {
Dsns []string `mapstructure:"dsns"`
Database string `mapstructure:"db"`
Tls map[string]string `mapstructure:"tls"`
Cluster string `mapstructure:"cluster"`
Engine *EngineConfig `mapstructure:"engine"`
Dsns []string `mapstructure:"dsns" json:"dsns,omitempty"`
Database string `mapstructure:"db" json:"db,omitempty"`
Tls map[string]string `mapstructure:"tls" json:"tls,omitempty"`
Cluster string `mapstructure:"cluster" json:"cluster,omitempty"`
Engine *EngineConfig `mapstructure:"engine" json:"engine,omitempty"`
}

//EngineConfig dto for deserialized clickhouse engine config
type EngineConfig struct {
RawStatement string `mapstructure:"raw_statement"`
NonNullFields []string `mapstructure:"non_null_fields"`
PartitionFields []FieldConfig `mapstructure:"partition_fields"`
OrderFields []FieldConfig `mapstructure:"order_fields"`
PrimaryKeys []string `mapstructure:"primary_keys"`
RawStatement string `mapstructure:"raw_statement" json:"raw_statement,omitempty"`
NonNullFields []string `mapstructure:"non_null_fields" json:"non_null_fields,omitempty"`
PartitionFields []FieldConfig `mapstructure:"partition_fields" json:"partition_fields,omitempty"`
OrderFields []FieldConfig `mapstructure:"order_fields" json:"order_fields,omitempty"`
PrimaryKeys []string `mapstructure:"primary_keys" json:"primary_keys,omitempty"`
}

//FieldConfig dto for deserialized clickhouse engine fields
type FieldConfig struct {
Function string `mapstructure:"function"`
Field string `mapstructure:"field"`
Function string `mapstructure:"function" json:"function,omitempty"`
Field string `mapstructure:"field" json:"field,omitempty"`
}

//Validate required fields in ClickHouseConfig
Expand Down
8 changes: 4 additions & 4 deletions adapters/google_cloud_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type GoogleCloudStorage struct {
}

type GoogleConfig struct {
Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket"`
Project string `mapstructure:"bq_project" json:"bq_project"`
Dataset string `mapstructure:"bq_dataset" json:"bq_dataset"`
KeyFile interface{} `mapstructure:"key_file" json:"key_file"`
Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket,omitempty"`
Project string `mapstructure:"bq_project" json:"bq_project,omitempty"`
Dataset string `mapstructure:"bq_dataset" json:"bq_dataset,omitempty"`
KeyFile interface{} `mapstructure:"key_file" json:"key_file,omitempty"`

//will be set on validation
credentials option.ClientOption
Expand Down
14 changes: 7 additions & 7 deletions adapters/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ var (

//DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)
type DataSourceConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Db string `mapstructure:"db"`
Schema string `mapstructure:"schema"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Parameters map[string]string `mapstructure:"parameters"`
Host string `mapstructure:"host" json:"host,omitempty"`
Port int `mapstructure:"port" json:"port,omitempty"`
Db string `mapstructure:"db" json:"db,omitempty"`
Schema string `mapstructure:"schema" json:"schema,omitempty"`
Username string `mapstructure:"username" json:"username,omitempty"`
Password string `mapstructure:"password" json:"password,omitempty"`
Parameters map[string]string `mapstructure:"parameters" json:"parameters,omitempty"`
}

//Validate required fields in DataSourceConfig
Expand Down
12 changes: 6 additions & 6 deletions adapters/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type S3 struct {
}

type S3Config struct {
AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id"`
SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key"`
Bucket string `mapstructure:"bucket"`
Region string `mapstructure:"region"`
Endpoint string `mapstructure:"endpoint"`
Folder string `mapstructure:"folder"`
AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id,omitempty"`
SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key,omitempty"`
Bucket string `mapstructure:"bucket" json:"bucket,omitempty"`
Region string `mapstructure:"region" json:"region,omitempty"`
Endpoint string `mapstructure:"endpoint" json:"endpoint,omitempty"`
Folder string `mapstructure:"folder" json:"folder,omitempty"`
}

func (s3c *S3Config) Validate() error {
Expand Down
18 changes: 9 additions & 9 deletions adapters/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ var (

//SnowflakeConfig dto for deserialized datasource config for Snowflake
type SnowflakeConfig struct {
Account string `mapstructure:"account"`
Port int `mapstructure:"port"`
Db string `mapstructure:"db"`
Schema string `mapstructure:"schema"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
Warehouse string `mapstructure:"warehouse"`
Stage string `mapstructure:"stage"`
Parameters map[string]*string `mapstructure:"parameters"`
Account string `mapstructure:"account" json:"account,omitempty"`
Port int `mapstructure:"port" json:"port,omitempty"`
Db string `mapstructure:"db" json:"db,omitempty"`
Schema string `mapstructure:"schema" json:"schema,omitempty"`
Username string `mapstructure:"username" json:"username,omitempty"`
Password string `mapstructure:"password" json:"password,omitempty"`
Warehouse string `mapstructure:"warehouse" json:"warehouse,omitempty"`
Stage string `mapstructure:"stage" json:"stage,omitempty"`
Parameters map[string]*string `mapstructure:"parameters" json:"parameters,omitempty"`
}

//Validate required fields in SnowflakeConfig
Expand Down
3 changes: 2 additions & 1 deletion appconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ var Instance *AppConfig
func setDefaultParams() {
viper.SetDefault("server.port", "8001")
viper.SetDefault("server.static_files_dir", "./web")
viper.SetDefault("server.auth_reload_sec", 300)
viper.SetDefault("server.auth_reload_sec", 30)
viper.SetDefault("server.destinations_reload_sec", 40)
viper.SetDefault("geo.maxmind_path", "/home/eventnative/app/res/")
viper.SetDefault("log.path", "/home/eventnative/logs/events")
viper.SetDefault("log.show_in_server", false)
Expand Down
90 changes: 90 additions & 0 deletions authorization/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package authorization

import (
"encoding/json"
"fmt"
"github.com/ksensehq/eventnative/resources"
"strings"
)

type Token struct {
Id string `mapstructure:"id" json:"id,omitempty"`
ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty"`
ServerSecret string `mapstructure:"server_secret" json:"server_secret,omitempty"`
Origins []string `mapstructure:"origins" json:"origins,omitempty"`
}

type TokensPayload struct {
Tokens []Token `json:"tokens,omitempty"`
}

type TokensHolder struct {
//origins by client token
clientTokensOrigins map[string][]string
//origins by server token
serverTokensOrigins map[string][]string

//all token ids
ids []string
//token by: client_secret/server_secret/id
all map[string]Token
}

func (th *TokensHolder) IsEmpty() bool {
return th == nil || len(th.ids) == 0
}

//parse tokens from json bytes
func parseFromBytes(b []byte) (*TokensHolder, error) {
payload := &TokensPayload{}
err := json.Unmarshal(b, payload)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling tokens. Payload must be json with 'tokens' key: %v", err)
}

return reformat(payload.Tokens), nil
}

func fromStrings(clientSecrets []string) *TokensHolder {
var tokens []Token
for _, clientSecret := range clientSecrets {
tokens = append(tokens, Token{ClientSecret: clientSecret})
}
return reformat(tokens)
}

func reformat(tokens []Token) *TokensHolder {
clientTokensOrigins := map[string][]string{}
serverTokensOrigins := map[string][]string{}
all := map[string]Token{}
var ids []string

for _, tokenObj := range tokens {
if tokenObj.Id == "" {
//hash from client,server secret will be id
tokenObj.Id = resources.GetHash([]byte(tokenObj.ClientSecret + tokenObj.ServerSecret))
}

all[tokenObj.Id] = tokenObj
ids = append(ids, tokenObj.Id)

trimmedClientToken := strings.TrimSpace(tokenObj.ClientSecret)
if trimmedClientToken != "" {
clientTokensOrigins[trimmedClientToken] = tokenObj.Origins
all[trimmedClientToken] = tokenObj
}

trimmedServerToken := strings.TrimSpace(tokenObj.ServerSecret)
if trimmedServerToken != "" {
serverTokensOrigins[trimmedServerToken] = tokenObj.Origins
all[trimmedServerToken] = tokenObj
}
}

return &TokensHolder{
clientTokensOrigins: clientTokensOrigins,
serverTokensOrigins: serverTokensOrigins,
ids: ids,
all: all,
}
}
139 changes: 139 additions & 0 deletions authorization/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package authorization

import (
"github.com/ksensehq/eventnative/test"
"github.com/stretchr/testify/require"
"testing"
)

func TestParseFromBytes(t *testing.T) {
tests := []struct {
name string
input []byte
expected *TokensHolder
expectedErr string
}{
{
"Empty input",
[]byte{},
nil,
"Error unmarshalling tokens. Payload must be json with 'tokens' key: unexpected end of JSON input",
},
{
"Wrong json keys format",
[]byte(`{"tokens":{}}}`),
nil,
"Error unmarshalling tokens. Payload must be json with 'tokens' key: invalid character '}' after top-level value",
},
{
"Empty json input",
[]byte(`{}`),
&TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}},
"",
},
{
"Wrong keys json input",
[]byte(`{"jsss":[], "apii": []}`),
&TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}},
"",
},

{
"Empty json tokens input",
[]byte(`{"tokens":[]}`),
&TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}},
"",
},
{
"ok",
[]byte(`{"tokens":[{"id":"id1","client_secret":"cl_secret1","server_secret":"sr_secret1","origins":["abc.com","rr.ru"]},{"client_secret":"cl_secret2"},{"server_secret":"sr_secret3"}]}`),
buildExpected(),
"",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualTokenHolder, err := parseFromBytes(tt.input)
if tt.expectedErr != "" {
require.EqualError(t, err, tt.expectedErr, "Errors aren't equal")
} else {
require.NoError(t, err)
test.ObjectsEqual(t, tt.expected.ids, actualTokenHolder.ids, "Token holder ids aren't equal")
test.ObjectsEqual(t, tt.expected.all, actualTokenHolder.all, "Token holders all aren't equal")
test.ObjectsEqual(t, tt.expected.clientTokensOrigins, actualTokenHolder.clientTokensOrigins, "Token holders client tokens aren't equal")
test.ObjectsEqual(t, tt.expected.serverTokensOrigins, actualTokenHolder.serverTokensOrigins, "Token holders server tokens aren't equal")
}
})
}
}

func buildExpected() *TokensHolder {
token1 := Token{
Id: "id1",
ClientSecret: "cl_secret1",
ServerSecret: "sr_secret1",
Origins: []string{"abc.com", "rr.ru"},
}
token2 := Token{
Id: "31429624a471b9bdc6d60350c6cfc24d",
ClientSecret: "cl_secret2",
}
token3 := Token{
Id: "03f9ed11a0268dd78766686f8f292b7b",
ServerSecret: "sr_secret3",
}
return &TokensHolder{
clientTokensOrigins: map[string][]string{"cl_secret1": {"abc.com", "rr.ru"}, "cl_secret2": nil},
serverTokensOrigins: map[string][]string{"sr_secret1": {"abc.com", "rr.ru"}, "sr_secret3": nil},
ids: []string{"id1", "31429624a471b9bdc6d60350c6cfc24d", "03f9ed11a0268dd78766686f8f292b7b"},
all: map[string]Token{
"id1": token1,
"cl_secret1": token1,
"sr_secret1": token1,

"31429624a471b9bdc6d60350c6cfc24d": token2,
"cl_secret2": token2,

"03f9ed11a0268dd78766686f8f292b7b": token3,
"sr_secret3": token3,
},
}
}

func TestFromStrings(t *testing.T) {
tests := []struct {
name string
input []string
expected *TokensHolder
}{
{
"empty tokens",
[]string{},
&TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}},
},
{
"value is string slice",
[]string{"token1", "token2"},
&TokensHolder{
clientTokensOrigins: map[string][]string{"token1": nil, "token2": nil},
serverTokensOrigins: map[string][]string{},
all: map[string]Token{
"78b1e6d775cec5260001af137a79dbd5": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"},
"token1": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"},

"0e0530c1430da76495955eb06eb99d95": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"},
"token2": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"},
},
ids: []string{"78b1e6d775cec5260001af137a79dbd5", "0e0530c1430da76495955eb06eb99d95"},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := fromStrings(tt.input)
test.ObjectsEqual(t, tt.expected, actual, "Token holders aren't equal")
})
}
}
Loading

0 comments on commit 49e2e9a

Please sign in to comment.