Skip to content

Commit

Permalink
feat!: release of v1.0.0
Browse files Browse the repository at this point in the history
Release v1.0.0
  • Loading branch information
Peter Wikström authored Feb 27, 2023
1 parent e50e814 commit 1cd0573
Show file tree
Hide file tree
Showing 22 changed files with 1,069 additions and 367 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@ Have you found a bug or have an idea for improvement? Feel free to contribute! S
## Example usage
Given any AsyncAPI file (`AsyncAPI.yml`) first generate the client with the [AsyncAPI generator](https://github.com/asyncapi/generator) such as
```bash
ag .\asyncapi.yaml .\dotnet-rabbitmq-template\ -o .\output --force-write -p server=production
ag .\asyncapi.yaml .\dotnet-rabbitmq-template\ -o .\output --force-write
```

# How to use
The generated output shall be seen a subscriber and/or publisher of message on/from a rabbit mq broker.

## Requirements
* @asyncapi/generator < v2.0.0 >v1.1.1
* @asyncapi/generator < v2.0.0 > v1.1.1

Install the generator through [npm or run it from docker official installer](https://github.com/asyncapi/generator#install).

## Template Parameters
These are the available template parameters:
|Parameter|Type|Description|
|---|---|---|
| namespace | String | Use this parameter to specify the namespace for the generated C# client `--param "namespace=Company.Services"`, defaults to `Demo`
| user | String | Use this parameter to specify a user for for accessing the RabbitMq cluster `--param "user=username"`, defaults to `user`
| password | String | Use this parameter to specify a password for for accessing the RabbitMq cluster `--param "password=password"`, defaults to `password`

# Contributing

Before contributing please read the [CONTRIBUTING](CONTRIBUTING.md) document.
Expand Down
11 changes: 4 additions & 7 deletions components/Consumers.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import { toPascalCase } from '../utils/common';

export function Consumers({ channels }) {
if (channels.length === 0) {
return null;
}

return `protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_amqpService.${toPascalCase(channels[0].operationId)}();
return Task.CompletedTask;
}`;
return `
// Code for the subscriber: Recieves messages from RabbitMq
_amqpService.${channels[0].subscriber.operationId}();
`;
}
16 changes: 8 additions & 8 deletions components/Publishers.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { toPascalCase } from '../utils/common';

export function Publishers({ channels }) {
if (channels.length === 0) {
return null;
}

return `protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// TODO: Send message on custom events, below is a timing example.
return `
// Code for the publisher: Send message on custom events, below is a timing example.
// A real world example would probably read temeratures from some kind of I/O temperature sensor.
/*
var rnd = new Random((int) DateTime.Now.Ticks);
while (!stoppingToken.IsCancellationRequested)
{
var message = new ${toPascalCase(channels[0].messageType)}();
_amqpService.${toPascalCase(channels[0].operationId)}(message);
var message = new ${channels[0].publisher.messageType}();
_amqpService.${channels[0].publisher.operationId}(message);
await Task.Delay(rnd.Next(500, 3000), stoppingToken);
}
}`;
*/
`;
}
6 changes: 5 additions & 1 deletion components/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ namespace ${params.namespace}
_amqpService = new AmqpService(configuration);
}
${childrenContent}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
${childrenContent}
return Task.CompletedTask;
}
public override void Dispose()
{
Expand Down
38 changes: 17 additions & 21 deletions components/templates/amqpservice.interface.js
Original file line number Diff line number Diff line change
@@ -1,52 +1,48 @@
import { getChannels, toPascalCase } from '../../utils/common';

const template = (publishers, consumers, params) => `using System;
const template = (channels, params) => `using System;
using ${params.namespace}.Models;
namespace ${params.namespace}.Services.Interfaces;
public interface IAmqpService : IDisposable
{
${publishers
${channels
.filter((channel) => channel.publisher)
.map(
(publisher) => `
(channel) => `
/// <summary>
/// Operations from async api specification
/// Publish operation from ${channel.routingKey}
/// </summary>
/// <param name="message">The message to be handled by this amqp operation</param>
void ${toPascalCase(publisher.operationId)}(${
publisher.messageType
} message);
void ${toPascalCase(channel.publisher.operationId)}(${toPascalCase(
channel.publisher.messageType
)} message);
`
)
.join('')}
${consumers
${channels
.filter((channel) => channel.subscriber)
.map(
(consumer) => `
(channel) => `
/// <summary>
/// Operations from async api specification
/// Subscribe operation from ${channel.routingKey}
/// </summary>
/// <param name="message">The message to be handled by this amqp operation</param>
void ${toPascalCase(consumer.operationId)}();
void ${toPascalCase(channel.subscriber.operationId)}();
`
)
.join('')}
}`;

export function IAmqpService({ asyncapi, params }) {
if (!asyncapi.hasComponents()) {
const channels = getChannels(asyncapi);

if (channels.length === 0) {
return null;
}

const publishers = getChannels(asyncapi).filter(
(channel) => channel.isPublish
);
const consumers = getChannels(asyncapi).filter(
(channel) => !channel.isPublish
);

return template(publishers, consumers, params);
return template(channels, params);
}
99 changes: 41 additions & 58 deletions components/templates/amqpservice.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
import { getChannels, toPascalCase } from '../../utils/common';
import { getChannels } from '../../utils/common';

const template = (asyncapi, params) => {
const publishers = getChannels(asyncapi).filter(
(channel) => channel.isPublish
);
const consumers = getChannels(asyncapi).filter(
(channel) => !channel.isPublish
);

const protocol = Object.entries(asyncapi.servers())
.map(([serverName, server]) => {
if (serverName === params.server) {
return server.protocol();
}
})
.join('');
const channels = getChannels(asyncapi);

if (channels.length === 0) {
return null;
}

return `using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -51,64 +42,59 @@ public class AmqpService : IAmqpService
var factory = new ConnectionFactory
{
Uri = new Uri($"${protocol}://{user}:{password}@{host}"),
Uri = new Uri($"amqps://{user}:{password}@{host}"),
RequestedHeartbeat = TimeSpan.FromSeconds(10)
};
_connection = factory.CreateConnection();
_channelPool = ChannelPool.Create(_connection);
}
${publishers
${channels
.filter((channel) => channel.publisher)
.map(
(publisher) => `/// <summary>
(channel) => `/// <summary>
/// Operations from async api specification
/// </summary>
/// <param name="message">The message to be handled by this amqp operation</param>
public void ${toPascalCase(publisher.operationId)}(${
publisher.messageType
} message)
public void ${channel.publisher.operationId}(${channel.publisher.messageType} message)
{
var exchange = "${publisher.exchange}";
var routingKey = "${publisher.routingKey}";
var exchange = "${channel.exchange}";
var routingKey = "${channel.routingKey}";
var channel = _channelPool.GetChannel("${toPascalCase(
publisher.operationId
)}");
var channel = _channelPool.GetChannel("${channel.publisher.operationId}");
var exchangeProps = new Dictionary<string, object>
{
{"CC", "${publisher.cc}"},
{"BCC", "${publisher.bcc}"},
{"alternate-exchange", "${publisher.alternateExchange}"},
{"CC", "${channel.publisher.cc}"},
{"BCC", "${channel.publisher.bcc}"},
{"alternate-exchange", "${channel.publisher.alternateExchange}"},
};
channel.ExchangeDeclare(
exchange: exchange, // exchange.name from channel binding
type: "${publisher.exchangeType}", // type from channel binding
${publisher.isDurable}, // durable from channel binding
${publisher.isAutoDelete}, // autoDelete from channel binding
type: "${channel.publisher.exchangeType}", // type from channel binding
${channel.isDurable}, // durable from channel binding
${channel.isAutoDelete}, // autoDelete from channel binding
exchangeProps);
var props = channel.CreateBasicProperties();
props.CorrelationId = $"{Guid.NewGuid()}";
props.ReplyTo = "${publisher.replyTo}";
props.DeliveryMode = Byte.Parse("${publisher.deliveryMode}");
props.Priority = ${publisher.priority};
props.ReplyTo = "${channel.publisher.replyTo}";
props.DeliveryMode = Byte.Parse("${channel.publisher.deliveryMode}");
props.Priority = ${channel.publisher.priority};
props.Timestamp = new AmqpTimestamp(DateTimeOffset.UnixEpoch.Ticks);
props.Expiration = "${publisher.expiration}";
props.Expiration = "${channel.publisher.expiration}";
_logger.Verbose("Sending message {@${
publisher.messageType
}} with correlation id {CorrelationId}",
_logger.Verbose("Sending message {@${channel.publisher.messageType}} with correlation id {CorrelationId}",
message,
props.CorrelationId);
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
channel.BasicPublish(exchange: exchange,
routingKey: routingKey,
mandatory: ${publisher.mandatory},
mandatory: ${channel.publisher.mandatory},
basicProperties: props,
body: body);
}
Expand All @@ -117,32 +103,31 @@ public class AmqpService : IAmqpService
)
.join('')}
${consumers
${channels
.filter((channel) => channel.subscriber)
.map(
(consumer) => `public void ${toPascalCase(consumer.operationId)}()
(channel) => `public void ${channel.subscriber.operationId}()
{
var queue = "${consumer.queue}"; // queue from specification
var channel = _channelPool.GetChannel("${toPascalCase(
consumer.operationId
)}");
var queue = "${channel.queue}"; // queue from specification
var channel = _channelPool.GetChannel("${channel.subscriber.operationId}");
// TODO: declare passive?
channel.QueueDeclare(queue);
// IMPORTANT! If the routing key contains {some-parameter-name}
// you must change the routing key below to something meaningful for amqp service listening for messages.
// For demo purposes you can just replace it with the wildcard '#' which means it recieves
// all messages no matter what the parameter is.
channel.QueueBind(queue: queue,
exchange: "${consumer.exchange}",
routingKey: "${consumer.routingKey}");
exchange: "${channel.exchange}",
routingKey: "${channel.routingKey}");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (_, ea) =>
{
var body = ea.Body.ToArray();
var message = JsonSerializer.Deserialize<${
consumer.messageType
}>(Encoding.UTF8.GetString(body));
_logger.Verbose("${toPascalCase(
consumer.messageType
)} received, {@${toPascalCase(consumer.messageType)}}", message);
var message = JsonSerializer.Deserialize<${channel.subscriber.messageType}>(Encoding.UTF8.GetString(body));
_logger.Verbose("${channel.subscriber.messageType} received, {@${channel.subscriber.messageType}}", message);
try
{
Expand All @@ -152,9 +137,7 @@ public class AmqpService : IAmqpService
}
catch (Exception e)
{
_logger.Error(e, "Something went wrong trying to process message {@${toPascalCase(
consumer.messageType
)}},", message);
_logger.Error(e, "Something went wrong trying to process message {@${channel.subscriber.messageType}},", message);
channel.BasicReject(ea.DeliveryTag, false);
}
};
Expand All @@ -176,7 +159,7 @@ public class AmqpService : IAmqpService
};

export function AmqpService({ asyncapi, params }) {
if (!asyncapi.hasComponents()) {
if (!asyncapi.hasChannels()) {
return null;
}

Expand Down
4 changes: 0 additions & 4 deletions components/templates/channelpool.interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,5 @@ public interface IChannelPool : IDisposable
}`;

export function IChannelPool({ asyncapi, params }) {
if (!asyncapi.hasComponents()) {
return null;
}

return template(asyncapi, params);
}
2 changes: 1 addition & 1 deletion components/templates/channelpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public class ChannelPool : IChannelPool
};

export function ChannelPool({ asyncapi, params }) {
if (!asyncapi.hasComponents()) {
if (!asyncapi.hasChannels()) {
return null;
}

Expand Down
Loading

0 comments on commit 1cd0573

Please sign in to comment.