Tuesday, June 24, 2014

Simple interprocess messaging using queues (3/3)

In this last part of the tutorial we will try to answer questions raised in the previous entry: how should we approach a complex scenario involving multiple publishers, multiple subscribers and multiple message types so that messages reach only subscribers that really need them?

One common approach is to have an exchange per message type, a queue per subscriber, and bindings between them. This way publishers know exactly where to publish each message, and subscribers just listen on their own queues.
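To make the exchange-per-type, queue-per-subscriber idea concrete, here is a minimal sketch of wiring it by hand. It assumes the classic synchronous RabbitMQ.Client API (`IModel`, as in client versions up to 6.x; newer versions changed it) and a broker on localhost with default credentials. The exchange and queue names (`OrderCreated`, `InvoiceIssued`, `billing_service`, `audit_service`) are hypothetical, chosen only for illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

class TopologySketch
{
    static void Main()
    {
        // Assumption: a local broker with the default guest account
        var factory = new ConnectionFactory { HostName = "localhost" };

        using ( var connection = factory.CreateConnection() )
        using ( var channel = connection.CreateModel() )
        {
            // One durable fanout exchange per message type (hypothetical names)
            channel.ExchangeDeclare( "OrderCreated", ExchangeType.Fanout, true );
            channel.ExchangeDeclare( "InvoiceIssued", ExchangeType.Fanout, true );

            // One durable queue per subscriber (hypothetical names)
            channel.QueueDeclare( "billing_service", true, false, false, null );
            channel.QueueDeclare( "audit_service", true, false, false, null );

            // Bindings decide which subscriber receives which message type
            channel.QueueBind( "billing_service", "OrderCreated", "" );
            channel.QueueBind( "audit_service", "OrderCreated", "" );
            channel.QueueBind( "audit_service", "InvoiceIssued", "" );

            // A publisher only needs to know the exchange of the message type
            var body = Encoding.UTF8.GetBytes( "{ \"Id\": 1 }" );
            channel.BasicPublish( "OrderCreated", "", null, body );
        }
    }
}
```

With these bindings the published message lands in both queues, while anything sent to `InvoiceIssued` would reach only `audit_service`. Every new message type or subscriber means more declarations like these, which is the kind of wiring a framework can take over.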

Setting up such a scenario by hand with RabbitMQ is possible; however, with the help of a smart framework the wiring can be done automatically. We will use MassTransit; another possibility would be Rebus. I don’t quite like NServiceBus because of its closed, costly license.

Anyway, to work with MassTransit we first start with a model:

public class ModelItem
{
    public string Name { get; set; }
}

A simple producer using MassTransit looks like this:

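using System;
using MassTransit;

class Program
{
    static void Main( string[] args )
    {
        // Configure a bus that talks to the local RabbitMQ broker
        var bus1 = ServiceBusFactory.New( sbc =>
        {
            sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_producer" ),
                        c =>
                        {
                            // no extra host settings needed here
                        } )
                );

            // The producer's own input queue
            sbc.ReceiveFrom( "rabbitmq://guest:guest@localhost/masstransit_producer" );

            sbc.SetCreateMissingQueues( true );
        } );

        // Publish one message per key press; the loop runs until the process is killed
        while ( true )
        {
            Console.WriteLine( "Press key to publish message " );
            Console.ReadLine();

            bus1.Publish( new ModelItem() { Name = DateTime.Now.ToString() } );

            Console.WriteLine( "Message sent" );
        }
    }
}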

What is interesting here is that MassTransit (like other similar frameworks) provides a nice abstraction over the messaging infrastructure, so you are no longer in a world of bare queues but rather in an abstract world of buses, subscriptions and publishing. Because of this, MassTransit supports multiple queuing subsystems, including RabbitMQ and MSMQ.

(An interesting question is: how the heck is MassTransit able to set up complex pub-sub scenarios over MSMQ? The answer is that it tries to use the multicast feature of MSMQ. In practice, as I’ve already mentioned, this sometimes just doesn’t work: messages are published but never received.)

The consumer would be:

using System;
using MassTransit;

class Program
{
    static void Main( string[] args )
    {
        // Configure a bus that talks to the local RabbitMQ broker
        var bus1 = ServiceBusFactory.New( sbc =>
        {
            sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
                        c =>
                        {
                            // no extra host settings needed here
                        } )
                );

            // The consumer's own input queue (same address as in ConfigureHost)
            sbc.ReceiveFrom( "rabbitmq://guest:guest@localhost/masstransit_consumer" );

            sbc.SetCreateMissingQueues( true );
        } );

        // Subscribing a consumer tells the bus which message types this process wants
        var consumer1 = new SimpleMessageItemConsumer();
        bus1.SubscribeConsumer( () => consumer1 );

        Console.WriteLine( "Listening to messages..." );
        Console.ReadLine();

        bus1.Dispose();
    }
}

public class SimpleMessageItemConsumer : Consumes<ModelItem>.All
{
    #region All Members

    // Called by the bus with an already deserialized message
    public void Consume( ModelItem message )
    {
        Console.WriteLine( "Message received : {0}", message.Name );
    }

    #endregion
}

I must admit that from the OO perspective, this would be the cleanest possible design. You have a strongly typed consumer class where the Consume method gets a deserialized instance of the message and you just define subscriptions by pointing the subscriber to an instance of the handler class.
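Since the original question was about multiple message types, here is a short sketch of how the same pattern might extend to a second type. `OrderItem`, `CombinedConsumer` and `Subscriptions` are hypothetical names introduced only for this example; the `Consumes<T>.All` interface and the `SubscribeConsumer` call are the ones already used in the consumer listing, and `ModelItem` is repeated so the fragment stands on its own.

```csharp
using System;
using MassTransit;

// Same model as in the listings of this post
public class ModelItem
{
    public string Name { get; set; }
}

// Hypothetical second message type, for illustration only
public class OrderItem
{
    public int Quantity { get; set; }
}

// One consumer class declares interest in two message types
public class CombinedConsumer : Consumes<ModelItem>.All, Consumes<OrderItem>.All
{
    public void Consume( ModelItem message )
    {
        Console.WriteLine( "ModelItem received : {0}", message.Name );
    }

    public void Consume( OrderItem message )
    {
        Console.WriteLine( "OrderItem received : {0}", message.Quantity );
    }
}

public static class Subscriptions
{
    // Assumes a bus configured as in the consumer listing
    public static void Register( IServiceBus bus )
    {
        var consumer = new CombinedConsumer();
        bus.SubscribeConsumer( () => consumer );
    }
}
```

A process that registers only `SimpleMessageItemConsumer` would keep receiving `ModelItem` messages and nothing else, so each subscriber gets just the types it asked for.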

Pros:

  • abstracts the actual queuing subsystem and can switch between different implementations
  • automates the way subscriptions are created; you no longer think in terms of queues, exchanges and bindings but rather in terms of message types and subscriptions
  • supports failure handling, tracing and monitoring out of the box (among other features)

Cons:

  • this is an API-driven framework, which means that you still need to hide it behind a well-defined facade (WCF, possibly) so that clients get a more interoperable communication protocol
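As a rough illustration of the facade idea, the sketch below hides the bus behind a small interface. `IMessagePublisher` and `MassTransitPublisher` are hypothetical names, not part of MassTransit; the only framework call used is `IServiceBus.Publish`, the same one the producer uses. This is an in-process facade only; a WCF service contract, as suggested above, could delegate to it in the same way.

```csharp
using MassTransit;

// Hypothetical facade: client code depends on this interface, not on MassTransit
public interface IMessagePublisher
{
    void Publish<T>( T message ) where T : class;
}

// Hypothetical implementation that forwards to a configured MassTransit bus
public class MassTransitPublisher : IMessagePublisher
{
    private readonly IServiceBus _bus;

    public MassTransitPublisher( IServiceBus bus )
    {
        _bus = bus;
    }

    public void Publish<T>( T message ) where T : class
    {
        _bus.Publish( message );
    }
}
```

Keeping the framework behind one seam like this also means that swapping MassTransit for another library would touch a single class.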
