From 324cee8638a9a3dfab68cb47e2fa3c05295beb37 Mon Sep 17 00:00:00 2001 From: vvatanabe Date: Sun, 3 Dec 2023 09:47:04 +0900 Subject: [PATCH] feat: implements visibility timeout --- README.md | 26 ++-- client.go | 230 ++++++++++++++----------------- client_test.go | 61 ++++---- consumer.go | 2 +- consumer_test.go | 6 +- dynamomq_test.go | 9 +- internal/clock/clock.go | 7 +- internal/cmd/fail.go | 2 +- internal/cmd/interactive.go | 4 +- internal/cmd/interactive_test.go | 12 +- internal/cmd/ls.go | 3 +- internal/cmd/root.go | 5 +- internal/mock/mock.go | 32 ++--- internal/mock/mock_test.go | 4 +- message.go | 58 ++++---- message_test.go | 75 ++++++++++ 16 files changed, 295 insertions(+), 241 deletions(-) create mode 100644 message_test.go diff --git a/README.md b/README.md index fa1664d..5f50ba9 100644 --- a/README.md +++ b/README.md @@ -331,18 +331,18 @@ This design ensures that DynamoMQ maintains message reliability while enabling t The DynamoDB table for the DynamoMQ message queue system is designed to efficiently manage and track the status of messages. Here’s a breakdown of the table schema: -| Key | Attributes | Type | Example Value | -|-------|--------------------------|--------|-------------------------------------| -| PK | id | string | A-101 | -| | data | any | any | -| | status | string | READY or PROCESSING | -| | receive_count | number | 1 | -| GSIPK | queue_type | string | STANDARD or DLQ | -| | version | number | 1 | -| | creation_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | -| | last_updated_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | -| GSISK | queue_add_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | -| | queue_peek_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | +| Key | Attributes | Type | Example Value | +|-------|------------------------|--------|-------------------------------------| +| PK | id | string | A-101 | +| | data | any | any | +| | visibility_timeout | number | 10 | +| | receive_count | number | 1 | +| GSIPK | queue_type | string | STANDARD or DLQ | +| | version | number | 1 | +| | creation_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | last_updated_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | +| GSISK | queue_add_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | +| | queue_peek_timestamp | string | 2006-01-02T15:04:05.999999999Z07:00 | **PK (Primary Key)** `ID`: A unique identifier for each message, such as 'A-101'. This is a string value that facilitates the retrieval and management of messages. @@ -352,7 +352,7 @@ The DynamoDB table for the DynamoMQ message queue system is designed to efficien **Attributes**: These are the various properties associated with each message: - `data`: This attribute holds the content of the message and can be of any type. -- `status`: Indicates the current state of the message, either 'READY' for new messages or 'PROCESSING' for messages being processed. +- `isibility_timeout`: The new value for the message's visibility timeout (in seconds). - `receive_count`: A numerical count of how many times the message has been retrieved from the queue. - `version`: A number that can be used for optimistic locking and to ensure that the message is not being concurrently modified. - `creation_timestamp`: The date and time when the message was created. ISO 8601 format. diff --git a/client.go b/client.go index a255e12..04a6537 100644 --- a/client.go +++ b/client.go @@ -4,7 +4,6 @@ import ( "context" "errors" "sort" - "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" @@ -18,7 +17,7 @@ const ( DefaultTableName = "dynamo-mq-table" DefaultQueueingIndexName = "dynamo-mq-index-queue_type-queue_add_timestamp" DefaultRetryMaxAttempts = 10 - DefaultVisibilityTimeoutInMinutes = 1 + DefaultVisibilityTimeoutInSeconds = 60 DefaultMaxListMessages = 10 DefaultQueryLimit = 250 ) @@ -26,7 +25,7 @@ const ( type Client[T any] interface { SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error) - UpdateMessageAsVisible(ctx context.Context, params *UpdateMessageAsVisibleInput) (*UpdateMessageAsVisibleOutput[T], error) + ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error) DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput, error) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) @@ -38,19 +37,18 @@ type Client[T any] interface { } type ClientOptions struct { - DynamoDB *dynamodb.Client - TableName string - QueueingIndexName string - VisibilityTimeoutInMinutes int - MaximumReceives int - UseFIFO bool - BaseEndpoint string - RetryMaxAttempts int - Clock clock.Clock - MarshalMap func(in interface{}) (map[string]types.AttributeValue, error) - UnmarshalMap func(m map[string]types.AttributeValue, out interface{}) error - UnmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error - BuildExpression func(b expression.Builder) (expression.Expression, error) + DynamoDB *dynamodb.Client + TableName string + QueueingIndexName string + MaximumReceives int + UseFIFO bool + BaseEndpoint string + RetryMaxAttempts int + Clock clock.Clock + MarshalMap func(in interface{}) (map[string]types.AttributeValue, error) + UnmarshalMap func(m map[string]types.AttributeValue, out interface{}) error + UnmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error + BuildExpression func(b expression.Builder) (expression.Expression, error) } func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions) { @@ -71,12 +69,6 @@ func WithQueueingIndexName(queueingIndexName string) func(*ClientOptions) { } } -func WithAWSVisibilityTimeout(minutes int) func(*ClientOptions) { - return func(s *ClientOptions) { - s.VisibilityTimeoutInMinutes = minutes - } -} - func WithUseFIFO(useFIFO bool) func(*ClientOptions) { return func(s *ClientOptions) { s.UseFIFO = useFIFO @@ -97,15 +89,14 @@ func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions) { func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Client[T], error) { o := &ClientOptions{ - TableName: DefaultTableName, - QueueingIndexName: DefaultQueueingIndexName, - RetryMaxAttempts: DefaultRetryMaxAttempts, - VisibilityTimeoutInMinutes: DefaultVisibilityTimeoutInMinutes, - UseFIFO: false, - Clock: &clock.RealClock{}, - MarshalMap: attributevalue.MarshalMap, - UnmarshalMap: attributevalue.UnmarshalMap, - UnmarshalListOfMaps: attributevalue.UnmarshalListOfMaps, + TableName: DefaultTableName, + QueueingIndexName: DefaultQueueingIndexName, + RetryMaxAttempts: DefaultRetryMaxAttempts, + UseFIFO: false, + Clock: &clock.RealClock{}, + MarshalMap: attributevalue.MarshalMap, + UnmarshalMap: attributevalue.UnmarshalMap, + UnmarshalListOfMaps: attributevalue.UnmarshalListOfMaps, BuildExpression: func(b expression.Builder) (expression.Expression, error) { return b.Build() }, @@ -114,17 +105,16 @@ func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Clien opt(o) } c := &client[T]{ - tableName: o.TableName, - queueingIndexName: o.QueueingIndexName, - visibilityTimeoutInMinutes: o.VisibilityTimeoutInMinutes, - maximumReceives: o.MaximumReceives, - useFIFO: o.UseFIFO, - dynamoDB: o.DynamoDB, - clock: o.Clock, - marshalMap: o.MarshalMap, - unmarshalMap: o.UnmarshalMap, - unmarshalListOfMaps: o.UnmarshalListOfMaps, - buildExpression: o.BuildExpression, + tableName: o.TableName, + queueingIndexName: o.QueueingIndexName, + maximumReceives: o.MaximumReceives, + useFIFO: o.UseFIFO, + dynamoDB: o.DynamoDB, + clock: o.Clock, + marshalMap: o.MarshalMap, + unmarshalMap: o.UnmarshalMap, + unmarshalListOfMaps: o.UnmarshalListOfMaps, + buildExpression: o.BuildExpression, } if c.dynamoDB != nil { return c, nil @@ -139,17 +129,16 @@ func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Clien } type client[T any] struct { - dynamoDB *dynamodb.Client - tableName string - queueingIndexName string - visibilityTimeoutInMinutes int - maximumReceives int - useFIFO bool - clock clock.Clock - marshalMap func(in interface{}) (map[string]types.AttributeValue, error) - unmarshalMap func(m map[string]types.AttributeValue, out interface{}) error - unmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error - buildExpression func(b expression.Builder) (expression.Expression, error) + dynamoDB *dynamodb.Client + tableName string + queueingIndexName string + maximumReceives int + useFIFO bool + clock clock.Clock + marshalMap func(in interface{}) (map[string]types.AttributeValue, error) + unmarshalMap func(m map[string]types.AttributeValue, out interface{}) error + unmarshalListOfMaps func(l []map[string]types.AttributeValue, out interface{}) error + buildExpression func(b expression.Builder) (expression.Expression, error) } type SendMessageInput[T any] struct { @@ -176,7 +165,8 @@ func (c *client[T]) SendMessage(ctx context.Context, params *SendMessageInput[T] if retrieved.Message != nil { return &SendMessageOutput[T]{}, &IDDuplicatedError{} } - message := NewMessage(params.ID, params.Data, c.clock.Now()) + now := c.clock.Now() + message := NewMessage(params.ID, params.Data, now) err = c.put(ctx, message) if err != nil { return &SendMessageOutput[T]{}, err @@ -184,7 +174,7 @@ func (c *client[T]) SendMessage(ctx context.Context, params *SendMessageInput[T] return &SendMessageOutput[T]{ Result: &Result{ ID: message.ID, - Status: message.Status, + Status: message.GetStatus(now), LastUpdatedTimestamp: message.LastUpdatedTimestamp, Version: message.Version, }, @@ -193,7 +183,8 @@ func (c *client[T]) SendMessage(ctx context.Context, params *SendMessageInput[T] } type ReceiveMessageInput struct { - QueueType QueueType + QueueType QueueType + VisibilityTimeout int } // ReceiveMessageOutput represents the result for the ReceiveMessage() API call. @@ -211,29 +202,29 @@ func (c *client[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageIn params.QueueType = QueueTypeStandard } - selectedItem, err := c.queryDynamoDB(ctx, params) + selected, err := c.selectMessage(ctx, params) if err != nil { return &ReceiveMessageOutput[T]{}, err } - updatedItem, err := c.processSelectedItem(ctx, selectedItem) + updated, err := c.processSelectedMessage(ctx, selected) if err != nil { return &ReceiveMessageOutput[T]{}, err } return &ReceiveMessageOutput[T]{ Result: &Result{ - ID: updatedItem.ID, - Status: updatedItem.Status, - LastUpdatedTimestamp: updatedItem.LastUpdatedTimestamp, - Version: updatedItem.Version, + ID: updated.ID, + Status: updated.GetStatus(c.clock.Now()), + LastUpdatedTimestamp: updated.LastUpdatedTimestamp, + Version: updated.Version, }, - PeekFromQueueTimestamp: updatedItem.PeekFromQueueTimestamp, - PeekedMessageObject: updatedItem, + PeekFromQueueTimestamp: updated.PeekFromQueueTimestamp, + PeekedMessageObject: updated, }, nil } -func (c *client[T]) queryDynamoDB(ctx context.Context, params *ReceiveMessageInput) (*Message[T], error) { +func (c *client[T]) selectMessage(ctx context.Context, params *ReceiveMessageInput) (*Message[T], error) { builder := expression.NewBuilder(). WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(params.QueueType))) expr, err := c.buildExpression(builder) @@ -241,18 +232,18 @@ func (c *client[T]) queryDynamoDB(ctx context.Context, params *ReceiveMessageInp return nil, BuildingExpressionError{Cause: err} } - selectedItem, err := c.executeQuery(ctx, expr) + selected, err := c.executeQuery(ctx, params, expr) if err != nil { return nil, err } - if selectedItem == nil { + if selected == nil { return nil, &EmptyQueueError{} } - return selectedItem, nil + return selected, nil } -func (c *client[T]) executeQuery(ctx context.Context, expr expression.Expression) (*Message[T], error) { +func (c *client[T]) executeQuery(ctx context.Context, params *ReceiveMessageInput, expr expression.Expression) (*Message[T], error) { var exclusiveStartKey map[string]types.AttributeValue var selectedItem *Message[T] for { @@ -272,7 +263,7 @@ func (c *client[T]) executeQuery(ctx context.Context, expr expression.Expression exclusiveStartKey = queryResult.LastEvaluatedKey - selectedItem, err = c.processQueryResult(queryResult) + selectedItem, err = c.processQueryResult(params, queryResult) if err != nil { return nil, err } @@ -283,97 +274,89 @@ func (c *client[T]) executeQuery(ctx context.Context, expr expression.Expression return selectedItem, nil } -func (c *client[T]) processQueryResult(queryResult *dynamodb.QueryOutput) (*Message[T], error) { - var selectedItem *Message[T] - visibilityTimeout := c.getVisibilityTimeout() - +func (c *client[T]) processQueryResult(params *ReceiveMessageInput, queryResult *dynamodb.QueryOutput) (*Message[T], error) { + var selected *Message[T] for _, itemMap := range queryResult.Items { - item := Message[T]{} - if err := c.unmarshalMap(itemMap, &item); err != nil { + message := Message[T]{} + if err := c.unmarshalMap(itemMap, &message); err != nil { return nil, UnmarshalingAttributeError{Cause: err} } - if err := item.markAsProcessing(c.clock.Now(), visibilityTimeout); err == nil { - selectedItem = &item + if err := message.markAsProcessing(c.clock.Now(), params.VisibilityTimeout); err == nil { + selected = &message break } if c.useFIFO { return nil, &EmptyQueueError{} } } - return selectedItem, nil -} - -func (c *client[T]) getVisibilityTimeout() time.Duration { - return time.Duration(c.visibilityTimeoutInMinutes) * time.Minute + return selected, nil } -func (c *client[T]) processSelectedItem(ctx context.Context, item *Message[T]) (*Message[T], error) { +func (c *client[T]) processSelectedMessage(ctx context.Context, message *Message[T]) (*Message[T], error) { builder := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). Add(expression.Name("receive_count"), expression.Value(1)). - Set(expression.Name("last_updated_timestamp"), expression.Value(item.LastUpdatedTimestamp)). - Set(expression.Name("queue_peek_timestamp"), expression.Value(item.PeekFromQueueTimestamp)). - Set(expression.Name("status"), expression.Value(item.Status))). - WithCondition(expression.Name("version").Equal(expression.Value(item.Version))) + Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)). + Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)). + Set(expression.Name("queue_peek_timestamp"), expression.Value(message.PeekFromQueueTimestamp))). + WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { return nil, BuildingExpressionError{Cause: err} } - updated, err := c.updateDynamoDBItem(ctx, item.ID, &expr) + updated, err := c.updateDynamoDBItem(ctx, message.ID, &expr) if err != nil { return nil, err } return updated, nil } -type UpdateMessageAsVisibleInput struct { - ID string +type ChangeMessageVisibilityInput struct { + ID string + VisibilityTimeout int } -// UpdateMessageAsVisibleOutput represents the result for the UpdateMessageAsVisible() API call. -type UpdateMessageAsVisibleOutput[T any] struct { +// ChangeMessageVisibilityOutput represents the result for the ChangeMessageVisibility() API call. +type ChangeMessageVisibilityOutput[T any] struct { *Result // Embedded type for inheritance-like behavior in Go Message *Message[T] `json:"-"` } -func (c *client[T]) UpdateMessageAsVisible(ctx context.Context, params *UpdateMessageAsVisibleInput) (*UpdateMessageAsVisibleOutput[T], error) { +func (c *client[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error) { if params == nil { - params = &UpdateMessageAsVisibleInput{} + params = &ChangeMessageVisibilityInput{} } retrieved, err := c.GetMessage(ctx, &GetMessageInput{ ID: params.ID, }) if err != nil { - return &UpdateMessageAsVisibleOutput[T]{}, err + return &ChangeMessageVisibilityOutput[T]{}, err } if retrieved.Message == nil { - return &UpdateMessageAsVisibleOutput[T]{}, &IDNotFoundError{} + return &ChangeMessageVisibilityOutput[T]{}, &IDNotFoundError{} } message := retrieved.Message - err = message.markAsReady(c.clock.Now()) - if err != nil { - return &UpdateMessageAsVisibleOutput[T]{}, err - } + message.changeVisibilityTimeout(c.clock.Now(), params.VisibilityTimeout) builder := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)). - Set(expression.Name("status"), expression.Value(message.Status))). + Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { - return &UpdateMessageAsVisibleOutput[T]{}, BuildingExpressionError{Cause: err} + return &ChangeMessageVisibilityOutput[T]{}, BuildingExpressionError{Cause: err} } retried, err := c.updateDynamoDBItem(ctx, message.ID, &expr) if err != nil { - return &UpdateMessageAsVisibleOutput[T]{}, err + return &ChangeMessageVisibilityOutput[T]{}, err } - return &UpdateMessageAsVisibleOutput[T]{ + return &ChangeMessageVisibilityOutput[T]{ Result: &Result{ ID: retried.ID, - Status: retried.Status, + Status: retried.GetStatus(c.clock.Now()), LastUpdatedTimestamp: retried.LastUpdatedTimestamp, Version: retried.Version, }, @@ -438,7 +421,7 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD //lint:ignore nilerr reason return &MoveMessageToDLQOutput{ ID: params.ID, - Status: message.Status, + Status: message.GetStatus(c.clock.Now()), LastUpdatedTimestamp: message.LastUpdatedTimestamp, Version: message.Version, }, nil @@ -446,31 +429,32 @@ func (c *client[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToD builder := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). - Set(expression.Name("status"), expression.Value(message.Status)). + Set(expression.Name("visibility_timeout"), expression.Value(message.VisibilityTimeout)). Set(expression.Name("receive_count"), expression.Value(message.ReceiveCount)). Set(expression.Name("queue_type"), expression.Value(message.QueueType)). Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)). Set(expression.Name("queue_add_timestamp"), expression.Value(message.AddToQueueTimestamp)). - Set(expression.Name(" queue_peek_timestamp"), expression.Value(message.AddToQueueTimestamp))). + Set(expression.Name("queue_peek_timestamp"), expression.Value(message.AddToQueueTimestamp))). WithCondition(expression.Name("version").Equal(expression.Value(message.Version))) expr, err := c.buildExpression(builder) if err != nil { return &MoveMessageToDLQOutput{}, BuildingExpressionError{Cause: err} } - item, err := c.updateDynamoDBItem(ctx, params.ID, &expr) + updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr) if err != nil { return &MoveMessageToDLQOutput{}, err } return &MoveMessageToDLQOutput{ ID: params.ID, - Status: item.Status, - LastUpdatedTimestamp: item.LastUpdatedTimestamp, - Version: item.Version, + Status: updated.GetStatus(c.clock.Now()), + LastUpdatedTimestamp: updated.LastUpdatedTimestamp, + Version: updated.Version, }, nil } type RedriveMessageInput struct { - ID string + ID string + VisibilityTimeout int } type RedriveMessageOutput struct { @@ -494,7 +478,7 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn return &RedriveMessageOutput{}, &IDNotFoundError{} } message := retrieved.Message - err = message.markAsRestoredFromDLQ(c.clock.Now(), c.getVisibilityTimeout()) + err = message.markAsRestoredFromDLQ(c.clock.Now(), params.VisibilityTimeout) if err != nil { return &RedriveMessageOutput{}, err } @@ -506,8 +490,8 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn expression.Name("queue_type"), expression.Value(message.QueueType), ).Set( - expression.Name("status"), - expression.Value(message.Status), + expression.Name("visibility_timeout"), + expression.Value(message.VisibilityTimeout), ).Set( expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp), @@ -527,7 +511,7 @@ func (c *client[T]) RedriveMessage(ctx context.Context, params *RedriveMessageIn } return &RedriveMessageOutput{ ID: updated.ID, - Status: updated.Status, + Status: updated.GetStatus(c.clock.Now()), LastUpdatedTimestamp: updated.LastUpdatedTimestamp, Version: updated.Version, }, nil @@ -610,22 +594,22 @@ func (c *client[T]) processQueryItemsForQueueStats(items []map[string]types.Attr return UnmarshalingAttributeError{Cause: err} } - updateQueueStatsFromItem[T](&item, stats) + c.updateQueueStatsFromItem(&item, stats) } return nil } const maxFirst100ItemsInQueue = 100 -func updateQueueStatsFromItem[T any](item *Message[T], stats *GetQueueStatsOutput) { - if item.Status == StatusProcessing { +func (c *client[T]) updateQueueStatsFromItem(message *Message[T], stats *GetQueueStatsOutput) { + if message.GetStatus(c.clock.Now()) == StatusProcessing { stats.TotalRecordsInProcessing++ if len(stats.First100SelectedIDsInQueue) < maxFirst100ItemsInQueue { - stats.First100SelectedIDsInQueue = append(stats.First100SelectedIDsInQueue, item.ID) + stats.First100SelectedIDsInQueue = append(stats.First100SelectedIDsInQueue, message.ID) } } if len(stats.First100IDsInQueue) < maxFirst100ItemsInQueue { - stats.First100IDsInQueue = append(stats.First100IDsInQueue, item.ID) + stats.First100IDsInQueue = append(stats.First100IDsInQueue, message.ID) } } diff --git a/client_test.go b/client_test.go index 8e8da61..b7db7e1 100644 --- a/client_test.go +++ b/client_test.go @@ -120,17 +120,17 @@ func TestDynamoMQClientShouldReturnError(t *testing.T) { wantError: &dynamomq.IDDuplicatedError{}, }, { - name: "UpdateMessageAsVisible should return IDNotProvidedError", + name: "ChangeMessageVisibility should return IDNotProvidedError", operation: func() error { - _, err := client.UpdateMessageAsVisible(context.Background(), nil) + _, err := client.ChangeMessageVisibility(context.Background(), nil) return err }, wantError: &dynamomq.IDNotProvidedError{}, }, { - name: "UpdateMessageAsVisible should return IDNotFoundError", + name: "ChangeMessageVisibility should return IDNotFoundError", operation: func() error { - _, err := client.UpdateMessageAsVisible(context.Background(), &dynamomq.UpdateMessageAsVisibleInput{ + _, err := client.ChangeMessageVisibility(context.Background(), &dynamomq.ChangeMessageVisibilityInput{ ID: "B-101", }) return err @@ -273,7 +273,7 @@ func TestDynamoMQClientReceiveMessage(t *testing.T) { r := &dynamomq.ReceiveMessageOutput[test.MessageData]{ Result: &dynamomq.Result{ ID: m.ID, - Status: m.Status, + Status: dynamomq.StatusProcessing, LastUpdatedTimestamp: m.LastUpdatedTimestamp, Version: m.Version, }, @@ -296,7 +296,9 @@ func TestDynamoMQClientReceiveMessage(t *testing.T) { } runTestsParallel[any, *dynamomq.ReceiveMessageOutput[test.MessageData]](t, "ReceiveMessage()", tests, func(client dynamomq.Client[test.MessageData], _ any) (*dynamomq.ReceiveMessageOutput[test.MessageData], error) { - return client.ReceiveMessage(context.Background(), nil) + return client.ReceiveMessage(context.Background(), &dynamomq.ReceiveMessageInput{ + VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + }) }) } @@ -319,7 +321,9 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { } for i, want := range wants { - result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{}) + result, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ + VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + }) test.AssertError(t, err, nil, fmt.Sprintf("ReceiveMessage() [%d-1]", i)) test.AssertDeepEqual(t, result, want, fmt.Sprintf("ReceiveMessage() [%d-2]", i)) @@ -327,7 +331,9 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { return } - _, err = client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{}) + _, err = client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ + VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + }) test.AssertError(t, err, &dynamomq.EmptyQueueError{}, fmt.Sprintf("ReceiveMessage() [%d-3]", i)) _, err = client.DeleteMessage(ctx, &dynamomq.DeleteMessageInput{ @@ -336,7 +342,9 @@ func testDynamoMQClientReceiveMessageSequence(t *testing.T, useFIFO bool) { test.AssertError(t, err, nil, fmt.Sprintf("DeleteMessage() [%d]", i)) } - _, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{}) + _, err := client.ReceiveMessage(ctx, &dynamomq.ReceiveMessageInput{ + VisibilityTimeout: dynamomq.DefaultVisibilityTimeoutInSeconds, + }) test.AssertError(t, err, &dynamomq.EmptyQueueError{}, "ReceiveMessage() [last]") } @@ -356,7 +364,7 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { id string } now := test.DefaultTestDate.Add(10 * time.Second) - tests := []ClientTestCase[args, *dynamomq.UpdateMessageAsVisibleOutput[test.MessageData]]{ + tests := []ClientTestCase[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]]{ { name: "should succeed when id is found", setup: NewSetupFunc(newPutRequestWithProcessingItem("A-101", now)), @@ -366,7 +374,7 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { args: args{ id: "A-101", }, - want: &dynamomq.UpdateMessageAsVisibleOutput[test.MessageData]{ + want: &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{ Result: &dynamomq.Result{ ID: "A-101", Status: dynamomq.StatusReady, @@ -381,26 +389,10 @@ func TestDynamoMQClientUpdateMessageAsVisible(t *testing.T) { }(), }, }, - { - name: "should return InvalidStateTransitionError when status is ready", - setup: NewSetupFunc(newPutRequestWithReadyItem("A-101", now)), - sdkClock: mock.Clock{ - T: now, - }, - args: args{ - id: "A-101", - }, - want: &dynamomq.UpdateMessageAsVisibleOutput[test.MessageData]{}, - wantErr: dynamomq.InvalidStateTransitionError{ - Msg: "message is currently ready", - Operation: "mark as ready", - Current: dynamomq.StatusReady, - }, - }, } - runTestsParallel[args, *dynamomq.UpdateMessageAsVisibleOutput[test.MessageData]](t, "UpdateMessageAsVisible()", tests, - func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.UpdateMessageAsVisibleOutput[test.MessageData], error) { - return client.UpdateMessageAsVisible(context.Background(), &dynamomq.UpdateMessageAsVisibleInput{ + runTestsParallel[args, *dynamomq.ChangeMessageVisibilityOutput[test.MessageData]](t, "ChangeMessageVisibility()", tests, + func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.ChangeMessageVisibilityOutput[test.MessageData], error) { + return client.ChangeMessageVisibility(context.Background(), &dynamomq.ChangeMessageVisibilityInput{ ID: args.id, }) }) @@ -450,7 +442,7 @@ func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) { s := NewTestMessageItemAsDLQ("A-101", test.DefaultTestDate) r := &dynamomq.MoveMessageToDLQOutput{ ID: s.ID, - Status: s.Status, + Status: dynamomq.StatusReady, LastUpdatedTimestamp: s.LastUpdatedTimestamp, Version: s.Version, } @@ -473,7 +465,7 @@ func TestDynamoMQClientMoveMessageToDLQ(t *testing.T) { m.Version = 2 r := &dynamomq.MoveMessageToDLQOutput{ ID: m.ID, - Status: m.Status, + Status: dynamomq.StatusReady, LastUpdatedTimestamp: m.LastUpdatedTimestamp, Version: m.Version, } @@ -822,7 +814,6 @@ func prepareTestClient(ctx context.Context, t *testing.T, dynamomq.WithAWSDynamoDBClient(raw), mock.WithClock(sdkClock), dynamomq.WithUseFIFO(useFIFO), - dynamomq.WithAWSVisibilityTimeout(1), dynamomq.WithAWSRetryMaxAttempts(dynamomq.DefaultRetryMaxAttempts), WithUnmarshalMap(unmarshalMap), WithMarshalMap(marshalMap), @@ -945,9 +936,9 @@ func TestTestDynamoMQClientReturnUnmarshalingAttributeError(t *testing.T) { }, }, { - name: "UpdateMessageAsVisible should return UnmarshalingAttributeError", + name: "ChangeMessageVisibility should return UnmarshalingAttributeError", operation: func() (any, error) { - return client.UpdateMessageAsVisible(context.Background(), &dynamomq.UpdateMessageAsVisibleInput{ + return client.ChangeMessageVisibility(context.Background(), &dynamomq.ChangeMessageVisibilityInput{ ID: "A-101", }) }, diff --git a/consumer.go b/consumer.go index 22dcafd..5f31d45 100644 --- a/consumer.go +++ b/consumer.go @@ -166,7 +166,7 @@ func (c *Consumer[T]) shouldRetry(msg *Message[T]) bool { } func (c *Consumer[T]) retryMessage(ctx context.Context, msg *Message[T]) { - if _, err := c.client.UpdateMessageAsVisible(ctx, &UpdateMessageAsVisibleInput{ID: msg.ID}); err != nil { + if _, err := c.client.ChangeMessageVisibility(ctx, &ChangeMessageVisibilityInput{ID: msg.ID}); err != nil { c.logf("DynamoMQ: Failed to update a message as visible. %s", err) } } diff --git a/consumer_test.go b/consumer_test.go index 9367be8..4f342bb 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -284,11 +284,11 @@ func NewClientForConsumerTest(queue, dlq chan *dynamomq.Message[test.MessageData store.Delete(params.ID) return &dynamomq.DeleteMessageOutput{}, nil }, - UpdateMessageAsVisibleFunc: func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[test.MessageData], error) { + ChangeMessageVisibilityFunc: func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[test.MessageData], error) { if cfg.SimulateMessageAsVisibleError { return nil, test.ErrTest } - return &dynamomq.UpdateMessageAsVisibleOutput[test.MessageData]{}, nil + return &dynamomq.ChangeMessageVisibilityOutput[test.MessageData]{}, nil }, MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) { if cfg.SimulateMoveMessageToDLQError { @@ -299,7 +299,7 @@ func NewClientForConsumerTest(queue, dlq chan *dynamomq.Message[test.MessageData dlq <- msg return &dynamomq.MoveMessageToDLQOutput{ ID: msg.ID, - Status: msg.Status, + Status: dynamomq.StatusReady, LastUpdatedTimestamp: msg.LastUpdatedTimestamp, Version: 2, }, nil diff --git a/dynamomq_test.go b/dynamomq_test.go index fb5d41f..eea841d 100644 --- a/dynamomq_test.go +++ b/dynamomq_test.go @@ -10,13 +10,13 @@ import ( func MarkAsReady[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) - m.Status = dynamomq.StatusReady + m.VisibilityTimeout = 0 m.LastUpdatedTimestamp = ts } func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) - m.Status = dynamomq.StatusProcessing + m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds m.LastUpdatedTimestamp = ts m.PeekFromQueueTimestamp = ts } @@ -24,7 +24,7 @@ func MarkAsProcessing[T any](m *dynamomq.Message[T], now time.Time) { func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) { ts := clock.FormatRFC3339Nano(now) m.QueueType = dynamomq.QueueTypeDLQ - m.Status = dynamomq.StatusReady + m.VisibilityTimeout = 0 m.ReceiveCount = 0 m.LastUpdatedTimestamp = ts m.AddToQueueTimestamp = ts @@ -53,10 +53,11 @@ func NewMessageFromReadyToProcessing(id string, MarkAsProcessing(m, processingTime) m.Version = 2 m.ReceiveCount = 1 + m.VisibilityTimeout = dynamomq.DefaultVisibilityTimeoutInSeconds r := &dynamomq.ReceiveMessageOutput[test.MessageData]{ Result: &dynamomq.Result{ ID: m.ID, - Status: m.Status, + Status: dynamomq.StatusProcessing, LastUpdatedTimestamp: m.LastUpdatedTimestamp, Version: m.Version, }, diff --git a/internal/clock/clock.go b/internal/clock/clock.go index f6bd819..bd1ff37 100644 --- a/internal/clock/clock.go +++ b/internal/clock/clock.go @@ -11,10 +11,15 @@ func FormatRFC3339Nano(now time.Time) string { } func RFC3339NanoToUnixMilli(rfc3339NanoDate string) int64 { - t, _ := time.Parse(time.RFC3339Nano, rfc3339NanoDate) + t := RFC3339NanoToTime(rfc3339NanoDate) return t.UnixMilli() } +func RFC3339NanoToTime(rfc3339NanoDate string) time.Time { + t, _ := time.Parse(time.RFC3339Nano, rfc3339NanoDate) + return t +} + type Clock interface { Now() time.Time } diff --git a/internal/cmd/fail.go b/internal/cmd/fail.go index 943a06a..b39198e 100644 --- a/internal/cmd/fail.go +++ b/internal/cmd/fail.go @@ -19,7 +19,7 @@ func (f CommandFactory) CreateFailCommand(flgs *Flags) *cobra.Command { if err != nil { return err } - _, err = client.UpdateMessageAsVisible(ctx, &dynamomq.UpdateMessageAsVisibleInput{ + _, err = client.ChangeMessageVisibility(ctx, &dynamomq.ChangeMessageVisibilityInput{ ID: flgs.ID, }) if err != nil { diff --git a/internal/cmd/interactive.go b/internal/cmd/interactive.go index bf99468..ff7aea5 100644 --- a/internal/cmd/interactive.go +++ b/internal/cmd/interactive.go @@ -155,7 +155,7 @@ func (c *Interactive) ls(ctx context.Context, _ []string) error { } fmt.Println("List messages of first 10 IDs:") for _, m := range out.Messages { - fmt.Printf("* ID: %s, status: %s", m.ID, m.Status) + fmt.Printf("* ID: %s, status: %s", m.ID, m.GetStatus(clock.Now())) } return nil } @@ -312,7 +312,7 @@ func (c *Interactive) fail(ctx context.Context, _ []string) error { if c.Message == nil { return errorCLIModeRestriction("`fail`") } - _, err := c.Client.UpdateMessageAsVisible(ctx, &dynamomq.UpdateMessageAsVisibleInput{ + _, err := c.Client.ChangeMessageVisibility(ctx, &dynamomq.ChangeMessageVisibilityInput{ ID: c.Message.ID, }) if err != nil { diff --git a/internal/cmd/interactive_test.go b/internal/cmd/interactive_test.go index 604c8bb..7f2a314 100644 --- a/internal/cmd/interactive_test.go +++ b/internal/cmd/interactive_test.go @@ -381,8 +381,8 @@ func TestRunInteractiveFailReturnError(t *testing.T) { { name: "should failed when fail to get message", client: mock.Client[any]{ - UpdateMessageAsVisibleFunc: func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[any], error) { - return &dynamomq.UpdateMessageAsVisibleOutput[any]{}, nil + ChangeMessageVisibilityFunc: func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[any], error) { + return &dynamomq.ChangeMessageVisibilityOutput[any]{}, nil }, GetMessageFunc: func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[any], error) { return &dynamomq.GetMessageOutput[any]{}, test.ErrTest @@ -392,8 +392,8 @@ func TestRunInteractiveFailReturnError(t *testing.T) { { name: "should failed when message is not found", client: mock.Client[any]{ - UpdateMessageAsVisibleFunc: func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[any], error) { - return &dynamomq.UpdateMessageAsVisibleOutput[any]{}, nil + ChangeMessageVisibilityFunc: func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[any], error) { + return &dynamomq.ChangeMessageVisibilityOutput[any]{}, nil }, GetMessageFunc: func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[any], error) { return &dynamomq.GetMessageOutput[any]{}, nil @@ -403,8 +403,8 @@ func TestRunInteractiveFailReturnError(t *testing.T) { { name: "should failed when fail to get queue stats", client: mock.Client[any]{ - UpdateMessageAsVisibleFunc: func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[any], error) { - return &dynamomq.UpdateMessageAsVisibleOutput[any]{}, nil + ChangeMessageVisibilityFunc: func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[any], error) { + return &dynamomq.ChangeMessageVisibilityOutput[any]{}, nil }, GetMessageFunc: func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[any], error) { return &dynamomq.GetMessageOutput[any]{ diff --git a/internal/cmd/ls.go b/internal/cmd/ls.go index e3823ed..98ad46e 100644 --- a/internal/cmd/ls.go +++ b/internal/cmd/ls.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "github.com/vvatanabe/dynamomq" + "github.com/vvatanabe/dynamomq/internal/clock" ) func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command { @@ -26,7 +27,7 @@ func (f CommandFactory) CreateLSCommand(flgs *Flags) *cobra.Command { for _, m := range out.Messages { result.Statuses = append(result.Statuses, Status{ ID: m.ID, - Status: m.Status, + Status: m.GetStatus(clock.Now()), QueueType: m.QueueType, }) } diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 795923e..76c0a3f 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/spf13/cobra" "github.com/vvatanabe/dynamomq" + "github.com/vvatanabe/dynamomq/internal/clock" ) type CommandFactory struct { @@ -131,7 +132,7 @@ type SystemInfo struct { func GetSystemInfo[T any](m *dynamomq.Message[T]) *SystemInfo { return &SystemInfo{ ID: m.ID, - Status: m.Status, + Status: m.GetStatus(clock.Now()), ReceiveCount: m.ReceiveCount, QueueType: m.QueueType, Version: m.Version, @@ -144,8 +145,8 @@ func GetSystemInfo[T any](m *dynamomq.Message[T]) *SystemInfo { func ResetSystemInfo[T any](m *dynamomq.Message[T], now time.Time) { msg := dynamomq.NewMessage[T](m.ID, m.Data, now) - m.Status = msg.Status m.QueueType = msg.QueueType + m.VisibilityTimeout = msg.VisibilityTimeout m.ReceiveCount = msg.ReceiveCount m.Version = msg.Version m.CreationTimestamp = msg.CreationTimestamp diff --git a/internal/mock/mock.go b/internal/mock/mock.go index 709af80..abc9e29 100644 --- a/internal/mock/mock.go +++ b/internal/mock/mock.go @@ -12,17 +12,17 @@ import ( var ErrNotImplemented = errors.New("not implemented") type Client[T any] struct { - SendMessageFunc func(ctx context.Context, params *dynamomq.SendMessageInput[T]) (*dynamomq.SendMessageOutput[T], error) - ReceiveMessageFunc func(ctx context.Context, params *dynamomq.ReceiveMessageInput) (*dynamomq.ReceiveMessageOutput[T], error) - UpdateMessageAsVisibleFunc func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[T], error) - DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) - MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) - RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) - GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error) - GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) - GetDLQStatsFunc func(ctx context.Context, params *dynamomq.GetDLQStatsInput) (*dynamomq.GetDLQStatsOutput, error) - ListMessagesFunc func(ctx context.Context, params *dynamomq.ListMessagesInput) (*dynamomq.ListMessagesOutput[T], error) - ReplaceMessageFunc func(ctx context.Context, params *dynamomq.ReplaceMessageInput[T]) (*dynamomq.ReplaceMessageOutput, error) + SendMessageFunc func(ctx context.Context, params *dynamomq.SendMessageInput[T]) (*dynamomq.SendMessageOutput[T], error) + ReceiveMessageFunc func(ctx context.Context, params *dynamomq.ReceiveMessageInput) (*dynamomq.ReceiveMessageOutput[T], error) + ChangeMessageVisibilityFunc func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error) + DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error) + MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput, error) + RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) + GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error) + GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error) + GetDLQStatsFunc func(ctx context.Context, params *dynamomq.GetDLQStatsInput) (*dynamomq.GetDLQStatsOutput, error) + ListMessagesFunc func(ctx context.Context, params *dynamomq.ListMessagesInput) (*dynamomq.ListMessagesOutput[T], error) + ReplaceMessageFunc func(ctx context.Context, params *dynamomq.ReplaceMessageInput[T]) (*dynamomq.ReplaceMessageOutput, error) } func (m Client[T]) SendMessage(ctx context.Context, params *dynamomq.SendMessageInput[T]) (*dynamomq.SendMessageOutput[T], error) { @@ -39,9 +39,9 @@ func (m Client[T]) ReceiveMessage(ctx context.Context, params *dynamomq.ReceiveM return nil, ErrNotImplemented } -func (m Client[T]) UpdateMessageAsVisible(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[T], error) { - if m.UpdateMessageAsVisibleFunc != nil { - return m.UpdateMessageAsVisibleFunc(ctx, params) +func (m Client[T]) ChangeMessageVisibility(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error) { + if m.ChangeMessageVisibilityFunc != nil { + return m.ChangeMessageVisibilityFunc(ctx, params) } return nil, ErrNotImplemented } @@ -115,8 +115,8 @@ var SuccessfulMockClient = &Client[any]{ PeekedMessageObject: &dynamomq.Message[any]{}, }, nil }, - UpdateMessageAsVisibleFunc: func(ctx context.Context, params *dynamomq.UpdateMessageAsVisibleInput) (*dynamomq.UpdateMessageAsVisibleOutput[any], error) { - return &dynamomq.UpdateMessageAsVisibleOutput[any]{ + ChangeMessageVisibilityFunc: func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[any], error) { + return &dynamomq.ChangeMessageVisibilityOutput[any]{ Result: &dynamomq.Result{}, Message: &dynamomq.Message[any]{}, }, nil diff --git a/internal/mock/mock_test.go b/internal/mock/mock_test.go index 5c9ef44..580e8d5 100644 --- a/internal/mock/mock_test.go +++ b/internal/mock/mock_test.go @@ -31,9 +31,9 @@ func TestMockClient(t *testing.T) { }, }, { - name: "UpdateMessageAsVisible", + name: "ChangeMessageVisibility", method: func(client *mock.Client[any]) (any, error) { - return client.UpdateMessageAsVisible(ctx, nil) + return client.ChangeMessageVisibility(ctx, nil) }, }, { diff --git a/message.go b/message.go index 51ddc7b..db91dc4 100644 --- a/message.go +++ b/message.go @@ -11,8 +11,8 @@ func NewMessage[T any](id string, data T, now time.Time) *Message[T] { return &Message[T]{ ID: id, Data: data, - Status: StatusReady, ReceiveCount: 0, + VisibilityTimeout: 0, QueueType: QueueTypeStandard, Version: 1, CreationTimestamp: ts, @@ -23,9 +23,10 @@ func NewMessage[T any](id string, data T, now time.Time) *Message[T] { } type Message[T any] struct { - ID string `json:"id" dynamodbav:"id"` - Data T `json:"data" dynamodbav:"data"` - Status Status `json:"status" dynamodbav:"status"` + ID string `json:"id" dynamodbav:"id"` + Data T `json:"data" dynamodbav:"data"` + // The new value for the message's visibility timeout (in seconds). + VisibilityTimeout int `json:"visibility_timeout" dynamodbav:"visibility_timeout"` ReceiveCount int `json:"receive_count" dynamodbav:"receive_count"` QueueType QueueType `json:"queue_type" dynamodbav:"queue_type,omitempty"` Version int `json:"version" dynamodbav:"version"` @@ -35,45 +36,38 @@ type Message[T any] struct { PeekFromQueueTimestamp string `json:"queue_peek_timestamp" dynamodbav:"queue_peek_timestamp"` } -func (m *Message[T]) isQueueSelected(now time.Time, visibilityTimeout time.Duration) bool { - if m.Status != StatusProcessing { - return false +func (m *Message[T]) GetStatus(now time.Time) Status { + peekUTCTime := clock.RFC3339NanoToTime(m.PeekFromQueueTimestamp) + invisibleTime := peekUTCTime.Add(time.Duration(m.VisibilityTimeout) * time.Second) + if now.Before(invisibleTime) { + return StatusProcessing } - peekUTCTimestamp := clock.RFC3339NanoToUnixMilli(m.PeekFromQueueTimestamp) - timeDifference := now.UnixMilli() - peekUTCTimestamp - return timeDifference <= visibilityTimeout.Milliseconds() + return StatusReady } func (m *Message[T]) isDLQ() bool { return m.QueueType == QueueTypeDLQ } -func (m *Message[T]) markAsReady(now time.Time) error { - if m.Status != StatusProcessing { - return InvalidStateTransitionError{ - Msg: "message is currently ready", - Operation: "mark as ready", - Current: m.Status, - } - } +func (m *Message[T]) changeVisibilityTimeout(now time.Time, visibilityTimeout int) { ts := clock.FormatRFC3339Nano(now) - m.Status = StatusReady m.LastUpdatedTimestamp = ts - return nil + m.VisibilityTimeout = visibilityTimeout } -func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout time.Duration) error { - if m.isQueueSelected(now, visibilityTimeout) { +func (m *Message[T]) markAsProcessing(now time.Time, visibilityTimeout int) error { + status := m.GetStatus(now) + if status == StatusProcessing { return InvalidStateTransitionError{ Msg: "message is currently being processed", Operation: "mark as processing", - Current: m.Status, + Current: status, } } ts := clock.FormatRFC3339Nano(now) - m.Status = StatusProcessing m.LastUpdatedTimestamp = ts m.PeekFromQueueTimestamp = ts + m.VisibilityTimeout = visibilityTimeout return nil } @@ -82,37 +76,39 @@ func (m *Message[T]) markAsMovedToDLQ(now time.Time) error { return InvalidStateTransitionError{ Msg: "message is already in DLQ", Operation: "mark as moved to DLQ", - Current: m.Status, + Current: m.GetStatus(now), } } ts := clock.FormatRFC3339Nano(now) m.QueueType = QueueTypeDLQ - m.Status = StatusReady m.ReceiveCount = 0 + m.VisibilityTimeout = 0 m.LastUpdatedTimestamp = ts m.AddToQueueTimestamp = ts m.PeekFromQueueTimestamp = "" return nil } -func (m *Message[T]) markAsRestoredFromDLQ(now time.Time, visibilityTimeout time.Duration) error { +func (m *Message[T]) markAsRestoredFromDLQ(now time.Time, visibilityTimeout int) error { + status := m.GetStatus(now) if !m.isDLQ() { return InvalidStateTransitionError{ Msg: "can only redrive messages from DLQ", Operation: "mark as restored from DLQ", - Current: m.Status, + Current: status, } } - if m.isQueueSelected(now, visibilityTimeout) { + if status == StatusProcessing { return InvalidStateTransitionError{ Msg: "can only redrive messages from READY", Operation: "mark as restored from DLQ", - Current: m.Status, + Current: status, } } ts := clock.FormatRFC3339Nano(now) m.QueueType = QueueTypeStandard - m.Status = StatusReady + m.VisibilityTimeout = visibilityTimeout + m.ReceiveCount = 0 m.LastUpdatedTimestamp = ts m.AddToQueueTimestamp = ts return nil diff --git a/message_test.go b/message_test.go new file mode 100644 index 0000000..f7c4219 --- /dev/null +++ b/message_test.go @@ -0,0 +1,75 @@ +package dynamomq_test + +import ( + "testing" + "time" + + "github.com/vvatanabe/dynamomq" + "github.com/vvatanabe/dynamomq/internal/test" + "github.com/vvatanabe/dynamomq/internal/clock" +) + +func TestMessageGetStatus(t *testing.T) { + type args struct { + now time.Time + } + type testCase[T any] struct { + name string + m dynamomq.Message[T] + args args + want dynamomq.Status + } + tests := []testCase[any]{ + { + name: "should return StatusReady when VisibilityTimeout is 0", + m: dynamomq.Message[any]{ + VisibilityTimeout: 0, + PeekFromQueueTimestamp: clock.FormatRFC3339Nano(test.DefaultTestDate), + }, + args: args{ + now: test.DefaultTestDate, + }, + want: dynamomq.StatusReady, + }, + { + name: "should return StatusProcessing when current time is before VisibilityTimeout", + m: dynamomq.Message[any]{ + VisibilityTimeout: 1, + PeekFromQueueTimestamp: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second)), + }, + args: args{ + now: test.DefaultTestDate.Add(time.Second), + }, + want: dynamomq.StatusProcessing, + }, + { + name: "should return StatusReady when current time is after VisibilityTimeout", + m: dynamomq.Message[any]{ + VisibilityTimeout: 5, + PeekFromQueueTimestamp: clock.FormatRFC3339Nano(test.DefaultTestDate), + }, + args: args{ + now: test.DefaultTestDate.Add(time.Second * 6), + }, + want: dynamomq.StatusReady, + }, + { + name: "should return StatusProcessing when current time is equal VisibilityTimeout", + m: dynamomq.Message[any]{ + VisibilityTimeout: 4, + PeekFromQueueTimestamp: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(time.Second * 4)), + }, + args: args{ + now: time.Date(2021, 1, 1, 0, 0, 4, 0, time.UTC), + }, + want: dynamomq.StatusProcessing, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.m.GetStatus(tt.args.now); got != tt.want { + t.Errorf("GetStatus() = %v, want %v", got, tt.want) + } + }) + } +}