I frequently come across the need to process large amounts of repetitive data (hundreds of thousands or even millions of items), such as records from a database table or a collection of objects: apply some business transformation to each item, and then save the results (e.g. rewrite them to the database, generate a report, etc.)
In a simple database example, you could just write an SQL statement that does an UPDATE of the affected records based on certain selection criteria (as specified by the WHERE clause). This is a pretty limited way of doing things, however, and in complex situations you might end up using cursors, which are a whole other can of worms. If you perform the update in code instead, you are obliged to get a DataReader, read the records one by one, apply your business logic, and then rewrite the results. For various reasons, this is very time consuming.
A non-database related example could be a collection holding several million objects that all need to be manipulated somehow. You will undoubtedly use a foreach() loop and handle the objects one by one, but this again is an inefficient use of CPU power. You can do better.
This article describes a very efficient and scalable mechanism for processing such large amounts of data. It uses multiple concurrent threads to handle small batches of work, and can take advantage of multiple core CPUs by creating more threads depending on how many cores there are.
A demo project is available on CodeProject. It creates 5,000,000 customer objects that hold basic information such as an ID and a name, updates a discount percentage for each, and reverses the customer name 50 times (just so we have something time consuming to do). It was run on my machine (which has a quad-core CPU) for a benchmark and produced the following processing times:
- Single thread execution: 5 minutes and 20 seconds.
- Multiple thread execution (running two threads per CPU): 2 minutes and 40 seconds.
You pick the winner.
Batch processing the old fashioned way
As stated in the introduction, you could process a batch of objects (or records) one at a time, by using a foreach() loop and handling the objects in that loop one at a time. Some of the advantages to this method are that it’s easier to understand the code and you don’t have to deal with threading. That having been said, the code can be perfectly understood if it’s well documented (which it should be!), and if you’re processing large amounts of data you should already be familiar with threading and should be using it by now. If you’re not, then get on it.
A simple one-batch example:
class CustomerMaintenance
{
    // a collection of Customer objects that we need to process
    private List<Customer> mCustomerList;

    // Constructor. User passes in the collection of Customer
    // objects to process.
    public CustomerMaintenance(List<Customer> customerList)
    {
        mCustomerList = customerList;
    }

    // Update method applies all the business transformations
    // to the Customer objects, one at a time.
    public void Update()
    {
        foreach (Customer customerToUpdate in mCustomerList)
        {
            customerToUpdate.Discount *= 1.10;
        }
    }
}

In this example, a list of Customer objects is passed to a CustomerMaintenance class. The Update() method is then executed, and it applies some changes to every single object in the list. This is a very simple example, since the foreach() loop could very well contain much more complex code. It is that update code which will slow you down if you’re doing something complex.
Batch processing with concurrent threads
The biggest flaw in the example above is that it does not take advantage of multiple core CPUs, and is most definitely not scalable. If you compare the execution time between a single core and a multiple core machine running this code you might find a faster time on the latter, but it will not be a big difference. Moreover, the difference would most probably be due to a faster CPU, more RAM, or some other such factor. The biggest weapon in your arsenal - the extra CPUs - will just remain unused and untapped.
So here’s how you do it. Processing your data in concurrent threads involves several steps:
Step 1 - Assigning key values to each object
The first thing we need is the ability to refer to each object by a key value. When we launch the thread workers, we will not be passing them a batch containing all the objects to be handled. Rather, all those objects will remain in one collection visible to all the thread workers, and each worker will be working on distinct objects within this list, which they will access by a key.
Coming up with these keys is slightly different when working with objects as opposed to database records. If you’re dealing with a list of objects, you can either choose a property that will be unique amongst all those objects (such as the customer ID), or if you do not have such a property, you can simply use an incremental counter (which does not necessarily need to be a property of the objects).
To work with objects, simply declare a SortedList<long, Customer> keyed by the customer ID:

// Shared collection, containing all the objects to update
SortedList<long, Customer> mCustomerList = new SortedList<long, Customer>();

// code to populate the dictionary with the Customer objects goes here

// Create a list of keys containing all key values to the shared
// collection. This becomes a sort of index.
List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);

If, however, you’re dealing with database records, you will need to use a key field in your SELECT statement. Assuming you are going to issue an UPDATE on all customer records in a customer table, you should first issue a SELECT statement retrieving all the customer IDs that will be affected, and then store those IDs in a List<long>.
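As a self-contained, runnable illustration of this key-indexing step, here is a minimal sketch in Java (used here only because it is easy to test standalone; the TreeMap, the tiny Customer class, and the method names are hypothetical stand-ins for the demo’s SortedList):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class KeyIndexDemo {
    // Hypothetical minimal Customer type, used only for this sketch.
    static class Customer {
        final long id;
        double discount;
        Customer(long id, double discount) { this.id = id; this.discount = discount; }
    }

    // Build the "index": a plain list of all key values to the shared,
    // key-addressable collection. This list will later be sliced into
    // batches and handed to the thread workers.
    static List<Long> buildKeyIndex(TreeMap<Long, Customer> customers) {
        return new ArrayList<>(customers.keySet());
    }

    public static void main(String[] args) {
        // Shared collection, keyed by a unique property (the customer ID).
        TreeMap<Long, Customer> customers = new TreeMap<>();
        for (long id = 1; id <= 5; id++) {
            customers.put(id, new Customer(id, 10.0));
        }
        List<Long> allCustomerIDs = buildKeyIndex(customers);
        System.out.println(allCustomerIDs); // [1, 2, 3, 4, 5]
    }
}
```

The key list deliberately lives apart from the objects themselves: workers never receive the objects, only keys into the shared collection.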
Step 2 - Preparing a semaphore
A semaphore (you will need to reference System.Threading for this one) is a very simple yet crucial element. It will control how many running thread workers we currently have, and when one has finished its work and exited, the semaphore will let us know that we can launch another worker. Semaphores are quite configurable, and you can easily specify how many requests it can handle.
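That gatekeeping behaviour can be seen in a tiny runnable sketch. This uses Java’s java.util.concurrent.Semaphore purely for illustration (its constructor takes a single permit count, unlike the .NET Semaphore’s initial/maximum pair); the permit counts chosen here are arbitrary:

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // A semaphore initialised with 4 permits: at most 4 thread workers
        // may hold a permit (i.e. be running) at any one time.
        Semaphore semaphore = new Semaphore(4);

        System.out.println(semaphore.tryAcquire());  // true  (3 permits left)
        System.out.println(semaphore.tryAcquire(3)); // true  (0 permits left)
        System.out.println(semaphore.tryAcquire());  // false (no free resource)

        semaphore.release(); // a worker finished; one permit is free again
        System.out.println(semaphore.tryAcquire());  // true
    }
}
```

In the real dispatch loop you would block with acquire()/WaitOne() rather than poll with tryAcquire(); the polling form is used here only so the sketch terminates immediately.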
// This will create a semaphore that helps control
// as many thread launches as we need to.
Semaphore mSemaphore = new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse);

Step 3 - Looping through the list of keys and dispatching work in batches
Now comes the fun part. We loop while the list of keys created in step 1 contains data, and do the following:
- Wait until the semaphore has a free resource.
- Reserve a resource in the semaphore.
- Copy a predetermined number of items from the keys list to a work list that will be passed down to the thread worker.
- Remove those same keys from the keys list so they are not dispatched for processing a second time.
- Launch the thread worker, passing it the work list. The thread will apply the business rules and modifications to the objects in the shared main list that are indexed by the key values passed down to it.
- When the thread worker is done, it releases the semaphore resource (thereby enabling the launch of another thread worker) and exits.
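Put together, the steps above can be sketched as a small self-contained program. This is a hedged illustration in Java, not the demo’s code: java.util.concurrent.Semaphore plays the role of the .NET semaphore, raw Thread objects stand in for ThreadPool workers, a ConcurrentHashMap of discounts stands in for the lock-protected shared Customer list, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class BatchDispatchDemo {
    // Shared collection visible to all workers: discount values keyed by ID.
    static final Map<Long, Double> discounts = new ConcurrentHashMap<>();

    // The thread worker: apply the "business rule" to each keyed item,
    // then release the semaphore slot so another worker can launch.
    static void updateSubBatch(List<Long> ids, Semaphore semaphore) {
        try {
            for (long id : ids) {
                discounts.compute(id, (k, d) -> d * 1.10);
            }
        } finally {
            semaphore.release();
        }
    }

    static void updateAllInConcurrentBatches(int numberOfThreads, int batchSize)
            throws InterruptedException {
        List<Long> allIDs = new ArrayList<>(discounts.keySet());
        Semaphore semaphore = new Semaphore(numberOfThreads);
        while (!allIDs.isEmpty()) {
            int n = Math.min(batchSize, allIDs.size());
            List<Long> batch = new ArrayList<>(allIDs.subList(0, n)); // copy batch of keys
            allIDs.subList(0, n).clear();      // don't dispatch these keys twice
            semaphore.acquire();               // wait for a free slot
            new Thread(() -> updateSubBatch(batch, semaphore)).start();
        }
        // Wait until every worker has released its permit, i.e. all are done.
        semaphore.acquire(numberOfThreads);
        semaphore.release(numberOfThreads);
    }

    public static void main(String[] args) throws InterruptedException {
        for (long id = 1; id <= 20; id++) discounts.put(id, 100.0);
        updateAllInConcurrentBatches(4, 5);
        System.out.println(discounts.get(1L)); // ≈ 110.0
    }
}
```

The final acquire of all permits mirrors the article’s closing loop of WaitOne() calls: once the dispatcher can hold every permit at once, no worker can still be running.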
private void UpdateAllCustomersInConcurrentBatches()
{
    // retrieve the number of CPUs on this machine, and calculate the total number
    // of threads we should run.
    ManagementObjectSearcher managementObjectSearcher = new ManagementObjectSearcher("select * from Win32_Processor");
    ManagementObjectCollection managementObjectCollection = managementObjectSearcher.Get();
    int numberOfCpus = managementObjectCollection.Count;
    int numberOfThreadsToUse = numberOfCpus * mMaxNumberOfThreadsPerCpu;
    int batchSize = 5000;

    // get a list of all the key values to process
    List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);

    while (allCustomerIDs.Count > 0)
    {
        // make a list of customer IDs to process in the next batch
        List<long> customerIDsToProcess = allCustomerIDs.GetRange(0, System.Math.Min(batchSize, allCustomerIDs.Count));

        // remove those customer IDs from the master list so they are not processed again
        allCustomerIDs.RemoveRange(0, System.Math.Min(batchSize, allCustomerIDs.Count));

        // wait for the semaphore to let us launch another thread
        mSemaphore.WaitOne();

        // launch a thread worker and give it the list of customer IDs to process
        ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateAllCustomersInSubBatch), customerIDsToProcess);
    }

    // ensure all threads have exited by waiting until we can get all the semaphore requests
    for (int ctr = 0; ctr < numberOfThreadsToUse; ctr++)
    {
        mSemaphore.WaitOne();
    }
}

Step 4 - Processing a batch of records in a thread worker
The method that processes the records is launched in step 3 and receives the list of keys to work with. It then uses a foreach() loop to go through them and, using each key in the loop, accesses a Customer object in the shared collection and applies the appropriate business rules and changes to it.
Similarly, if you are working with database records you would use this key value to issue a SELECT statement for one record in the table, fetch it, update it, and write it back (or maybe just issue an UPDATE statement).
private void UpdateAllCustomersInSubBatch(object state)
{
    List<long> customerIDsToProcess = state as List<long>;
    try
    {
        foreach (long customerID in customerIDsToProcess)
        {
            Customer tempCustomer = mCustomerList[customerID];
            // apply the business transformations to tempCustomer here, then
            // write the results back. A foreach item cannot be passed down
            // by reference, so pass a copy.
            lock (mCustomerList)
            {
                mCustomerList[customerID].Discount = tempCustomer.Discount;
                mCustomerList[customerID].Name = tempCustomer.Name;
            }
        }
    }
    catch (Exception ex)
    {
        // An exception was raised. This thread has no access to the UI, so
        // store the exception in mExceptions and get out.
        lock (mExceptions) { mExceptions.Add(ex); }
    }
    finally
    {
        // The work in this thread is complete. Release the semaphore request
        // so that it can be reused to launch another thread worker.
        mSemaphore.Release();
    }
}

A note about exceptions
An important thing to note about threads is that you have to be careful with exceptions. In a single threaded application, exceptions will be available to all the objects up the execution path, and you can therefore trap and handle them at any point.
In a multithreaded application, however, exceptions will only go up as high as the first method to execute in that thread. If you do not have any try/catch blocks to handle the exceptions before (or on) that point, you’ll get an “unhandled exception” error, and the application will stop when the exception is raised.
To get around this, the demo uses a List<Exception> collection (mExceptions). Each thread worker wraps its work in a try/catch block; if an exception is raised, the worker stores it in this shared list and exits, so the main thread can examine the collected exceptions once all the workers have finished.
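The mechanism can be sketched as a small runnable program (again in Java for illustration; the worker method, the divisor parameter, and the exceptions list are hypothetical names): each worker catches its own exceptions and appends them to a shared synchronized list, which the main thread inspects after the workers have been joined.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WorkerExceptionDemo {
    // Shared, synchronized list in which workers deposit their exceptions,
    // since an exception cannot propagate past the top of a worker thread.
    static final List<Exception> exceptions =
            Collections.synchronizedList(new ArrayList<>());

    static void worker(int divisor) {
        try {
            int result = 10 / divisor; // throws ArithmeticException when divisor == 0
            System.out.println(result);
        } catch (Exception ex) {
            exceptions.add(ex); // record it for the main thread to inspect later
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread good = new Thread(() -> worker(2));
        Thread bad = new Thread(() -> worker(0));
        good.start(); bad.start();
        good.join(); bad.join();
        // Back on the main thread, decide what to do with the collected errors.
        System.out.println("exceptions caught: " + exceptions.size()); // 1
    }
}
```

Without the try/catch inside worker(), the division by zero would terminate its thread with an unhandled exception and the main thread would never learn why.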
A note about thread safety and locking
It’s also important to note that since multiple threads will be accessing and modifying the same objects (e.g. the shared object list and the exception collection), we need to protect against corruption and ensure that only one thread at a time has access to the shared resources. To do this, we use the lock() statement, which ensures that only one thread at a time can execute the code in the lock block, and therefore only one thread at a time can update the shared resource.
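As a runnable sketch of this idea (Java’s synchronized block playing the role of C#’s lock(); the counter and method names are hypothetical), two threads increment a shared total, and the lock guarantees the increments are never lost to a race:

```java
public class LockDemo {
    private static final Object lockObject = new Object();
    static long total = 0;

    // Each worker adds to the shared total; the synchronized block (Java's
    // equivalent of C#'s lock()) ensures only one thread mutates it at a time.
    static void addToTotal(int times) {
        for (int i = 0; i < times; i++) {
            synchronized (lockObject) {
                total++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(() -> addToTotal(100000));
        Thread b = new Thread(() -> addToTotal(100000));
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(total); // 200000 with the lock; can be less without it
    }
}
```

The cost of the lock is contention, which is why the article's design keeps each worker on its own distinct set of keys and only locks around the brief write-back.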
Splitting up massive amounts of repetitive work into smaller batches and processing them in parallel is simply a must-have when it comes to processing large amounts of data. The benchmark in the introduction shows how you can easily save 50% of your execution time, although the savings will largely be determined by the exact type of transformations you need to apply to the data, and how the code is implemented.
Still, I think this is a time saver that cannot be ignored, and I hope it helps someone out there.
A sample project is available for testing here: http://www.codeproject.com/KB/cs/FasterBatchProcessingWith.aspx