diff --git a/adapters/clickhouse.go b/adapters/clickhouse.go index ee414e8dd..531b04192 100644 --- a/adapters/clickhouse.go +++ b/adapters/clickhouse.go @@ -55,26 +55,26 @@ var ( //ClickHouseConfig dto for deserialized clickhouse config type ClickHouseConfig struct { - Dsns []string `mapstructure:"dsns"` - Database string `mapstructure:"db"` - Tls map[string]string `mapstructure:"tls"` - Cluster string `mapstructure:"cluster"` - Engine *EngineConfig `mapstructure:"engine"` + Dsns []string `mapstructure:"dsns" json:"dsns,omitempty"` + Database string `mapstructure:"db" json:"db,omitempty"` + Tls map[string]string `mapstructure:"tls" json:"tls,omitempty"` + Cluster string `mapstructure:"cluster" json:"cluster,omitempty"` + Engine *EngineConfig `mapstructure:"engine" json:"engine,omitempty"` } //EngineConfig dto for deserialized clickhouse engine config type EngineConfig struct { - RawStatement string `mapstructure:"raw_statement"` - NonNullFields []string `mapstructure:"non_null_fields"` - PartitionFields []FieldConfig `mapstructure:"partition_fields"` - OrderFields []FieldConfig `mapstructure:"order_fields"` - PrimaryKeys []string `mapstructure:"primary_keys"` + RawStatement string `mapstructure:"raw_statement" json:"raw_statement,omitempty"` + NonNullFields []string `mapstructure:"non_null_fields" json:"non_null_fields,omitempty"` + PartitionFields []FieldConfig `mapstructure:"partition_fields" json:"partition_fields,omitempty"` + OrderFields []FieldConfig `mapstructure:"order_fields" json:"order_fields,omitempty"` + PrimaryKeys []string `mapstructure:"primary_keys" json:"primary_keys,omitempty"` } //FieldConfig dto for deserialized clickhouse engine fields type FieldConfig struct { - Function string `mapstructure:"function"` - Field string `mapstructure:"field"` + Function string `mapstructure:"function" json:"function,omitempty"` + Field string `mapstructure:"field" json:"field,omitempty"` } //Validate required fields in ClickHouseConfig diff --git a/adapters/google_cloud_storage.go b/adapters/google_cloud_storage.go index a93eab4f9..c926cc19f 100644 --- a/adapters/google_cloud_storage.go +++ b/adapters/google_cloud_storage.go @@ -19,10 +19,10 @@ type GoogleCloudStorage struct { } type GoogleConfig struct { - Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket"` - Project string `mapstructure:"bq_project" json:"bq_project"` - Dataset string `mapstructure:"bq_dataset" json:"bq_dataset"` - KeyFile interface{} `mapstructure:"key_file" json:"key_file"` + Bucket string `mapstructure:"gcs_bucket" json:"gcs_bucket,omitempty"` + Project string `mapstructure:"bq_project" json:"bq_project,omitempty"` + Dataset string `mapstructure:"bq_dataset" json:"bq_dataset,omitempty"` + KeyFile interface{} `mapstructure:"key_file" json:"key_file,omitempty"` //will be set on validation credentials option.ClientOption diff --git a/adapters/postgres.go b/adapters/postgres.go index 3ef81f161..1ecbd3fc6 100644 --- a/adapters/postgres.go +++ b/adapters/postgres.go @@ -54,13 +54,13 @@ var ( //DataSourceConfig dto for deserialized datasource config (e.g. 
in Postgres or AwsRedshift destination) type DataSourceConfig struct { - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - Db string `mapstructure:"db"` - Schema string `mapstructure:"schema"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Parameters map[string]string `mapstructure:"parameters"` + Host string `mapstructure:"host" json:"host,omitempty"` + Port int `mapstructure:"port" json:"port,omitempty"` + Db string `mapstructure:"db" json:"db,omitempty"` + Schema string `mapstructure:"schema" json:"schema,omitempty"` + Username string `mapstructure:"username" json:"username,omitempty"` + Password string `mapstructure:"password" json:"password,omitempty"` + Parameters map[string]string `mapstructure:"parameters" json:"parameters,omitempty"` } //Validate required fields in DataSourceConfig diff --git a/adapters/s3.go b/adapters/s3.go index 6a5302a24..b07ada877 100644 --- a/adapters/s3.go +++ b/adapters/s3.go @@ -18,12 +18,12 @@ type S3 struct { } type S3Config struct { - AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id"` - SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key"` - Bucket string `mapstructure:"bucket"` - Region string `mapstructure:"region"` - Endpoint string `mapstructure:"endpoint"` - Folder string `mapstructure:"folder"` + AccessKeyID string `mapstructure:"access_key_id" json:"access_key_id,omitempty"` + SecretKey string `mapstructure:"secret_access_key" json:"secret_access_key,omitempty"` + Bucket string `mapstructure:"bucket" json:"bucket,omitempty"` + Region string `mapstructure:"region" json:"region,omitempty"` + Endpoint string `mapstructure:"endpoint" json:"endpoint,omitempty"` + Folder string `mapstructure:"folder" json:"folder,omitempty"` } func (s3c *S3Config) Validate() error { diff --git a/adapters/snowflake.go b/adapters/snowflake.go index 073c6deea..1a4bdf1bb 100644 --- a/adapters/snowflake.go +++ b/adapters/snowflake.go @@ -42,15 +42,15 @@ var ( //SnowflakeConfig dto for deserialized datasource config for Snowflake type SnowflakeConfig struct { - Account string `mapstructure:"account"` - Port int `mapstructure:"port"` - Db string `mapstructure:"db"` - Schema string `mapstructure:"schema"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Warehouse string `mapstructure:"warehouse"` - Stage string `mapstructure:"stage"` - Parameters map[string]*string `mapstructure:"parameters"` + Account string `mapstructure:"account" json:"account,omitempty"` + Port int `mapstructure:"port" json:"port,omitempty"` + Db string `mapstructure:"db" json:"db,omitempty"` + Schema string `mapstructure:"schema" json:"schema,omitempty"` + Username string `mapstructure:"username" json:"username,omitempty"` + Password string `mapstructure:"password" json:"password,omitempty"` + Warehouse string `mapstructure:"warehouse" json:"warehouse,omitempty"` + Stage string `mapstructure:"stage" json:"stage,omitempty"` + Parameters map[string]*string `mapstructure:"parameters" json:"parameters,omitempty"` } //Validate required fields in SnowflakeConfig diff --git a/appconfig/appconfig.go b/appconfig/appconfig.go index c2eb7fb49..973d81810 100644 --- a/appconfig/appconfig.go +++ b/appconfig/appconfig.go @@ -26,7 +26,8 @@ var Instance *AppConfig func setDefaultParams() { viper.SetDefault("server.port", "8001") viper.SetDefault("server.static_files_dir", "./web") - viper.SetDefault("server.auth_reload_sec", 300) + viper.SetDefault("server.auth_reload_sec", 30) 
+ viper.SetDefault("server.destinations_reload_sec", 40) viper.SetDefault("geo.maxmind_path", "/home/eventnative/app/res/") viper.SetDefault("log.path", "/home/eventnative/logs/events") viper.SetDefault("log.show_in_server", false) diff --git a/authorization/parser.go b/authorization/parser.go new file mode 100644 index 000000000..05c1fa206 --- /dev/null +++ b/authorization/parser.go @@ -0,0 +1,90 @@ +package authorization + +import ( + "encoding/json" + "fmt" + "github.com/ksensehq/eventnative/resources" + "strings" +) + +type Token struct { + Id string `mapstructure:"id" json:"id,omitempty"` + ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty"` + ServerSecret string `mapstructure:"server_secret" json:"server_secret,omitempty"` + Origins []string `mapstructure:"origins" json:"origins,omitempty"` +} + +type TokensPayload struct { + Tokens []Token `json:"tokens,omitempty"` +} + +type TokensHolder struct { + //origins by client token + clientTokensOrigins map[string][]string + //origins by server token + serverTokensOrigins map[string][]string + + //all token ids + ids []string + //token by: client_secret/server_secret/id + all map[string]Token +} + +func (th *TokensHolder) IsEmpty() bool { + return th == nil || len(th.ids) == 0 +} + +//parse tokens from json bytes +func parseFromBytes(b []byte) (*TokensHolder, error) { + payload := &TokensPayload{} + err := json.Unmarshal(b, payload) + if err != nil { + return nil, fmt.Errorf("Error unmarshalling tokens. Payload must be json with 'tokens' key: %v", err) + } + + return reformat(payload.Tokens), nil +} + +func fromStrings(clientSecrets []string) *TokensHolder { + var tokens []Token + for _, clientSecret := range clientSecrets { + tokens = append(tokens, Token{ClientSecret: clientSecret}) + } + return reformat(tokens) +} + +func reformat(tokens []Token) *TokensHolder { + clientTokensOrigins := map[string][]string{} + serverTokensOrigins := map[string][]string{} + all := map[string]Token{} + var ids []string + + for _, tokenObj := range tokens { + if tokenObj.Id == "" { + //hash from client,server secret will be id + tokenObj.Id = resources.GetHash([]byte(tokenObj.ClientSecret + tokenObj.ServerSecret)) + } + + all[tokenObj.Id] = tokenObj + ids = append(ids, tokenObj.Id) + + trimmedClientToken := strings.TrimSpace(tokenObj.ClientSecret) + if trimmedClientToken != "" { + clientTokensOrigins[trimmedClientToken] = tokenObj.Origins + all[trimmedClientToken] = tokenObj + } + + trimmedServerToken := strings.TrimSpace(tokenObj.ServerSecret) + if trimmedServerToken != "" { + serverTokensOrigins[trimmedServerToken] = tokenObj.Origins + all[trimmedServerToken] = tokenObj + } + } + + return &TokensHolder{ + clientTokensOrigins: clientTokensOrigins, + serverTokensOrigins: serverTokensOrigins, + ids: ids, + all: all, + } +} diff --git a/authorization/parser_test.go b/authorization/parser_test.go new file mode 100644 index 000000000..0face1cdb --- /dev/null +++ b/authorization/parser_test.go @@ -0,0 +1,139 @@ +package authorization + +import ( + "github.com/ksensehq/eventnative/test" + "github.com/stretchr/testify/require" + "testing" +) + +func TestParseFromBytes(t *testing.T) { + tests := []struct { + name string + input []byte + expected *TokensHolder + expectedErr string + }{ + { + "Empty input", + []byte{}, + nil, + "Error unmarshalling tokens. Payload must be json with 'tokens' key: unexpected end of JSON input", + }, + { + "Wrong json keys format", + []byte(`{"tokens":{}}}`), + nil, + "Error unmarshalling tokens. 
Payload must be json with 'tokens' key: invalid character '}' after top-level value", + }, + { + "Empty json input", + []byte(`{}`), + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, + "", + }, + { + "Wrong keys json input", + []byte(`{"jsss":[], "apii": []}`), + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, + "", + }, + + { + "Empty json tokens input", + []byte(`{"tokens":[]}`), + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, + "", + }, + { + "ok", + []byte(`{"tokens":[{"id":"id1","client_secret":"cl_secret1","server_secret":"sr_secret1","origins":["abc.com","rr.ru"]},{"client_secret":"cl_secret2"},{"server_secret":"sr_secret3"}]}`), + buildExpected(), + "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualTokenHolder, err := parseFromBytes(tt.input) + if tt.expectedErr != "" { + require.EqualError(t, err, tt.expectedErr, "Errors aren't equal") + } else { + require.NoError(t, err) + test.ObjectsEqual(t, tt.expected.ids, actualTokenHolder.ids, "Token holder ids aren't equal") + test.ObjectsEqual(t, tt.expected.all, actualTokenHolder.all, "Token holders all aren't equal") + test.ObjectsEqual(t, tt.expected.clientTokensOrigins, actualTokenHolder.clientTokensOrigins, "Token holders client tokens aren't equal") + test.ObjectsEqual(t, tt.expected.serverTokensOrigins, actualTokenHolder.serverTokensOrigins, "Token holders server tokens aren't equal") + } + }) + } +} + +func buildExpected() *TokensHolder { + token1 := Token{ + Id: "id1", + ClientSecret: "cl_secret1", + ServerSecret: "sr_secret1", + Origins: []string{"abc.com", "rr.ru"}, + } + token2 := Token{ + Id: "31429624a471b9bdc6d60350c6cfc24d", + ClientSecret: "cl_secret2", + } + token3 := Token{ + Id: "03f9ed11a0268dd78766686f8f292b7b", + ServerSecret: "sr_secret3", + } + return &TokensHolder{ + clientTokensOrigins: map[string][]string{"cl_secret1": {"abc.com", "rr.ru"}, "cl_secret2": nil}, + serverTokensOrigins: map[string][]string{"sr_secret1": {"abc.com", "rr.ru"}, "sr_secret3": nil}, + ids: []string{"id1", "31429624a471b9bdc6d60350c6cfc24d", "03f9ed11a0268dd78766686f8f292b7b"}, + all: map[string]Token{ + "id1": token1, + "cl_secret1": token1, + "sr_secret1": token1, + + "31429624a471b9bdc6d60350c6cfc24d": token2, + "cl_secret2": token2, + + "03f9ed11a0268dd78766686f8f292b7b": token3, + "sr_secret3": token3, + }, + } +} + +func TestFromStrings(t *testing.T) { + tests := []struct { + name string + input []string + expected *TokensHolder + }{ + { + "empty tokens", + []string{}, + &TokensHolder{clientTokensOrigins: map[string][]string{}, serverTokensOrigins: map[string][]string{}, all: map[string]Token{}}, + }, + { + "value is string slice", + []string{"token1", "token2"}, + &TokensHolder{ + clientTokensOrigins: map[string][]string{"token1": nil, "token2": nil}, + serverTokensOrigins: map[string][]string{}, + all: map[string]Token{ + "78b1e6d775cec5260001af137a79dbd5": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"}, + "token1": {Id: "78b1e6d775cec5260001af137a79dbd5", ClientSecret: "token1"}, + + "0e0530c1430da76495955eb06eb99d95": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"}, + "token2": {Id: "0e0530c1430da76495955eb06eb99d95", ClientSecret: "token2"}, + }, + ids: []string{"78b1e6d775cec5260001af137a79dbd5", 
"0e0530c1430da76495955eb06eb99d95"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := fromStrings(tt.input) + test.ObjectsEqual(t, tt.expected, actual, "Token holders aren't equal") + }) + } +} diff --git a/authorization/service.go b/authorization/service.go index c5b31f5d0..a40d8baad 100644 --- a/authorization/service.go +++ b/authorization/service.go @@ -1,26 +1,27 @@ package authorization import ( - "encoding/json" "errors" - "fmt" "github.com/google/uuid" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/resources" "github.com/spf13/viper" "strings" "sync" + "time" ) -type Token struct { - Value string `json:"token,omitempty"` - Origins []string `json:"origins,omitempty"` -} +const ( + serviceName = "authorization" + viperAuthKey = "server.auth" + + defaultTokenId = "defaultid" +) type Service struct { sync.RWMutex - c2STokens map[string][]string - s2STokens map[string][]string + + tokensHolder *TokensHolder } func NewService() (*Service, error) { @@ -31,120 +32,117 @@ func NewService() (*Service, error) { return nil, errors.New("server.auth_reload_sec can't be empty") } - c2sTokens, err := load("server.auth", service.updateC2STokens, reloadSec) - if err != nil { - return nil, fmt.Errorf("Error loading server.auth tokens: %v", err) - } + var tokens []Token + err := viper.UnmarshalKey(viperAuthKey, &tokens) + if err == nil { + service.tokensHolder = reformat(tokens) + } else { + auth := viper.GetStringSlice(viperAuthKey) + + if len(auth) == 1 { + authSource := auth[0] + if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { + resources.Watch(serviceName, authSource, resources.LoadFromHttp, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(authSource, "file://") { + resources.Watch(serviceName, strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, service.updateTokens, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(authSource, "{") && strings.HasSuffix(authSource, "}") { + tokensHolder, err := parseFromBytes([]byte(authSource)) + if err != nil { + return nil, err + } + service.tokensHolder = tokensHolder + } else { + //plain token + service.tokensHolder = fromStrings(auth) + } + } else { + //array of tokens + service.tokensHolder = fromStrings(auth) + } - s2sTokens, err := load("server.s2s_auth", service.updateS2STokens, reloadSec) - if err != nil { - return nil, fmt.Errorf("Error loading server.s2s_auth tokens: %v", err) } - if len(c2sTokens) == 0 && len(s2sTokens) == 0 { + if service.tokensHolder.IsEmpty() { //autogenerated - generatedToken := uuid.New().String() - c2sTokens[generatedToken] = []string{} - s2sTokens[generatedToken] = []string{} - logging.Warn("Empty 'server.tokens' config key. Auto generate token:", generatedToken) - } + generatedTokenSecret := uuid.New().String() + generatedToken := Token{ + Id: defaultTokenId, + ClientSecret: generatedTokenSecret, + ServerSecret: generatedTokenSecret, + Origins: []string{}, + } - service.c2STokens = c2sTokens - service.s2STokens = s2sTokens + service.tokensHolder = reformat([]Token{generatedToken}) + logging.Warn("Empty 'server.auth' config keys. 
Auto generate token:", generatedTokenSecret) + } return service, nil } -func (s *Service) GetC2SOrigins(token string) ([]string, bool) { +//GetClientOrigins return origins by client_secret +func (s *Service) GetClientOrigins(clientSecret string) ([]string, bool) { s.RLock() defer s.RUnlock() - origins, ok := s.c2STokens[token] + origins, ok := s.tokensHolder.clientTokensOrigins[clientSecret] return origins, ok } -func (s *Service) GetAllTokens() map[string]bool { +//GetServerOrigins return origins by server_secret +func (s *Service) GetServerOrigins(serverSecret string) ([]string, bool) { s.RLock() defer s.RUnlock() - result := map[string]bool{} - for k := range s.c2STokens { - result[k] = true - } - for k := range s.s2STokens { - result[k] = true - } - return result + origins, ok := s.tokensHolder.serverTokensOrigins[serverSecret] + return origins, ok } -func (s *Service) GetS2SOrigins(token string) ([]string, bool) { +//GetAllTokenIds return all token ids +func (s *Service) GetAllTokenIds() []string { s.RLock() defer s.RUnlock() - origins, ok := s.s2STokens[token] - return origins, ok + return s.tokensHolder.ids } -func (s *Service) updateC2STokens(c2sTokens map[string][]string) { - s.Lock() - s.c2STokens = c2sTokens - s.Unlock() -} +//GetAllIdsByToken return token ids by token identity(client_secret/server_secret/token id) +func (s *Service) GetAllIdsByToken(tokenIdentity []string) (ids []string) { + s.RLock() + defer s.RUnlock() -func (s *Service) updateS2STokens(s2sTokens map[string][]string) { - s.Lock() - s.s2STokens = s2sTokens - s.Unlock() -} + deduplication := map[string]bool{} + for _, tokenFilter := range tokenIdentity { + tokenObj, _ := s.tokensHolder.all[tokenFilter] + deduplication[tokenObj.Id] = true + } -func load(viperKey string, updateFunc func(map[string][]string), reloadSec int) (map[string][]string, error) { - authSource := viper.GetString(viperKey) - if strings.HasPrefix(authSource, "http://") || strings.HasPrefix(authSource, "https://") { - return Watcher(authSource, resources.LoadFromHttp, updateFunc, reloadSec) - } else if strings.Contains(authSource, "file://") { - return Watcher(strings.Replace(authSource, "file://", "", 1), resources.LoadFromFile, updateFunc, reloadSec) - } else if strings.HasPrefix(authSource, "[") && strings.HasSuffix(authSource, "]") { - return parseFromBytes("app config json array", []byte(authSource)) - } else { - return loadFromConfig(viperKey) + for id := range deduplication { + ids = append(ids, id) } + return } -func loadFromConfig(viperKey string) (map[string][]string, error) { - tokensArr := viper.GetStringSlice(viperKey) - tokensOrigins := map[string][]string{} - for _, t := range tokensArr { - trimmed := strings.TrimSpace(t) - if trimmed != "" { - tokensOrigins[trimmed] = []string{} - } - } +//GetTokenId return token id by client_secret/server_secret/token id +//return "" if token wasn't found +func (s *Service) GetTokenId(tokenFilter string) string { + s.RLock() + defer s.RUnlock() - return tokensOrigins, nil + token, ok := s.tokensHolder.all[tokenFilter] + if ok { + return token.Id + } + return "" } -func parseFromBytes(source string, b []byte) (map[string][]string, error) { - var tokens []Token - err := json.Unmarshal(b, &tokens) +//parse and set tokensHolder with lock +func (s *Service) updateTokens(payload []byte) { + tokenHolder, err := parseFromBytes(payload) if err != nil { - //try to unmarshal into []string - var strTokens []string - err := json.Unmarshal(b, &strTokens) - if err != nil { - return nil, fmt.Errorf("Error 
unmarshalling tokens from %s. Payload must be json array or string array: %v", source, err) - } - for _, t := range strTokens { - tokens = append(tokens, Token{Value: t}) - } - } - - tokensOrigins := map[string][]string{} - for _, tokenObj := range tokens { - trimmed := strings.TrimSpace(tokenObj.Value) - if trimmed != "" { - tokensOrigins[trimmed] = tokenObj.Origins - } + logging.Errorf("Error updating authorization tokens: %v", err) + } else { + s.Lock() + s.tokensHolder = tokenHolder + s.Unlock() } - - return tokensOrigins, nil } diff --git a/authorization/watcher.go b/authorization/watcher.go deleted file mode 100644 index 3c7563162..000000000 --- a/authorization/watcher.go +++ /dev/null @@ -1,48 +0,0 @@ -package authorization - -import ( - "crypto/md5" - "fmt" - "github.com/ksensehq/eventnative/appstatus" - "github.com/ksensehq/eventnative/logging" - "time" -) - -func Watcher(source string, loadFunc func(string) ([]byte, error), updateFunc func(map[string][]string), reloadSec int) (map[string][]string, error) { - payload, err := loadFunc(source) - if err != nil { - return nil, err - } - - go watch(source, payload, loadFunc, updateFunc, reloadSec) - return parseFromBytes(source, payload) -} - -func watch(source string, firstTimePayload []byte, loadFunc func(string) ([]byte, error), updateFunc func(map[string][]string), reloadSec int) { - hash := fmt.Sprintf("%x", md5.Sum(firstTimePayload)) - for { - if appstatus.Instance.Idle { - break - } - - time.Sleep(time.Duration(reloadSec) * time.Second) - actualPayload, err := loadFunc(source) - if err != nil { - logging.Errorf("Error reloading %s: %v", source, err) - continue - } - - newHash := fmt.Sprintf("%x", md5.Sum(actualPayload)) - if hash != newHash { - result, err := parseFromBytes(source, actualPayload) - if err != nil { - logging.Errorf("Error parsing reloaded %s: %v", source, err) - continue - } - - updateFunc(result) - logging.Infof("New resource from %s was loaded", source) - hash = newHash - } - } -} diff --git a/config/config.template.yaml b/config/config.template.yaml index 2e84ed266..403d5dcd5 100644 --- a/config/config.template.yaml +++ b/config/config.template.yaml @@ -4,17 +4,32 @@ server: port: 8001 name: event-us-01 #This parameter is required in cluster deployments. If not set - will be taken from os.Hostname() - auth: - - bd33c5fa-d69f-11ea-87d0-0242ac130003 - - c20765a0-d69f-15ea-82d0-0242ac130003 - s2s_auth: - - 5f15eba2-db58-11ea-87d0-0242ac130003 - - 62faa226-db58-11ea-87d0-0242ac130003 - auth_reload_sec: 60 #default value is 300. If auth or s2s_auth is http or file:/// source than it will be reloaded every auth_reload_sec + #might be http url of file source + #auth: https://source_of_tokens + #or might be full configured token object + #auth: + # - + # id: unique_tokenId + # client_secret: bd33c5fa-d69f-11ea-87d0-0242ac130003 + # server_secret: 5f15eba2-db58-11ea-87d0-0242ac130003 + # origins: + # - *abc.com + # - efg.com + # - + # id: unique_tokenId2 + # client_secret: 123jsy213c5fa-c20765a0-d69f003 + # - + # id: unique_tokenId3 + # server_secret: 231dasds-3211kb3rdf-412dkjnabf + auth: #plain strings - client_secrets + - bd33c5fa-d69f-11ea-87d0-0242ac130003 + - c20765a0-d69f-15ea-82d0-0242ac130003 + auth_reload_sec: 60 #default value is 30. 
If 'auth' is http or file:/// source then it will be reloaded every auth_reload_sec public_url: https://yourhost log: path: /home/eventnative/logs/ #omit this key to write log to stdout rotation_min: 60 #1440 (24 hours) default value + destinations_reload_sec: 60 #default value is 40. If 'destinations' is http or file:/// source then it will be reloaded every destinations_reload_sec geo.maxmind_path: https://statichost/GeoIP2-City.mmdb @@ -22,6 +37,8 @@ log: path: /home/eventnative/logs/events rotation_min: 5 +#might be http url or file source +#destinations: https://source_of_destinations destinations: redshift_one: type: redshift @@ -38,6 +55,7 @@ destinations: secret_access_key: secretabc123 bucket: my-bucket region: us-west-1 + folder: redshift_one #Optional. Specify this parameter if several destinations use one s3 bucket data_layout: mapping: - "/key1/key2 -> /key3" diff --git a/destinations/collections.go b/destinations/collections.go new file mode 100644 index 000000000..58b1fbcd8 --- /dev/null +++ b/destinations/collections.go @@ -0,0 +1,45 @@ +package destinations + +import "github.com/ksensehq/eventnative/events" + +//map["tokenId"]map["destination_name"]interface +//because 1 token id = ∞ storages +type TokenizedStorages map[string]map[string]events.StorageProxy + +//map["tokenId"]map["tokenId | destination_name"]interface +//because 1 token id = 1 logger but ∞ event.queue +type TokenizedConsumers map[string]map[string]events.Consumer + +func (ts TokenizedStorages) Add(tokenId, name string, proxy events.StorageProxy) { + storageProxies, ok := ts[tokenId] + if !ok { + storageProxies = map[string]events.StorageProxy{} + ts[tokenId] = storageProxies + } + storageProxies[name] = proxy +} + +func (ts TokenizedStorages) AddAll(other TokenizedStorages) { + for tokenId, storages := range other { + for name, storage := range storages { + ts.Add(tokenId, name, storage) + } + } +} + +func (tc TokenizedConsumers) Add(tokenId, name string, proxy events.Consumer) { + consumers, ok := tc[tokenId] + if !ok { + consumers = map[string]events.Consumer{} + tc[tokenId] = consumers + } + consumers[name] = proxy +} + +func (tc TokenizedConsumers) AddAll(other TokenizedConsumers) { + for tokenId, consumers := range other { + for name, consumer := range consumers { + tc.Add(tokenId, name, consumer) + } + } +} diff --git a/destinations/parser.go b/destinations/parser.go new file mode 100644 index 000000000..072e3e72a --- /dev/null +++ b/destinations/parser.go @@ -0,0 +1,32 @@ +package destinations + +import ( + "encoding/json" + "github.com/google/martian/log" + "github.com/ksensehq/eventnative/resources" + "github.com/ksensehq/eventnative/storages" +) + +type Payload struct { + Destinations map[string]storages.DestinationConfig `json:"destinations,omitempty"` +} + +func parseFromBytes(b []byte) (map[string]storages.DestinationConfig, error) { + payload := &Payload{} + err := json.Unmarshal(b, &payload) + if err != nil { + return nil, err + } + + return payload.Destinations, nil +} + +func getHash(name string, destination storages.DestinationConfig) string { + b, err := json.Marshal(destination) + if err != nil { + log.Errorf("Error getting hash(marshalling) from [%s] destination: %v", name, err) + return "" + } + + return resources.GetHash(b) +} diff --git a/destinations/service.go b/destinations/service.go new file mode 100644 index 000000000..4f28d8160 --- /dev/null +++ b/destinations/service.go @@ -0,0 +1,299 @@ +package destinations + +import ( + "context" + "errors" + "fmt" + 
"github.com/hashicorp/go-multierror" + "github.com/ksensehq/eventnative/appconfig" + "github.com/ksensehq/eventnative/events" + "github.com/ksensehq/eventnative/logging" + "github.com/ksensehq/eventnative/resources" + "github.com/ksensehq/eventnative/storages" + "github.com/spf13/viper" + "strings" + "sync" + "time" +) + +const serviceName = "destinations" +const marshallingErrorMsg = `Error initializing destinations: wrong config format: each destination must contains one key and config as a value(see https://docs.eventnative.dev/configuration) e.g. +destinations: + custom_name: + type: redshift + ... +` + +//LoggerUsage is used for counting when logger isn't used +type LoggerUsage struct { + logger events.Consumer + usage int +} + +//Service is reloadable service of events destinations per token +type Service struct { + storageFactoryMethod func(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error) + ctx context.Context + logEventPath string + monitorKeeper storages.MonitorKeeper + + unitsByName map[string]*Unit + loggersUsageByTokenId map[string]*LoggerUsage + + sync.RWMutex + consumersByTokenId TokenizedConsumers + storagesByTokenId TokenizedStorages +} + +//only for tests +func NewTestService(consumersByTokenId TokenizedConsumers, storagesByTokenId TokenizedStorages) *Service { + return &Service{ + consumersByTokenId: consumersByTokenId, + storagesByTokenId: storagesByTokenId, + } +} + +//NewService return loaded Service instance and call resources.Watcher() if destinations source is http url or file path +func NewService(ctx context.Context, destinations *viper.Viper, destinationsSource, logEventPath string, monitorKeeper storages.MonitorKeeper, + storageFactoryMethod func(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error)) (*Service, error) { + service := &Service{ + storageFactoryMethod: storageFactoryMethod, + ctx: ctx, + logEventPath: logEventPath, + monitorKeeper: monitorKeeper, + + unitsByName: map[string]*Unit{}, + loggersUsageByTokenId: map[string]*LoggerUsage{}, + + consumersByTokenId: map[string]map[string]events.Consumer{}, + storagesByTokenId: map[string]map[string]events.StorageProxy{}, + } + + reloadSec := viper.GetInt("server.destinations_reload_sec") + if reloadSec == 0 { + return nil, errors.New("server.destinations_reload_sec can't be empty") + } + + if destinations != nil { + dc := map[string]storages.DestinationConfig{} + if err := destinations.Unmarshal(&dc); err != nil { + logging.Error(marshallingErrorMsg, err) + return service, nil + } + + service.init(dc) + + if len(service.unitsByName) == 0 { + logging.Errorf("Destinations are empty") + } + + } else if destinationsSource != "" { + if strings.HasPrefix(destinationsSource, "http://") || strings.HasPrefix(destinationsSource, "https://") { + resources.Watch(serviceName, destinationsSource, resources.LoadFromHttp, service.updateDestinations, time.Duration(reloadSec)*time.Second) + } else if strings.Contains(destinationsSource, "file://") { + resources.Watch(serviceName, strings.Replace(destinationsSource, "file://", "", 1), resources.LoadFromFile, service.updateDestinations, time.Duration(reloadSec)*time.Second) + } else if strings.HasPrefix(destinationsSource, "{") && strings.HasSuffix(destinationsSource, "}") { + 
service.updateDestinations([]byte(destinationsSource)) + } else { + return nil, errors.New("Unknown destination source: " + destinationsSource) + } + } + + return service, nil +} + +func (ds *Service) GetConsumers(tokenId string) (consumers []events.Consumer) { + ds.RLock() + defer ds.RUnlock() + for _, c := range ds.consumersByTokenId[tokenId] { + consumers = append(consumers, c) + } + return +} + +func (ds *Service) GetStorages(tokenId string) (storages []events.StorageProxy) { + ds.RLock() + defer ds.RUnlock() + for _, s := range ds.storagesByTokenId[tokenId] { + storages = append(storages, s) + } + return +} + +func (s *Service) updateDestinations(payload []byte) { + dc, err := parseFromBytes(payload) + if err != nil { + logging.Error(marshallingErrorMsg, err) + return + } + + s.init(dc) + + if len(s.unitsByName) == 0 { + logging.Errorf("Destinations are empty") + } +} + +//1. close and remove all destinations which don't exist in new config +//2. recreate/create changed/new destinations +func (s *Service) init(dc map[string]storages.DestinationConfig) { + StatusInstance.Reloading = true + + //close and remove non-existent (in new config) + toDelete := map[string]*Unit{} + for name, unit := range s.unitsByName { + _, ok := dc[name] + if !ok { + toDelete[name] = unit + } + } + if len(toDelete) > 0 { + s.Lock() + for name, unit := range toDelete { + s.remove(name, unit) + } + s.Unlock() + } + + // create or recreate + newConsumers := TokenizedConsumers{} + newStorages := TokenizedStorages{} + for name, d := range dc { + //common case + destination := d + + hash := getHash(name, destination) + unit, ok := s.unitsByName[name] + if ok { + if unit.hash == hash { + //destination wasn't changed + continue + } + //remove old (for recreation) + s.Lock() + s.remove(name, unit) + s.Unlock() + } + + //create new + newStorageProxy, eventQueue, err := s.storageFactoryMethod(s.ctx, name, s.logEventPath, destination, s.monitorKeeper) + if err != nil { + logging.Errorf("[%s] Error initializing destination of type %s: %v", name, destination.Type, err) + continue + } + + var tokenIds []string + //map token -> id + if len(destination.OnlyTokens) > 0 { + tokenIds = appconfig.Instance.AuthorizationService.GetAllIdsByToken(destination.OnlyTokens) + } else { + logging.Warnf("[%s] only_tokens wasn't provided. 
All tokens will be stored.", name) + tokenIds = appconfig.Instance.AuthorizationService.GetAllTokenIds() + } + + s.unitsByName[name] = &Unit{ + eventQueue: eventQueue, + storage: newStorageProxy, + tokenIds: tokenIds, + hash: hash, + } + + //create: + // 1 logger per token id + // 1 queue per destination id + //append: + // storage per token id + // consumers per client_secret and server_secret + for _, tokenId := range tokenIds { + if destination.Mode == storages.StreamMode { + newConsumers.Add(tokenId, name, eventQueue) + } else { + //get or create new logger + loggerUsage, ok := s.loggersUsageByTokenId[tokenId] + if !ok { + eventLogWriter, err := logging.NewWriter(logging.Config{ + LoggerName: "event-" + tokenId, + ServerName: appconfig.Instance.ServerName, + FileDir: s.logEventPath, + RotationMin: viper.GetInt64("log.rotation_min")}) + if err != nil { + logging.Errorf("[%s] Error creating tokenized logger: %v", name, err) + } else { + logger := events.NewAsyncLogger(eventLogWriter, viper.GetBool("log.show_in_server")) + loggerUsage = &LoggerUsage{logger: logger, usage: 0} + s.loggersUsageByTokenId[tokenId] = loggerUsage + } + } + + if loggerUsage != nil { + loggerUsage.usage += 1 + //2 destinations with only 1 logger can be under 1 tokenId + newConsumers.Add(tokenId, tokenId, loggerUsage.logger) + } + } + + newStorages.Add(tokenId, name, newStorageProxy) + } + } + + s.Lock() + s.consumersByTokenId.AddAll(newConsumers) + s.storagesByTokenId.AddAll(newStorages) + s.Unlock() + + StatusInstance.Reloading = false +} + +//remove destination from all collections and close it +func (s *Service) remove(name string, unit *Unit) { + //remove from all collections: queue or logger(if needed) + storage + for _, tokenId := range unit.tokenIds { + oldConsumers := s.consumersByTokenId[tokenId] + if unit.eventQueue != nil { + delete(oldConsumers, name) + } else { + //logger + loggerUsage := s.loggersUsageByTokenId[tokenId] + loggerUsage.usage -= 1 + if loggerUsage.usage == 0 { + delete(oldConsumers, tokenId) + delete(s.loggersUsageByTokenId, tokenId) + loggerUsage.logger.Close() + } + } + + if len(oldConsumers) == 0 { + delete(s.consumersByTokenId, tokenId) + } + + //storage + oldStorages := s.storagesByTokenId[tokenId] + delete(oldStorages, name) + if len(oldStorages) == 0 { + delete(s.storagesByTokenId, tokenId) + } + } + + if err := unit.Close(); err != nil { + logging.Errorf("[%s] Error closing destination unit: %v", name, err) + } + + delete(s.unitsByName, name) + logging.Infof("[%s] has been removed!", name) +} + +func (s *Service) Close() (multiErr error) { + for token, loggerUsage := range s.loggersUsageByTokenId { + if err := loggerUsage.logger.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing logger for token [%s]: %v", token, err)) + } + } + + for name, unit := range s.unitsByName { + if err := unit.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing destination unit: %v", name, err)) + } + } + + return +} diff --git a/destinations/service_test.go b/destinations/service_test.go new file mode 100644 index 000000000..7a308fa67 --- /dev/null +++ b/destinations/service_test.go @@ -0,0 +1,224 @@ +package destinations + +import ( + "context" + "github.com/ksensehq/eventnative/appconfig" + "github.com/ksensehq/eventnative/events" + "github.com/ksensehq/eventnative/storages" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +type payloadHolder 
struct { + payload []byte +} + +type testProxyMock struct { +} + +func (tpm *testProxyMock) Get() (events.Storage, bool) { + return nil, false +} + +func (tpm *testProxyMock) Close() error { + return nil +} + +// 1. init +// 2. change +// 3. remove all +// 4. init again +// 5. remove all again +func TestServiceInit(t *testing.T) { + viper.Set("server.destinations_reload_sec", 1) + viper.Set("server.auth", []string{"token1", "token2", "token3", "token4"}) + appconfig.Init() + + initialDestinations := `{ + "destinations": { + "redshift_1": { + "type": "redshift", + "only_tokens": [ + "token1", + "token2" + ], + "datasource": { + "host": "host_redshift_1" + } + }, + "pg_1": { + "type": "postgres", + "only_tokens": [ + "token1", + "token3" + ], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_2" + } + } + } +}` + payload := &payloadHolder{payload: []byte(initialDestinations)} + mockSourceServer := startTestServer(payload) + + service, err := NewService(context.Background(), nil, mockSourceServer.URL, "/tmp", nil, createTestStorage) + require.NoError(t, err) + require.NotNil(t, service) + + initialConfigAsserts(t, service) + //wasn't changed + time.Sleep(1 * time.Second) + initialConfigAsserts(t, service) + + //change + changedDestinations := `{ + "destinations": { + "pg_1": { + "type": "postgres", + "only_tokens": [ + "token1", + "token3", + "token4" + ], + "datasource": { + "host": "host_pg_1" + } + }, + "pg_2": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_2" + } + }, + "pg_3": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token4" + ], + "datasource": { + "host": "host_pg_3" + } + }, + "pg_4": { + "type": "postgres", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_4" + } + }, + "pg_5": { + "type": "postgres", + "mode": "stream", + "only_tokens": [ + "token3" + ], + "datasource": { + "host": "host_pg_5" + } + } + } +}` + payload.payload = []byte(changedDestinations) + time.Sleep(1 * time.Second) + changedConfigAsserts(t, service) + + //delete all + emptyDestinations := `{}` + payload.payload = []byte(emptyDestinations) + time.Sleep(1 * time.Second) + emptyConfigAsserts(t, service) + + //init all again + payload.payload = []byte(initialDestinations) + time.Sleep(1 * time.Second) + initialConfigAsserts(t, service) + + //delete all one more time + payload.payload = []byte(emptyDestinations) + time.Sleep(1 * time.Second) + emptyConfigAsserts(t, service) +} + +func initialConfigAsserts(t *testing.T, service *Service) { + require.Equal(t, 3, len(service.storagesByTokenId)) + require.Equal(t, 3, len(service.consumersByTokenId)) + + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + + require.Equal(t, 1, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) + require.Equal(t, 2, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) +} + +func changedConfigAsserts(t *testing.T, service *Service) { + 
require.Equal(t, 3, len(service.storagesByTokenId)) + require.Equal(t, 3, len(service.consumersByTokenId)) + + require.Equal(t, 1, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + require.Equal(t, 1, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token1")))) + + require.Equal(t, 0, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + require.Equal(t, 0, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token2")))) + + require.Equal(t, 4, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) + require.Equal(t, 3, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token3")))) + + require.Equal(t, 2, len(service.GetStorages(appconfig.Instance.AuthorizationService.GetTokenId("token4")))) + require.Equal(t, 2, len(service.GetConsumers(appconfig.Instance.AuthorizationService.GetTokenId("token4")))) +} + +func emptyConfigAsserts(t *testing.T, service *Service) { + require.Equal(t, 0, len(service.storagesByTokenId)) + require.Equal(t, 0, len(service.consumersByTokenId)) + + require.Equal(t, 0, len(service.GetStorages("token1"))) + require.Equal(t, 0, len(service.GetConsumers("token1"))) + + require.Equal(t, 0, len(service.GetStorages("token2"))) + require.Equal(t, 0, len(service.GetConsumers("token2"))) + + require.Equal(t, 0, len(service.GetStorages("token3"))) + require.Equal(t, 0, len(service.GetConsumers("token3"))) + + require.Equal(t, 0, len(service.GetStorages("token4"))) + require.Equal(t, 0, len(service.GetConsumers("token4"))) +} + +func startTestServer(ph *payloadHolder) *httptest.Server { + return httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(ph.payload) + })) +} + +func createTestStorage(ctx context.Context, name, logEventPath string, destination storages.DestinationConfig, monitorKeeper storages.MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error) { + var eventQueue *events.PersistentQueue + if destination.Mode == storages.StreamMode { + eventQueue, _ = events.NewPersistentQueue(name, "/tmp") + } + return &testProxyMock{}, eventQueue, nil +} diff --git a/destinations/status.go b/destinations/status.go new file mode 100644 index 000000000..a0bc4968c --- /dev/null +++ b/destinations/status.go @@ -0,0 +1,9 @@ +package destinations + +var StatusInstance = &Status{Reloading: false} + +//Singleton struct for storing destinations reloading state. 
Uploader check this flag +//and don't upload batch files if Reloading = true +type Status struct { + Reloading bool +} diff --git a/destinations/unit.go b/destinations/unit.go new file mode 100644 index 000000000..32ddf0372 --- /dev/null +++ b/destinations/unit.go @@ -0,0 +1,30 @@ +package destinations + +import ( + "fmt" + "github.com/hashicorp/go-multierror" + "github.com/ksensehq/eventnative/events" +) + +//Unit holds storage bundle for closing at once +type Unit struct { + eventQueue *events.PersistentQueue + storage events.StorageProxy + + tokenIds []string + hash string +} + +//Close eventsQueue if exists and storage +func (u *Unit) Close() (multiErr error) { + if err := u.storage.Close(); err != nil { + multiErr = multierror.Append(multiErr, err) + } + + if u.eventQueue != nil { + if err := u.eventQueue.Close(); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("Error closing events queue: %v", err)) + } + } + return +} diff --git a/events/s2s_preprocessor.go b/events/api_preprocessor.go similarity index 76% rename from events/s2s_preprocessor.go rename to events/api_preprocessor.go index 5b69eed69..b6352b59a 100644 --- a/events/s2s_preprocessor.go +++ b/events/api_preprocessor.go @@ -9,14 +9,14 @@ import ( "net/http" ) -//S2SPreprocessor preprocess server 2 server integration events -type S2SPreprocessor struct { +//ApiPreprocessor preprocess server 2 server integration events +type ApiPreprocessor struct { geoResolver geo.Resolver uaResolver useragent.Resolver } -func NewS2SPreprocessor() Preprocessor { - return &S2SPreprocessor{ +func NewApiPreprocessor() Preprocessor { + return &ApiPreprocessor{ geoResolver: appconfig.Instance.GeoResolver, uaResolver: appconfig.Instance.UaResolver, } @@ -25,12 +25,12 @@ func NewS2SPreprocessor() Preprocessor { //Preprocess resolve geo from ip field or skip if geo.GeoDataKey field was provided //resolve useragent from uaKey or skip if useragent.ParsedUaKey field was provided //return same object -func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { +func (ap *ApiPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { if fact == nil { return nil, errors.New("Input fact can't be nil") } - fact["src"] = "s2s" + fact["src"] = "api" ip := extractIp(r) if ip != "" { fact[ipKey] = ip @@ -41,7 +41,7 @@ func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error //geo.GeoDataKey node overwrite geo resolving if _, ok := deviceCtxObject[geo.GeoDataKey]; !ok { if ip, ok := deviceCtxObject["ip"]; ok { - geoData, err := s2sp.geoResolver.Resolve(ip.(string)) + geoData, err := ap.geoResolver.Resolve(ip.(string)) if err != nil { logging.Error(err) } @@ -54,7 +54,7 @@ func (s2sp *S2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error if _, ok := deviceCtxObject[useragent.ParsedUaKey]; !ok { if ua, ok := deviceCtxObject[uaKey]; ok { if uaStr, ok := ua.(string); ok { - deviceCtxObject[useragent.ParsedUaKey] = s2sp.uaResolver.Resolve(uaStr) + deviceCtxObject[useragent.ParsedUaKey] = ap.uaResolver.Resolve(uaStr) } } } diff --git a/events/s2s_preprocessor_test.go b/events/api_preprocessor_test.go similarity index 89% rename from events/s2s_preprocessor_test.go rename to events/api_preprocessor_test.go index 8f2fc8208..b0360f0d2 100644 --- a/events/s2s_preprocessor_test.go +++ b/events/api_preprocessor_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestS2SPreprocess(t *testing.T) { +func TestApiPreprocess(t *testing.T) { geoDataMock := &geo.Data{ Country: "US", City: "New 
York", @@ -35,13 +35,13 @@ func TestS2SPreprocess(t *testing.T) { "Empty input object", Fact{}, &http.Request{Header: http.Header{}}, - Fact{"src": "s2s"}, + Fact{"src": "api"}, "", }, { "Process ok without device ctx", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -49,10 +49,10 @@ func TestS2SPreprocess(t *testing.T) { &http.Request{Header: http.Header{"X-Forwarded-For": []string{"10.10.10.10"}}}, Fact{ "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -60,7 +60,7 @@ func TestS2SPreprocess(t *testing.T) { { "Process ok with device ctx but without ip and ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -73,10 +73,10 @@ func TestS2SPreprocess(t *testing.T) { "location": (*geo.Data)(nil), }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -84,7 +84,7 @@ func TestS2SPreprocess(t *testing.T) { { "Process ok with device ctx with ip and ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -97,10 +97,10 @@ func TestS2SPreprocess(t *testing.T) { "location": geoDataMock, }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -108,7 +108,7 @@ func TestS2SPreprocess(t *testing.T) { { "Process ok with location and parsed ua", Fact{ - "event_origin": "s2s_test", + "event_origin": "api_test", "src": "123", "event_data": map[string]interface{}{"key1": "key2"}, "user": map[string]interface{}{"id": "123"}, @@ -124,10 +124,10 @@ func TestS2SPreprocess(t *testing.T) { "parsed_ua": map[string]interface{}{"custom_ua": "123"}, }, "event_data": map[string]interface{}{"key1": "key2"}, - "event_origin": "s2s_test", + "event_origin": "api_test", "user": map[string]interface{}{"id": "123"}, "page_ctx": map[string]interface{}{"referer": "www.site.com"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -143,7 +143,7 @@ func TestS2SPreprocess(t *testing.T) { "billing": []string{"1", "2"}, "keys": map[string]interface{}{"key1": "key2"}, "weather": map[string]interface{}{"id": "123", "type": "good"}, - "src": "s2s", + "src": "api", "source_ip": "10.10.10.10", }, "", @@ -151,12 +151,12 @@ func TestS2SPreprocess(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s2sPreprocessor := &S2SPreprocessor{ + apiPreprocessor := &ApiPreprocessor{ geoResolver: geo.Mock{"20.20.20.20": geoDataMock}, uaResolver: useragent.Mock{}, } - actualFact, actualErr := s2sPreprocessor.Preprocess(tt.input, tt.inputReq) + actualFact, actualErr := apiPreprocessor.Preprocess(tt.input, 
tt.inputReq) if tt.expectedErr == "" { require.NoError(t, actualErr) } else { diff --git a/events/destination_service.go b/events/destination_service.go deleted file mode 100644 index 42da25634..000000000 --- a/events/destination_service.go +++ /dev/null @@ -1,37 +0,0 @@ -package events - -import "sync" - -//DestinationService is reloadable service of events destinations per token -type DestinationService struct { - sync.RWMutex - consumersByToken map[string][]Consumer - storagesByToken map[string][]StorageProxy -} - -func NewDestinationService(consumersByToken map[string][]Consumer, storagesByToken map[string][]StorageProxy) *DestinationService { - return &DestinationService{ - RWMutex: sync.RWMutex{}, - consumersByToken: consumersByToken, - storagesByToken: storagesByToken, - } -} - -func (ds *DestinationService) GetConsumers(token string) []Consumer { - ds.RLock() - defer ds.RUnlock() - return ds.consumersByToken[token] -} - -func (ds *DestinationService) GetStorages(token string) []StorageProxy { - ds.RLock() - defer ds.RUnlock() - return ds.storagesByToken[token] -} - -func (ds *DestinationService) Reload(consumersByToken map[string][]Consumer, storagesByToken map[string][]StorageProxy) { - ds.Lock() - defer ds.Unlock() - ds.storagesByToken = storagesByToken - ds.consumersByToken = consumersByToken -} diff --git a/events/c2s_preprocessor.go b/events/js_preprocessor.go similarity index 81% rename from events/c2s_preprocessor.go rename to events/js_preprocessor.go index c1bb463d1..4b2fd2a33 100644 --- a/events/c2s_preprocessor.go +++ b/events/js_preprocessor.go @@ -21,14 +21,14 @@ type Preprocessor interface { Preprocess(fact Fact, r *http.Request) (Fact, error) } -//C2SPreprocessor preprocess client 2 server integration events -type C2SPreprocessor struct { +//JsPreprocessor preprocess client 2 server integration events +type JsPreprocessor struct { geoResolver geo.Resolver uaResolver useragent.Resolver } -func NewC2SPreprocessor() Preprocessor { - return &C2SPreprocessor{ +func NewJsPreprocessor() Preprocessor { + return &JsPreprocessor{ geoResolver: appconfig.Instance.GeoResolver, uaResolver: appconfig.Instance.UaResolver, } @@ -38,7 +38,7 @@ func NewC2SPreprocessor() Preprocessor { //resolve useragent from uaKey //put data to eventnKey //return same object -func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { +func (jp *JsPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error) { if fact == nil { return nil, nilFactErr } @@ -58,7 +58,7 @@ func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error return nil, fmt.Errorf("Unable to cast %s to object: %v", eventnKey, eventnObject) } - geoData, err := c2sp.geoResolver.Resolve(ip) + geoData, err := jp.geoResolver.Resolve(ip) if err != nil { logging.Error(err) } @@ -70,7 +70,7 @@ func (c2sp *C2SPreprocessor) Preprocess(fact Fact, r *http.Request) (Fact, error ua, ok := eventFact[uaKey] if ok { if uaStr, ok := ua.(string); ok { - eventFact[useragent.ParsedUaKey] = c2sp.uaResolver.Resolve(uaStr) + eventFact[useragent.ParsedUaKey] = jp.uaResolver.Resolve(uaStr) } } diff --git a/events/c2s_preprocessor_test.go b/events/js_preprocessor_test.go similarity index 96% rename from events/c2s_preprocessor_test.go rename to events/js_preprocessor_test.go index 2964c18a4..96002d1b7 100644 --- a/events/c2s_preprocessor_test.go +++ b/events/js_preprocessor_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestC2SPreprocess(t *testing.T) { +func TestJsPreprocess(t *testing.T) { geoDataMock 
:= &geo.Data{ Country: "US", City: "New York", @@ -71,7 +71,7 @@ func TestC2SPreprocess(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c2sPreprocessor := &C2SPreprocessor{ + c2sPreprocessor := &JsPreprocessor{ geoResolver: geo.Mock{"10.10.10.10": geoDataMock}, uaResolver: useragent.Mock{}, } diff --git a/events/persistent_queue.go b/events/persistent_queue.go index f9fcd6064..f36c5137d 100644 --- a/events/persistent_queue.go +++ b/events/persistent_queue.go @@ -11,12 +11,14 @@ import ( const eventsPerPersistedFile = 2000 +var ErrQueueClosed = errors.New("queue is closed") + type QueuedFact struct { FactBytes []byte DequeuedTime time.Time } -// QueuedFactBuilder creates and returns a new events.Fact. +// QueuedFactBuilder creates and returns a new *events.QueuedFact (must be pointer). // This is used when we load a segment of the queue from disk. func QueuedFactBuilder() interface{} { return &QueuedFact{} @@ -46,7 +48,7 @@ func (pq *PersistentQueue) ConsumeTimed(f Fact, t time.Time) { return } - if err := pq.queue.Enqueue(QueuedFact{FactBytes: factBytes, DequeuedTime: t}); err != nil { + if err := pq.queue.Enqueue(&QueuedFact{FactBytes: factBytes, DequeuedTime: t}); err != nil { logSkippedEvent(f, fmt.Errorf("Error putting event fact bytes to the persistent queue: %v", err)) return } @@ -55,9 +57,13 @@ func (pq *PersistentQueue) ConsumeTimed(f Fact, t time.Time) { func (pq *PersistentQueue) DequeueBlock() (Fact, time.Time, error) { iface, err := pq.queue.DequeueBlock() if err != nil { + if err == dque.ErrQueueClosed { + err = ErrQueueClosed + } return nil, time.Time{}, err } - wrappedFact, ok := iface.(QueuedFact) + + wrappedFact, ok := iface.(*QueuedFact) if !ok || len(wrappedFact.FactBytes) == 0 { return nil, time.Time{}, errors.New("Dequeued object is not a QueuedFact instance or fact bytes is empty") } diff --git a/go.mod b/go.mod index 37734231b..d992c2072 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/gin-gonic/gin v1.6.3 github.com/golang/protobuf v1.4.2 // indirect github.com/google/go-cmp v0.5.1 // indirect + github.com/google/martian v2.1.0+incompatible github.com/google/uuid v1.1.1 github.com/gookit/color v1.3.1 github.com/hashicorp/go-multierror v1.1.0 diff --git a/handlers/event.go b/handlers/event.go index 59da44cc0..9b59d672f 100644 --- a/handlers/event.go +++ b/handlers/event.go @@ -2,6 +2,8 @@ package handlers import ( "github.com/gin-gonic/gin" + "github.com/ksensehq/eventnative/appconfig" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/middleware" @@ -13,12 +15,12 @@ const apiTokenKey = "api_key" //Accept all events type EventHandler struct { - destinationService *events.DestinationService + destinationService *destinations.Service preprocessor events.Preprocessor } //Accept all events according to token -func NewEventHandler(destinationService *events.DestinationService, preprocessor events.Preprocessor) (eventHandler *EventHandler) { +func NewEventHandler(destinationService *destinations.Service, preprocessor events.Preprocessor) (eventHandler *EventHandler) { return &EventHandler{ destinationService: destinationService, preprocessor: preprocessor, @@ -49,7 +51,9 @@ func (eh *EventHandler) Handler(c *gin.Context) { processed[apiTokenKey] = token processed[timestamp.Key] = timestamp.NowUTC() - consumers := eh.destinationService.GetConsumers(token) + tokenId := 
appconfig.Instance.AuthorizationService.GetTokenId(token) + + consumers := eh.destinationService.GetConsumers(tokenId) if len(consumers) == 0 { logging.Warnf("Unknown token[%s] request was received", token) } else { diff --git a/logfiles/uploader.go b/logfiles/uploader.go index e6db49a6b..6e6eb9b60 100644 --- a/logfiles/uploader.go +++ b/logfiles/uploader.go @@ -2,7 +2,7 @@ package logfiles import ( "github.com/ksensehq/eventnative/appstatus" - "github.com/ksensehq/eventnative/events" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/logging" "io/ioutil" "os" @@ -26,10 +26,10 @@ type PeriodicUploader struct { uploadEvery time.Duration statusManager *statusManager - destinationService *events.DestinationService + destinationService *destinations.Service } -func NewUploader(logEventPath, fileMask string, filesBatchSize, uploadEveryS int, destinationService *events.DestinationService) (*PeriodicUploader, error) { +func NewUploader(logEventPath, fileMask string, filesBatchSize, uploadEveryS int, destinationService *destinations.Service) (*PeriodicUploader, error) { statusManager, err := newStatusManager(logEventPath) if err != nil { return nil, err @@ -53,6 +53,12 @@ func (u *PeriodicUploader) Start() { if appstatus.Instance.Idle { break } + + if destinations.StatusInstance.Reloading { + time.Sleep(2 * time.Second) + continue + } + files, err := filepath.Glob(u.fileMask) if err != nil { logging.Error("Error finding files by mask", u.fileMask, err) @@ -83,10 +89,10 @@ func (u *PeriodicUploader) Start() { continue } - token := regexResult[1] - storageProxies := u.destinationService.GetStorages(token) + tokenId := regexResult[1] + storageProxies := u.destinationService.GetStorages(tokenId) if len(storageProxies) == 0 { - logging.Warnf("Destination storages weren't found for token %s", token) + logging.Warnf("Destination storages weren't found for file [%s] and token [%s]", filePath, tokenId) continue } diff --git a/logging/writer.go b/logging/writer.go index 11da92968..c64aa659a 100644 --- a/logging/writer.go +++ b/logging/writer.go @@ -2,6 +2,7 @@ package logging import ( "fmt" + "github.com/google/martian/log" "gopkg.in/natefinch/lumberjack.v2" "io" "os" @@ -11,6 +12,10 @@ import ( const logFileMaxSizeMB = 100 +type WriterProxy struct { + lWriter *lumberjack.Logger +} + //Create stdout or file or mock writers func NewWriter(config Config) (io.WriteCloser, error) { if err := config.Validate(); err != nil { @@ -41,9 +46,23 @@ func newRollingWriter(config Config) io.WriteCloser { go func() { for { <-ticker.C - lWriter.Rotate() + if err := lWriter.Rotate(); err != nil { + log.Errorf("Error rotating log file: %v", err) + } } }() - return lWriter + return &WriterProxy{lWriter: lWriter} +} + +func (wp *WriterProxy) Write(p []byte) (int, error) { + return wp.lWriter.Write(p) +} + +func (wp *WriterProxy) Close() error { + if err := wp.lWriter.Rotate(); err != nil { + log.Errorf("Error rotating log file: %v", err) + } + + return wp.lWriter.Close() } diff --git a/main.go b/main.go index 322c0a7d4..3d03e890a 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" "github.com/ksensehq/eventnative/appconfig" "github.com/ksensehq/eventnative/appstatus" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/handlers" "github.com/ksensehq/eventnative/logfiles" @@ -31,6 +32,8 @@ const ( uploaderFileMask = "-event-*-20*.log" uploaderBatchSize = 50 uploaderLoadEveryS = 60 + + 
destinationsKey = "destinations" ) var ( @@ -85,6 +88,7 @@ func main() { signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP) go func() { <-c + logging.Info("* Service is shutting down.. *") telemetry.ServerStop() appstatus.Instance.Idle = true cancel() @@ -93,7 +97,8 @@ func main() { os.Exit(0) }() - destinationsViper := viper.Sub("destinations") + destinationsViper := viper.Sub(destinationsKey) + destinationsSource := viper.GetString(destinationsKey) //override with config from os env jsonConfig := viper.GetString("destinations_json") @@ -103,14 +108,15 @@ func main() { if err := envJsonViper.ReadConfig(bytes.NewBufferString(jsonConfig)); err != nil { logging.Error("Error reading/parsing json config from DESTINATIONS_JSON", err) } else { - destinationsViper = envJsonViper.Sub("destinations") + destinationsViper = envJsonViper.Sub(destinationsKey) + destinationsSource = envJsonViper.GetString(destinationsKey) } } - syncServiceType := viper.GetString("synchronization_service.type") - syncServiceEndpoint := viper.GetString("synchronization_service.endpoint") - connectionTimeoutSeconds := viper.GetUint("synchronization_service.connection_timeout_seconds") - monitorKeeper, err := storages.NewMonitorKeeper(syncServiceType, syncServiceEndpoint, connectionTimeoutSeconds) + monitorKeeper, err := storages.NewMonitorKeeper( + viper.GetString("synchronization_service.type"), + viper.GetString("synchronization_service.endpoint"), + viper.GetUint("synchronization_service.connection_timeout_seconds")) if err != nil { logging.Fatal("Failed to initiate monitor keeper ", err) } @@ -119,12 +125,11 @@ func main() { logEventPath := viper.GetString("log.path") //Create event destinations: - //per token - //storage - //consumer - batchStoragesByToken, streamingConsumersByToken := storages.Create(ctx, destinationsViper, logEventPath, monitorKeeper) - - destinationsService := events.NewDestinationService(streamingConsumersByToken, batchStoragesByToken) + destinationsService, err := destinations.NewService(ctx, destinationsViper, destinationsSource, logEventPath, monitorKeeper, storages.Create) + if err != nil { + logging.Fatal(err) + } + appconfig.Instance.ScheduleClosing(destinationsService) //Uploader must read event logger directory uploader, err := logfiles.NewUploader(logEventPath, appconfig.Instance.ServerName+uploaderFileMask, uploaderBatchSize, uploaderLoadEveryS, destinationsService) @@ -148,7 +153,7 @@ func main() { logging.Fatal(server.ListenAndServe()) } -func SetupRouter(destinations *events.DestinationService, adminToken string) *gin.Engine { +func SetupRouter(destinations *destinations.Service, adminToken string) *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.New() //gin.Default() @@ -167,14 +172,14 @@ func SetupRouter(destinations *events.DestinationService, adminToken string) *gi router.GET("/s/:filename", staticHandler.Handler) router.GET("/t/:filename", staticHandler.Handler) - c2sEventHandler := handlers.NewEventHandler(destinations, events.NewC2SPreprocessor()).Handler - s2sEventHandler := handlers.NewEventHandler(destinations, events.NewS2SPreprocessor()).Handler + jsEventHandler := handlers.NewEventHandler(destinations, events.NewJsPreprocessor()).Handler + apiEventHandler := handlers.NewEventHandler(destinations, events.NewApiPreprocessor()).Handler adminTokenMiddleware := middleware.AdminToken{Token: adminToken} apiV1 := router.Group("/api/v1") { - apiV1.POST("/event", middleware.TokenAuth(c2sEventHandler, 
appconfig.Instance.AuthorizationService.GetC2SOrigins, "")) - apiV1.POST("/s2s/event", middleware.TokenAuth(s2sEventHandler, appconfig.Instance.AuthorizationService.GetS2SOrigins, "The token isn't a server token. Please use s2s integration token\n")) + apiV1.POST("/event", middleware.TokenAuth(jsEventHandler, appconfig.Instance.AuthorizationService.GetClientOrigins, "")) + apiV1.POST("/s2s/event", middleware.TokenAuth(apiEventHandler, appconfig.Instance.AuthorizationService.GetServerOrigins, "The token isn't a server token. Please use s2s integration token\n")) apiV1.POST("/test_connection", adminTokenMiddleware.AdminAuth(handlers.NewConnectionTestHandler().Handler, "Admin token does not match")) } diff --git a/main_test.go b/main_test.go index 84fdaef42..05244d402 100644 --- a/main_test.go +++ b/main_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "github.com/ksensehq/eventnative/destinations" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/middleware" @@ -23,8 +24,7 @@ import ( func SetTestDefaultParams() { viper.Set("log.path", "") - viper.Set("server.auth", []string{"c2stoken"}) - viper.Set("server.s2s_auth", `[{"token":"s2stoken", "origins":["whiteorigin*"]}]`) + viper.Set("server.auth", `{"tokens":[{"id":"id1","client_secret":"c2stoken","server_secret":"s2stoken","origins":["whiteorigin*"]}]}`) } func TestApiEvent(t *testing.T) { @@ -52,7 +52,7 @@ func TestApiEvent(t *testing.T) { "Unauthorized s2s endpoint", "/api/v1/s2s/event?token=wrongtoken", "", - "test_data/s2s_event_input.json", + "test_data/api_event_input.json", "", http.StatusUnauthorized, "The token isn't a server token. Please use s2s integration token\n", @@ -70,7 +70,7 @@ func TestApiEvent(t *testing.T) { "Unauthorized s2s wrong origin", "/api/v1/s2s/event?token=s2stoken", "http://ksense.com", - "test_data/s2s_event_input.json", + "test_data/api_event_input.json", "", http.StatusUnauthorized, "", @@ -79,7 +79,7 @@ func TestApiEvent(t *testing.T) { { "C2S Api event consuming test", "/api/v1/event?token=c2stoken", - "", + "https://whiteorigin.com/", "test_data/event_input.json", "test_data/fact_output.json", http.StatusOK, @@ -89,8 +89,8 @@ func TestApiEvent(t *testing.T) { "S2S Api event consuming test", "/api/v1/s2s/event?token=s2stoken", "https://whiteorigin.com/", - "test_data/s2s_event_input.json", - "test_data/s2s_fact_output.json", + "test_data/api_event_input.json", + "test_data/api_fact_output.json", http.StatusOK, "", }, @@ -104,10 +104,10 @@ func TestApiEvent(t *testing.T) { defer appconfig.Instance.Close() inmemWriter := logging.InitInMemoryWriter() - router := SetupRouter(events.NewDestinationService(map[string][]events.Consumer{ - "c2stoken": {events.NewAsyncLogger(inmemWriter, false)}, - "s2stoken": {events.NewAsyncLogger(inmemWriter, false)}, - }, map[string][]events.StorageProxy{}), "") + router := SetupRouter(destinations.NewTestService( + map[string]map[string]events.Consumer{ + "id1": {"id1": events.NewAsyncLogger(inmemWriter, false)}, + }, map[string]map[string]events.StorageProxy{}), "") freezeTime := time.Date(2020, 06, 16, 23, 0, 0, 0, time.UTC) patch := monkey.Patch(time.Now, func() time.Time { return freezeTime }) diff --git a/middleware/token_auth.go b/middleware/token_auth.go index 46451c290..5ea98e7a4 100644 --- a/middleware/token_auth.go +++ b/middleware/token_auth.go @@ -13,7 +13,7 @@ const TokenName = "token" //TokenAuth check that provided token: //1. is valid -//2. exists in specific (c2s or s2s) config +//2. 
exists in specific (js or api) token config //3. origins equal func TokenAuth(main gin.HandlerFunc, isAllowedOriginsFunc func(string) ([]string, bool), errMsg string) gin.HandlerFunc { return func(c *gin.Context) { diff --git a/resources/watcher.go b/resources/watcher.go new file mode 100644 index 000000000..192f2beda --- /dev/null +++ b/resources/watcher.go @@ -0,0 +1,68 @@ +package resources + +import ( + "crypto/md5" + "fmt" + "github.com/ksensehq/eventnative/appstatus" + "github.com/ksensehq/eventnative/logging" + "time" +) + +type Watcher struct { + name string + hash string + source string + reloadEvery time.Duration + + loadFunc func(string) ([]byte, error) + consumer func([]byte) +} + +//First load source then run goroutine to reload source every 'reloadEvery' duration +//On every load check if content was changed => run consumer otherwise do nothing +func Watch(name, source string, loadFunc func(string) ([]byte, error), consumer func([]byte), reloadEvery time.Duration) { + w := &Watcher{ + name: name, + hash: "", + source: source, + loadFunc: loadFunc, + consumer: consumer, + reloadEvery: reloadEvery, + } + logging.Infof("Resource [%s] will be loaded every %d seconds", name, int(reloadEvery.Seconds())) + w.watch() +} + +func (w *Watcher) watch() { + w.download() + go func() { + for { + if appstatus.Instance.Idle { + break + } + + time.Sleep(w.reloadEvery) + + w.download() + } + }() +} + +func (w *Watcher) download() { + payload, err := w.loadFunc(w.source) + if err != nil { + logging.Errorf("Error reloading resource [%s]: %v", w.name, err) + return + } + + newHash := GetHash(payload) + if w.hash != newHash { + w.consumer(payload) + logging.Infof("New resource [%s] has been loaded", w.name) + w.hash = newHash + } +} + +func GetHash(payload []byte) string { + return fmt.Sprintf("%x", md5.Sum(payload)) +} diff --git a/storages/bigquery.go b/storages/bigquery.go index 12cdebd6b..a2c3889b8 100644 --- a/storages/bigquery.go +++ b/storages/bigquery.go @@ -14,8 +14,6 @@ import ( "time" ) -const bqStorageType = "BigQuery" - //Store files to google BigQuery in two modes: //batch: via google cloud storage in batch mode (1 file = 1 transaction) //stream: via events queue in stream mode (1 object = 1 transaction) @@ -25,6 +23,7 @@ type BigQuery struct { bqAdapter *adapters.BigQuery tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -54,7 +53,7 @@ func NewBigQuery(ctx context.Context, name string, eventQueue *events.Persistent return nil, err } - tableHelper := NewTableHelper(bigQueryAdapter, monitorKeeper, bqStorageType) + tableHelper := NewTableHelper(bigQueryAdapter, monitorKeeper, BigQueryType) bq := &BigQuery{ name: name, @@ -65,7 +64,8 @@ func NewBigQuery(ctx context.Context, name string, eventQueue *events.Persistent breakOnError: breakOnError, } if streamMode { - newStreamingWorker(eventQueue, processor, bq).start() + bq.streamingWorker = newStreamingWorker(eventQueue, processor, bq) + bq.streamingWorker.start() } else { bq.startBatch() } @@ -164,7 +164,7 @@ func (bq *BigQuery) Name() string { } func (bq *BigQuery) Type() string { - return bqStorageType + return BigQueryType } func (bq *BigQuery) Close() (multiErr error) { @@ -175,5 +175,9 @@ func (bq *BigQuery) Close() (multiErr error) { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing BigQuery client: %v", bq.Name(), err)) } + if bq.streamingWorker != nil { + bq.streamingWorker.Close() + } + return } diff --git a/storages/clickhouse.go 
b/storages/clickhouse.go index c4e2316f6..11a47ea60 100644 --- a/storages/clickhouse.go +++ b/storages/clickhouse.go @@ -11,8 +11,6 @@ import ( "math/rand" ) -const clickHouseStorageType = "ClickHouse" - //Store files to ClickHouse in two modes: //batch: (1 file = 1 transaction) //stream: (1 object = 1 transaction) @@ -21,6 +19,7 @@ type ClickHouse struct { adapters []*adapters.ClickHouse tableHelpers []*TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -52,7 +51,7 @@ func NewClickHouse(ctx context.Context, name string, eventQueue *events.Persiste } chAdapters = append(chAdapters, adapter) - tableHelpers = append(tableHelpers, NewTableHelper(adapter, monitorKeeper, clickHouseStorageType)) + tableHelpers = append(tableHelpers, NewTableHelper(adapter, monitorKeeper, ClickHouseType)) } ch := &ClickHouse{ @@ -74,7 +73,8 @@ func NewClickHouse(ctx context.Context, name string, eventQueue *events.Persiste } if streamMode { - newStreamingWorker(eventQueue, processor, ch).start() + ch.streamingWorker = newStreamingWorker(eventQueue, processor, ch) + ch.streamingWorker.start() } return ch, nil @@ -85,7 +85,7 @@ func (ch *ClickHouse) Name() string { } func (ch *ClickHouse) Type() string { - return clickHouseStorageType + return ClickHouseType } //Insert fact in ClickHouse @@ -154,6 +154,10 @@ func (ch *ClickHouse) Close() (multiErr error) { } } + if ch.streamingWorker != nil { + ch.streamingWorker.Close() + } + return multiErr } diff --git a/storages/factory.go b/storages/factory.go index afbf3ebe5..c5d1be27d 100644 --- a/storages/factory.go +++ b/storages/factory.go @@ -9,35 +9,34 @@ import ( "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/schema" - "github.com/spf13/viper" ) const ( defaultTableName = "events" - batchMode = "batch" - streamMode = "stream" + BatchMode = "batch" + StreamMode = "stream" ) var unknownDestination = errors.New("Unknown destination type") type DestinationConfig struct { - OnlyTokens []string `mapstructure:"only_tokens"` - Type string `mapstructure:"type"` - Mode string `mapstructure:"mode"` - DataLayout *DataLayout `mapstructure:"data_layout"` - BreakOnError bool `mapstructure:"break_on_error"` - - DataSource *adapters.DataSourceConfig `mapstructure:"datasource"` - S3 *adapters.S3Config `mapstructure:"s3"` - Google *adapters.GoogleConfig `mapstructure:"google"` - ClickHouse *adapters.ClickHouseConfig `mapstructure:"clickhouse"` - Snowflake *adapters.SnowflakeConfig `mapstructure:"snowflake"` + OnlyTokens []string `mapstructure:"only_tokens" json:"only_tokens,omitempty"` + Type string `mapstructure:"type" json:"type,omitempty"` + Mode string `mapstructure:"mode" json:"mode,omitempty"` + DataLayout *DataLayout `mapstructure:"data_layout" json:"data_layout,omitempty"` + BreakOnError bool `mapstructure:"break_on_error" json:"break_on_error,omitempty"` + + DataSource *adapters.DataSourceConfig `mapstructure:"datasource" json:"datasource,omitempty"` + S3 *adapters.S3Config `mapstructure:"s3" json:"s3,omitempty"` + Google *adapters.GoogleConfig `mapstructure:"google" json:"google,omitempty"` + ClickHouse *adapters.ClickHouseConfig `mapstructure:"clickhouse" json:"clickhouse,omitempty"` + Snowflake *adapters.SnowflakeConfig `mapstructure:"snowflake" json:"snowflake,omitempty"` } type DataLayout struct { - Mapping []string `mapstructure:"mapping"` - TableNameTemplate string `mapstructure:"table_name_template"` + Mapping []string `mapstructure:"mapping" 
json:"mapping,omitempty"` + TableNameTemplate string `mapstructure:"table_name_template" json:"table_name_template,omitempty"` } type Config struct { @@ -50,151 +49,92 @@ type Config struct { eventQueue *events.PersistentQueue } -//Create event storage proxies and event consumers (loggers or event-queues) from incoming config +//Create event storage proxy and event consumer (logger or event-queue) //Enrich incoming configs with default values if needed -func Create(ctx context.Context, destinations *viper.Viper, logEventPath string, monitorKeeper MonitorKeeper) (map[string][]events.StorageProxy, map[string][]events.Consumer) { - storageProxies := map[string][]events.StorageProxy{} - consumers := map[string][]events.Consumer{} - loggers := map[string]events.Consumer{} - - if destinations == nil { - return storageProxies, consumers +func Create(ctx context.Context, name, logEventPath string, destination DestinationConfig, monitorKeeper MonitorKeeper) (events.StorageProxy, *events.PersistentQueue, error) { + if destination.Type == "" { + destination.Type = name } - - dc := map[string]DestinationConfig{} - if err := destinations.Unmarshal(&dc); err != nil { - logging.Error("Error initializing destinations: wrong config format: each destination must contains one key and config as a value e.g. destinations:\n custom_name:\n type: redshift ...", err) - return storageProxies, consumers + if destination.Mode == "" { + destination.Mode = BatchMode } - for name, d := range dc { - //common case - destination := d - if destination.Type == "" { - destination.Type = name - } - if destination.Mode == "" { - destination.Mode = batchMode - } + var mapping []string + var tableName string + if destination.DataLayout != nil { + mapping = destination.DataLayout.Mapping - var mapping []string - var tableName string - if destination.DataLayout != nil { - mapping = destination.DataLayout.Mapping - - if destination.DataLayout.TableNameTemplate != "" { - tableName = destination.DataLayout.TableNameTemplate - } - } - - logging.Infof("[%s] Initializing destination of type: %s in mode: %s", name, destination.Type, destination.Mode) - - if tableName == "" { - tableName = defaultTableName - logging.Infof("[%s] uses default table name: %s", name, tableName) + if destination.DataLayout.TableNameTemplate != "" { + tableName = destination.DataLayout.TableNameTemplate } + } - if len(mapping) == 0 { - logging.Warnf("[%s] doesn't have mapping rules", name) - } else { - logging.Infof("[%s] Configured field mapping rules:", name) - for _, m := range mapping { - logging.Infof("[%s] %s", name, m) - } - } + logging.Infof("[%s] Initializing destination of type: %s in mode: %s", name, destination.Type, destination.Mode) - if destination.Mode != batchMode && destination.Mode != streamMode { - logError(name, destination.Type, fmt.Errorf("Unknown destination mode: %s. 
Available mode: [%s, %s]", destination.Mode, batchMode, streamMode)) - continue - } + if tableName == "" { + tableName = defaultTableName + logging.Infof("[%s] uses default table name: %s", name, tableName) + } - processor, err := schema.NewProcessor(tableName, mapping) - if err != nil { - logError(name, destination.Type, err) - continue + if len(mapping) == 0 { + logging.Warnf("[%s] doesn't have mapping rules", name) + } else { + logging.Infof("[%s] Configured field mapping rules:", name) + for _, m := range mapping { + logging.Infof("[%s] %s", name, m) } + } - var eventQueue *events.PersistentQueue - if destination.Mode == streamMode { - queueName := fmt.Sprintf("%s-%s", appconfig.Instance.ServerName, name) - eventQueue, err = events.NewPersistentQueue(queueName, logEventPath) - if err != nil { - logError(name, destination.Type, err) - continue - } - - appconfig.Instance.ScheduleClosing(eventQueue) - } + if destination.Mode != BatchMode && destination.Mode != StreamMode { + return nil, nil, fmt.Errorf("Unknown destination mode: %s. Available mode: [%s, %s]", destination.Mode, BatchMode, StreamMode) + } - storageConfig := &Config{ - ctx: ctx, - name: name, - destination: &destination, - processor: processor, - streamMode: destination.Mode == streamMode, - monitorKeeper: monitorKeeper, - eventQueue: eventQueue, - } + processor, err := schema.NewProcessor(tableName, mapping) + if err != nil { + return nil, nil, err + } - var storageProxy events.StorageProxy - switch destination.Type { - case "redshift": - storageProxy = newProxy(createRedshift, storageConfig) - case "bigquery": - storageProxy = newProxy(createBigQuery, storageConfig) - case "postgres": - storageProxy = newProxy(createPostgres, storageConfig) - case "clickhouse": - storageProxy = newProxy(createClickHouse, storageConfig) - case "s3": - storageProxy = newProxy(createS3, storageConfig) - case "snowflake": - storageProxy = newProxy(createSnowflake, storageConfig) - default: - logError(name, destination.Type, unknownDestination) - continue + var eventQueue *events.PersistentQueue + if destination.Mode == StreamMode { + queueName := fmt.Sprintf("%s-%s", appconfig.Instance.ServerName, name) + eventQueue, err = events.NewPersistentQueue(queueName, logEventPath) + if err != nil { + return nil, nil, err } + } - appconfig.Instance.ScheduleClosing(storageProxy) - - tokens := destination.OnlyTokens - if len(tokens) == 0 { - logging.Warnf("[%s] only_tokens wasn't provided. 
All tokens will be stored.", name) - for token := range appconfig.Instance.AuthorizationService.GetAllTokens() { - tokens = append(tokens, token) - } - } + storageConfig := &Config{ + ctx: ctx, + name: name, + destination: &destination, + processor: processor, + streamMode: destination.Mode == StreamMode, + monitorKeeper: monitorKeeper, + eventQueue: eventQueue, + } - //append: - //storage per token - //consumer(event queue or logger) per token - for _, token := range tokens { - if storageConfig.streamMode { - consumers[token] = append(consumers[token], eventQueue) - } else { - logger, ok := loggers[token] - if !ok { - eventLogWriter, err := logging.NewWriter(logging.Config{ - LoggerName: "event-" + token, - ServerName: appconfig.Instance.ServerName, - FileDir: logEventPath, - RotationMin: viper.GetInt64("log.rotation_min")}) - if err != nil { - appconfig.Instance.Close() - logging.Fatal(err) - } - logger = events.NewAsyncLogger(eventLogWriter, viper.GetBool("log.show_in_server")) - loggers[token] = logger - appconfig.Instance.ScheduleClosing(logger) - } - consumers[token] = append(consumers[token], logger) - } - - storageProxies[token] = append(storageProxies[token], storageProxy) + var storageProxy events.StorageProxy + switch destination.Type { + case RedshiftType: + storageProxy = newProxy(createRedshift, storageConfig) + case BigQueryType: + storageProxy = newProxy(createBigQuery, storageConfig) + case PostgresType: + storageProxy = newProxy(createPostgres, storageConfig) + case ClickHouseType: + storageProxy = newProxy(createClickHouse, storageConfig) + case S3Type: + storageProxy = newProxy(createS3, storageConfig) + case SnowflakeType: + storageProxy = newProxy(createSnowflake, storageConfig) + default: + if eventQueue != nil { + eventQueue.Close() } - + return nil, nil, unknownDestination } - return storageProxies, consumers + + return storageProxy, eventQueue, nil } func logError(destinationName, destinationType string, err error) { @@ -283,7 +223,7 @@ func createS3(config *Config) (events.Storage, error) { if config.eventQueue != nil { config.eventQueue.Close() } - return nil, fmt.Errorf("S3 destination doesn't support %s mode", streamMode) + return nil, fmt.Errorf("S3 destination doesn't support %s mode", StreamMode) } s3Config := config.destination.S3 if err := s3Config.Validate(); err != nil { diff --git a/storages/postgres.go b/storages/postgres.go index fd5340047..4ef657f55 100644 --- a/storages/postgres.go +++ b/storages/postgres.go @@ -9,8 +9,6 @@ import ( "github.com/ksensehq/eventnative/schema" ) -const postgresStorageType = "Postgres" - //Store files to Postgres in two modes: //batch: (1 file = 1 transaction) //stream: (1 object = 1 transaction) @@ -19,6 +17,7 @@ type Postgres struct { adapter *adapters.Postgres tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -37,7 +36,7 @@ func NewPostgres(ctx context.Context, config *adapters.DataSourceConfig, process return nil, err } - tableHelper := NewTableHelper(adapter, monitorKeeper, postgresStorageType) + tableHelper := NewTableHelper(adapter, monitorKeeper, PostgresType) p := &Postgres{ name: storageName, @@ -48,7 +47,8 @@ func NewPostgres(ctx context.Context, config *adapters.DataSourceConfig, process } if streamMode { - newStreamingWorker(eventQueue, processor, p).start() + p.streamingWorker = newStreamingWorker(eventQueue, processor, p) + p.streamingWorker.start() } return p, nil @@ -115,6 +115,9 @@ func (p *Postgres) Close() error { return 
fmt.Errorf("[%s] Error closing postgres datasource: %v", p.Name(), err) } + if p.streamingWorker != nil { + p.streamingWorker.Close() + } return nil } @@ -123,5 +126,5 @@ func (p *Postgres) Name() string { } func (p *Postgres) Type() string { - return postgresStorageType + return PostgresType } diff --git a/storages/redshift.go b/storages/redshift.go index 570a5b014..55c8a4cf0 100644 --- a/storages/redshift.go +++ b/storages/redshift.go @@ -14,7 +14,6 @@ import ( ) const tableFileKeyDelimiter = "-table-" -const redshiftStorageType = "Redshift" //Store files to aws RedShift in two modes: //batch: via aws s3 in batch mode (1 file = 1 transaction) @@ -25,6 +24,7 @@ type AwsRedshift struct { redshiftAdapter *adapters.AwsRedshift tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -52,7 +52,7 @@ func NewAwsRedshift(ctx context.Context, name string, eventQueue *events.Persist return nil, err } - tableHelper := NewTableHelper(redshiftAdapter, monitorKeeper, redshiftStorageType) + tableHelper := NewTableHelper(redshiftAdapter, monitorKeeper, RedshiftType) ar := &AwsRedshift{ name: name, @@ -64,7 +64,8 @@ func NewAwsRedshift(ctx context.Context, name string, eventQueue *events.Persist } if streamMode { - newStreamingWorker(eventQueue, processor, ar).start() + ar.streamingWorker = newStreamingWorker(eventQueue, processor, ar) + ar.streamingWorker.start() } else { ar.startBatch() } @@ -174,7 +175,7 @@ func (ar *AwsRedshift) Name() string { } func (ar *AwsRedshift) Type() string { - return redshiftStorageType + return RedshiftType } func (ar *AwsRedshift) Close() error { @@ -182,5 +183,9 @@ func (ar *AwsRedshift) Close() error { return fmt.Errorf("[%s] Error closing redshift datasource: %v", ar.Name(), err) } + if ar.streamingWorker != nil { + ar.streamingWorker.Close() + } + return nil } diff --git a/storages/s3.go b/storages/s3.go index 727b99c2c..dee30dc92 100644 --- a/storages/s3.go +++ b/storages/s3.go @@ -57,7 +57,7 @@ func (s3 *S3) Name() string { } func (s3 *S3) Type() string { - return "S3" + return S3Type } func (s3 *S3) Close() error { diff --git a/storages/snowflake.go b/storages/snowflake.go index d1a89a5f5..be0c11757 100644 --- a/storages/snowflake.go +++ b/storages/snowflake.go @@ -15,8 +15,6 @@ import ( "time" ) -const snowflakeStorageType = "Snowflake" - //Store files to Snowflake in two modes: //batch: via aws s3 (or gcp) in batch mode (1 file = 1 transaction) //stream: via events queue in stream mode (1 object = 1 transaction) @@ -26,6 +24,7 @@ type Snowflake struct { snowflakeAdapter *adapters.Snowflake tableHelper *TableHelper schemaProcessor *schema.Processor + streamingWorker *StreamingWorker breakOnError bool } @@ -56,7 +55,7 @@ func NewSnowflake(ctx context.Context, name string, eventQueue *events.Persisten return nil, err } - tableHelper := NewTableHelper(snowflakeAdapter, monitorKeeper, snowflakeStorageType) + tableHelper := NewTableHelper(snowflakeAdapter, monitorKeeper, SnowflakeType) snowflake := &Snowflake{ name: name, @@ -68,7 +67,8 @@ func NewSnowflake(ctx context.Context, name string, eventQueue *events.Persisten } if streamMode { - newStreamingWorker(eventQueue, processor, snowflake).start() + snowflake.streamingWorker = newStreamingWorker(eventQueue, processor, snowflake) + snowflake.streamingWorker.start() } else { snowflake.startBatch() } @@ -228,7 +228,7 @@ func (s *Snowflake) Name() string { } func (s *Snowflake) Type() string { - return snowflakeStorageType + return SnowflakeType } func (s 
*Snowflake) Close() (multiErr error) { @@ -240,5 +240,9 @@ func (s *Snowflake) Close() (multiErr error) { multiErr = multierror.Append(multiErr, fmt.Errorf("[%s] Error closing snowflake stage: %v", s.Name(), err)) } + if s.streamingWorker != nil { + s.streamingWorker.Close() + } + return } diff --git a/storages/streaming.go b/storages/streaming.go index 76f3ab40a..30cfd24c2 100644 --- a/storages/streaming.go +++ b/storages/streaming.go @@ -1,7 +1,6 @@ package storages import ( - "github.com/ksensehq/eventnative/appstatus" "github.com/ksensehq/eventnative/events" "github.com/ksensehq/eventnative/logging" "github.com/ksensehq/eventnative/schema" @@ -19,6 +18,8 @@ type StreamingWorker struct { eventQueue *events.PersistentQueue schemaProcessor *schema.Processor streamingStorage StreamingStorage + + closed bool } func newStreamingWorker(eventQueue *events.PersistentQueue, schemaProcessor *schema.Processor, streamingStorage StreamingStorage) *StreamingWorker { @@ -35,11 +36,15 @@ func newStreamingWorker(eventQueue *events.PersistentQueue, schemaProcessor *sch func (sw *StreamingWorker) start() { go func() { for { - if appstatus.Instance.Idle { + if sw.closed { break } + fact, dequeuedTime, err := sw.eventQueue.DequeueBlock() if err != nil { + if err == events.ErrQueueClosed && sw.closed { + continue + } logging.Errorf("[%s] Error reading event fact from queue: %v", sw.streamingStorage.Name(), err) continue } @@ -73,3 +78,8 @@ func (sw *StreamingWorker) start() { } }() } + +func (sw *StreamingWorker) Close() error { + sw.closed = true + return nil +} diff --git a/storages/types.go b/storages/types.go new file mode 100644 index 000000000..690354136 --- /dev/null +++ b/storages/types.go @@ -0,0 +1,10 @@ +package storages + +const ( + RedshiftType = "redshift" + BigQueryType = "bigquery" + PostgresType = "postgres" + ClickHouseType = "clickhouse" + S3Type = "s3" + SnowflakeType = "snowflake" +) diff --git a/test_data/s2s_event_input.json b/test_data/api_event_input.json similarity index 100% rename from test_data/s2s_event_input.json rename to test_data/api_event_input.json diff --git a/test_data/s2s_fact_output.json b/test_data/api_fact_output.json similarity index 96% rename from test_data/s2s_fact_output.json rename to test_data/api_fact_output.json index de4632322..afe0bc8f1 100644 --- a/test_data/s2s_fact_output.json +++ b/test_data/api_fact_output.json @@ -19,6 +19,6 @@ "referer": "", "url": "http://track-demo.ksense" }, - "src": "s2s", + "src": "api", "source_ip":"95.82.232.185" } \ No newline at end of file
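
For reference, a minimal usage sketch of the new resources.Watch helper introduced in this change. The HTTP loader, source URL, and 30-second interval below are illustrative assumptions, not part of the diff, and the sketch assumes the usual appconfig/appstatus initialization has already run.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/ksensehq/eventnative/logging"
	"github.com/ksensehq/eventnative/resources"
)

// loadFromHTTP is a hypothetical loader: it fetches the resource body from an HTTP(S) source.
func loadFromHTTP(source string) ([]byte, error) {
	resp, err := http.Get(source)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	return ioutil.ReadAll(resp.Body)
}

func main() {
	// Load the payload once, then reload it every 30 seconds in a background goroutine.
	// The consumer is invoked only when the MD5 hash of the payload changes.
	resources.Watch("destinations", "https://example.com/destinations.json",
		loadFromHTTP,
		func(payload []byte) {
			logging.Infof("destinations config changed: %d bytes", len(payload))
		},
		30*time.Second)

	// Watch returns after the first load; keep the process alive so the reload loop keeps running.
	select {}
}

Because Watch compares payload hashes before calling the consumer, short reload intervals stay cheap: unchanged content is fetched but never re-applied, which is what lets the destinations service reload its configuration frequently without rebuilding storages on every tick.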