Skip to content

Commit

Permalink
refactor: change RedriveMessageOutput to include generic type
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Dec 11, 2023
1 parent 91874f3 commit 203c19d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
26 changes: 10 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Client[T any] interface {
// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
// RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error)
RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
// GetMessage get a specific message from a DynamoDB-based queue.
GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
// GetQueueStats is a method for obtaining statistical information about a DynamoDB-based queue.
Expand Down Expand Up @@ -529,33 +529,30 @@ type RedriveMessageInput struct {
ID string
}

type RedriveMessageOutput struct {
ID string `json:"id"`
Status Status `json:"status"`
UpdatedAt string `json:"updated_at"`
Version int `json:"version"`
type RedriveMessageOutput[T any] struct {
RedroveMessage *Message[T]
}

// RedriveMessage restore a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
// It locates the message based on the specified message ID and marks it as restored from the DLQ to the standard queue.
// This process is essential for reprocessing messages that have failed to be processed and is a crucial function in error handling within the message queue system.
func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput, error) {
func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error) {
if params == nil {
params = &RedriveMessageInput{}
}
retrieved, err := c.GetMessage(ctx, &GetMessageInput{
ID: params.ID,
})
if err != nil {
return &RedriveMessageOutput{}, err
return &RedriveMessageOutput[T]{}, err
}
if retrieved.Message == nil {
return &RedriveMessageOutput{}, &IDNotFoundError{}
return &RedriveMessageOutput[T]{}, &IDNotFoundError{}
}
message := retrieved.Message
err = message.markAsRestoredFromDLQ(c.clock.Now())
if err != nil {
return &RedriveMessageOutput{}, err
return &RedriveMessageOutput[T]{}, err
}
builder := expression.NewBuilder().
WithUpdate(expression.Add(
Expand All @@ -582,13 +579,10 @@ func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessa
}
updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr)
if err != nil {
return &RedriveMessageOutput{}, err
return &RedriveMessageOutput[T]{}, err
}
return &RedriveMessageOutput{
ID: updated.ID,
Status: updated.GetStatus(c.clock.Now()),
UpdatedAt: updated.UpdatedAt,
Version: updated.Version,
return &RedriveMessageOutput[T]{
RedroveMessage: updated,
}, nil
}

Expand Down
22 changes: 12 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) {
type args struct {
id string
}
tests := []ClientTestCase[args, *dynamomq.RedriveMessageOutput]{
tests := []ClientTestCase[args, *dynamomq.RedriveMessageOutput[test.MessageData]]{
{
name: "should succeed when id is found and status is ready",
setup: NewSetupFunc(newPutRequestWithDLQItem("A-101", test.DefaultTestDate)),
Expand All @@ -496,11 +496,13 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) {
args: args{
id: "A-101",
},
want: &dynamomq.RedriveMessageOutput{
ID: "A-101",
Status: dynamomq.StatusReady,
UpdatedAt: clock.FormatRFC3339Nano(test.DefaultTestDate.Add(10 * time.Second)),
Version: 2,
want: &dynamomq.RedriveMessageOutput[test.MessageData]{
RedroveMessage: func() *dynamomq.Message[test.MessageData] {
m := NewTestMessageItemAsDLQ("A-101", test.DefaultTestDate)
MarkAsRestoredFromDLQ(m, test.DefaultTestDate.Add(10*time.Second))
m.Version = 2
return m
}(),
},
},
{
Expand All @@ -512,7 +514,7 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) {
args: args{
id: "A-101",
},
want: &dynamomq.RedriveMessageOutput{},
want: &dynamomq.RedriveMessageOutput[test.MessageData]{},
wantErr: dynamomq.InvalidStateTransitionError{
Msg: "can only redrive messages from DLQ",
Operation: "mark as restored from DLQ",
Expand All @@ -534,16 +536,16 @@ func TestDynamoMQClientRedriveMessage(t *testing.T) {
args: args{
id: "A-101",
},
want: &dynamomq.RedriveMessageOutput{},
want: &dynamomq.RedriveMessageOutput[test.MessageData]{},
wantErr: dynamomq.InvalidStateTransitionError{
Msg: "can only redrive messages from READY",
Operation: "mark as restored from DLQ",
Current: dynamomq.StatusProcessing,
},
},
}
runTestsParallel[args, *dynamomq.RedriveMessageOutput](t, "RedriveMessage()", tests,
func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.RedriveMessageOutput, error) {
runTestsParallel[args, *dynamomq.RedriveMessageOutput[test.MessageData]](t, "RedriveMessage()", tests,
func(client dynamomq.Client[test.MessageData], args args) (*dynamomq.RedriveMessageOutput[test.MessageData], error) {
return client.RedriveMessage(context.Background(), &dynamomq.RedriveMessageInput{
ID: args.id,
})
Expand Down
10 changes: 10 additions & 0 deletions dynamomq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ func MarkAsMovedToDLQ[T any](m *dynamomq.Message[T], now time.Time) {
m.InvisibleUntilAt = ""
}

func MarkAsRestoredFromDLQ[T any](m *dynamomq.Message[T], now time.Time) {
ts := clock.FormatRFC3339Nano(now)
m.QueueType = dynamomq.QueueTypeStandard
m.ReceiveCount = 0
m.UpdatedAt = ts
m.SentAt = ts
m.ReceivedAt = ""
m.InvisibleUntilAt = ""
}

func NewTestMessageItemAsReady(id string, now time.Time) *dynamomq.Message[test.MessageData] {
return dynamomq.NewMessage[test.MessageData](id, test.NewMessageData(id), now)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Client[T any] struct {
ChangeMessageVisibilityFunc func(ctx context.Context, params *dynamomq.ChangeMessageVisibilityInput) (*dynamomq.ChangeMessageVisibilityOutput[T], error)
DeleteMessageFunc func(ctx context.Context, params *dynamomq.DeleteMessageInput) (*dynamomq.DeleteMessageOutput, error)
MoveMessageToDLQFunc func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[T], error)
RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error)
RedriveMessageFunc func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[T], error)
GetMessageFunc func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[T], error)
GetQueueStatsFunc func(ctx context.Context, params *dynamomq.GetQueueStatsInput) (*dynamomq.GetQueueStatsOutput, error)
GetDLQStatsFunc func(ctx context.Context, params *dynamomq.GetDLQStatsInput) (*dynamomq.GetDLQStatsOutput, error)
Expand Down Expand Up @@ -60,7 +60,7 @@ func (m Client[T]) MoveMessageToDLQ(ctx context.Context, params *dynamomq.MoveMe
return nil, ErrNotImplemented
}

func (m Client[T]) RedriveMessage(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) {
func (m Client[T]) RedriveMessage(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[T], error) {
if m.RedriveMessageFunc != nil {
return m.RedriveMessageFunc(ctx, params)
}
Expand Down Expand Up @@ -124,8 +124,8 @@ var SuccessfulMockClient = &Client[any]{
MoveMessageToDLQFunc: func(ctx context.Context, params *dynamomq.MoveMessageToDLQInput) (*dynamomq.MoveMessageToDLQOutput[any], error) {
return &dynamomq.MoveMessageToDLQOutput[any]{}, nil
},
RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput, error) {
return &dynamomq.RedriveMessageOutput{}, nil
RedriveMessageFunc: func(ctx context.Context, params *dynamomq.RedriveMessageInput) (*dynamomq.RedriveMessageOutput[any], error) {
return &dynamomq.RedriveMessageOutput[any]{}, nil
},
GetMessageFunc: func(ctx context.Context, params *dynamomq.GetMessageInput) (*dynamomq.GetMessageOutput[any], error) {
return &dynamomq.GetMessageOutput[any]{
Expand Down

0 comments on commit 203c19d

Please sign in to comment.