From fe63a8433f29f60290006d786c1593ac8e641b5c Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Fri, 2 Oct 2020 16:31:01 +0300 Subject: [PATCH 1/8] made destinations - reloadable --- adapters/clickhouse.go | 24 +- adapters/google_cloud_storage.go | 8 +- adapters/postgres.go | 14 +- adapters/s3.go | 12 +- adapters/snowflake.go | 18 +- appconfig/appconfig.go | 3 +- authorization/parser.go | 92 ++++++ authorization/parser_test.go | 80 +++++ authorization/service.go | 127 +++----- authorization/watcher.go | 48 --- config/config.template.yaml | 19 +- destinations/collections.go | 45 +++ destinations/parser.go | 28 ++ destinations/service.go | 295 ++++++++++++++++++ destinations/service_test.go | 199 ++++++++++++ destinations/status.go | 9 + destinations/unit.go | 29 ++ ...2s_preprocessor.go => api_preprocessor.go} | 16 +- ...essor_test.go => api_preprocessor_test.go} | 34 +- events/destination_service.go | 37 --- ...c2s_preprocessor.go => js_preprocessor.go} | 14 +- ...cessor_test.go => js_preprocessor_test.go} | 4 +- go.mod | 1 + handlers/event.go | 5 +- logfiles/uploader.go | 14 +- logging/writer.go | 23 +- main.go | 38 ++- main_test.go | 21 +- middleware/token_auth.go | 2 +- resources/watcher.go | 67 ++++ storages/factory.go | 228 +++++--------- ..._event_input.json => api_event_input.json} | 0 ..._fact_output.json => api_fact_output.json} | 2 +- 33 files changed, 1121 insertions(+), 435 deletions(-) create mode 100644 authorization/parser.go create mode 100644 authorization/parser_test.go delete mode 100644 authorization/watcher.go create mode 100644 destinations/collections.go create mode 100644 destinations/parser.go create mode 100644 destinations/service.go create mode 100644 destinations/service_test.go create mode 100644 destinations/status.go create mode 100644 destinations/unit.go rename events/{s2s_preprocessor.go => api_preprocessor.go} (76%) rename events/{s2s_preprocessor_test.go => api_preprocessor_test.go} (89%) delete mode 100644 events/destination_service.go rename events/{c2s_preprocessor.go => js_preprocessor.go} (81%) rename events/{c2s_preprocessor_test.go => js_preprocessor_test.go} (96%) create mode 100644 resources/watcher.go rename test_data/{s2s_event_input.json => api_event_input.json} (100%) rename test_data/{s2s_fact_output.json => api_fact_output.json} (96%) diff --git a/adapters/clickhouse.go b/adapters/clickhouse.go index b9baadebb..d9766ead3 100644 --- a/adapters/clickhouse.go +++ b/adapters/clickhouse.go @@ -55,26 +55,26 @@ var ( //ClickHouseConfig dto for deserialized clickhouse config type ClickHouseConfig struct { - Dsns []string `mapstructure:"dsns"` - Database string `mapstructure:"db"` - Tls map[string]string `mapstructure:"tls"` - Cluster string `mapstructure:"cluster"` - Engine *EngineConfig `mapstructure:"engine"` + Dsns []string `mapstructure:"dsns" json:"dsns,omitempty"` + Database string `mapstructure:"db" json:"db,omitempty"` + Tls map[string]string `mapstructure:"tls" json:"tls,omitempty"` + Cluster string `mapstructure:"cluster" json:"cluster,omitempty"` + Engine *EngineConfig `mapstructure:"engine" json:"engine,omitempty"` } //EngineConfig dto for deserialized clickhouse engine config type EngineConfig struct { - RawStatement string `mapstructure:"raw_statement"` - NonNullFields []string `mapstructure:"non_null_fields"` - PartitionFields []FieldConfig `mapstructure:"partition_fields"` - OrderFields []FieldConfig `mapstructure:"order_fields"` - PrimaryKeys []string `mapstructure:"primary_keys"` + RawStatement string 
`mapstructure:"raw_statement" json:"raw_statement,omitempty"` + NonNullFields []string `mapstructure:"non_null_fields" json:"non_null_fields,omitempty"` + PartitionFields []FieldConfig `mapstructure:"partition_fields" json:"partition_fields,omitempty"` + OrderFields []FieldConfig `mapstructure:"order_fields" json:"order_fields,omitempty"` + PrimaryKeys []string `mapstructure:"primary_keys" json:"primary_keys,omitempty"` } //FieldConfig dto for deserialized clickhouse engine fields type FieldConfig struct { - Function string `mapstructure:"function"` - Field string `mapstructure:"field"` + Function string `mapstructure:"function" json:"function,omitempty"` + Field string `mapstructure:"field" json:"field,omitempty"` } //Validate required fields in ClickHouseConfig diff --git a/adapters/google_cloud_storage.go b/adapters/google_cloud_storage.go index 5dc5f0caa..c926cc19f 100644 --- a/adapters/google_cloud_storage.go +++ b/adapters/google_cloud_storage.go @@ -19,10 +19,10 @@ type GoogleCloudStorage struct { } type GoogleConfig struct { - Bucket string `mapstructure:"gcs_bucket"` - Project string `mapstructure:"bq_project"` - Dataset string `mapstructure:"bq_dataset"` - KeyFile interface{} `mapstructure:"key_file"` + Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket,omitempty"` + Project string `mapstructure:"bq_project" json:"bq_project,omitempty"` + Dataset string `mapstructure:"bq_dataset" json:"bq_dataset,omitempty"` + KeyFile interface{} `mapstructure:"key_file" json:"key_file,omitempty"` //will be set on validation credentials option.ClientOption diff --git a/adapters/postgres.go b/adapters/postgres.go index 48af3f707..26b87e7b8 100644 --- a/adapters/postgres.go +++ b/adapters/postgres.go @@ -54,13 +54,13 @@ var ( //DataSourceConfig dto for deserialized datasource config (e.g. 
in Postgres or AwsRedshift destination) type DataSourceConfig struct { - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - Db string `mapstructure:"db"` - Schema string `mapstructure:"schema"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Parameters map[string]string `mapstructure:"parameters"` + Host string `mapstructure:"host" json:"host,omitempty"` + Port int `mapstructure:"port" json:"port,omitempty"` + Db string `mapstructure:"db" json:"db,omitempty"` + Schema string `mapstructure:"schema" json:"schema,omitempty"` + Username string `mapstructure:"username" json:"username,omitempty"` + Password string `mapstructure:"password" json:"password,omitempty"` + Parameters map[string]string `mapstructure:"parameters" json:"parameters,omitempty"` } //Validate required fields in DataSourceConfig diff --git a/adapters/s3.go b/adapters/s3.go index c861b76dd..b07ada877 100644 --- a/adapters/s3.go +++ b/adapters/s3.go @@ -18,12 +18,12 @@ type S3 struct { } type S3Config struct { - AccessKeyID string `mapstructure:"access_key_id"` - SecretKey string `mapstructure:"secret_access_key"` - Bucket string `mapstructure:"bucket"` - Region string `mapstructure:"region"` - Endpoint string `mapstructure:"endpoint"` - Folder string `mapstructure:"folder"` + AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id,omitempty"` + SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key,omitempty"` + Bucket string `mapstructure:"bucket" json:"bucket,omitempty"` + Region string `mapstructure:"region" json:"region,omitempty"` + Endpoint string `mapstructure:"endpoint" json:"endpoint,omitempty"` + Folder string `mapstructure:"folder" json:"folder,omitempty"` } func (s3c *S3Config) Validate() error { diff --git a/adapters/snowflake.go b/adapters/snowflake.go index 54f22fce3..1cca3543f 100644 --- a/adapters/snowflake.go +++ b/adapters/snowflake.go @@ -42,15 +42,15 @@ var ( //SnowflakeConfig dto for deserialized datasource config for Snowflake type SnowflakeConfig struct { - Account string `mapstructure:"account"` - Port int `mapstructure:"port"` - Db string `mapstructure:"db"` - Schema string `mapstructure:"schema"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Warehouse string `mapstructure:"warehouse"` - Stage string `mapstructure:"stage"` - Parameters map[string]*string `mapstructure:"parameters"` + Account string `mapstructure:"account" json:"account,omitempty"` + Port int `mapstructure:"port" json:"port,omitempty"` + Db string `mapstructure:"db" json:"db,omitempty"` + Schema string `mapstructure:"schema" json:"schema,omitempty"` + Username string `mapstructure:"username" json:"username,omitempty"` + Password string `mapstructure:"password" json:"password,omitempty"` + Warehouse string `mapstructure:"warehouse" json:"warehouse,omitempty"` + Stage string `mapstructure:"stage" json:"stage,omitempty"` + Parameters map[string]*string `mapstructure:"parameters" json:"parameters,omitempty"` } //Validate required fields in SnowflakeConfig diff --git a/appconfig/appconfig.go b/appconfig/appconfig.go index c2eb7fb49..b57e29620 100644 --- a/appconfig/appconfig.go +++ b/appconfig/appconfig.go @@ -26,7 +26,8 @@ var Instance *AppConfig func setDefaultParams() { viper.SetDefault("server.port", "8001") viper.SetDefault("server.static_files_dir", "./web") - viper.SetDefault("server.auth_reload_sec", 300) + viper.SetDefault("server.auth_reload_sec", 100) + 
viper.SetDefault("server.destinations_reload_sec", 120) viper.SetDefault("geo.maxmind_path", "/home/eventnative/app/res/") viper.SetDefault("log.path", "/home/eventnative/logs/events") viper.SetDefault("log.show_in_server", false) diff --git a/authorization/parser.go b/authorization/parser.go new file mode 100644 index 000000000..aea6956ce --- /dev/null +++ b/authorization/parser.go @@ -0,0 +1,92 @@ +package authorization + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" +) + +type TokensPayload struct { + Js []interface{} `json:"js,omitempty"` + Api []interface{} `json:"api,omitempty"` +} + +//parse tokens from formats: +//{"js": value, "api": value} where value might be strings array or json objects array with object format: +//{"token":"123", "origins":["origin1", "origin2"]} +func parseFromBytes(b []byte) (map[string][]string, map[string][]string, error) { + payload := &TokensPayload{} + err := json.Unmarshal(b, payload) + if err != nil { + return nil, nil, fmt.Errorf("Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: %v", err) + } + + jsTokens, err := reformatObj(payload.Js) + if err != nil { + return nil, nil, err + } + + apiTokens, err := reformatObj(payload.Api) + if err != nil { + return nil, nil, err + } + + return jsTokens, apiTokens, nil +} + +func reformat(tokensArr []string) map[string][]string { + tokensOrigins := map[string][]string{} + for _, t := range tokensArr { + trimmed := strings.TrimSpace(t) + if trimmed != "" { + tokensOrigins[trimmed] = []string{} + } + } + + return tokensOrigins +} + +func reformatObj(tokensArr []interface{}) (map[string][]string, error) { + tokensOrigins := map[string][]string{} + for _, t := range tokensArr { + switch t.(type) { + case string: + token := t.(string) + trimmed := strings.TrimSpace(token) + if trimmed != "" { + tokensOrigins[trimmed] = []string{} + } + case map[string]interface{}: + tokenObj := t.(map[string]interface{}) + token, ok := tokenObj["token"] + if !ok { + return nil, errors.New("Unknown authorization token format: each object must contain token field") + } + + var origins []string + trimmed := strings.TrimSpace(token.(string)) + if trimmed != "" { + originsObj, ok := tokenObj["origins"] + if ok { + originsArr, ok := originsObj.([]interface{}) + if !ok { + return nil, errors.New("Unknown authorization origins format: origins must be array of strings") + } + + for _, originI := range originsArr { + origins = append(origins, originI.(string)) + } + } + + tokensOrigins[trimmed] = origins + } + default: + return nil, errors.New("Unknown authorization token format type: " + reflect.TypeOf(t).Name()) + } + + } + + return tokensOrigins, nil +} diff --git a/authorization/parser_test.go b/authorization/parser_test.go new file mode 100644 index 000000000..c6dfde0d4 --- /dev/null +++ b/authorization/parser_test.go @@ -0,0 +1,80 @@ +package authorization + +import ( + "github.com/ksensehq/eventnative/test" + "github.com/stretchr/testify/require" + "testing" +) + +func TestParseFromBytes(t *testing.T) { + tests := []struct { + name string + input []byte + expectedJs map[string][]string + expectedApi map[string][]string + expectedErr string + }{ + { + "Empty input", + []byte{}, + nil, + nil, + "Error unmarshalling tokens. 
Payload must be json with 'js' and 'api' keys of json array or string array formats: unexpected end of JSON input", + }, + { + "Empty json input", + []byte(`{}`), + map[string][]string{}, + map[string][]string{}, + "", + }, + { + "Empty json keys input", + []byte(`{"js":[], "api":[]}`), + map[string][]string{}, + map[string][]string{}, + "", + }, + { + "Wrong keys json input", + []byte(`{"jsss":[], apii: []}`), + nil, + nil, + "Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: invalid character 'a' looking for beginning of object key string", + }, + { + "Wrong json keys format", + []byte(`{"js":{}, "api":{}}`), + nil, + nil, + "Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: json: cannot unmarshal object into Go struct field TokensPayload.js of type []interface {}", + }, + { + "js strings and api objects", + []byte(`{"js":["js1", "js2"], "api":[{"token":"api1", "origins":["origin1"]}]}`), + map[string][]string{"js1": {}, "js2": {}}, + map[string][]string{"api1": {"origin1"}}, + "", + }, + { + "js objects and api strings", + []byte(`{"api":["api1", "api2"], "js":[{"token":"js1", "origins":["origin1"]}]}`), + map[string][]string{"js1": {"origin1"}}, + map[string][]string{"api1": {}, "api2": {}}, + "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualJs, actualApi, err := parseFromBytes(tt.input) + if tt.expectedErr != "" { + require.EqualError(t, err, tt.expectedErr, "Errors aren't equal") + } else { + require.NoError(t, err) + test.ObjectsEqual(t, tt.expectedJs, actualJs, "Js tokens and expected tokens aren't equal") + test.ObjectsEqual(t, tt.expectedApi, actualApi, "Api tokens and expected tokens aren't equal") + } + }) + } +} diff --git a/authorization/service.go b/authorization/service.go index c5b31f5d0..29074e37e 100644 --- a/authorization/service.go +++ b/authorization/service.go @@ -1,65 +1,64 @@ package authorization import ( - "encoding/json" "errors" - "fmt" "github.com/google/uuid" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/resources" "github.com/spf13/viper" "strings" "sync" + "time" ) -type Token struct { - Value string `json:"token,omitempty"` - Origins []string `json:"origins,omitempty"` -} +const serviceName = "authorization" type Service struct { sync.RWMutex - c2STokens map[string][]string - s2STokens map[string][]string + jsTokens map[string][]string + apiTokens map[string][]string } func NewService() (*Service, error) { - service := &Service{} + service := &Service{jsTokens: map[string][]string{}, apiTokens: map[string][]string{}} reloadSec := viper.GetInt("server.auth_reload_sec") if reloadSec == 0 { return nil, errors.New("server.auth_reload_sec can't be empty") } - c2sTokens, err := load("server.auth", service.updateC2STokens, reloadSec) - if err != nil { - return nil, fmt.Errorf("Error loading server.auth tokens: %v", err) - } - - s2sTokens, err := load("server.s2s_auth", service.updateS2STokens, reloadSec) - if err != nil { - return nil, fmt.Errorf("Error loading server.s2s_auth tokens: %v", err) + authViper := viper.Sub("server.auth") + if authViper != nil { + service.jsTokens = reformat(authViper.GetStringSlice("js")) + service.apiTokens = reformat(authViper.GetStringSlice("api")) + } else { + authSource := viper.GetString("server.auth") + + if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { + resources.Watch(serviceName, 
authSource, resources.LoadFromHttp, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.Contains(authSource, "file://") { + resources.Watch(serviceName, strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(authSource, "{") && strings.HasSuffix(authSource, "}") { + service.updateTokens([]byte(authSource)) + } } - if len(c2sTokens) == 0 && len(s2sTokens) == 0 { + if len(service.jsTokens) == 0 && len(service.apiTokens) == 0 { //autogenerated generatedToken := uuid.New().String() - c2sTokens[generatedToken] = []string{} - s2sTokens[generatedToken] = []string{} - logging.Warn("Empty 'server.tokens' config key. Auto generate token:", generatedToken) + service.jsTokens[generatedToken] = []string{} + service.apiTokens[generatedToken] = []string{} + logging.Warn("Empty 'server.auth.js' and 'server.auth.api' config keys. Auto generate token:", generatedToken) } - service.c2STokens = c2sTokens - service.s2STokens = s2sTokens - return service, nil } -func (s *Service) GetC2SOrigins(token string) ([]string, bool) { +func (s *Service) GetJsOrigins(token string) ([]string, bool) { s.RLock() defer s.RUnlock() - origins, ok := s.c2STokens[token] + origins, ok := s.jsTokens[token] return origins, ok } @@ -68,83 +67,31 @@ func (s *Service) GetAllTokens() map[string]bool { defer s.RUnlock() result := map[string]bool{} - for k := range s.c2STokens { + for k := range s.jsTokens { result[k] = true } - for k := range s.s2STokens { + for k := range s.apiTokens { result[k] = true } return result } -func (s *Service) GetS2SOrigins(token string) ([]string, bool) { +func (s *Service) GetApiOrigins(token string) ([]string, bool) { s.RLock() defer s.RUnlock() - origins, ok := s.s2STokens[token] + origins, ok := s.apiTokens[token] return origins, ok } -func (s *Service) updateC2STokens(c2sTokens map[string][]string) { - s.Lock() - s.c2STokens = c2sTokens - s.Unlock() -} - -func (s *Service) updateS2STokens(s2sTokens map[string][]string) { - s.Lock() - s.s2STokens = s2sTokens - s.Unlock() -} - -func load(viperKey string, updateFunc func(map[string][]string), reloadSec int) (map[string][]string, error) { - authSource := viper.GetString(viperKey) - if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { - return Watcher(authSource, resources.LoadFromHttp, updateFunc, reloadSec) - } else if strings.Contains(authSource, "file://") { - return Watcher(strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, updateFunc, reloadSec) - } else if strings.HasPrefix(authSource, "[") && strings.HasSuffix(authSource, "]") { - return parseFromBytes("app config json array", []byte(authSource)) - } else { - return loadFromConfig(viperKey) - } -} - -func loadFromConfig(viperKey string) (map[string][]string, error) { - tokensArr := viper.GetStringSlice(viperKey) - tokensOrigins := map[string][]string{} - for _, t := range tokensArr { - trimmed := strings.TrimSpace(t) - if trimmed != "" { - tokensOrigins[trimmed] = []string{} - } - } - - return tokensOrigins, nil -} - -func parseFromBytes(source string, b []byte) (map[string][]string, error) { - var tokens []Token - err := json.Unmarshal(b, &tokens) +func (s *Service) updateTokens(payload []byte) { + js, api, err := parseFromBytes(payload) if err != nil { - //try to unmarshal into []string - var strTokens []string - err := json.Unmarshal(b, &strTokens) - if err != nil { - return nil, fmt.Errorf("Error 
unmarshalling tokens from %s. Payload must be json array or string array: %v", source, err)
-		}
-		for _, t := range strTokens {
-			tokens = append(tokens, Token{Value: t})
-		}
-	}
-
-	tokensOrigins := map[string][]string{}
-	for _, tokenObj := range tokens {
-		trimmed := strings.TrimSpace(tokenObj.Value)
-		if trimmed != "" {
-			tokensOrigins[trimmed] = tokenObj.Origins
-		}
+		logging.Errorf("Error updating authorization tokens: %v", err)
+	} else {
+		s.Lock()
+		s.jsTokens = js
+		s.apiTokens = api
+		s.Unlock()
 	}
-
-	return tokensOrigins, nil
 }
diff --git a/authorization/watcher.go b/authorization/watcher.go
deleted file mode 100644
index 3c7563162..000000000
--- a/authorization/watcher.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package authorization
-
-import (
-	"crypto/md5"
-	"fmt"
-	"github.com/ksensehq/eventnative/appstatus"
-	"github.com/ksensehq/eventnative/logging"
-	"time"
-)
-
-func Watcher(source string, loadFunc func(string) ([]byte, error), updateFunc func(map[string][]string), reloadSec int) (map[string][]string, error) {
-	payload, err := loadFunc(source)
-	if err != nil {
-		return nil, err
-	}
-
-	go watch(source, payload, loadFunc, updateFunc, reloadSec)
-	return parseFromBytes(source, payload)
-}
-
-func watch(source string, firstTimePayload []byte, loadFunc func(string) ([]byte, error), updateFunc func(map[string][]string), reloadSec int) {
-	hash := fmt.Sprintf("%x", md5.Sum(firstTimePayload))
-	for {
-		if appstatus.Instance.Idle {
-			break
-		}
-
-		time.Sleep(time.Duration(reloadSec) * time.Second)
-		actualPayload, err := loadFunc(source)
-		if err != nil {
-			logging.Errorf("Error reloading %s: %v", source, err)
-			continue
-		}
-
-		newHash := fmt.Sprintf("%x", md5.Sum(actualPayload))
-		if hash != newHash {
-			result, err := parseFromBytes(source, actualPayload)
-			if err != nil {
-				logging.Errorf("Error parsing reloaded %s: %v", source, err)
-				continue
-			}
-
-			updateFunc(result)
-			logging.Infof("New resource from %s was loaded", source)
-			hash = newHash
-		}
-	}
-}
diff --git a/config/config.template.yaml b/config/config.template.yaml
index 2e84ed266..a153d0f83 100644
--- a/config/config.template.yaml
+++ b/config/config.template.yaml
@@ -4,17 +4,21 @@ server:
   port: 8001
   name: event-us-01 #This parameter is required in cluster deployments. If not set - will be taken from os.Hostname()
+  #might be an http url or a file source
+  #auth: https://source_of_tokens
   auth:
-    - bd33c5fa-d69f-11ea-87d0-0242ac130003
-    - c20765a0-d69f-15ea-82d0-0242ac130003
-  s2s_auth:
-    - 5f15eba2-db58-11ea-87d0-0242ac130003
-    - 62faa226-db58-11ea-87d0-0242ac130003
-  auth_reload_sec: 60 #default value is 300. If auth or s2s_auth is http or file:/// source than it will be reloaded every auth_reload_sec
+    js:
+      - bd33c5fa-d69f-11ea-87d0-0242ac130003
+      - c20765a0-d69f-15ea-82d0-0242ac130003
+    api:
+      - 5f15eba2-db58-11ea-87d0-0242ac130003
+      - 62faa226-db58-11ea-87d0-0242ac130003
+  auth_reload_sec: 60 #default value is 100. If 'auth' is an http or file:/// source then it will be reloaded every auth_reload_sec
   public_url: https://yourhost
   log:
     path: /home/eventnative/logs/ #omit this key to write log to stdout
     rotation_min: 60 #1440 (24 hours) default value
+  destinations_reload_sec: 60 #default value is 120. If 'destinations' is an http or file:/// source then it will be reloaded every destinations_reload_sec
 
 geo.maxmind_path: https://statichost/GeoIP2-City.mmdb
 
@@ -22,6 +26,8 @@ log:
   path: /home/eventnative/logs/events
   rotation_min: 5
 
+#might be an http url or a file source
+#destinations: https://source_of_destinations
 destinations:
   redshift_one:
     type: redshift
@@ -38,6 +44,7 @@ destinations:
       secret_access_key: secretabc123
       bucket: my-bucket
       region: us-west-1
+      folder: redshift_one #Optional. Specify this parameter if several destinations use one s3 bucket
     data_layout:
       mapping:
         - "/key1/key2 -> /key3"
diff --git a/destinations/collections.go b/destinations/collections.go
new file mode 100644
index 000000000..dcf6ddfae
--- /dev/null
+++ b/destinations/collections.go
@@ -0,0 +1,45 @@
+package destinations
+
+import "github.com/ksensehq/eventnative/events"
+
+//map["token"]map["destination_name"]interface
+//because 1 token = ∞ storages
+type TokenizedStorages map[string]map[string]events.StorageProxy
+
+//map["token"]map["token | destination_name"]interface
+//because 1 token = 1 logger but ∞ event.queue
+type TokenizedConsumers map[string]map[string]events.Consumer
+
+func (ts TokenizedStorages) Add(token, name string, proxy events.StorageProxy) {
+	storageProxies, ok := ts[token]
+	if !ok {
+		storageProxies = map[string]events.StorageProxy{}
+		ts[token] = storageProxies
+	}
+	storageProxies[name] = proxy
+}
+
+func (ts TokenizedStorages) AddAll(other TokenizedStorages) {
+	for token, storages := range other {
+		for name, storage := range storages {
+			ts.Add(token, name, storage)
+		}
+	}
+}
+
+func (tc TokenizedConsumers) Add(token, name string, proxy events.Consumer) {
+	consumers, ok := tc[token]
+	if !ok {
+		consumers = map[string]events.Consumer{}
+		tc[token] = consumers
+	}
+	consumers[name] = proxy
+}
+
+func (tc TokenizedConsumers) AddAll(other TokenizedConsumers) {
+	for token, consumers := range other {
+		for name, consumer := range consumers {
+			tc.Add(token, name, consumer)
+		}
+	}
+}
diff --git a/destinations/parser.go b/destinations/parser.go
new file mode 100644
index 000000000..a32f57f02
--- /dev/null
+++ b/destinations/parser.go
@@ -0,0 +1,28 @@
+package destinations
+
+import (
+	"encoding/json"
+	"github.com/google/martian/log"
+	"github.com/ksensehq/eventnative/resources"
+	"github.com/ksensehq/eventnative/storages"
+)
+
+func parseFromBytes(b []byte) (map[string]storages.DestinationConfig, error) {
+	payload := map[string]storages.DestinationConfig{}
+	err := json.Unmarshal(b, &payload)
+	if err != nil {
+		return nil, err
+	}
+
+	return payload, nil
+}
+
+func getHash(name string, destination storages.DestinationConfig) string {
+	b, err := json.Marshal(destination)
+	if err != nil {
+		log.Errorf("Error getting hash(marshalling) from [%s] destination: %v", name, err)
+		return ""
+	}
+
+	return resources.GetHash(b)
+}
diff --git a/destinations/service.go b/destinations/service.go
new file mode 100644
index 000000000..13603a569
--- /dev/null
+++ b/destinations/service.go
@@ -0,0 +1,295 @@
+package destinations
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/hashicorp/go-multierror"
+	"github.com/ksensehq/eventnative/appconfig"
+	"github.com/ksensehq/eventnative/events"
+	"github.com/ksensehq/eventnative/logging"
+	"github.com/ksensehq/eventnative/resources"
+	"github.com/ksensehq/eventnative/storages"
+	"github.com/spf13/viper"
+	"strings"
+	"sync"
+	"time"
+)
+
+const serviceName = "destinations"
+const marshallingErrorMsg = `Error initializing destinations: wrong config format: each destination must contain one key and a config as the value (see https://docs.eventnative.dev/configuration), e.g.
+destinations:
+  custom_name:
+    type: redshift
+    ...
+`
+
+//LoggerUsage counts how many destinations use a logger so it can be closed when no longer used
+type LoggerUsage struct {
+	logger events.Consumer
+	usage  int
+}
+
+//Service is a reloadable service of event destinations per token
+type Service struct {
+	storageFactoryMethod func(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error)
+	ctx                  context.Context
+	logEventPath         string
+	monitorKeeper        storages.MonitorKeeper
+
+	unitsByName         map[string]*Unit
+	loggersUsageByToken map[string]*LoggerUsage
+
+	sync.RWMutex
+	consumersByToken TokenizedConsumers
+	storagesByToken  TokenizedStorages
+}
+
+//NewTestService returns a Service with the given consumers and storages. Only for tests
+func NewTestService(consumersByToken TokenizedConsumers, storagesByToken TokenizedStorages) *Service {
+	return &Service{
+		consumersByToken: consumersByToken,
+		storagesByToken:  storagesByToken,
+	}
+}
+
+//NewService returns a loaded Service instance and calls resources.Watch() if the destinations source is an http url or a file path
+func NewService(ctx context.Context, destinations *viper.Viper, destinationsSource, logEventPath string, monitorKeeper storages.MonitorKeeper,
+	storageFactoryMethod func(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error)) (*Service, error) {
+	service := &Service{
+		storageFactoryMethod: storageFactoryMethod,
+		ctx:                  ctx,
+		logEventPath:         logEventPath,
+		monitorKeeper:        monitorKeeper,
+
+		unitsByName:         map[string]*Unit{},
+		loggersUsageByToken: map[string]*LoggerUsage{},
+
+		consumersByToken: map[string]map[string]events.Consumer{},
+		storagesByToken:  map[string]map[string]events.StorageProxy{},
+	}
+
+	reloadSec := viper.GetInt("server.destinations_reload_sec")
+	if reloadSec == 0 {
+		return nil, errors.New("server.destinations_reload_sec can't be empty")
+	}
+
+	if destinations != nil {
+		dc := map[string]storages.DestinationConfig{}
+		if err := destinations.Unmarshal(&dc); err != nil {
+			logging.Error(marshallingErrorMsg, err)
+			return service, nil
+		}
+
+		service.init(dc)
+
+		if len(service.unitsByName) == 0 {
+			logging.Errorf("Destinations are empty")
+		}
+
+	} else if destinationsSource != "" {
+		if strings.HasPrefix(destinationsSource, "http://") || strings.HasPrefix(destinationsSource, "https://") {
+			resources.Watch(serviceName, destinationsSource, resources.LoadFromHttp, service.updateDestinations, time.Duration(reloadSec)*time.Second)
+		} else if strings.Contains(destinationsSource, "file://") {
+			resources.Watch(serviceName, strings.Replace(destinationsSource, "file://", "", 1), resources.LoadFromFile, service.updateDestinations, time.Duration(reloadSec)*time.Second)
+		} else if strings.HasPrefix(destinationsSource, "{") && strings.HasSuffix(destinationsSource, "}") {
+			service.updateDestinations([]byte(destinationsSource))
+		} else {
+			return nil, errors.New("Unknown destination source: " + destinationsSource)
+		}
+	}
+
+	return service, nil
+}
+
+func (ds *Service) GetConsumers(token string) (consumers []events.Consumer) {
+	ds.RLock()
+	defer ds.RUnlock()
+	for _, c := range ds.consumersByToken[token] {
+		consumers = append(consumers, c)
+	}
+	return
+}
+
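// A minimal wiring sketch, assuming a context.Context ctx and a hypothetical
// file:// source path; the nil MonitorKeeper mirrors service_test.go, and
// main.go performs the equivalent wiring from viper config
// (server.destinations_reload_sec defaults to 120 via appconfig):
//
//	service, err := destinations.NewService(ctx, nil,
//		"file:///etc/eventnative/destinations.json", "/home/eventnative/logs/events",
//		nil, storages.Create)
//	if err != nil {
//		logging.Fatal(err)
//	}
//	appconfig.Instance.ScheduleClosing(service)
//
// With a file or http source, resources.Watch reloads the payload every
// server.destinations_reload_sec seconds and applies changes via updateDestinations.
+func (ds *Service) GetStorages(token string) (storages []events.StorageProxy) {
+	ds.RLock()
+	defer 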
ds.RUnlock() + for _, s := range ds.storagesByToken[token] { + storages = append(storages, s) + } + return +} + +func (s *Service) updateDestinations(payload []byte) { + dc, err := parseFromBytes(payload) + if err != nil { + logging.Error(marshallingErrorMsg, err) + return + } + + s.init(dc) + + if len(s.unitsByName) == 0 { + logging.Errorf("Destinations are empty") + } +} + +//1. close and remove all destinations which don't exist in new config +//2. recreate/create changed/new destinations +func (s *Service) init(dc map[string]storages.DestinationConfig) { + StatusInstance.Reloading = true + + //close and remove non-existent (in new config) + toDelete := map[string]*Unit{} + for name, unit := range s.unitsByName { + _, ok := dc[name] + if !ok { + toDelete[name] = unit + } + } + if len(toDelete) > 0 { + s.Lock() + for name, unit := range toDelete { + s.remove(name, unit) + } + s.Unlock() + } + + // create or recreate + newConsumers := TokenizedConsumers{} + newStorages := TokenizedStorages{} + for name, d := range dc { + //common case + destination := d + + hash := getHash(name, destination) + unit, ok := s.unitsByName[name] + if ok { + if unit.hash == hash { + //destination wasn't changed + continue + } + //remove old + s.Lock() + s.remove(name, unit) + s.Unlock() + } + + //create new + newStorageProxy, eventQueue, err := s.storageFactoryMethod(s.ctx, name, s.logEventPath, destination, s.monitorKeeper) + if err != nil { + logging.Errorf("[%s] Error initializing destination of type %s: %v", name, destination.Type, err) + continue + } + + tokens := destination.OnlyTokens + if len(tokens) == 0 { + logging.Warnf("[%s] only_tokens wasn't provided. All tokens will be stored.", name) + for token := range appconfig.Instance.AuthorizationService.GetAllTokens() { + tokens = append(tokens, token) + } + } + + s.unitsByName[name] = &Unit{ + eventQueue: eventQueue, + storage: newStorageProxy, + tokens: tokens, + hash: hash, + } + + //append: + //storage per token + //consumer(event queue or logger) per token + for _, token := range tokens { + if destination.Mode == storages.StreamMode { + //2 destinations with 2 queues can be under 1 token + newConsumers.Add(token, name, eventQueue) + } else { + //get or create new logger + loggerUsage, ok := s.loggersUsageByToken[token] + if !ok { + eventLogWriter, err := logging.NewWriter(logging.Config{ + LoggerName: "event-" + token, + ServerName: appconfig.Instance.ServerName, + FileDir: s.logEventPath, + RotationMin: viper.GetInt64("log.rotation_min")}) + if err != nil { + logging.Errorf("[%s] Error creating tokenized logger: %v", name, err) + } else { + logger := events.NewAsyncLogger(eventLogWriter, viper.GetBool("log.show_in_server")) + loggerUsage = &LoggerUsage{logger: logger, usage: 0} + s.loggersUsageByToken[token] = loggerUsage + } + } + + if loggerUsage != nil { + loggerUsage.usage += 1 + //2 destinations with only 1 logger can be under 1 token + newConsumers.Add(token, token, loggerUsage.logger) + } + } + + newStorages.Add(token, name, newStorageProxy) + } + } + + s.Lock() + s.consumersByToken.AddAll(newConsumers) + s.storagesByToken.AddAll(newStorages) + s.Unlock() + + StatusInstance.Reloading = false +} + +//remove destination from all collections and close it +func (s *Service) remove(name string, unit *Unit) { + //remove from all collections: queue or logger(if needed) + storage + for _, token := range unit.tokens { + oldConsumers := s.consumersByToken[token] + if unit.eventQueue != nil { + delete(oldConsumers, name) + } else { + //logger + 
loggerUsage := s.loggersUsageByToken[token] + loggerUsage.usage -= 1 + if loggerUsage.usage == 0 { + delete(oldConsumers, token) + delete(s.loggersUsageByToken, token) + loggerUsage.logger.Close() + } + } + + if len(oldConsumers) == 0 { + delete(s.consumersByToken, token) + } + + //storage + oldStorages := s.storagesByToken[token] + delete(oldStorages, name) + if len(oldStorages) == 0 { + delete(s.storagesByToken, token) + } + } + + if err := unit.Close(); err != nil { + logging.Errorf("[%s] Error closing destination unit: %v", name, err) + } + + delete(s.unitsByName, name) +} + +func (s *Service) Close() (multiErr error) { + for token, loggerUsage := range s.loggersUsageByToken { + if err := loggerUsage.logger.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing logger for token [%s]: %v", token, err)) + } + } + + for name, unit := range s.unitsByName { + if err := unit.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing destination unit: %v", name, err)) + } + } + + return +} diff --git a/destinations/service_test.go b/destinations/service_test.go new file mode 100644 index 000000000..db9461f0a --- /dev/null +++ b/destinations/service_test.go @@ -0,0 +1,199 @@ +package destinations + +import ( + "context" + "github.com/ksensehq/eventnative/appconfig" + "github.com/ksensehq/eventnative/events" + "github.com/ksensehq/eventnative/storages" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +type payloadHolder struct { + payload []byte +} + +type testProxyMock struct { +} + +func (tpm *testProxyMock) Get() (events.Storage, bool) { + return nil, false +} + +func (tpm *testProxyMock) Close() error { + return nil +} + +// 1. init +// 2. change +// 3. remove all +// 4. init again +// 5. 
remove all again +func TestServiceInit(t *testing.T) { + viper.Set("server.destinations_reload_sec", 1) + appconfig.Init() + + initialDestinations := `{ + "redshift_1": { + "type": "redshift", + "only_tokens": ["token1", "token2"], + "datasource": { + "host": "host_redshift_1" + } + }, + "pg_1": { + "type": "postgres", + "only_tokens": ["token1", "token3"], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": ["token3"], + "datasource": { + "host": "host_pg_2" + } + } +}` + payload := &payloadHolder{payload: []byte(initialDestinations)} + mockSourceServer := startTestServer(payload) + + service, err := NewService(context.Background(), nil, mockSourceServer.URL, "/tmp", nil, createTestStorage) + require.NoError(t, err) + require.NotNil(t, service) + + initialConfigAsserts(t, service) + //wasn't changed + time.Sleep(1 * time.Second) + initialConfigAsserts(t, service) + + //change + changedDestinations := `{ + "pg_1": { + "type": "postgres", + "only_tokens": ["token1", "token3", "token4"], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": ["token3"], + "datasource": { + "host": "host_pg_2" + } + }, + "pg_3": { + "type": "postgres", + "mode": "stream", + "only_tokens": ["token4"], + "datasource": { + "host": "host_pg_3" + } + }, +"pg_4": { + "type": "postgres", + "only_tokens": ["token3"], + "datasource": { + "host": "host_pg_4" + } + }, +"pg_5": { + "type": "postgres", + "mode": "stream", + "only_tokens": ["token3"], + "datasource": { + "host": "host_pg_5" + } + } +}` + payload.payload = []byte(changedDestinations) + time.Sleep(1 * time.Second) + changedConfigAsserts(t, service) + + //delete all + emptyDestinations := `{}` + payload.payload = []byte(emptyDestinations) + time.Sleep(1 * time.Second) + emptyConfigAsserts(t, service) + + //init all again + payload.payload = []byte(initialDestinations) + time.Sleep(1 * time.Second) + initialConfigAsserts(t, service) + + //delete all one more time + payload.payload = []byte(emptyDestinations) + time.Sleep(1 * time.Second) + emptyConfigAsserts(t, service) +} + +func initialConfigAsserts(t *testing.T, service *Service) { + require.Equal(t, 3, len(service.storagesByToken)) + require.Equal(t, 3, len(service.consumersByToken)) + + require.Equal(t, 2, len(service.GetStorages("token1"))) + require.Equal(t, 1, len(service.GetConsumers("token1"))) + + require.Equal(t, 1, len(service.GetStorages("token2"))) + require.Equal(t, 1, len(service.GetConsumers("token2"))) + + require.Equal(t, 2, len(service.GetStorages("token3"))) + require.Equal(t, 2, len(service.GetConsumers("token3"))) +} + +func changedConfigAsserts(t *testing.T, service *Service) { + require.Equal(t, 3, len(service.storagesByToken)) + require.Equal(t, 3, len(service.consumersByToken)) + + require.Equal(t, 1, len(service.GetStorages("token1"))) + require.Equal(t, 1, len(service.GetConsumers("token1"))) + + require.Equal(t, 0, len(service.GetStorages("token2"))) + require.Equal(t, 0, len(service.GetConsumers("token2"))) + + require.Equal(t, 4, len(service.GetStorages("token3"))) + require.Equal(t, 3, len(service.GetConsumers("token3"))) + + require.Equal(t, 2, len(service.GetStorages("token4"))) + require.Equal(t, 2, len(service.GetConsumers("token4"))) +} + +func emptyConfigAsserts(t *testing.T, service *Service) { + require.Equal(t, 0, len(service.storagesByToken)) + require.Equal(t, 0, len(service.consumersByToken)) + + require.Equal(t, 0, 
len(service.GetStorages("token1")))
+	require.Equal(t, 0, len(service.GetConsumers("token1")))
+
+	require.Equal(t, 0, len(service.GetStorages("token2")))
+	require.Equal(t, 0, len(service.GetConsumers("token2")))
+
+	require.Equal(t, 0, len(service.GetStorages("token3")))
+	require.Equal(t, 0, len(service.GetConsumers("token3")))
+
+	require.Equal(t, 0, len(service.GetStorages("token4")))
+	require.Equal(t, 0, len(service.GetConsumers("token4")))
+}
+
+func startTestServer(ph *payloadHolder) *httptest.Server {
+	return httptest.NewServer(
+		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.Write(ph.payload)
+		}))
+}
+
+func createTestStorage(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error) {
+	var eventQueue *events.PersistentQueue
+	if destination.Mode == storages.StreamMode {
+		eventQueue, _ = events.NewPersistentQueue(name, "/tmp")
+	}
+	return &testProxyMock{}, eventQueue, nil
+}
diff --git a/destinations/status.go b/destinations/status.go
new file mode 100644
index 000000000..a0bc4968c
--- /dev/null
+++ b/destinations/status.go
@@ -0,0 +1,9 @@
+package destinations
+
+var StatusInstance = &Status{Reloading: false}
+
+//Status is a singleton struct for storing the destinations reloading state. The uploader checks this flag
+//and doesn't upload batch files while Reloading is true
+type Status struct {
+	Reloading bool
+}
diff --git a/destinations/unit.go b/destinations/unit.go
new file mode 100644
index 000000000..d5a91d31e
--- /dev/null
+++ b/destinations/unit.go
@@ -0,0 +1,29 @@
+package destinations
+
+import (
+	"fmt"
+	"github.com/hashicorp/go-multierror"
+	"github.com/ksensehq/eventnative/events"
+)
+
+//Unit holds a destination's storage bundle for closing at once
+type Unit struct {
+	eventQueue *events.PersistentQueue
+	storage    events.StorageProxy
+
+	tokens []string
+	hash   string
+}
+
+//Close closes the events queue (if present) and the storage
+func (u *Unit) Close() (multiErr error) {
+	if u.eventQueue != nil {
+		if err := u.eventQueue.Close(); err != nil {
+			multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing events queue: %v", err))
+		}
+	}
+	if err := u.storage.Close(); err != nil {
+		multiErr = multierror.Append(multiErr, err)
+	}
+	return
+}
diff --git a/events/s2s_preprocessor.go b/events/api_preprocessor.go
similarity index 76%
rename from events/s2s_preprocessor.go
rename to events/api_preprocessor.go
index 5b69eed69..b6352b59a 100644
--- a/events/s2s_preprocessor.go
+++ b/events/api_preprocessor.go
@@ -9,14 +9,14 @@ import (
 	"net/http"
 )
 
-//S2SPreprocessor preprocess server 2 server integration events
-type S2SPreprocessor struct {
+//ApiPreprocessor preprocesses server-to-server integration events
+type ApiPreprocessor struct {
 	geoResolver geo.Resolver
 	uaResolver  useragent.Resolver
 }
 
-func NewS2SPreprocessor() Preprocessor {
-	return &S2SPreprocessor{
+func NewApiPreprocessor() Preprocessor {
+	return &ApiPreprocessor{
 		geoResolver: appconfig.Instance.GeoResolver,
 		uaResolver:  appconfig.Instance.UaResolver,
 	}
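// A sketch of the Preprocess contract, mirroring api_preprocessor_test.go;
// httpReq (*http.Request) and the field values here are illustrative:
//
//	fact := events.Fact{"event_origin": "api_test"}
//	processed, err := events.NewApiPreprocessor().Preprocess(fact, httpReq)
//	// processed carries "src": "api" and "source_ip" taken from the request;
//	// geo location and parsed user agent are resolved from the device context
//	// fields unless they were already provided.
//
@@ -25,12 +25,12 @@ func NewS2SPreprocessor() Preprocessor {
 //Preprocess resolve geo from ip field or skip if geo.GeoDataKey field was provided
 //resolve useragent from uaKey or skip if useragent.ParsedUaKey field was provided
 //return same object
-func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) {
+func (ap *ApiPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) {
 	if fact == nil {
 		return nil, errors.New("Input fact 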
can't be nil") } - fact["src"] = "s2s" + fact["src"] = "api" ip := extractIp(r) if ip != "" { fact[ipKey] = ip @@ -41,7 +41,7 @@ func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error //geo.GeoDataKey node overwrite geo resolving if _, ok := deviceCtxObject[geo.GeoDataKey]; !ok { if ip, ok := deviceCtxObject["ip"]; ok { - geoData, err := s2sp.geoResolver.Resolve(ip.(string)) + geoData, err := ap.geoResolver.Resolve(ip.(string)) if err != nil { logging.Error(err) } @@ -54,7 +54,7 @@ func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error if _, ok := deviceCtxObject[useragent.ParsedUaKey]; !ok { if ua, ok := deviceCtxObject[uaKey]; ok { if uaStr, ok := ua.(string); ok { - deviceCtxObject[useragent.ParsedUaKey] = s2sp.uaResolver.Resolve(uaStr) + deviceCtxObject[useragent.ParsedUaKey] = ap.uaResolver.Resolve(uaStr) } } } diff --git a/events/s2s_preprocessor_test.go b/events/api_preprocessor_test.go similarity index 89% rename from events/s2s_preprocessor_test.go rename to events/api_preprocessor_test.go index 8f2fc8208..b0360f0d2 100644 --- a/events/s2s_preprocessor_test.go +++ b/events/api_preprocessor_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestS2SPreprocess(t *testing.T) { +func TestApiPreprocess(t *testing.T) { geoDataMock := &geo.Data{ Country: "US", City: "New York", @@ -35,13 +35,13 @@ func TestS2SPreprocess(t *testing.T) { "Empty input object", Fact{}, &http.Request{Header: http.Header{}}, - Fact{"src": "s2s"}, + Fact{"src": "api"}, "", }, { "Process ok without device ctx", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -49,10 +49,10 @@ func TestS2SPreprocess(t *testing.T) { &http.Request{Header: http.Header{"X-Forwarded-For": []string{"10.10.10.10"}}}, Fact{ "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -60,7 +60,7 @@ func TestS2SPreprocess(t *testing.T) { { "Process ok with device ctx but without ip and ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -73,10 +73,10 @@ func TestS2SPreprocess(t *testing.T) { "location": (*geo.Data)(nil), }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -84,7 +84,7 @@ func TestS2SPreprocess(t *testing.T) { { "Process ok with device ctx with ip and ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -97,10 +97,10 @@ func TestS2SPreprocess(t *testing.T) { "location": geoDataMock, }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -108,7 +108,7 @@ func TestS2SPreprocess(t 
*testing.T) { { "Process ok with location and parsed ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -124,10 +124,10 @@ func TestS2SPreprocess(t *testing.T) { "parsed_ua": map[string]interface{}{"custom_ua": "123"}, }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -143,7 +143,7 @@ func TestS2SPreprocess(t *testing.T) { "billing": []string{"1", "2"}, "keys": map[string]interface{}{"key1": "key2"}, "weather": map[string]interface{}{"id": "123", "type": "good"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -151,12 +151,12 @@ func TestS2SPreprocess(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s2sPreprocessor := &S2SPreprocessor{ + apiPreprocessor := &ApiPreprocessor{ geoResolver: geo.Mock{"20.20.20.20": geoDataMock}, uaResolver: useragent.Mock{}, } - actualFact, actualErr := s2sPreprocessor.Preprocess(tt.input, tt.inputReq) + actualFact, actualErr := apiPreprocessor.Preprocess(tt.input, tt.inputReq) if tt.expectedErr == "" { require.NoError(t, actualErr) } else { diff --git a/events/destination_service.go b/events/destination_service.go deleted file mode 100644 index 42da25634..000000000 --- a/events/destination_service.go +++ /dev/null @@ -1,37 +0,0 @@ -package events - -import "sync" - -//DestinationService is reloadable service of events destinations per token -type DestinationService struct { - sync.RWMutex - consumersByToken map[string][]Consumer - storagesByToken map[string][]StorageProxy -} - -func NewDestinationService(consumersByToken map[string][]Consumer, storagesByToken map[string][]StorageProxy) *DestinationService { - return &DestinationService{ - RWMutex: sync.RWMutex{}, - consumersByToken: consumersByToken, - storagesByToken: storagesByToken, - } -} - -func (ds *DestinationService) GetConsumers(token string) []Consumer { - ds.RLock() - defer ds.RUnlock() - return ds.consumersByToken[token] -} - -func (ds *DestinationService) GetStorages(token string) []StorageProxy { - ds.RLock() - defer ds.RUnlock() - return ds.storagesByToken[token] -} - -func (ds *DestinationService) Reload(consumersByToken map[string][]Consumer, storagesByToken map[string][]StorageProxy) { - ds.Lock() - defer ds.Unlock() - ds.storagesByToken = storagesByToken - ds.consumersByToken = consumersByToken -} diff --git a/events/c2s_preprocessor.go b/events/js_preprocessor.go similarity index 81% rename from events/c2s_preprocessor.go rename to events/js_preprocessor.go index c1bb463d1..4b2fd2a33 100644 --- a/events/c2s_preprocessor.go +++ b/events/js_preprocessor.go @@ -21,14 +21,14 @@ type Preprocessor interface { Preprocess(fact Fact, r *http.Request) (Fact, error) } -//C2SPreprocessor preprocess client 2 server integration events -type C2SPreprocessor struct { +//JsPreprocessor preprocess client 2 server integration events +type JsPreprocessor struct { geoResolver geo.Resolver uaResolver useragent.Resolver } -func NewC2SPreprocessor() Preprocessor { - return &C2SPreprocessor{ +func NewJsPreprocessor() Preprocessor { + return &JsPreprocessor{ geoResolver: appconfig.Instance.GeoResolver, uaResolver: appconfig.Instance.UaResolver, } @@ -38,7 +38,7 @@ func 
NewC2SPreprocessor() Preprocessor { //resolve useragent from uaKey //put data to eventnKey //return same object -func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { +func (jp *JsPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { if fact == nil { return nil, nilFactErr } @@ -58,7 +58,7 @@ func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error return nil, fmt.Errorf("Unable to cast %s to object: %v", eventnKey, eventnObject) } - geoData, err := c2sp.geoResolver.Resolve(ip) + geoData, err := jp.geoResolver.Resolve(ip) if err != nil { logging.Error(err) } @@ -70,7 +70,7 @@ func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error ua, ok := eventFact[uaKey] if ok { if uaStr, ok := ua.(string); ok { - eventFact[useragent.ParsedUaKey] = c2sp.uaResolver.Resolve(uaStr) + eventFact[useragent.ParsedUaKey] = jp.uaResolver.Resolve(uaStr) } } diff --git a/events/c2s_preprocessor_test.go b/events/js_preprocessor_test.go similarity index 96% rename from events/c2s_preprocessor_test.go rename to events/js_preprocessor_test.go index 2964c18a4..96002d1b7 100644 --- a/events/c2s_preprocessor_test.go +++ b/events/js_preprocessor_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestC2SPreprocess(t *testing.T) { +func TestJsPreprocess(t *testing.T) { geoDataMock := &geo.Data{ Country: "US", City: "New York", @@ -71,7 +71,7 @@ func TestC2SPreprocess(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c2sPreprocessor := &C2SPreprocessor{ + c2sPreprocessor := &JsPreprocessor{ geoResolver: geo.Mock{"10.10.10.10": geoDataMock}, uaResolver: useragent.Mock{}, } diff --git a/go.mod b/go.mod index 37734231b..d992c2072 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/gin-gonic/gin v1.6.3 github.com/golang/protobuf v1.4.2 // indirect github.com/google/go-cmp v0.5.1 // indirect + github.com/google/martian v2.1.0+incompatible github.com/google/uuid v1.1.1 github.com/gookit/color v1.3.1 github.com/hashicorp/go-multierror v1.1.0 diff --git a/handlers/event.go b/handlers/event.go index 59da44cc0..f98290a20 100644 --- a/handlers/event.go +++ b/handlers/event.go @@ -2,6 +2,7 @@ package handlers import ( "github.com/gin-gonic/gin" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/middleware" @@ -13,12 +14,12 @@ const apiTokenKey = "api_key" //Accept all events type EventHandler struct { - destinationService *events.DestinationService + destinationService *destinations.Service preprocessor events.Preprocessor } //Accept all events according to token -func NewEventHandler(destinationService *events.DestinationService, preprocessor events.Preprocessor) (eventHandler *EventHandler) { +func NewEventHandler(destinationService *destinations.Service, preprocessor events.Preprocessor) (eventHandler *EventHandler) { return &EventHandler{ destinationService: destinationService, preprocessor: preprocessor, diff --git a/logfiles/uploader.go b/logfiles/uploader.go index e6db49a6b..3b7464c7c 100644 --- a/logfiles/uploader.go +++ b/logfiles/uploader.go @@ -2,7 +2,7 @@ package logfiles import ( "github.com/ksensehq/eventnative/appstatus" - "github.com/ksensehq/eventnative/events" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/logging" "io/ioutil" "os" @@ -26,10 +26,10 @@ type PeriodicUploader struct { uploadEvery time.Duration statusManager 
*statusManager - destinationService *events.DestinationService + destinationService *destinations.Service } -func NewUploader(logEventPath, fileMask string, filesBatchSize, uploadEveryS int, destinationService *events.DestinationService) (*PeriodicUploader, error) { +func NewUploader(logEventPath, fileMask string, filesBatchSize, uploadEveryS int, destinationService *destinations.Service) (*PeriodicUploader, error) { statusManager, err := newStatusManager(logEventPath) if err != nil { return nil, err @@ -53,6 +53,12 @@ func (u *PeriodicUploader) Start() { if appstatus.Instance.Idle { break } + + if destinations.StatusInstance.Reloading { + time.Sleep(2 * time.Second) + continue + } + files, err := filepath.Glob(u.fileMask) if err != nil { logging.Error("Error finding files by mask", u.fileMask, err) @@ -86,7 +92,7 @@ func (u *PeriodicUploader) Start() { token := regexResult[1] storageProxies := u.destinationService.GetStorages(token) if len(storageProxies) == 0 { - logging.Warnf("Destination storages weren't found for token %s", token) + logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, token) continue } diff --git a/logging/writer.go b/logging/writer.go index 11da92968..c64aa659a 100644 --- a/logging/writer.go +++ b/logging/writer.go @@ -2,6 +2,7 @@ package logging import ( "fmt" + "github.com/google/martian/log" "gopkg.in/natefinch/lumberjack.v2" "io" "os" @@ -11,6 +12,10 @@ import ( const logFileMaxSizeMB = 100 +type WriterProxy struct { + lWriter *lumberjack.Logger +} + //Create stdout or file or mock writers func NewWriter(config Config) (io.WriteCloser, error) { if err := config.Validate(); err != nil { @@ -41,9 +46,23 @@ func newRollingWriter(config Config) io.WriteCloser { go func() { for { <-ticker.C - lWriter.Rotate() + if err := lWriter.Rotate(); err != nil { + log.Errorf("Error rotating log file: %v", err) + } } }() - return lWriter + return &WriterProxy{lWriter: lWriter} +} + +func (wp *WriterProxy) Write(p []byte) (int, error) { + return wp.lWriter.Write(p) +} + +func (wp *WriterProxy) Close() error { + if err := wp.lWriter.Rotate(); err != nil { + log.Errorf("Error rotating log file: %v", err) + } + + return wp.lWriter.Close() } diff --git a/main.go b/main.go index 8f6ac0b35..719c64081 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/ksensehq/eventnative/appconfig" "github.com/ksensehq/eventnative/appstatus" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/handlers" "github.com/ksensehq/eventnative/logfiles" @@ -31,6 +32,8 @@ const ( uploaderFileMask = "-event-*-20*.log" uploaderBatchSize = 50 uploaderLoadEveryS = 60 + + destinationsKey = "destinations" ) var ( @@ -93,7 +96,8 @@ func main() { os.Exit(0) }() - destinationsViper := viper.Sub("destinations") + destinationsViper := viper.Sub(destinationsKey) + destinationsSource := viper.GetString(destinationsKey) //override with config from os env jsonConfig := viper.GetString("destinations_json") @@ -103,14 +107,15 @@ func main() { if err := envJsonViper.ReadConfig(bytes.NewBufferString(jsonConfig)); err != nil { logging.Error("Error reading/parsing json config from DESTINATIONS_JSON", err) } else { - destinationsViper = envJsonViper.Sub("destinations") + destinationsViper = envJsonViper.Sub(destinationsKey) + destinationsSource = envJsonViper.GetString(destinationsKey) } } - syncServiceType := viper.GetString("synchronization_service.type") - syncServiceEndpoint 
:= viper.GetString("synchronization_service.endpoint") - connectionTimeoutSeconds := viper.GetUint("synchronization_service.connection_timeout_seconds") - monitorKeeper, err := storages.NewMonitorKeeper(syncServiceType, syncServiceEndpoint, connectionTimeoutSeconds) + monitorKeeper, err := storages.NewMonitorKeeper( + viper.GetString("synchronization_service.type"), + viper.GetString("synchronization_service.endpoint"), + viper.GetUint("synchronization_service.connection_timeout_seconds")) if err != nil { logging.Fatal("Failed to initiate monitor keeper ", err) } @@ -119,12 +124,11 @@ func main() { logEventPath := viper.GetString("log.path") //Create event destinations: - //per token - //storage - //consumer - batchStoragesByToken, streamingConsumersByToken := storages.Create(ctx, destinationsViper, logEventPath, monitorKeeper) - - destinationsService := events.NewDestinationService(streamingConsumersByToken, batchStoragesByToken) + destinationsService, err := destinations.NewService(ctx, destinationsViper, destinationsSource, logEventPath, monitorKeeper, storages.Create) + if err != nil { + logging.Fatal(err) + } + appconfig.Instance.ScheduleClosing(destinationsService) //Uploader must read event logger directory uploader, err := logfiles.NewUploader(logEventPath, appconfig.Instance.ServerName+uploaderFileMask, uploaderBatchSize, uploaderLoadEveryS, destinationsService) @@ -147,7 +151,7 @@ func main() { logging.Fatal(server.ListenAndServe()) } -func SetupRouter(destinations *events.DestinationService) *gin.Engine { +func SetupRouter(destinations *destinations.Service) *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.New() //gin.Default() @@ -166,12 +170,12 @@ func SetupRouter(destinations *events.DestinationService) *gin.Engine { router.GET("/s/:filename", staticHandler.Handler) router.GET("/t/:filename", staticHandler.Handler) - c2sEventHandler := handlers.NewEventHandler(destinations, events.NewC2SPreprocessor()).Handler - s2sEventHandler := handlers.NewEventHandler(destinations, events.NewS2SPreprocessor()).Handler + jsEventHandler := handlers.NewEventHandler(destinations, events.NewJsPreprocessor()).Handler + apiEventHandler := handlers.NewEventHandler(destinations, events.NewApiPreprocessor()).Handler apiV1 := router.Group("/api/v1") { - apiV1.POST("/event", middleware.TokenAuth(c2sEventHandler, appconfig.Instance.AuthorizationService.GetC2SOrigins, "")) - apiV1.POST("/s2s/event", middleware.TokenAuth(s2sEventHandler, appconfig.Instance.AuthorizationService.GetS2SOrigins, "The token isn't a server token. Please use s2s integration token\n")) + apiV1.POST("/event", middleware.TokenAuth(jsEventHandler, appconfig.Instance.AuthorizationService.GetJsOrigins, "")) + apiV1.POST("/s2s/event", middleware.TokenAuth(apiEventHandler, appconfig.Instance.AuthorizationService.GetApiOrigins, "The token isn't a server token. 
Please use s2s integration token\n")) } return router diff --git a/main_test.go b/main_test.go index 7fb13e663..bf6eb02ce 100644 --- a/main_test.go +++ b/main_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/middleware" @@ -23,8 +24,7 @@ import ( func SetTestDefaultParams() { viper.Set("log.path", "") - viper.Set("server.auth", []string{"c2stoken"}) - viper.Set("server.s2s_auth", `[{"token":"s2stoken", "origins":["whiteorigin*"]}]`) + viper.Set("server.auth", `{"js": ["c2stoken"], "api":[{"token":"s2stoken", "origins":["whiteorigin*"]}]}`) } func TestApiEvent(t *testing.T) { @@ -52,7 +52,7 @@ func TestApiEvent(t *testing.T) { "Unauthorized s2s endpoint", "/api/v1/s2s/event?token=wrongtoken", "", - "test_data/s2s_event_input.json", + "test_data/api_event_input.json", "", http.StatusUnauthorized, "The token isn't a server token. Please use s2s integration token\n", @@ -70,7 +70,7 @@ func TestApiEvent(t *testing.T) { "Unauthorized s2s wrong origin", "/api/v1/s2s/event?token=s2stoken", "http://ksense.com", - "test_data/s2s_event_input.json", + "test_data/api_event_input.json", "", http.StatusUnauthorized, "", @@ -89,8 +89,8 @@ func TestApiEvent(t *testing.T) { "S2S Api event consuming test", "/api/v1/s2s/event?token=s2stoken", "https://whiteorigin.com/", - "test_data/s2s_event_input.json", - "test_data/s2s_fact_output.json", + "test_data/api_event_input.json", + "test_data/api_fact_output.json", http.StatusOK, "", }, @@ -104,10 +104,11 @@ func TestApiEvent(t *testing.T) { defer appconfig.Instance.Close() inmemWriter := logging.InitInMemoryWriter() - router := SetupRouter(events.NewDestinationService(map[string][]events.Consumer{ - "c2stoken": {events.NewAsyncLogger(inmemWriter, false)}, - "s2stoken": {events.NewAsyncLogger(inmemWriter, false)}, - }, map[string][]events.StorageProxy{})) + router := SetupRouter(destinations.NewTestService( + map[string]map[string]events.Consumer{ + "c2stoken": {"test": events.NewAsyncLogger(inmemWriter, false)}, + "s2stoken": {"test": events.NewAsyncLogger(inmemWriter, false)}, + }, map[string]map[string]events.StorageProxy{})) freezeTime := time.Date(2020, 06, 16, 23, 0, 0, 0, time.UTC) patch := monkey.Patch(time.Now, func() time.Time { return freezeTime }) diff --git a/middleware/token_auth.go b/middleware/token_auth.go index 46451c290..5ea98e7a4 100644 --- a/middleware/token_auth.go +++ b/middleware/token_auth.go @@ -13,7 +13,7 @@ const TokenName = "token" //TokenAuth check that provided token: //1. is valid -//2. exists in specific (c2s or s2s) config +//2. exists in specific (js or api) token config //3. 
origins equal
 func TokenAuth(main gin.HandlerFunc, isAllowedOriginsFunc func(string) ([]string, bool), errMsg string) gin.HandlerFunc {
 	return func(c *gin.Context) {
diff --git a/resources/watcher.go b/resources/watcher.go
new file mode 100644
index 000000000..d0ed826c8
--- /dev/null
+++ b/resources/watcher.go
@@ -0,0 +1,67 @@
+package resources
+
+import (
+	"crypto/md5"
+	"fmt"
+	"github.com/ksensehq/eventnative/appstatus"
+	"github.com/ksensehq/eventnative/logging"
+	"time"
+)
+
+type Watcher struct {
+	name        string
+	hash        string
+	source      string
+	reloadEvery time.Duration
+
+	loadFunc func(string) ([]byte, error)
+	consumer func([]byte)
+}
+
+//Watch loads the source first, then runs a goroutine that reloads it every 'reloadEvery' duration
+//On every load the consumer is run only if the content has changed; otherwise nothing is done
+func Watch(name, source string, loadFunc func(string) ([]byte, error), consumer func([]byte), reloadEvery time.Duration) {
+	w := &Watcher{
+		name:        name,
+		hash:        "",
+		source:      source,
+		loadFunc:    loadFunc,
+		consumer:    consumer,
+		reloadEvery: reloadEvery,
+	}
+	w.watch()
+}
+
+func (w *Watcher) watch() {
+	w.download()
+	go func() {
+		for {
+			if appstatus.Instance.Idle {
+				break
+			}
+
+			time.Sleep(w.reloadEvery)
+
+			w.download()
+		}
+	}()
+}
+
+func (w *Watcher) download() {
+	payload, err := w.loadFunc(w.source)
+	if err != nil {
+		logging.Errorf("Error reloading resource [%s]: %v", w.name, err)
+		return
+	}
+
+	newHash := GetHash(payload)
+	if w.hash != newHash {
+		w.consumer(payload)
+		logging.Infof("New resource [%s] has been loaded", w.name)
+		w.hash = newHash
+	}
+}
+
+func GetHash(payload []byte) string {
+	return fmt.Sprintf("%x", md5.Sum(payload))
+}
diff --git a/storages/factory.go b/storages/factory.go
index afbf3ebe5..4ad5f15b6 100644
--- a/storages/factory.go
+++ b/storages/factory.go
@@ -9,35 +9,34 @@ import (
 	"github.com/ksensehq/eventnative/events"
 	"github.com/ksensehq/eventnative/logging"
 	"github.com/ksensehq/eventnative/schema"
-	"github.com/spf13/viper"
 )
 
 const (
 	defaultTableName = "events"
 
-	batchMode  = "batch"
-	streamMode = "stream"
+	BatchMode  = "batch"
+	StreamMode = "stream"
 )
 
 var unknownDestination = errors.New("Unknown destination type")
 
 type DestinationConfig struct {
-	OnlyTokens   []string    `mapstructure:"only_tokens"`
-	Type         string      `mapstructure:"type"`
-	Mode         string      `mapstructure:"mode"`
-	DataLayout   *DataLayout `mapstructure:"data_layout"`
-	BreakOnError bool        `mapstructure:"break_on_error"`
-
-	DataSource *adapters.DataSourceConfig `mapstructure:"datasource"`
-	S3         *adapters.S3Config         `mapstructure:"s3"`
-	Google     *adapters.GoogleConfig     `mapstructure:"google"`
-	ClickHouse *adapters.ClickHouseConfig `mapstructure:"clickhouse"`
-	Snowflake  *adapters.SnowflakeConfig  `mapstructure:"snowflake"`
+	OnlyTokens   []string    `mapstructure:"only_tokens" json:"only_tokens,omitempty"`
+	Type         string      `mapstructure:"type" json:"type,omitempty"`
+	Mode         string      `mapstructure:"mode" json:"mode,omitempty"`
+	DataLayout   *DataLayout `mapstructure:"data_layout" json:"data_layout,omitempty"`
+	BreakOnError bool        `mapstructure:"break_on_error" json:"break_on_error,omitempty"`
+
+	DataSource *adapters.DataSourceConfig `mapstructure:"datasource" json:"datasource,omitempty"`
+	S3         *adapters.S3Config         `mapstructure:"s3" json:"s3,omitempty"`
+	Google     *adapters.GoogleConfig     `mapstructure:"google" json:"google,omitempty"`
+	ClickHouse *adapters.ClickHouseConfig `mapstructure:"clickhouse" json:"clickhouse,omitempty"`
+	Snowflake  *adapters.SnowflakeConfig  `mapstructure:"snowflake" 
json:"snowflake,omitempty"` } type DataLayout struct { - Mapping []string `mapstructure:"mapping"` - TableNameTemplate string `mapstructure:"table_name_template"` + Mapping []string `mapstructure:"mapping" json:"mapping,omitempty"` + TableNameTemplate string `mapstructure:"table_name_template" json:"table_name_template,omitempty"` } type Config struct { @@ -50,151 +49,92 @@ type Config struct { eventQueue *events.PersistentQueue } -//Create event storage proxies and event consumers (loggers or event-queues) from incoming config +//Create event storage proxy and event consumer (logger or event-queue) //Enrich incoming configs with default values if needed -func Create(ctx context.Context, destinations *viper.Viper, logEventPath string, monitorKeeper MonitorKeeper) (map[string][]events.StorageProxy, map[string][]events.Consumer) { - storageProxies := map[string][]events.StorageProxy{} - consumers := map[string][]events.Consumer{} - loggers := map[string]events.Consumer{} - - if destinations == nil { - return storageProxies, consumers +func Create(ctx context.Context, name, logEventPath string, destination DestinationConfig, monitorKeeper MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error) { + if destination.Type == "" { + destination.Type = name } - - dc := map[string]DestinationConfig{} - if err := destinations.Unmarshal(&dc); err != nil { - logging.Error("Error initializing destinations: wrong config format: each destination must contains one key and config as a value e.g. destinations:\n custom_name:\n type: redshift ...", err) - return storageProxies, consumers + if destination.Mode == "" { + destination.Mode = BatchMode } - for name, d := range dc { - //common case - destination := d - if destination.Type == "" { - destination.Type = name - } - if destination.Mode == "" { - destination.Mode = batchMode - } + var mapping []string + var tableName string + if destination.DataLayout != nil { + mapping = destination.DataLayout.Mapping - var mapping []string - var tableName string - if destination.DataLayout != nil { - mapping = destination.DataLayout.Mapping - - if destination.DataLayout.TableNameTemplate != "" { - tableName = destination.DataLayout.TableNameTemplate - } - } - - logging.Infof("[%s] Initializing destination of type: %s in mode: %s", name, destination.Type, destination.Mode) - - if tableName == "" { - tableName = defaultTableName - logging.Infof("[%s] uses default table name: %s", name, tableName) + if destination.DataLayout.TableNameTemplate != "" { + tableName = destination.DataLayout.TableNameTemplate } + } - if len(mapping) == 0 { - logging.Warnf("[%s] doesn't have mapping rules", name) - } else { - logging.Infof("[%s] Configured field mapping rules:", name) - for _, m := range mapping { - logging.Infof("[%s] %s", name, m) - } - } + logging.Infof("[%s] Initializing destination of type: %s in mode: %s", name, destination.Type, destination.Mode) - if destination.Mode != batchMode && destination.Mode != streamMode { - logError(name, destination.Type, fmt.Errorf("Unknown destination mode: %s. 
Available mode: [%s, %s]", destination.Mode, batchMode, streamMode))
-			continue
-		}
+	if tableName == "" {
+		tableName = defaultTableName
+		logging.Infof("[%s] uses default table name: %s", name, tableName)
+	}
 
-		processor, err := schema.NewProcessor(tableName, mapping)
-		if err != nil {
-			logError(name, destination.Type, err)
-			continue
+	if len(mapping) == 0 {
+		logging.Warnf("[%s] doesn't have mapping rules", name)
+	} else {
+		logging.Infof("[%s] Configured field mapping rules:", name)
+		for _, m := range mapping {
+			logging.Infof("[%s] %s", name, m)
 		}
+	}
 
-		var eventQueue *events.PersistentQueue
-		if destination.Mode == streamMode {
-			queueName := fmt.Sprintf("%s-%s", appconfig.Instance.ServerName, name)
-			eventQueue, err = events.NewPersistentQueue(queueName, logEventPath)
-			if err != nil {
-				logError(name, destination.Type, err)
-				continue
-			}
-
-			appconfig.Instance.ScheduleClosing(eventQueue)
-		}
+	if destination.Mode != BatchMode && destination.Mode != StreamMode {
+		return nil, nil, fmt.Errorf("Unknown destination mode: %s. Available modes: [%s, %s]", destination.Mode, BatchMode, StreamMode)
+	}
 
-		storageConfig := &Config{
-			ctx:           ctx,
-			name:          name,
-			destination:   &destination,
-			processor:     processor,
-			streamMode:    destination.Mode == streamMode,
-			monitorKeeper: monitorKeeper,
-			eventQueue:    eventQueue,
-		}
+	processor, err := schema.NewProcessor(tableName, mapping)
+	if err != nil {
+		return nil, nil, err
+	}
 
-		var storageProxy events.StorageProxy
-		switch destination.Type {
-		case "redshift":
-			storageProxy = newProxy(createRedshift, storageConfig)
-		case "bigquery":
-			storageProxy = newProxy(createBigQuery, storageConfig)
-		case "postgres":
-			storageProxy = newProxy(createPostgres, storageConfig)
-		case "clickhouse":
-			storageProxy = newProxy(createClickHouse, storageConfig)
-		case "s3":
-			storageProxy = newProxy(createS3, storageConfig)
-		case "snowflake":
-			storageProxy = newProxy(createSnowflake, storageConfig)
-		default:
-			logError(name, destination.Type, unknownDestination)
-			continue
+	var eventQueue *events.PersistentQueue
+	if destination.Mode == StreamMode {
+		queueName := fmt.Sprintf("%s-%s", appconfig.Instance.ServerName, name)
+		eventQueue, err = events.NewPersistentQueue(queueName, logEventPath)
+		if err != nil {
+			return nil, nil, err
 		}
+	}
 
-		appconfig.Instance.ScheduleClosing(storageProxy)
-
-		tokens := destination.OnlyTokens
-		if len(tokens) == 0 {
-			logging.Warnf("[%s] only_tokens wasn't provided. 
All tokens will be stored.", name) - for token := range appconfig.Instance.AuthorizationService.GetAllTokens() { - tokens = append(tokens, token) - } - } + storageConfig := &Config{ + ctx: ctx, + name: name, + destination: &destination, + processor: processor, + streamMode: destination.Mode == StreamMode, + monitorKeeper: monitorKeeper, + eventQueue: eventQueue, + } - //append: - //storage per token - //consumer(event queue or logger) per token - for _, token := range tokens { - if storageConfig.streamMode { - consumers[token] = append(consumers[token], eventQueue) - } else { - logger, ok := loggers[token] - if !ok { - eventLogWriter, err := logging.NewWriter(logging.Config{ - LoggerName: "event-" + token, - ServerName: appconfig.Instance.ServerName, - FileDir: logEventPath, - RotationMin: viper.GetInt64("log.rotation_min")}) - if err != nil { - appconfig.Instance.Close() - logging.Fatal(err) - } - logger = events.NewAsyncLogger(eventLogWriter, viper.GetBool("log.show_in_server")) - loggers[token] = logger - appconfig.Instance.ScheduleClosing(logger) - } - consumers[token] = append(consumers[token], logger) - } - - storageProxies[token] = append(storageProxies[token], storageProxy) + var storageProxy events.StorageProxy + switch destination.Type { + case "redshift": + storageProxy = newProxy(createRedshift, storageConfig) + case "bigquery": + storageProxy = newProxy(createBigQuery, storageConfig) + case "postgres": + storageProxy = newProxy(createPostgres, storageConfig) + case "clickhouse": + storageProxy = newProxy(createClickHouse, storageConfig) + case "s3": + storageProxy = newProxy(createS3, storageConfig) + case "snowflake": + storageProxy = newProxy(createSnowflake, storageConfig) + default: + if eventQueue != nil { + eventQueue.Close() } - + return nil, nil, unknownDestination } - return storageProxies, consumers + + return storageProxy, eventQueue, nil } func logError(destinationName, destinationType string, err error) { @@ -283,7 +223,7 @@ func createS3(config *Config) (events.Storage, error) { if config.eventQueue != nil { config.eventQueue.Close() } - return nil, fmt.Errorf("S3 destination doesn't support %s mode", streamMode) + return nil, fmt.Errorf("S3 destination doesn't support %s mode", StreamMode) } s3Config := config.destination.S3 if err := s3Config.Validate(); err != nil { diff --git a/test_data/s2s_event_input.json b/test_data/api_event_input.json similarity index 100% rename from test_data/s2s_event_input.json rename to test_data/api_event_input.json diff --git a/test_data/s2s_fact_output.json b/test_data/api_fact_output.json similarity index 96% rename from test_data/s2s_fact_output.json rename to test_data/api_fact_output.json index de4632322..afe0bc8f1 100644 --- a/test_data/s2s_fact_output.json +++ b/test_data/api_fact_output.json @@ -19,6 +19,6 @@ "referer": "", "url": "http://track-demo.ksense" }, - "src": "s2s", + "src": "api", "source_ip":"95.82.232.185" } \ No newline at end of file From a85ed52ac95b07d232a247ba1d32e4a9b927219e Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Fri, 2 Oct 2020 17:15:08 +0300 Subject: [PATCH 2/8] fixed tokens as string --- authorization/parser.go | 39 +++++++++++++++++++++++++++++------- authorization/parser_test.go | 34 +++++++++++++++++++++++++++++++ authorization/service.go | 4 ++-- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/authorization/parser.go b/authorization/parser.go index aea6956ce..5fb57a3c1 100644 --- a/authorization/parser.go +++ b/authorization/parser.go @@ -4,6 +4,8 @@ import ( 
"encoding/json" "errors" "fmt" + "github.com/google/martian/log" + "github.com/spf13/viper" "reflect" "strings" ) @@ -36,16 +38,39 @@ func parseFromBytes(b []byte) (map[string][]string, map[string][]string, error) return jsTokens, apiTokens, nil } -func reformat(tokensArr []string) map[string][]string { - tokensOrigins := map[string][]string{} - for _, t := range tokensArr { - trimmed := strings.TrimSpace(t) - if trimmed != "" { - tokensOrigins[trimmed] = []string{} +//parse from viper string slice or from viper json string value +func parseFromConfig(viperConfig *viper.Viper, key string) (tokensOrigins map[string][]string) { + tokensOrigins = map[string][]string{} + + tokensStrArr := viperConfig.GetStringSlice(key) + if len(tokensStrArr) > 0 { + for _, t := range tokensStrArr { + trimmed := strings.TrimSpace(t) + if trimmed != "" { + tokensOrigins[trimmed] = []string{} + } } + return + } + + jsonStr := viperConfig.GetString(key) + if jsonStr == "" { + return + } + + var tokensArr []interface{} + var err error + if err = json.Unmarshal([]byte(jsonStr), &tokensArr); err != nil { + log.Errorf("Error parsing [%s] tokens from config: %v", key, err) + return + } + + tokensOrigins, err = reformatObj(tokensArr) + if err != nil { + log.Errorf("Error parsing [%s] tokens from config: %v", key, err) } - return tokensOrigins + return } func reformatObj(tokensArr []interface{}) (map[string][]string, error) { diff --git a/authorization/parser_test.go b/authorization/parser_test.go index c6dfde0d4..32faa1d64 100644 --- a/authorization/parser_test.go +++ b/authorization/parser_test.go @@ -2,6 +2,7 @@ package authorization import ( "github.com/ksensehq/eventnative/test" + "github.com/spf13/viper" "github.com/stretchr/testify/require" "testing" ) @@ -78,3 +79,36 @@ func TestParseFromBytes(t *testing.T) { }) } } + +func TestParseFromConfig(t *testing.T) { + vp := viper.New() + tests := []struct { + name string + input interface{} + expected map[string][]string + }{ + { + "config doesn't have key", + nil, + map[string][]string{}, + }, + { + "value is string slice", + []string{"token1", "token2"}, + map[string][]string{"token1": {}, "token2": {}}, + }, + { + "value is string", + "token3", + map[string][]string{"token3": {}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + vp.Set("key", tt.input) + actual := parseFromConfig(vp, "key") + test.ObjectsEqual(t, tt.expected, actual, "Parsed tokens aren't equal with expected") + }) + } +} diff --git a/authorization/service.go b/authorization/service.go index 29074e37e..67e6eca9c 100644 --- a/authorization/service.go +++ b/authorization/service.go @@ -29,8 +29,8 @@ func NewService() (*Service, error) { authViper := viper.Sub("server.auth") if authViper != nil { - service.jsTokens = reformat(authViper.GetStringSlice("js")) - service.apiTokens = reformat(authViper.GetStringSlice("api")) + service.jsTokens = parseFromConfig(authViper, "js") + service.apiTokens = parseFromConfig(authViper, "api") } else { authSource := viper.GetString("server.auth") From 51944c486c246fb881fdd5c815bb96db2f9750b4 Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Tue, 6 Oct 2020 20:06:18 +0300 Subject: [PATCH 3/8] refactored token structure --- authorization/parser.go | 149 ++++++++++++++--------------------- authorization/parser_test.go | 127 +++++++++++++++++------------ authorization/service.go | 123 ++++++++++++++++++++--------- destinations/collections.go | 28 +++---- destinations/service.go | 91 ++++++++++----------- destinations/service_test.go | 41 
+++++----- destinations/unit.go | 4 +- handlers/event.go | 5 +- logfiles/uploader.go | 6 +- main.go | 4 +- 10 files changed, 317 insertions(+), 261 deletions(-) diff --git a/authorization/parser.go b/authorization/parser.go index 5fb57a3c1..05c1fa206 100644 --- a/authorization/parser.go +++ b/authorization/parser.go @@ -2,116 +2,89 @@ package authorization import ( "encoding/json" - "errors" "fmt" - "github.com/google/martian/log" - "github.com/spf13/viper" - "reflect" + "github.com/ksensehq/eventnative/resources" "strings" ) -type TokensPayload struct { - Js []interface{} `json:"js,omitempty"` - Api []interface{} `json:"api,omitempty"` +type Token struct { + Id string `mapstructure:"id" json:"id,omitempty"` + ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty"` + ServerSecret string `mapstructure:"server_secret" json:"server_secret,omitempty"` + Origins []string `mapstructure:"origins" json:"origins,omitempty"` } -//parse tokens from formats: -//{"js": value, "api": value} where value might be strings array or json objects array with object format: -//{"token":"123", "origins":["origin1", "origin2"]} -func parseFromBytes(b []byte) (map[string][]string, map[string][]string, error) { - payload := &TokensPayload{} - err := json.Unmarshal(b, payload) - if err != nil { - return nil, nil, fmt.Errorf("Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: %v", err) - } - - jsTokens, err := reformatObj(payload.Js) - if err != nil { - return nil, nil, err - } +type TokensPayload struct { + Tokens []Token `json:"tokens,omitempty"` +} - apiTokens, err := reformatObj(payload.Api) - if err != nil { - return nil, nil, err - } +type TokensHolder struct { + //origins by client token + clientTokensOrigins map[string][]string + //origins by server token + serverTokensOrigins map[string][]string - return jsTokens, apiTokens, nil + //all token ids + ids []string + //token by: client_secret/server_secret/id + all map[string]Token } -//parse from viper string slice or from viper json string value -func parseFromConfig(viperConfig *viper.Viper, key string) (tokensOrigins map[string][]string) { - tokensOrigins = map[string][]string{} - - tokensStrArr := viperConfig.GetStringSlice(key) - if len(tokensStrArr) > 0 { - for _, t := range tokensStrArr { - trimmed := strings.TrimSpace(t) - if trimmed != "" { - tokensOrigins[trimmed] = []string{} - } - } - return - } +func (th *TokensHolder) IsEmpty() bool { + return th == nil || len(th.ids) == 0 +} - jsonStr := viperConfig.GetString(key) - if jsonStr == "" { - return +//parse tokens from json bytes +func parseFromBytes(b []byte) (*TokensHolder, error) { + payload := &TokensPayload{} + err := json.Unmarshal(b, payload) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling tokens. 
Payload must be json with 'tokens' key: %v", err) } - var tokensArr []interface{} - var err error - if err = json.Unmarshal([]byte(jsonStr), &tokensArr); err != nil { - log.Errorf("Error parsing [%s] tokens from config: %v", key, err) - return - } + return reformat(payload.Tokens), nil +} - tokensOrigins, err = reformatObj(tokensArr) - if err != nil { - log.Errorf("Error parsing [%s] tokens from config: %v", key, err) +func fromStrings(clientSecrets []string) *TokensHolder { + var tokens []Token + for _, clientSecret := range clientSecrets { + tokens = append(tokens, Token{ClientSecret: clientSecret}) } - - return + return reformat(tokens) } -func reformatObj(tokensArr []interface{}) (map[string][]string, error) { - tokensOrigins := map[string][]string{} - for _, t := range tokensArr { - switch t.(type) { - case string: - token := t.(string) - trimmed := strings.TrimSpace(token) - if trimmed != "" { - tokensOrigins[trimmed] = []string{} - } - case map[string]interface{}: - tokenObj := t.(map[string]interface{}) - token, ok := tokenObj["token"] - if !ok { - return nil, errors.New("Unknown authorization token format: each object must contain token field") - } +func reformat(tokens []Token) *TokensHolder { + clientTokensOrigins := map[string][]string{} + serverTokensOrigins := map[string][]string{} + all := map[string]Token{} + var ids []string - var origins []string - trimmed := strings.TrimSpace(token.(string)) - if trimmed != "" { - originsObj, ok := tokenObj["origins"] - if ok { - originsArr, ok := originsObj.([]interface{}) - if !ok { - return nil, errors.New("Unknown authorization origins format: origins must be array of strings") - } + for _, tokenObj := range tokens { + if tokenObj.Id == "" { + //hash from client,server secret will be id + tokenObj.Id = resources.GetHash([]byte(tokenObj.ClientSecret + tokenObj.ServerSecret)) + } - for _, originI := range originsArr { - origins = append(origins, originI.(string)) - } - } + all[tokenObj.Id] = tokenObj + ids = append(ids, tokenObj.Id) - tokensOrigins[trimmed] = origins - } - default: - return nil, errors.New("Unknown authorization token format type: " + reflect.TypeOf(t).Name()) + trimmedClientToken := strings.TrimSpace(tokenObj.ClientSecret) + if trimmedClientToken != "" { + clientTokensOrigins[trimmedClientToken] = tokenObj.Origins + all[trimmedClientToken] = tokenObj } + trimmedServerToken := strings.TrimSpace(tokenObj.ServerSecret) + if trimmedServerToken != "" { + serverTokensOrigins[trimmedServerToken] = tokenObj.Origins + all[trimmedServerToken] = tokenObj + } } - return tokensOrigins, nil + return &TokensHolder{ + clientTokensOrigins: clientTokensOrigins, + serverTokensOrigins: serverTokensOrigins, + ids: ids, + all: all, + } } diff --git a/authorization/parser_test.go b/authorization/parser_test.go index 32faa1d64..0face1cdb 100644 --- a/authorization/parser_test.go +++ b/authorization/parser_test.go @@ -2,7 +2,6 @@ package authorization import ( "github.com/ksensehq/eventnative/test" - "github.com/spf13/viper" "github.com/stretchr/testify/require" "testing" ) @@ -11,104 +10,130 @@ func TestParseFromBytes(t *testing.T) { tests := []struct { name string input []byte - expectedJs map[string][]string - expectedApi map[string][]string + expected *TokensHolder expectedErr string }{ { "Empty input", []byte{}, nil, + "Error unmarshalling tokens. Payload must be json with 'tokens' key: unexpected end of JSON input", + }, + { + "Wrong json keys format", + []byte(`{"tokens":{}}}`), nil, - "Error unmarshalling tokens. 
Payload must be json with 'js' and 'api' keys of json array or string array formats: unexpected end of JSON input", + "Error unmarshalling tokens. Payload must be json with 'tokens' key: invalid character '}' after top-level value", }, { "Empty json input", []byte(`{}`), - map[string][]string{}, - map[string][]string{}, - "", - }, - { - "Empty json keys input", - []byte(`{"js":[], "api":[]}`), - map[string][]string{}, - map[string][]string{}, + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, "", }, { "Wrong keys json input", - []byte(`{"jsss":[], apii: []}`), - nil, - nil, - "Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: invalid character 'a' looking for beginning of object key string", - }, - { - "Wrong json keys format", - []byte(`{"js":{}, "api":{}}`), - nil, - nil, - "Error unmarshalling tokens. Payload must be json with 'js' and 'api' keys of json array or string array formats: json: cannot unmarshal object into Go struct field TokensPayload.js of type []interface {}", + []byte(`{"jsss":[], "apii": []}`), + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, + "", }, + { - "js strings and api objects", - []byte(`{"js":["js1", "js2"], "api":[{"token":"api1", "origins":["origin1"]}]}`), - map[string][]string{"js1": {}, "js2": {}}, - map[string][]string{"api1": {"origin1"}}, + "Empty json tokens input", + []byte(`{"tokens":[]}`), + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, "", }, { - "js objects and api strings", - []byte(`{"api":["api1", "api2"], "js":[{"token":"js1", "origins":["origin1"]}]}`), - map[string][]string{"js1": {"origin1"}}, - map[string][]string{"api1": {}, "api2": {}}, + "ok", + []byte(`{"tokens":[{"id":"id1","client_secret":"cl_secret1","server_secret":"sr_secret1","origins":["abc.com","rr.ru"]},{"client_secret":"cl_secret2"},{"server_secret":"sr_secret3"}]}`), + buildExpected(), "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualJs, actualApi, err := parseFromBytes(tt.input) + actualTokenHolder, err := parseFromBytes(tt.input) if tt.expectedErr != "" { require.EqualError(t, err, tt.expectedErr, "Errors aren't equal") } else { require.NoError(t, err) - test.ObjectsEqual(t, tt.expectedJs, actualJs, "Js tokens and expected tokens aren't equal") - test.ObjectsEqual(t, tt.expectedApi, actualApi, "Api tokens and expected tokens aren't equal") + test.ObjectsEqual(t, tt.expected.ids, actualTokenHolder.ids, "Token holder ids aren't equal") + test.ObjectsEqual(t, tt.expected.all, actualTokenHolder.all, "Token holders all aren't equal") + test.ObjectsEqual(t, tt.expected.clientTokensOrigins, actualTokenHolder.clientTokensOrigins, "Token holders client tokens aren't equal") + test.ObjectsEqual(t, tt.expected.serverTokensOrigins, actualTokenHolder.serverTokensOrigins, "Token holders server tokens aren't equal") } }) } } -func TestParseFromConfig(t *testing.T) { - vp := viper.New() +func buildExpected() *TokensHolder { + token1 := Token{ + Id: "id1", + ClientSecret: "cl_secret1", + ServerSecret: "sr_secret1", + Origins: []string{"abc.com", "rr.ru"}, + } + token2 := Token{ + Id: "31429624a471b9bdc6d60350c6cfc24d", + ClientSecret: "cl_secret2", + } + token3 := Token{ + Id: "03f9ed11a0268dd78766686f8f292b7b", + ServerSecret: "sr_secret3", + } + return 
&TokensHolder{ + clientTokensOrigins: map[string][]string{"cl_secret1": {"abc.com", "rr.ru"}, "cl_secret2": nil}, + serverTokensOrigins: map[string][]string{"sr_secret1": {"abc.com", "rr.ru"}, "sr_secret3": nil}, + ids: []string{"id1", "31429624a471b9bdc6d60350c6cfc24d", "03f9ed11a0268dd78766686f8f292b7b"}, + all: map[string]Token{ + "id1": token1, + "cl_secret1": token1, + "sr_secret1": token1, + + "31429624a471b9bdc6d60350c6cfc24d": token2, + "cl_secret2": token2, + + "03f9ed11a0268dd78766686f8f292b7b": token3, + "sr_secret3": token3, + }, + } +} + +func TestFromStrings(t *testing.T) { tests := []struct { name string - input interface{} - expected map[string][]string + input []string + expected *TokensHolder }{ { - "config doesn't have key", - nil, - map[string][]string{}, + "empty tokens", + []string{}, + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, }, { "value is string slice", []string{"token1", "token2"}, - map[string][]string{"token1": {}, "token2": {}}, - }, - { - "value is string", - "token3", - map[string][]string{"token3": {}}, + &TokensHolder{ + clientTokensOrigins: map[string][]string{"token1": nil, "token2": nil}, + serverTokensOrigins: map[string][]string{}, + all: map[string]Token{ + "78b1e6d775cec5260001af137a79dbd5": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"}, + "token1": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"}, + + "0e0530c1430da76495955eb06eb99d95": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"}, + "token2": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"}, + }, + ids: []string{"78b1e6d775cec5260001af137a79dbd5", "0e0530c1430da76495955eb06eb99d95"}, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - vp.Set("key", tt.input) - actual := parseFromConfig(vp, "key") - test.ObjectsEqual(t, tt.expected, actual, "Parsed tokens aren't equal with expected") + actual := fromStrings(tt.input) + test.ObjectsEqual(t, tt.expected, actual, "Token holders aren't equal") }) } } diff --git a/authorization/service.go b/authorization/service.go index 67e6eca9c..a40d8baad 100644 --- a/authorization/service.go +++ b/authorization/service.go @@ -11,87 +11,138 @@ import ( "time" ) -const serviceName = "authorization" +const ( + serviceName = "authorization" + viperAuthKey = "server.auth" + + defaultTokenId = "defaultid" +) type Service struct { sync.RWMutex - jsTokens map[string][]string - apiTokens map[string][]string + + tokensHolder *TokensHolder } func NewService() (*Service, error) { - service := &Service{jsTokens: map[string][]string{}, apiTokens: map[string][]string{}} + service := &Service{} reloadSec := viper.GetInt("server.auth_reload_sec") if reloadSec == 0 { return nil, errors.New("server.auth_reload_sec can't be empty") } - authViper := viper.Sub("server.auth") - if authViper != nil { - service.jsTokens = parseFromConfig(authViper, "js") - service.apiTokens = parseFromConfig(authViper, "api") + var tokens []Token + err := viper.UnmarshalKey(viperAuthKey, &tokens) + if err == nil { + service.tokensHolder = reformat(tokens) } else { - authSource := viper.GetString("server.auth") - - if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { - resources.Watch(serviceName, authSource, resources.LoadFromHttp, service.updateTokens, time.Duration(reloadSec)*time.Second) - } else if strings.Contains(authSource, "file://") { - resources.Watch(serviceName, 
strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, service.updateTokens, time.Duration(reloadSec)*time.Second) - } else if strings.HasPrefix(authSource, "{") && strings.HasSuffix(authSource, "}") { - service.updateTokens([]byte(authSource)) + auth := viper.GetStringSlice(viperAuthKey) + + if len(auth) == 1 { + authSource := auth[0] + if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { + resources.Watch(serviceName, authSource, resources.LoadFromHttp, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(authSource, "file://") { + resources.Watch(serviceName, strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(authSource, "{") && strings.HasSuffix(authSource, "}") { + tokensHolder, err := parseFromBytes([]byte(authSource)) + if err != nil { + return nil, err + } + service.tokensHolder = tokensHolder + } else { + //plain token + service.tokensHolder = fromStrings(auth) + } + } else { + //array of tokens + service.tokensHolder = fromStrings(auth) } + } - if len(service.jsTokens) == 0 && len(service.apiTokens) == 0 { + if service.tokensHolder.IsEmpty() { //autogenerated - generatedToken := uuid.New().String() - service.jsTokens[generatedToken] = []string{} - service.apiTokens[generatedToken] = []string{} - logging.Warn("Empty 'server.auth.js' and 'server.auth.api' config keys. Auto generate token:", generatedToken) + generatedTokenSecret := uuid.New().String() + generatedToken := Token{ + Id: defaultTokenId, + ClientSecret: generatedTokenSecret, + ServerSecret: generatedTokenSecret, + Origins: []string{}, + } + + service.tokensHolder = reformat([]Token{generatedToken}) + logging.Warn("Empty 'server.auth' config keys. 
Auto-generated token:", generatedTokenSecret)
 	}
 
 	return service, nil
 }
 
-func (s *Service) GetJsOrigins(token string) ([]string, bool) {
+//GetClientOrigins returns origins by client_secret
+func (s *Service) GetClientOrigins(clientSecret string) ([]string, bool) {
+	s.RLock()
+	defer s.RUnlock()
+
+	origins, ok := s.tokensHolder.clientTokensOrigins[clientSecret]
+	return origins, ok
+}
+
+//GetServerOrigins returns origins by server_secret
+func (s *Service) GetServerOrigins(serverSecret string) ([]string, bool) {
 	s.RLock()
 	defer s.RUnlock()
 
-	origins, ok := s.jsTokens[token]
+	origins, ok := s.tokensHolder.serverTokensOrigins[serverSecret]
 	return origins, ok
 }
 
-func (s *Service) GetAllTokens() map[string]bool {
+//GetAllTokenIds returns all token ids
+func (s *Service) GetAllTokenIds() []string {
+	s.RLock()
+	defer s.RUnlock()
+
+	return s.tokensHolder.ids
+}
+
+//GetAllIdsByToken returns token ids by token identity (client_secret/server_secret/token id)
+func (s *Service) GetAllIdsByToken(tokenIdentity []string) (ids []string) {
 	s.RLock()
 	defer s.RUnlock()
 
-	result := map[string]bool{}
-	for k := range s.jsTokens {
-		result[k] = true
+	deduplication := map[string]bool{}
+	for _, tokenFilter := range tokenIdentity {
+		tokenObj, _ := s.tokensHolder.all[tokenFilter]
+		deduplication[tokenObj.Id] = true
 	}
-	for k := range s.apiTokens {
-		result[k] = true
+
+	for id := range deduplication {
+		ids = append(ids, id)
 	}
-	return result
+	return
 }
 
-func (s *Service) GetApiOrigins(token string) ([]string, bool) {
+//GetTokenId returns token id by client_secret/server_secret/token id
+//returns "" if the token wasn't found
+func (s *Service) GetTokenId(tokenFilter string) string {
 	s.RLock()
 	defer s.RUnlock()
 
-	origins, ok := s.apiTokens[token]
-	return origins, ok
+	token, ok := s.tokensHolder.all[tokenFilter]
+	if ok {
+		return token.Id
+	}
+	return ""
 }
 
+//parse the payload and set tokensHolder under lock
 func (s *Service) updateTokens(payload []byte) {
-	js, api, err := parseFromBytes(payload)
+	tokenHolder, err := parseFromBytes(payload)
 	if err != nil {
 		logging.Errorf("Error updating authorization tokens: %v", err)
 	} else {
 		s.Lock()
-		s.jsTokens = js
-		s.apiTokens = api
+		s.tokensHolder = tokenHolder
 		s.Unlock()
 	}
 }
diff --git a/destinations/collections.go b/destinations/collections.go
index dcf6ddfae..58b1fbcd8 100644
--- a/destinations/collections.go
+++ b/destinations/collections.go
@@ -2,44 +2,44 @@ package destinations
 
 import "github.com/ksensehq/eventnative/events"
 
-//map["token"]map["destination_name"]interface
-//because 1 token = ∞ storages
+//map["tokenId"]map["destination_name"]interface
+//because 1 token id = ∞ storages
 type TokenizedStorages map[string]map[string]events.StorageProxy
 
-//map["token"]map["token | destination_name"]interface
-//because 1 token = 1logger but ∞ event.queue
+//map["tokenId"]map["tokenId | destination_name"]interface
+//because 1 token id = 1 logger but ∞ event.queue
 type TokenizedConsumers map[string]map[string]events.Consumer
 
-func (ts TokenizedStorages) Add(token, name string, proxy events.StorageProxy) {
-	storageProxies, ok := ts[token]
+func (ts TokenizedStorages) Add(tokenId, name string, proxy events.StorageProxy) {
+	storageProxies, ok := ts[tokenId]
 	if !ok {
 		storageProxies = map[string]events.StorageProxy{}
-		ts[token] = storageProxies
+		ts[tokenId] = storageProxies
 	}
 	storageProxies[name] = proxy
 }
 
 func (ts TokenizedStorages) AddAll(other TokenizedStorages) {
-	for token, storages := range other {
+	for tokenId, storages := range other {
 		for name, storage := range storages 
{ - ts.Add(token, name, storage) + ts.Add(tokenId, name, storage) } } } -func (tc TokenizedConsumers) Add(token, name string, proxy events.Consumer) { - consumers, ok := tc[token] +func (tc TokenizedConsumers) Add(tokenId, name string, proxy events.Consumer) { + consumers, ok := tc[tokenId] if !ok { consumers = map[string]events.Consumer{} - tc[token] = consumers + tc[tokenId] = consumers } consumers[name] = proxy } func (tc TokenizedConsumers) AddAll(other TokenizedConsumers) { - for token, consumers := range other { + for tokenId, consumers := range other { for name, consumer := range consumers { - tc.Add(token, name, consumer) + tc.Add(tokenId, name, consumer) } } } diff --git a/destinations/service.go b/destinations/service.go index 13603a569..ca8f00ed7 100644 --- a/destinations/service.go +++ b/destinations/service.go @@ -37,19 +37,19 @@ type Service struct { logEventPath string monitorKeeper storages.MonitorKeeper - unitsByName map[string]*Unit - loggersUsageByToken map[string]*LoggerUsage + unitsByName map[string]*Unit + loggersUsageByTokenId map[string]*LoggerUsage sync.RWMutex - consumersByToken TokenizedConsumers - storagesByToken TokenizedStorages + consumersByTokenId TokenizedConsumers + storagesByTokenId TokenizedStorages } //only for tests -func NewTestService(consumersByToken TokenizedConsumers, storagesByToken TokenizedStorages) *Service { +func NewTestService(consumersByTokenId TokenizedConsumers, storagesByTokenId TokenizedStorages) *Service { return &Service{ - consumersByToken: consumersByToken, - storagesByToken: storagesByToken, + consumersByTokenId: consumersByTokenId, + storagesByTokenId: storagesByTokenId, } } @@ -62,11 +62,11 @@ func NewService(ctx context.Context, destinations *viper.Viper, destinationsSour logEventPath: logEventPath, monitorKeeper: monitorKeeper, - unitsByName: map[string]*Unit{}, - loggersUsageByToken: map[string]*LoggerUsage{}, + unitsByName: map[string]*Unit{}, + loggersUsageByTokenId: map[string]*LoggerUsage{}, - consumersByToken: map[string]map[string]events.Consumer{}, - storagesByToken: map[string]map[string]events.StorageProxy{}, + consumersByTokenId: map[string]map[string]events.Consumer{}, + storagesByTokenId: map[string]map[string]events.StorageProxy{}, } reloadSec := viper.GetInt("server.destinations_reload_sec") @@ -102,19 +102,19 @@ func NewService(ctx context.Context, destinations *viper.Viper, destinationsSour return service, nil } -func (ds *Service) GetConsumers(token string) (consumers []events.Consumer) { +func (ds *Service) GetConsumers(tokenId string) (consumers []events.Consumer) { ds.RLock() defer ds.RUnlock() - for _, c := range ds.consumersByToken[token] { + for _, c := range ds.consumersByTokenId[tokenId] { consumers = append(consumers, c) } return } -func (ds *Service) GetStorages(token string) (storages []events.StorageProxy) { +func (ds *Service) GetStorages(tokenId string) (storages []events.StorageProxy) { ds.RLock() defer ds.RUnlock() - for _, s := range ds.storagesByToken[token] { + for _, s := range ds.storagesByTokenId[tokenId] { storages = append(storages, s) } return @@ -169,7 +169,7 @@ func (s *Service) init(dc map[string]storages.DestinationConfig) { //destination wasn't changed continue } - //remove old + //remove old (for recreation) s.Lock() s.remove(name, unit) s.Unlock() @@ -182,34 +182,37 @@ func (s *Service) init(dc map[string]storages.DestinationConfig) { continue } - tokens := destination.OnlyTokens - if len(tokens) == 0 { + var tokenIds []string + //map token -> id + if 
len(destination.OnlyTokens) > 0 { + tokenIds = appconfig.Instance.AuthorizationService.GetAllIdsByToken(destination.OnlyTokens) + } else { logging.Warnf("[%s] only_tokens wasn't provided. All tokens will be stored.", name) - for token := range appconfig.Instance.AuthorizationService.GetAllTokens() { - tokens = append(tokens, token) - } + tokenIds = appconfig.Instance.AuthorizationService.GetAllTokenIds() } s.unitsByName[name] = &Unit{ eventQueue: eventQueue, storage: newStorageProxy, - tokens: tokens, + tokenIds: tokenIds, hash: hash, } + //create: + // 1 logger per token id + // 1 queue per destination id //append: - //storage per token - //consumer(event queue or logger) per token - for _, token := range tokens { + // storage per token id + // consumers per client_secret and server_secret + for _, tokenId := range tokenIds { if destination.Mode == storages.StreamMode { - //2 destinations with 2 queues can be under 1 token - newConsumers.Add(token, name, eventQueue) + newConsumers.Add(tokenId, name, eventQueue) } else { //get or create new logger - loggerUsage, ok := s.loggersUsageByToken[token] + loggerUsage, ok := s.loggersUsageByTokenId[tokenId] if !ok { eventLogWriter, err := logging.NewWriter(logging.Config{ - LoggerName: "event-" + token, + LoggerName: "event-" + tokenId, ServerName: appconfig.Instance.ServerName, FileDir: s.logEventPath, RotationMin: viper.GetInt64("log.rotation_min")}) @@ -218,24 +221,24 @@ func (s *Service) init(dc map[string]storages.DestinationConfig) { } else { logger := events.NewAsyncLogger(eventLogWriter, viper.GetBool("log.show_in_server")) loggerUsage = &LoggerUsage{logger: logger, usage: 0} - s.loggersUsageByToken[token] = loggerUsage + s.loggersUsageByTokenId[tokenId] = loggerUsage } } if loggerUsage != nil { loggerUsage.usage += 1 - //2 destinations with only 1 logger can be under 1 token - newConsumers.Add(token, token, loggerUsage.logger) + //2 destinations with only 1 logger can be under 1 tokenId + newConsumers.Add(tokenId, tokenId, loggerUsage.logger) } } - newStorages.Add(token, name, newStorageProxy) + newStorages.Add(tokenId, name, newStorageProxy) } } s.Lock() - s.consumersByToken.AddAll(newConsumers) - s.storagesByToken.AddAll(newStorages) + s.consumersByTokenId.AddAll(newConsumers) + s.storagesByTokenId.AddAll(newStorages) s.Unlock() StatusInstance.Reloading = false @@ -244,30 +247,30 @@ func (s *Service) init(dc map[string]storages.DestinationConfig) { //remove destination from all collections and close it func (s *Service) remove(name string, unit *Unit) { //remove from all collections: queue or logger(if needed) + storage - for _, token := range unit.tokens { - oldConsumers := s.consumersByToken[token] + for _, tokenId := range unit.tokenIds { + oldConsumers := s.consumersByTokenId[tokenId] if unit.eventQueue != nil { delete(oldConsumers, name) } else { //logger - loggerUsage := s.loggersUsageByToken[token] + loggerUsage := s.loggersUsageByTokenId[tokenId] loggerUsage.usage -= 1 if loggerUsage.usage == 0 { - delete(oldConsumers, token) - delete(s.loggersUsageByToken, token) + delete(oldConsumers, tokenId) + delete(s.loggersUsageByTokenId, tokenId) loggerUsage.logger.Close() } } if len(oldConsumers) == 0 { - delete(s.consumersByToken, token) + delete(s.consumersByTokenId, tokenId) } //storage - oldStorages := s.storagesByToken[token] + oldStorages := s.storagesByTokenId[tokenId] delete(oldStorages, name) if len(oldStorages) == 0 { - delete(s.storagesByToken, token) + delete(s.storagesByTokenId, tokenId) } } @@ -279,7 +282,7 @@ func (s 
*Service) remove(name string, unit *Unit) { } func (s *Service) Close() (multiErr error) { - for token, loggerUsage := range s.loggersUsageByToken { + for token, loggerUsage := range s.loggersUsageByTokenId { if err := loggerUsage.logger.Close(); err != nil { multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing logger for token [%s]: %v", token, err)) } diff --git a/destinations/service_test.go b/destinations/service_test.go index db9461f0a..9f52590c7 100644 --- a/destinations/service_test.go +++ b/destinations/service_test.go @@ -35,6 +35,7 @@ func (tpm *testProxyMock) Close() error { // 5. remove all again func TestServiceInit(t *testing.T) { viper.Set("server.destinations_reload_sec", 1) + viper.Set("server.auth", []string{"token1", "token2", "token3", "token4"}) appconfig.Init() initialDestinations := `{ @@ -136,39 +137,39 @@ func TestServiceInit(t *testing.T) { } func initialConfigAsserts(t *testing.T, service *Service) { - require.Equal(t, 3, len(service.storagesByToken)) - require.Equal(t, 3, len(service.consumersByToken)) + require.Equal(t, 3, len(service.storagesByTokenId)) + require.Equal(t, 3, len(service.consumersByTokenId)) - require.Equal(t, 2, len(service.GetStorages("token1"))) - require.Equal(t, 1, len(service.GetConsumers("token1"))) + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) - require.Equal(t, 1, len(service.GetStorages("token2"))) - require.Equal(t, 1, len(service.GetConsumers("token2"))) + require.Equal(t, 1, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) - require.Equal(t, 2, len(service.GetStorages("token3"))) - require.Equal(t, 2, len(service.GetConsumers("token3"))) + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) + require.Equal(t, 2, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) } func changedConfigAsserts(t *testing.T, service *Service) { - require.Equal(t, 3, len(service.storagesByToken)) - require.Equal(t, 3, len(service.consumersByToken)) + require.Equal(t, 3, len(service.storagesByTokenId)) + require.Equal(t, 3, len(service.consumersByTokenId)) - require.Equal(t, 1, len(service.GetStorages("token1"))) - require.Equal(t, 1, len(service.GetConsumers("token1"))) + require.Equal(t, 1, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) - require.Equal(t, 0, len(service.GetStorages("token2"))) - require.Equal(t, 0, len(service.GetConsumers("token2"))) + require.Equal(t, 0, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + require.Equal(t, 0, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) - require.Equal(t, 4, len(service.GetStorages("token3"))) - require.Equal(t, 3, len(service.GetConsumers("token3"))) + require.Equal(t, 4, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) + require.Equal(t, 3, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) - require.Equal(t, 2, len(service.GetStorages("token4"))) - require.Equal(t, 2, 
len(service.GetConsumers("token4"))) + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token4")))) + require.Equal(t, 2, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token4")))) } func emptyConfigAsserts(t *testing.T, service *Service) { - require.Equal(t, 0, len(service.storagesByToken)) - require.Equal(t, 0, len(service.consumersByToken)) + require.Equal(t, 0, len(service.storagesByTokenId)) + require.Equal(t, 0, len(service.consumersByTokenId)) require.Equal(t, 0, len(service.GetStorages("token1"))) require.Equal(t, 0, len(service.GetConsumers("token1"))) diff --git a/destinations/unit.go b/destinations/unit.go index d5a91d31e..5d09d9c7a 100644 --- a/destinations/unit.go +++ b/destinations/unit.go @@ -11,8 +11,8 @@ type Unit struct { eventQueue *events.PersistentQueue storage events.StorageProxy - tokens []string - hash string + tokenIds []string + hash string } //Close eventsQueue if exists and storage diff --git a/handlers/event.go b/handlers/event.go index f98290a20..9b59d672f 100644 --- a/handlers/event.go +++ b/handlers/event.go @@ -2,6 +2,7 @@ package handlers import ( "github.com/gin-gonic/gin" + "github.com/ksensehq/eventnative/appconfig" "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" @@ -50,7 +51,9 @@ func (eh *EventHandler) Handler(c *gin.Context) { processed[apiTokenKey] = token processed[timestamp.Key] = timestamp.NowUTC() - consumers := eh.destinationService.GetConsumers(token) + tokenId := appconfig.Instance.AuthorizationService.GetTokenId(token) + + consumers := eh.destinationService.GetConsumers(tokenId) if len(consumers) == 0 { logging.Warnf("Unknown token[%s] request was received", token) } else { diff --git a/logfiles/uploader.go b/logfiles/uploader.go index 3b7464c7c..6e6eb9b60 100644 --- a/logfiles/uploader.go +++ b/logfiles/uploader.go @@ -89,10 +89,10 @@ func (u *PeriodicUploader) Start() { continue } - token := regexResult[1] - storageProxies := u.destinationService.GetStorages(token) + tokenId := regexResult[1] + storageProxies := u.destinationService.GetStorages(tokenId) if len(storageProxies) == 0 { - logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, token) + logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, tokenId) continue } diff --git a/main.go b/main.go index 719c64081..bf21510a6 100644 --- a/main.go +++ b/main.go @@ -174,8 +174,8 @@ func SetupRouter(destinations *destinations.Service) *gin.Engine { apiEventHandler := handlers.NewEventHandler(destinations, events.NewApiPreprocessor()).Handler apiV1 := router.Group("/api/v1") { - apiV1.POST("/event", middleware.TokenAuth(jsEventHandler, appconfig.Instance.AuthorizationService.GetJsOrigins, "")) - apiV1.POST("/s2s/event", middleware.TokenAuth(apiEventHandler, appconfig.Instance.AuthorizationService.GetApiOrigins, "The token isn't a server token. Please use s2s integration token\n")) + apiV1.POST("/event", middleware.TokenAuth(jsEventHandler, appconfig.Instance.AuthorizationService.GetClientOrigins, "")) + apiV1.POST("/s2s/event", middleware.TokenAuth(apiEventHandler, appconfig.Instance.AuthorizationService.GetServerOrigins, "The token isn't a server token. 
Please use s2s integration token\n")) } return router From 516eaa9716fdfc808cf13847a92512a66d85ca71 Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Tue, 6 Oct 2020 20:12:19 +0300 Subject: [PATCH 4/8] fixed test --- main_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/main_test.go b/main_test.go index bf6eb02ce..4b6d0ee34 100644 --- a/main_test.go +++ b/main_test.go @@ -24,7 +24,7 @@ import ( func SetTestDefaultParams() { viper.Set("log.path", "") - viper.Set("server.auth", `{"js": ["c2stoken"], "api":[{"token":"s2stoken", "origins":["whiteorigin*"]}]}`) + viper.Set("server.auth", `{"tokens":[{"id":"id1","client_secret":"c2stoken","server_secret":"s2stoken","origins":["whiteorigin*"]}]}`) } func TestApiEvent(t *testing.T) { @@ -79,7 +79,7 @@ func TestApiEvent(t *testing.T) { { "C2S Api event consuming test", "/api/v1/event?token=c2stoken", - "", + "https://whiteorigin.com/", "test_data/event_input.json", "test_data/fact_output.json", http.StatusOK, @@ -106,8 +106,7 @@ func TestApiEvent(t *testing.T) { inmemWriter := logging.InitInMemoryWriter() router := SetupRouter(destinations.NewTestService( map[string]map[string]events.Consumer{ - "c2stoken": {"test": events.NewAsyncLogger(inmemWriter, false)}, - "s2stoken": {"test": events.NewAsyncLogger(inmemWriter, false)}, + "id1": {"id1": events.NewAsyncLogger(inmemWriter, false)}, }, map[string]map[string]events.StorageProxy{})) freezeTime := time.Date(2020, 06, 16, 23, 0, 0, 0, time.UTC) From 43489a6ef6d3cc848f94a8cccb076ba7bde3714d Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Wed, 7 Oct 2020 10:10:53 +0300 Subject: [PATCH 5/8] added destinations payload wrapper --- destinations/parser.go | 8 ++- destinations/service_test.go | 136 ++++++++++++++++++++--------------- 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/destinations/parser.go b/destinations/parser.go index a32f57f02..072e3e72a 100644 --- a/destinations/parser.go +++ b/destinations/parser.go @@ -7,14 +7,18 @@ import ( "github.com/ksensehq/eventnative/storages" ) +type Payload struct { + Destinations map[string]storages.DestinationConfig `json:"destinations,omitempty"` +} + func parseFromBytes(b []byte) (map[string]storages.DestinationConfig, error) { - payload := map[string]storages.DestinationConfig{} + payload := &Payload{} err := json.Unmarshal(b, &payload) if err != nil { return nil, err } - return payload, nil + return payload.Destinations, nil } func getHash(name string, destination storages.DestinationConfig) string { diff --git a/destinations/service_test.go b/destinations/service_test.go index 9f52590c7..7a308fa67 100644 --- a/destinations/service_test.go +++ b/destinations/service_test.go @@ -39,26 +39,36 @@ func TestServiceInit(t *testing.T) { appconfig.Init() initialDestinations := `{ - "redshift_1": { - "type": "redshift", - "only_tokens": ["token1", "token2"], - "datasource": { - "host": "host_redshift_1" - } - }, - "pg_1": { - "type": "postgres", - "only_tokens": ["token1", "token3"], - "datasource": { - "host": "host_pg_1" - } - }, - "pg_2": { - "type": "postgres", - "mode": "stream", - "only_tokens": ["token3"], - "datasource": { - "host": "host_pg_2" + "destinations": { + "redshift_1": { + "type": "redshift", + "only_tokens": [ + "token1", + "token2" + ], + "datasource": { + "host": "host_redshift_1" + } + }, + "pg_1": { + "type": "postgres", + "only_tokens": [ + "token1", + "token3" + ], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + 
"token3" + ], + "datasource": { + "host": "host_pg_2" + } } } }` @@ -76,42 +86,56 @@ func TestServiceInit(t *testing.T) { //change changedDestinations := `{ - "pg_1": { - "type": "postgres", - "only_tokens": ["token1", "token3", "token4"], - "datasource": { - "host": "host_pg_1" - } - }, - "pg_2": { - "type": "postgres", - "mode": "stream", - "only_tokens": ["token3"], - "datasource": { - "host": "host_pg_2" - } - }, - "pg_3": { - "type": "postgres", - "mode": "stream", - "only_tokens": ["token4"], - "datasource": { - "host": "host_pg_3" - } - }, -"pg_4": { - "type": "postgres", - "only_tokens": ["token3"], - "datasource": { - "host": "host_pg_4" - } - }, -"pg_5": { - "type": "postgres", - "mode": "stream", - "only_tokens": ["token3"], - "datasource": { - "host": "host_pg_5" + "destinations": { + "pg_1": { + "type": "postgres", + "only_tokens": [ + "token1", + "token3", + "token4" + ], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_2" + } + }, + "pg_3": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token4" + ], + "datasource": { + "host": "host_pg_3" + } + }, + "pg_4": { + "type": "postgres", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_4" + } + }, + "pg_5": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_5" + } } } }` From 0c3ba932692706074f137920bf047ee17882c365 Mon Sep 17 00:00:00 2001 From: Sergey Burykin Date: Wed, 7 Oct 2020 10:32:26 +0300 Subject: [PATCH 6/8] added storages type constants --- storages/bigquery.go | 6 ++---- storages/clickhouse.go | 6 ++---- storages/factory.go | 12 ++++++------ storages/postgres.go | 6 ++---- storages/redshift.go | 5 ++--- storages/s3.go | 2 +- storages/snowflake.go | 6 ++---- storages/types.go | 10 ++++++++++ 8 files changed, 27 insertions(+), 26 deletions(-) create mode 100644 storages/types.go diff --git a/storages/bigquery.go b/storages/bigquery.go index 12cdebd6b..edb9df467 100644 --- a/storages/bigquery.go +++ b/storages/bigquery.go @@ -14,8 +14,6 @@ import ( "time" ) -const bqStorageType = "BigQuery" - //Store files to google BigQuery in two modes: //batch: via google cloud storage in batch mode (1 file = 1 transaction) //stream: via events queue in stream mode (1 object = 1 transaction) @@ -54,7 +52,7 @@ func NewBigQuery(ctx context.Context, name string, eventQueue *events.Persistent return nil, err } - tableHelper := NewTableHelper(bigQueryAdapter, monitorKeeper, bqStorageType) + tableHelper := NewTableHelper(bigQueryAdapter, monitorKeeper, BigQueryType) bq := &BigQuery{ name: name, @@ -164,7 +162,7 @@ func (bq *BigQuery) Name() string { } func (bq *BigQuery) Type() string { - return bqStorageType + return BigQueryType } func (bq *BigQuery) Close() (multiErr error) { diff --git a/storages/clickhouse.go b/storages/clickhouse.go index c4e2316f6..d475e90b3 100644 --- a/storages/clickhouse.go +++ b/storages/clickhouse.go @@ -11,8 +11,6 @@ import ( "math/rand" ) -const clickHouseStorageType = "ClickHouse" - //Store files to ClickHouse in two modes: //batch: (1 file = 1 transaction) //stream: (1 object = 1 transaction) @@ -52,7 +50,7 @@ func NewClickHouse(ctx context.Context, name string, eventQueue *events.Persiste } chAdapters = append(chAdapters, adapter) - tableHelpers = append(tableHelpers, NewTableHelper(adapter, monitorKeeper, clickHouseStorageType)) + tableHelpers = append(tableHelpers, 
NewTableHelper(adapter, monitorKeeper, ClickHouseType)) } ch := &ClickHouse{ @@ -85,7 +83,7 @@ func (ch *ClickHouse) Name() string { } func (ch *ClickHouse) Type() string { - return clickHouseStorageType + return ClickHouseType } //Insert fact in ClickHouse diff --git a/storages/factory.go b/storages/factory.go index 4ad5f15b6..c5d1be27d 100644 --- a/storages/factory.go +++ b/storages/factory.go @@ -115,17 +115,17 @@ func Create(ctx context.Context, name, logEventPath string, destination Destinat var storageProxy events.StorageProxy switch destination.Type { - case "redshift": + case RedshiftType: storageProxy = newProxy(createRedshift, storageConfig) - case "bigquery": + case BigQueryType: storageProxy = newProxy(createBigQuery, storageConfig) - case "postgres": + case PostgresType: storageProxy = newProxy(createPostgres, storageConfig) - case "clickhouse": + case ClickHouseType: storageProxy = newProxy(createClickHouse, storageConfig) - case "s3": + case S3Type: storageProxy = newProxy(createS3, storageConfig) - case "snowflake": + case SnowflakeType: storageProxy = newProxy(createSnowflake, storageConfig) default: if eventQueue != nil { diff --git a/storages/postgres.go b/storages/postgres.go index fd5340047..edc4e229c 100644 --- a/storages/postgres.go +++ b/storages/postgres.go @@ -9,8 +9,6 @@ import ( "github.com/ksensehq/eventnative/schema" ) -const postgresStorageType = "Postgres" - //Store files to Postgres in two modes: //batch: (1 file = 1 transaction) //stream: (1 object = 1 transaction) @@ -37,7 +35,7 @@ func NewPostgres(ctx context.Context, config *adapters.DataSourceConfig, process return nil, err } - tableHelper := NewTableHelper(adapter, monitorKeeper, postgresStorageType) + tableHelper := NewTableHelper(adapter, monitorKeeper, PostgresType) p := &Postgres{ name: storageName, @@ -123,5 +121,5 @@ func (p *Postgres) Name() string { } func (p *Postgres) Type() string { - return postgresStorageType + return PostgresType } diff --git a/storages/redshift.go b/storages/redshift.go index 570a5b014..4c8f734f9 100644 --- a/storages/redshift.go +++ b/storages/redshift.go @@ -14,7 +14,6 @@ import ( ) const tableFileKeyDelimiter = "-table-" -const redshiftStorageType = "Redshift" //Store files to aws RedShift in two modes: //batch: via aws s3 in batch mode (1 file = 1 transaction) @@ -52,7 +51,7 @@ func NewAwsRedshift(ctx context.Context, name string, eventQueue *events.Persist return nil, err } - tableHelper := NewTableHelper(redshiftAdapter, monitorKeeper, redshiftStorageType) + tableHelper := NewTableHelper(redshiftAdapter, monitorKeeper, RedshiftType) ar := &AwsRedshift{ name: name, @@ -174,7 +173,7 @@ func (ar *AwsRedshift) Name() string { } func (ar *AwsRedshift) Type() string { - return redshiftStorageType + return RedshiftType } func (ar *AwsRedshift) Close() error { diff --git a/storages/s3.go b/storages/s3.go index 727b99c2c..dee30dc92 100644 --- a/storages/s3.go +++ b/storages/s3.go @@ -57,7 +57,7 @@ func (s3 *S3) Name() string { } func (s3 *S3) Type() string { - return "S3" + return S3Type } func (s3 *S3) Close() error { diff --git a/storages/snowflake.go b/storages/snowflake.go index d1a89a5f5..936cdb2f9 100644 --- a/storages/snowflake.go +++ b/storages/snowflake.go @@ -15,8 +15,6 @@ import ( "time" ) -const snowflakeStorageType = "Snowflake" - //Store files to Snowflake in two modes: //batch: via aws s3 (or gcp) in batch mode (1 file = 1 transaction) //stream: via events queue in stream mode (1 object = 1 transaction) @@ -56,7 +54,7 @@ func NewSnowflake(ctx 
context.Context, name string, eventQueue *events.Persisten
 		return nil, err
 	}
 
-	tableHelper := NewTableHelper(snowflakeAdapter, monitorKeeper, snowflakeStorageType)
+	tableHelper := NewTableHelper(snowflakeAdapter, monitorKeeper, SnowflakeType)
 
 	snowflake := &Snowflake{
 		name:            name,
@@ -228,7 +226,7 @@ func (s *Snowflake) Name() string {
 }
 
 func (s *Snowflake) Type() string {
-	return snowflakeStorageType
+	return SnowflakeType
 }
 
 func (s *Snowflake) Close() (multiErr error) {
diff --git a/storages/types.go b/storages/types.go
new file mode 100644
index 000000000..690354136
--- /dev/null
+++ b/storages/types.go
@@ -0,0 +1,10 @@
+package storages
+
+const (
+	RedshiftType   = "redshift"
+	BigQueryType   = "bigquery"
+	PostgresType   = "postgres"
+	ClickHouseType = "clickhouse"
+	S3Type         = "s3"
+	SnowflakeType  = "snowflake"
+)
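For illustration (a hypothetical usage sketch, not part of the series): the constant values in storages/types.go deliberately match the lower-case strings used in destination configs, so the factory switch shown earlier can compare config input directly against exported names instead of raw string literals. Only the constants themselves come from the patch; the wrapper program below is assumed.

package main

import (
	"fmt"

	"github.com/ksensehq/eventnative/storages"
)

func main() {
	configuredType := "postgres" // e.g. the "type" field of a destination config

	// Compare config input against the exported constants from storages/types.go
	// rather than scattering "postgres"/"redshift"/... literals around the code.
	switch configuredType {
	case storages.RedshiftType, storages.BigQueryType, storages.PostgresType,
		storages.ClickHouseType, storages.S3Type, storages.SnowflakeType:
		fmt.Println("known destination type")
	default:
		fmt.Println("unknown destination type")
	}
}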
From 510f88ea06f53f6ca7137694ca9706a7f7a2edc1 Mon Sep 17 00:00:00 2001
From: Sergey Burykin
Date: Wed, 7 Oct 2020 18:10:40 +0300
Subject: [PATCH 7/8] fixed events pointer (reading from persistent queue)
 fixed stopping streaming goroutine

---
 appconfig/appconfig.go      |  4 ++--
 config/config.template.yaml |  4 ++--
 destinations/service.go     |  1 +
 destinations/unit.go        |  7 ++++---
 events/persistent_queue.go  | 12 +++++++++---
 main.go                     |  1 +
 resources/watcher.go        |  1 +
 storages/bigquery.go        |  8 +++++++-
 storages/clickhouse.go      |  8 +++++++-
 storages/postgres.go        |  7 ++++++-
 storages/redshift.go        |  8 +++++++-
 storages/snowflake.go       |  8 +++++++-
 storages/streaming.go       | 14 ++++++++++++--
 13 files changed, 66 insertions(+), 17 deletions(-)

diff --git a/appconfig/appconfig.go b/appconfig/appconfig.go
index b57e29620..973d81810 100644
--- a/appconfig/appconfig.go
+++ b/appconfig/appconfig.go
@@ -26,8 +26,8 @@ var Instance *AppConfig
 func setDefaultParams() {
 	viper.SetDefault("server.port", "8001")
 	viper.SetDefault("server.static_files_dir", "./web")
-	viper.SetDefault("server.auth_reload_sec", 100)
-	viper.SetDefault("server.destinations_reload_sec", 120)
+	viper.SetDefault("server.auth_reload_sec", 30)
+	viper.SetDefault("server.destinations_reload_sec", 40)
 	viper.SetDefault("geo.maxmind_path", "/home/eventnative/app/res/")
 	viper.SetDefault("log.path", "/home/eventnative/logs/events")
 	viper.SetDefault("log.show_in_server", false)
diff --git a/config/config.template.yaml b/config/config.template.yaml
index a153d0f83..c5e827cff 100644
--- a/config/config.template.yaml
+++ b/config/config.template.yaml
@@ -13,12 +13,12 @@ server:
   api:
     - 5f15eba2-db58-11ea-87d0-0242ac130003
     - 62faa226-db58-11ea-87d0-0242ac130003
-  auth_reload_sec: 60 #default value is 100. If 'auth' is http or file:/// source than it will be reloaded every auth_reload_sec
+  auth_reload_sec: 60 #default value is 30. If 'auth' is http or file:/// source then it will be reloaded every auth_reload_sec
   public_url: https://yourhost
 log:
   path: /home/eventnative/logs/ #omit this key to write log to stdout
   rotation_min: 60 #1440 (24 hours) default value
-  destinations_reload_sec: 60 #default value is 120. If 'destinations' is http or file:/// source than it will be reloaded every destinations_reload_sec
+  destinations_reload_sec: 60 #default value is 40. If 'destinations' is http or file:/// source then it will be reloaded every destinations_reload_sec
 
 geo.maxmind_path: https://statichost/GeoIP2-City.mmdb
diff --git a/destinations/service.go b/destinations/service.go
index ca8f00ed7..4f28d8160 100644
--- a/destinations/service.go
+++ b/destinations/service.go
@@ -279,6 +279,7 @@ func (s *Service) remove(name string, unit *Unit) {
 	}
 
 	delete(s.unitsByName, name)
+	logging.Infof("[%s] has been removed!", name)
 }
 
 func (s *Service) Close() (multiErr error) {
diff --git a/destinations/unit.go b/destinations/unit.go
index 5d09d9c7a..32ddf0372 100644
--- a/destinations/unit.go
+++ b/destinations/unit.go
@@ -17,13 +17,14 @@ type Unit struct {
 
 //Close eventsQueue if exists and storage
 func (u *Unit) Close() (multiErr error) {
+	if err := u.storage.Close(); err != nil {
+		multiErr = multierror.Append(multiErr, err)
+	}
+
 	if u.eventQueue != nil {
 		if err := u.eventQueue.Close(); err != nil {
 			multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing events queue: %v", err))
 		}
 	}
 
-	if err := u.storage.Close(); err != nil {
-		multiErr = multierror.Append(multiErr, err)
-	}
 	return
 }
diff --git a/events/persistent_queue.go b/events/persistent_queue.go
index f9fcd6064..f36c5137d 100644
--- a/events/persistent_queue.go
+++ b/events/persistent_queue.go
@@ -11,12 +11,14 @@ import (
 
 const eventsPerPersistedFile = 2000
 
+var ErrQueueClosed = errors.New("queue is closed")
+
 type QueuedFact struct {
 	FactBytes    []byte
 	DequeuedTime time.Time
 }
 
-// QueuedFactBuilder creates and returns a new events.Fact.
+// QueuedFactBuilder creates and returns a new *events.QueuedFact (must be pointer).
 // This is used when we load a segment of the queue from disk.
 func QueuedFactBuilder() interface{} {
 	return &QueuedFact{}
@@ -46,7 +48,7 @@ func (pq *PersistentQueue) ConsumeTimed(f Fact, t time.Time) {
 		return
 	}
 
-	if err := pq.queue.Enqueue(QueuedFact{FactBytes: factBytes, DequeuedTime: t}); err != nil {
+	if err := pq.queue.Enqueue(&QueuedFact{FactBytes: factBytes, DequeuedTime: t}); err != nil {
 		logSkippedEvent(f, fmt.Errorf("Error putting event fact bytes to the persistent queue: %v", err))
 		return
 	}
@@ -55,9 +57,13 @@ func (pq *PersistentQueue) DequeueBlock() (Fact, time.Time, error) {
 	iface, err := pq.queue.DequeueBlock()
 	if err != nil {
+		if err == dque.ErrQueueClosed {
+			err = ErrQueueClosed
+		}
 		return nil, time.Time{}, err
 	}
-	wrappedFact, ok := iface.(QueuedFact)
+
+	wrappedFact, ok := iface.(*QueuedFact)
 	if !ok || len(wrappedFact.FactBytes) == 0 {
 		return nil, time.Time{}, errors.New("Dequeued object is not a QueuedFact instance or fact bytes is empty")
 	}
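The persistent_queue.go hunk above is the core of this patch, and the constraint it encodes is easy to trip over, so here is a standalone sketch of the contract. The queue name, directory, and Item type are placeholders; the dque calls mirror the ones used in events/persistent_queue.go:

package main

import (
	"fmt"

	"github.com/joncrlsn/dque"
)

type Item struct{ Payload []byte }

// dque gob-decodes persisted segments into whatever the builder returns,
// so items loaded from disk always come back as *Item.
func itemBuilder() interface{} { return &Item{} }

func main() {
	// 2000 items per segment, matching eventsPerPersistedFile above.
	q, err := dque.NewOrOpen("example-queue", "/tmp", 2000, itemBuilder)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	// Enqueue a pointer as well. If values were enqueued instead, items that
	// never reached disk would dequeue as Item while disk-loaded ones dequeue
	// as *Item, and a single type assertion could not cover both cases; that
	// mismatch is what this patch fixes.
	if err := q.Enqueue(&Item{Payload: []byte("hello")}); err != nil {
		panic(err)
	}

	iface, err := q.DequeueBlock()
	if err != nil {
		panic(err)
	}
	if item, ok := iface.(*Item); ok {
		fmt.Println(string(item.Payload))
	}
}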
*") telemetry.ServerStop() appstatus.Instance.Idle = true cancel() diff --git a/resources/watcher.go b/resources/watcher.go index d0ed826c8..192f2beda 100644 --- a/resources/watcher.go +++ b/resources/watcher.go @@ -29,6 +29,7 @@ func Watch(name, source string, loadFunc func(string) ([]byte, error), consumer consumer: consumer, reloadEvery: reloadEvery, } + logging.Infof("Resource [%s] will be loaded every %d seconds", name, int(reloadEvery.Seconds())) w.watch() } diff --git a/storages/bigquery.go b/storages/bigquery.go index edb9df467..a2c3889b8 100644 --- a/storages/bigquery.go +++ b/storages/bigquery.go @@ -23,6 +23,7 @@ type BigQuery struct { bqAdapter *adapters.BigQuery tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -63,7 +64,8 @@ func NewBigQuery(ctx context.Context, name string, eventQueue *events.Persistent breakOnError: breakOnError, } if streamMode { - newStreamingWorker(eventQueue, processor, bq).start() + bq.streamingWorker = newStreamingWorker(eventQueue, processor, bq) + bq.streamingWorker.start() } else { bq.startBatch() } @@ -173,5 +175,9 @@ func (bq *BigQuery) Close() (multiErr error) { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing BigQuery client: %v", bq.Name(), err)) } + if bq.streamingWorker != nil { + bq.streamingWorker.Close() + } + return } diff --git a/storages/clickhouse.go b/storages/clickhouse.go index d475e90b3..11a47ea60 100644 --- a/storages/clickhouse.go +++ b/storages/clickhouse.go @@ -19,6 +19,7 @@ type ClickHouse struct { adapters []*adapters.ClickHouse tableHelpers []*TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -72,7 +73,8 @@ func NewClickHouse(ctx context.Context, name string, eventQueue *events.Persiste } if streamMode { - newStreamingWorker(eventQueue, processor, ch).start() + ch.streamingWorker = newStreamingWorker(eventQueue, processor, ch) + ch.streamingWorker.start() } return ch, nil @@ -152,6 +154,10 @@ func (ch *ClickHouse) Close() (multiErr error) { } } + if ch.streamingWorker != nil { + ch.streamingWorker.Close() + } + return multiErr } diff --git a/storages/postgres.go b/storages/postgres.go index edc4e229c..4ef657f55 100644 --- a/storages/postgres.go +++ b/storages/postgres.go @@ -17,6 +17,7 @@ type Postgres struct { adapter *adapters.Postgres tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -46,7 +47,8 @@ func NewPostgres(ctx context.Context, config *adapters.DataSourceConfig, process } if streamMode { - newStreamingWorker(eventQueue, processor, p).start() + p.streamingWorker = newStreamingWorker(eventQueue, processor, p) + p.streamingWorker.start() } return p, nil @@ -113,6 +115,9 @@ func (p *Postgres) Close() error { return fmt.Errorf("[%s] Error closing postgres datasource: %v", p.Name(), err) } + if p.streamingWorker != nil { + p.streamingWorker.Close() + } return nil } diff --git a/storages/redshift.go b/storages/redshift.go index 4c8f734f9..55c8a4cf0 100644 --- a/storages/redshift.go +++ b/storages/redshift.go @@ -24,6 +24,7 @@ type AwsRedshift struct { redshiftAdapter *adapters.AwsRedshift tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -63,7 +64,8 @@ func NewAwsRedshift(ctx context.Context, name string, eventQueue *events.Persist } if streamMode { - newStreamingWorker(eventQueue, processor, ar).start() + ar.streamingWorker = 
+		ar.streamingWorker = newStreamingWorker(eventQueue, processor, ar)
+		ar.streamingWorker.start()
 	} else {
 		ar.startBatch()
 	}
@@ -181,5 +183,9 @@ func (ar *AwsRedshift) Close() error {
 		return fmt.Errorf("[%s] Error closing redshift datasource: %v", ar.Name(), err)
 	}
 
+	if ar.streamingWorker != nil {
+		ar.streamingWorker.Close()
+	}
+
 	return nil
 }
diff --git a/storages/snowflake.go b/storages/snowflake.go
index 936cdb2f9..be0c11757 100644
--- a/storages/snowflake.go
+++ b/storages/snowflake.go
@@ -24,6 +24,7 @@ type Snowflake struct {
 	snowflakeAdapter *adapters.Snowflake
 	tableHelper      *TableHelper
 	schemaProcessor  *schema.Processor
+	streamingWorker  *StreamingWorker
 	breakOnError     bool
 }
 
@@ -66,7 +67,8 @@ func NewSnowflake(ctx context.Context, name string, eventQueue *events.Persisten
 	if streamMode {
-		newStreamingWorker(eventQueue, processor, snowflake).start()
+		snowflake.streamingWorker = newStreamingWorker(eventQueue, processor, snowflake)
+		snowflake.streamingWorker.start()
 	} else {
 		snowflake.startBatch()
 	}
@@ -238,5 +240,9 @@ func (s *Snowflake) Close() (multiErr error) {
 		multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing snowflake stage: %v", s.Name(), err))
 	}
 
+	if s.streamingWorker != nil {
+		s.streamingWorker.Close()
+	}
+
 	return
 }
diff --git a/storages/streaming.go b/storages/streaming.go
index 76f3ab40a..30cfd24c2 100644
--- a/storages/streaming.go
+++ b/storages/streaming.go
@@ -1,7 +1,6 @@
 package storages
 
 import (
-	"github.com/ksensehq/eventnative/appstatus"
 	"github.com/ksensehq/eventnative/events"
 	"github.com/ksensehq/eventnative/logging"
 	"github.com/ksensehq/eventnative/schema"
@@ -19,6 +18,8 @@ type StreamingWorker struct {
 	eventQueue       *events.PersistentQueue
 	schemaProcessor  *schema.Processor
 	streamingStorage StreamingStorage
+
+	closed bool
 }
 
 func newStreamingWorker(eventQueue *events.PersistentQueue, schemaProcessor *schema.Processor, streamingStorage StreamingStorage) *StreamingWorker {
@@ -35,11 +36,15 @@ func (sw *StreamingWorker) start() {
 	go func() {
 		for {
-			if appstatus.Instance.Idle {
+			if sw.closed {
 				break
 			}
+
 			fact, dequeuedTime, err := sw.eventQueue.DequeueBlock()
 			if err != nil {
+				if err == events.ErrQueueClosed && sw.closed {
+					continue
+				}
 				logging.Errorf("[%s] Error reading event fact from queue: %v", sw.streamingStorage.Name(), err)
 				continue
 			}
@@ -73,3 +78,8 @@ func (sw *StreamingWorker) start() {
 		}
 	}()
 }
+
+func (sw *StreamingWorker) Close() error {
+	sw.closed = true
+	return nil
+}
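Before the final patch, a note on how the pieces above fit together at shutdown, since the sequence spans three files: destinations/unit.go now closes the storage first (whose Close calls streamingWorker.Close and sets the closed flag), and only then closes the event queue, which wakes the worker goroutine parked inside DequeueBlock with ErrQueueClosed. A condensed sketch of the loop's control flow, simplified from storages/streaming.go (schema processing and dequeue timing are omitted):

for {
	if sw.closed {
		break // set by Close(); reached once the closed queue wakes us below
	}

	fact, _, err := sw.eventQueue.DequeueBlock()
	if err != nil {
		if err == events.ErrQueueClosed && sw.closed {
			continue // expected during shutdown: loop back to the closed check
		}
		logging.Errorf("error reading event fact from queue: %v", err)
		continue
	}
	_ = fact // insert the fact into the streaming storage
}

Setting the flag alone would never stop a goroutine blocked in DequeueBlock; closing the queue afterwards is what actually unblocks it, which is why the Close order in unit.go was swapped in this patch.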
From 511d5fd75e97d0d1e2e9e3c860ac20c9e8c9e852 Mon Sep 17 00:00:00 2001
From: Sergey Burykin
Date: Wed, 7 Oct 2020 18:35:50 +0300
Subject: [PATCH 8/8] fixed config template

---
 config/config.template.yaml | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/config/config.template.yaml b/config/config.template.yaml
index c5e827cff..403d5dcd5 100644
--- a/config/config.template.yaml
+++ b/config/config.template.yaml
@@ -6,13 +6,24 @@ server:
   name: event-us-01 #This parameter is required in cluster deployments. If not set - will be taken from os.Hostname()
   #might be http url or file source
   #auth: https://source_of_tokens
-  auth:
-    js:
+  #or might be a fully configured token object
+  #auth:
+  #  -
+  #    id: unique_tokenId
+  #    client_secret: bd33c5fa-d69f-11ea-87d0-0242ac130003
+  #    server_secret: 5f15eba2-db58-11ea-87d0-0242ac130003
+  #    origins:
+  #      - *abc.com
+  #      - efg.com
+  #  -
+  #    id: unique_tokenId2
+  #    client_secret: 123jsy213c5fa-c20765a0-d69f003
+  #  -
+  #    id: unique_tokenId3
+  #    server_secret: 231dasds-3211kb3rdf-412dkjnabf
+  auth: #plain strings - client_secrets
     - bd33c5fa-d69f-11ea-87d0-0242ac130003
     - c20765a0-d69f-15ea-82d0-0242ac130003
-  api:
-    - 5f15eba2-db58-11ea-87d0-0242ac130003
-    - 62faa226-db58-11ea-87d0-0242ac130003
   auth_reload_sec: 60 #default value is 30. If 'auth' is http or file:/// source then it will be reloaded every auth_reload_sec
   public_url: https://yourhost
 log:
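To close the loop with patch 4: the commented YAML above and the test's server.auth value describe the same token structure. A JSON source (http or file:///) carries the token objects in a "tokens" array; the sketch below mirrors SetTestDefaultParams from patch 4, with ids and secrets reused from the template's placeholder values:

package main

import "github.com/spf13/viper"

func main() {
	// Equivalent of the commented YAML token objects above, as the JSON form
	// accepted via server.auth; 'origins' supports the same wildcard matching
	// as "whiteorigin*" in main_test.go.
	viper.Set("server.auth", `{"tokens":[{
		"id": "unique_tokenId",
		"client_secret": "bd33c5fa-d69f-11ea-87d0-0242ac130003",
		"server_secret": "5f15eba2-db58-11ea-87d0-0242ac130003",
		"origins": ["*abc.com", "efg.com"]
	}]}`)
}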