The Present I Promised

As promised in the previous blog entry, here is the code I used to implement the Active Object pattern on a .Net project I’ve been working on.

Here’s the setup for the pattern again. I was working on rewriting some caching functionality on this project, and I had some housekeeping operations that had to happen in the background. This meant that I had to have multiple threads operating at the same time, but I had several choices as to how to do this.

Kind of the default way of doing stuff on multiple threads in .Net is to kick off the processing on a ThreadPool thread. This works out well if you don’t need to actively manage the threads, but just want them to operate on their own. Users of this functionality start it using the usual asynchronous method invocation stuff in .Net. The problem with this is that the threading policy is exposed to the entire world. If you ever wanted to change to programmatically controlled threads rather than thread pool threads, or wanted to call the methods in the same thread of the caller, etc, you’d have to change all of the calling code in the entire application. This is Bad.

What you’d rather do is to encapsulate the threading policy inside a class. That way, changing your mind is easy and cheap.

Now that we’ve decided where to put the logic, we have to decide how to represent it. What we’d really like to happen is for clients to talk to our class as if they were directly calling a method, but to have our class invoke the method for us, on our own thread of control. This is the essence of Active Object. It is used to decouple the invocation of a method from that method’s execution. The advantage of it is that allows you to serialize these method invocations, eliminating threading concerns from the invoked code. Single threaded code is much more simple to write, much less error prone, and generally to be preferred if at all possible.

So, how is this done? It’s actually pretty easy. The overall process involves a few moving parts.

  • Target Class — class on which methods are called
  • Command Classes — Part of communication mechanism between threads
  • Command Queue — Transfers command objects between threads

Target Class

The target class needs to have two different sets of methods. The first methods make up the public API, and is used by outside callers. This API doesn’t actually cause the behavior to happen, but it does arrange for the other set of APIs, the private set, to be called through the communication mechanism. In my case, this class was called the BackgroundScheduler, and it was responsible for allowing scavenging-type operations to happen when needed.

using System;
using System.Threading;

namespace Example
{
    internal class BackgroundScheduler
    {
        private ProducerConsumerQueue inputQueue = new ProducerConsumerQueue();
        private Thread inputQueueThread;
        private ScavengerTask scavenger;

        public BackgroundScheduler(ScavengerTask scavenger)
        {
            this.scavenger = scavenger;

            ThreadStart queueReader = new ThreadStart(QueueReader);
            inputQueueThread = new Thread(queueReader);
            inputQueueThread.IsBackground = true;
        }

        public void StartScavenging()
        {
            inputQueue.Enqueue(new StartScavengingMsg(this));
        }

        internal void DoStartScavenging()
        {
            scavenger.DoScavenging();
        }

        private void QueueReader()
        {
            while (true)
            {
                IQueueMessage msg = inputQueue.Dequeue() as IQueueMessage;
                try
                {
                    msg.Run();
                }
                catch (ThreadAbortException)
                {
                }
            }
        }
    }
}

BackgroundScheduler is responsible for orchestrating all the activities in this little drama. It owns something called a ProducerConsumerQueue, which is what holds the IQueueMessage objects as they are passed between threads. It also owns and manages its own internal thread, which is where the processing actually happens. When a request to do something comes in, the BackgroundScheduler takes that request, creates a derived IQueueMessage, queues that message, and returns to the caller. Later, the internal thread runs, picks up the message from the other side of the ProducerConsumerQueue, and executes it. The nice part about this is that each operation runs to completion before the next background task starts. Single threaded code!

Queue Messages


The IQueueMessage interface is really very simple, consisting of only a simple Run method.

namespace Example
{
    internal interface IQueueMessage
    {
        void Run();
    }
}

This method is implemented in the StartScavengingMessage.

namespace Example
{
    internal class StartScavengingMsg : IQueueMessage
    {
        private BackgroundScheduler callback;

        public StartScavengingMsg(BackgroundScheduler callback)
        {
            this.callback = callback;
        }

        public void Run()
        {
            callback.DoStartScavenging();
        }
    }
}

When the Run method of this class is called, it just calls back to the BackgroundScheduler, invoking the internal API to cause the behavior to get run. At construction, each IQueueMessage instance is given a reference to the object to callback, so that it can invoke the behavior. One of the criticisms I received on this piece of code during the review of it was that I could have used a delegate instead of the IQueueMessage interface and saved myself from having to write simple command class derived classes, but I thought that the explicit interface communicated better than a delegate. Maybe that’s my old C++ background shining through, but I think it is easier to read like this, so I kept it as you see it.

Producer Consumer Queue


The final piece of the puzzle is the ProducerConsumerQueue. This is a special kind of queue where producers can store their messages onto one side of the queue from any thread in the system, but the queue is drained from a single consumer on its own thread. The consumer waits until there is a message to read from the queue, then reads it and returns it.

using System;
using System.Collections;
using System.Threading;

namespace Example
{
    internal class ProducerConsumerQueue
    {
        private object lockObject = new Object();
        private Queue queue = new Queue();

        public int Count
        {
            get { return queue.Count; }
        }

        public object Dequeue()
        {
            lock (lockObject)
            {
                while (queue.Count == 0)
                {
                    if (WaitUntilInterrupted())
                    {
                        return null;
                    }
                }

                return queue.Dequeue();
            }
        }

        public void Enqueue(object o)
        {
            lock (lockObject)
            {
                queue.Enqueue(o);
                Monitor.Pulse(lockObject);
            }
        }

        private bool WaitUntilInterrupted()
        {
            try
            {
                Monitor.Wait(lockObject);
            }
            catch (ThreadInterruptedException)
            {
                return true;
            }

            return false;
        }
    }
}

This class is actually pretty simple. There is a Mutex that is shared between the Enqueue and Dequeue methods. In Dequeue, the code waits for the Mutex to be pulsed. When it receives this pulse, it pulls off an object and returns it to the caller. In the meantime, from any other thread in the system, other code is free to add messages to the front of the queue. Each of these enqueue operations causes the Mutex to be pulsed, triggering the dequeue in the other thread.

Obviously, given the multithreaded nature of this class, all operations that consume any of the shared state of the ProducerConsumerQueue must be locked using the lockObject. This is the only place in the BackgroundScheduler that explicitly has to understand threading issues.

Conclusion


That’s all there is to it. Using these few simple classes, I was able to allow callers to invoke behavior that my implementation chose to run in the background. The caller didn’t know anything about my threading policy, as it was entirely contained in my BackgroundScheduler object, so I was free to change that policy on a whim. I was able to keep the code inside the ScavengerTask (not shown) single threaded, since I was guaranteed that only one instance of it would be running at a time, and I was able to control when and where that code ran.

The only improvement I’d like to make in this little grouping is that I’d like to find a way to pass only an interface to the callback methods to the IQueueMessage objects. In C++, I’d do this by creating a private base class and passing that reference to the objects, but I can’t figure out a similar solution in .Net. There are some times that I long for the expressiveness and power that is C++ 🙂

Hopefully some of you actually read down to here, and if so, thanks for listening!

— bab

7 thoughts to “The Present I Promised”

  1. Hi Brian,

    I was a little confused by the comment in the WaitUntilInterrupted method saying "// Can NEVER get here". In this code the most likely path of execution is for Monitor.Wait to return true when the lock is re-aquired, and then WaitUntilInterrupted returns false. Am I missing something?

  2. I liked your idea (even though you didn’t) of using delegates. Here is what I came up with for the BackgroundScheduler:

    internal class BackgroundScheduler

    {

    protected delegate void CommandDelegate();

    private ProducerConsumerQueue inputQueue

    = new ProducerConsumerQueue();

    private Thread inputQueueThread;

    private CommandDelegate commandDelegate;

    public BackgroundScheduler(CommandDelegate commandDelegate)

    {

    this.commandDelegate = commandDelegate;

    inputQueueThread = new Thread(new ThreadStart(QueueReader));

    inputQueueThread.IsBackground = true;

    inputQueueThread.Start();

    }

    public void Do()

    {

    inputQueue.Enqueue(commandDelegate);

    }

    private void QueueReader()

    {

    while (true)

    {

    CommandDelegate commandDelegate

    = inputQueue.Dequeue() as CommandDelegate;

    commandDelegate();

    }

    }

    }

    As you can see, the delegate allows us to schedule any Command now.

    To simplify calling I also added a subclass for the Scavenger:

    nternal class ScavengerScheduler : BackgroundScheduler

    {

    public ScavengerScheduler(ScavengerTask scavenger)

    : base(new CommandDelegate(scavenger.DoScavenging))

    {

    }

    }

    This keeps the calling code pretty much the same:

    BackgroundScheduler scheduler

    = new ScavengerScheduler(new ScavengerTask());

    scheduler.Do();

  3. Scott,

    You’re right. I guess the comment got left over from a previous refactoring. it shouldn’t be in there, because that is the most normal way out of the method.

    Thanks for pointing that out.

  4. Ooops. The CommandDelegate declaration in my delegate code needs to be public if you pass it to the contructor.

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.