Thursday, February 21, 2019

Topic Based Pub/Sub Data Bus with Rebus.FM

We've been using messaging middleware for years, and the attitude was always: "we don't want to be coupled to any specific implementation; we'd rather abstract the idea so that we can upgrade the internals when necessary". I remember first experiments with NServiceBus back in 2009/2010, but for various reasons we picked MassTransit. Such a higher-level approach has various advantages. For us, one of them was that we were able to migrate all deployments from MSMQ to RabbitMQ, gaining better performance and reliability.
One issue that quickly turned out to be important is how the high-level engine uses the queuing subsystem. At the lower level, where RabbitMQ operates, you have exchanges bound to queues and binary messages passed down the pipeline. At the higher level you have strongly typed contracts (classes) and strongly typed message handlers. A high-level engine needs a clean way of mapping its high-level concepts to the wire level. MassTransit does it in a straightforward manner:
Note: When a consumer is connected to a receive endpoint, the combined set of message types consumed by all of the consumers connected to the same receive endpoint are *subscribed* to the queue. The subscription method varies by broker, in the case of RabbitMQ exchange bindings are created for the message types to the exchange/queue for the receive endpoint. These subscriptions are persistent, and remain in place after the process exits. This ensures that messages published or sent that would be delivered to one of the receive endpoint consumers are saved even if the process is terminated. When the process is started, messages waiting in the queue will be delivered to the consumer(s).
What does this actually mean? Well, an exchange is created for each data type. In the diagram above we have two data types, Foo and Bar, and thus two exchanges in RabbitMQ. Then, for each consumer, a queue is created. In the diagram we have two consumers and thus two queues.
When it comes to publishing, MassTransit just puts the message in the appropriate exchange. When it comes to subscription configuration, it uses the fanout model between exchanges and queues. In the above diagram, Consumer 1 subscribes to both message types while Consumer 2 subscribes to Bar only. Consumers just read messages from their respective queues.
The model works great; however, it has its disadvantages. One of the major drawbacks is how you set up subscriptions for consumers. You write strongly typed handlers, where the message type has to be known in advance:
    public class Foo_Handler : IConsumer<Foo>
    {
        public async Task Consume(ConsumeContext<Foo> context)
        {
            var message = context.Message;

            // do whatever I like with an instance of Foo
        }
    }
This looks great: the handler can access the actual message in a strongly typed fashion. But all these handlers either have to be written out explicitly for each data type, or I need a clever dynamic code generator to handle multiple data types. What's worse, sometimes my "bus" doesn't even know all the data types that are in use, or rather, I don't want to recompile the data bus each time a new type appears in the family.
One possible approach to the "family of data types that are not necessarily known and can change" issue involves a single "envelope-like" data type, a type that wraps the actual messages. Something like:
    using System;
    using Newtonsoft.Json;

    // A generic envelope type
    public class MessageEnvelope
    {
        // short CLR type name of the wrapped payload, e.g. "Foo"
        public string MessageTypeName { get; set; }

        // the actual payload serialized to JSON
        public string Content { get; set; }

        // parameterless constructor, needed for deserialization
        public MessageEnvelope()
        {

        }

        public MessageEnvelope(object payload)
        {
            if (payload == null) throw new ArgumentNullException(nameof(payload));

            this.Content         = JsonConvert.SerializeObject(payload);
            this.MessageTypeName = payload.GetType().Name;
        }

        // the caller decides which type to deserialize the content into
        public T GetPayload<T>()
        {
            if (string.IsNullOrEmpty(this.Content)) throw new InvalidOperationException("The envelope has no content.");

            return JsonConvert.DeserializeObject<T>(this.Content);
        }
    }
Suddenly, it's much easier to maintain the growing list of possible data types. I could handle types that are not known at the time the core library is compiled - the envelope will happily handle a payload of any type.
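To make the round trip concrete, here is a minimal, self-contained sketch of the same idea. It is not the class from this post: it assumes System.Text.Json instead of Newtonsoft.Json so that it runs without extra packages, and the names EnvelopeSketch, Wrap and Unwrap are made up for the illustration.

```csharp
using System;
using System.Text.Json;

// Sample payload, same shape as the Foo type used later in the post.
public class Foo
{
    public string Data { get; set; }
}

// Hypothetical stand-in for MessageEnvelope, built on System.Text.Json
// (an assumption, chosen only to keep the sketch dependency-free).
public class EnvelopeSketch
{
    public string MessageTypeName { get; set; }
    public string Content { get; set; }

    public static EnvelopeSketch Wrap(object payload)
    {
        if (payload == null) throw new ArgumentNullException(nameof(payload));

        return new EnvelopeSketch
        {
            // Serialize by runtime type so the payload's own properties are written.
            Content         = JsonSerializer.Serialize(payload, payload.GetType()),
            MessageTypeName = payload.GetType().Name
        };
    }

    public T Unwrap<T>()
    {
        if (string.IsNullOrEmpty(Content)) throw new InvalidOperationException("The envelope is empty.");

        return JsonSerializer.Deserialize<T>(Content);
    }
}
```

Wrapping `new Foo { Data = "hello" }` yields an envelope whose MessageTypeName is "Foo", and calling `Unwrap<Foo>()` on it gives back an object with the same Data. Note that the envelope itself never needs to know Foo at compile time; only the code that unwraps it does.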
Unfortunately, while tempting, this approach has a significant disadvantage. Please review the diagram above once again and imagine what happens in RabbitMQ:
It's no longer that attractive. Now I end up with a single exchange that pushes incoming messages to all queues, where agents pick up messages and have to filter them according to the actual payload. And guess what, this slows down the pipeline! Suppose I have a lot of consumers, not two but, say, 50. A message of type Foo is published into the pipeline, but since it's wrapped and is of the MessageEnvelope type, it's effectively copied to all queues (this is how fanout works). Then multiple consumers pick it up just to realize that only a few of them, maybe even a single one, subscribe to this particular message. All other consumers just discard it. Now, let's count. A single message of, let's say, a few megabytes is copied by RabbitMQ to 50 different queues. Things go into hundreds of megabytes, and this is just a single message. Publish multiple messages and I have my bottleneck.
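The counting above can be written down as a tiny helper. This is only back-of-the-envelope arithmetic under the post's own assumption of one copy per receiving queue; BrokerLoad and its methods are hypothetical names, and the 4 MB message size used below is an assumed stand-in for "a few megabytes".

```csharp
using System;

public static class BrokerLoad
{
    // Total bytes routed for one published message: one copy per receiving queue.
    public static long RoutedBytes(long messageBytes, int receivingQueues)
    {
        if (messageBytes < 0) throw new ArgumentOutOfRangeException(nameof(messageBytes));
        if (receivingQueues < 0) throw new ArgumentOutOfRangeException(nameof(receivingQueues));

        return checked(messageBytes * receivingQueues);
    }

    // Converts a byte count to megabytes (1 MB taken as 1024 * 1024 bytes).
    public static double ToMegabytes(long bytes) => bytes / (1024.0 * 1024.0);
}
```

With a 4 MB message, `BrokerLoad.RoutedBytes(4L * 1024 * 1024, 50)` comes to 200 MB for the fanout case with 50 queues, while the same call with a single receiving queue stays at 4 MB.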
Fortunately, a solution exists that still lets me have a single envelope for all my messages but doesn't clog RabbitMQ with redundant copies. The solution involves topic mapping. I can still have a single exchange and multiple consumer queues. This time, however, each message is published together with a topic, and the bindings between the exchange and the queues name explicit topics, so a message reaches only the queues bound to its topic.
How do you set up the topic mapping in a high-level engine? Well, it's not that easy in MassTransit, unfortunately, and involves a trick I will present in another blog entry. This time I am going to show the actual code of how this particular setup works in Rebus, an engine much younger than the two introduced at the very beginning of this article. Younger doesn't mean it lacks features, though. What's nice is that it supports more transports than MassTransit (e.g. you don't even need messaging middleware, since bare SQL Server can be used!) and supports topic-based subscriptions easily.
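The difference between the two binding styles can be shown with a toy, in-memory model of a single exchange. This is not RabbitMQ or Rebus code, just a sketch: ToyExchange and its methods are invented for the illustration, and topics are matched exactly (no wildcards).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A toy stand-in for one exchange with several bound queues.
public class ToyExchange
{
    // queue name -> topics that queue is bound with
    private readonly Dictionary<string, HashSet<string>> bindings =
        new Dictionary<string, HashSet<string>>();

    public void Bind(string queue, string topic)
    {
        if (!bindings.TryGetValue(queue, out var topics))
        {
            topics = new HashSet<string>();
            bindings[queue] = topics;
        }

        topics.Add(topic);
    }

    // Fanout: the topic is ignored, every bound queue gets the message.
    public IReadOnlyList<string> RouteFanout(string topic)
        => bindings.Keys.OrderBy(q => q, StringComparer.Ordinal).ToList();

    // Topic binding: only queues bound with this exact topic get the message.
    public IReadOnlyList<string> RouteByTopic(string topic)
        => bindings.Where(b => b.Value.Contains(topic))
                   .Select(b => b.Key)
                   .OrderBy(q => q, StringComparer.Ordinal)
                   .ToList();
}
```

Bind "subscriber1" with "Foo" and "subscriber2" with both "Foo" and "Bar": routing "Bar" by topic reaches subscriber2 only, whereas the fanout variant delivers it to both queues regardless of the topic.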
Here's the code. We start with two dummy message types and the MessageEnvelope class presented above. A simple extension class will be used to format messages to the console.
    public class Foo
    {
        public string Data { get; set; }
    }

    public class Bar
    {
        public string Something { get; set; }
    }

    public static class EnvelopedExtensions
    {
        public static string ToJson( this object payload )
        {
            if (payload == null) throw new ArgumentNullException();

            return JsonConvert.SerializeObject(payload);
        }
    }

Now the handler, the high-level concept of consumer code that processes actual messages:
    using System;
    using System.Threading.Tasks;
    using Rebus.Bus;
    using Rebus.Handlers;
    using Rebus.Pipeline;

    public class MessageEnvelope_Handler : IHandleMessages<MessageEnvelope>
    {
        private IBus bus { get; set; }
        private string agent { get; set; }

        public MessageEnvelope_Handler(IBus bus, string agent)
        {
            this.bus   = bus;
            this.agent = agent;
        }

        public async Task Handle(MessageEnvelope message)
        {
            // ambient Rebus message context (not used in this demo)
            var context = MessageContext.Current;

            await Console.Out.WriteLineAsync($"{message.MessageTypeName} received by {this.agent}\r\n{message.ToJson()}");
        }
    }
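The handler above only prints the envelope. A consumer that wants a typed object back has to map MessageTypeName to a type it knows. One possible way to do that is sketched below; PayloadRegistry is a hypothetical helper, not part of Rebus, and it assumes System.Text.Json rather than the Newtonsoft.Json used elsewhere in this post so that it stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public class Foo
{
    public string Data { get; set; }
}

public class Bar
{
    public string Something { get; set; }
}

// Hypothetical helper: maps the envelope's MessageTypeName to a type
// registered up front by the consumer.
public class PayloadRegistry
{
    private readonly Dictionary<string, Type> known = new Dictionary<string, Type>();

    public PayloadRegistry Register<T>()
    {
        // Keyed by short type name, the same value the envelope stores.
        known[typeof(T).Name] = typeof(T);
        return this;
    }

    // Returns null for an unknown type name so the caller can skip the message.
    public object Resolve(string messageTypeName, string content)
    {
        if (messageTypeName == null || !known.TryGetValue(messageTypeName, out var type))
            return null;

        return JsonSerializer.Deserialize(content, type);
    }
}
```

A consumer would register the types it cares about once, for example `new PayloadRegistry().Register<Foo>().Register<Bar>()`, and call Resolve with the envelope's MessageTypeName and Content. Because the key is the short type name, two payload types with the same name in different namespaces would collide.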
Then, a little configuration helper method to remove code duplication:
        static IBus ConfigureActivator(
            BuiltinHandlerActivator activator, 
            string queueName, 
            int workers = 1, 
            int parallel = 1)
        {
            var bus = Configure.With(activator)
                .Transport(t => t.UseRabbitMq("amqp://username:password@localhost", queueName))
                .Logging(l => l.ColoredConsole(LogLevel.Warn))
                .Options(conf =>
                {
                    conf.SetNumberOfWorkers(workers);
                    conf.SetMaxParallelism(parallel);
                })
                .Start();

            return bus;
        }
The actual orchestration:
        static async Task Work()
        {
            using (var publisher   = new BuiltinHandlerActivator())
            using (var subscriber1 = new BuiltinHandlerActivator())
            using (var subscriber2 = new BuiltinHandlerActivator())
            {
                var publisherBus   = ConfigureActivator(publisher, "publisher");
                var subscriber1Bus = ConfigureActivator(subscriber1, "subscriber1", 2, 2);
                var subscriber2Bus = ConfigureActivator(subscriber2, "subscriber2", 2, 2);

                subscriber1.Register(() => new MessageEnvelope_Handler(subscriber1Bus, "agent1"));
                subscriber2.Register(() => new MessageEnvelope_Handler(subscriber2Bus, "agent2"));

                await subscriber1Bus.Advanced.Topics.Subscribe("Foo");
                await subscriber2Bus.Advanced.Topics.Subscribe("Foo");
                await subscriber2Bus.Advanced.Topics.Subscribe("Bar");

                Console.WriteLine("publishing");

                await publisherBus.Advanced.Topics.Publish("Foo", new MessageEnvelope(CreateFoo()));
                await publisherBus.Advanced.Topics.Publish("Bar", new MessageEnvelope(CreateBar()));

                Console.WriteLine("published");

                Console.WriteLine("Press enter to quit");
                Console.ReadLine();

            }
        }
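The listing calls CreateFoo and CreateBar, which are not shown in this post. Judging by the demo output further down, they presumably look something like the sketch below. The SampleMessages class is a made-up container; in the demo they would be static methods next to Work.

```csharp
public class Foo
{
    public string Data { get; set; }
}

public class Bar
{
    public string Something { get; set; }
}

// Assumed factory helpers; the values are taken from the demo output.
public static class SampleMessages
{
    public static Foo CreateFoo() => new Foo { Data = "hello foo world" };

    public static Bar CreateBar() => new Bar { Something = "and this is bar" };
}
```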
As you can see, Rebus' topic subscription API is used to subscribe, and explicit topics are used to publish messages. Because of that, the very same message type, MessageEnvelope, can be reused, and RabbitMQ still configures the bindings correctly:
And this is the output of this simple demo:
publishing
published
Press enter to quit
Bar received by agent2
{"MessageTypeName":"Bar","Content":"{\"Something\":\"and this is bar\"}"}
Foo received by agent2
{"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
Foo received by agent1
{"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
This works as expected. The message of type Foo was handled by both consumers, while Bar was handled by the second consumer only. At the price of topic-based binding (vs. fanout binding) I have fine-grained control over what happens in RabbitMQ and still have my single envelope type to wrap actual payloads.