diff --git a/.schema/config.schema.json b/.schema/config.schema.json index fc641a3343..58163d5d73 100644 --- a/.schema/config.schema.json +++ b/.schema/config.schema.json @@ -1133,6 +1133,11 @@ "enabled": { "$ref": "#/definitions/handlerSwitch" }, + "key": { + "type": "string", + "title": "Cache Key", + "description": "Custom cache key per rule, i.e using templating '{{ print .Subject }}'. All templated properties must be in AuthenticationSession." + }, "ttl": { "type": "string", "pattern": "^[0-9]+(ns|us|ms|s|m|h)$", diff --git a/pipeline/mutate/mutator_hydrator.go b/pipeline/mutate/mutator_hydrator.go index 9430eefd2d..41150bff4f 100644 --- a/pipeline/mutate/mutator_hydrator.go +++ b/pipeline/mutate/mutator_hydrator.go @@ -27,6 +27,7 @@ import ( "fmt" "net/http" "net/url" + "strings" "time" "github.com/dgraph-io/ristretto" @@ -44,7 +45,6 @@ import ( const ( ErrMalformedResponseFromUpstreamAPI = "The call to an external API returned an invalid JSON object" - ErrMissingAPIURL = "Missing URL in mutator configuration" ErrInvalidAPIURL = "Invalid URL in mutator configuration" ErrNon200ResponseFromAPI = "The call to an external API returned a non-200 HTTP response" ErrInvalidCredentials = "Invalid credentials were provided in mutator configuration" @@ -54,10 +54,9 @@ const ( ) type MutatorHydrator struct { - c configuration.Provider - client *http.Client - d mutatorHydratorDependencies - + c configuration.Provider + client *http.Client + d mutatorHydratorDependencies hydrateCache *ristretto.Cache cacheTTL *time.Duration } @@ -85,8 +84,8 @@ type externalAPIConfig struct { type cacheConfig struct { Enabled bool `json:"enabled"` TTL string `json:"ttl"` - - ttl time.Duration + Key string `json:"key"` + ttl time.Duration } type MutatorHydratorConfig struct { @@ -102,73 +101,76 @@ func NewMutatorHydrator(c configuration.Provider, d mutatorHydratorDependencies) cache, _ := ristretto.NewCache(&ristretto.Config{ // This will hold about 1000 unique mutation responses. NumCounters: 10000, - // Allocate a max of 32MB + // Allocate a max of 32 MB MaxCost: 1 << 25, // This is a best-practice value. BufferItems: 64, }) - return &MutatorHydrator{c: c, d: d, client: httpx.NewResilientClientLatencyToleranceSmall(nil), hydrateCache: cache} + return &MutatorHydrator{c: c, d: d, + client: httpx.NewResilientClientLatencyToleranceSmall(nil), hydrateCache: cache} } func (a *MutatorHydrator) GetID() string { return "hydrator" } -func (a *MutatorHydrator) cacheKey(config *MutatorHydratorConfig, session string) string { - return fmt.Sprintf("%s|%x", config.Api.URL, md5.Sum([]byte(session))) +// cacheKey creates a (composite) cache key. +func (a *MutatorHydrator) cacheKey(keys ...string) string { + return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(keys, "")))) } -func (a *MutatorHydrator) hydrateFromCache(config *MutatorHydratorConfig, session string) (*authn.AuthenticationSession, bool) { - if !config.Cache.Enabled { - return nil, false - } - - item, found := a.hydrateCache.Get(a.cacheKey(config, session)) +func (a *MutatorHydrator) hydrateFromCache(key string) (*authn.AuthenticationSession, bool) { + item, found := a.hydrateCache.Get(key) if !found { return nil, false } - return item.(*authn.AuthenticationSession).Copy(), true } func (a *MutatorHydrator) hydrateToCache(config *MutatorHydratorConfig, key string, session *authn.AuthenticationSession) { - if !config.Cache.Enabled { - return - } - - if a.hydrateCache.SetWithTTL(a.cacheKey(config, key), session.Copy(), 0, config.Cache.ttl) { + if a.hydrateCache.SetWithTTL(key, session.Copy(), 0, config.Cache.ttl) { a.d.Logger().Debug("Cache reject item") } } -func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationSession, config json.RawMessage, _ pipeline.Rule) error { +func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationSession, config json.RawMessage, p pipeline.Rule) error { cfg, err := a.Config(config) if err != nil { return err } - var b bytes.Buffer - if err := json.NewEncoder(&b).Encode(session); err != nil { - return errors.WithStack(err) - } + s := &bytes.Buffer{} + s.Grow(2048) - encodedSession := b.String() - if cacheSession, ok := a.hydrateFromCache(cfg, encodedSession); ok { - *session = *cacheSession - return nil + err = json.NewEncoder(s).Encode(session) + switch { + case err != nil: + return errors.WithStack(err) + case !cfg.Cache.Enabled: + case len(cfg.Cache.Key) > 0: + // Build a composite cache key with property from configuration. + if cacheSession, ok := a.hydrateFromCache(a.cacheKey( + cfg.Api.URL, cfg.Cache.Key, p.GetID(), session.Subject)); ok { + *session = *cacheSession + return nil + } + a.d.Logger().Debugf("Cache key %s in rule %s was not found. Falling back on default.", + cfg.Cache.Key, p.GetID()) + fallthrough + default: + if cacheSession, ok := a.hydrateFromCache(a.cacheKey(cfg.Api.URL, s.String())); ok { + *session = *cacheSession + return nil + } } - - if cfg.Api.URL == "" { - return errors.New(ErrMissingAPIURL) - } else if _, err := url.ParseRequestURI(cfg.Api.URL); err != nil { + if _, err = url.ParseRequestURI(cfg.Api.URL); err != nil { return errors.New(ErrInvalidAPIURL) } - req, err := http.NewRequest("POST", cfg.Api.URL, &b) + + req, err := http.NewRequest("POST", cfg.Api.URL, s) if err != nil { return errors.WithStack(err) - } - - if r.URL != nil { + } else if r.URL != nil { q := r.URL.Query() req.URL.RawQuery = q.Encode() } @@ -190,21 +192,23 @@ func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationS giveUpAfter := time.Millisecond * 50 if len(cfg.Api.Retry.MaxDelay) > 0 { if d, err := time.ParseDuration(cfg.Api.Retry.MaxDelay); err != nil { - a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, falling pack to default.") + a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, " + + "falling pack to default.") } else { maxRetryDelay = d } } if len(cfg.Api.Retry.GiveUpAfter) > 0 { if d, err := time.ParseDuration(cfg.Api.Retry.GiveUpAfter); err != nil { - a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, falling pack to default.") + a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, " + + "falling pack to default.") } else { giveUpAfter = d } } - client.Transport = httpx.NewResilientRoundTripper(a.client.Transport, maxRetryDelay, giveUpAfter) } + sessionCacheKey := a.cacheKey(cfg.Api.URL, s.String()) res, err := client.Do(req.WithContext(r.Context())) if err != nil { @@ -217,25 +221,29 @@ func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationS case http.StatusUnauthorized: if cfg.Api.Auth != nil { return errors.New(ErrInvalidCredentials) - } else { - return errors.New(ErrNoCredentialsProvided) } + return errors.New(ErrNoCredentialsProvided) default: return errors.New(ErrNon200ResponseFromAPI) } sessionFromUpstream := authn.AuthenticationSession{} - err = json.NewDecoder(res.Body).Decode(&sessionFromUpstream) - if err != nil { + + if err := json.NewDecoder(res.Body).Decode(&sessionFromUpstream); err != nil { return errors.WithStack(err) - } - if sessionFromUpstream.Subject != session.Subject { + } else if sessionFromUpstream.Subject != session.Subject { return errors.New(ErrMalformedResponseFromUpstreamAPI) } *session = sessionFromUpstream - a.hydrateToCache(cfg, encodedSession, session) - + switch { + case !cfg.Cache.Enabled: + case len(cfg.Cache.Key) > 0: + a.hydrateToCache(cfg, a.cacheKey( + cfg.Api.URL, cfg.Cache.Key, p.GetID(), session.Subject), session) + default: + a.hydrateToCache(cfg, sessionCacheKey, session) + } return nil } @@ -258,14 +266,12 @@ func (a *MutatorHydrator) Config(config json.RawMessage) (*MutatorHydratorConfig var err error c.Cache.ttl, err = time.ParseDuration(c.Cache.TTL) if err != nil { - a.d.Logger().WithError(err).WithField("ttl", c.Cache.TTL).Error("Unable to parse cache ttl in the Hydrator Mutator.") + a.d.Logger().WithError(err).WithField("ttl", + c.Cache.TTL).Error("Unable to parse cache ttl in the Hydrator Mutator.") return nil, NewErrMutatorMisconfigured(a, err) - } - - if c.Cache.ttl == 0 { + } else if c.Cache.ttl == 0 { c.Cache.ttl = time.Minute } } - return &c, nil } diff --git a/pipeline/mutate/mutator_hydrator_test.go b/pipeline/mutate/mutator_hydrator_test.go index 40760caba7..8c2cacfa5c 100644 --- a/pipeline/mutate/mutator_hydrator_test.go +++ b/pipeline/mutate/mutator_hydrator_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "github.com/julienschmidt/httprouter" @@ -80,6 +81,39 @@ func defaultRouterSetup(actions ...func(a *authn.AuthenticationSession)) routerS } } +// routerAuthSessionCache is used with AuthenticationSession cache. We can't modify AuthenticationSession, +// thus we need to invoke an error if function is triggered twice. Given no error in tests we can assert +// AuthenticationSession is from cache. +func routerAuthSessionCache(actions ...func(a *authn.AuthenticationSession)) routerSetupFunction { + return func(t *testing.T) http.Handler { + router := httprouter.New() + + isSeen := false + router.POST("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + if isSeen { + w.WriteHeader(http.StatusInternalServerError) + return + } + isSeen = true + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + var data authn.AuthenticationSession + err = json.Unmarshal(body, &data) + require.NoError(t, err) + for _, f := range actions { + f(&data) + } + jsonData, err := json.Marshal(data) + require.NoError(t, err) + w.WriteHeader(http.StatusOK) + _, err = w.Write(jsonData) + require.NoError(t, err) + }) + return router + } +} + func withBasicAuth(f routerSetupFunction, user, password string) routerSetupFunction { return func(t *testing.T) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -127,12 +161,17 @@ func configWithRetriesForMutator(giveUpAfter, retryDelay string) func(*httptest. } } +func configWithSpecialCacheKey(key string) func(*httptest.Server) json.RawMessage { + return func(s *httptest.Server) json.RawMessage { + return []byte(fmt.Sprintf(`{"api": {"url": "%s"}, "cache": {"enabled": true, "ttl": "30s", "key": "%s"}}`, s.URL, key)) + } +} + func TestMutatorHydrator(t *testing.T) { conf := internal.NewConfigurationWithDefaults() reg := internal.NewRegistry(conf) - a, err := reg.PipelineMutator("hydrator") - require.NoError(t, err) + a := mutate.NewMutatorHydrator(conf, reg) assert.Equal(t, "hydrator", a.GetID()) t.Run("method=mutate", func(t *testing.T) { @@ -148,6 +187,7 @@ func TestMutatorHydrator(t *testing.T) { sampleUserId := "user" sampleValidPassword := "passwd1" sampleNotValidPassword := "passwd7" + var testMap = map[string]struct { Setup func(*testing.T) http.Handler Session *authn.AuthenticationSession @@ -207,7 +247,7 @@ func TestMutatorHydrator(t *testing.T) { router := httprouter.New() router.POST("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { w.WriteHeader(http.StatusOK) - _, err = w.Write([]byte(`{}`)) + _, err := w.Write([]byte(`{}`)) require.NoError(t, err) }) return router @@ -267,6 +307,17 @@ func TestMutatorHydrator(t *testing.T) { Match: newAuthenticationSession(), Err: errors.New("mutator matching this route is misconfigured or disabled"), }, + "Empty API URL": { + Setup: defaultRouterSetup(), + Session: newAuthenticationSession(), + Rule: &rule.Rule{ID: "test-rule"}, + Config: func(s *httptest.Server) json.RawMessage { + return []byte(`{"api": {"url": ""}}`) + }, + Request: &http.Request{}, + Match: newAuthenticationSession(), + Err: errors.New("mutator matching this route is misconfigured or disabled"), + }, "Successful Basic Authentication": { Setup: withBasicAuth(defaultRouterSetup(setExtra(sampleKey, sampleValue)), sampleUserId, sampleValidPassword), Session: newAuthenticationSession(), @@ -334,7 +385,7 @@ func TestMutatorHydrator(t *testing.T) { assert.Equal(t, q["a"], []string{"b"}) assert.Equal(t, q["c"], []string{"&12"}) - _, err = w.Write([]byte(`{}`)) + _, err := w.Write([]byte(`{}`)) require.NoError(t, err) }) return router @@ -352,26 +403,128 @@ func TestMutatorHydrator(t *testing.T) { t.Run(testName, func(t *testing.T) { var router http.Handler var ts *httptest.Server + if specs.Setup != nil { router = specs.Setup(t) } ts = httptest.NewServer(router) defer ts.Close() - err := a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule) - if specs.Err == nil { + if err := a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule); specs.Err == nil { // Issuer must run without error require.NoError(t, err) } else { assert.EqualError(t, err, specs.Err.Error()) } - assert.Equal(t, specs.Match, specs.Session) }) } }) + t.Run("method=cache", func(t *testing.T) { + const ( + sampleSubject = "sub" + cacheTestTemplateSubject = "sub" + cacheTestKeyName = "cache" + cacheTestWriteSleep = 10 * time.Millisecond + ) + + var testMap = map[string]struct { + TemplateKey bool + CustomKey bool + SessionCache bool + Setup func(*testing.T) http.Handler + Session *authn.AuthenticationSession + Rule *rule.Rule + Config func(*httptest.Server) json.RawMessage + Request *http.Request + Match *authn.AuthenticationSession + Err error + }{ + "Custom Cache Key Cache Hit": { + CustomKey: true, + Setup: routerAuthSessionCache(), + Session: newAuthenticationSession(setSubject(sampleSubject)), + Rule: &rule.Rule{ID: "test-rule"}, + Config: configWithSpecialCacheKey(sampleSubject), + Request: &http.Request{}, + Match: newAuthenticationSession(setSubject(sampleSubject)), + Err: nil, + }, + "AuthenticationSession Key Cache Hit": { + SessionCache: true, + Setup: routerAuthSessionCache(), + Session: newAuthenticationSession(setSubject(sampleSubject)), + Rule: &rule.Rule{ID: "test-rule"}, + // An empty cache key will ensure default (AuthenticationSession) behavior is applied. + Config: configWithSpecialCacheKey(""), + Request: &http.Request{}, + Match: newAuthenticationSession(setSubject(sampleSubject)), + Err: nil, + }, + "Custom Cache Key Template Cache Hit": { + TemplateKey: true, + CustomKey: true, + Setup: routerAuthSessionCache(), + Session: newAuthenticationSession(setSubject(cacheTestTemplateSubject)), + Rule: &rule.Rule{ID: "test-rule"}, + Config: configWithSpecialCacheKey("{{ print .Subject }}"), + Request: &http.Request{}, + Match: newAuthenticationSession(setSubject(cacheTestTemplateSubject)), + Err: nil, + }, + } + + for testName, specs := range testMap { + t.Run(testName, func(t *testing.T) { + var router http.Handler + var ts *httptest.Server + + if specs.Setup != nil { + router = specs.Setup(t) + } + ts = httptest.NewServer(router) + defer ts.Close() + + if specs.SessionCache { + require.NoError(t, a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule)) + } else { + specs.Session.Extra = make(map[string]interface{}) + specs.Session.Extra[cacheTestKeyName] = struct{}{} + require.NoError(t, a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule)) + // Delete K/V-combination above. Must be served from the cache, + // K/V-combination present ensure session originates from cache. + delete(specs.Session.Extra, cacheTestKeyName) + } + // Cache entry is being written asynchronously. Obviously this here is not + // a good strategy, however, the alternative would be to replace the cache. + // See https://github.com/dgraph-io/ristretto/blob/9d4946d9b973c8e860ae42944e07f5bbe28a506b/cache_test.go#L17 + time.Sleep(cacheTestWriteSleep) + + if err := a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule); specs.Err == nil { + // Issuer must run without error + require.NoError(t, err) + } else { + assert.EqualError(t, err, specs.Err.Error()) + } + + if specs.TemplateKey { + assert.Equal(t, specs.Session.Subject, cacheTestTemplateSubject) + } + + if specs.CustomKey { + // As specs.Session is served from cache we can't perform + // full equality assertion but assert if cache key is set. + assert.Contains(t, specs.Session.Extra, cacheTestKeyName) + } else { + assert.Equal(t, specs.Match, specs.Session) + } + }) + } + + }) + t.Run("method=validate", func(t *testing.T) { for k, testCase := range []struct { enabled bool