Docker Container:
docker run -d --hostname rabbitmq --name rabbit-server -p 8080:15672 -p 5672:5672 rabbitmq:3-management
# Sender & Receiver
When working with Message Queues (RabbitMQ) there is a Sender and a Receiver.
Sender
- ”producer”
- sends message into exchange>queue
Receiver
- ”consumer”
- take the message(s) from the queue and consume it
# Create Sender
Exchange, Queues, Messages. Here we create a Sender, that allows connecting to the queue and sending messages to the queue.
// Webserver :15672; Sender :5672
// Connection String
ConnectionFactory factory = new()
{
Uri = new Uri("amqp://guest:guest@localhost:5672"),
ClientProvidedName = "Rabbit App"
};
await using var cnn = await factory.CreateConnectionAsync();
var channel = await cnn.CreateChannelAsync();
var exchangeName = "DemoExchange";
var routingKey = "demo-routing-key";
var queueName = "DemoQueue";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await channel.QueueDeclareAsync(queueName, false, false, false, null);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
var messageBodyBytes = Encoding.UTF8.GetBytes("Hello World!");
await channel.BasicPublishAsync(
exchange: exchangeName,
routingKey: routingKey,
mandatory: false,
body: messageBodyBytes
);
await channel.CloseAsync();
await cnn.CloseAsync();
Nun haben wir einen Demo-Exchange mit einer DemoQueue angelegt:
In der Queue befindet sich bereits eine Message:
Nun haben wir den Sender konfiguriert, allerdings fehlt uns noch der Receiver, der die Message ausliest und verarbeitet.
# Create Receiver
ConnectionFactory factory = new()
{
Uri = new Uri("amqp://guest:guest@localhost:5672"),
ClientProvidedName = "Rabbit Receiver1 App"
};
await using var cnn = await factory.CreateConnectionAsync();
var channel = await cnn.CreateChannelAsync();
var exchangeName = "DemoExchange";
var routingKey = "demo-routing-key";
var queueName = "DemoQueue";
await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await channel.QueueDeclareAsync(queueName, false, false, false, null);
await channel.QueueBindAsync(queueName, exchangeName, routingKey, null);
// prefetchSize: 0 - we don't care how big the message is
// prefetchCount: 1 - process one message at a time
// global: false - we apply this only for this instance
await channel.BasicQosAsync(0, 1, false);
// consuming the message - decode it as a string
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (sender, args) =>
{
var body = args.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Message Received: {message}");
// Acknowledgement, that this message was delivered successfully
channel.BasicAckAsync(args.DeliveryTag, false);
return Task.CompletedTask;
};
var consumerTag = await channel.BasicConsumeAsync(queueName, false, consumer);
// cancel the consumer
await channel.BasicCancelAsync(consumerTag);
await channel.CloseAsync();
await cnn.CloseAsync();
Message is consumed: