February 09, 2014

Rabbit MQ Topics and Pub Sub

So it has been a while since part 1 and part 2 of my Rabbit MQ series, but I definitely wanted to wrap up my rambling about Rabbit MQ and all the different configurations that are possible with the message queue. In this article, I will talk about how topics, pub sub and routing work and what each is best at when you are considering your architecture.

The best part about this whole talk is that this is how message queues work in general. Even if you do find yourself using a different message queue, the ideas of queues, topics, pub sub and routing are still the main focus. Today, I plan to talk about the last topics (no pun intended) in this discussion: pub sub, routing and topics.

Pub Sub

Pub sub is short for publish/subscribe, and in the Rabbit MQ space, you can find more information here. Think of this as the simplest approach to message broadcasting.

Let’s say I have a news feed that I want to send, and I give it a category, USA. If I want to hear news for the USA, I create a queue and, as a subscriber, bind it to the USA exchange. The exchange then fans out every message it receives to every queue bound to it, including mine. Messages about other countries are published elsewhere and will not reach this queue, so they must be listened to specifically. Any other people who want to listen to the USA can bind their own queues, and each of them gets a copy of every message. However, if no one is listening when a message about the USA is published, the message will simply disappear. This approach makes the most sense for simple chat systems, games, and for feeding messages into other queues, such as the routing or topic setups described later.
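The fan-out behavior can be sketched without a broker. The class below is a toy in-memory model, not the RabbitMQ client API; `FanoutExchange`, `Bind`, `Unbind` and `Publish` are hypothetical names chosen for illustration. It only shows the two rules that matter here: every bound queue gets its own copy, and a message published while nothing is bound is lost.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a fanout exchange; NOT the RabbitMQ client API.
public class FanoutExchange
{
    // Each bound queue is modelled as a plain in-memory queue of strings.
    private readonly Dictionary<string, Queue<string>> queues =
        new Dictionary<string, Queue<string>>();

    // Bind a queue to the exchange and return it so the caller can read from it.
    public Queue<string> Bind(string queueName)
    {
        Queue<string> queue;
        if (!queues.TryGetValue(queueName, out queue))
        {
            queue = new Queue<string>();
            queues[queueName] = queue;
        }
        return queue;
    }

    // Unbind a queue, for example when its only subscriber goes away.
    public void Unbind(string queueName)
    {
        queues.Remove(queueName);
    }

    // Copy the message to every bound queue and return the number of copies.
    // With no bound queues the message is simply dropped.
    public int Publish(string message)
    {
        foreach (var queue in queues.Values)
        {
            queue.Enqueue(message);
        }
        return queues.Count;
    }
}
```

For example, after `var usa = new FanoutExchange();` two subscribers that each call `usa.Bind(...)` with their own queue name will both receive a message passed to `usa.Publish(...)`, while a publish made after both have called `Unbind` reaches nobody.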

Routing

The problem with publish and subscribe is that a message disappears when nobody is listening for it. What if you need a persistent messaging system? This is where routing comes in. For example, say I need a logging system where I can monitor errors, while everything else goes into the database, let’s say for history. What I can do is set up 2 routing keys, one for errors and one for all logs. Each gets its own queue bound to the exchange, but unlike the previous pub sub model, those messages will wait in the queue until something processes them and “acknowledges” each message. The acknowledgment tells the queue to drop the message. (For messages to also survive a broker restart, the queue has to be declared durable and the messages published as persistent.)

This makes perfect sense for a system that wants to process every message for a certain event. Routing matches on the exact routing key, though, so it does not have ANY flexibility for finer-grained filtering. You could create 5 queues, but what if you need more types of filtering on the fly? This is where topics can help.
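The logging scenario above can be modelled in a few lines. This is again a toy in-memory sketch rather than the RabbitMQ client API; `DirectExchange`, the queue names `errors` and `all-logs`, and the routing keys `error` and `info` are all assumptions made for illustration. It shows exact-match routing and that a message stays in its queue until it is acknowledged.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of a direct exchange with acknowledgments;
// NOT the RabbitMQ client API.
public class DirectExchange
{
    // Routing key -> names of the queues bound with exactly that key.
    private readonly Dictionary<string, List<string>> bindings =
        new Dictionary<string, List<string>>();

    // Queue name -> messages that have not been acknowledged yet.
    private readonly Dictionary<string, List<string>> pending =
        new Dictionary<string, List<string>>();

    // Bind a queue to a routing key; a queue may be bound to several keys.
    public void Bind(string queueName, string routingKey)
    {
        if (!bindings.ContainsKey(routingKey)) bindings[routingKey] = new List<string>();
        if (!bindings[routingKey].Contains(queueName)) bindings[routingKey].Add(queueName);
        if (!pending.ContainsKey(queueName)) pending[queueName] = new List<string>();
    }

    // Deliver only to queues whose binding key equals the routing key exactly.
    public void Publish(string routingKey, string message)
    {
        List<string> targets;
        if (!bindings.TryGetValue(routingKey, out targets)) return; // no match: dropped
        foreach (var queueName in targets) pending[queueName].Add(message);
    }

    // Look at the oldest message without removing it (null when the queue is empty).
    public string Peek(string queueName)
    {
        return pending[queueName].FirstOrDefault();
    }

    // Acknowledge the oldest message, which removes it from the queue.
    public void Ack(string queueName)
    {
        if (pending[queueName].Count > 0) pending[queueName].RemoveAt(0);
    }

    // Number of messages still waiting in the queue.
    public int Count(string queueName)
    {
        return pending[queueName].Count;
    }
}
```

Binding `errors` to the key `error`, and `all-logs` to both `error` and `info`, gives the setup from the text: an `info` message lands only in `all-logs`, an `error` message lands in both queues, and each copy stays put until `Ack` is called for its queue.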

Topics

For topics, I will be talking mostly from this. Topics provide ALL of the benefits described above, so your message is persisted and you can add as much filtering as you want. From experience, this is what you typically want to use for any message broadcasting. Below is some code I wrote to demonstrate topics:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    // Send a message. This method is assumed to live in a sender class
    // that exposes Connection and QueueName.
    public void Send<T>(T msg, string topic) where T : class
    {
        using (var channel = Connection.CreateModel())
        {
            // Serialize the message to XML.
            System.Xml.Serialization.XmlSerializer x =
                new System.Xml.Serialization.XmlSerializer(msg.GetType());

            // Note: QueueName is used as the exchange name throughout this example.
            channel.ExchangeDeclare(QueueName, ExchangeType.Topic);

            TextWriter writer = new StringWriter();
            x.Serialize(writer, msg);
            var body = Encoding.UTF8.GetBytes(writer.ToString());

            // Publish with the topic as the routing key.
            IBasicProperties basicProperties = channel.CreateBasicProperties();
            channel.BasicPublish(QueueName, topic, basicProperties, body);
        }
    }

    // Listen for a message
    public class RabbitMQTopicReceiver : IMessageTopicReceiver
    {
        public IConnection Connection { get; private set; }
        public string QueueName { get; private set; }
        public string Topic { get; private set; }
        public bool IsStarted { get; private set; }
        public IDictionary<string, string> MessageStringProperties { get; private set; }

        private EventingBasicConsumer consumer;
        private IModel model;

        public RabbitMQTopicReceiver(IConnection connection, string queueName, string topic,
            IDictionary<string, string> messageStringProperties)
        {
            Connection = connection;
            QueueName = queueName;
            MessageStringProperties = messageStringProperties;
            Topic = topic;
            consumer = new EventingBasicConsumer();
            IsStarted = false;
        }

        public RabbitMQTopicReceiver(IConnection connection, string queueName, string topic)
            : this(connection, queueName, topic, null)
        {
        }

        public event EventHandler MessageReceived;

        public void Start()
        {
            if (!IsStarted)
            {
                IsStarted = true;
                model = Connection.CreateModel();
                consumer.Received += consumer_Received;

                // Convert the string properties into binding arguments.
                // (The original discarded the result of ToDictionary, so dict was always null.)
                IDictionary<string, object> dict = null;
                if (MessageStringProperties != null)
                {
                    dict = MessageStringProperties.ToDictionary(k => k.Key, j => (object)j.Value);
                }

                // Declare the topic exchange, then bind a server-named queue to it
                // using the topic as the binding key.
                model.ExchangeDeclare(QueueName, ExchangeType.Topic);
                var queueName = model.QueueDeclare();
                model.QueueBind(queueName, QueueName, Topic, dict);

                // The second argument (noAck = true) means messages are
                // acknowledged automatically on delivery.
                model.BasicConsume(queueName, true, consumer);
            }
        }

        void consumer_Received(IBasicConsumer sender, BasicDeliverEventArgs args)
        {
            var message = Encoding.UTF8.GetString(args.Body);

            // Manual acknowledgment is only needed when BasicConsume is called with noAck = false:
            //sender.Model.BasicAck(args.DeliveryTag, false);

            var arg = new TopicMessageEventArgs(message);

            // Only raise the event if someone has subscribed to it.
            var handler = MessageReceived;
            if (handler != null)
            {
                handler(this, arg);
            }
        }

        public void Stop()
        {
            if (IsStarted)
            {
                IsStarted = false;
                model.Close();
                model.Dispose();
                consumer.Received -= consumer_Received;
            }
        }

        public void Dispose()
        {
            Stop();
        }
    }

The important thing to note is the ExchangeDeclare call. It is passed ExchangeType.Topic, which creates a topic exchange. If you want pub sub, just change it to Fanout, or if you want routing, just change it to Direct. There may be some tweaks that you need to make, but that's about it. If you want to check out more of my example, you can go to my GitHub page for this code example.
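To make the filtering that topics offer more concrete: a queue binds to a topic exchange with a pattern of dot-separated words, where `*` stands for exactly one word and `#` stands for zero or more words. The sketch below is a simplified in-memory version of that matching rule, not code from the RabbitMQ client; `TopicMatcher` and the `news.usa.sports` style keys are hypothetical names for illustration.

```csharp
using System;

// Simplified model of topic-exchange pattern matching; NOT the RabbitMQ client API.
public static class TopicMatcher
{
    // Returns true when the binding pattern accepts the routing key.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(SplitWords(pattern), 0, SplitWords(routingKey), 0);
    }

    // Split on dots, treating an empty string as zero words.
    private static string[] SplitWords(string s)
    {
        return s.Length == 0 ? new string[0] : s.Split('.');
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Anything other than '#' needs a word to consume.
        if (ki == k.Length) return false;

        // '*' accepts any single word; a literal must be equal.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);

        return false;
    }
}
```

With this rule, a queue bound with `news.usa.*` would receive `news.usa.sports` but not `news.usa` or `news.usa.sports.nfl`, while a queue bound with `news.#` would receive all three. A binding without wildcards behaves like Direct, and a binding of just `#` behaves like Fanout.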