Skip to content

Commit

Permalink
feat: add shield sink (#408)
Browse files Browse the repository at this point in the history
* feat: add shield sink

* fix: use correct user asset keys

* test: add sink test

* docs: add README

* docs: fix white spaces

* test: add success test

* chore: replace tabs with spaces

* refactor: code review changes

* feat: pass context to http request

* fix: http req & res close

* chore: improve errors

* fix: remove urlPathEscape for host

* test: fix payload error messages

* feat: add grpc client

* test: tests to use grpc client

* chore: fix linting issue

* chore: change http to grpc in docs

* docs: add shield sink

* test: refactor test

* test: remove retry error test

* test: add retry error test

* fix: code review changes

* feat: handle err when non User struct is sent

* fix: skip a bached record when build fails

* fix: error handling

* chore: shield proto variable name change

* fix: shield sink test

* chore: remove empty line

* test: remove test

* test: add test
  • Loading branch information
ishanarya0 authored Oct 31, 2022
1 parent 3d3b635 commit 1c7f96e
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 0 deletions.
16 changes: 16 additions & 0 deletions docs/docs/reference/sinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ sinks:
send_format_header: false
```

## Shield

`shield`

Upsert users to shield service running at a given 'host'. Request will be sent via GRPC.

```yaml
sinks:
name: shield
config:
host: shield.com
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
```

_**Notes**_

Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields.
1 change: 1 addition & 0 deletions plugins/sinks/populate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import (
_ "github.com/odpf/meteor/plugins/sinks/file"
_ "github.com/odpf/meteor/plugins/sinks/http"
_ "github.com/odpf/meteor/plugins/sinks/kafka"
_ "github.com/odpf/meteor/plugins/sinks/shield"
_ "github.com/odpf/meteor/plugins/sinks/stencil"
)
19 changes: 19 additions & 0 deletions plugins/sinks/shield/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Shield

Shield is a cloud-native role-based authorization-aware reverse-proxy service that helps you manage the authorization of given resources. With Shield, you can create groups and manage members, manage policies of the resources.

## Usage

```yaml
sinks:
name: shield
config:
host: shield.com
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
```
## Contributing
Refer to the contribution guidelines for information on contributing to this module.
81 changes: 81 additions & 0 deletions plugins/sinks/shield/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package shield

import (
"context"
"fmt"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
sh "github.com/odpf/shield/proto/v1beta1"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
GRPCMaxClientSendSize = 45 << 20 // 45MB
GRPCMaxClientRecvSize = 45 << 20 // 45MB
GRPCMaxRetry uint = 3
)

type Client interface {
sh.ShieldServiceClient
Connect(ctx context.Context, host string) error
Close() error
}

func newClient() Client {
return &client{}
}

type client struct {
sh.ShieldServiceClient
conn *grpc.ClientConn
}

func (c *client) Connect(ctx context.Context, host string) (err error) {
dialTimeoutCtx, dialCancel := context.WithTimeout(ctx, time.Second*2)
defer dialCancel()

if c.conn, err = c.createConnection(dialTimeoutCtx, host); err != nil {
err = fmt.Errorf("error creating connection: %w", err)
return
}

c.ShieldServiceClient = sh.NewShieldServiceClient(c.conn)

return
}

func (c *client) Close() error {
return c.conn.Close()
}

func (c *client) createConnection(ctx context.Context, host string) (*grpc.ClientConn, error) {
retryOpts := []grpc_retry.CallOption{
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100 * time.Millisecond)),
grpc_retry.WithMax(GRPCMaxRetry),
}
var opts []grpc.DialOption
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(GRPCMaxClientSendSize),
grpc.MaxCallRecvMsgSize(GRPCMaxClientRecvSize),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(retryOpts...),
otelgrpc.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
)),
)

return grpc.DialContext(ctx, host, opts...)
}
7 changes: 7 additions & 0 deletions plugins/sinks/shield/shield.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package shield

type RequestPayload struct {
Name string `json:"name"`
Email string `json:"email"`
Metadata map[string]interface{} `json:"metadata"`
}
167 changes: 167 additions & 0 deletions plugins/sinks/shield/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package shield

import (
"context"
_ "embed"
"fmt"
"strings"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/models"
assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
sh "github.com/odpf/shield/proto/v1beta1"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

//go:embed README.md
var summary string

type Config struct {
Host string `mapstructure:"host" validate:"required"`
Headers map[string]string `mapstructure:"headers"`
}

var info = plugins.Info{
Description: "Send user information to shield grpc service",
Summary: summary,
Tags: []string{"grpc", "sink"},
SampleConfig: heredoc.Doc(`
# The hostname of the shield service
host: shield.com:5556
# Additional headers send to shield, multiple headers value are separated by a comma
headers:
X-Shield-Email: [email protected]
X-Other-Header: value1, value2
`),
}

type Sink struct {
plugins.BasePlugin
client Client
config Config
logger log.Logger
}

func New(c Client, logger log.Logger) plugins.Syncer {
s := &Sink{
logger: logger,
client: c,
}
s.BasePlugin = plugins.NewBasePlugin(info, &s.config)

return s
}

func (s *Sink) Init(ctx context.Context, config plugins.Config) error {
if err := s.BasePlugin.Init(ctx, config); err != nil {
return err
}

if err := s.client.Connect(ctx, s.config.Host); err != nil {
return fmt.Errorf("error connecting to host: %w", err)
}

return nil
}

func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
for _, record := range batch {
asset := record.Data()
s.logger.Info("sinking record to shield", "record", asset.GetUrn())

userRequestBody, err := s.buildUserRequestBody(asset)
if err != nil {
s.logger.Error("failed to build shield payload", "err", err, "record", asset.Name)
continue
}

if err = s.send(ctx, userRequestBody); err != nil {
return errors.Wrap(err, "error sending data")
}

s.logger.Info("successfully sinked record to shield", "record", asset.Name)
}

return nil
}

func (s *Sink) Close() (err error) {
return
//TODO: Connection closes even when some records are unpiblished
//TODO: return s.client.Close()
}

func (s *Sink) send(ctx context.Context, userRequestBody *sh.UserRequestBody) error {
for hdrKey, hdrVal := range s.config.Headers {
hdrVals := strings.Split(hdrVal, ",")
for _, val := range hdrVals {
val = strings.TrimSpace(val)
md := metadata.New(map[string]string{hdrKey: val})
ctx = metadata.NewOutgoingContext(ctx, md)
}
}

_, err := s.client.UpdateUser(ctx, &sh.UpdateUserRequest{
Id: userRequestBody.Email,
Body: userRequestBody,
})
if err == nil {
return nil
}

if e, ok := status.FromError(err); ok {
err = fmt.Errorf("shield returns code %d: %v", e.Code(), e.Message())
switch e.Code() {
case codes.Unavailable:
return plugins.NewRetryError(err)
default:
return err
}
} else {
err = fmt.Errorf("not able to parse error returned %v", err)
}

return err
}

func (s *Sink) buildUserRequestBody(asset *assetsv1beta2.Asset) (*sh.UserRequestBody, error) {
data := asset.GetData()

var user assetsv1beta2.User
err := data.UnmarshalTo(&user)
if err != nil {
return &sh.UserRequestBody{}, errors.Wrap(err, "not a User struct")
}

if user.FullName == "" {
return &sh.UserRequestBody{}, errors.New("empty user name")
}
if user.Email == "" {
return &sh.UserRequestBody{}, errors.New("empty user email")
}
if user.Attributes == nil {
return &sh.UserRequestBody{}, errors.New("empty user attributes")
}

requestBody := &sh.UserRequestBody{
Name: user.FullName,
Email: user.Email,
Metadata: user.Attributes,
}

return requestBody, nil
}

func init() {
if err := registry.Sinks.Register("shield", func() plugins.Syncer {
return New(newClient(), plugins.GetLog())
}); err != nil {
panic(err)
}
}
Loading

0 comments on commit 1c7f96e

Please sign in to comment.