Skip to content

Commit

Permalink
Merge branch 'master' into PMF-22-destination-reloading
Browse files Browse the repository at this point in the history
# Conflicts:
#	adapters/google_cloud_storage.go
#	adapters/s3.go
#	main.go
#	main_test.go
  • Loading branch information
xtreding committed Oct 8, 2020
2 parents 511d5fd + acc1471 commit ef7bb9c
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Please see our extensive documentation [here](https://eventnative-docs.ksense.io
## Community
We are made for developers, by developers and would love to have you join our community.
* [Wiki](https://github.com/ksensehq/eventnative/wiki) - Check out our development wiki.
* [Slack](https://join.slack.com/t/eventnative/shared_invite/zt-gincgy2s-ZYwXXBjw_GIN1PhVzgaUNA) - Join our slack.
* [Slack](https://join.slack.com/t/eventnative/shared_invite/zt-ick4jl3k-Hj1SYVHCyJrZmyJAzaO~Uw) - Join our slack.
* [Email](mailto:[email protected]) - Send us an email.
* Submit a pull request!

Expand Down
9 changes: 9 additions & 0 deletions adapters/aws_redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type AwsRedshift struct {
s3Config *S3Config
}

type RedshiftConfig struct {
DbConfig *DataSourceConfig `json:"database"`
S3Config *S3Config `json:"s3"`
}

//NewAwsRedshift return configured AwsRedshift adapter instance
func NewAwsRedshift(ctx context.Context, dsConfig *DataSourceConfig, s3Config *S3Config) (*AwsRedshift, error) {
postgres, err := NewPostgres(ctx, dsConfig)
Expand Down Expand Up @@ -67,6 +72,10 @@ func (ar *AwsRedshift) CreateDbSchema(dbSchemaName string) error {
return createDbSchemaInTransaction(ar.dataSourceProxy.ctx, wrappedTx, dbSchemaName)
}

func (ar *AwsRedshift) Test() error {
return ar.dataSourceProxy.dataSource.Ping()
}

//Insert provided object in AwsRedshift in stream mode
func (ar *AwsRedshift) Insert(schema *schema.Table, valuesMap map[string]interface{}) error {
wrappedTx, err := ar.OpenTx()
Expand Down
5 changes: 5 additions & 0 deletions adapters/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (bq *BigQuery) Copy(fileKey, tableName string) error {
return nil
}

func (bq *BigQuery) Test() error {
_, err := bq.client.Query("SELECT 1;").Read(context.Background())
return err
}

//Insert provided object in BigQuery in stream mode
func (bq *BigQuery) Insert(schema *schema.Table, valuesMap map[string]interface{}) error {
inserter := bq.client.Dataset(bq.config.Dataset).Table(schema.Name).Inserter()
Expand Down
4 changes: 4 additions & 0 deletions adapters/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ func (ch *ClickHouse) PatchTableSchema(patchSchema *schema.Table) error {
return wrappedTx.tx.Commit()
}

func (ch *ClickHouse) Test() error {
return ch.dataSource.Ping()
}

//Insert provided object in ClickHouse in stream mode
func (ch *ClickHouse) Insert(schema *schema.Table, valuesMap map[string]interface{}) error {
wrappedTx, err := ch.OpenTx()
Expand Down
4 changes: 4 additions & 0 deletions adapters/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (p *Postgres) patchTableSchemaInTransaction(wrappedTx *Transaction, patchSc
return wrappedTx.tx.Commit()
}

func (p *Postgres) Test() error {
return p.dataSource.Ping()
}

//Insert provided object in postgres
func (p *Postgres) Insert(schema *schema.Table, valuesMap map[string]interface{}) error {
wrappedTx, err := p.OpenTx()
Expand Down
4 changes: 4 additions & 0 deletions adapters/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (s *Snowflake) Copy(wrappedTx *Transaction, fileKey, header, tableName stri
return err
}

func (s *Snowflake) Test() error {
return s.dataSource.Ping()
}

//Insert provided object in snowflake
func (s *Snowflake) Insert(schema *schema.Table, valuesMap map[string]interface{}) error {
wrappedTx, err := s.OpenTx()
Expand Down
133 changes: 133 additions & 0 deletions handlers/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package handlers

import (
"context"
"encoding/json"
"errors"
"github.com/gin-gonic/gin"
"github.com/ksensehq/eventnative/adapters"
"github.com/ksensehq/eventnative/logging"
"github.com/ksensehq/eventnative/middleware"
"net/http"
)

type ConnectionConfig struct {
DestinationType string `json:"type"`
ConnectionConfig interface{} `json:"config"`
}

type ConnectionTestHandler struct {
}

type RedshiftConfig struct {
DbConfig *adapters.DataSourceConfig `json:"database"`
S3Config *adapters.S3Config `json:"s3"`
}

type SnowflakeExternalConfig struct {
SnowflakeConfig adapters.SnowflakeConfig `json:"snowflake"`
S3Config adapters.S3Config `json:"s3"`
}

func testConnection(config ConnectionConfig) error {
switch config.DestinationType {
case "postgres":
var postgresConfig adapters.DataSourceConfig
body, err := json.Marshal(config.ConnectionConfig)
if err != nil {
return err
}
err = json.Unmarshal(body, &postgresConfig)
if err != nil {
return err
}
if err := postgresConfig.Validate(); err != nil {
return err
}
postgres, err := adapters.NewPostgres(context.Background(), &postgresConfig)
if err != nil {
return err
}
defer postgres.Close()
return postgres.Test()
case "clickhouse":
var chConfig adapters.ClickHouseConfig
body, err := json.Marshal(config.ConnectionConfig)
if err != nil {
return err
}
err = json.Unmarshal(body, &chConfig)
if err != nil {
return err
}
if err = chConfig.Validate(); err != nil {
return err
}
tableStatementFactory, err := adapters.NewTableStatementFactory(&chConfig)
if err != nil {
return err
}
nonNullFields := map[string]bool{"eventn_ctx_event_id": true, "_timestamp": true}
for i := range chConfig.Dsns {
var resultError error
resultError = nil
ch, err := adapters.NewClickHouse(context.Background(), chConfig.Dsns[i], chConfig.Database, chConfig.Cluster, chConfig.Tls, tableStatementFactory, nonNullFields)
if err != nil {
resultError = err
continue
}
resultError = ch.Test()
if err = ch.Close(); err != nil {
logging.Warn("Failed to close clickhouse datasource %s", err)
}
if resultError == nil {
return nil
}
}
return nil
case "redshift":
var rsConfig RedshiftConfig
body, err := json.Marshal(config.ConnectionConfig)
if err != nil {
return err
}
err = json.Unmarshal(body, &rsConfig)
if err != nil {
return err
}
if err = rsConfig.DbConfig.Validate(); err != nil {
return err
}
if rsConfig.S3Config != nil {
if err = rsConfig.S3Config.Validate(); err != nil {
return err
}
}
redshift, err := adapters.NewAwsRedshift(context.Background(), rsConfig.DbConfig, rsConfig.S3Config)
if err != nil {
return err
}
defer redshift.Close()
return redshift.Test()
default:
return errors.New("unsupported destination type " + config.DestinationType)
}
}

func NewConnectionTestHandler() *ConnectionTestHandler {
return &ConnectionTestHandler{}
}

func (h *ConnectionTestHandler) Handler(c *gin.Context) {
connectionConfig := ConnectionConfig{}
if err := c.BindJSON(&connectionConfig); err != nil {
c.JSON(http.StatusBadRequest, middleware.ErrorResponse{Message: "Failed to parse body", Error: err})
return
}
err := testConnection(connectionConfig)
if err != nil {
c.JSON(http.StatusBadRequest, middleware.ErrorResponse{Message: err.Error(), Error: err})
return
}
c.JSON(http.StatusOK, middleware.OkResponse{Status: "Connection established"})
}
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func main() {
}
uploader.Start()

router := SetupRouter(destinationsService)
adminToken := viper.GetString("server.admin_token")
router := SetupRouter(destinationsService, adminToken)

telemetry.ServerStart()
logging.Info("Started server: " + appconfig.Instance.Authority)
Expand All @@ -152,7 +153,7 @@ func main() {
logging.Fatal(server.ListenAndServe())
}

func SetupRouter(destinations *destinations.Service) *gin.Engine {
func SetupRouter(destinations *destinations.Service, adminToken string) *gin.Engine {
gin.SetMode(gin.ReleaseMode)

router := gin.New() //gin.Default()
Expand All @@ -173,10 +174,13 @@ func SetupRouter(destinations *destinations.Service) *gin.Engine {

jsEventHandler := handlers.NewEventHandler(destinations, events.NewJsPreprocessor()).Handler
apiEventHandler := handlers.NewEventHandler(destinations, events.NewApiPreprocessor()).Handler

adminTokenMiddleware := middleware.AdminToken{Token: adminToken}
apiV1 := router.Group("/api/v1")
{
apiV1.POST("/event", middleware.TokenAuth(jsEventHandler, appconfig.Instance.AuthorizationService.GetClientOrigins, ""))
apiV1.POST("/s2s/event", middleware.TokenAuth(apiEventHandler, appconfig.Instance.AuthorizationService.GetServerOrigins, "The token isn't a server token. Please use s2s integration token\n"))
apiV1.POST("/test_connection", adminTokenMiddleware.AdminAuth(handlers.NewConnectionTestHandler().Handler, "Admin token does not match"))
}

return router
Expand Down
2 changes: 1 addition & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestApiEvent(t *testing.T) {
router := SetupRouter(destinations.NewTestService(
map[string]map[string]events.Consumer{
"id1": {"id1": events.NewAsyncLogger(inmemWriter, false)},
}, map[string]map[string]events.StorageProxy{}))
}, map[string]map[string]events.StorageProxy{}), "")

freezeTime := time.Date(2020, 06, 16, 23, 0, 0, 0, time.UTC)
patch := monkey.Patch(time.Now, func() time.Time { return freezeTime })
Expand Down
25 changes: 25 additions & 0 deletions middleware/admin_auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package middleware

import (
"github.com/gin-gonic/gin"
"net/http"
)

type AdminToken struct {
Token string
}

func (a *AdminToken) AdminAuth(main gin.HandlerFunc, errMsg string) gin.HandlerFunc {
return func(c *gin.Context) {
if a.Token == "" {
c.JSON(http.StatusUnauthorized, ErrorResponse{Message: "admin_token must be configured"})
return
}
token := c.GetHeader("X-Admin-Token")
if token != a.Token {
c.JSON(http.StatusUnauthorized, ErrorResponse{Message: errMsg})
return
}
main(c)
}
}
10 changes: 10 additions & 0 deletions middleware/web_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package middleware

type OkResponse struct {
Status string `json:"status"`
}

type ErrorResponse struct {
Message string `json:"message"`
Error error `json:"error"`
}

0 comments on commit ef7bb9c

Please sign in to comment.