Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Composite configurable mutator cache key per rule. #885

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .schema/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,11 @@
"enabled": {
"$ref": "#/definitions/handlerSwitch"
},
"key": {
"type": "string",
"title": "Cache key",
"description": "Define custom cache key, i.e '{{ print .Subject }}'. All properties must be part of AuthenticationSession."
anderslauri marked this conversation as resolved.
Show resolved Hide resolved
},
"ttl": {
"type": "string",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
Expand Down
129 changes: 75 additions & 54 deletions pipeline/mutate/mutator_hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/dgraph-io/ristretto"
Expand All @@ -51,13 +53,14 @@ const (
ErrNoCredentialsProvided = "No credentials were provided in mutator configuration"
contentTypeHeaderKey = "Content-Type"
contentTypeJSONHeaderValue = "application/json"
poolBufCapacityBytes = 4096
)

type MutatorHydrator struct {
c configuration.Provider
client *http.Client
d mutatorHydratorDependencies

c configuration.Provider
client *http.Client
d mutatorHydratorDependencies
p sync.Pool
hydrateCache *ristretto.Cache
cacheTTL *time.Duration
}
Expand Down Expand Up @@ -85,8 +88,8 @@ type externalAPIConfig struct {
type cacheConfig struct {
Enabled bool `json:"enabled"`
TTL string `json:"ttl"`

ttl time.Duration
Key string `json:"key"`
ttl time.Duration
}

type MutatorHydratorConfig struct {
Expand All @@ -107,68 +110,83 @@ func NewMutatorHydrator(c configuration.Provider, d mutatorHydratorDependencies)
// This is a best-practice value.
BufferItems: 64,
})
return &MutatorHydrator{c: c, d: d, client: httpx.NewResilientClientLatencyToleranceSmall(nil), hydrateCache: cache}
return &MutatorHydrator{c: c, d: d,
p: sync.Pool{
anderslauri marked this conversation as resolved.
Show resolved Hide resolved
New: func() interface{} {
b := &bytes.Buffer{}
b.Grow(poolBufCapacityBytes)
return b
},
},
client: httpx.NewResilientClientLatencyToleranceSmall(nil), hydrateCache: cache}
}

// GetID returns the identifier of this mutator, which is always the
// constant string "hydrator".
func (a *MutatorHydrator) GetID() string {
return "hydrator"
}

func (a *MutatorHydrator) cacheKey(config *MutatorHydratorConfig, session string) string {
return fmt.Sprintf("%s|%x", config.Api.URL, md5.Sum([]byte(session)))
// cacheKey creates a (composite) cache key.
func (a *MutatorHydrator) cacheKey(keys ...string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(keys, ""))))
}

func (a *MutatorHydrator) hydrateFromCache(config *MutatorHydratorConfig, session string) (*authn.AuthenticationSession, bool) {
if !config.Cache.Enabled {
return nil, false
}

item, found := a.hydrateCache.Get(a.cacheKey(config, session))
func (a *MutatorHydrator) hydrateFromCache(key string) (*authn.AuthenticationSession, bool) {
item, found := a.hydrateCache.Get(key)
if !found {
return nil, false
}

return item.(*authn.AuthenticationSession).Copy(), true
}

func (a *MutatorHydrator) hydrateToCache(config *MutatorHydratorConfig, key string, session *authn.AuthenticationSession) {
if !config.Cache.Enabled {
return
}

if a.hydrateCache.SetWithTTL(a.cacheKey(config, key), session.Copy(), 0, config.Cache.ttl) {
if a.hydrateCache.SetWithTTL(key, session.Copy(), 0, config.Cache.ttl) {
a.d.Logger().Debug("Cache reject item")
}
}

func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationSession, config json.RawMessage, _ pipeline.Rule) error {
func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationSession, config json.RawMessage, p pipeline.Rule) error {
cfg, err := a.Config(config)
if err != nil {
return err
}

var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(session); err != nil {
return errors.WithStack(err)
}
sessionCacheKey := a.p.Get().(*bytes.Buffer)
defer func() {
sessionCacheKey.Reset()
a.p.Put(sessionCacheKey)
}()

encodedSession := b.String()
if cacheSession, ok := a.hydrateFromCache(cfg, encodedSession); ok {
*session = *cacheSession
return nil
err = json.NewEncoder(sessionCacheKey).Encode(session)
switch {
case err != nil:
return errors.WithStack(err)
case !cfg.Cache.Enabled:
break
case len(cfg.Cache.Key) > 0:
// Build a composite cache key with property from configuration.
if cacheSession, ok := a.hydrateFromCache(a.cacheKey(
Copy link
Author

@anderslauri anderslauri Dec 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key is a composite key based on configuration property + three distinct repeatable properties (URL for hydrator, rule id and subject of session).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if that is enough! What about extra properties of the session, such as e.g. "scope" or "permissions"? These could all influence the hydrator response and could lead to eventual security vulnerabilities down the road. I think we need to take the full session in a serialized form if we want to be sure that the cache can actually be reused!

Copy link
Author

@anderslauri anderslauri Dec 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the complete AuthenticationSession is used, one does not benefit from caching, given the volatility of this structure. Using subject, rule id, and hydrator URL as part of the composition enables flexibility while ensuring a certain level of constraint. The user is also free, given this flexibility, to set a key which can include all of the above (e.g. JWT claims) using templating.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I am unsure, though, whether I can agree here. I think it points to a deeper issue which is manifesting in this PR: it is not possible to define the cache key reliably. Actually, one would probably like to define the cache key themselves (e.g. subject + scope) to make the system more efficient. For example, the AuthSession might contain vital info such as a "permissions" array. Yet, it may also contain a counter or timestamp which changes on most requests - invalidating the cache if included.

We are still not there yet with a new concept for Ory Oathkeeper, but I think this is a very interesting problem that could very well warrant a realignment on Ory Oathkeeper which would be to make access control at the reverse proxy as efficient and flexible as possible.

I think this too could benefit from JsonNet, as JsonNet is typable, lintable, and can produce errors (go templating fulfills none of these properties).

Unfortunately, for this PR in particular, I don't think the current implementation can be accepted because it still bears too many risks, and this particular type of issue has already caused a CVE in Ory Oathkeeper, which is why I am so hyper-sensitive about this topic: GHSA-qvp4-rpmr-xwrr

Copy link
Author

@anderslauri anderslauri Jan 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Let us invert the solution. If you feel a user can't define a cache key freely (given the concerns mentioned above), what about allowing the user to define an exclusion of request/response headers (I also believe I saw an issue regarding this)? If the whole AuthenticationSession is used as a cache key, headers represent a volatile part (i.e. subject to middleware enrichment, which is hard to predict and control). extra is defined as part of previous steps in the pipeline and can be more controlled. What are your thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a brilliant idea! Exclusion is much safer (because explicit) :)

cfg.Api.URL, cfg.Cache.Key, p.GetID(), session.Subject)); ok {
*session = *cacheSession
return nil
}
a.d.Logger().Debugf("Cache key %s in rule %s was not found. Falling back on default.",
cfg.Cache.Key, p.GetID())
fallthrough
default:
if cacheSession, ok := a.hydrateFromCache(a.cacheKey(sessionCacheKey.String())); ok {
*session = *cacheSession
return nil
}
}

if cfg.Api.URL == "" {
return errors.New(ErrMissingAPIURL)
} else if _, err := url.ParseRequestURI(cfg.Api.URL); err != nil {
if _, err = url.ParseRequestURI(cfg.Api.URL); err != nil {
return errors.New(ErrInvalidAPIURL)
}
req, err := http.NewRequest("POST", cfg.Api.URL, &b)

req, err := http.NewRequest("POST", cfg.Api.URL, sessionCacheKey)
if err != nil {
return errors.WithStack(err)
}

if r.URL != nil {
} else if r.URL != nil {
q := r.URL.Query()
req.URL.RawQuery = q.Encode()
}
Expand All @@ -190,19 +208,20 @@ func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationS
giveUpAfter := time.Millisecond * 50
if len(cfg.Api.Retry.MaxDelay) > 0 {
if d, err := time.ParseDuration(cfg.Api.Retry.MaxDelay); err != nil {
a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, falling pack to default.")
a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, " +
"falling pack to default.")
} else {
maxRetryDelay = d
}
}
if len(cfg.Api.Retry.GiveUpAfter) > 0 {
if d, err := time.ParseDuration(cfg.Api.Retry.GiveUpAfter); err != nil {
a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, falling pack to default.")
a.d.Logger().WithError(err).Warn("Unable to parse max_delay in the Hydrator Mutator, " +
"falling pack to default.")
} else {
giveUpAfter = d
}
}

client.Transport = httpx.NewResilientRoundTripper(a.client.Transport, maxRetryDelay, giveUpAfter)
}

Expand All @@ -217,25 +236,29 @@ func (a *MutatorHydrator) Mutate(r *http.Request, session *authn.AuthenticationS
case http.StatusUnauthorized:
if cfg.Api.Auth != nil {
return errors.New(ErrInvalidCredentials)
} else {
return errors.New(ErrNoCredentialsProvided)
}
return errors.New(ErrNoCredentialsProvided)
default:
return errors.New(ErrNon200ResponseFromAPI)
}

sessionFromUpstream := authn.AuthenticationSession{}
err = json.NewDecoder(res.Body).Decode(&sessionFromUpstream)
if err != nil {

if err := json.NewDecoder(res.Body).Decode(&sessionFromUpstream); err != nil {
return errors.WithStack(err)
}
if sessionFromUpstream.Subject != session.Subject {
} else if sessionFromUpstream.Subject != session.Subject {
return errors.New(ErrMalformedResponseFromUpstreamAPI)
}
*session = sessionFromUpstream

a.hydrateToCache(cfg, encodedSession, session)

switch {
case !cfg.Cache.Enabled:
break
case len(cfg.Cache.Key) > 0:
a.hydrateToCache(cfg, a.cacheKey(
cfg.Api.URL, cfg.Cache.Key, p.GetID(), session.Subject), session)
default:
a.hydrateToCache(cfg, sessionCacheKey.String(), session)
}
return nil
}

Expand All @@ -258,14 +281,12 @@ func (a *MutatorHydrator) Config(config json.RawMessage) (*MutatorHydratorConfig
var err error
c.Cache.ttl, err = time.ParseDuration(c.Cache.TTL)
if err != nil {
a.d.Logger().WithError(err).WithField("ttl", c.Cache.TTL).Error("Unable to parse cache ttl in the Hydrator Mutator.")
a.d.Logger().WithError(err).WithField("ttl",
c.Cache.TTL).Error("Unable to parse cache ttl in the Hydrator Mutator.")
return nil, NewErrMutatorMisconfigured(a, err)
}

if c.Cache.ttl == 0 {
} else if c.Cache.ttl == 0 {
c.Cache.ttl = time.Minute
}
}

return &c, nil
}
75 changes: 68 additions & 7 deletions pipeline/mutate/mutator_hydrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/julienschmidt/httprouter"

Expand Down Expand Up @@ -127,12 +128,17 @@ func configWithRetriesForMutator(giveUpAfter, retryDelay string) func(*httptest.
}
}

// configWithSpecialCacheKey returns a config factory for the hydrator mutator
// tests. The produced JSON points the mutator's API URL at the given test
// server, enables caching with a 30s TTL, and sets the custom cache key to key.
// Both values are interpolated verbatim, so they must not contain characters
// that need JSON escaping.
func configWithSpecialCacheKey(key string) func(*httptest.Server) json.RawMessage {
	const tmpl = `{"api": {"url": "%s"}, "cache": {"enabled": true, "ttl": "30s", "key": "%s"}}`

	build := func(srv *httptest.Server) json.RawMessage {
		rendered := fmt.Sprintf(tmpl, srv.URL, key)
		return json.RawMessage(rendered)
	}
	return build
}

func TestMutatorHydrator(t *testing.T) {
conf := internal.NewConfigurationWithDefaults()
reg := internal.NewRegistry(conf)

a, err := reg.PipelineMutator("hydrator")
require.NoError(t, err)
a := mutate.NewMutatorHydrator(conf, reg)
assert.Equal(t, "hydrator", a.GetID())

t.Run("method=mutate", func(t *testing.T) {
Expand All @@ -148,6 +154,7 @@ func TestMutatorHydrator(t *testing.T) {
sampleUserId := "user"
sampleValidPassword := "passwd1"
sampleNotValidPassword := "passwd7"

var testMap = map[string]struct {
Setup func(*testing.T) http.Handler
Session *authn.AuthenticationSession
Expand Down Expand Up @@ -207,7 +214,7 @@ func TestMutatorHydrator(t *testing.T) {
router := httprouter.New()
router.POST("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte(`{}`))
_, err := w.Write([]byte(`{}`))
require.NoError(t, err)
})
return router
Expand Down Expand Up @@ -267,6 +274,17 @@ func TestMutatorHydrator(t *testing.T) {
Match: newAuthenticationSession(),
Err: errors.New("mutator matching this route is misconfigured or disabled"),
},
"Empty API URL": {
Setup: defaultRouterSetup(),
Session: newAuthenticationSession(),
Rule: &rule.Rule{ID: "test-rule"},
Config: func(s *httptest.Server) json.RawMessage {
return []byte(`{"api": {"url": ""}}`)
},
Request: &http.Request{},
Match: newAuthenticationSession(),
Err: errors.New("mutator matching this route is misconfigured or disabled"),
},
"Successful Basic Authentication": {
Setup: withBasicAuth(defaultRouterSetup(setExtra(sampleKey, sampleValue)), sampleUserId, sampleValidPassword),
Session: newAuthenticationSession(),
Expand Down Expand Up @@ -334,7 +352,7 @@ func TestMutatorHydrator(t *testing.T) {
assert.Equal(t, q["a"], []string{"b"})
assert.Equal(t, q["c"], []string{"&12"})

_, err = w.Write([]byte(`{}`))
_, err := w.Write([]byte(`{}`))
require.NoError(t, err)
})
return router
Expand All @@ -346,27 +364,70 @@ func TestMutatorHydrator(t *testing.T) {
Match: newAuthenticationSession(),
Err: nil,
},
"Custom Cache Key No Cache Hit": {
Setup: defaultRouterSetup(),
Session: newAuthenticationSession(setSubject(sampleSubject)),
Rule: &rule.Rule{ID: "test-rule"},
Config: configWithSpecialCacheKey(sampleSubject),
Request: &http.Request{},
Match: newAuthenticationSession(setSubject(sampleSubject)),
Err: nil,
},
"Custom Cache Key Cache Hit": {
Setup: defaultRouterSetup(),
Session: newAuthenticationSession(setSubject(sampleSubject)),
Rule: &rule.Rule{ID: "test-rule"},
Config: configWithSpecialCacheKey(sampleSubject),
anderslauri marked this conversation as resolved.
Show resolved Hide resolved
Request: &http.Request{},
Match: newAuthenticationSession(setSubject(sampleSubject)),
Err: nil,
},
}

for testName, specs := range testMap {
t.Run(testName, func(t *testing.T) {
var router http.Handler
var ts *httptest.Server

if specs.Setup != nil {
router = specs.Setup(t)
}
ts = httptest.NewServer(router)
defer ts.Close()

err := a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule)
if specs.Err == nil {
const (
ck = "cache"
ct = "Custom Cache Key Cache Hit"
)

switch {
case testName == ct:
specs.Session.Extra = make(map[string]interface{})
specs.Session.Extra[ck] = struct{}{}
_ = a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule)
// Delete the K/V-combination above, needs to be served from the cache,
// knowing same K/V-combination asserts session is originating from cache.
delete(specs.Session.Extra, ck)
// Cache entry is being written asynchronously. Obviously this here is not
// a good strategy, however, the alternative would be to replace the cache.
time.Sleep(50 * time.Millisecond)
}

if err := a.Mutate(specs.Request, specs.Session, specs.Config(ts), specs.Rule); specs.Err == nil {
// Issuer must run without error
require.NoError(t, err)
} else {
assert.EqualError(t, err, specs.Err.Error())
}

assert.Equal(t, specs.Match, specs.Session)
switch {
case testName == ct:
// As specs.Session is served from cache we can't perform
// full equality assertion but assert if cache key is set.
assert.Contains(t, specs.Session.Extra, ck)
default:
assert.Equal(t, specs.Match, specs.Session)
}
})
}

Expand Down