From 1c7f96ef675fcd7eea0cc31966ce7b2196bbbc36 Mon Sep 17 00:00:00 2001 From: Ishan Arya Date: Mon, 31 Oct 2022 09:57:09 +0530 Subject: [PATCH] feat: add shield sink (#408) * feat: add shield sink * fix: use correct user asset keys * test: add sink test * docs: add README * docs: fix white spaces * test: add success test * chore: replace tabs with spaces * refactor: code review changes * feat: pass context to http request * fix: http req & res close * chore: improve errors * fix: remove urlPathEscape for host * test: fix payload error messages * feat: add grpc client * test: tests to use grpc client * chore: fix linting issue * chore: change http to grpc in docs * docs: add shield sink * test: refactor test * test: remove retry error test * test: add retry error test * fix: code review changes * feat: handle err when non User struct is sent * fix: skip a bached record when build fails * fix: error handling * chore: shield proto variable name change * fix: shield sink test * chore: remove empty line * test: remove test * test: add test --- docs/docs/reference/sinks.md | 16 +++ plugins/sinks/populate.go | 1 + plugins/sinks/shield/README.md | 19 ++++ plugins/sinks/shield/client.go | 81 +++++++++++++++ plugins/sinks/shield/shield.go | 7 ++ plugins/sinks/shield/sink.go | 167 ++++++++++++++++++++++++++++++ plugins/sinks/shield/sink_test.go | 164 +++++++++++++++++++++++++++++ 7 files changed, 455 insertions(+) create mode 100644 plugins/sinks/shield/README.md create mode 100644 plugins/sinks/shield/client.go create mode 100644 plugins/sinks/shield/shield.go create mode 100644 plugins/sinks/shield/sink.go create mode 100644 plugins/sinks/shield/sink_test.go diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index e1206d48f..827bd9da9 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -81,6 +81,22 @@ sinks: send_format_header: false ``` +## Shield + +`shield` + +Upsert users to shield service running at a given 'host'. Request will be sent via GRPC. + +```yaml +sinks: + name: shield + config: + host: shield.com + headers: + X-Shield-Email: meteor@odpf.io + X-Other-Header: value1, value2 +``` + _**Notes**_ Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields. diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go index 65293ba07..8906e5796 100644 --- a/plugins/sinks/populate.go +++ b/plugins/sinks/populate.go @@ -6,5 +6,6 @@ import ( _ "github.com/odpf/meteor/plugins/sinks/file" _ "github.com/odpf/meteor/plugins/sinks/http" _ "github.com/odpf/meteor/plugins/sinks/kafka" + _ "github.com/odpf/meteor/plugins/sinks/shield" _ "github.com/odpf/meteor/plugins/sinks/stencil" ) diff --git a/plugins/sinks/shield/README.md b/plugins/sinks/shield/README.md new file mode 100644 index 000000000..bb5533204 --- /dev/null +++ b/plugins/sinks/shield/README.md @@ -0,0 +1,19 @@ +# Shield + +Shield is a cloud-native role-based authorization-aware reverse-proxy service that helps you manage the authorization of given resources. With Shield, you can create groups and manage members, manage policies of the resources. + +## Usage + +```yaml +sinks: + name: shield + config: + host: shield.com + headers: + X-Shield-Email: meteor@odpf.io + X-Other-Header: value1, value2 +``` + +## Contributing + +Refer to the contribution guidelines for information on contributing to this module. \ No newline at end of file diff --git a/plugins/sinks/shield/client.go b/plugins/sinks/shield/client.go new file mode 100644 index 000000000..1de2bf7a1 --- /dev/null +++ b/plugins/sinks/shield/client.go @@ -0,0 +1,81 @@ +package shield + +import ( + "context" + "fmt" + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + sh "github.com/odpf/shield/proto/v1beta1" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + GRPCMaxClientSendSize = 45 << 20 // 45MB + GRPCMaxClientRecvSize = 45 << 20 // 45MB + GRPCMaxRetry uint = 3 +) + +type Client interface { + sh.ShieldServiceClient + Connect(ctx context.Context, host string) error + Close() error +} + +func newClient() Client { + return &client{} +} + +type client struct { + sh.ShieldServiceClient + conn *grpc.ClientConn +} + +func (c *client) Connect(ctx context.Context, host string) (err error) { + dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2) + defer dialCancel() + + if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil { + err = fmt.Errorf("error creating connection: %w", err) + return + } + + c.ShieldServiceClient = sh.NewShieldServiceClient(c.conn) + + return +} + +func (c *client) Close() error { + return c.conn.Close() +} + +func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) { + retryOpts := []grpc_retry.CallOption{ + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)), + grpc_retry.WithMax(GRPCMaxRetry), + } + var opts []grpc.DialOption + opts = append(opts, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize), + grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize), + ), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_retry.UnaryClientInterceptor(retryOpts...), + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + )), + ) + + return grpc.DialContext(ctx, host, opts...) +} diff --git a/plugins/sinks/shield/shield.go b/plugins/sinks/shield/shield.go new file mode 100644 index 000000000..c42dc794d --- /dev/null +++ b/plugins/sinks/shield/shield.go @@ -0,0 +1,7 @@ +package shield + +type RequestPayload struct { + Name string `json:"name"` + Email string `json:"email"` + Metadata map[string]interface{} `json:"metadata"` +} diff --git a/plugins/sinks/shield/sink.go b/plugins/sinks/shield/sink.go new file mode 100644 index 000000000..e4eae5316 --- /dev/null +++ b/plugins/sinks/shield/sink.go @@ -0,0 +1,167 @@ +package shield + +import ( + "context" + _ "embed" + "fmt" + "strings" + + "github.com/MakeNowJust/heredoc" + "github.com/odpf/meteor/models" + assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/salt/log" + sh "github.com/odpf/shield/proto/v1beta1" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +//go:embed README.md +var summary string + +type Config struct { + Host string `mapstructure:"host" validate:"required"` + Headers map[string]string `mapstructure:"headers"` +} + +var info = plugins.Info{ + Description: "Send user information to shield grpc service", + Summary: summary, + Tags: []string{"grpc", "sink"}, + SampleConfig: heredoc.Doc(` + # The hostname of the shield service + host: shield.com:5556 + # Additional headers send to shield, multiple headers value are separated by a comma + headers: + X-Shield-Email: meteor@odpf.io + X-Other-Header: value1, value2 + `), +} + +type Sink struct { + plugins.BasePlugin + client Client + config Config + logger log.Logger +} + +func New(c Client, logger log.Logger) plugins.Syncer { + s := &Sink{ + logger: logger, + client: c, + } + s.BasePlugin = plugins.NewBasePlugin(info, &s.config) + + return s +} + +func (s *Sink) Init(ctx context.Context, config plugins.Config) error { + if err := s.BasePlugin.Init(ctx, config); err != nil { + return err + } + + if err := s.client.Connect(ctx, s.config.Host); err != nil { + return fmt.Errorf("error connecting to host: %w", err) + } + + return nil +} + +func (s *Sink) Sink(ctx context.Context, batch []models.Record) error { + for _, record := range batch { + asset := record.Data() + s.logger.Info("sinking record to shield", "record", asset.GetUrn()) + + userRequestBody, err := s.buildUserRequestBody(asset) + if err != nil { + s.logger.Error("failed to build shield payload", "err", err, "record", asset.Name) + continue + } + + if err = s.send(ctx, userRequestBody); err != nil { + return errors.Wrap(err, "error sending data") + } + + s.logger.Info("successfully sinked record to shield", "record", asset.Name) + } + + return nil +} + +func (s *Sink) Close() (err error) { + return + //TODO: Connection closes even when some records are unpiblished + //TODO: return s.client.Close() +} + +func (s *Sink) send(ctx context.Context, userRequestBody *sh.UserRequestBody) error { + for hdrKey, hdrVal := range s.config.Headers { + hdrVals := strings.Split(hdrVal, ",") + for _, val := range hdrVals { + val = strings.TrimSpace(val) + md := metadata.New(map[string]string{hdrKey: val}) + ctx = metadata.NewOutgoingContext(ctx, md) + } + } + + _, err := s.client.UpdateUser(ctx, &sh.UpdateUserRequest{ + Id: userRequestBody.Email, + Body: userRequestBody, + }) + if err == nil { + return nil + } + + if e, ok := status.FromError(err); ok { + err = fmt.Errorf("shield returns code %d: %v", e.Code(), e.Message()) + switch e.Code() { + case codes.Unavailable: + return plugins.NewRetryError(err) + default: + return err + } + } else { + err = fmt.Errorf("not able to parse error returned %v", err) + } + + return err +} + +func (s *Sink) buildUserRequestBody(asset *assetsv1beta2.Asset) (*sh.UserRequestBody, error) { + data := asset.GetData() + + var user assetsv1beta2.User + err := data.UnmarshalTo(&user) + if err != nil { + return &sh.UserRequestBody{}, errors.Wrap(err, "not a User struct") + } + + if user.FullName == "" { + return &sh.UserRequestBody{}, errors.New("empty user name") + } + if user.Email == "" { + return &sh.UserRequestBody{}, errors.New("empty user email") + } + if user.Attributes == nil { + return &sh.UserRequestBody{}, errors.New("empty user attributes") + } + + requestBody := &sh.UserRequestBody{ + Name: user.FullName, + Email: user.Email, + Metadata: user.Attributes, + } + + return requestBody, nil +} + +func init() { + if err := registry.Sinks.Register("shield", func() plugins.Syncer { + return New(newClient(), plugins.GetLog()) + }); err != nil { + panic(err) + } +} diff --git a/plugins/sinks/shield/sink_test.go b/plugins/sinks/shield/sink_test.go new file mode 100644 index 000000000..39f60608d --- /dev/null +++ b/plugins/sinks/shield/sink_test.go @@ -0,0 +1,164 @@ +//go:build plugins +// +build plugins + +package shield_test + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/odpf/meteor/models" + testUtils "github.com/odpf/meteor/test/utils" + "github.com/odpf/meteor/utils" + + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + shield "github.com/odpf/meteor/plugins/sinks/shield" + shieldProto "github.com/odpf/shield/proto/v1beta1" + "github.com/stretchr/testify/assert" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + validConfig = map[string]interface{}{ + "host": "shield:80", + } + urnScope = "test-shield" +) + +func TestInit(t *testing.T) { + t.Run("should return error if config is invalid", func(t *testing.T) { + sink := shield.New(new(mockClient), testUtils.Logger) + err := sink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ + "host": "", + }}) + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) + }) + + t.Run("should not return error if config is valid", func(t *testing.T) { + var err error + ctx := context.TODO() + + client := new(mockClient) + client.On("Connect", ctx, validConfig["host"]).Return(nil) + defer client.AssertExpectations(t) + + sink := shield.New(client, testUtils.Logger) + err = sink.Init(ctx, plugins.Config{URNScope: urnScope, RawConfig: validConfig}) + assert.NoError(t, err) + }) +} + +func TestSink(t *testing.T) { + t.Run("should return error if shield host returns error", func(t *testing.T) { + ctx := context.TODO() + + client := new(mockClient) + client.On("Connect", ctx, "shield:80").Return(errors.New("failed to create connection")) + shieldSink := shield.New(client, testUtils.Logger) + + err := shieldSink.Init(ctx, plugins.Config{URNScope: urnScope, RawConfig: map[string]interface{}{ + "host": "shield:80", + }}) + require.Error(t, err) + assert.EqualError(t, err, "error connecting to host: failed to create connection") + }) + + t.Run("should return RetryError if shield returns certain status code", func(t *testing.T) { + + user, err := anypb.New(&v1beta2.User{ + Email: "user@odpf.com", + FullName: "john", + Attributes: utils.TryParseMapToProto(map[string]interface{}{ + "org_unit_path": "/", + }), + }) + require.NoError(t, err) + + data := &v1beta2.Asset{ + Data: user, + } + + ctx := context.TODO() + + client := new(mockClient) + client.On("Connect", ctx, "shield:80").Return(nil) + client.On("UpdateUser", ctx, mock.Anything, mock.Anything).Return(&shieldProto.UpdateUserResponse{}, status.Errorf(codes.Unavailable, "")) + shieldSink := shield.New(client, testUtils.Logger) + err = shieldSink.Init(ctx, plugins.Config{RawConfig: map[string]interface{}{ + "host": validConfig["host"], + }}) + if err != nil { + t.Fatal(err) + } + + err = shieldSink.Sink(ctx, []models.Record{models.NewRecord(data)}) + require.Error(t, err) + assert.True(t, errors.Is(err, plugins.RetryError{})) + + }) + + t.Run("should not return when valid payload is sent", func(t *testing.T) { + u := &v1beta2.User{ + FullName: "John Doe", + Email: "john.doe@odpf.com", + Attributes: utils.TryParseMapToProto(map[string]interface{}{ + "org_unit_path": "/", + "aliases": "doe.john@odpf.com,johndoe@odpf.com", + }), + } + user, _ := anypb.New(u) + data := &v1beta2.Asset{ + Data: user, + } + + ctx := context.TODO() + + client := new(mockClient) + client.On("Connect", ctx, "shield:80").Return(nil) + client.On("UpdateUser", ctx, mock.Anything, mock.Anything).Return(&shieldProto.UpdateUserResponse{}, nil) + + shieldSink := shield.New(client, testUtils.Logger) + err := shieldSink.Init(ctx, plugins.Config{RawConfig: map[string]interface{}{ + "host": validConfig["host"], + }}) + if err != nil { + t.Fatal(err) + } + + err = shieldSink.Sink(ctx, []models.Record{models.NewRecord(data)}) + assert.Equal(t, nil, err) + }) + +} + +type mockClient struct { + shieldProto.ShieldServiceClient + mock.Mock +} + +func (c *mockClient) Connect(ctx context.Context, host string) (err error) { + args := c.Called(ctx, host) + + return args.Error(0) +} + +func (c *mockClient) Close() error { + args := c.Called() + + return args.Error(0) +} + +func (c *mockClient) UpdateUser(ctx context.Context, in *shieldProto.UpdateUserRequest, opts ...grpc.CallOption) (*shieldProto.UpdateUserResponse, error) { + args := c.Called(ctx, in, opts) + + return args.Get(0).(*shieldProto.UpdateUserResponse), args.Error(1) +}