Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limited sqs size #3620

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Changed

- **CUMULUS-3678**
- sqs messages truncated to max sqs length if necessary
- **CUMULUS-3928**
- updated publish scripting to use [email protected] for user email
- updated publish scripting to use esm over common import of latest-version
Expand Down Expand Up @@ -416,7 +418,7 @@ to update to at least [cumulus-ecs-task:2.1.0](https://hub.docker.com/layers/cum
- Updates async_operations Docker image to Node v20 and bumps its cumulus dependencies to v18.3.0 to
support `aws-sdk` v3 changes.

### Added
### Add

- **CUMULUS-3614**
- `tf-modules/monitoring` module now deploys Glue table for querying dead-letter-archive messages.
Expand Down
13 changes: 10 additions & 3 deletions packages/aws-client/src/SQS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import {
Message,
QueueAttributeName,
ReceiveMessageCommand,
SendMessageCommand } from '@aws-sdk/client-sqs';
SendMessageCommand,
SendMessageCommandOutput
} from '@aws-sdk/client-sqs';

import { StepFunctionEventBridgeEvent } from './Lambda';
import { sqs } from './services';
Expand Down Expand Up @@ -94,12 +96,17 @@ export const sendSQSMessage = (
queueUrl: string,
message: string | object,
logOverride: Logger | undefined = undefined
) => {
): Promise<SendMessageCommandOutput> => {
const logger = logOverride || log;
let messageBody;
if (isString(message)) messageBody = message;
else if (isObject(message)) messageBody = JSON.stringify(message);
else throw new Error('body type is not accepted');
const maxLength = 262144;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How'd we come to this size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm so that's in bytes 262,144 bytes. JS's length returns UTF-16 "units".

In UTF-16 encoding, every code unit is exact 16 bits long.
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/String#utf-16_characters_unicode_code_points_and_grapheme_clusters:~:text=In%20UTF%2D16%20encoding%2C%20every%20code%20unit%20is%20exact%2016%20bits%20long

Is there a better way to determine the message size? If I'm thinking about this correctly (big if), the length JS would return is roughly twice as large as the max size SQS allows:

If length returns e.g. 262144 units, which are each 16 bits long, and a byte is 8 bits...

It seems like 'length' may not be appropriate here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like you're right. I've tested this with 262144 length vs 262144, but I think they're using the standard ascii/unicode split where most characters are 1 byte, but bytes >=128 are interpreted as the first of two in a unicode. I'll do some testing here to be sure and figure out how to correctly measure this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a fix for this is up now, thanks for that great catch!
there's an additional caveat that if a unicode character gets bifurcated, javascript encoding adds two more padding bytes that need to be dealt with. this latest change accounts for those things and tests them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm is there another way to handle this besides inspecting the character length of the message? It's starting to feel like an anti-pattern if we have to continue to use string.length and accommodate for the difference between units.

I'm also a little concerned about not storing the original, untruncated message anywhere.

We handle similar limitations with messages via the CMA by dumping the full message to S3 and referencing it in the message passed between functions in a step function. Is there something similar we could do here with SQS messages?

How was the issue initially encountered? Did a user report it?

if (messageBody.length > maxLength) {
const warningString = '...TruncatedForLength';
messageBody = `${messageBody.substring(0, maxLength - warningString.length)}${warningString}`;
}

const command = new SendMessageCommand({
MessageBody: messageBody,
Expand All @@ -125,7 +132,7 @@ type ReceiveSQSMessagesOptions = {
*/
export const receiveSQSMessages = async (
queueUrl: string,
options: ReceiveSQSMessagesOptions
options: ReceiveSQSMessagesOptions = {}
etcart marked this conversation as resolved.
Show resolved Hide resolved
): Promise<SQSMessage[]> => {
const params = {
QueueUrl: queueUrl,
Expand Down
21 changes: 21 additions & 0 deletions packages/aws-client/tests/test-SQS.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
sqsQueueExists,
sendSQSMessage,
isSQSRecordLike,
receiveSQSMessages,
} = require('../SQS');

const randomString = () => cryptoRandomString({ length: 10 });
Expand Down Expand Up @@ -100,3 +101,23 @@ test('isSQSRecordLike filters correctly for sqs record shape', (t) => {
body *should* be a string form json object,
but strictly checking is not compatible with the current use-case*/
});

test('sendSQSMessage truncates oversized messages safely', async (t) => {
const queueName = randomString();
const queueUrl = await createQueue(queueName);
const maxLength = 262144;
const overflowMessage = '0'.repeat(maxLength + 2);
await sendSQSMessage(queueUrl, overflowMessage);

let recievedMessage = await receiveSQSMessages(queueUrl);
let messageBody = recievedMessage[0].Body;
t.true(messageBody.endsWith('...TruncatedForLength'));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you normally add negative test cases? Like if the message is under the max size, then verify the '...TruncatedForLength' message does not appear. In the event it adds it to all the messages regardless of size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point, I'll add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wait, the lower part of this test is doing exactly that. it could be separated though to be clear that that's wha's being tested

Copy link
Contributor

@BWex-NASA BWex-NASA Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe at the end add: t.false(messageBody.endsWith('...TruncatedForLength'));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, we're checking implicitly just by checking that "it's unchanged", but verbosity==good in testing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

t.is(messageBody.length, maxLength);

const underflowMessage = '0'.repeat(maxLength);
await sendSQSMessage(queueUrl, underflowMessage);
recievedMessage = await receiveSQSMessages(queueUrl);
messageBody = recievedMessage[0].Body;

t.deepEqual(messageBody, underflowMessage);
});