Writing Safe Twisted Code
Writing safe Twisted code has recently become a large focus of my job, so I have been doing a bit of thinking on the subject. I decided to write a small post describing some of the considerations involved in writing reliable code with Twisted.
However, I wasn't sure how much Twisted knowledge to assume, and it wasn't quite clear to me how to frame the post. So instead this post will evolve a minimal example to the point where the considerations I originally intended to demonstrate reveal themselves naturally. To get there, I will try to emulate some of the intuitive thinking a newer Twisted developer might progress through to reach the end result. At a minimum, you should know what a
Deferred is. If you don't, here is the official material on them. Before second-guessing yourself though: yes, they are just objects you attach callbacks to.
The task at hand
In this article we're going to iterate on a small script which will attempt to demonstrate some basic Twisted patterns by implementing a data pipeline with distinct asynchronous steps. In the real world this might be reflected by fetching some data from Service A, sending it to Service B for processing and finally sending it to Service C for persistence.
In our implementation, instead of using actual network services, we will emulate them by leaning on local Unix utilities available on our system. The premise is the same: talking to a network service and talking to a local subprocess, while different, both rely on some deferred operation that we must handle asynchronously.
- Fetch data: utilize head to read some bytes from /dev/urandom (head -c 1024 /dev/urandom)
- Process data: utilize wc to count the words
- Persist data: utilize tee to write the data to a file
The pipeline we will implement will essentially be equivalent to the following on the command line:
```shell
head -c 1024 /dev/urandom | wc -w | tee /tmp/result.txt
```
However, instead of utilizing pipes on the command line, we will run each program individually with Twisted, taking its output and writing it to the standard input of the next program. While we work, try to keep the equivalence to network services in mind.
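Before any Twisted enters the picture, it may help to see what this pipeline looks like as plain blocking Python, using the standard library's subprocess module. This is just a sketch for comparison; the rest of the post does the same three steps without blocking.

```python
import subprocess

# Step 1: "fetch" 1024 random bytes, as head -c 1024 /dev/urandom would
raw = subprocess.run(
    ["head", "-c", "1024", "/dev/urandom"],
    stdout=subprocess.PIPE, check=True,
).stdout

# Step 2: "process" the data by counting its words with wc -w
counted = subprocess.run(
    ["wc", "-w"], input=raw, stdout=subprocess.PIPE, check=True,
).stdout

# Step 3: "persist" the result with tee, which also echoes it back to us
final = subprocess.run(
    ["tee", "/tmp/result.txt"], input=counted, stdout=subprocess.PIPE, check=True,
).stdout

print(final.decode().strip())
```

Each subprocess.run call here blocks until the child exits; the whole point of the Twisted version is to express the same steps as deferred operations instead.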
Talking to subprocesses
There are a few ways to spawn and communicate with subprocesses in Twisted, including some nice helper functions located in
twisted.internet.utils. Each returns a
Deferred which fires with some aspect of the result of running the subprocess:
- getProcessValue fires with the exit code
- getProcessOutput fires with whatever the process writes to standard output
- getProcessValueAndOutput fires with both
The problem with using these in our example is that none of these utility functions provides a mechanism for writing to the standard input of our subprocesses. Therefore, we will have to use
spawnProcess, which is a method of the reactor.
Here is the (abridged) signature as listed in the documentation:
spawnProcess(processProtocol, executable, args=())
The only parameter here that likely needs explanation is the
processProtocol which the documentation describes thusly:
An object which will be notified of all events related to the created process. (type: IProcessProtocol provider)
Now, you may not be familiar with writing process protocols, but I assure you it is quite simple. If you'd like a little background on how they are written and used, you can read through the official "How To" on using processes. That said, a
ProcessProtocol is merely a class whose methods are called when various events take place, like when we connect to the process, or, say, when the process writes to its standard output.
Building a pipe
We want to interact with all three of our processes in essentially the same way. We want to write some data to their standard input, wait until they are done executing, and consume anything they write to their standard output. When the process is finished, we'd like to be called back with the standard output.
We'll start by defining and writing the initializer for a new
ProcessProtocol subclass we'll call Pipe:
```python
class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""
```
Pipe's initializer takes a
Deferred instance and optionally some data for writing to a process' standard input. We save them to the instance and create a placeholder for the process' output.
Next we'll handle our connection to the process with the connectionMade method:
```python
def connectionMade(self):
    if self.input:
        self.transport.write(self.input)
    self.transport.closeStdin()
```
As soon as the process starts we will use the
transport of the
protocol to write to its standard input if the
Pipe has any to write. Then we close the standard input stream whether we wrote any data or not.
```python
def outReceived(self, data):
    self.output += data
```
When the process writes to its standard output,
outReceived will be called with the contents, and here we're merely accumulating it.
```python
class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""

    def connectionMade(self):
        if self.input:
            self.transport.write(self.input)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.output += data

    def processEnded(self, reason):
        self.deferred.callback(self.output)
```
In processEnded we complete our protocol by handling the termination of the process, firing the original
Deferred instance with the accumulated contents of the process output.
With our ProcessProtocol completed, let's give ourselves an easy-to-use helper that will return a
Deferred which will fire after a specified process spins up, receives our input, and we've captured its output.
```python
def make_pipe(cmd, *args, **kwargs):
    '''create a deferred that fires with the output of the process'''
    d = defer.Deferred()
    pipe = Pipe(d, kwargs.get('stdin'))
    args = [cmd] + list(args)
    reactor.spawnProcess(pipe, cmd, args)
    return d
```
Our helper takes the name of a command and any number of arguments. Optionally you can specify data with
stdin to be written to the process' input.
After creating the
Deferred instance that we eventually return, we pass it to a newly created instance of our
Pipe. We then finally call
reactor.spawnProcess which schedules the spinning up of the process. Once the process has started our
Pipe protocol will write to the standard input and suck up any output from the process. When the process terminates it will fire the
Deferred we passed it, the same
Deferred that is returned to the caller of this function.
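For comparison, the contract make_pipe implements (write any input to the process' stdin, capture its stdout, deliver the output when it exits) can be sketched in blocking form with the standard library. The name make_pipe_blocking is mine, invented for this illustration; it is not part of the Twisted version.

```python
import subprocess

def make_pipe_blocking(cmd, *args, stdin=None):
    """Blocking sketch of make_pipe's contract: feed stdin, return stdout."""
    completed = subprocess.run(
        [cmd, *args], input=stdin, stdout=subprocess.PIPE, check=True,
    )
    return completed.stdout

# e.g. count the words in a small byte string with wc -w
word_count = make_pipe_blocking("wc", "-w", stdin=b"one two three")
```

The difference is when you get the answer: this version blocks until the process exits, while make_pipe returns a Deferred immediately and delivers the output later.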
With a simple way to start processes we can now return to the actual task at hand.
The first step in our pipeline is to generate some data, the same way we might fetch some data from a remote webservice. In this case we'll simply read some bytes from
/dev/urandom with the
head utility. With our
make_pipe helper this is a cinch.
```python
def get_data(n):
    '''emulate network retrieval of some data'''
    return make_pipe('head', '-c', str(n), '/dev/urandom')
```
get_data simply utilizes
make_pipe to create a
Deferred that will fire in precisely the way we wish it to.
get_data does nothing more and simply returns this Deferred.
We'll then need a "callback function" for attaching to
Deferred instances created by
get_data, which fire with the output of the process. We could print the output to the screen; however, since the data is random, it would likely just be a bunch of garbage, potentially putting our terminal into an invalid state. Instead we'll simply split the data on whitespace characters and print out how many 'words' there are:
```python
def result(output):
    '''take some data and print out how many words there are'''
    print len(output.split())
```
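A quick aside on why split() with no arguments is the right tool here: it splits on any run of whitespace (spaces, tabs, newlines) and never produces empty strings, so the count stays sensible even for ragged output:

```python
# split() with no separator collapses runs of whitespace and drops
# leading/trailing whitespace, so no empty strings appear in the result
words = "alpha beta\n gamma\t\tdelta  ".split()
print(len(words))  # prints 4
```

Splitting on a single space instead (split(" ")) would produce empty strings for consecutive separators and inflate the count.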
Now we have everything we need to schedule the work and handle its result:
```python
d = get_data(1024)
d.addCallback(result)
```
We simply take the
Deferred instance and attach
result as the callback. Since we don't want our program's reactor to keep running once our work is finished, we'll attach yet another callback which stops the reactor. This will allow our program to shut down cleanly once the work is complete. We'll use a lambda to simply call reactor.stop():
```python
d.addCallback(lambda _: reactor.stop())
```
If you didn't already know,
addCallback will actually return the same
Deferred instance. This means you can chain these together into something that looks like this:
```python
# get some data
(get_data(1024)
    # then process the data
    .addCallback(result)
    # then stop the reactor
    .addCallback(lambda _: reactor.stop()))
```
You can decide for yourself which style to use. And with that, the work is scheduled. We just need to start the reactor to kick it all off, and our script is complete:
```python
from twisted.internet import defer, reactor, protocol


class Pipe(protocol.ProcessProtocol):
    def __init__(self, d, input=None):
        self.deferred = d
        self.input = input
        self.output = ""

    def connectionMade(self):
        if self.input:
            self.transport.write(self.input)
        self.transport.closeStdin()

    def outReceived(self, data):
        self.output += data

    def processEnded(self, reason):
        self.deferred.callback(self.output)


def make_pipe(cmd, *args, **kwargs):
    '''create a deferred that fires with the output of the process'''
    d = defer.Deferred()
    pipe = Pipe(d, kwargs.get('stdin'))
    args = [cmd] + list(args)
    reactor.spawnProcess(pipe, cmd, args)
    return d


def get_data(n):
    '''emulate network retrieval of some data'''
    return make_pipe('head', '-c', str(n), '/dev/urandom')


def result(output):
    '''take some data and print out how many words there are'''
    print len(output.split())


# get some data
(get_data(1024)
    # then process the data
    .addCallback(result)
    # then stop the reactor
    .addCallback(lambda _: reactor.stop()))

reactor.run()
```
Here is what we get when we run the script a few times:
In the next part we'll finish implementing the rest of our pipeline by adding support for
tee. In the third part we'll take a look at how to add exception handling in the case of failure.