Skip to main content
Other posts in this series can be found on the Series Index Page

Introduction

We previously looked at the IDataFlowBlock interface in it’s simplest implementation ActionBlock<TInput>, today though we are going to look at more complex implementations that give amazing powers.

BatchBlock<T>

CropperCapture[1]Where ActionBlock<TInput> could only be a subscriber, BatchBlock<T> is more than that – it can be a subscriber and a publisher rolled into one! As such the usage is rather different, we do not pass in the receive method in the constructor. Rather we call the Receive method to get the latest messages past to the subscriber.

The interesting thing about BatchBlock is it batches up messages into groups, so rather than getting each message one at a time, you get groups of messages. As such, you could wait a long time for enough the messages to arrive, so thankfully they also include a ReceiveAsync which works with C# new async methods.

In the sample below you can see how I create a BatchBlock with a batch size of 3, so messages get processed in groups of three. This also means the last two messages (since we are sending 20) are never processed.

static BatchBlock<string> pubSub = new BatchBlock<string>(3);
static int counter = 0;

static async void Process()
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages)
        {
            counter++;
            Console.WriteLine("Got message number {0}: {1}", counter, item);
        }
    }
}

static void Main(string[] args)
{
    Process();

    for (int i = 0; i < 20; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Post(DateTime.Now.ToLongTimeString());
        });
    }

    Console.ReadLine();
}

 

BatchedJoinBlock<T1,T2>

CropperCapture[1]The second block implementation we look at is the BatchedJoinBlock<T1,T2> which is similar to BatchBlock<T> except, that it has multiple type inputs. This allows you to have a single publisher that can send messages to different subscribers based on type! The batching works the same as before, but be careful as batch sizes are based on all messages regardless of types.

static BatchedJoinBlock<string, int> pubSub = new BatchedJoinBlock<string, int>(3);
static int stringCounter = 0;
static int intCounter = 0;

static async void Process()
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages.Item1)
        {
            stringCounter++;
            Console.WriteLine("Got STRING message number {0}: {1}", stringCounter, item);
        }

        foreach (var item in messages.Item2)
        {
            intCounter++;
            Console.WriteLine("Got INT message number {0}: {1}", intCounter, item);
        }
    }
}

static void Main(string[] args)
{
    Process();

    for (int i = 0; i < 20; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Target1.SendAsync(DateTime.Now.ToLongTimeString());
        });

        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Target2.SendAsync(new Random().Next(1, 99));
        });
    }

    Console.ReadLine();
}

Multiple Subscribers

CropperCapture[2]So what happens when you add multiple subscribers to the system? It handles each processing in a round robin like way. The sample below is the same as the BatchBlock<T> about, but  has three subscribers (A, B & C) and a batch size of two.

static BatchBlock<string> pubSub = new BatchBlock<string>(2);
static int counter = 0;

static async void Process(string id)
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages)
        {
            counter++;
            Console.WriteLine("{2} - Got message number {0}: {1}", counter, item, id);
        }
    }
}

static void Main(string[] args)
{
    Process("A");
    Process("B");
    Process("C");

    for (int i = 0; i < 11; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Post(DateTime.Now.ToLongTimeString());
        });
    }

    Console.ReadLine();
}
File attachments