Asynchronous "For" Loops
With processors set to scale at the CPU level instead of the Mhz level, wouldn't it be great if you could write some code like:
StringBuilder output = new StringBuilder();
foreach(MyObj o in collection)
{
string val;
async
{
val = doSomethingHard(o);
}
sync
{
output.Append(val);
}
}
and have the compiler spawn a couple threads to do the work and take advantage of all the processors on the box?
Well, you can come close to this experience with C# 2.0 thanks to anonymous methods and delegates:
AsyncHandler<MyObj> asyc = new AsyncHandler();
StringBuilder output = new StringBuilder();
foreach(MyObj o in collection)
{
async.DoAsync(o, delegate (MyObj obj))
{
string val = doSomethingHard(obj);
async.DoSync(delegate() {
output.Append(val);
});
}
}
async.Done();
Just simple wrapping for async code in one delegate and the code that needs to be run non-async in another delegate. "How," you ask, "can I implement this?" Well, it's not hard. Here is an example to get you started (written in about 30 minutes, so debug/modify on your own):
using System;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace ConsoleApplication3
{
class Program
{
static void Main(string[] args)
{
int[] values = new int[100];
for (int i = 0; i < values.Length; i++)
{
values[i] = i;
}
AsyncHandler<int> async = new AsyncHandler<int>();
StringBuilder sb = new StringBuilder();
foreach (int value in values)
{
async.DoAsync(value,
delegate(int param)
{
int secs = _rand.Next() % 5;
System.Threading.Thread.Sleep(1000 * secs);
Console.WriteLine("t:" + new String('x', secs) + ":" + param);
sb.AppendLine(param.ToString());
async.DoSync(
delegate()
{
Console.WriteLine("f");
sb.AppendLine("Finally: " + param);
}
);
});
}
async.Done();
Console.WriteLine(sb.ToString());
Console.WriteLine("Done...");
Console.ReadLine();
}
static Random _rand = new Random();
public delegate void Work<T>(T param);
public class ThreadInfo<T>
{
public ThreadParam<T> param;
public ThreadStart finalize;
public Thread thread;
}
public class ThreadParam<T>
{
public T o;
public Work<T> w;
}
public class AsyncHandler<T>
{
List<Thread> _threads = new List<Thread>();
List<object> _threadInfo = new List<object>();
List<object> _queue = new List<object>();
int threadCount = 10;
void doWork(object paramObj)
{
ThreadParam<T> param = (ThreadParam<T>)paramObj;
try
{
param.w(param.o);
}
finally
{
finish(Thread.CurrentThread, param);
}
}
void launch(ThreadParam<T> param)
{
ParameterizedThreadStart start = new ParameterizedThreadStart(doWork);
Thread t = new Thread(start);
_threads.Add(t);
foreach(ThreadInfo<T> info in _threadInfo)
{
if (info.param == param)
{
info.thread = t;
break;
}
}
t.Start(param);
}
public void DoAsync(T o, Work<T> w)
{
ThreadParam<T> param = new ThreadParam<T>();
param.o = o;
param.w = w;
ThreadInfo<T> info = new ThreadInfo<T>();
info.param = param;
_threadInfo.Add(info);
if (_threads.Count < threadCount)
{
launch(param);
}
else
{
lock (_syncThread)
{
_queue.Add(param);
}
}
}
public void Done()
{
bool clear = true;
do
{
clear = true;
foreach (ThreadInfo<T> info in _threadInfo)
{
if (info.thread == null) break;
if (info.thread.IsAlive)
{
clear = false;
break;
}
}
System.Threading.Thread.Sleep(100);
} while (clear == false);
foreach (ThreadInfo<T> info in _threadInfo)
{
if (info.finalize != null)
{
info.finalize();
}
}
_threads.Clear();
_threadInfo.Clear();
}
static object _syncThread = new object();
void finish(Thread t, ThreadParam<T> param)
{
_threads.Remove(t);
lock (_syncThread)
{
if (_queue.Count > 0)
{
ThreadParam<T> tp = (ThreadParam<T>)_queue[0];
_queue.Remove(_queue[0]);
launch(tp);
}
}
}
public void DoSync(ThreadStart ts)
{
foreach (ThreadInfo<T> ti in _threadInfo)
{
if (ti.thread == Thread.CurrentThread)
{
if (ti.finalize != null) throw new Exception("finalize already set");
ti.finalize = ts;
}
}
}
}
}
}