Technologies

Exercise 


# Definition

  • A passenger may request a taxi ride, sending their current location and their desired destination.
  • The system will pick one or more available taxi drivers (prefer drivers that are close to the passenger).
  • The taxi driver can accept or decline rides.
  • The system makes sure that only one taxi driver is assigned to pick up a passenger.
  • A taxi driver is in one of the following states:
    • Unavailable
    • Available
    • OfferedRide
    • OnRouteToPassenger
    • OnRouteToDestination
  • A taxi driver will at all times report its current location and state to the system.
  • Passengers will receive an invoice upon a completed ride, based on the distance and a rate (€/km).
  • The system will log all messages.

# Assignment

  • Decide on an architecture.
    • Architecture diagrams created in class are available in the SEW teams channel and may be used for reference.
    • You can still refine your architecture
    • Provide an updated system overview diagram in your upload.

RabbitMq_AnderTaxi 2024-12-10 08.08.02.excalidraw

⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’

Excalidraw Data

Text Elements

TaxiStateService

Polling Taxi State (+ Location)

PassengerService

Request all States

Responds

DriverService

closest Taxis

Taxi

State

Offer: Accept/Decline Location + Destination

Responds

Cancel Remaining Offers

Client

Location + Destination

Location

Completed Ride Taxi

InvoiceService

Location + Destination

Invoice

LogService

All Actions

1

2

3

4

5

6

6

7

8

9

Link to original

Elements:

  • User
  • Taxi
  • ClosestTaxiService
    • finds taxis, that are nearby the passenger’s location
  • TaxiStateService
    • polling
    • delivers the current state of the taxi
  • TaxiManagementService
    • central service for managing taxi-requests, offers and assignments
    • guarantees: one taxi per passenger
  • InvoiceService
    • service, that handles the creation of invoices

  • Implement the messaging infrastructure in RabbitMQ
    • Create and configure all necessary exchanges and queues to ensure message delivery to the individual services in your system.
    • No upload needed, but students may be picked to showcase their configuration.
docker run -d --hostname rabbitmq --name rabbit-server -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Login

  • guest
  • guest

# Code

RabbitMqService.cs

using System.Text;  
using RabbitMQ.Client.Events;  
  
namespace AnderTaxi_RabbitMq;  
  
using RabbitMQ.Client;  
  
public class RabbitMqService  
{  
    private readonly string _rabbitMqHost = "localhost";  
    // topic exchange  
    private readonly string _exchangeName = "ride_service_exchange";  
  
    // 4 queues with routing-keys  
    private readonly Dictionary<string, string> _queueBindings = new()  
    {  
        { "log_service_queue", "log.#" },  
        { "invoice_service_queue", "invoice.#" },  
        { "passenger_service_queue", "passenger.#" },  
        { "driver_service_queue", "driver.#" }  
    };  
  
    // creation - topic-exchanges + bind queues with routing-keys  
    public async Task ConfigureMessagingInfrastructure()  
    {  
        ConnectionFactory factory = new()  
        {  
            Uri = new Uri("amqp://guest:guest@localhost:5672"),  
            HostName = _rabbitMqHost  
        };  
  
        await using var cnn = await factory.CreateConnectionAsync();  
        var channel = await cnn.CreateChannelAsync();  
        await channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Topic);  
  
        // create queues - bind with routing-keys  
        try  
        {  
            foreach (var (queue, routingKey) in _queueBindings)  
            {  
                Console.WriteLine($"Declaring queue: {queue} with routingKey: {routingKey}");  
                await channel.QueueDeclareAsync(queue: queue, durable: false, exclusive: false, autoDelete: false);  
                await channel.QueueBindAsync(queue: queue, exchange: _exchangeName, routingKey: routingKey);  
                Console.WriteLine($"Queue {queue} declared and bound.");  
            }  
        }        catch (Exception ex)  
        {  
            Console.WriteLine($"Error creating or binding queue: {ex.Message}");  
        }  
  
        Console.WriteLine("RabbitMQ infrastructure configured successfully.");  
    }  
  
    public async Task PublishMessage(string routingKey, string message)  
    {  
        ConnectionFactory factory = new()  
        {  
            Uri = new Uri("amqp://guest:guest@localhost:5672"),  
            HostName = _rabbitMqHost  
        };  
  
        await using var cnn = await factory.CreateConnectionAsync();  
        var channel = await cnn.CreateChannelAsync();  
  
        var body = Encoding.UTF8.GetBytes(message);  
  
        await channel.BasicPublishAsync(  
            exchange: _exchangeName,  
            routingKey: routingKey,  
            mandatory: false,  
            body: body  
        );  
  
        Console.WriteLine($"Message published: {message} with routingKey: {routingKey}");  
    }  
  
    public async Task<string> ConsumeMessages(string queueName)  
    {  
        ConnectionFactory factory = new()  
        {  
            Uri = new Uri("amqp://guest:guest@localhost:5672"),  
            HostName = _rabbitMqHost,  
            VirtualHost = "/"  
        };  
  
        await using var cnn = await factory.CreateConnectionAsync();  
        var channel = await cnn.CreateChannelAsync();  
  
        var message = string.Empty;  
        var consumer = new AsyncEventingBasicConsumer(channel);  
        consumer.ReceivedAsync += (sender, args) =>  
        {  
            var body = args.Body.ToArray();  
            message = Encoding.UTF8.GetString(body);  
            Console.WriteLine($"Message received in {queueName}: {message}");  
            return Task.CompletedTask;  
        };  
          
        var consumerTag = await channel.BasicConsumeAsync(queueName, true, consumer);  
  
        await Task.Delay(5000);  
          
        await channel.BasicCancelAsync(consumerTag);  
  
        Console.WriteLine($"Started consuming messages from {queueName}");  
  
        return message;  
    }  
}

PublishRequest.cs

namespace AnderTaxi_RabbitMq;  
  
public class PublishRequest  
{  
    public string RoutingKey { get; set; }  
    public string Message { get; set; }  
}

Program.cs

using System.Text;  
using AnderTaxi_RabbitMq;  
using RabbitMQ.Client;  
  
var builder = WebApplication.CreateBuilder(args);  
  
builder.Services.AddSingleton<RabbitMqService>();  
  
var app = builder.Build();  
  
var rabbitMqService = app.Services.GetRequiredService<RabbitMqService>();  
await rabbitMqService.ConfigureMessagingInfrastructure();  
  
app.MapGet("/", () => "RabbitMQ Infrastructure Configured!");  
  
app.MapPost("/publish", async (PublishRequest request) =>  
{  
    var rabbitMqService = app.Services.GetRequiredService<RabbitMqService>();  
    await rabbitMqService.PublishMessage(request.RoutingKey, request.Message);  
    return Results.Ok($"Message published to routing key {request.RoutingKey}: {request.Message}");  
});  
  
app.MapGet("/consume/{queueName}", async (string queueName) =>  
{  
    var rabbitMqService = app.Services.GetRequiredService<RabbitMqService>();  
    var message = await rabbitMqService.ConsumeMessages(queueName);  
    return Results.Ok($"Consuming messages from queue {queueName}: {message}");  
});  
  
app.Run();

/


# Exchange & Queues

As you can see, the ride_service_exchange got created

Also our 4 queues got created.


# Sending Messages

Here we send an invoice.

  • The structure is based on the PublishRequest class

Sender:

{
  "routingKey": "invoice.user1",
  "message": "5,99€"
}

Receiver:

/consume/invoice_service_queue

Here we get the price sent above.


  • Visualize your RabbitMQ infrastructure
    • Upload a diagram showing all the exchanges, queues and routing configurations in your RabbitMQ message broker.

# Topicexchange

From publisher to consumer.

  • ride_service_exchange
    • log_service_queue
    • passenger_service_queue
    • invoice_service_queue
    • driver_service_queue

RabbitMq_AnderTaxi 2025-01-04 21.54.34.excalidraw

⚠ Switch to EXCALIDRAW VIEW in the MORE OPTIONS menu of this document. ⚠ You can decompress Drawing data with the command palette: ‘Decompress current Excalidraw file’. For more info check in plugin settings under ‘Saving’

Excalidraw Data

Text Elements

Publisher

taxiservice.topic

Message

invoice.user1

logservice_queue

invoiceservice_queue

passengerservice_queue

driverservice_queue

^lVZfsRxb

passenger.#

invoice.#

driver.#

Topicexchange

Consumer

Consumer

Consumer

Consumer

5,99€

5,99€

distanceservice_queue

distance.#

Consumer

Exchange

Routing Key

Queues

taxi.location.direct

taxi.state.topic

taxi.management.direct

invoice.fanout

taxi.closest.queue

taxi.state.request.queue

request.state

taxi.offer.queue

invoice.queue

logging.queue

driverstateservice_queue

state.#

Consumer

Link to original