Friday, March 1, 2019

Topic Based Pub/Sub Data Bus with MassTransit

In my previous blog entry I've discussed the problem of using a single envelope-like data type in a data bus. I've presented a nice solution that uses topic-based subscriptions available in Rebus.FM framework. I've also promised to show how to do the same in MassTransit.
The problem with MassTransit is that you need a clever trick to set up all the bindings as MT doesn't support topic based subscriptions that easy. The trick is to have two layers of exchanges:
  • the first layer of exchanges consists of a single exchange that is used by the publisher to put messages into the RabbitMQ
  • the second layer of exchanges consists of multiple exchanges bound to the single exchange from the first layer by routing keys
and queues bound to exchanges from the second layer:
To get this, we need models
    public class MessageEnvelope
    {
        // will help to build routing keys
        public string MessageTypeName { get; set; }

        // the payload serialized to JSON
        public string Content { get; set; }

        public MessageEnvelope()
        {

        }

        public MessageEnvelope(object payload)
        {
            if (payload == null) throw new ArgumentNullException();

            this.Content         = JsonConvert.SerializeObject(payload);
            this.MessageTypeName = payload.GetType().Name;
        }

        public T GetPayload()
        {
            if (string.IsNullOrEmpty(this.Content)) throw new ArgumentException();

            return JsonConvert.DeserializeObject(this.Content);
        }
    }

    public static class EnvelopedExtensions
    {
        public static string ToJson(this object payload)
        {
            if (payload == null) throw new ArgumentNullException();

            return JsonConvert.SerializeObject(payload);
        }
    }

    public class Foo
    {
        public string Data { get; set; }
    }

    public class Bar
    {
        public string Something { get; set; }
    }
as well as the handler. Note that the handler is almost identical to the one from the previous blog entry about Rebus.FM. In fact, the only notable difference is naming of the interface you implement.
    public class Enveloped_Handler : IConsumer
    {
        private string agent { get; set; }
        public Enveloped_Handler(string agent)
        {
            this.agent = agent;
        }

        public async Task Consume(ConsumeContext context)
        {
            var message = context.Message;

            await Console.Out.WriteLineAsync($"{message.MessageTypeName} received by {this.agent}\r\n{message.ToJson()}");
        }
    }
Then, the actual code that setups the exchanges, queues and bindings and publishes messages.
   class Program
    {
        static IBusControl ConfigureActivator(string queueName, int workers = 1)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
                {
                    h.Username("username");
                    h.Password("password");
                });

                if (queueName == "publisher")
                {
                    // take routing keys from the property
                    cfg.Send(x => x.UseRoutingKeyFormatter(context => context.Message.MessageTypeName ));
                    // publish to this exchange
                    cfg.Message(x => x.SetEntityName("MessageEnvelope"));
                    // use direct binding between the exchanges
                    cfg.Publish(x => { x.ExchangeType = ExchangeType.Direct; });
                }
                else
                {
                    cfg.ReceiveEndpoint(host, queueName, e =>
                    {
                        e.PrefetchCount = (ushort)workers;

                        switch (queueName)
                        {
                            case "subscriber1":

                                e.BindMessageExchanges = false;
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Foo";
                                });


                                e.Consumer(() => new Enveloped_Handler("agent1"));
                                break;
                            case "subscriber2":

                                e.BindMessageExchanges = false;
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Foo";
                                });
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Bar";
                                });


                                e.Consumer(() => new Enveloped_Handler("agent2"));
                                break;
                        }
                        //e.con
                    });
                }
            });

            bus.Start();

            return bus;
        }

        static async Task Work()
        {
            var publisherBus   = ConfigureActivator("publisher");
            var subscriber1Bus = ConfigureActivator("subscriber1", 1);
            var subscriber2Bus = ConfigureActivator("subscriber2", 1);

            Console.WriteLine("publishing");

            await publisherBus.Publish(new MessageEnvelope( CreateFoo()) );
            await publisherBus.Publish(new MessageEnvelope( CreateBar()) );

            Console.WriteLine("published");
          
            Console.WriteLine("Press enter to quit");
            Console.ReadLine();
        }

        static Foo CreateFoo()
        {
            return new Foo()
            {
                Data = "hello foo world"
            };
        }

        static Bar CreateBar()
        {
            return new Bar()
            {
                Something = "and this is bar"
            };
        }

        static void Main(string[] args)
        {
            Work().Wait();
        } 
    }
The output of this code will be
publishing
published
Press enter to quit
Foo received by agent2
{"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
Foo received by agent1
{"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
Bar received by agent2
{"MessageTypeName":"Bar","Content":"{\"Something\":\"and this is bar\"}"}
If there's a simpler way to have this effect in MassTransit, in particular, if it's possible to have topic based subscriptions between exchanges and queues, with no this extra layer of exchanges, please let me know in the comments section.