Skip to main content

.NET 4.5 Baby Steps, Part 5: Some more interesting blocks

Other posts in this series can be found on the Series Index Page

Introduction

We have seen the IDataFlowBlock, in three different implementations and now we will look at a few more.

BroadcastBlock<T>

In the BatchBlock we saw that if you had multiple subscribers, messages are delivered to subscribers in a round robin way, but what about if you want to send the same message to all providers? The solution is the BoardcastBlock<T>.

static BroadcastBlock<string> pubSub = new BroadcastBlock<string>(s =>
    {
        return s + " relayed from publisher";
    });

static async void Process()
{
    var message = await pubSub.ReceiveAsync();
    Console.WriteLine(message);
}

static void Main(string[] args)
{
    // setup 5 subscribers
    for (int i = 0; i < 5; i++)
    {
        Process();
    }

    pubSub.Post(DateTime.Now.ToLongTimeString());

    Console.ReadLine();
}
CropperCapture[1]

TransformBlock<TInput,TOutput>

The next interesting block is the transform block which works similar to the action block, except that the input and output can be different types and so we can transform the data internally.

static TransformBlock<int, string> pubSub = new TransformBlock<int, string>(i =>
    {
        return string.Format("we got: {0}", i);
    });

static async void Process()
{
    while (true)
    {
        var message = await pubSub.ReceiveAsync();
        Console.WriteLine(message);
    }
}

static void Main(string[] args)
{
    Process();
     
    for (int i = 0; i < 10; i++)
    {            
        pubSub.Post(i);
        Thread.Sleep(1000);
    }

    Console.ReadLine();
}
CropperCapture[1]
File attachments

.NET 4.5 Baby Steps, Part 6: ISourceBlock

Other posts in this series can be found on the Series Index Page

Introduction

We have spent a number of posts looking at IDataFlowBlock, BatchedBlocks and some other interesting blocks but as a final interesting bit for blocks is the ISourceBlock interface which is implemented on the “publisher” like blocks and it has a very interesting method: LinkTo. The goal of LinkTo is to chain blocks together to do interesting and amazing things!

It's in the name

The final interesting point, and likely the most important is in the name of the assembly all this goodness comes from: System.Threading.Tasks.DataFlow. All of this is handled my multiple tasks (and likely multiple threads) under the covers without you needing to worry about it. When we look at the TPL in .NET 4 and how it made working with multi-threaded processing easier, the final goal of this is to make multi-threaded data processing easier - and it does!

Demo LinkTo

In this demo what we want to do is take in DateTime’s as fast as possible, then convert them to strings, and send them in batches to the three subscribers. Sounds complex, but with the ability to LinkTo this is very easy.

static BufferBlock<DateTime> bufferPublisher = new BufferBlock<DateTime>();
static TransformBlock<DateTime, string> transformPublisher = new TransformBlock<DateTime, string>(d =>
    {
        return d.ToLongTimeString();
    });
static BatchBlock<string> batchPublisher = new BatchBlock<string>(5);

static ActionBlock<string[]> subscriber1 = new ActionBlock<string[]>(s =>
    {
        Console.WriteLine("Subscriber 1: {0}", s);
    });
static ActionBlock<string[]> subscriber2 = new ActionBlock<string[]>(s =>
{
    Console.WriteLine("Subscriber 2: {0}", s);
});

static ActionBlock<string[]> subscriber3 = new ActionBlock<string[]>(s =>
{
    Console.WriteLine("Subscriber 3: {0}", s);
});


static void Main(string[] args)
{
    batchPublisher.AsObservable().Subscribe(subscriber1.AsObserver());
    batchPublisher.AsObservable().Subscribe(subscriber2.AsObserver());
    batchPublisher.AsObservable().Subscribe(subscriber3.AsObserver());

    transformPublisher.LinkTo(batchPublisher);
    bufferPublisher.LinkTo(transformPublisher);

    for (int i = 0; i < 100; i++)
    {
        bufferPublisher.Post(DateTime.Now);
        Thread.Sleep(200);
    }

    Console.ReadLine();
}
CropperCapture[1]
File attachments
LinkTo Demo (7.75 KB)

.NET 4.5 Baby Steps, Part 4: BatchedBlocks

Other posts in this series can be found on the Series Index Page

Introduction

We previously looked at the IDataFlowBlock interface in it’s simplest implementation ActionBlock<TInput>, today though we are going to look at more complex implementations that give amazing powers.

BatchBlock<T>

CropperCapture[1]Where ActionBlock<TInput> could only be a subscriber, BatchBlock<T> is more than that – it can be a subscriber and a publisher rolled into one! As such the usage is rather different, we do not pass in the receive method in the constructor. Rather we call the Receive method to get the latest messages past to the subscriber.

The interesting thing about BatchBlock is it batches up messages into groups, so rather than getting each message one at a time, you get groups of messages. As such, you could wait a long time for enough the messages to arrive, so thankfully they also include a ReceiveAsync which works with C# new async methods.

In the sample below you can see how I create a BatchBlock with a batch size of 3, so messages get processed in groups of three. This also means the last two messages (since we are sending 20) are never processed.

static BatchBlock<string> pubSub = new BatchBlock<string>(3);
static int counter = 0;

static async void Process()
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages)
        {
            counter++;
            Console.WriteLine("Got message number {0}: {1}", counter, item);
        }
    }
}

static void Main(string[] args)
{
    Process();

    for (int i = 0; i < 20; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Post(DateTime.Now.ToLongTimeString());
        });
    }

    Console.ReadLine();
}

 

BatchedJoinBlock<T1,T2>

CropperCapture[1]The second block implementation we look at is the BatchedJoinBlock<T1,T2> which is similar to BatchBlock<T> except, that it has multiple type inputs. This allows you to have a single publisher that can send messages to different subscribers based on type! The batching works the same as before, but be careful as batch sizes are based on all messages regardless of types.

static BatchedJoinBlock<string, int> pubSub = new BatchedJoinBlock<string, int>(3);
static int stringCounter = 0;
static int intCounter = 0;

static async void Process()
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages.Item1)
        {
            stringCounter++;
            Console.WriteLine("Got STRING message number {0}: {1}", stringCounter, item);
        }

        foreach (var item in messages.Item2)
        {
            intCounter++;
            Console.WriteLine("Got INT message number {0}: {1}", intCounter, item);
        }
    }
}

static void Main(string[] args)
{
    Process();

    for (int i = 0; i < 20; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Target1.SendAsync(DateTime.Now.ToLongTimeString());
        });

        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Target2.SendAsync(new Random().Next(1, 99));
        });
    }

    Console.ReadLine();
}

Multiple Subscribers

CropperCapture[2]So what happens when you add multiple subscribers to the system? It handles each processing in a round robin like way. The sample below is the same as the BatchBlock<T> about, but  has three subscribers (A, B & C) and a batch size of two.

static BatchBlock<string> pubSub = new BatchBlock<string>(2);
static int counter = 0;

static async void Process(string id)
{
    while (true)
    {
        var messages = await pubSub.ReceiveAsync();
        foreach (var item in messages)
        {
            counter++;
            Console.WriteLine("{2} - Got message number {0}: {1}", counter, item, id);
        }
    }
}

static void Main(string[] args)
{
    Process("A");
    Process("B");
    Process("C");

    for (int i = 0; i < 11; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            pubSub.Post(DateTime.Now.ToLongTimeString());
        });
    }

    Console.ReadLine();
}
File attachments

.NET 4.5 Baby Steps, Part 3: IDataFlowBlock

Other posts in this series can be found on the Series Index Page

Introduction

A new interface in .NET is the IDataFlowBlock, which is implemented in many interesting ways, so to look at those we will start off with the simplest implementation. ActionBlock<TInput> is a completely new class in .NET 4.5 and provides a way of working with data in a very task orientated way. I simplistically think of this as the implementation of the IObserver interface we got in .NET 4. but do not limit your thinking to just that.

To use it, you first must add a reference to System.Threading.Tasks.DataFlow

image

In this simple first example I am doing a fairly simple Pub/Sub demo:

var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            subscriber.Post(DateTime.Now.ToLongTimeString());
        });
}

Console.ReadLine();
CropperCapture[3]

As IObserver<T>

So the first fantastic feature is that it does have the ability (via extension method) to be an IObsserver<T> so it really solves the need to build up your own subscriber classes when implementing a pub/sub model.

First is the code for the publisher class – this is normal for the IObservable<T> as we had in .NET 4. This just means our new code can play well with our existing code.

public class Publisher : IObservable<string>
{
    List<IObserver<string>> subscribers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        subscribers.Add(observer);
        return null;
    }

    public void Send()
    {
        foreach (var item in subscribers)
        {
            item.OnNext(DateTime.Now.ToLongTimeString());
        }
    }
}

For our demo code, which produces the same as above:

var publisher = new Publisher();

var subscriber = new ActionBlock<string>(input =>
    {
        Console.WriteLine("Got: {0}", input);
    });

publisher.Subscribe(subscriber.AsObserver());

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
        {
            Thread.Sleep(new Random().Next(200, 1000));
            publisher.Send();
        });
}

Complete

The next awesome feature is the Complete method which can be used to stop accepting of input when called – this is great for services where you want to shut down.

In this demo code it will run until you press enter:

var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});


Task.Factory.StartNew(() =>
{
    while (true)
    {

        Thread.Sleep(new Random().Next(200, 1000));
        subscriber.Post(DateTime.Now.ToLongTimeString());
    }
});

Console.WriteLine("Press any key to stop input");
Console.ReadLine();
subscriber.Complete();
File attachments

.NET 4.5 Baby Steps, Part 2: Task timeout cancellation

Other posts in this series can be found on the Series Index Page

Introduction

When Tasks where introduced in .NET 4 one of the fantastic abilities was to be able to pass in a CancellationToken and use that to cancel/break out of tasks (think like a cancel button on a file copy).

So in the following code we create a cancellation source and pass the token to the task and it will output the date/time until you press enter. Then we call the Cancel method and it stops.

var cancelTokenSource = new System.Threading.CancellationTokenSource();

Task.Factory.StartNew(() =>
{
    while (!cancelTokenSource.IsCancellationRequested)
    {                    
        Console.WriteLine(DateTime.Now.ToLongTimeString());
        Thread.Sleep(1000);
    }
}, cancelTokenSource.Token);

Console.WriteLine("Press any key to cancel");
Console.ReadLine();
cancelTokenSource.Cancel();
Console.WriteLine("Done");
Console.ReadLine();

CropperCapture[1]

What is new in .NET 4.5?

.NET 4.5 adds a FANTASTIC new feature to this, the ability to cancel automatically after a set timeout! So all we need to is change the constructor and set the time out. In the demo below it is set to three seconds.

It is also important to note that it is time from when you create the token source and not time from when the task starts.

var cancelTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3))

Below in the screen capture, see how the word Done does not appear but processing stops? That is because it is cancelled (processing stopped) but I never pressed any keys – so it is still waiting for the readline above Done.

CropperCapture[2]

File attachments

.NET 4.5 Baby Steps, Part 1: ThreadLocal<T>

Other posts in this series can be found on the Series Index Page

Introduction

ThreadLocal<T> was introduced in .NET 4 and didn’t get much attention because it didn’t do much over the ThreadStaticAttribute which we have had since version 1 of the framework, so let’s just review what it does. In short it gives every unique thread that uses it, it’s own global field. Let’s look at this code:

static ThreadLocal<int> balances = new ThreadLocal<int>(() =>
    {                
        return 10;
    });

static void Main(string[] args)
{
    for (int i = 0; i < 10; i++)
    {
        new Thread(AddMoney).Start();
    }

    Console.ReadLine();
}

static void AddMoney()
{
    Console.WriteLine("Before {0}", balances.Value);
    balances.Value += new Random().Next(0, 1000);
    Console.WriteLine("After {0}", balances.Value);
}

Which produces:

CropperCapture[1]

Note that ever Before is set to 10 and that is because the lambda method that we pass to the ThreadLocal<T> constructor is run for each unique thread.

What’s new in .NET 4.5?

.NET 4.5 improves the usefulness of this by including the .Values parameter which allows you to list the results from each thread! To make use of this you need to opt-in in the constructor by adding true:

static ThreadLocal<int> balances = new ThreadLocal<int>(() =>
{
    return 10;
}, true);
And then in my demo I will output the results using:
foreach (var item in balances.Values)
{
    Console.WriteLine("Balance at end: {0}", item);
}

CropperCapture[2]

This is VERY useful when working with threads and doing individual calculations and then collating the results at the end!

Warning

ThreadLocal<T> only works with unique threads! So using with the TPL or ThreadPool which reuse threads will not work as expected!

File attachments

JSinSA 2012

jsinsa

AsHoIrXCEAA2s1YThis past weekend was the ever fantastic JavaScript in South Africa (JSinSA) conference. This year focus was on HTML 5, JavaScript & CSS 3 – easily some of the MOST important topics for developers regardless of platform to know about.

It was it’s second year and while I was very lucky to go to it as an attendee in the first year, this year I was even more lucky to be a presenter at the conference. I was also very lucky to present on a topic I am passionate about: Windows 8.

The talk provided an introduction to Windows 8 & how development works, and in the 45mins I was done, we built an application which could take a photo from a web cam and send it to Twitter (the actual photo is to the right).

You can get the slides and bits from the talk below.

File attachments
Demo Script (18.56 KB)
Completed Demo (660.07 KB)

South African Postal Codes for Windows Phone

1I can NEVER EVER remember the postal code for where I work or where I live – that little four digit number just eludes my brain. So to solve that I thought, why can’t I have EVERY postal code with me always? So that is what I made happen with this simple Windows Phone application: Postal codes!

It is dog simple: one input the name of what you want then hit search and boom results. It includes both street and box codes Smile

For the developers out there, this application source code is available too at: https://bitbucket.org/rmaclean/postal-codes

Windows 8 for the .NET developer

Last night I presented on Windows 8 for the .NET developer at the fantastic Developer User Group! We had a bumper crowd there which was great and really had some interesting discussions during and after the talk. Thank you to all that attended!

For those looking for the slides, demo script and demo bits they are below!

File attachments
Demo script (16.71 KB)