Asynchronous "For" Loops

With processors now scaling by adding more CPUs (cores) rather than by raising clock speed (MHz), wouldn't it be great if you could write some code like:

// Hypothetical syntax: the "async" and "sync" blocks below are a wish, not valid C#.
StringBuilder output = new StringBuilder();
foreach(MyObj o in collection)
{
   string val;
   async
   {
      val = doSomethingHard(o);
   }
   sync
   {
      output.Append(val);
   }
}

 and have the compiler spawn a couple threads to do the work and take advantage of all the processors on the box?

 Well, you can come close to this experience with C# 2.0 thanks to anonymous methods and delegates:

AsyncHandler<MyObj> async = new AsyncHandler<MyObj>();

StringBuilder output = new StringBuilder();
foreach (MyObj o in collection)
{
   // The outer delegate is the expensive part; DoAsync runs it on a worker thread.
   async.DoAsync(o, delegate(MyObj obj)
   {
      string val = doSomethingHard(obj);

      // The inner delegate is only recorded here; Done() invokes it later,
      // so the shared StringBuilder is never touched by a worker thread.
      async.DoSync(delegate()
      {
         output.Append(val);
      });
   });
}
// Waits for every work item to finish, then runs the recorded DoSync delegates.
async.Done();

Just wrap the code that should run asynchronously in one delegate and the code that must not run asynchronously in another delegate. "How," you ask, "can I implement this?" Well, it's not hard. Here is an example to get you started (written in about 30 minutes, so debug/modify on your own):

 

using System;
using System.Threading;
using System.Collections.Generic;
using System.Text;

namespace ConsoleApplication3
{
    class Program
    {
        

        /// <summary>
        /// Demo entry point: submits 100 integers to an <c>AsyncHandler</c>, lets each
        /// work item sleep for a random 0-4 seconds, and prints everything that was
        /// collected in the shared <see cref="StringBuilder"/> once all work is done.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            int[] values = new int[100];
            for (int i = 0; i < values.Length; i++)
            {
                values[i] = i;
            }

            AsyncHandler<int> async = new AsyncHandler<int>();

            StringBuilder sb = new StringBuilder();

            // Random and StringBuilder are not thread-safe, and the work delegate below
            // runs on several worker threads at once, so every access made from inside
            // that delegate goes through this lock.
            object gate = new object();

            foreach (int value in values)
            {
                async.DoAsync(value,
                    delegate(int param)
                    {
                        int secs;
                        lock (gate)
                        {
                            secs = _rand.Next(5);
                        }

                        // Simulated expensive work; deliberately outside the lock so
                        // the work items really do overlap.
                        Thread.Sleep(1000 * secs);
                        Console.WriteLine("t:" + new String('x', secs) + ":" + param);

                        lock (gate)
                        {
                            sb.AppendLine(param.ToString());
                        }

                        // Not executed here: the handler stores this delegate and
                        // invokes it from Done(), after the worker threads have
                        // finished, so it needs no locking of its own.
                        async.DoSync(
                            delegate()
                            {
                                Console.WriteLine("f");
                                sb.AppendLine("Finally: " + param);
                            }
                        );
                    });
            }
            async.Done();

            Console.WriteLine(sb.ToString());
            Console.WriteLine("Done...");
            Console.ReadLine();
        }

        

        // Shared random source; Main's work delegate uses it to pick each item's sleep time.
        static Random _rand = new Random();


        /// <summary>
        /// A unit of work that processes a single item of type <typeparamref name="T"/>.
        /// This is the delegate type accepted by <c>AsyncHandler.DoAsync</c>.
        /// </summary>
        /// <param name="param">The item to process.</param>
        public delegate void Work<T>(T param);

        /// <summary>
        /// Bookkeeping record the handler keeps for one submitted work item.
        /// </summary>
        public class ThreadInfo<T>
        {
            /// <summary>Thread assigned to run the item; null until one has been assigned.</summary>
            public Thread thread;

            /// <summary>
            /// Callback registered through <c>DoSync</c> while the item was running;
            /// invoked by <c>Done</c>. Null when none was registered.
            /// </summary>
            public ThreadStart finalize;

            /// <summary>The item together with the delegate that processes it.</summary>
            public ThreadParam<T> param;
        }
        /// <summary>
        /// Pairs a work delegate with the value it is to be called with.
        /// </summary>
        public class ThreadParam<T>
        {
            /// <summary>The delegate to run.</summary>
            public Work<T> w;

            /// <summary>The argument handed to <see cref="w"/>.</summary>
            public T o;
        }

        /// <summary>
        /// Runs work items on dedicated threads, at most <c>threadCount</c> at a time,
        /// and lets each item register one callback that is deferred until
        /// <see cref="Done"/> is called.
        /// </summary>
        /// <remarks>
        /// Typical use: call <see cref="DoAsync"/> once per item, call
        /// <see cref="DoSync"/> from inside the work delegate for the part that must
        /// not run concurrently, then call <see cref="Done"/> once on the submitting
        /// thread. All internal state is guarded by a single per-instance lock.
        /// </remarks>
        public class AsyncHandler<T>
        {
            // Guards every field below. Per instance, so independent handlers never
            // contend with each other.
            readonly object _gate = new object();

            // Every item submitted since the last Done(), in submission order.
            readonly List<ThreadInfo<T>> _threadInfo = new List<ThreadInfo<T>>();

            // Items waiting for a free thread slot, first in / first out.
            readonly Queue<ThreadInfo<T>> _queue = new Queue<ThreadInfo<T>>();

            // Currently running worker threads mapped to the item each one is
            // processing; lets DoSync find the caller's item without a linear scan.
            readonly Dictionary<Thread, ThreadInfo<T>> _active =
                new Dictionary<Thread, ThreadInfo<T>>();

            // Number of submitted items that have not finished yet (running + queued).
            int _pending;

            // Maximum number of worker threads running at the same time.
            int threadCount = 10;

            // Thread entry point: runs the item's work delegate and always reports
            // completion, even when the delegate throws.
            void doWork(object infoObj)
            {
                ThreadInfo<T> info = (ThreadInfo<T>)infoObj;
                try
                {
                    info.param.w(info.param.o);
                }
                finally
                {
                    finish(info);
                }
            }

            // Starts a new thread for the given item. The caller must hold _gate.
            void launch(ThreadInfo<T> info)
            {
                Thread t = new Thread(new ParameterizedThreadStart(doWork));
                info.thread = t;
                t.Start(info);

                // Registering after Start is safe: the new thread cannot get past
                // DoSync or finish until the caller releases _gate.
                _active.Add(t, info);
            }

            /// <summary>
            /// Submits one item. The work delegate runs on its own thread right away if
            /// fewer than <c>threadCount</c> workers are running; otherwise the item is
            /// queued and started when a running worker finishes.
            /// </summary>
            /// <param name="o">The value passed to <paramref name="w"/>.</param>
            /// <param name="w">The work to run on a worker thread.</param>
            /// <exception cref="ArgumentNullException"><paramref name="w"/> is null.</exception>
            public void DoAsync(T o, Work<T> w)
            {
                if (w == null)
                {
                    // Fail on the caller's thread instead of crashing a worker later.
                    throw new ArgumentNullException("w");
                }

                ThreadParam<T> param = new ThreadParam<T>();
                param.o = o;
                param.w = w;

                ThreadInfo<T> info = new ThreadInfo<T>();
                info.param = param;

                lock (_gate)
                {
                    // The capacity check and the enqueue happen under the same lock
                    // that finish() takes, so an item can never be queued after the
                    // last running worker has already looked at an empty queue.
                    if (_active.Count < threadCount)
                    {
                        launch(info);
                    }
                    else
                    {
                        _queue.Enqueue(info);
                    }

                    _threadInfo.Add(info);
                    _pending++;
                }
            }

            /// <summary>
            /// Blocks until every submitted item has finished, then invokes the callbacks
            /// registered through <see cref="DoSync"/> on the calling thread, in
            /// submission order, and resets the handler for reuse.
            /// </summary>
            public void Done()
            {
                ThreadInfo<T>[] finished;
                lock (_gate)
                {
                    // finish() pulses when the count reaches zero; Wait releases the
                    // lock while blocked so workers can make progress.
                    while (_pending > 0)
                    {
                        Monitor.Wait(_gate);
                    }

                    finished = _threadInfo.ToArray();
                    _threadInfo.Clear();
                }

                // A worker signals from inside its finally block, so it may still be
                // unwinding; join to guarantee every thread has really exited.
                foreach (ThreadInfo<T> info in finished)
                {
                    if (info.thread != null)
                    {
                        info.thread.Join();
                    }
                }

                // Run the callbacks outside the lock: they are caller code and may
                // take arbitrarily long or call back into this handler.
                foreach (ThreadInfo<T> info in finished)
                {
                    if (info.finalize != null)
                    {
                        info.finalize();
                    }
                }
            }

            // Called by a worker when its item is complete: frees the slot, starts
            // the next queued item if there is one, and wakes Done() when nothing
            // is left.
            void finish(ThreadInfo<T> info)
            {
                lock (_gate)
                {
                    _active.Remove(info.thread);
                    _pending--;

                    if (_queue.Count > 0)
                    {
                        launch(_queue.Dequeue());
                    }

                    if (_pending == 0)
                    {
                        Monitor.PulseAll(_gate);
                    }
                }
            }

            /// <summary>
            /// Registers the callback that <see cref="Done"/> will invoke for the work
            /// item running on the current thread. Calls made from a thread that is not
            /// one of this handler's running workers are ignored.
            /// </summary>
            /// <param name="ts">The callback to defer until <see cref="Done"/>.</param>
            /// <exception cref="InvalidOperationException">
            /// A callback has already been registered for the current work item.
            /// </exception>
            public void DoSync(ThreadStart ts)
            {
                lock (_gate)
                {
                    ThreadInfo<T> info;
                    if (!_active.TryGetValue(Thread.CurrentThread, out info))
                    {
                        return;
                    }

                    if (info.finalize != null)
                    {
                        throw new InvalidOperationException("finalize already set");
                    }

                    info.finalize = ts;
                }
            }
        }
    }
}

1 Comment

Comments have been disabled for this content.