Use Python’s sys.settrace() for fun and for profit

The itch to scratch

Everyone in the software industry knows Kent Beck, the pioneer of extreme programming and test-driven development and the co-author of JUnit. One of his lesser-known projects was JUnitMax, which aimed to reduce the time developers spend waiting for tests to run. One of the ideas behind it is that when code changes, only the test cases that exercise that code need to be run, instead of the entire suite. The idea made a lot of sense to me, but at the time neither I nor the development shop I was in practised enough TDD, so the time spent running unit tests wasn't a big problem for me back then.

Fast-forward a few years: as the project at my current company grows, the time it takes to run the tests is slowly becoming an impediment to my productivity. I remembered JUnitMax and said to myself: wouldn't it be neat if something like JUnitMax were available? As the name suggests, JUnitMax is for Java, while my project is in Python. Java, being a statically typed language, has the blessing of static analysis, which means a tool like JUnitMax can figure out which test cases cover which lines of code simply by analysing types. Python, being a dynamic language, doesn't offer that luxury.

A few days ago, while I was running unit tests with coverage, it dawned on me: if the coverage tool knows which lines of the source code are covered by the unit tests, couldn't the same technique be used to figure out which lines are covered by which test cases?

So I started looking into coverage.py's source code and watching its author Ned Batchelder's excellent PyCon 2011 talk on sys.settrace. I wanted to build a proof-of-concept plugin for nose, the de facto Python unit-test tool, that gathers information about which lines in the source folder are covered by which test cases while the tests run. And so nostrils was born.

Here comes `sys.settrace()`

Python's motto is “batteries included”. This is manifested in many of Python's standard library modules, such as ast (source code parsing) and dis (bytecode disassembly). Another battery is sys.settrace(), which makes the interpreter call a function of your choice on execution events: when a function is called or returns, when an exception is raised, and whenever a new line of code is about to be executed. You can do a lot of fun stuff with it: for example, coverage.py uses it to gather code coverage data, and pdb uses it to set breakpoints and step through a running program.
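To get a feel for the API, here is a minimal sketch (the names `tracer`, `executed` and `add` are just for illustration) that records every line executed inside a function call, then restores whatever trace function was active before:

```python
import sys

executed = []  # (function name, line number) pairs, in execution order

def tracer(frame, event, arg):
    # The interpreter calls this on 'call', 'line', 'return' and 'exception' events.
    if event == "line":
        executed.append((frame.f_code.co_name, frame.f_lineno))
    # Returning the tracer makes it the local trace function for this frame,
    # which is what keeps the 'line' events coming.
    return tracer

def add(x, y):
    z = x + y
    return z

previous = sys.gettrace()  # another tool's tracer, or None
sys.settrace(tracer)
try:
    add(1, 2)
finally:
    sys.settrace(previous)

print(executed)  # two entries for add: "z = x + y" and "return z"
```

Note that the module-level code calling sys.settrace() is not itself traced: the trace function only kicks in for frames entered after it is installed.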

How can it be used?

For nostrils, we need to write a nose plugin that installs the trace function when a test starts. The trace function records each line of code that gets executed (filename and line number) together with the name of the current test case. After all the tests have run, we have our map.
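Before bringing nose into the picture, the whole idea fits in a few lines. This is a sketch, not nostrils itself: `make_tracer`, `build_line_map` and the dictionary-of-callables input are hypothetical stand-ins for what the nose plugin does with its hooks, and the code under test is inlined from the use case below.

```python
import sys
from collections import defaultdict

def make_tracer(test_name, line_map):
    # Build a trace function that tags every executed line with test_name.
    def tracer(frame, event, arg):
        if event == "line":
            line_map[(frame.f_code.co_filename, frame.f_lineno)].add(test_name)
        return tracer  # keep receiving 'line' events in every called frame
    return tracer

def build_line_map(tests):
    """Run each test and return {(filename, line number): set of test names}.

    `tests` maps a test name to a zero-argument callable (hypothetical format).
    """
    line_map = defaultdict(set)
    for test_name, test_fn in tests.items():
        previous = sys.gettrace()
        sys.settrace(make_tracer(test_name, line_map))
        try:
            test_fn()
        finally:
            sys.settrace(previous)  # restore whatever was tracing before
    return line_map

# Code under test and tests, inlined from worker.py / test_worker.py.
def add(x, y):
    z = x + y
    return z

def subtract(x, y):
    z = x - y
    return z

def test_add():
    assert 1 == add(1, 0)

def test_add_negative():
    assert 0 == add(-1, 1)

def test_subtract():
    assert 0 == subtract(0, 0)

line_map = build_line_map({
    "test_add": test_add,
    "test_add_negative": test_add_negative,
    "test_subtract": test_subtract,
})
```

Looking up the `z = x + y` line of `add` in `line_map` yields `{'test_add', 'test_add_negative'}`. What nose adds on top is test discovery, test names, and the start/stop points around each test.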

A simple use case

To start, we need a simple use case:

# worker.py
# this is the code-under-test
def add(x, y):
    z = x + y
    return z

def subtract(x, y):
    z = x - y
    return z

 

# test_worker.py
# test cases

import worker

def test_add():
    assert 1 == worker.add(1, 0)

def test_add_negative():
    assert 0 == worker.add(-1, 1)

def test_subtract():
    assert 0 == worker.subtract(0, 0)

class TestFoo(object):

    def test_add(self):
        assert 5 == worker.add(5, 0)

As you can see, we have four tests and two functions under test. Our goal is that when running `nosetests --with-nostrils` (--with-nostrils is the switch that turns on the nostrils plugin), we get the following mapping:


worker.py

def add(x, y):
  z = x + y # test_add, test_add_negative, TestFoo.test_add
  return z  # test_add, test_add_negative, TestFoo.test_add

def subtract(x, y):
  z = x - y # test_subtract
  return z  # test_subtract

Nose plugin

I won't go into the details of how to create a plugin for nose. You can read about it here, and you can take a look at my sample setup here. In a nutshell, every plugin has a name, and when nose is run with --with-plugin_name, that plugin is activated. Nose provides test lifecycle hooks that plugins can implement. For example, startTest is called just before each test case runs, addSuccess is called when a test case passes, and finalize is called when all tests are finished.

Here's what my plugin looks like:

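import sys

from nose.plugins import Plugin

class Nostrils(Plugin):
    name = 'nostrils'  # enables the --with-nostrils switch

    # Whatever the outcome of a test, stop tracing once it is done.
    def addError(self, test, err, *args):
        self._restore_tracefn()

    def addFailure(self, test, err, *args):
        self._restore_tracefn()

    def addSkip(self, test, *args):
        self._restore_tracefn()

    def addSuccess(self, test, *args):
        self._restore_tracefn()

    def startTest(self, test):
        # Remember which test is running, then start tracing it.
        self._current_test = test
        self._install_tracefn()

    def finalize(self, result):
        self._print()

    def _install_tracefn(self):
        # Save any existing trace function (e.g. coverage's) so it can be restored.
        self._orig_tracefn = sys.gettrace()
        sys.settrace(self._trace) # See below

    def _restore_tracefn(self):
        sys.settrace(self._orig_tracefn)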

The idea is that we install the trace function when a test starts, and restore the previous trace function when the test finishes, whatever its outcome. We also keep track of the current test in self._current_test.
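The install/restore pairing has to happen no matter how a test ends, which is why every outcome hook calls `_restore_tracefn()`. Outside of nose, the same save-and-restore idea maps naturally onto a context manager. This is a sketch; `tracing` is a hypothetical helper, not part of nostrils:

```python
import sys
from contextlib import contextmanager

@contextmanager
def tracing(tracefn):
    """Install tracefn for the duration of a with-block, then put back
    whatever trace function (possibly None) was active before."""
    previous = sys.gettrace()
    sys.settrace(tracefn)
    try:
        yield
    finally:
        # Runs whether the block finished normally or raised,
        # like restoring from every add* hook in the plugin.
        sys.settrace(previous)

calls = []

def tracer(frame, event, arg):
    if event == "call":
        calls.append(frame.f_code.co_name)
    return None  # no per-line tracing needed for this demo

def work():
    return 42

before = sys.gettrace()
with tracing(tracer):
    work()
work()  # runs after the with-block, so it is not traced
```

A nose plugin can't use a with-block directly, because "start" and "stop" arrive as separate hook calls, so nostrils keeps the saved trace function in `self._orig_tracefn` instead.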

Trace function

Now let’s have a look at the trace function:

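class Nostrils(Plugin):
    # ...
    def _trace(self, frame, event, arg):
        if event == 'line':
            self._trace_down(frame)
        # Returning the trace function keeps line tracing on in new frames.
        return self._trace

    def _trace_down(self, frame):
        # Stop walking once we reach the frame running the test's entry point.
        entry_code = self._current_test.__call__.__code__
        while frame is not None:
            if frame.f_code is entry_code:
                break

            self._collect(frame)
            frame = frame.f_back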

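A trace function takes three parameters:

  • frame: the current frame object
  • event: a string naming the event that triggered the call, such as 'call', 'line', 'return' or 'exception'. See here
  • arg: an event-specific argument, e.g. the return value for 'return' or an (exception, value, traceback) tuple for 'exception'; it is None for 'call' and 'line'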

Here, I'm only interested in the 'line' event, which is triggered when the interpreter is about to execute a new line of code. When that happens, we invoke _trace_down, which walks up the call stack by following frame.f_back until it reaches None, past the outermost frame. Because we're only tracing the execution of tests, we can stop the walk once we reach the frame whose code object is the test case's entry point (the test's __call__ method). This saves us some unnecessary traversal. Note also that _trace returns itself: the function passed to sys.settrace() is called for each 'call' event, and whatever it returns becomes the local trace function that receives the 'line' events for that frame.
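The frame walk can be tried outside nose too. In this sketch, `sys._getframe()` (a CPython-specific function) grabs the current frame instead of receiving it from a trace function, and a hypothetical `run_test` function plays the role of the test's `__call__` entry point where the walk stops:

```python
import sys

def walk_until(frame, stop_code):
    """Yield (function name, line number) for frame and each of its callers,
    stopping before the first frame that is running stop_code."""
    while frame is not None:
        if frame.f_code is stop_code:
            break
        yield frame.f_code.co_name, frame.f_lineno
        frame = frame.f_back

def run_test(test_fn):
    # Stand-in for the test entry point: the boundary of the walk.
    return test_fn()

def helper():
    # Walk the stack from this frame up to (not including) run_test.
    return list(walk_until(sys._getframe(), run_test.__code__))

def test_something():
    return helper()

stack = run_test(test_something)
print([name for name, _ in stack])  # ['helper', 'test_something']
```

Without the stop condition, the walk would continue through `run_test` and the module-level frame (and, under nose, through nose's own machinery), recording lines that have nothing to do with the code under test.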

Data Collection

There are a few things we need to collect: the filename and line number of the code being executed, and the name of the test case that covers it.

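from collections import defaultdict

class Nostrils(Plugin):
    def __init__(self):
        super(Nostrils, self).__init__()
        self._current_test = None
        # filename -> line number -> set of test names
        self._data = defaultdict(lambda: defaultdict(set))

    def _collect(self, frame):
        filename, lineno = frame.f_code.co_filename, frame.f_lineno
        # address() returns a (filename, module, call) tuple for the test
        self._data[filename][lineno].add("%s:%s.%s" % self._current_test.address())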

The data structure we use here is a dictionary of dictionaries. At the top level, the keys are filenames and the values are dictionaries whose keys are line numbers and whose values are sets of test case names. The data structure looks like this:

{
  'foo.py':{
      1 : set(['test_foo.py:test_foo_case1', 'test_foo.py:test_foo_case2']),
      2 : set(['test_foo.py:test_foo_case1', 'test_foo.py:test_foo_case2']),
      3 : set(['test_foo.py:test_foo_case2'])
  }
}
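finalize() calls self._print(), which isn't shown here. As a sketch of what such a report could look like, the hypothetical `annotate` helper below takes one file's inner dictionary and appends the covering tests to each source line, in the style of the mapping we aimed for at the start:

```python
from collections import defaultdict

def annotate(source, line_tests):
    """Return source with the covering tests appended to each line.

    source:     the file's text
    line_tests: maps 1-based line numbers to sets of test names, i.e. one
                inner dictionary of the nested structure
    """
    out = []
    for lineno, text in enumerate(source.splitlines(), start=1):
        tests = line_tests.get(lineno)  # .get() avoids creating empty entries
        if tests:
            text = "%s  # %s" % (text, ", ".join(sorted(tests)))
        out.append(text)
    return "\n".join(out)

data = defaultdict(lambda: defaultdict(set))
data["worker.py"][2].update(["test_add", "TestFoo.test_add"])
data["worker.py"][3].update(["test_add", "TestFoo.test_add"])

source = "def add(x, y):\n    z = x + y\n    return z"
print(annotate(source, data["worker.py"]))
# def add(x, y):
#     z = x + y  # TestFoo.test_add, test_add
#     return z  # TestFoo.test_add, test_add
```

Using `.get()` matters with a defaultdict: plain indexing would silently insert an empty set for every uncovered line.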

There we have it! We have a prototype of what could become a PyUnitMax 😉

Potential Problems

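  • Scale: So far I've only run nostrils on a trivial code base. A trace function that fires on every line slows test runs down noticeably, so profiling and optimization would be needed before nostrils could be used on real-world projects.
  • Multi-threading: No consideration has been given to multi-threading at this stage; for one thing, sys.settrace() only affects the thread that calls it.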
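The thread caveat is easy to demonstrate. In this sketch (all names are illustrative), a tracer installed with sys.settrace() never sees code running in a worker thread, while one registered with threading.settrace() is installed in every thread started afterwards:

```python
import sys
import threading

seen = []  # (thread name, function name) for every 'call' event
seen_lock = threading.Lock()

def tracer(frame, event, arg):
    if event == "call":
        with seen_lock:
            seen.append((threading.current_thread().name, frame.f_code.co_name))
    return None  # 'call' events are enough here; skip per-line tracing

def work():
    return sum(range(10))

def run_in_thread(name):
    t = threading.Thread(target=work, name=name)
    t.start()
    t.join()

# 1. sys.settrace() only affects the calling (main) thread.
previous = sys.gettrace()
sys.settrace(tracer)
try:
    run_in_thread("untraced-worker")
finally:
    sys.settrace(previous)

# 2. threading.settrace() installs the tracer in threads started afterwards.
threading.settrace(tracer)
try:
    run_in_thread("traced-worker")
finally:
    threading.settrace(None)  # assumes no thread-level tracer was set before
```

Even with threading.settrace(), a test-to-line map would still need to know which test a worker thread belongs to, which is the harder part of the problem.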

Collaborators welcome!

I have since refactored the code, revised the data structure and published it on GitHub. Feedback and suggestions are very welcome.

Realtime notification delivery using RabbitMQ, Tornado and WebSocket

Our company has “hack-off” days once in a while, where we developers get to choose whatever we would like to work on and present it to the entire company at the end of the day. I had been hearing the WebSocket buzz for a while and wanted to build something interesting with it.

WebSocket

WebSocket is a protocol that provides a persistent, bidirectional connection between the browser and the server. With WebSocket, the browser can send messages to the server, but what's more interesting is that the server can push messages to the client (the browser). This breaks away from the traditional request/response model of web applications, in which the client makes a request and waits for the server to answer. AJAX was revolutionary, but it is essentially the same model: the client asks the server whether there's anything interesting, not the other way around. With WebSocket, the server suddenly becomes a more active participant and can deliver a more engaging user experience.

Our company provides a web application for online invoicing. The web application allows users to create clients, create invoices, send them to clients, and so on. Each of these is an “event” that gets sent to RabbitMQ. We then have a plethora of RabbitMQ consumers that read messages off the queue and do interesting things with them.

Proof of concept

For this hack-off, my goal was to write a RabbitMQ consumer that reads messages off the message queue and delivers them to the front end as notifications over WebSocket.

I'd heard good things about Tornado. Having read its docs on the WebSocket request handler, I felt it was straightforward enough, so I chose Tornado as my backend.

Pika

One problem arises, though: the Tornado server runs as a regular server, waiting for incoming WebSocket connections, while the RabbitMQ consumer also needs to run in the same process's event loop, waiting for incoming messages from the message queue. I looked into a few solutions such as sparkplug and stormed-amqp, but neither seemed a good fit here. Finally, I stumbled upon Pika. It comes with a Tornado event loop adapter, which allows the RabbitMQ consumer and the WebSocket handlers to run inside the same event loop. Perfect.

The entry point looks like this:


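# server.py (entry point)
import pika
import tornado.ioloop
import tornado.web

import client    # PikaClient, shown below
import handlers  # MyWebSocketHandler

application = tornado.web.Application([
    (r'/ws', handlers.MyWebSocketHandler),
])

def main():
    pika.log.setup(color=True)

    io_loop = tornado.ioloop.IOLoop.instance()

    # PikaClient is our rabbitmq consumer; attaching it to the
    # application makes it reachable from every request handler
    pc = client.PikaClient(io_loop)
    application.pc = pc
    application.pc.connect()

    application.listen(8888)
    io_loop.start()

if __name__ == '__main__':
    main()

# handlers.py
import pika
import tornado.websocket

class MyWebSocketHandler(tornado.websocket.WebSocketHandler):

    def open(self, *args, **kwargs):
        pika.log.info("WebSocket opened")

    def on_close(self):
        pika.log.info("WebSocket closed")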

That was straightforward. However, I was then faced with the problem of how to make the AMQP consumer notify the WebSocket handlers when a message arrives from the message queue. Each WebSocket connection has its own MyWebSocketHandler instance, but those instances are not available from the Tornado application object. Maybe there's a way to get them by other means, but I'm not familiar enough with the Tornado API to know.

However, from the handler we do have access to the application object, and because we attached pc (our AMQP consumer) to the application, we can reach the consumer from inside our socket handler. Hey, how about registering the handler with the client when the WebSocket connects, and letting the client “notify” the handler when events are received? Hey, isn't that the observer pattern?

Here’s the code:

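import pika
from tornado import websocket

class MyWebSocketHandler(websocket.WebSocketHandler):

    def open(self, *args, **kwargs):
        # subscribe this connection to events from the consumer
        self.application.pc.add_event_listener(self)
        pika.log.info("WebSocket opened")

    def on_close(self):
        pika.log.info("WebSocket closed")
        # unsubscribe so we never write to a closed connection
        self.application.pc.remove_event_listener(self)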

Now our PikaClient object needs to support add_event_listener() and remove_event_listener() methods.

import json

import pika
from pika.adapters import TornadoConnection

class PikaClient(object):

    def __init__(self, io_loop):
        pika.log.info('PikaClient: __init__')
        self.io_loop = io_loop

        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None

        self.event_listeners = set()

    def connect(self):
        if self.connecting:
            pika.log.info('PikaClient: Already connecting to RabbitMQ')
            return

        pika.log.info('PikaClient: Connecting to RabbitMQ')
        self.connecting = True

        cred = pika.PlainCredentials('guest', 'guest')
        param = pika.ConnectionParameters(
            host='localhost',
            port=5672,
            virtual_host='/',
            credentials=cred
        )

        self.connection = TornadoConnection(param,
            on_open_callback=self.on_connected)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        pika.log.info('PikaClient: connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        pika.log.info('PikaClient: Channel open, Declaring exchange')
        self.channel = channel
        # declare exchanges, which in turn, declare
        # queues, and bind exchange to queues

    def on_closed(self, connection):
        pika.log.info('PikaClient: rabbit connection closed')
        self.io_loop.stop()

    def on_message(self, channel, method, header, body):
        pika.log.info('PikaClient: message received: %s' % body)
        # here we assume the message the sourcing app posts to the
        # message queue is in JSON format; event_factory (not shown)
        # builds an event object from the raw body
        self.notify_listeners(event_factory(body))

    def notify_listeners(self, event_obj):
        # serialize the event once, then push it to every connected socket
        event_json = json.dumps(event_obj)

        for listener in self.event_listeners:
            listener.write_message(event_json)
            pika.log.info('PikaClient: notified %s' % repr(listener))

    def add_event_listener(self, listener):
        self.event_listeners.add(listener)
        pika.log.info('PikaClient: listener %s added' % repr(listener))

    def remove_event_listener(self, listener):
        try:
            self.event_listeners.remove(listener)
            pika.log.info('PikaClient: listener %s removed' % repr(listener))
        except KeyError:
            pass

I left out the queue setup code here for brevity. The on_message callback is called when the consumer gets a message from the queue. The client, in turn, notifies all registered WebSocket handlers by calling write_message() on each of them, which relays the message to the front end's websocket.onmessage callback. Obviously, in a real application you would want some kind of authentication and filtering, so that the right message gets to the right receiver.
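As for filtering, one possible approach, sketched here with hypothetical names, is to let each listener carry the id of the user it belongs to and check it before writing. `EventHub` mirrors PikaClient's listener bookkeeping without any RabbitMQ code, the `user_id` field on events and listeners is an assumption about the message format, and `FakeHandler` stands in for a Tornado WebSocket handler so the logic can be exercised without a browser:

```python
import json

class EventHub(object):
    """Listener bookkeeping in the style of PikaClient, plus per-user filtering."""

    def __init__(self):
        self.event_listeners = set()

    def add_event_listener(self, listener):
        self.event_listeners.add(listener)

    def remove_event_listener(self, listener):
        self.event_listeners.discard(listener)  # no error if already removed

    def notify_listeners(self, event_obj):
        event_json = json.dumps(event_obj)
        # Iterate over a copy so a listener may unregister while we notify.
        for listener in list(self.event_listeners):
            # Assumption: events carry the id of the user they concern.
            if listener.user_id == event_obj.get("user_id"):
                listener.write_message(event_json)

class FakeHandler(object):
    """Stand-in for a WebSocket handler that records what it would send."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.sent = []

    def write_message(self, message):
        self.sent.append(message)

hub = EventHub()
alice, bob = FakeHandler(user_id=1), FakeHandler(user_id=2)
hub.add_event_listener(alice)
hub.add_event_listener(bob)
hub.notify_listeners({"user_id": 1, "type": "invoice_sent"})
```

Here only `alice` receives the event. Using `discard()` has the same effect as the try/except KeyError in `remove_event_listener()` above.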

Here’s some front-end code:

(function($){
    $(document).ready(function() {
        var ws = new WebSocket('ws://localhost:8888/ws');
        ws.onmessage = function(evt){
            alert(evt.data);
        };
    });
})(jQuery);

Yes, we simply pop the message up in an alert box. For the hack-off, I did parse the data, render a slightly more detailed notification message, and display the notification using jquery-toaster.

Conclusion

This was my first stab at WebSocket and the Tornado web framework. I'm not an expert on either subject, so chances are there are better ways to achieve the same result.

I think WebSocket is a very interesting technology. It opens up a wide range of possibilities for more interactive and engaging web applications. Our web application has a traditional architecture: the server renders most of the page, and every request involves a full page load. WebSocket may not bring much benefit there, as the application doesn't have that much user interaction; my hack-off project is more of a proof of concept. However, for a single-page web app (no full page reloads), the WebSocket model works very well.