WSGI, Twisted and Server-Sent Events

Old-school web applications are easy to create. Big, powerful frameworks like Django give you a lot of tools you can leverage. One weak point of all those WSGI frameworks is that they don't integrate well with anything that breaks out of the usual request-response cycle.

The usual approach nowadays is to use WebSockets for real-time communication between browser clients and web servers. That typically means running a separate server capable of handling many concurrent connections, plus a message bus so the WSGI app can talk to that server. That is a lot of moving parts.

In my case, where I build a lot of intranet applications, deploying and maintaining all this infrastructure is a very big burden, so I usually end up not even exploring this kind of functionality.

However, given that I deploy on Twisted, I wanted to explore what kind of cool things I could build on it.

Enter SSE

Server-Sent Events aren't that new; they have just been overshadowed by WebSockets. They are a simple text format that is sent from the server to the client over a plain HTTP connection. The JavaScript API (EventSource) is quite simple, and it even handles reconnecting for you. It's supported by a lot of recent browsers, but I haven't really done a lot of research on that.
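
To make the data format concrete, here is a rough sketch of how a client could split an event stream into events. It is not what the browser's EventSource does internally, only a simplified illustration: the function name parse_sse is made up for this example, and it handles just the event: and data: fields, ignoring comments, id: and retry:.

```python
def parse_sse(stream_text):
    """Split an SSE stream into (event_name, data) tuples.

    Simplified sketch: only `event:` and `data:` fields are handled.
    """
    events = []
    event_name, data_lines = None, []
    for line in stream_text.split("\n"):
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                events.append((event_name or "message", "\n".join(data_lines)))
            event_name, data_lines = None, []
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the data.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    return events


# One unnamed event, then a named event whose data spans two lines.
sample = "data: Alice knocked\n\nevent: ping\ndata: line one\ndata: line two\n\n"
parsed = parse_sse(sample)
print(parsed)
```

Events without an event: field arrive at the generic onmessage handler in the browser, which is why the sketch labels them "message".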

Sample Code

Here is a very simple WSGI app (using bottle.py), saved as app.py. It just has a form and a form POST handler.

from bottle import Bottle, template, request, run

app = Bottle()


@app.route('/hello/')
def greet():
    # the form needs no template variables
    return template('''
<html>
<body>
Please introduce yourself:
<form action="/knock" method="POST">
<input type="text" name="name" />
<input type="submit" />
</form>
</body>
</html>''')


@app.post('/knock')
def knock():
    name = request.forms.get('name')
    # this is the only new code added in our wsgi app
    # (broadcast_message is attached to the app by server.py)
    app.broadcast_message("{} knocked".format(name))
    return template("<p>{{ name }} Knocked!</p>", name=name)


wsgi_app = app

if __name__ == '__main__':
    run(app, host='localhost', port=8005)

And a basic Twisted server to run it (server.py):

# crochet allows non-twisted apps to call twisted code
import crochet
crochet.no_setup()

from twisted.application import internet, service
from twisted.web import server, wsgi, static, resource
from twisted.internet import reactor
from twisted.python import threadpool


# boilerplate to get any WSGI app running under twisted
class WsgiRoot(resource.Resource):
    def __init__(self, wsgi_resource):
        resource.Resource.__init__(self)
        self.wsgi_resource = wsgi_resource

    def getChild(self, path, request):
        # hand the full path back to the WSGI app
        path0 = request.prepath.pop(0)
        request.postpath.insert(0, path0)
        return self.wsgi_resource


# create a twisted.web resource from a WSGI app.
def get_wsgi_resource(wsgi_app):
    pool = threadpool.ThreadPool()
    pool.start()
    # Allow Ctrl-C to get you out cleanly:
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)
    wsgi_resource = wsgi.WSGIResource(reactor, pool, wsgi_app)
    return wsgi_resource


def start():
    # create an SSE resource that is effectively a singleton
    from sse import SSEResource
    sse_resource = SSEResource()
    # attach its "broadcast_message" function to the WSGI app;
    # WSGI handlers run in the thread pool, so use the crochet-wrapped
    # method, which hands the call over to the reactor thread
    from app import wsgi_app
    wsgi_app.broadcast_message = sse_resource.broadcast_message
    # serve everything together
    root = WsgiRoot(get_wsgi_resource(wsgi_app))  # WSGI is the root
    root.putChild(b"index.html", static.File("index.html"))  # serve a static file
    root.putChild(b"sse", sse_resource)  # serve the SSE handler
    main_site = server.Site(root)
    # not named "server": that would shadow the twisted.web.server module
    tcp_service = internet.TCPServer(8005, main_site)
    application = service.Application("twisted_wsgi_sse_integration")
    tcp_service.setServiceParent(application)
    return application


application = start()

# run this using twistd -ny server.py

And an SSE-savvy twisted.web resource (sse.py):

import crochet
crochet.setup()

from twisted.web import resource, server


def _format_sse(msg, event=None):
    # one "data:" line per line of the message, blank line ends the event
    data = []
    if event is not None:
        data.append("event: {}\n".format(event))
    for line in msg.splitlines():
        data.append("data: {}\n".format(line))
    data.append("\n")
    return "".join(data)


class SSEResource(resource.Resource):
    def __init__(self):
        resource.Resource.__init__(self)
        self._listeners = []

    def add_listener(self, request):
        print("listener connected: {}".format(request))
        self._listeners.append(request)
        # notifyFinish fires when the response ends or the client goes away
        request.notifyFinish().addBoth(self.remove_listener, request)

    def remove_listener(self, reason, listener):
        print("listener disconnected: {} reason: {}".format(listener, reason))
        self._listeners.remove(listener)

    @crochet.run_in_reactor
    def broadcast_message(self, msg, event=None):
        # callable from any thread; crochet runs it in the reactor thread
        self.broadcast_message_async(msg, event)

    def broadcast_message_async(self, msg, event=None):
        # only call this directly from the reactor thread
        sse = _format_sse(msg, event)
        if not isinstance(sse, bytes):
            # request.write expects bytes
            sse = sse.encode("utf-8")
        for listener in self._listeners:
            listener.write(sse)

    def render_GET(self, request):
        request.setHeader(b"Content-Type", b"text/event-stream")
        self.add_listener(request)
        return server.NOT_DONE_YET

And a very simple index.html:

<html>
<head>
<script type="text/javascript">
  var evtSource = new EventSource("/sse");

  evtSource.onmessage = function(e) {
    // onmessage is the generic handler
    var eventList = document.getElementById("eventlist");
    var newElement = document.createElement("li");
    // textContent, so a submitted name is never interpreted as HTML
    newElement.textContent = "message: " + e.data;
    eventList.appendChild(newElement);
  };

  evtSource.addEventListener("ping", function(e) {
    // addEventListener can be used for fine-tuning
    var eventList = document.getElementById("eventlist");
    var newElement = document.createElement("li");
    var obj = JSON.parse(e.data);
    newElement.textContent = "ping at " + obj.time;
    eventList.appendChild(newElement);
  });
</script>
</head>
<body>
<h1>Twisted WSGI Integration</h1>
<ol id="eventlist">
</ol>
</body>
</html>

How it works

The WSGI app just calls some Python code. Through crochet we ensure that the call is handed over to the Twisted reactor thread and that the app gets back a useful result (though in this case, we just throw it away). We use a plain POST to send data to the server. Converting that to an AJAX request is left as an exercise to the reader. The SSE handler is a singleton that keeps track of all the listeners that are connected to it, and broadcasts messages to them.
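
The listener bookkeeping can be tried out without Twisted at all. The sketch below is only an illustration of the idea, not the code from sse.py: FakeListener and ListenerRegistry are made-up names, and FakeListener stands in for a twisted.web request by recording whatever is written to it.

```python
class FakeListener(object):
    """Hypothetical stand-in for a twisted.web request; records writes."""

    def __init__(self, name):
        self.name = name
        self.received = []

    def write(self, payload):
        self.received.append(payload)


class ListenerRegistry(object):
    """Tracks connected listeners and fans each message out to all of them."""

    def __init__(self):
        self._listeners = []

    def add(self, listener):
        self._listeners.append(listener)

    def remove(self, listener):
        if listener in self._listeners:
            self._listeners.remove(listener)

    def broadcast(self, payload):
        # Iterate over a copy so removing a listener mid-loop is harmless.
        for listener in list(self._listeners):
            listener.write(payload)


registry = ListenerRegistry()
alice, bob = FakeListener("alice"), FakeListener("bob")
registry.add(alice)
registry.add(bob)
registry.broadcast("data: first\n\n")
registry.remove(bob)  # e.g. bob closed his browser tab
registry.broadcast("data: second\n\n")
print(alice.received, bob.received)
```

In the real resource the removal is triggered by the request's notifyFinish() Deferred rather than by an explicit call.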

Does it scale?

It should! I have no experience running Twisted Web under heavy load, but it's more than enough for intranet-style apps (even when I have 50 machines hitting some API endpoints quite frequently). If someone wants to run some tests, please get in touch.

What's next?

I would like to make this a bit more reusable, with some better discovery than the current "inject a global function into the namespace". Also, Django integration is something I'd like to investigate. And why not see whether the same approach can be extended to WebSockets as well?
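
One possible direction for the discovery part, purely as a sketch and not something I have built: a tiny module that the WSGI app imports, with a backend that the hosting server registers at startup. The names set_backend and broadcast are assumptions made up for this example. A nice side effect is that the app keeps working when it runs standalone without any SSE backend.

```python
_backend = None  # registered by whoever hosts the app; None means "no SSE available"


def set_backend(func):
    """Register the callable that actually delivers messages."""
    global _backend
    _backend = func


def broadcast(msg, event=None):
    """Deliver msg through the backend; return False if none is registered."""
    if _backend is None:
        return False
    _backend(msg, event)
    return True


# Standalone run: nothing registered, so the call is a harmless no-op.
delivered_before = broadcast("nobody is listening")

# Hosted run: the server registers a backend (here a list stands in for it).
received = []
set_backend(lambda msg, event=None: received.append((event, msg)))
delivered_after = broadcast("Alice knocked")
print(delivered_before, delivered_after, received)
```

Under Twisted, the registered backend would be the crochet-wrapped broadcast_message method of the SSE resource.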