RabbitMQ in OpenTable


Polish Developers in London

8. Oct 2015

Łukasz Łysik / llysik@gmail.com / @lukasz_lysik

Restaurant Reservations - Free • Instant • Confirmed

Restaurant Reservations - Free • Instant • Confirmed

  Over 32k restaurants worldwide

  Over 16M reservations every month

  Over 450k reviews per month

  Over 20M sent emails per month

Transactional Messaging System (TMS)

Transactional Messaging System (TMS)

Before
After

Transactional Messaging System (TMS)

Transactional Messaging System (TMS)

RabbitMQ

RabbitMQ

  • Message broker
  • Open Source
  • Implements AMQP standard
    (Advanced Messaging Queueing Protocol)
  • Written in Erlang

RabbitMQ


rabbitmq-plugins enable rabbitmq_management
					

AMQP

Source: https://www.rabbitmq.com/tutorials/amqp-concepts.html

AMQP

Demo - RabbitMQ Simulator

Queues in TMS

Queues in TMS

Connection

Source: "RabbitMQ in Action", Jason J. W. Williams, Alvaro Videla


var factory = new ConnectionFactory { HostName = "localhost" };
var connection = factory.CreateConnection();  // Thread safe
var channel = connection.CreateModel();       // Thread unsafe
					

Declarations


channel.ExchangeDeclare(
		exchange: "logs",
		type: "fanout");
					

channel.QueueDeclare(
		queue: "hello",
		durable: false,
		exclusive: false,
		autoDelete: false,
		arguments: null);
					

channel.QueueBind(
		queue: "hello",
		exchange: "logs",
		routingKey: "");
					

Sending a message


var message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
					

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
properties.ContentType = "application/vnd.ticket_changed.v1+json";
properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("anything", "here");
					

channel.BasicPublish(
		exchange: "hello",
		routingKey: "",
		mandatory: false,
		basicProperties: properties,
		body: body);
					

Receiving a message


var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
					

while (true)
{
	var eventArgs = consumer.Queue.Dequeue();
	var message = Encoding.UTF8.GetString(eventArgs.Body);
	Console.WriteLine("Received: {0}", message);
}
					

// eventArgs.BasicProperties
// eventArgs.RoutingKey
					

Receiving a message - manual ACK


// noAck - "no manual acks"
channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
					

// Process message
channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
					

RPC

Source: https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

RPC - client


// Create temporary queue
var responseQueueName = channel.QueueDeclare().QueueName;

// Wait for the response
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(responseQueueName, true, consumer);
					

var correlationId = Guid.NewGuid().ToString();

var properties = channel.CreateBasicProperties();
properties.CorrelationId = correlationId;
properties.ReplyTo = responseQueueName;

channel.BasicPublish("hello", "", properties, body);
					

RPC - server


var eventArgs = consumer.Queue.Dequeue();
var response = ComputeResponse(eventArgs.Body);
					

var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = eventArgs.BasicProperties.CorrelationId;
					

channel.BasicPublish(
		exchange: "",
		routingKey: props.ReplyTo,
		basicProperties: replyProps,
		body: response);
					

Time-To-Live Extensions

Time-To-Live Extensions


// Per-Queue Message TTL
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
channel.QueueDeclare("myqueue", false, false, false, args);
					

// Per-Message TTL
var properties = new BasicProperties();
properties.Expiration = "60000";
channel.BasicPublish("my-exchange", "routing-key", properties, body);
					

Dead Letter Exchange


// Per-Queue Message TTL
var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
args.Add("x-dead-letter-exchange", "some.exchange.name");
args.Add("x-dead-letter-routing-key", "some-routing-key");
channel.QueueDeclare("myqueue", false, false, false, args);
					

Demos

Questions?

Łukasz Łysik
llysik@opentable.com