August 19, 2013

.Net Threading Part 2

So, my last article about threading was so popular, and since it is a big subject, I thought I would research Task Parallel Library. Tasks let you express threading in a very different way than what Object Oriented or Procedural programmer would be accustom to for threading. Let’s see why it resembles javascript promises.

If you have been seeing the progression of C#, it is clear that Microsoft is making the language more functional, like F#. Anonymous Types, Anonymous Functions, Linq, Extension Methods are all examples of the functional ideas making their way to C#, and for the most part they work great. Threads are another thing that can make a huge developer productivity boost by programming in a more functional way. This is because if you keep your data very immutable, then you can thread really easily. This is where Task Parallel library comes it, as it is Microsoft’s way to help you program Threads simpler.

Functions and Properties on Class Task

  1. 1) Start – This will start a Task
  2. 2) ContinueWith – After a Task is completed, this will execute.
  3. 3) WaitAll – Waits till all Tasks, in an array, are done.
  4. 4) Wait – Waits for just that one Task to be done.

Other Stuff

  1. 1) CancellationToken – Used to tell all Tasks, that are given the token, that cancel was called on that thread.
  2. 2) locks, Semaphore, mutex – These classes help keep Tasks, and Threads synchronized.
  3. 3) Async and Await – Two new keywords to decorate functions to tell the compiler what can be threaded. </ol> </p>

    Now that we have seen Task library and some of the other stuff to help thread with it, let’s check out a scenario:

     class Program { static readonly Semaphore semaphore = new Semaphore(Environment.ProcessorCount, Environment.ProcessorCount); static readonly ConcurrentDictionary<task,int> myCurrentTasks = new ConcurrentDictionary<task,int> (); static bool isExit; static void Main(string[] args) { int threadCount = 0; CancellationTokenSource cts = new CancellationTokenSource (); cts.Token.Register (() => {Console.WriteLine("Cancelling: "+myCurrentTasks.Keys.Count+" Threads!");}); DateTime start = DateTime.Now; Task.Factory.StartNew(() => { while (Console.ReadKey().Key != ConsoleKey.Q) ; isExit = true; }); while(true) { if (threadCount == int.MaxValue) { threadCount = 0; } semaphore.WaitOne(); Task task = new Task ( (state) => { Thread.CurrentThread.IsBackground = false; var doWork = new dodo (); doWork.work (cts.Token, state); semaphore.Release (); }, threadCount++, cts.Token); task.Start (); task.ContinueWith((theTask)=>{ int j = 0; myCurrentTasks.TryRemove(theTask, out j); theTask.Dispose(); }); myCurrentTasks.TryAdd( task, threadCount); int seconds = (DateTime.Now - start).Seconds; Console.WriteLine ("seconds: " + seconds); if (isExit) { cts.Cancel (); var tasks = myCurrentTasks.Keys.ToArray (); Task.WaitAll ( tasks.Where(p => p.Status != TaskStatus.Canceled).ToArray ()); Console.WriteLine("Cancelling Complete! This many threads: "+myCurrentTasks.Count); break; } } } } public class dodo { public void work(CancellationToken token, object state) { Console.WriteLine("Doing work at '{0}' state: '{1}'", DateTime.Now, state); Random r = new Random(); int counter = 0; int count = r.Next (6, 9); while (counter < count) { if (token.IsCancellationRequested) { Console.WriteLine("Cancelled work at '{0}' state: '{1}'", DateTime.Now, state); return; } Thread.Sleep ( 1000); counter++; } Console.WriteLine("DONE work at '{0}' state: '{1}'", DateTime.Now, state); } } 

    There is a bunch of stuff going on, so I will try to go over the good parts. Overall, the app is going to process a number of threads, equal to the Environment.ProcessorCount. This is to make sure we make the sweet amount of threads, which is generally one thread per core. Each Task processes work for 6 to 9 seconds. If the user presses 'q', then the Task uses CancellationTokenSource to cancel any working Tasks. While the Tasks are stopping, the Task.WaitAll(tasks.Where(p => p.Status != TaskStatus.Canceled).ToArray()); will wait for any thread that is still "working" to stop. The tasks array in the wait all is populated with each Task, and is removed when we call ContinueWith.

    If you read my previous post .Net Threading Part 1, There I have code that is very similar to the code above. The difference in this code is that, I can control when my Tasks/Threads are done, or when they should be done. ThreadPool can only tell threads to stop processing, nothing more. We have gain much more control with Tasks. You will also note that the more control that you get has given you more callbacks to program. The coding looks very much like a callback scenario from javascript.

    While ThreadPool is something that other languages have, like Java, they do not have the new implementation of Tasks. Tasks allow you more control over threads and give developers cleaner code. Special thanks to Alex and Scott for giving me some more reading about .Net Threads. Tasks are something you want to use over the other threading libraries.