Skip to content

Commit

Permalink
feat: Test 100K plan with AWS SQS (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
storm1729 authored Jan 5, 2025
1 parent db4b4b9 commit 7f0c9c4
Show file tree
Hide file tree
Showing 7 changed files with 969 additions and 62 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"start": "next start"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.716.0",
"@formatjs/intl-localematcher": "^0.5.2",
"@geist-ui/react": "^2.2.5",
"@geist-ui/react-icons": "^1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion src/app/[lang]/dashboard/bulk/BulkHistory.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function BulkHistory(props: {
}

setBulkJobs(res.data);
}, 3000);
}, 10_000);
}, [supabase]);

const renderStatus: TableColumnRender<Tables<"bulk_jobs_info">> = (
Expand Down
6 changes: 2 additions & 4 deletions src/app/[lang]/pricing/Plans.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import type {
ProductWithPrice,
SubscriptionWithPrice,
} from "@/supabase/supabaseServer";

// Currently the 100K plan per month is disabled.
const SHOW_100K = false;
import { ENABLE_BULK } from "@/util/helpers";

interface PlansProps {
d: Dictionary;
Expand Down Expand Up @@ -73,7 +71,7 @@ export function Plans({ d, products, subscription, isLoggedIn }: PlansProps) {
}
/>
</Grid>
{SHOW_100K && (
{ENABLE_BULK && (
<Grid xs={20} sm={6}>
<SaaS100k
d={d}
Expand Down
62 changes: 8 additions & 54 deletions src/app/api/v1/bulk/route.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { NextRequest } from "next/server";
import amqplib from "amqplib";
import { supabaseAdmin } from "@/supabase/supabaseAdmin";
import { sentryException } from "@/util/sentry";
import { ENABLE_BULK, getWebappURL } from "@/util/helpers";
import { ENABLE_BULK } from "@/util/helpers";
import { isEarlyResponse } from "@/app/api/v0/check_email/checkUserInDb";
import { SAAS_100K_PRODUCT_ID } from "@/util/subs";
import { Json, Tables } from "@/supabase/database.types";
import { cookies } from "next/headers";
import { createClient } from "@/supabase/server";
import { getSubAndCalls } from "@/supabase/supabaseServer";
import { sendEmailsToSQS } from "./sqs";

interface BulkPayload {
input_type: "array";
Expand All @@ -23,7 +23,7 @@ export const POST = async (req: NextRequest): Promise<Response> => {
error: "Bulk verification is not enabled",
},
{
status: 403,
status: 503,
}
);
}
Expand Down Expand Up @@ -83,7 +83,7 @@ export const POST = async (req: NextRequest): Promise<Response> => {
throw res1.error;
}
const bulkJob = res1.data[0];
console.log(`[💪] Created job ${bulkJob.id}`);
console.log(`[💪 Supabase] Created job ${bulkJob.id}`);

// Split Array in chunks of 5000 so that Supabase can handle it.
const chunkSize = 5000;
Expand All @@ -109,59 +109,13 @@ export const POST = async (req: NextRequest): Promise<Response> => {
bulkEmails = bulkEmails.concat(res2.data);

console.log(
`[💪] Inserted job ${bulkJob.id} chunk ${i++}/${chunks.length}`
`[💪 Supabase] Inserted job ${bulkJob.id} chunk ${i++}/${
chunks.length
}`
);
}

const conn = await amqplib
.connect(process.env.RCH_AMQP_ADDR || "amqp://localhost")
.catch((err) => {
const message = `Error connecting to RabbitMQ: ${
(err as AggregateError).errors
? (err as AggregateError).errors
.map((e) => e.message)
.join(", ")
: err.message
}`;

throw new Error(message);
});

const ch1 = await conn.createChannel().catch((err) => {
throw new Error(`Error creating RabbitMQ channel: ${err.message}`);
});
const queueName = `check_email.Smtp`; // TODO
await ch1.assertQueue(queueName, {
maxPriority: 5,
});

bulkEmails.forEach(({ email, id }) => {
ch1.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
input: {
to_email: email,
},
webhook: {
url: `${getWebappURL()}/api/v1/bulk/webhook`,
extra: {
bulkEmailId: id,
userId: user.id,
endpoint: "/v1/bulk",
},
},
})
),
{
contentType: "application/json",
priority: 1,
}
);
});

await ch1.close();
await conn.close();
await sendEmailsToSQS(bulkEmails, user.id);

return Response.json({
bulk_job_id: bulkJob.id,
Expand Down
73 changes: 73 additions & 0 deletions src/app/api/v1/bulk/sqs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Tables } from "@/supabase/database.types";
import { SQSClient, SendMessageBatchCommand } from "@aws-sdk/client-sqs";
import type { CheckEmailInput } from "@reacherhq/api";
import type { WebhookExtra } from "./webhook/route";
import { getWebappURL } from "@/util/helpers";

// Initialize the SQS client
const sqsClient = new SQSClient({ region: "eu-west-3" });

const queueUrl =
"https://sqs.eu-west-3.amazonaws.com/430118836964/check-email-queue";

// Matches this Rust struct:
// https://github.com/reacherhq/check-if-email-exists/blob/v0.10.1/backend/src/worker/do_work.rs#L34
type CheckEmailTask = {
input: CheckEmailInput;
job_id: {
bulk: number;
};
webhook?: {
on_each_email?: {
url: string;
headers: Record<string, string | undefined>;
extra: WebhookExtra;
};
};
};

export const sendEmailsToSQS = async (
bulkEmails: Tables<"bulk_emails">[],
userId: string
) => {
const command = new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: bulkEmails.map((bulkEmail) => ({
Id: bulkEmail.id.toString(),
MessageBody: JSON.stringify(bulkEmailToTask(bulkEmail, userId)),
})),
});

const response = await sqsClient.send(command);
console.log(
"[💪 SQS] Message sent successfully:",
JSON.stringify(response)
);
};

function bulkEmailToTask(
bulkEmail: Tables<"bulk_emails">,
userId: string
): CheckEmailTask {
return {
input: {
to_email: bulkEmail.email,
},
job_id: {
bulk: bulkEmail.bulk_job_id,
},
webhook: {
on_each_email: {
url: `${getWebappURL()}/api/v1/bulk/webhook`,
headers: {
"x-reacher-secret": process.env.RCH_HEADER_SECRET,
},
extra: {
bulkEmailId: bulkEmail.id,
userId,
endpoint: "/v1/bulk",
},
},
},
};
}
2 changes: 1 addition & 1 deletion src/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ export function convertPgError(err: PostgrestError): Error {
);
}

export const ENABLE_BULK: boolean = false;
export const ENABLE_BULK: boolean = true;
Loading

0 comments on commit 7f0c9c4

Please sign in to comment.