Writing Safe Twisted Code

Writing safe Twisted code has recently become a large focus of my job, so I have been doing a fair amount of thinking on the subject. I decided to write a small post describing some of the considerations involved in writing reliable code with Twisted.

However, I wasn't sure how much Twisted knowledge to assume, and it wasn't quite clear to me how to frame the post. So instead this post will evolve a minimal example to the point where the considerations I originally intended to demonstrate reveal themselves naturally. To get there, I will attempt to emulate some of the intuitive thinking a newer Twisted developer might progress through to reach the end result. At a minimum, you should know what a Deferred is. If you don't, here is the official material on them. Before second-guessing yourself though: yes, they really are just objects you attach callbacks to.
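If the "objects you attach callbacks to" framing feels too abstract, here is a toy sketch of the idea. This is not Twisted's actual implementation, just a minimal illustration of the concept: a result arrives at some later point, and any callbacks attached beforehand run against it in order.

```python
class ToyDeferred:
    """A toy stand-in for a Deferred: callbacks run when a result arrives."""
    def __init__(self):
        self.callbacks = []
        self.result = None
        self.called = False

    def addCallback(self, fn):
        if self.called:
            # The result already arrived, so run the callback immediately.
            self.result = fn(self.result)
        else:
            self.callbacks.append(fn)
        return self

    def callback(self, result):
        # Fire the chain: each callback receives the previous one's return value.
        self.called = True
        self.result = result
        for fn in self.callbacks:
            self.result = fn(self.result)

d = ToyDeferred()
d.addCallback(lambda n: n * 2)
d.callback(21)  # fires the chain: 21 -> 42
```

Real Deferreds do a great deal more (errbacks, chaining other Deferreds, and so on), but this is the essential shape.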

The task at hand

In this article we're going to iterate on a small script which will attempt to demonstrate some basic Twisted patterns by implementing a data pipeline with distinct asynchronous steps. In the real world this might look like fetching some data from Service A, sending it to Service B for processing, and finally sending it to Service C for persistence.

In our implementation, instead of using actual network services, we will emulate them by leaning on local Unix utilities available on our system. The premise is the same: talking to a network service and talking to a local subprocess, while different, both rely on some deferred operation that we must handle asynchronously.

Our pipeline

The pipeline we will implement will essentially be equivalent to the following on the commandline:

head -c 1024 /dev/urandom | wc -w | tee /tmp/result.txt

However, instead of using pipes on the commandline, we will run each program individually with Twisted, taking its output and writing it to the standard input of the next program. While we work, try to keep the equivalence to network services in mind.
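Before making this asynchronous, it may help to see the same head-to-wc hand-off done synchronously with the standard library's subprocess module. This is only a baseline sketch (the tee stage would follow the same pattern): each stage runs separately and we feed one stage's output to the next, exactly what we are about to do with Twisted.

```python
import subprocess

# Stage 1: read 1024 bytes of random data (like fetching from Service A).
data = subprocess.check_output(['head', '-c', '1024', '/dev/urandom'])

# Stage 2: hand the data to the next program's stdin (like Service B).
counted = subprocess.run(['wc', '-w'], input=data,
                         stdout=subprocess.PIPE).stdout

print(counted.strip())
```

The catch, of course, is that each call here blocks until the process finishes, which is precisely what Twisted lets us avoid.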

Talking to subprocesses

There are a few ways to spawn and communicate with subprocesses in Twisted, including some nice helper functions located in twisted.internet.utils. Each returns a Deferred which fires with some aspect of the result of running the subprocess: getProcessOutput fires with the contents of standard output, getProcessValue fires with the exit code, and getProcessOutputAndValue fires with a tuple of standard output, standard error, and the exit code.

The problem with using these in our example is that none of these utility functions provides a mechanism for writing to the standard input of our subprocesses. Therefore, we will have to use spawnProcess, which is a method of the reactor.

Here is the (abridged) signature as listed in the documentation:

spawnProcess(processProtocol, executable, args=())

The only parameter here that likely needs explanation is the processProtocol, which the documentation describes as follows:

An object which will be notified of all events related to the created process. (type: IProcessProtocol provider)

Now, you may not be familiar with writing process protocols, but I assure you it is quite simple. If you'd like a little background on how they are written and used, you can read through the official "How To" on using processes. That said, a ProcessProtocol is merely a class whose methods are called when various events take place, like when we connect to the process or when the process writes to its standard output.
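To get a feel for the shape of this pattern without any Twisted machinery, here is a plain-Python sketch. The driver function below stands in for the reactor, invoking named methods on a handler object as simulated events occur; the handler names mirror the real ProcessProtocol methods, but everything else here is illustrative.

```python
class EventHandler:
    """Protocol-style object: methods are invoked as events occur."""
    def __init__(self):
        self.log = []

    def connectionMade(self):
        self.log.append('connected')

    def outReceived(self, data):
        self.log.append('out:' + data)

    def processEnded(self, reason):
        self.log.append('ended:' + reason)

def simulate_process(handler):
    # A stand-in driver playing the role Twisted's reactor would play:
    # it decides when each event method gets called.
    handler.connectionMade()
    handler.outReceived('hello')
    handler.processEnded('done')

h = EventHandler()
simulate_process(h)
# h.log now records the events in the order they happened
```

The key inversion to internalize is that we never call these methods ourselves; we only define what happens when the reactor calls them for us.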

Building a pipe

We want to interact with all three of our processes in essentially the same way. We want to write some data to their standard input, wait until they are done executing, and consume anything they write to their standard output. When the process is finished, we'd like to be called back with the standard output.

We'll start by defining and writing the initializer for a new ProcessProtocol subclass we'll call Pipe:

class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""

Pipe's initializer takes a Deferred instance and optionally some data for writing to a process' standard input. We save them to the instance and create a placeholder for the process' output.

Next we'll handle our connection to the process with the connectionMade method:

def connectionMade(self):
    if self.input:
        self.transport.write(self.input)
    self.transport.closeStdin()

As soon as the process starts we will use the transport of the protocol to write to its standard input if the Pipe has any to write. Then we close the standard input stream whether we wrote any data or not.

def outReceived(self, data):
    self.output += data

When the process writes to its standard output, outReceived is called with the contents, and here we're merely accumulating it.

class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""

    def connectionMade(self):
        if self.input:
            self.transport.write(self.input)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.output += data

    def processEnded(self, reason):
        self.deferred.callback(self.output)

With processEnded we complete our protocol, handling the termination of the child process by firing the original Deferred instance with the accumulated process output.

With the ProcessProtocol completed, let's give ourselves an easy-to-use helper that will return a Deferred which fires after a specified process spins up, receives our input, and has its output captured.

def make_pipe(cmd, *args, **kwargs):
    '''create a deferred that fires with the output of the process'''
    d = defer.Deferred()
    pipe = Pipe(d, kwargs.get('stdin'))
    args = [cmd] + list(args)
    reactor.spawnProcess(pipe, cmd, args)
    return d

Our helper takes the name of a command and any number of arguments. Optionally, you can pass data via the stdin keyword argument to be written to the process' input.

After creating the Deferred instance that we eventually return, we pass it to a newly created instance of our ProcessProtocol subclass Pipe. We then finally call reactor.spawnProcess, which schedules the spinning up of the process. Once the process has started, our Pipe protocol will write to its standard input and suck up any output from the process. When the process terminates, it will fire the Deferred we passed it, the same Deferred that is returned to the caller of this function.

With a simple way to start processes we can now return to the actual task at hand.

Generating data

The first step in our pipeline is to generate some data, the same way we might fetch some data from a remote webservice. In this case we'll simply read some bytes from /dev/urandom with the head utility. With our make_pipe helper this is a cinch.

def get_data(n):
    '''emulate network retrieval of some data'''
    return make_pipe('head', '-c', str(n), '/dev/urandom')

get_data simply utilizes make_pipe to create a Deferred that will fire in precisely the way we wish it to. get_data does nothing more and simply returns this Deferred.

We'll then need a "callback function" to attach to Deferred instances created by get_data, which fire with the output of the process. We could print the output to the screen; however, since the data is random, it would likely just be a bunch of garbage, potentially putting our terminal into an invalid state. Instead we'll simply split the data on whitespace characters and print out how many 'words' there are:

def result(output):
    '''take some data and print out how many words there are'''
    print len(output.split())

Now we have everything we need to schedule the work and handle its result:

d = get_data(1024)
d.addCallback(result)

We simply take the Deferred instance and attach result as the callback. Since we don't want our program's reactor to keep running once our work is finished, we'll attach yet another callback which stops the reactor, allowing our program to shut down cleanly once the work is complete. We'll use a lambda to simply call reactor.stop:

d.addCallback(lambda _: reactor.stop())

If you didn't already know, addCallback actually returns the original Deferred instance. This means you can chain these calls together into something that looks like this:

# get some data
(get_data(1024)
    # then process the data
    .addCallback(result)
    # then stop the reactor
    .addCallback(lambda _: reactor.stop()))

You can decide for yourself which style to use. And with that the work is scheduled. We just need to start the reactor and kick it all off and our script is complete:

from twisted.internet import defer, task, reactor, protocol

class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""

    def connectionMade(self):
        if self.input:
            self.transport.write(self.input)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.output += data

    def processEnded(self, reason):
        self.deferred.callback(self.output)

def make_pipe(cmd, *args, **kwargs):
    '''create a deferred that fires with the output of the process'''
    d = defer.Deferred()
    pipe = Pipe(d, kwargs.get('stdin'))
    args = [cmd] + list(args)
    reactor.spawnProcess(pipe, cmd, args)
    return d

def get_data(n):
    '''emulate network retrieval of some data'''
    return make_pipe('head', '-c', str(n), '/dev/urandom')

def result(output):
    '''take some data and print out how many words there are'''
    print len(output.split())

# get some data
(get_data(1024)
    # then process the data
    .addCallback(result)
    # then stop the reactor
    .addCallback(lambda _: reactor.stop()))

reactor.run()

Here is what we get when we run the script a few times:

In the next part we'll finish implementing the rest of our pipeline by adding support for wc and tee. In the third part we'll take a look at how to add exception handling in the case of failure.