Fun With Threads
Hello everyone! It's been a long time. Since it's bitterly cold now and getting worse (it's going to be -15F tonight), I've decided to blog rather than go outside. I'd like to share some code with the world, and I figured this would be an excellent place to try it. A friend of mine, WallyM, has been having some difficulties with his web spider project and the framework threadpool. One of my suggestions was to move away from the framework threadpool and use either threads that he manages himself or a custom threadpool implementation. It just so happened that I'd written a self-tuning threadpool a while back. I basically wrote it because it was one of the Microsoft interview questions listed on Chris Sells's website. I thought it would be a neat little project and that I'd learn a bit about threads at the same time. I've played with it for a while and I like where it's going (though I'm sure there are numerous improvements to be made). I'm thinking that this threadpool would be great in a server application where you'd want to control the number of threads available to any one component, task, etc.
The threadpool has 4 constructors, which allow you to specify a name for the pool, a minimum number of threads, and a maximum number of threads. If you leave out the minimum or maximum number of threads, it will use 1 * Number_of_Processors for the minimum and 5 * Number_Of_Processors for the maximum. As items are sent into the threadpool to be processed/executed, the pool will determine whether it needs to create new threads to help with the load. I used a simple algorithm: if (Items_in_Queue / number_of_threads > 4) Create_new_thread. By the same token, if a thread tries to get something from the queue and fails, it will be killed/aborted. This will continue until the minimum number of threads is reached. Once the minimum thread count is reached and there's nothing in the work queue, the threads will be put to sleep until there is work to do. I suspect I may need to refine parts of this functionality.
So here it is. It's designed for use with Whidbey, but it can easily be modified and used with .NET v1/v1.1:
using System;
using System.Threading;
using System.Collections.Generic;
namespace SSMAGIC.PoseidonServer.Threading
{
/// <summary>
/// A self-tuning pool of dedicated worker threads that executes queued
/// <see cref="ThreadCallBack"/> delegates in FIFO order.
/// </summary>
/// <remarks>
/// <para>
/// The pool starts with the minimum number of workers. A manager thread wakes
/// up about once per second and adds one worker whenever more than four items
/// are queued per existing worker, up to the configured maximum. A worker that
/// finds the queue empty retires if the pool is above its minimum size;
/// otherwise it blocks until new work is queued.
/// </para>
/// <para>
/// Shutdown is cooperative: <see cref="Dispose"/> discards items that have not
/// started yet and wakes every idle thread so it can exit. A callback that is
/// already running is allowed to finish; threads are never aborted.
/// </para>
/// <para>
/// Exceptions thrown by a callback are not caught by the pool and propagate
/// out of the worker thread that ran it.
/// </para>
/// </remarks>
public class ThreadPool : IDisposable
{
    /// <summary>Pool name used by the parameterless constructor.</summary>
    private const string DefaultPoolName = "SSMAGIC THREADPOOL";

    /// <summary>The default maximum is this many threads per processor.</summary>
    private const int DefaultMaxThreadsPerProcessor = 5;

    /// <summary>A worker is added once more than this many items are queued per live worker.</summary>
    private const int BacklogPerThreadThreshold = 4;

    /// <summary>Interval, in milliseconds, between two backlog evaluations by the manager thread.</summary>
    private const int ManagerIntervalMilliseconds = 1000;

    // Guards _workQueue, _numThreads and _nextThreadId, and serializes writes
    // to _stopping. Idle workers wait on this monitor.
    private readonly object _syncRoot = new object();

    // The manager thread waits on its own monitor so that the single
    // Monitor.Pulse issued per queued item can only ever wake a worker.
    private readonly object _managerGate = new object();

    private readonly Queue<ThreadWorkItem> _workQueue = new Queue<ThreadWorkItem>();
    private readonly string _poolName;
    private readonly int _minNumThreads;
    private readonly int _maxThreads;
    private readonly Thread _poolManagerThread;

    // Number of live worker threads.
    private int _numThreads = 0;

    // Monotonic counter used to give every worker a unique name.
    private int _nextThreadId = 0;

    // Volatile because the manager thread reads it while holding _managerGate
    // rather than _syncRoot.
    private volatile bool _stopping = false;

    /// <summary>
    /// Creates a pool with the default name, one worker per processor as the
    /// minimum and five workers per processor as the maximum.
    /// </summary>
    public ThreadPool()
        : this(DefaultPoolName)
    {
    }

    /// <summary>
    /// Creates a named pool with one worker per processor as the minimum and
    /// five workers per processor as the maximum.
    /// </summary>
    /// <param name="poolName">Prefix used when naming the worker threads.</param>
    public ThreadPool(string poolName)
        : this(poolName, Environment.ProcessorCount)
    {
    }

    /// <summary>
    /// Creates a named pool with an explicit minimum worker count. The maximum
    /// is five workers per processor, raised to <paramref name="minThreadCount"/>
    /// when the requested minimum is larger than that.
    /// </summary>
    /// <param name="poolName">Prefix used when naming the worker threads.</param>
    /// <param name="minThreadCount">Number of workers that are always kept alive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minThreadCount"/> is negative.
    /// </exception>
    public ThreadPool(string poolName, int minThreadCount)
        : this(poolName,
               minThreadCount,
               Math.Max(minThreadCount, Environment.ProcessorCount * DefaultMaxThreadsPerProcessor))
    {
    }

    /// <summary>
    /// Creates a named pool with explicit minimum and maximum worker counts
    /// and starts its manager thread.
    /// </summary>
    /// <param name="poolName">Prefix used when naming the worker threads.</param>
    /// <param name="minThreadCount">Number of workers that are always kept alive.</param>
    /// <param name="maxThreadCount">Upper bound on the number of live workers.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minThreadCount"/> is negative, or
    /// <paramref name="maxThreadCount"/> is less than one or less than
    /// <paramref name="minThreadCount"/>.
    /// </exception>
    public ThreadPool(string poolName, int minThreadCount, int maxThreadCount)
    {
        if (minThreadCount < 0)
        {
            throw new ArgumentOutOfRangeException(
                "minThreadCount", "The minimum thread count cannot be negative.");
        }
        if (maxThreadCount < 1)
        {
            throw new ArgumentOutOfRangeException(
                "maxThreadCount", "The maximum thread count must be at least one.");
        }
        if (maxThreadCount < minThreadCount)
        {
            throw new ArgumentOutOfRangeException(
                "maxThreadCount", "The maximum thread count cannot be less than the minimum thread count.");
        }

        _poolName = poolName;
        _minNumThreads = minThreadCount;
        _maxThreads = maxThreadCount;

        // Every field the manager reads is assigned before the thread starts.
        _poolManagerThread = new Thread(new ThreadStart(InitializePool));
        _poolManagerThread.Start();
    }

    /// <summary>
    /// Queues a callback for execution on one of the pool's worker threads.
    /// </summary>
    /// <param name="tb">The callback to run.</param>
    /// <param name="state">Object handed to <paramref name="tb"/> when it runs; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tb"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public void QueuePoolWorkItem(ThreadCallBack tb, object state)
    {
        if (tb == null)
        {
            throw new ArgumentNullException("tb");
        }

        ThreadWorkItem workItem = new ThreadWorkItem(tb, state);
        lock (_syncRoot)
        {
            if (_stopping)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            _workQueue.Enqueue(workItem);

            // One new item needs at most one worker. Workers re-check the
            // queue before waiting, so a pulse with nobody waiting is harmless.
            Monitor.Pulse(_syncRoot);
        }
    }

    /// <summary>
    /// Entry point of the manager thread: starts the initial workers, then
    /// periodically grows the pool until it is disposed.
    /// </summary>
    private void InitializePool()
    {
        lock (_syncRoot)
        {
            for (int i = 0; i < _minNumThreads && !_stopping; i++)
            {
                StartWorker();
            }
        }

        while (WaitForNextManagerTick())
        {
            ManageThreads();
        }
    }

    /// <summary>
    /// Blocks the manager thread for one interval, or until the pool is disposed.
    /// </summary>
    /// <returns>True when the manager should keep running; false once the pool is stopping.</returns>
    private bool WaitForNextManagerTick()
    {
        lock (_managerGate)
        {
            // Dispose sets _stopping before it takes _managerGate to pulse, so
            // checking the flag while holding the gate cannot miss the wake-up.
            if (!_stopping)
            {
                Monitor.Wait(_managerGate, ManagerIntervalMilliseconds);
            }
            return !_stopping;
        }
    }

    /// <summary>
    /// Adds one worker when the backlog per live worker exceeds the threshold
    /// and the pool is still below its maximum size.
    /// </summary>
    private void ManageThreads()
    {
        lock (_syncRoot)
        {
            if (_stopping || _numThreads >= _maxThreads)
            {
                return;
            }

            // Written as a multiplication so that a pool which currently has
            // zero workers does not divide by zero and no precision is lost
            // to integer division.
            if (_workQueue.Count > (long)BacklogPerThreadThreshold * _numThreads)
            {
                StartWorker();
            }
        }
    }

    /// <summary>
    /// Creates, names and starts one worker thread. The caller must hold <c>_syncRoot</c>.
    /// </summary>
    private void StartWorker()
    {
        Thread worker = new Thread(new ThreadStart(ProcessWorkItem));
        worker.Name = _poolName + " Thread " + _nextThreadId.ToString();
        _nextThreadId++;
        worker.Start();

        // Counted only after Start succeeded. The new thread cannot observe
        // the stale count because it needs _syncRoot, which is still held here.
        _numThreads++;
    }

    /// <summary>
    /// Worker thread body: runs queued callbacks until the worker is told to exit.
    /// </summary>
    private void ProcessWorkItem()
    {
        ThreadWorkItem userWorkItem;
        while ((userWorkItem = WaitForWorkItem()) != null)
        {
            ThreadCallBack workItemCallBack = userWorkItem.CallBack;

            // Invoked outside the lock so a slow callback never blocks
            // producers or other workers.
            workItemCallBack(userWorkItem.State);
        }

        Console.WriteLine("Killing Thread {0}", Thread.CurrentThread.Name);
    }

    /// <summary>
    /// Returns the next work item, blocking while the queue is empty.
    /// </summary>
    /// <returns>
    /// The next item to run, or null when the calling worker must exit because
    /// the pool is stopping or because it is a surplus worker with nothing to do.
    /// </returns>
    private ThreadWorkItem WaitForWorkItem()
    {
        lock (_syncRoot)
        {
            while (true)
            {
                if (_stopping)
                {
                    _numThreads--;
                    return null;
                }

                if (_workQueue.Count > 0)
                {
                    return _workQueue.Dequeue();
                }

                // The retire decision and the decrement share one critical
                // section so two idle workers cannot both claim the same
                // surplus slot and shrink the pool below its minimum.
                if (_numThreads > _minNumThreads)
                {
                    _numThreads--;
                    return null;
                }

                Monitor.Wait(_syncRoot);
            }
        }
    }

    #region IDisposable Members
    /// <summary>
    /// Stops the pool without blocking. Queued items that have not started
    /// are discarded, idle workers and the manager thread are woken so they
    /// can exit, and busy workers exit after their current callback returns.
    /// Calling this method more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        lock (_syncRoot)
        {
            if (_stopping)
            {
                return;
            }

            _stopping = true;
            _workQueue.Clear();
            Monitor.PulseAll(_syncRoot);
        }

        lock (_managerGate)
        {
            Monitor.PulseAll(_managerGate);
        }
    }
    #endregion
}
/// <summary>
/// Signature of a method that can be queued on the thread pool.
/// </summary>
/// <param name="state">Caller-supplied object that was queued together with the callback.</param>
public delegate void ThreadCallBack(object state);
/// <summary>
/// One queued unit of work: a <see cref="ThreadCallBack"/> together with the
/// state object that was queued alongside it.
/// </summary>
internal class ThreadWorkItem
{
    /// <summary>The delegate to invoke; also reachable through <see cref="CallBack"/>.</summary>
    public ThreadCallBack _workItem;

    private object _state;

    /// <summary>
    /// Creates a work item from a callback and its state object.
    /// </summary>
    /// <param name="tcb">The callback to store.</param>
    /// <param name="state">The state object to store.</param>
    public ThreadWorkItem(ThreadCallBack tcb, object state)
    {
        this._state = state;
        this._workItem = tcb;
    }

    /// <summary>Gets or sets the state object stored with the callback.</summary>
    public object State
    {
        get
        {
            return this._state;
        }
        set
        {
            this._state = value;
        }
    }

    /// <summary>Gets or sets the stored callback.</summary>
    public ThreadCallBack CallBack
    {
        get
        {
            return this._workItem;
        }
        set
        {
            this._workItem = value;
        }
    }
}
}
Sample app that uses the threadpool. Note: as this is a quick & dirty app, there's nothing to stop the threadpool; normally you'd have something stop it.
using System;
using System.Threading;
using SSMAGIC.PoseidonServer.Threading;
namespace ThreadpoolUnitTest
{
/// <summary>
/// Console driver that pushes a batch of identical work items through the
/// custom thread pool.
/// </summary>
class ThreadPoolUnitTestClass
{
    /// <summary>
    /// Simulated unit of work: sleeps for 1.5 seconds and then reports which
    /// thread executed it.
    /// </summary>
    /// <param name="data">State object queued with the work item; not used.</param>
    public static void DoLotsOfWork(object data)
    {
        Thread.Sleep(1500);
        Console.WriteLine("{0} Is Doing Work", Thread.CurrentThread.Name);
    }

    /// <summary>
    /// The main entry point for the application: creates a pool with default
    /// settings and queues 50 work items on it.
    /// </summary>
    [STAThread]
    static void Main(string[] args)
    {
        ThreadCallBack work = DoLotsOfWork;
        object sharedState = new object();

        // Fully qualified because System.Threading also declares a ThreadPool.
        SSMAGIC.PoseidonServer.Threading.ThreadPool pool =
            new SSMAGIC.PoseidonServer.Threading.ThreadPool();

        int remaining = 50;
        while (remaining > 0)
        {
            pool.QueuePoolWorkItem(work, sharedState);
            remaining--;
        }

        // Dispose is intentionally not called in this quick demo, so nothing
        // ever stops the pool.
    }
}
}