Tuesday, June 24, 2014

Simple interprocess messaging using queues (3/3)

In this last part of the tutorial we will try to answer questions raised in the previous entry: how should we approach a complex scenario involving multiple publishers, multiple subscribers and multiple message types so that messages reach only subscribers that really need them?

One of the common approaches would be to have an exchange per message type, a queue per subscriber and just binding between these. This way publishers would easily know where to publish messages and subscribers would just listen on their queues.

Setting up such scenario with RabbitMQ is possible, however with a help of a smart framework, wiring can be done automatically. We will use MassTransit, another possibility would be to use Rebus. I don’t quite like NServiceBus because of the closed, costly license.

Anyway, to work with MassTransit we first start with a model:

public class ModelItem
{
    public string Name { get; set; }
}

A simple producer using MassTransit would be like:

static void Main( string[] args )
  {
      var bus1 = ServiceBusFactory.New( sbc =>
      {
          sbc.UseRabbitMq(
              f =>
                  f.ConfigureHost(
                      new Uri( "rabbitmq://guest:guest@localhost/masstransit_producer" ),
                      c =>
                      {
                      } )
              );
 
          sbc.ReceiveFrom( "rabbitmq://guest:guest@localhost/masstransit_producer" );
 
          sbc.SetCreateMissingQueues( true );
      } );
 
      while ( true )
      {
          Console.WriteLine( "Press key to publish message " );
          Console.ReadLine();
 
          bus1.Publish( new ModelItem() { Name = DateTime.Now.ToString() } );
 
          Console.WriteLine( "Message sent" );
      }
  }

What is interesting here is that MassTransit (and other similar frameworks) provide a nice abstraction over a messaging infrastructure so you are no longer in a world of bare queues but rather in an abstract world of buses, subscriptions and publishing. Because of this, MassTransit supports multiple queuing subsystems, including RabbitMQ and MSMQ.

(An interesting question is: how the heck MassTransit is able to set up complex pub-sub scenarios over MSMQ? The answer is, it tries to use the multicast feature of MSMQ. In practice, as I’ve already mentioned, this just sometimes doesn’t work, messages are published but are not received.)

The consumer would be:

class Program
{
    static void Main( string[] args )
    {
        var bus1 = ServiceBusFactory.New( sbc =>
        {
            sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
                        c =>
                        {
                        } )
                );
 
            sbc.ReceiveFrom( "rabbitmq://localhost/masstransit_consumer" );
 
            sbc.SetCreateMissingQueues( true );
        } );
        var consumer1 = new SimpleMessageItemConsumer();
        bus1.SubscribeConsumer( () => consumer1 );
 
        Console.WriteLine( "Listening to messages..." );
        Console.ReadLine();
 
        bus1.Dispose();
    }
}
 
public class SimpleMessageItemConsumer : Consumes<ModelItem>.All
{
    #region All Members
 
    public void Consume( ModelItem message )
    {
        Console.WriteLine( "Message received : {0}", message.Name );
    }
 
    #endregion
}

I must admit that from the OO perspective, this would be the cleanest possible design. You have a strongly typed consumer class where the Consume method gets a deserialized instance of the message and you just define subscriptions by pointing the subscriber to an instance of the handler class.

Pros:

  • abstracts an actual queuing subsystem and can switch between different implementations
  • automates the way subscriptions are created, you no longer think in terms of queues, exchanges and binding but rather in terms of message types and subscriptions
  • automatically supports failuers in message handling, tracing, monitoring (and other features)

Cons:

  • this is an API-driven framework which means that you still need to hide it behind a well-defined facade (WCF possibly) so that clients get more interoperable communication protocol

Simple interprocess messaging using queues (2/3)

In the second entry of our messaging tutorial we will create a producer and a consumer using the RabbitMQ server in the middle.

RabbitMQ is a reliable messaging subsystem that supports the Advanced Message Queuing Protocol (AMQP). Because of this, RabbitMQ is a great choice for interoperable scenarios – the AMQP protocol is supported on multiple platforms and if for some reason there is still no implementations for a platform of your choice, you are free to create one (as AMQP just sits on top of the TCP).

RabbitMQ is a separate download but the installation is a breeze. The server also contains a web-based administration console that listens on the 15672 port (http://localhost:15672) (the console has to be enabled first).

RabbitMQ introduces an important concept that MSMQ lacks.

There are not only queues subscribers listen on but also exchanges publishers could alternatively use to publish messages. By separating publishing from subscribing and defining bindings between exchanges and queues, complex scenarios can be created where a message published on a single exchange is automatically delivered to multiple queues.

It still raises some difficulties, however. It is because there are not only multiple publishers, multiple subscribers but also – multiple message types. You still need an idea how to model the scenario. Should you create a queue for each subscriber? Or a queue per message type? Or a queue per subscriber and message type? What about exchanges then? We will try to answer these questions in the next part of the tutorial.

This time the code involves the RabbitMQ.Client library that can be accessed via NuGet. A simple producer would be:

class Program
{
    static void Main( string[] args )
    {
        var factory = new ConnectionFactory() 
        { 
            HostName = "localhost"                
        };
        using ( var connection = factory.CreateConnection() )
        {
            using ( var channel = connection.CreateModel() )
            {
                channel.QueueDeclare( "hello", false, false, false, null );
 
                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes( message );
 
                while ( true )
                {
                    Console.WriteLine( "Press key to send message" );
                    Console.ReadLine();
 
                    channel.BasicPublish( "", "hello", null, body );
 
                    Console.WriteLine( " [x] Sent {0}", message );
                }
            }
        }
    }
}

Note that the BasicPublish method publishes a byte array so that the structure of the message can be arbitrary but has to be interpreted in the same way at the consumer side. Note also that this basic example doesn’t involve any exchanges, rather, the publisher publishes the message directly onto a queue.

A simple consumer:

class Program
{
    static void Main( string[] args )
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);
 
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("hello", true, consumer);
 
                Console.WriteLine(" [*] Waiting for messages." +
                                         "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
 
                    var body    = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
                }
            }
        }
    }
}

Pros:

  • easy to install and maintain
  • multicast is supported out of the box because of the separation of queues from exchanges
  • can be clustered and/or mirrored

Cons:

  • multiple ways to model complex scenarios, you have to decide on your own

Simple interprocess messaging using queues (1/3)

This entry opens a 3-part tutorial on simple publish-subscribe implementations in .NET. We will try three different approaches:

  • MSMQ
  • RabbitMQ
  • MassTransit over MSMQ/RabbitMQ

and try to compare their pros and cons.

Messaging using queues is considered an important architectural pattern. It still provides the flexibility of direct interoperable interprocess communication but because of a queue in the middle between entities, heavy load peaks no longer raise immediate performance issues.

Each of our simple examples will involve a producer and a consumer. The producer will create messages and publish them onto the messaging subsystem. The consumer will await incoming messages and just print them on the console.

We start with MSMQ. Microsoft Message Queues is a well-established implementation of messaging infrastructure for Windows. Queues are persistent and there is a built-in administration snap-in available. It also supports transactions and multicasting.

The producer:

class Program
{
    static void Main( string[] args )
    {
        if ( !MessageQueue.Exists( @".\Private$\ExampleQueue" ) )
            MessageQueue.Create( @".\Private$\ExampleQueue" );
 
        MessageQueue mq = new MessageQueue( @".\Private$\ExampleQueue" );
 
        while ( true )
        {
            Console.WriteLine( "Press key to send a message" );
            Console.ReadLine();
 
            mq.Send( "Hello world form MSMQ", "Title" );
 
            Console.WriteLine( "Message sent" );
        }
 
        Console.ReadLine();
    }
}

Although in this example I am just sending a string, the Send method accepts any object. It tries to serialize it and puts it into a queue, in this example the ExampleQueue.

The consumer:

class Program
{
    static void Main( string[] args )
    {
        if ( !MessageQueue.Exists( @".\Private$\ExampleQueue" ) )
            MessageQueue.Create( @".\Private$\ExampleQueue" );
 
        MessageQueue mq = new MessageQueue( @".\Private$\ExampleQueue" );
 
        while ( true )
        {
            Console.WriteLine( "Listening to messages..." );
 
            Message msg = mq.Receive();
            msg.Formatter = new XmlMessageFormatter( new [] { typeof( string ) } );
 
            Console.WriteLine( "Message says: {0}", msg.Body );
        }
 
        Console.WriteLine( "Message sent" );
        Console.ReadLine();
    }
}

The consumer uses a blocking Receive method (note that there is also an async BeginReceive). What is interesting there is that a formatter can be attached to received message so that instead of reading the BodyStream and manually deserializing the contents, the message object does deserialization on its own and the Body property contains a reference to the deserialized message.

Such simple example raises an immediate question on how to build a more complicated scenario involving multiple publishers and multiple subscribers and some subscription rules. Unfortunately, there are no simple answers to the question.

In theory, the multicast feature should be an answer but in practice, we have found that it just doesn’t work sometimes. It’s like it works for days and then suddenly messages are not delivered and the server has to be restarted. Despite hours spent on digging into the issue, we haven’t found any single reason for the issue.

Pros:

  • MSMQ is built into the operating system, no need to install any third-party components
  • the API is a part of the Base Class Library
  • supports sending complex objects

Cons: