Oh the joy of Semaphore with ThreadPool

I recently created a small routine which would repeat a process using different accounts.  To speed things up I then used a ThreadPool so I could get multiple processes running at the same time and reduce the time the complete run would take.  I do however want to block the calling thread until all processes have completed.  This requires synchronization, and there are many methods to choose from to achieve it.  I chose Semaphore.

Now I am no expert in multithreading and synchronization, BUT I do know that Semaphore gave me complete pain relief while trying to achieve synchronization. My initial problem came when I tried to do more than 64 iterations using WaitHandles, more specifically ManualResetEvents.  The method:

WaitHandle.WaitAll(WaitHandle[] waithandles);

did not work because it does not accept more than 64 wait handles (a limit built into the wait API rather than a hardware constraint).  This got me thinking even more.  The use of a counter would be good, as I can pass state into the ThreadPool WaitCallback and thus increment a counter each time a process completes.

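        // Requires: using System; using System.Threading;
        private static int waitCounter;
        public void Execute()
        {
            for (int i = 0; i < 10000; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state)
                {
                    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                    // Interlocked makes the increment atomic; a plain waitCounter++ can lose updates.
                    int current = Interlocked.Increment(ref waitCounter);
                    Console.WriteLine(current + " After incrementation");
                }));
            }
            // Busy-wait until every work item has reported in; CompareExchange(.., 0, 0)
            // is used here only as an atomic read that is never cached.
            while (Interlocked.CompareExchange(ref waitCounter, 0, 0) != 10000)
            {
            }
            Console.WriteLine("Exiting");
        }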

The main problem here is that the while(true) loop spins continuously, which is processor intensive.  Semaphore is much more powerful. Here is the above example but using Semaphore:

        private static Semaphore semaphore1 = new Semaphore(0, 10000);
        public void Execute()
        {         
            for (int i = 0; i < 10000; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state)
                {
                    int count = (int)state;
                    Console.WriteLine(count);
                    semaphore1.Release(1);
                }),i);
            }

            Console.WriteLine("Exiting");
        }

For an explanation of what a Semaphore is, I really like this one: http://geekswithblogs.net/shahed/archive/2006/06/09/81268.aspx. The above example would work just as well if I instantiated the semaphore with a maximum count of 20000, as in that case we only release 10000 times, so we have enough keys.
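To make the "keys" idea concrete, here is a minimal sketch of how the two constructor arguments (initial count and maximum count) behave. The class name SemaphoreCountSketch and the small counts are only for illustration and are not part of the routine above.

```csharp
using System;
using System.Threading;

public static class SemaphoreCountSketch
{
    public static void Main()
    {
        // Initial count 0: no keys are available yet. Maximum count 3: at most 3 can be handed back.
        using (Semaphore keys = new Semaphore(0, 3))
        {
            Console.WriteLine(keys.WaitOne(0));   // False: nothing has been released yet
            int previous = keys.Release(2);       // hand back two keys
            Console.WriteLine(previous);          // 0: the count before this Release
            Console.WriteLine(keys.WaitOne(0));   // True: takes one key
            Console.WriteLine(keys.WaitOne(0));   // True: takes the other key
            Console.WriteLine(keys.WaitOne(0));   // False: the count is back to 0

            keys.Release(3);                      // the count is now at the maximum
            try
            {
                keys.Release();                   // one more would exceed the maximum
            }
            catch (SemaphoreFullException)
            {
                Console.WriteLine("full");
            }
        }
    }
}
```

A larger maximum than you need does no harm; it only raises the ceiling at which Release starts throwing SemaphoreFullException.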

Cheers

 

Andrew

Published Wednesday, February 27, 2008 7:06 PM by REA_ANDREW

Comments

# re: Oh the joy of Semaphore with ThreadPool

Tuesday, April 22, 2008 6:39 AM by booler

It doesn't seem to me as if the above code would block the calling thread. After the 10000 threadpool work items are queued, there is no test for which have completed, so the calling thread would barrel on regardless.

You would need something like this:

       public void Execute()
       {
           for (int i = 0; i < 10000; i++)
           {
               ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state)
               {
                   int count = (int)state;
                   Console.WriteLine(count);
                   semaphore1.Release(1);
               }),i);
           }
           for (int i = 0; i < 10000; i++)
           {
               semaphore1.WaitOne();
           }
           Console.WriteLine("Exiting");
       }

To check all worker threads have completed their work.

# re: Oh the joy of Semaphore with ThreadPool

Thursday, April 24, 2008 5:29 AM by REA_ANDREW

booler :: Cheers for the pointer!  I really appreciate it.  That makes perfect sense!

# re: Oh the joy of Semaphore with ThreadPool

Friday, January 23, 2009 5:05 AM by Juan

Using WaitHandle.WaitAll() will make it even more elegant

       const int TCOUNT = 10000;
       static WaitHandle[] waitHandles;
       static void Main(string[] args)
       {
           waitHandles = new WaitHandle[TCOUNT];
           for (int i = 0; i < TCOUNT; i++)
           {
               waitHandles[i] = new ManualResetEvent(false);
               ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state)
               {
                   int count = (int)state;
                   Console.WriteLine(count);
                   // Index with the state value, not the captured loop variable i,
                   // which may already have advanced when this callback runs.
                   ManualResetEvent m = (ManualResetEvent)(waitHandles[count]);
                   m.Set();
               }), i);
           }
           WaitHandle.WaitAll(waitHandles);
       }

# re: Oh the joy of Semaphore with ThreadPool

Thursday, February 12, 2009 9:13 PM by Mark

Juan... WaitAll throws an exception when there are more than 64 wait handles in the waitHandles array.

A better approach is to initialize a counter, use Interlocked.Decrement(ref counter) every time a thread finishes, and when the counter reaches 0, call Set on a single wait handle.  But always make sure to decrement the counter and check for zero inside of a finally block.  Otherwise, if one of your work items throws an exception, your application will never finish (yeah, hang).
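A minimal sketch of that approach might look like the following. The names CountdownSketch, RunAll, remaining and succeeded are made up for illustration, and swallowing the exception in the catch block is an assumption made only to keep the example short.

```csharp
using System;
using System.Threading;

public static class CountdownSketch
{
    // Queues itemCount work items and blocks until every one has finished.
    // Returns how many of them completed without throwing.
    public static int RunAll(int itemCount, Action<int> work)
    {
        if (itemCount == 0) return 0;
        int remaining = itemCount;   // work items that have not finished yet
        int succeeded = 0;
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            for (int i = 0; i < itemCount; i++)
            {
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    try
                    {
                        work((int)state);
                        Interlocked.Increment(ref succeeded);
                    }
                    catch (Exception)
                    {
                        // Swallowed to keep the sketch short; real code should log or collect it.
                    }
                    finally
                    {
                        // Runs even when work throws, so the count always reaches zero.
                        if (Interlocked.Decrement(ref remaining) == 0)
                            done.Set();
                    }
                }, i);
            }
            done.WaitOne();   // a single wait handle, so the 64 handle limit never applies
        }
        return succeeded;
    }

    public static void Main()
    {
        // Every 1000th item throws, to show that the caller still wakes up.
        int ok = RunAll(10000, delegate(int n)
        {
            if (n % 1000 == 0) throw new InvalidOperationException();
        });
        Console.WriteLine(ok + " of 10000 items succeeded");   // 9990 of 10000 items succeeded
    }
}
```

Because the decrement sits in the finally block, a failing work item still counts down, and the calling thread is released once the last item finishes.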
