From 7fa3cd5bb8979ae3186e72e3d966b4a96f665e2a Mon Sep 17 00:00:00 2001 From: Bob Callaway Date: Mon, 9 Oct 2023 17:57:56 -0400 Subject: [PATCH] move index storage into interface (#1741) * move index storage into interface Signed-off-by: Bob Callaway * fix nits, add unit tests Signed-off-by: Bob Callaway --------- Signed-off-by: Bob Callaway --- cmd/rekor-server/app/root.go | 2 + pkg/api/api.go | 24 ++++--- pkg/api/entries.go | 6 +- pkg/api/error.go | 2 +- pkg/api/index.go | 16 ++--- pkg/indexstorage/indexstorage.go | 38 ++++++++++ pkg/indexstorage/redis/redis.go | 62 +++++++++++++++++ pkg/indexstorage/redis/redis_test.go | 100 +++++++++++++++++++++++++++ 8 files changed, 229 insertions(+), 21 deletions(-) create mode 100644 pkg/indexstorage/indexstorage.go create mode 100644 pkg/indexstorage/redis/redis.go create mode 100644 pkg/indexstorage/redis/redis_test.go diff --git a/cmd/rekor-server/app/root.go b/cmd/rekor-server/app/root.go index 04a47fcd9..c061a553c 100644 --- a/cmd/rekor-server/app/root.go +++ b/cmd/rekor-server/app/root.go @@ -101,6 +101,8 @@ Memory and file-based signers should only be used for testing.`) rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint") _ = rootCmd.PersistentFlags().MarkDeprecated("enable_retrieve_api", "this flag is deprecated in favor of enabled_api_endpoints (searchIndex)") + rootCmd.PersistentFlags().String("search_index.storage_provider", "redis", + `Index Storage provider to use. Valid options are: [redis].`) rootCmd.PersistentFlags().String("redis_server.address", "127.0.0.1", "Redis server address") rootCmd.PersistentFlags().Uint16("redis_server.port", 6379, "Redis server port") diff --git a/pkg/api/api.go b/pkg/api/api.go index b033a022a..fdbdb9549 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/sigstore/rekor/pkg/indexstorage" "github.com/sigstore/rekor/pkg/log" "github.com/sigstore/rekor/pkg/pubsub" "github.com/sigstore/rekor/pkg/sharding" @@ -145,9 +146,10 @@ func NewAPI(treeID uint) (*API, error) { } var ( - api *API - storageClient storage.AttestationStorage - redisClient *redis.Client + api *API + attestationStorageClient storage.AttestationStorage + indexStorageClient indexstorage.IndexStorage + redisClient *redis.Client ) func ConfigureAPI(treeID uint) { @@ -159,21 +161,25 @@ func ConfigureAPI(treeID uint) { } if viper.GetBool("enable_retrieve_api") || viper.GetBool("enable_stable_checkpoint") || slices.Contains(viper.GetStringSlice("enabled_api_endpoints"), "searchIndex") { - redisClient = redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")), - Network: "tcp", - DB: 0, // default DB - }) + indexStorageClient, err = indexstorage.NewIndexStorage(viper.GetString("search_index.storage_provider")) + if err != nil { + log.Logger.Panic(err) + } } if viper.GetBool("enable_attestation_storage") { - storageClient, err = storage.NewAttestationStorage() + attestationStorageClient, err = storage.NewAttestationStorage() if err != nil { log.Logger.Panic(err) } } if viper.GetBool("enable_stable_checkpoint") { + redisClient = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%v:%v", viper.GetString("redis_server.address"), viper.GetUint64("redis_server.port")), + Network: "tcp", + DB: 0, // default DB + }) checkpointPublisher := witness.NewCheckpointPublisher(context.Background(), api.logClient, api.logRanges.ActiveTreeID(), viper.GetString("rekor_server.hostname"), api.signer, redisClient, viper.GetUint("publish_frequency"), CheckpointPublishCount) diff --git a/pkg/api/entries.go b/pkg/api/entries.go index 49643e4ce..ca2ac8244 100644 --- a/pkg/api/entries.go +++ b/pkg/api/entries.go @@ -135,14 +135,14 @@ func logEntryFromLeaf(ctx context.Context, signer signature.Signer, _ trilliancl attKey := entryWithAtt.AttestationKey() // if we're given a key by the type logic, let's try that first if attKey != "" { - att, fetchErr = storageClient.FetchAttestation(ctx, attKey) + att, fetchErr = attestationStorageClient.FetchAttestation(ctx, attKey) if fetchErr != nil { log.ContextLogger(ctx).Debugf("error fetching attestation by key, trying by UUID: %s %v", attKey, fetchErr) } } // if looking up by key failed or we weren't able to generate a key, try looking up by uuid if attKey == "" || fetchErr != nil { - att, fetchErr = storageClient.FetchAttestation(ctx, entryIDstruct.UUID) + att, fetchErr = attestationStorageClient.FetchAttestation(ctx, entryIDstruct.UUID) if fetchErr != nil { log.ContextLogger(ctx).Debugf("error fetching attestation by uuid: %s %v", entryIDstruct.UUID, fetchErr) } @@ -244,7 +244,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl IntegratedTime: swag.Int64(queuedLeaf.IntegrateTimestamp.AsTime().Unix()), } - if redisClient != nil { + if indexStorageClient != nil { go func() { keys, err := entry.IndexKeys() if err != nil { diff --git a/pkg/api/error.go b/pkg/api/error.go index d05e9170e..e0a04fc93 100644 --- a/pkg/api/error.go +++ b/pkg/api/error.go @@ -42,7 +42,7 @@ const ( malformedUUID = "UUID must be a 64-character hexadecimal string" malformedPublicKey = "public key provided could not be parsed" failedToGenerateCanonicalKey = "error generating canonicalized public key" - redisUnexpectedResult = "unexpected result from searching index" + indexStorageUnexpectedResult = "unexpected result from searching index" lastSizeGreaterThanKnown = "the tree size requested(%d) was greater than what is currently observable(%d)" signingError = "error signing" sthGenerateError = "error generating signed tree head" diff --git a/pkg/api/index.go b/pkg/api/index.go index 69f695a28..0fb5a7232 100644 --- a/pkg/api/index.go +++ b/pkg/api/index.go @@ -45,9 +45,9 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder { if params.Query.Hash != "" { // This must be a valid hash sha := util.PrefixSHA(params.Query.Hash) - resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(sha), 0, -1).Result() + resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(sha)) if err != nil { - return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult) + return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult) } result.Add(resultUUIDs) } @@ -72,16 +72,16 @@ func SearchIndexHandler(params index.SearchIndexParams) middleware.Responder { } keyHash := sha256.Sum256(canonicalKey) - resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:])), 0, -1).Result() + resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(hex.EncodeToString(keyHash[:]))) if err != nil { - return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult) + return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult) } result.Add(resultUUIDs) } if params.Query.Email != "" { - resultUUIDs, err := redisClient.LRange(httpReqCtx, strings.ToLower(params.Query.Email.String()), 0, -1).Result() + resultUUIDs, err := indexStorageClient.LookupIndices(httpReqCtx, strings.ToLower(params.Query.Email.String())) if err != nil { - return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("redis client: %w", err), redisUnexpectedResult) + return handleRekorAPIError(params, http.StatusInternalServerError, fmt.Errorf("index storage error: %w", err), indexStorageUnexpectedResult) } result.Add(resultUUIDs) } @@ -100,7 +100,7 @@ func SearchIndexNotImplementedHandler(_ index.SearchIndexParams) middleware.Resp } func addToIndex(ctx context.Context, key, value string) error { - _, err := redisClient.LPush(ctx, key, value).Result() + err := indexStorageClient.WriteIndex(ctx, key, value) if err != nil { return fmt.Errorf("redis client: %w", err) } @@ -108,7 +108,7 @@ func addToIndex(ctx context.Context, key, value string) error { } func storeAttestation(ctx context.Context, uuid string, attestation []byte) error { - return storageClient.StoreAttestation(ctx, uuid, attestation) + return attestationStorageClient.StoreAttestation(ctx, uuid, attestation) } // Uniq is a collection of unique elements. diff --git a/pkg/indexstorage/indexstorage.go b/pkg/indexstorage/indexstorage.go new file mode 100644 index 000000000..3921fbf6d --- /dev/null +++ b/pkg/indexstorage/indexstorage.go @@ -0,0 +1,38 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexstorage + +import ( + "context" + "fmt" + + "github.com/sigstore/rekor/pkg/indexstorage/redis" + "github.com/spf13/viper" +) + +type IndexStorage interface { + LookupIndices(context.Context, string) ([]string, error) // Returns indices for specified key + WriteIndex(context.Context, string, string) error // Writes index for specified key +} + +// NewIndexStorage instantiates a new IndexStorage provider based on the requested type +func NewIndexStorage(providerType string) (IndexStorage, error) { + switch providerType { + case redis.ProviderType: + return redis.NewProvider(viper.GetString("redis_server.address"), viper.GetString("redis_server.port")) + default: + return nil, fmt.Errorf("invalid index storage provider type: %v", providerType) + } +} diff --git a/pkg/indexstorage/redis/redis.go b/pkg/indexstorage/redis/redis.go new file mode 100644 index 000000000..bdf42e396 --- /dev/null +++ b/pkg/indexstorage/redis/redis.go @@ -0,0 +1,62 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redis + +import ( + "context" + "errors" + "fmt" + "strings" + + redis "github.com/redis/go-redis/v9" +) + +const ProviderType = "redis" + +// IndexStorageProvider implements indexstorage.IndexStorage +type IndexStorageProvider struct { + client *redis.Client +} + +func NewProvider(address, port string) (*IndexStorageProvider, error) { + provider := &IndexStorageProvider{} + provider.client = redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%v:%v", address, port), + Network: "tcp", + DB: 0, // default DB + }) + return provider, nil +} + +// LookupIndices looks up and returns all indices for the specified key. The key value will be canonicalized +// by converting all characters into a lowercase value before looking up in Redis +func (isp *IndexStorageProvider) LookupIndices(ctx context.Context, key string) ([]string, error) { + if isp.client == nil { + return []string{}, errors.New("redis client has not been initialized") + } + return isp.client.LRange(ctx, strings.ToLower(key), 0, -1).Result() +} + +// WriteIndex adds the index for the specified key. The key value will be canonicalized +// by converting all characters into a lowercase value before appending the index in Redis +func (isp *IndexStorageProvider) WriteIndex(ctx context.Context, key, index string) error { + if isp.client == nil { + return errors.New("redis client has not been initialized") + } + if _, err := isp.client.LPush(ctx, strings.ToLower(key), index).Result(); err != nil { + return fmt.Errorf("redis client: %w", err) + } + return nil +} diff --git a/pkg/indexstorage/redis/redis_test.go b/pkg/indexstorage/redis/redis_test.go new file mode 100644 index 000000000..509679604 --- /dev/null +++ b/pkg/indexstorage/redis/redis_test.go @@ -0,0 +1,100 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redis + +import ( + "context" + "errors" + "testing" + + "github.com/go-redis/redismock/v9" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "go.uber.org/goleak" +) + +func TestLookupIndices(t *testing.T) { + key := "87c1b129fbadd7b6e9abc0a9ef7695436d767aece042bec198a97e949fcbe14c" + value := []string{"1e1f2c881ae0608ec77ebf88a75c66d3099113a7343238f2f7a0ebb91a4ed335"} + redisClient, mock := redismock.NewClientMock() + mock.Regexp().ExpectLRange(key, 0, -1).SetVal(value) + + isp := IndexStorageProvider{redisClient} + + indices, err := isp.LookupIndices(context.Background(), key) + if err != nil { + t.Error(err) + } + + less := func(a, b string) bool { return a < b } + if cmp.Diff(value, indices, cmpopts.SortSlices(less)) != "" { + t.Errorf("expected %s, got %s", value, indices) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } + + mock.ClearExpect() + errRedis := errors.New("redis error") + mock.Regexp().ExpectLRange(key, 0, -1).SetErr(errRedis) + if _, err := isp.LookupIndices(context.Background(), key); err == nil { + t.Error("unexpected success") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } +} + +func TestWriteIndex(t *testing.T) { + key := "87c1b129fbadd7b6e9abc0a9ef7695436d767aece042bec198a97e949fcbe14c" + value := []string{"1e1f2c881ae0608ec77ebf88a75c66d3099113a7343238f2f7a0ebb91a4ed335"} + redisClient, mock := redismock.NewClientMock() + mock.Regexp().ExpectLPush(key, value).SetVal(1) + + isp := IndexStorageProvider{redisClient} + if err := isp.WriteIndex(context.Background(), key, value[0]); err != nil { + t.Error(err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } + + mock.ClearExpect() + errRedis := errors.New("redis error") + mock.Regexp().ExpectLPush(key, value).SetErr(errRedis) + if err := isp.WriteIndex(context.Background(), key, value[0]); err == nil { + t.Error("unexpected success") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Error(err) + } +} + +func TestUninitializedClient(t *testing.T) { + // this is not initialized with a real Redis client + isp := IndexStorageProvider{} + if _, err := isp.LookupIndices(context.Background(), "key"); err == nil { + t.Error("unexpected success") + } + if err := isp.WriteIndex(context.Background(), "key", "value"); err == nil { + t.Error("unexpected success") + } +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +}