An alternative to threading for concurrency: faster, lighter on resources, and with fewer synchronization problems
An asynchronous programming environment needs two things from the system. The first is non-blocking IO, meaning that calls like read and write return immediately instead of waiting until data is available. The second is a way to monitor many IO channels for activity at once, usually the select or poll syscall.
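Both requirements can be seen with nothing but the Python standard library. This is only a sketch: two connected socket pairs stand in for arbitrary IO channels. A non-blocking recv() raises instead of waiting, and select() reports which channel has data.

```python
import select
import socket

# Two connected socket pairs stand in for two independent IO channels.
a1, b1 = socket.socketpair()
a2, b2 = socket.socketpair()
for s in (a1, a2):
    s.setblocking(False)  # recv() now returns or raises at once

try:
    a1.recv(1024)
    got_data_early = True
except BlockingIOError:
    # Nothing to read yet; a blocking socket would have stalled here.
    got_data_early = False

b2.sendall(b"ping")  # activity on the second channel only

# select() watches both channels and reports which ones are readable.
readable, _, _ = select.select([a1, a2], [], [], 1.0)
data = a2.recv(1024) if a2 in readable else b""

for s in (a1, b1, a2, b2):
    s.close()
print(readable == [a2], data)
```

An event loop such as Twisted's reactor is, at its heart, a loop around a call like this select(), dispatching to handlers for whichever channels are ready.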
Basic asynchronous style implements the idea of starting an operation: when a user clicks this button, call this function. But it breaks down when that function needs to do something more complex than drawing something onscreen - performing a database query, for example.
We need something that can model an ongoing process without blocking. Sort of a deferred function return.
Twisted is not magic
Twisted is not magic. Python continues to operate exactly the way you already know it does. It has an efficient and portable event loop, along with a huge suite of utilities, protocols, interfaces, and so on -- and I won't be talking about those here.
Deferred: a result that's not ready yet
```python
# see example1.py
def print_user_info(user):
    d = mydb.get_user(user)
    def got_info(res):
        print("name:", res.name)
    d.addCallback(got_info)
```
This is a very basic model of deferreds, but it handles the notion of ongoing processes nicely.
This particular style, using nested functions, is my favorite, but lots of people use separate (un-nested) functions instead.
Multiple callbacks are called in sequence
```python
# see example2.py
def get_user_name(user):
    d = mydb.get_user(user)
    def extract_name(res):
        if res:
            return res.name
        return "(unknown)"
    d.addCallback(extract_name)
    return d

def print_user_name(user):
    d = get_user_name(user)
    def print_name(name):
        print("Name:", name)
    d.addCallback(print_name)
```
Here two callbacks are added to the same deferred. This is a common pattern: get a deferred, add a callback to do your thing, and return the same deferred.
Note that the deferred is initially fired with a User object. The extract_name callback returns a string, which is passed to the next callback in the chain, print_name.
This is where API docs are useful: mydb.get_user returns a Deferred that fires with a user object. get_user_name returns a Deferred that fires with the name of the user, or "(unknown)".
What about error handling? What if mydb.get_user raises an exception?
A deferred operation can fail, in which case the errback is called
```python
# see example3.py
def print_page_title(url):
    d = get_page_title(url)
    def got_info(title):  # callback
        print("title:", title)
    def fail_info(f):  # errback
        print("failed!", f.type, f.value)
        # (consume the error by not returning f)
    d.addCallbacks(got_info, fail_info)
```
If get_page_title succeeds, then fail_info is never called. However, if the operation fails, then got_info is never called. Instead, fail_info is called with a Failure object. Failures wrap Exception objects.
Multiple Callbacks and Errbacks
This is the "full grid" of callbacks and errbacks. Execution can, in principle, switch back and forth between callbacks and errbacks several times for a single deferred.
```python
import json

def get_contacts(username):
    d = im_service.get_contact_list(username)
    def cb(contacts):
        return [ctct.username for ctct in contacts]
    def eb(f):
        print("err getting contacts for %s: %s" % (username, f.value))
        return f
    d.addCallbacks(cb, eb)
    return d

def ajax_get_contacts(request):
    d = get_contacts(request.query['username'][0])  # error checking!?
    def cb(ctctnames):
        return json.dumps({'contacts': ctctnames})
    def eb(f):
        return json.dumps({'error': str(f.value)})
    d.addCallbacks(cb, eb)
    return d
```
This is a somewhat overwrought example of a full set of callbacks and errbacks. Note that, because it just logs the error, the errback for get_contacts always returns the failure, passing it along to ajax_get_contacts' errback. This is what I mean by a "nonterminal" errback.
However, the errback in ajax_get_contacts *is* terminal. Presumably this function's deferred is expected to fire with a JSON string to hand back to the browser, and we want to handle error reporting with JavaScript and not with the usual browser mechanisms. So the failure is "eaten" here, and the web framework's callback, not its errback, will be called next.
However, an exception in the callback for get_contacts (maybe a contact object without a username attribute?) will cause the errback in ajax_get_contacts to be called and it will report the error correctly. Note that the errback for get_contacts is not called in this case!
There's still a bug here: if no username is specified, then ajax_get_contacts will raise an exception that probably won't be handled very well by the web framework (it will probably create a lot of HTML that will not parse as JSON).
```python
import json

def get_contacts(username):
    d = im_service.get_contact_list(username)
    def cb(contacts):
        return [ctct.username for ctct in contacts]
    d.addCallback(cb)
    return d

def ajax_get_contacts(request):
    d = get_contacts(request.query['username'][0])  # BUG
    def cb(ctctnames):
        return json.dumps({'contacts': ctctnames})
    def eb(f):
        return json.dumps({'error': str(f.value)})
    d.addCallbacks(cb, eb)
    return d
```
This example is slightly simpler. Any error in im_service.get_contact_list will go directly to ajax_get_contacts' errback, which will report it correctly to the JavaScript in the browser.
There's still a bug here: if no username is specified, then ajax_get_contacts will raise an exception. The web framework is probably prepared to handle a failed Deferred, but not an exception raised directly.
```python
import json
from twisted.internet import defer

def ajax_get_contacts(request):
    try:
        username = request.query['username'][0]
    except KeyError:
        return defer.succeed(
            json.dumps({'error': 'no username specified'}))
    d = get_contacts(username)
    def cb(ctctnames):
        return json.dumps({'contacts': ctctnames})
    def eb(f):
        return json.dumps({'error': str(f.value)})
    d.addCallbacks(cb, eb)
    return d
```
This is one way to solve the problem: shortcut the whole contact lookup, and return an already-fired Deferred object to the web framework.
```python
import json
from twisted.internet import defer

def ajax_get_contacts(request):
    try:
        username = request.query['username'][0]
    except KeyError:
        return defer.fail()  # use current exception
    d = get_contacts(username)
    def cb(ctctnames):
        return json.dumps({'contacts': ctctnames})
    def eb(f):
        return json.dumps({'error': str(f.value)})
    d.addCallbacks(cb, eb)
    return d
```
Here, we hand the web framework a failed Deferred whose Failure is built automatically from the KeyError being handled.
```python
import json
from twisted.internet import defer

def get_contacts(username):
    d = im_service.get_contact_list(username)
    def cb(contacts):
        return [ctct.username for ctct in contacts]
    d.addCallback(cb)
    return d

def ajax_get_contacts(request):
    d = defer.succeed(None)
    def get_username(_):  # ignore argument
        return request.query['username'][0]
    d.addCallback(get_username)
    d.addCallback(get_contacts)  # this returns a deferred!
    def json_cb(ctctnames):
        return json.dumps({'contacts': ctctnames})
    def json_eb(f):
        return json.dumps({'error': str(f.value)})
    d.addCallbacks(json_cb, json_eb)
    return d
```
This moves the entire operation into the callback chain by starting with an already-fired Deferred and getting the username in the first callback. Callbacks always take an argument, and _ is a good way to indicate "I'm ignoring this argument".
get_username returns the username, which is then handed as a single argument to the next callback.
That next callback is get_contacts, which conveniently takes a single argument. However, it returns a deferred, so why is json_cb not called with a deferred? Twisted automatically "nests" deferreds: when a callback returns a Deferred, the outer chain pauses until that inner Deferred fires, then continues with the inner Deferred's result (or failure). What actually happens is more complex, since more callbacks and errbacks could be added to that inner deferred later, but let's not think about that right now.
```python
from twisted.internet import defer

# add a callback (extra args are passed after the result)
d.addCallback(cb, *args, **kwargs)
# add an errback
d.addErrback(eb, *args, **kwargs)
# add one of each (side by side)
d.addCallbacks(cb, eb)
# add the same callable as callback and errback
d.addBoth(callable)
# fire with success
d.callback(result)
# fire with a failure from the current exception
d.errback()
# fire with a given failure
d.errback(f)
# new, fired Deferred
d = defer.succeed(result)
# new, failed Deferred from a given failure or exception
d = defer.fail(f)
# new, failed Deferred from the current exception
d = defer.fail()
```
Deferreds are conceptually fairly simple, although there are some gotchas.
Note that we haven't looked at direct asynchronous IO -- sockets, pipes, etc. In fact, Twisted generally uses function or method calls to handle those, e.g., lineReceived for the LineReceiver class.
sync | async |
---|---|
function call | return deferred |
sequential execution | d.addCallback() and return |
function return | d.callback() |
raise exception | d.errback() |
new thread | call a Deferred-returning function and don't return its Deferred |
join thread | add callback to thread's deferred |
With some careful programming, deferreds can model both sequential execution and threaded execution. These "threads" are lightweight and can be created and destroyed easily, and of course synchronization requirements are minimal.
Careless programming can lead to buggy, confusing spaghetti code. In particular, think about whether you are starting a new "thread" or continuing sequential execution.
```python
from twisted.internet.defer import DeferredList

def parallelLookup(usernames):
    deferreds = [get_user_info(username) for username in usernames]
    dl = DeferredList(deferreds, consumeErrors=True)
    def process(results):
        rv = {}
        # each result is a (success, result_or_failure) tuple
        for username, (success, result) in zip(usernames, results):
            if success:
                rv[username] = result
            else:
                rv[username] = None
        return rv
    dl.addCallback(process)
    return dl
```
This calls get_user_info for each username, then bundles all of the resulting Deferreds into a single DeferredList. In effect, it starts a bunch of threads and then joins them. DeferredList's behavior can be customized easily; in this case, it fires when all of its constituent deferreds have fired, successfully or not. With consumeErrors=True, failures in the constituent deferreds are consumed (so they are not logged as unhandled), but they still show up in the results: a list of (success, result_or_failure) tuples, one per deferred, in the original order.
```python
from twisted.internet import defer, reactor

def countDown(N):
    d = defer.Deferred()
    def count(n):
        print(n)
        if n > 0:
            reactor.callLater(1, count, n - 1)
        else:
            d.callback(None)
    count(N)
    return d
```
Note that callLater does not use a Deferred; it just takes a callable and its arguments. The countDown function is one example of how a function can return a Deferred and arrange for it to fire later.
```python
import os
from twisted.internet import defer
from twisted.internet.task import LoopingCall

def wait_for_file(filename, poll_interval=1):
    d = defer.Deferred()
    def poll():
        if os.path.exists(filename):
            loop.stop()
            d.callback(None)
    loop = LoopingCall(poll)
    loop.start(poll_interval)
    return d
```
LoopingCall calls its function repeatedly, at the interval passed to its start method (by default the first call happens immediately). It is often used to poll for changes, e.g., in the filesystem.
If you feel like you need these, then there's probably a design error somewhere.
Not covered in this tutorial, but you should learn about:
A word of warning about Twisted: outside of the Twisted core, there are many abandoned projects. Usually the older projects, labeled "deprecated, use $newstuff", are the projects you want to use. The $newstuff was often never finished, or at least never documented. For example, Axiom replaces Enterprise, but Axiom is not ready yet.
Stick to the core, and do not be attracted by shiny things which sound like they might be useful. If in doubt, ask other Twisted programmers who did not work for DivMod.
Starting a new application in Twisted is hard and not recommended. For practice, consider working on Buildbot's unit tests:
```shell
$ git clone git://github.com/djmitche/buildbot.git
$ virtualenv sandbox
$ source sandbox/bin/activate
(sandbox)$ python setup.py develop
(sandbox)$ trial buildbot.test
```
(You can also fork the project on GitHub if your git-fu is strong enough.)