Asynchronous "For" Loops
With processors set to scale at the CPU level instead of the Mhz level, wouldn't it be great if you could write some code like:
StringBuilder output = new StringBuilder(); foreach(MyObj o in collection) { string val; async { val = doSomethingHard(o); } sync { output.Append(val); } }
and have the compiler spawn a couple threads to do the work and take advantage of all the processors on the box?
Well, you can come close to this experience with C# 2.0 thanks to anonymous methods and delegates:
AsyncHandler<MyObj> asyc = new AsyncHandler(); StringBuilder output = new StringBuilder(); foreach(MyObj o in collection) { async.DoAsync(o, delegate (MyObj obj)) { string val = doSomethingHard(obj); async.DoSync(delegate() { output.Append(val); }); } } async.Done();
Just simple wrapping for async code in one delegate and the code that needs to be run non-async in another delegate. "How," you ask, "can I implement this?" Well, it's not hard. Here is an example to get you started (written in about 30 minutes, so debug/modify on your own):
using System; using System.Threading; using System.Collections.Generic; using System.Text; namespace ConsoleApplication3 { class Program { static void Main(string[] args) { int[] values = new int[100]; for (int i = 0; i < values.Length; i++) { values[i] = i; } AsyncHandler<int> async = new AsyncHandler<int>(); StringBuilder sb = new StringBuilder(); foreach (int value in values) { async.DoAsync(value, delegate(int param) { int secs = _rand.Next() % 5; System.Threading.Thread.Sleep(1000 * secs); Console.WriteLine("t:" + new String('x', secs) + ":" + param); sb.AppendLine(param.ToString()); async.DoSync( delegate() { Console.WriteLine("f"); sb.AppendLine("Finally: " + param); } ); }); } async.Done(); Console.WriteLine(sb.ToString()); Console.WriteLine("Done..."); Console.ReadLine(); } static Random _rand = new Random(); public delegate void Work<T>(T param); public class ThreadInfo<T> { public ThreadParam<T> param; public ThreadStart finalize; public Thread thread; } public class ThreadParam<T> { public T o; public Work<T> w; } public class AsyncHandler<T> { List<Thread> _threads = new List<Thread>(); List<object> _threadInfo = new List<object>(); List<object> _queue = new List<object>(); int threadCount = 10; void doWork(object paramObj) { ThreadParam<T> param = (ThreadParam<T>)paramObj; try { param.w(param.o); } finally { finish(Thread.CurrentThread, param); } } void launch(ThreadParam<T> param) { ParameterizedThreadStart start = new ParameterizedThreadStart(doWork); Thread t = new Thread(start); _threads.Add(t); foreach(ThreadInfo<T> info in _threadInfo) { if (info.param == param) { info.thread = t; break; } } t.Start(param); } public void DoAsync(T o, Work<T> w) { ThreadParam<T> param = new ThreadParam<T>(); param.o = o; param.w = w; ThreadInfo<T> info = new ThreadInfo<T>(); info.param = param; _threadInfo.Add(info); if (_threads.Count < threadCount) { launch(param); } else { lock (_syncThread) { _queue.Add(param); } } } public void Done() { bool clear = true; do { clear = true; foreach (ThreadInfo<T> info in _threadInfo) { if (info.thread == null) break; if (info.thread.IsAlive) { clear = false; break; } } System.Threading.Thread.Sleep(100); } while (clear == false); foreach (ThreadInfo<T> info in _threadInfo) { if (info.finalize != null) { info.finalize(); } } _threads.Clear(); _threadInfo.Clear(); } static object _syncThread = new object(); void finish(Thread t, ThreadParam<T> param) { _threads.Remove(t); lock (_syncThread) { if (_queue.Count > 0) { ThreadParam<T> tp = (ThreadParam<T>)_queue[0]; _queue.Remove(_queue[0]); launch(tp); } } } public void DoSync(ThreadStart ts) { foreach (ThreadInfo<T> ti in _threadInfo) { if (ti.thread == Thread.CurrentThread) { if (ti.finalize != null) throw new Exception("finalize already set"); ti.finalize = ts; } } } } } }