Introduction
We have spent a number of posts looking at IDataflowBlock, BatchBlock and some other interesting blocks. One last interesting piece is the ISourceBlock<TOutput> interface, which is implemented by the “publisher”-like blocks and has a very interesting method: LinkTo. The goal of LinkTo is to chain blocks together, so that the output of one block flows straight into the next, and that lets you do interesting and amazing things!
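To make that concrete before the full demo, here is a minimal sketch of a two-block chain. The class name LinkToSketch and the method Run are made up for illustration, and the PropagateCompletion option is my own addition so the sketch can wait for the chain to finish; treat it as one way to wire things up rather than the only way.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, not part of the original demo.
public static class LinkToSketch
{
    // Pushes the inputs through a two-block chain and returns
    // what came out the other end, in order.
    public static List<string> Run(IEnumerable<int> inputs)
    {
        var results = new List<string>();

        // Source block: converts each int to a string.
        var transform = new TransformBlock<int, string>(n => "item " + n);

        // Target block: collects the strings. By default an ActionBlock
        // processes one message at a time, so the plain List is safe here.
        var collect = new ActionBlock<string>(s => results.Add(s));

        // LinkTo chains the source to the target. PropagateCompletion
        // (an assumption of this sketch) forwards Complete() down the chain.
        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in inputs)
        {
            transform.Post(n);
        }

        // Signal that no more input is coming, then wait for the end of the chain.
        transform.Complete();
        collect.Completion.Wait();

        return results;
    }
}
```

Calling LinkToSketch.Run(new[] { 1, 2, 3 }) posts three numbers into the transform block and hands back the three strings the action block received. Nothing in the calling code moves data from one block to the other; the link does that.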
It's in the name
The final interesting point, and likely the most important, is in the name of the assembly all this goodness comes from: System.Threading.Tasks.Dataflow. All of this is handled by multiple tasks (and likely multiple threads) under the covers without you needing to worry about it. Just as the TPL in .NET 4 made working with multi-threaded processing easier, the goal here is to make multi-threaded data processing easier - and it does!
Demo LinkTo
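In this demo we want to take in DateTime values as fast as possible, convert them to strings, and send them in batches of five to three subscribers. It sounds complex, but with LinkTo it is very easy: LinkTo wires the buffer, transform and batch blocks together, and the three subscribers attach to the batch block through AsObservable and Subscribe.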
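using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Entry point of the pipeline: buffers the incoming DateTime values.
    static BufferBlock<DateTime> bufferPublisher = new BufferBlock<DateTime>();

    // Converts each DateTime to its long time string.
    static TransformBlock<DateTime, string> transformPublisher =
        new TransformBlock<DateTime, string>(d => d.ToLongTimeString());

    // Groups the strings into arrays of five.
    static BatchBlock<string> batchPublisher = new BatchBlock<string>(5);

    // Each subscriber prints the whole batch. string.Join is used because passing
    // the string[] directly as the "{0}" argument prints only its first element.
    static ActionBlock<string[]> subscriber1 = new ActionBlock<string[]>(s =>
    {
        Console.WriteLine("Subscriber 1: {0}", string.Join(", ", s));
    });

    static ActionBlock<string[]> subscriber2 = new ActionBlock<string[]>(s =>
    {
        Console.WriteLine("Subscriber 2: {0}", string.Join(", ", s));
    });

    static ActionBlock<string[]> subscriber3 = new ActionBlock<string[]>(s =>
    {
        Console.WriteLine("Subscriber 3: {0}", string.Join(", ", s));
    });

    static void Main(string[] args)
    {
        // Attach the three subscribers to the batch block.
        batchPublisher.AsObservable().Subscribe(subscriber1.AsObserver());
        batchPublisher.AsObservable().Subscribe(subscriber2.AsObserver());
        batchPublisher.AsObservable().Subscribe(subscriber3.AsObserver());

        // Chain the blocks: buffer -> transform -> batch.
        transformPublisher.LinkTo(batchPublisher);
        bufferPublisher.LinkTo(transformPublisher);

        // Publish 100 timestamps, one every 200 ms.
        for (int i = 0; i < 100; i++)
        {
            bufferPublisher.Post(DateTime.Now);
            Thread.Sleep(200);
        }

        Console.ReadLine();
    }
}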
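The demo attaches its subscribers with AsObservable and Subscribe rather than LinkTo. If you would rather stay with LinkTo all the way down, one possible variation is to put a BroadcastBlock after the batch block and link each subscriber to that. This is a sketch under my own assumptions, not the demo's approach: the names LinkToFanOutSketch and Run are hypothetical, the subscribers write into a queue instead of the console so the result can be inspected, and PropagateCompletion is added so the method can wait for the pipeline to drain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, not part of the original demo.
public static class LinkToFanOutSketch
{
    // Runs itemCount timestamps through the pipeline and returns
    // every line the three subscribers produced.
    public static ConcurrentQueue<string> Run(int itemCount, int batchSize)
    {
        var output = new ConcurrentQueue<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var buffer = new BufferBlock<DateTime>();
        var transform = new TransformBlock<DateTime, string>(d => d.ToLongTimeString());
        var batch = new BatchBlock<string>(batchSize);

        // Offers each batch to every linked target. The cloning function
        // simply hands out the same array, since the subscribers only read it.
        var broadcast = new BroadcastBlock<string[]>(b => b);

        // Chain the blocks: buffer -> transform -> batch -> broadcast.
        buffer.LinkTo(transform, link);
        transform.LinkTo(batch, link);
        batch.LinkTo(broadcast, link);

        // Fan out: three unbounded subscribers, each linked to the broadcast block.
        var subscribers = new ActionBlock<string[]>[3];
        for (int i = 0; i < subscribers.Length; i++)
        {
            int id = i + 1; // copy for the lambda below
            subscribers[i] = new ActionBlock<string[]>(
                b => output.Enqueue("Subscriber " + id + ": " + string.Join(", ", b)));
            broadcast.LinkTo(subscribers[i], link);
        }

        for (int i = 0; i < itemCount; i++)
        {
            buffer.Post(DateTime.Now);
        }

        // No more input: completion flows down every link.
        buffer.Complete();
        Task.WaitAll(subscribers[0].Completion, subscribers[1].Completion, subscribers[2].Completion);

        return output;
    }
}
```

With Run(10, 5) the batch block should emit two batches and each of the three subscribers should see both. One design note: a BroadcastBlock only keeps the most recent message, so this shape is best suited to subscribers that accept everything offered to them, as the unbounded ActionBlocks here do.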