Skip to content

Commit

Permalink
Merge pull request #14 from 7codeRO/feature/add-sqs-and-kafka-modules
Browse files Browse the repository at this point in the history
Added SqsModule and KafkaModule as showcase
  • Loading branch information
GarryOne authored Nov 20, 2023
2 parents 029efe9 + be1dcfc commit 812b62a
Show file tree
Hide file tree
Showing 8 changed files with 2,280 additions and 13 deletions.
2,144 changes: 2,131 additions & 13 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"prisma:seed": "ts-node prisma/seed.ts"
},
"dependencies": {
"@aws-sdk/client-sqs": "^3.454.0",
"@nestjs/common": "^9.3.9",
"@nestjs/config": "^2.3.1",
"@nestjs/core": "^9.3.9",
Expand All @@ -35,6 +36,7 @@
"cors": "^2.8.5",
"crypto": "^1.0.1",
"dotenv": "^8.2.0",
"kafkajs": "^2.2.4",
"passport": "^0.4.1",
"passport-jwt": "^4.0.0",
"reflect-metadata": "^0.1.13",
Expand Down
30 changes: 30 additions & 0 deletions src/modules/kafka/kafka.consumer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import {
Consumer,
ConsumerRunConfig,
ConsumerSubscribeTopics,
Kafka,
} from 'kafkajs';

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
private readonly kafka = new Kafka({
brokers: ['localhost:9092'],
});

private readonly consumers: Consumer[] = [];

async consume(topic: ConsumerSubscribeTopics, config: ConsumerRunConfig) {
const consumer = this.kafka.consumer({ groupId: 'nestjs-kafka' });
await consumer.connect();
await consumer.subscribe(topic);
await consumer.run(config);
this.consumers.push(consumer);
}

async onApplicationShutdown(signal?: string) {
for (const consumer of this.consumers) {
await consumer.disconnect();
}
}
}
9 changes: 9 additions & 0 deletions src/modules/kafka/kafka.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { ProducerService } from './kafka.producer.service';
import { ConsumerService } from './kafka.consumer.service';

@Module({
providers: [ProducerService, ConsumerService],
exports: [ProducerService, ConsumerService],
})
export class KafkaModule {}
27 changes: 27 additions & 0 deletions src/modules/kafka/kafka.producer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {
Injectable,
OnApplicationShutdown,
OnModuleInit,
} from '@nestjs/common';
import { Kafka } from 'kafkajs';

@Injectable()
export class ProducerService implements OnModuleInit, OnApplicationShutdown {
private readonly kafka = new Kafka({
brokers: ['localhost:9092'],
});

private readonly producer = this.kafka.producer();

async onModuleInit() {
await this.producer.connect();
}

async produce(record: any) {
await this.producer.send(record);
}

async onApplicationShutdown(signal?: string) {
await this.producer.disconnect();
}
}
7 changes: 7 additions & 0 deletions src/modules/sqs/sqs.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Controller, Get } from '@nestjs/common';
import { SQSService } from './sqs.service';

@Controller()
export class SqsController {
constructor(private readonly sqsService: SQSService) {}
}
11 changes: 11 additions & 0 deletions src/modules/sqs/sqs.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Logger, Module } from '@nestjs/common';
import { SQSService } from './sqs.service';
import { SqsController } from './sqs.controller';

@Module({
imports: [],
controllers: [SqsController],
providers: [SQSService, Logger],
exports: [SQSService],
})
export class SqsModule {}
63 changes: 63 additions & 0 deletions src/modules/sqs/sqs.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { Injectable, Logger } from '@nestjs/common';
import {
DeleteMessageCommand,
ReceiveMessageCommand,
SQSClient,
} from '@aws-sdk/client-sqs';

@Injectable()
export class SQSService {
private readonly logger = new Logger(SQSService.name);
private sqs: SQSClient;
private indexingJobProgressQueue = 'queue_url';

constructor() {
this.sqs = new SQSClient({
region: 'us-east-1',
});
}

public async deleteMessage(message) {
const deleteParams = {
QueueUrl: this.indexingJobProgressQueue,
ReceiptHandle: message.ReceiptHandle,
};

const deleteCommand = new DeleteMessageCommand(deleteParams);
try {
await this.sqs.send(deleteCommand);
this.logger.log('Message deleted successfully.');
} catch (err) {
this.logger.error('Error deleting message:', err);
}
}

public async receiveMessages(callback) {
const params = {
QueueUrl: this.indexingJobProgressQueue,
// MaxNumberOfMessages: 5, // Maximum number of messages to receive (1-10)
WaitTimeSeconds: 1, // maximum 20 seconds
};

const receiveCommand = new ReceiveMessageCommand(params);

try {
const response = await this.sqs.send(receiveCommand);
const messages = response.Messages || [];

if (messages.length === 0) {
this.logger.log('No messages received.');
return;
}

messages.forEach((message) => {
this.logger.log('Received message:', message.Body);
callback(message);
});
} catch (err) {
this.logger.error('Error receiving messages:', err);
} finally {
await this.receiveMessages(callback);
}
}
}

0 comments on commit 812b62a

Please sign in to comment.