Table of contents

  1. Inception
  2. Locks: class Queue
  3. wait() and notify(): class Results
  4. Threads: class Runner
  5. Putting it all together
  6. Result and download

1. Inception

Motivation

Let's start by asking ourselves: why do we need an event queuer? It's going to take some coding effort, after all, so what are its uses?

An event queuer is basically a piece of software that receives events to be executed and (hence the term queue) executes them sequentially, one after another, in the order they arrived.

This comes in really handy whenever a number of long-running functions have to be called from one important thread that must not be blocked, such as the graphical thread; when a shared resource can be called by multiple threads that cannot block; or when a timeout handler is needed. The possibilities are endless.

And let's not forget we are talking about Python: since functions can be passed around by reference, enqueuing one will be as easy as writing enqueue(someclass.somefunction), with no messing around with interfaces as in Java (though Java does ship a predefined EventQueue class, java.awt.EventQueue).

Features

We want our event queuer to have the following:

  • Should be simple to use: you instantiate it and bam! It runs with no problems.
  • Should be thread safe, we want it to handle all the concurrency, so we don't have to. That's the point: you call it from whichever thread and it works.
  • Should return results when we want them, all of them, even when our call threw an exception: in that case we want that same exception back.
  • Should be smart: we don't want to see any busy wait, as some other implementations out there do. We're not banging rocks together here!

As for the public interface, we need to keep it really simple: just one function to add an event to the queue, "enqueue", and another to stop receiving and processing events, "stop". Another function to get the results, you say? We'll come to that; we don't want a publicly available function to expose them.

Here's our first skeleton.

class EventQueue:
	def enqueue(self, functionCall, highPriority = False):  # the shape of functionCall is discussed below
		pass
	def stop(self, highPriority = False):
		pass

The class is defined with two functions and empty bodies (the pass keyword avoids a syntax error, since a Python block cannot be empty). Both have one optional argument, highPriority. A high-priority event won't be put at the back of the queue, but right at the front. The same happens for stop, which, as you may have guessed, doesn't interrupt any running job: it just waits for its turn.

And this is exactly everything an external object will see, just these two public functions.

How to call a function

The question remains: what shape should a function-which-calls-functions have? We will need:

  • A function reference, which will tell us exactly which function to call.
  • A list of unnamed parameters, for the standard function parameters.
  • A dictionary of named parameters, for parameters passed by name.

For example, a desired call to telephone.callPeople(bob, "483", showNumber = False, ifDiverted = telephone.ABORT) would become:

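def execute(func, args = (), kwargs = None):
	# Unpack the positional list and the keyword dictionary into a normal call
	if kwargs is None:
		kwargs = {}
	return func(*args, **kwargs)

execute(telephone.callPeople, [bob, "483"], {'showNumber': False, 'ifDiverted': telephone.ABORT})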
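Since telephone and bob are only illustrative, here is a self-contained sketch in which a hypothetical greet() function stands in for telephone.callPeople. It shows that positional and keyword arguments survive being packed into a list and a dictionary and unpacked again.

```python
def execute(func, args=(), kwargs=None):
    """Call func with the packed positional and keyword arguments and return its result."""
    if kwargs is None:
        kwargs = {}
    return func(*args, **kwargs)


def greet(name, greeting, punctuation="!", shout=False):
    """Hypothetical stand-in for telephone.callPeople."""
    text = greeting + ", " + name + punctuation
    return text.upper() if shout else text


# Equivalent to greet("Bob", "Hello", punctuation="?", shout=True)
message = execute(greet, ["Bob", "Hello"], {"punctuation": "?", "shout": True})
print(message)  # HELLO, BOB?
```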

Separation of concerns

A good practice in computer programming is separation of concerns: therefore we are going to build three internal objects which will handle three different aspects of the event queuer's job:

  • Queue will handle the enqueuing and dequeuing operations (with no real knowledge of what kind of element it's moving around)
  • Results has to store all the function results and return them when asked by an authorized entity.
  • Runner will be the main gear of our event queue. It will run whenever needed, take the next element from the Queue and store the results in Results.

2. Locks: class Queue

Remember this is an internal class of EventQueue? Well, that doesn't really matter: it's still good practice to expose only a public interface and work with hidden internal variables and functions. It helps keep classes modular and their complexity under control.

class EventQueue:
	def __init__(self):
		self._queue = self.Queue()

	def enqueue(self, func, args=[], kwargs={}, highPriority = False):
		element = self.packCall(func, args, kwargs)
		return self._queue.enqueue(element, highPriority)

	def packCall(self, func, args, kwargs):
		return (func, args, kwargs)

	class Queue:
		def __init__(self):
			self._list = []

		def enqueue(self, element, highPriority):
			if highPriority:
				self._list.insert(0, element)
			else:
				self._list.append(element)
			return

		def hasMore(self):
			return len(self._list) > 0

		def dequeue(self):
			return self._list.pop(0)

The code for the Queue inner class is still very simple. Note that for this class each element is not a function call, just an element: the call is encapsulated in a tuple (func, args, kwargs) by the main EventQueue class.
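A quick single-threaded check of the ordering rules: this sketch copies the Queue class above as a standalone class, with placeholder strings instead of packed calls. A high-priority element jumps ahead of everything already waiting.

```python
class Queue:
    """Unsynchronized queue from the text: FIFO, with an optional jump to the front."""

    def __init__(self):
        self._list = []

    def enqueue(self, element, highPriority):
        if highPriority:
            self._list.insert(0, element)  # goes straight to the front
        else:
            self._list.append(element)     # waits its turn at the back

    def hasMore(self):
        return len(self._list) > 0

    def dequeue(self):
        return self._list.pop(0)


q = Queue()
q.enqueue("first", False)
q.enqueue("second", False)
q.enqueue("urgent", True)

order = []
while q.hasMore():
    order.append(q.dequeue())
print(order)  # ['urgent', 'first', 'second']
```

Both list.insert(0, ...) and list.pop(0) shift the whole list, which is fine for short queues; collections.deque offers appendleft() and popleft() in constant time if that ever matters.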

Handling concurrency

And yet, the above code is too simple to handle concurrent threads adding function calls to the queue; we must handle concurrency in a safe and graceful way.

A lock is exactly what we need to make the magic happen, and the magic is called mutual exclusion. We need to import threading, and then we'll use a threading.Lock() to prevent two different threads from accessing the queue at the same time. To access the queue, a thread has to:

  1. Acquire the lock
  2. Access the queue
  3. Release the lock

Pretty simple, right? The key point is that if thread A tries to acquire the lock while thread B holds it and is accessing the queue, A will halt until B releases the lock. As a result, no more than one thread will be accessing the queue at any time.

The code to use a lock is the following:

import threading
lock = threading.Lock()
lock.acquire()
try:
	pass  # access the shared resource
finally:
	lock.release()  # always released, even if the access raises

And the acquire-release section can be rewritten in this compact and easier form:

with lock:
	'''access the shared resource'''

Which brings us to the revised code for the Queue object:

class Queue:
	def __init__(self):
		self._list = []
		self._lock = threading.Lock()

	def enqueue(self, element, highPriority):
		with self._lock:
			if highPriority:
				self._list.insert(0, element)
			else:
				self._list.append(element)

	def hasMore(self):
		with self._lock:
			return len(self._list) > 0

	def dequeue(self):
		with self._lock:
			return self._list.pop(0)

This object is very small but effective, and it can easily be tested on its own: one of the advantages of separation of concerns.
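To see mutual exclusion at work without the rest of the queue, this sketch has four threads increment a shared counter (a hypothetical stand-in for the queue's list). The with block performs the acquire/access/release steps described above, so no increment is lost.

```python
import threading

counter = 0
lock = threading.Lock()


def work(times):
    global counter
    for _ in range(times):
        with lock:        # acquire, access, release
            counter += 1  # read-modify-write: not atomic without the lock


threads = [threading.Thread(target=work, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```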

3. wait() and notify(): class Results

The second inner class we're going to model should keep all the information about the results and let authorized clients of our EventQueue retrieve them.

Thanks to function references, we can give whoever enqueues an event a function that retrieves its execution result; only clients holding that function can call it and get the result. We need to model these two possible scenarios:

  1. The client enqueues an event, but does not want the result back (it throws the function away). We don't need or want to store the result indefinitely.
  2. The client enqueues an event and keeps track of the return function. We shall keep the result as long as the return function is kept.

A reference to an object's bound method is a real reference to that object, so we can rely on how Python's garbage collector works: an object stays alive as long as something refers to it. Inside our Results class we will create small objects without keeping any reference to them ourselves. We're going to give the EventQueue class a function to store a result, and the client a function to retrieve that very same result.

Once a result has been stored, the event queuer drops any reference to it, so it remains in memory for as long as the client keeps a reference to the return function.

class Results:
	def getResultContainer(self):
		container = self._Container()
		return (container.setResult, container.getResult)

	class _Container:
		def __init__(self):
			self._hasResult = False
			self._resultIsException = False
			self._result = None

		def setResult(self, result, resultIsException):
			self._hasResult = True
			self._resultIsException = resultIsException
			self._result = result

		def getResult(self):
			if self._hasResult:
				if self._resultIsException:
					raise self._result
				else:
					return self._result

As you can see, the Results class forgets about the container immediately and returns only a tuple (setter, getter) of bound methods, not the container itself. The container itself is very simple: it returns the result as expected, or raises the exception if that was the actual result. Is this okay? Once again, we failed to think concurrently.
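Before tackling the concurrency problem, here is a quick check of the memory claim above. The sketch uses a simplified Results (no locking, no exception support) and a weak reference, which observes the container without keeping it alive. The container survives while the getter exists and disappears once the getter is dropped. The demo() function is purely illustrative.

```python
import gc
import weakref


class Results:
    """Simplified Results from the text: no locking and no exception support."""

    def getResultContainer(self):
        container = self._Container()
        # Only bound methods leave this function; the container itself is not kept
        return (container.setResult, container.getResult)

    class _Container:
        def __init__(self):
            self._result = None

        def setResult(self, result, resultIsException):
            self._result = result

        def getResult(self):
            return self._result


def demo():
    setter, getter = Results().getResultContainer()
    probe = weakref.ref(getter.__self__)  # does not keep the container alive

    setter(42, False)
    del setter                            # the event queue drops its reference
    aliveWhileClientHolds = probe() is not None

    value = getter()
    del getter                            # the client drops the return function
    gc.collect()                          # not needed in CPython, harmless elsewhere
    return aliveWhileClientHolds, value, probe() is None


print(demo())  # (True, 42, True)
```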

Concurrency once again

Try to imagine two separate threads: thread A, in which a client asks for the result, and thread B, in which the event queue stores it. Now follow this interleaving of the two flows of execution (on the same container) and its outcome:

  1. A) getResult()
  2. B) setResult(SomeError("Everything's wrong!"), True)
  3. B)     self._hasResult = True
  4. A)     if self._hasResult:               (True)
  5. A)         if self._resultIsException:   (still False)
  6. A)             return self._result       (still None)
  7. B)     self._resultIsException = True
  8. B)     self._result = SomeError("Everything's wrong!")

What's the outcome? Thread A believes (correctly) that a result has been stored, but it got None as the result, while thread B is certain that its result has been stored and will be retrieved later on.

Furthermore, how can a client tell whether the result was None or simply wasn't ready yet? And what if the client wanted to wait for the result in a simple and efficient way? Smile, folks: it's time once again for Uncle Concurrency!

To wait or not to wait?

This time we need a lock to provide mutual exclusion when setting and reading the result, and a wait mechanism for threads that want a not-yet-ready result, which is exactly what threading.Condition() offers free of (computational) charge.

When thread A calls wait() on a condition object cond, A is suspended until another thread, e.g. B, calls notify() on cond. A then continues its execution from the point where it called wait(), as soon as it has reacquired the underlying lock.

Programmers familiar with Java concurrency may notice a close resemblance to Object.wait() and Object.notify(). As in Java, Python requires us to hold the lock before we wait or notify (otherwise a RuntimeError is raised). And of course, waiting releases the lock so that another thread can acquire it.

class _Container:
	def __init__(self):
		self._condition = threading.Condition()
		self._hasResult = False
		self._resultIsException = False
		self._result = None

	def setResult(self, result, resultIsException):
		with self._condition:
			self._hasResult = True
			self._resultIsException = resultIsException
			self._result = result
			self._condition.notify_all()  # wake every thread waiting for this result

	def getResult(self):
		with self._condition:
			while not self._hasResult:
				self._condition.wait()
			if self._resultIsException:
				raise self._result
			else:
				return self._result

As you can see, a thread asking for the result blocks on the wait() call until another thread sets the result and notifies the condition. Note the while...wait loop: it's not strictly necessary in this context, but it's the standard idiom, because in larger classes a waiting thread may be woken up for other reasons while its condition is still not met.
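The runnable sketch below exercises the condition-based container on its own: a hypothetical reader thread blocks in getResult() until the main thread calls setResult(). It uses notify_all() rather than notify(), so that every thread sharing the same getter wakes up. The time.sleep() only makes it likely that the reader is already waiting; the outcome is the same either way. The last lines check the rule that wait() requires the lock.

```python
import threading
import time


class _Container:
    def __init__(self):
        self._condition = threading.Condition()
        self._hasResult = False
        self._resultIsException = False
        self._result = None

    def setResult(self, result, resultIsException):
        with self._condition:
            self._hasResult = True
            self._resultIsException = resultIsException
            self._result = result
            self._condition.notify_all()  # wake every waiting reader

    def getResult(self):
        with self._condition:
            while not self._hasResult:
                self._condition.wait()    # releases the lock while sleeping
            if self._resultIsException:
                raise self._result
            return self._result


received = []
container = _Container()
reader = threading.Thread(target=lambda: received.append(container.getResult()))
reader.start()
time.sleep(0.1)                   # the reader is now (very likely) blocked in wait()
container.setResult("done", False)
reader.join()
print(received)                   # ['done']

# An exception stored as the result is re-raised by getResult()
failing = _Container()
failing.setResult(ValueError("Everything's wrong!"), True)
try:
    failing.getResult()
except ValueError as error:
    caught = str(error)
print("caught:", caught)          # caught: Everything's wrong!

# Calling wait() without holding the condition's lock is an error
try:
    threading.Condition().wait()
    waitWithoutLock = "no error"
except RuntimeError:
    waitWithoutLock = "RuntimeError"
print(waitWithoutLock)            # RuntimeError
```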

Revisiting Queue

This idea of wait-and-notify interactions is so neat that we can revisit our Queue inner class to work the same way. The thread that will wait() when there are no elements to process is our runner thread (see below), and notify() will be called whenever an element is enqueued.

class Queue:
	def __init__(self):
		self._list = []
		self._condition = threading.Condition()

	def enqueue(self, element, highPriority):
		with self._condition:
			if highPriority:
				self._list.insert(0, element)
			else:
				self._list.append(element)
			self._condition.notify()

	def hasMore(self):
		with self._condition:
			return len(self._list) > 0

	def dequeue(self):
		with self._condition:
			while not self.hasMore():
				self._condition.wait()
			return self._list.pop(0)

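Our lock has become a threading.Condition(), so we can call the wait and notify primitives. Now whenever a thread asks for the next element and there is none, it blocks until another thread enqueues an event. Calling hasMore() from inside dequeue() is safe because threading.Condition() wraps a reentrant lock (RLock) by default, so the same thread can acquire it twice.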
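A small experiment with the blocking Queue, copied from above as a standalone class: a hypothetical consumer thread asks for three elements before any exist and simply waits, while the main thread enqueues them one at a time.

```python
import threading
import time


class Queue:
    def __init__(self):
        self._list = []
        self._condition = threading.Condition()  # wraps an RLock by default

    def enqueue(self, element, highPriority):
        with self._condition:
            if highPriority:
                self._list.insert(0, element)
            else:
                self._list.append(element)
            self._condition.notify()  # wake one waiting consumer

    def hasMore(self):
        with self._condition:
            return len(self._list) > 0

    def dequeue(self):
        with self._condition:
            while not self.hasMore():  # re-entering the RLock is allowed
                self._condition.wait()
            return self._list.pop(0)


q = Queue()
consumed = []


def consumer():
    for _ in range(3):
        consumed.append(q.dequeue())  # blocks while the queue is empty


t = threading.Thread(target=consumer)
t.start()
for item in ("a", "b", "c"):
    time.sleep(0.05)
    q.enqueue(item, False)
t.join()
print(consumed)  # ['a', 'b', 'c']
```

With a single consumer the order is deterministic regardless of timing, because each element is appended at the back and popped from the front.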

4. Threads: class Runner

The last inner class of EventQueue will introduce something we've already mentioned: a thread. A thread is a flow of execution, independent of other flows. We do need a thread, because we want our runner to call the queued functions from its own flow of execution: the point of an event queue is exactly not having to call functions from the client's side.

Our Runner derives from threading.Thread and overrides the run() function, which holds all the execution logic. Keep in mind, though, that calling run() directly creates no new thread: it just launches the runner's logic on the calling thread. To start a new thread with that logic, one has to call start(), which spawns a new flow of execution that runs run() on its own.

class Runner(threading.Thread):
	def __init__(self, getNextFunc):
		threading.Thread.__init__(self)
		self._running = True
		self._getNextFunc = getNextFunc

	def run(self):
		while self._running:
			element = self._getNextFunc()  # avoid shadowing the built-in next()
			self._execute(element)

	@staticmethod
	def packCall(func, args, kwargs, setResultFunc):
		return (func, args, kwargs, setResultFunc)

	def _execute(self, element):
		(func, args, kwargs, setResultFunc) = element
		try:
			result = func(*args, **kwargs)
			setResultFunc(result, resultIsException = False)
		except Exception as exception:  # valid in Python 2.6+ and Python 3
			setResultFunc(exception, resultIsException = True)

Note in the code above:

  • Our runner is defined without an explicit link to a getNext() function, which has to be provided on initialization. We will link it directly to Queue.dequeue(), as it does exactly what the runner expects: it blocks until an element is present and returns it.
  • Calling threading.Thread.__init__ is mandatory whenever a subclass of threading.Thread overrides __init__: it sets up the thread's internal state.
  • The run loop uses the getNextFunc() passed on initialization, so all we need is a reference to a function that returns the next event to execute.
  • We have moved the packCall function into this class, since the runner is the actual executor of the functions, and made it static (with the @staticmethod decorator) because it does not depend on any instance state.
  • On execution we pass the result to the event's result-setting function; if the call raises an exception, we catch it and store it as the result, flagged as an error.
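To watch the Runner on its own, this sketch feeds it from a hypothetical pre-filled list instead of Queue.dequeue, and records results through a hypothetical recorder() setter. The last event ends the loop by flipping _running through setattr, a demo shortcut in place of the proper stop event described next.

```python
import threading


class Runner(threading.Thread):
    def __init__(self, getNextFunc):
        threading.Thread.__init__(self)
        self._running = True
        self._getNextFunc = getNextFunc

    def run(self):
        while self._running:
            element = self._getNextFunc()
            self._execute(element)

    @staticmethod
    def packCall(func, args, kwargs, setResultFunc):
        return (func, args, kwargs, setResultFunc)

    def _execute(self, element):
        (func, args, kwargs, setResultFunc) = element
        try:
            result = func(*args, **kwargs)
            setResultFunc(result, resultIsException=False)
        except Exception as exception:
            setResultFunc(exception, resultIsException=True)


results = {}


def recorder(name):
    """Hypothetical setter: stores (result, resultIsException) under a name."""
    def setResult(result, resultIsException):
        results[name] = (result, resultIsException)
    return setResult


def onWorkerThread():
    return threading.current_thread() is not threading.main_thread()


events = []                              # hypothetical pre-filled event list
runner = Runner(lambda: events.pop(0))   # stands in for Queue.dequeue
events.extend([
    Runner.packCall(divmod, [7, 2], {}, recorder("ok")),
    Runner.packCall(divmod, [1, 0], {}, recorder("error")),
    Runner.packCall(onWorkerThread, [], {}, recorder("thread")),
    # Demo shortcut: end the loop after this event
    Runner.packCall(setattr, [runner, "_running", False], {}, recorder("stop")),
])

runner.start()   # run() executes on a new thread
runner.join()
print(results["ok"])                        # ((3, 1), False)
print(type(results["error"][0]).__name__)   # ZeroDivisionError
print(results["thread"])                    # (True, False)
```

Replacing runner.start() with runner.run() would process the same events on the main thread, and the "thread" entry would become (False, False).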

A graceful stop

We have yet to define a way to stop the thread when EventQueue.stop(highPriority = False) is called. First of all, if our thread is executing func(*args, **kwargs) in _execute(), meaning it's actually running a queued function, we won't be able to stop it, and we won't try. We shall wait until the thread is ready to get another event and then set its _running variable to False, which ends the loop in run() and lets the thread exit gracefully.

Should every class, or every thread, be able to call this function? You guessed it: we don't want to allow that. We want the stop function to be called by our Runner thread itself, and the best way to do that is to execute it as just another event.

# class Runner(threading.Thread) continued
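	def getStopCall(self, setResultFunc):
		return (self._stopRunning, [], {}, setResultFunc)

	def _stopRunning(self):
		# Not named _stop: threading.Thread defines a private _stop() in several Python 3 versions
		self._running = False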

Whenever the client wants our event queue to stop, the EventQueue instance gets the stop call and enqueues it, with or without priority. The runner then executes it on its own thread and leaves its run loop afterwards.
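The effect of enqueuing the stop call with or without priority can be simulated with a trimmed Runner (no exception handling) and a hypothetical pre-filled list standing in for the Queue; simulate() and ignore() are illustrative helpers. A normal stop lets the earlier events run; a high-priority stop leaves them behind.

```python
import threading


class Runner(threading.Thread):
    """Trimmed Runner: only the parts needed to show the stop event."""

    def __init__(self, getNextFunc):
        threading.Thread.__init__(self)
        self._running = True
        self._getNextFunc = getNextFunc

    def run(self):
        while self._running:
            (func, args, kwargs, setResultFunc) = self._getNextFunc()
            setResultFunc(func(*args, **kwargs), False)  # no error handling here

    def getStopCall(self, setResultFunc):
        return (self._stopRunning, [], {}, setResultFunc)

    def _stopRunning(self):
        self._running = False


def simulate(stopWithPriority):
    processed = []

    def ignore(result, resultIsException):  # hypothetical no-op result setter
        return None

    events = [(processed.append, [name], {}, ignore) for name in ("a", "b", "c")]
    runner = Runner(lambda: events.pop(0))
    stopCall = runner.getStopCall(ignore)
    if stopWithPriority:
        events.insert(0, stopCall)  # like enqueue(..., highPriority=True)
    else:
        events.append(stopCall)
    runner.start()
    runner.join()
    return processed, len(events)   # executed events, events left behind


print(simulate(False))  # (['a', 'b', 'c'], 0)
print(simulate(True))   # ([], 3)
```

The events left behind in the high-priority case are exactly the ones that the Further improvements section later flags as unprocessed.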

5. Putting it all together

We have defined our public interface and our three inner classes with their roles. Now it's time to glue them together and put some oil between the gears.

Basically we have to add some logic to the methods of EventQueue to link the lower-level classes together, using only (of course) their public methods.

Draft of a class diagram

[Figure: class diagram for EventQueue]

Programmatic glue

Without much effort we link the functions together to obtain the following draft of our event queue:

import threading

class EventQueue:
	def __init__(self):
		self._queue = self.Queue()
		self._results = self.Results()
		self._runner = self.Runner(self._queue.dequeue)

		self._runner.start()

	def enqueue(self, func, args=[], kwargs={}, highPriority = False):
		(setResultFunc, getResultFunc) = self._results.getResultContainer()
		element = self.Runner.packCall(func, args, kwargs, setResultFunc)
		self._queue.enqueue(element, highPriority)

		return getResultFunc

	def stop(self, highPriority = False):
		(setResultFunc, getResultFunc) = self._results.getResultContainer()
		element = self._runner.getStopCall(setResultFunc)
		self._queue.enqueue(element, highPriority)

		return getResultFunc

	class Queue:
		def __init__(self):
			self._list = []
			self._condition = threading.Condition()

		def enqueue(self, element, highPriority):
			with self._condition:
				if highPriority:
					self._list.insert(0, element)
				else:
					self._list.append(element)
				self._condition.notify()

		def hasMore(self):
			with self._condition:
				return len(self._list) > 0

		def dequeue(self):
			with self._condition:
				while not self.hasMore():
					self._condition.wait()
				return self._list.pop(0)

	class Results:
		def getResultContainer(self):
			container = self._Container()
			return (container.setResult, container.getResult)

		class _Container:
			def __init__(self):
				self._condition = threading.Condition()
				self._hasResult = False
				self._resultIsException = False
				self._result = None

			def setResult(self, result, resultIsException):
				with self._condition:
					self._hasResult = True
					self._resultIsException = resultIsException
					self._result = result
					self._condition.notify_all()  # wake every thread waiting for this result

			def getResult(self):
				with self._condition:
					while not self._hasResult:
						self._condition.wait()
					if self._resultIsException:
						raise self._result
					else:
						return self._result

	class Runner(threading.Thread):
		def __init__(self, getNextFunc):
			threading.Thread.__init__(self)
			self._running = True
			self._getNextFunc = getNextFunc

		def run(self):
			while self._running:
				element = self._getNextFunc()
				self._execute(element)

		def getStopCall(self, setResultFunc):
			return (self._stopRunning, [], {}, setResultFunc)

		@staticmethod
		def packCall(func, args, kwargs, setResultFunc):
			return (func, args, kwargs, setResultFunc)

		def _execute(self, element):
			(func, args, kwargs, setResultFunc) = element
			try:
				result = func(*args, **kwargs)
				setResultFunc(result, resultIsException = False)
			except Exception as exception:
				setResultFunc(exception, resultIsException = True)

		def _stopRunning(self):
			# Not named _stop: threading.Thread defines a private _stop() in several Python 3 versions
			self._running = False

The initialization process just instantiates the inner classes and gives the runner thread the green light to start(). Note that we instantiate the runner telling it to call self._queue.dequeue to get the next element. When a client enqueues a function, we return the getResult function to it, while we keep the setResult function and pass it to the runner, which will store the result.
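For a quick end-to-end experiment, here is a condensed Python 3 rendition of the listing above together with a short usage example. It keeps the same classes and flow, with a few assumptions of its own: hasMore() and packCall() are folded away, the stop method is named _stopRunning, and the result container uses notify_all(). It's a sketch for experimenting, not a replacement for the listing.

```python
import threading


class EventQueue:
    def __init__(self):
        self._queue = self.Queue()
        self._results = self.Results()
        self._runner = self.Runner(self._queue.dequeue)
        self._runner.start()

    def enqueue(self, func, args=(), kwargs=None, highPriority=False):
        setResultFunc, getResultFunc = self._results.getResultContainer()
        self._queue.enqueue((func, args, kwargs or {}, setResultFunc), highPriority)
        return getResultFunc

    def stop(self, highPriority=False):
        setResultFunc, getResultFunc = self._results.getResultContainer()
        self._queue.enqueue(self._runner.getStopCall(setResultFunc), highPriority)
        return getResultFunc

    class Queue:
        def __init__(self):
            self._list = []
            self._condition = threading.Condition()

        def enqueue(self, element, highPriority):
            with self._condition:
                if highPriority:
                    self._list.insert(0, element)
                else:
                    self._list.append(element)
                self._condition.notify()

        def dequeue(self):
            with self._condition:
                while not self._list:  # block until something arrives
                    self._condition.wait()
                return self._list.pop(0)

    class Results:
        def getResultContainer(self):
            container = self._Container()
            return container.setResult, container.getResult

        class _Container:
            def __init__(self):
                self._condition = threading.Condition()
                self._hasResult = False
                self._resultIsException = False
                self._result = None

            def setResult(self, result, resultIsException):
                with self._condition:
                    self._hasResult = True
                    self._resultIsException = resultIsException
                    self._result = result
                    self._condition.notify_all()

            def getResult(self):
                with self._condition:
                    while not self._hasResult:
                        self._condition.wait()
                    if self._resultIsException:
                        raise self._result
                    return self._result

    class Runner(threading.Thread):
        def __init__(self, getNextFunc):
            threading.Thread.__init__(self)
            self._running = True
            self._getNextFunc = getNextFunc

        def run(self):
            while self._running:
                func, args, kwargs, setResultFunc = self._getNextFunc()
                try:
                    setResultFunc(func(*args, **kwargs), False)
                except Exception as exception:
                    setResultFunc(exception, True)

        def getStopCall(self, setResultFunc):
            return (self._stopRunning, [], {}, setResultFunc)

        def _stopRunning(self):
            self._running = False


# Usage: enqueue from the main thread, collect results (or exceptions) when needed
eventQueue = EventQueue()
getTotal = eventQueue.enqueue(sum, [[1, 2, 3]])
getNumber = eventQueue.enqueue(int, ["not a number"])
getStopped = eventQueue.stop()

print(getTotal())                 # 6, once the runner has executed sum()
try:
    getNumber()                   # the runner's ValueError is re-raised here
except ValueError as error:
    caught = type(error).__name__
print(caught)                     # ValueError
print(getStopped())               # None, once the stop event has been processed
```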

Further improvements

The code above works as described, but we want a cleaner implementation for clients and correct handling of jobs that will never be executed.

The basic idea is that, once EventQueue.stop() has been called, every event that won't be processed is marked as such; a clean way to do it is to create an exception class, UnprocessedEvent, and, for each job that is never executed (or is added after the queue has been stopped), store that exception as its result.

Failing to do so could leave some threads waiting forever for results that will never come, because their events were queued after a stop event. With this solution, these threads are woken up as soon as the runner stops, and they can detect the error.

class UnprocessedEvent(Exception):
	def __init__(self, reason):
		self._reason = reason

	def __str__(self):
		return str(self._reason)

	def __repr__(self):
		return "UnprocessedEvent(" + repr(self._reason) + ")"

This is how we mark events as unprocessed; the only viable home for this function (it has to unpack the element) is, of course, Runner.

def flagError(self, element, message="Event has not been processed."):
	(func, args, kwargs, setResultFunc) = element
	setResultFunc(UnprocessedEvent(message), resultIsException = True)
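The sketch below shows the waking-up behavior described above: a hypothetical client thread blocks in getResult(), and flagging its event releases it with an UnprocessedEvent instead of leaving it waiting forever. Here flagError is written as a plain function rather than a Runner method, and the event tuple is illustrative.

```python
import threading


class UnprocessedEvent(Exception):
    def __init__(self, reason):
        self._reason = reason

    def __str__(self):
        return str(self._reason)

    def __repr__(self):
        return "UnprocessedEvent(" + repr(self._reason) + ")"


class _Container:
    """Condition-based result container, as in the Results class."""

    def __init__(self):
        self._condition = threading.Condition()
        self._hasResult = False
        self._resultIsException = False
        self._result = None

    def setResult(self, result, resultIsException):
        with self._condition:
            self._hasResult = True
            self._resultIsException = resultIsException
            self._result = result
            self._condition.notify_all()

    def getResult(self):
        with self._condition:
            while not self._hasResult:
                self._condition.wait()
            if self._resultIsException:
                raise self._result
            return self._result


def flagError(element, message="Event has not been processed."):
    (func, args, kwargs, setResultFunc) = element
    setResultFunc(UnprocessedEvent(message), resultIsException=True)


container = _Container()
element = (print, ["never printed"], {}, container.setResult)  # hypothetical event
outcome = []


def client():
    # Blocks in getResult() until the event is either executed or flagged
    try:
        container.getResult()
    except UnprocessedEvent as error:
        outcome.append(repr(error))


waiter = threading.Thread(target=client)
waiter.start()
flagError(element, "Event has been added after the stop() event.")
waiter.join()
print(outcome)  # ["UnprocessedEvent('Event has been added after the stop() event.')"]
```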

Adding new events once the runner thread is dead immediately drops the event and stores the exception with a basic description:

def enqueue(self, func, args=[], kwargs={}, highPriority = False):
	(setResultFunc, getResultFunc) = self._results.getResultContainer()
	element = self.Runner.packCall(func, args, kwargs, setResultFunc)

	if self._runner.is_alive():  # isAlive() was removed in Python 3.9
		self._queue.enqueue(element, highPriority)
	else:
		self._runner.flagError(element, "Event has been added after the stop() event.")

	return getResultFunc

And of course, when the stop() function is called, we need our EventQueue to fetch all the remaining events and flag them as not executed. We edit Runner.getStopCall() and the method that clears _running (called _stopRunning() here, to avoid clashing with the private _stop() method that threading.Thread defines in several Python 3 versions):

def getStopCall(self, afterStopFunc, setResultFunc):
	return (self._stopRunning, [afterStopFunc], {}, setResultFunc)

def _stopRunning(self, afterStopFunc):
	self._running = False
	afterStopFunc()

And then we link the call to EventQueue._flushQueue(), which does not want to wait for another element, of course:

def stop(self, highPriority = False):
	(setResultFunc, getResultFunc) = self._results.getResultContainer()
	element = self._runner.getStopCall(self._flushQueue, setResultFunc)
	self._queue.enqueue(element, highPriority)

	return getResultFunc

def _flushQueue(self):
	while self._queue.hasMore():
		element = self._queue.dequeue()
		self._runner.flagError(element)

Another concurrency error

There is, unfortunately, another concurrency problem with the last solution, and finding it is left to the reader. The next section presents the final code, which fixes this error, but it's a good exercise to spot such problems on your own.

If you do find it, you'll get a sticker saying: «Now you're thinking with concurrency». If not, don't worry: it takes a lot of practice to find such small errors, even though they can have a catastrophic impact on your program.

6. Result and download

Version  Format  Download              Size
0.1      zip     enqueuer-0.1.zip      30.28k
0.1      tar.xz  enqueuer-0.1.tar.xz   17.18k

This tutorial is released under the terms of the GPLv3 or any later version.