Łukasz Łysik / llysik@gmail.com / @lukasz_lysik
Restaurant Reservations - Free • Instant • Confirmed
Restaurant Reservations - Free • Instant • Confirmed
Over 32k restaurants worldwide
Over 16M reservations every month
Over 450k reviews per month
Over 20M sent emails per month
rabbitmq-plugins enable rabbitmq_management
Source: https://www.rabbitmq.com/tutorials/amqp-concepts.html
Demo - RabbitMQ Simulator
Source: "RabbitMQ in Action", Jason J. W. Williams, Alvaro Videla
var factory = new ConnectionFactory { HostName = "localhost" };
var connection = factory.CreateConnection(); // Thread safe
var channel = connection.CreateModel(); // Thread unsafe
channel.ExchangeDeclare(
exchange: "logs",
type: "fanout");
channel.QueueDeclare(
queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(
queue: "hello",
exchange: "logs",
routingKey: "");
var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
properties.ContentType = "application/vnd.ticket_changed.v1+json";
properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("anything", "here");
channel.BasicPublish(
exchange: "hello",
routingKey: "",
mandatory: false,
basicProperties: properties,
body: body);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
while (true)
{
var eventArgs = consumer.Queue.Dequeue();
var message = Encoding.UTF8.GetString(eventArgs.Body);
Console.WriteLine("Received: {0}", message);
}
// eventArgs.BasicProperties
// eventArgs.RoutingKey
// noAck - "no manual acks"
channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
// Process message
channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
Source: https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
// Create temporary queue
var responseQueueName = channel.QueueDeclare().QueueName;
// Wait for the response
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
var correlationId = Guid.NewGuid().ToString();
var properties = channel.CreateBasicProperties();
properties.CorrelationId = correlationId;
properties.ReplyTo = responseQueueName;
channel.BasicPublish("hello", "", properties, body);
var eventArgs = consumer.Queue.Dequeue();
var response = ComputeResponse(eventArgs.Body);
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = eventArgs.BasicProperties.CorrelationId;
channel.BasicPublish(
exchange: "",
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: response);
// Per-Queue Message TTL
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
channel.QueueDeclare("myqueue", false, false, false, args);
// Per-Message TTL
var properties = new BasicProperties();
properties.Expiration = "60000";
channel.BasicPublish("my-exchange", "routing-key", properties, body);
// Per-Queue Message TTL
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
args.Add("x-dead-letter-exchange", "some.exchange.name");
args.Add("x-dead-letter-routing-key", "some-routing-key");
channel.QueueDeclare("myqueue", false, false, false, args);