December 29, 2013

Rabbit MQ Message Queues

Last time, I talked about setting up Rabbit MQ on a Mac. Now I will go into the different types of queues that you can set up and how they work, starting with worker queues.

In Rabbit MQ, there are several different types of queues. The first two are described in the Rabbit MQ tutorials. The first one is just a hello world example, so let me describe the second one, worker queues. There are two parts: a message sender and a message receiver. Let’s start with the sender:

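    // Lives in a class that exposes a QueueName member. Needs these usings:
    // System.IO, System.Text, System.Xml.Serialization, RabbitMQ.Client
    public void Send<T>(T msg) where T : class
    {
        var factory = new ConnectionFactory { HostName = "192.168.10.103", UserName = "test", Password = "test" };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Durable queue: not exclusive, not auto-deleted, no extra arguments.
            channel.QueueDeclare(QueueName, true, false, false, null);

            // Serialize the message to XML, then encode the XML as UTF-8 bytes.
            var serializer = new XmlSerializer(msg.GetType());
            byte[] body;
            using (var writer = new StringWriter())
            {
                serializer.Serialize(writer, msg);
                body = Encoding.UTF8.GetBytes(writer.ToString());
            }

            // DeliveryMode 2 marks the message as persistent.
            IBasicProperties basicProperties = channel.CreateBasicProperties();
            basicProperties.DeliveryMode = 2;

            // Publish to the default exchange (""), routed by queue name.
            channel.BasicPublish("", QueueName, basicProperties, body);
        }
    }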

The first using statement opens the connection to the Rabbit MQ service. This is really simple: it only needs a host, username and password.

The next using statement creates the channel, which is the session on top of your connection. This is important because of how message queues work: acknowledgements are tracked per session, so if the session goes away before the message you are working on has been acknowledged, that message goes back to the queue. This makes sure that the messages you want processed will be processed.
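The requeue behaviour described above only applies when the consumer acknowledges messages itself. As a rough sketch, assuming the 5.x line of the RabbitMQ.Client package (which may be newer than the client this post was written against, so constructor and event signatures may not match), a consumer with manual acknowledgements could look something like this. The `AckSketch` class, the `ConsumeWithManualAck` helper and its `handle` callback are hypothetical names used only for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class AckSketch
{
    // Hypothetical helper, not part of the original post.
    // Assumes RabbitMQ.Client 5.x, where args.Body is a byte[].
    public static void ConsumeWithManualAck(IModel channel, string queueName, Action<string> handle)
    {
        // Same durable, non-exclusive, non-auto-delete declaration the sender uses.
        channel.QueueDeclare(queueName, true, false, false, null);

        // Deliver at most one unacknowledged message to this consumer at a time.
        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, args) =>
        {
            handle(Encoding.UTF8.GetString(args.Body));

            // Acknowledge only after the handler has returned. If the channel or
            // connection closes before this line runs, the broker requeues the message.
            channel.BasicAck(args.DeliveryTag, false);
        };

        // false = automatic acknowledgement off, so the broker waits for BasicAck.
        channel.BasicConsume(queueName, false, consumer);
    }
}
```

The channel passed in has to stay open for as long as messages should be consumed, so it should not be wrapped in a short-lived using block the way the sender's channel is.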

The channel.QueueDeclare call creates the queue if it does not exist yet, which also sets things up for any consumers later. The second argument (true) makes the queue durable. The remaining lines mark the message as persistent (DeliveryMode = 2) and publish the bytes of the message to the queue we created. So you can see, it’s not too bad to work with. Next, let’s look at the receiver:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    public class QueueMessageEventArgs : EventArgs
    {
        public string Message { get; private set; }

        public QueueMessageEventArgs(string message)
        {
            Message = message;
        }
    }

    public class RabbitMQReceiver : IDisposable
    {
        // Shared by all receivers: at most two messages are handled at the same time.
        static readonly Semaphore semaphore = new Semaphore(2, 2);

        public IConnection Connection { get; private set; }
        public string QueueName { get; private set; }
        public bool IsStarted { get; private set; }
        public IDictionary<string, string> MessageStringProperties { get; private set; }

        private EventingBasicConsumer consumer;
        private IModel model;

        public event EventHandler<QueueMessageEventArgs> MessageReceived;

        public RabbitMQReceiver(IConnection connection, string queueName)
            : this(connection, queueName, null)
        {
        }

        public RabbitMQReceiver(IConnection connection, string queueName, IDictionary<string, string> messageStringProperties)
        {
            Connection = connection;
            QueueName = queueName;
            MessageStringProperties = messageStringProperties;
            consumer = new EventingBasicConsumer();
            IsStarted = false;
        }

        public void Start()
        {
            if (!IsStarted)
            {
                IsStarted = true;
                model = Connection.CreateModel();
                consumer.Received += consumer_Received;

                // QueueDeclare expects object values, so convert the string properties.
                IDictionary<string, object> dict = null;
                if (MessageStringProperties != null)
                {
                    dict = MessageStringProperties.ToDictionary(k => k.Key, j => (object)j.Value);
                }

                // Must match the sender's declaration: durable, not exclusive, not auto-deleted.
                model.QueueDeclare(QueueName, true, false, false, dict);

                // Prefetch limit of one message (only enforced when acknowledgements are used).
                model.BasicQos(0, 1, false);

                // true = noAck: the broker treats a message as acknowledged as soon as it is delivered.
                model.BasicConsume(QueueName, true, consumer);
            }
        }

        void consumer_Received(IBasicConsumer sender, BasicDeliverEventArgs args)
        {
            // Wait for a free slot, then handle the message on a background task.
            semaphore.WaitOne();
            Task.Factory.StartNew(() =>
            {
                try
                {
                    // The body arrives as UTF-8 bytes, exactly as the sender encoded it.
                    var message = Encoding.UTF8.GetString(args.Body);

                    // Copy the handler first so a late unsubscribe cannot cause a null call.
                    var handler = MessageReceived;
                    if (handler != null)
                    {
                        handler(this, new QueueMessageEventArgs(message));
                    }
                }
                finally
                {
                    // Free the slot even if a subscriber throws.
                    semaphore.Release();
                }
            });
        }

        public void Stop()
        {
            if (IsStarted)
            {
                IsStarted = false;
                model.Close();
                model.Dispose();
                consumer.Received -= consumer_Received;
            }
        }

        public void Dispose()
        {
            Stop();
        }
    }

Now that is a lot of code, but most of it is there for structure. If you look at the tutorial site for worker queues, it uses a while loop that blocks while waiting for messages, which makes everything more complicated. In my example, I use a different consumer type, EventingBasicConsumer, which raises an event for each message and so allows for a non-blocking version of listening to a worker queue.
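The receiver above also hands each message to a background task and uses a semaphore to cap how many run at once. Pulled out of the Rabbit MQ code, that throttling pattern can be sketched on its own as follows. `ThrottleSketch` and `RunThrottled` are hypothetical names, and the peak counter exists only so the behaviour can be observed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs each job on a background task, but never more than maxParallel at once.
    // Returns the highest number of jobs observed running at the same time.
    public static int RunThrottled(int jobCount, int maxParallel, Action<int> job)
    {
        var tasks = new Task[jobCount];
        int running = 0, peak = 0;
        var peakLock = new object();

        using (var gate = new Semaphore(maxParallel, maxParallel))
        {
            for (int i = 0; i < jobCount; i++)
            {
                int id = i;        // copy the loop variable for the closure
                gate.WaitOne();    // blocks the caller while every slot is busy
                tasks[i] = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        int now = Interlocked.Increment(ref running);
                        lock (peakLock) { if (now > peak) peak = now; }
                        job(id);
                    }
                    finally
                    {
                        Interlocked.Decrement(ref running);
                        gate.Release();    // free the slot even if the job throws
                    }
                });
            }

            Task.WaitAll(tasks);
        }

        return peak;
    }
}
```

Note that `WaitOne` runs on the calling thread, so in the receiver it is the consumer's delivery callback that waits when both slots are taken. Only the message handling itself moves to a background task.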

The Start function creates the channel, attaches a handler to the consumer's Received event, and begins consuming from the queue. That handler is where we read the message back from Rabbit MQ. The message comes back as a byte array, just as we sent it, so we need to convert it back to a string.
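To make the byte array round trip concrete, here is a small sketch that uses only the .NET base class library, with no Rabbit MQ involved. `OrderMessage` and `XmlBodySketch` are made-up names for illustration. `ToBody` mirrors what the sender does, and `FromBody` goes one step further than the receiver by turning the string back into an object.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml.Serialization;

// Hypothetical message type used only for this illustration.
public class OrderMessage
{
    public int Id { get; set; }
    public string Note { get; set; }
}

public static class XmlBodySketch
{
    // Mirrors the sender: object -> XML string -> UTF-8 bytes.
    public static byte[] ToBody<T>(T msg) where T : class
    {
        var serializer = new XmlSerializer(msg.GetType());
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, msg);
            return Encoding.UTF8.GetBytes(writer.ToString());
        }
    }

    // Mirrors the receiver, plus one more step: bytes -> XML string -> object.
    public static T FromBody<T>(byte[] body) where T : class
    {
        string xml = Encoding.UTF8.GetString(body);
        var serializer = new XmlSerializer(typeof(T));
        using (var reader = new StringReader(xml))
        {
            return (T)serializer.Deserialize(reader);
        }
    }
}
```

One detail worth knowing: because the XML is written through a StringWriter, its declaration says encoding="utf-16" even though the bytes on the wire are UTF-8. Decoding to a string first and deserializing from a StringReader, as above, sidesteps that mismatch.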

So that is it for creating a message queue. In the next article, I will build a pub/sub system and describe the remaining examples.