Synchronization with async/await

Do you always want to block whole thread waiting for some resources? Probably not. Why not benefit from async/await constructs available in .NET 4.5? But you cannot simply await for ManualResetEvent or Semaphore. Here comes great series of posts by Stephen Toub on how to implement async-enabled synchronization mechanisms.

It starts with ManualResetEvent, and then continuous for total of 7 blog posts. Must-read for all async/await fans out there.

Just start from http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx and move forward.

Have fun!

Synchronizing execution with ManualResetEvent

Yesterday I was asked a question:

We have one thread that’s starting 200 additional calculation threads. How can we synchronize execution of the main thread so that it will wait until all 200 worker threads complete?

My first answer was: use Task Parallel Library, queue 200 Tasks and Task.WaitAll() on them. Easy, straightforward and very readable. Yet that was not exactly what was expected, since using TPL was for some reason banned ;)
Well then – we have all those fancy synchronization mechanisms, why not use them? Semaphor, Monitor, ManualResetEvent, AutoResetEvent – those were my first ideas. So I was asked to explain ManualResetEvent in more details.
Then things got a little bit more awkward for me. I knew some time ago how to use them and this would give me no problem. But since time has passed by, my mind decided that implementation details are not needed any more and garbage collected. So I started explaining how I think it works. Enough said, I messed up pretty much all that was to mess up. So to make up for it I decided to blog about it.

Here we have some basic implementation:

        private static void ThreadingProc()
        {
            const int calculationsCount = 200;

            for (int i = 0; i < calculationsCount; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(calculation), i);
            }

            Console.WriteLine("Calculation completed");
            Console.ReadKey();
        }

        private static void calculation(object data)
        {
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + (int)data);
        }

We just fire 200 threads form ThreadPool, pass thread number to each one of them and display completion message. Each thread simulates some work and then displays its completion message as well.
Result is far from expected and looks something like this:

Calculation completed
Completed calculation 1
Completed calculation 2
Completed calculation 3
...

But what else would we expect from not synchronized parallelized execution? So let’s use ManualResetEvent to help us out.
Since our computation method can get only one parameter of object type, we will first create container class to prepare all parameters and then pass them to method:

        class TaskInfo
        {
            public int TaskId { get; set; }
            public ManualResetEvent Done { get; set; }

            public TaskInfo(int taskId, ManualResetEvent done)
            {
                this.TaskId = taskId;
                this.Done = done;
            }
        }

And then slightly modified code:

        private static void ThreadingProc()
        {
            const int calculationsCount = 200;
            ManualResetEvent[] sync = new ManualResetEvent[calculationsCount];

            for (int i = 0; i < calculationsCount; i++)
            {
                ManualResetEvent done = new ManualResetEvent(false);
                sync[i] = done;
                TaskInfo ti = new TaskInfo(i, done);
                ThreadPool.QueueUserWorkItem(new WaitCallback(calculation), ti);
            }

            WaitHandle.WaitAll(sync);

            Console.WriteLine("Calculation completed");
            Console.ReadKey();
        }

        private static void calculation(object data)
        {
            TaskInfo ti = (TaskInfo)data;
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + ti.TaskId);
            ti.Done.Set();
        }

Works like a charm… Well, kind of. As you will probably notice, application will end with exception saying that WaitHandle.WaitAll can support up to 64 wait objects. That’s not going to stop us for long, does it?

            for (int i = 0; i < calculationsCount; i+=64)
            {
                WaitHandle.WaitAll(sync.Skip(i).Take(64).ToArray());
            }

This simple modification takes batches of 64 ManualResetEvents and waits for them. When they are completed, it waits for another batch and so on. Since we want to wait for all threads to complete – this give us exactly what we want with simple LINQ query.
And how would it look using TPL?

        private static void TaskProc()
        {
            const int calculationsCount = 200;
            Task[] tasks = new Task[calculationsCount];

            for (int i = 0; i < calculationsCount; i++)
			{
                tasks[i] = new Task(calculationTask, i);
                tasks[i].Start();
			}

            Task.WaitAll(tasks);
        }

        private static void calculationTask(object data)
        {
            System.Threading.Thread.Sleep(100);
            Console.WriteLine("Completed calculation " + (int)data);
        }

So quite similar, yet more readable for me. And a little faster (for 200 calculations, without console access, Tasks took about 2,5s on my machine, while ThreadPool needed 3,4s to complete) but don’t take my word on it, it was just quick mini benchmark. But mini benchmarks are fun! We all should do them sometimes just for a sake of doing them :)

Maybe this post will help someone someday. Or maybe just me in future.
See you next time!