distributed processing with python and rabbitmq
June 6, 2010 | by rob @ 7:27 pm | filed under [python,web]

Most software developers are familiar with “MapReduce”, made famous by Google. Basically, a large data set is split into pieces and distributed over a network to multiple “workers”, which process the pieces in parallel. The processed data is then returned to a “reducer”, which aggregates the results into a final data set.
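
To make the idea concrete, here is a minimal, single-machine sketch of the pattern: a word count where threads stand in for networked workers. The function names (`map_chunk`, `reduce_counts`, `word_count`) are hypothetical and not part of Hadoop or any other MapReduce framework.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def map_chunk(lines):
    """Map step: count the words in one chunk of the input."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


def reduce_counts(total, partial):
    """Reduce step: merge one worker's partial counts into the running total."""
    return total + partial


def word_count(lines, workers=4):
    # Split the data set into roughly equal chunks, one per worker.
    size = max(1, len(lines) // workers)
    chunks = [lines[i:i + size] for i in range(0, len(lines), size)]
    # Process the chunks in parallel; threads stand in for remote workers.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(map_chunk, chunks))
    # Aggregate the partial results into the final data set.
    return reduce(reduce_counts, partials, Counter())


print(word_count(["the quick fox", "the lazy dog", "the end"]))
```

In a real cluster the chunks would travel over the network and the map step would run on other machines, but the split, map and reduce stages stay the same.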

There are several implementations of this strategy, of which Hadoop seems to be the most respected in the open source world; Yahoo has fully embraced the project.

However, Hadoop does have some caveats. Configuring a cluster requires editing a multitude of XML files. The Hadoop Distributed File System (HDFS) has to be installed. The master node is not fault tolerant. Typically the “slave” node locations are stored in a file, though new ones can be added dynamically. Hadoop jobs are also written natively in languages that run on the JVM; anything else has to go through the Hadoop Streaming interface.

When evaluating Hadoop, we decided it was a little too complex for our first proof of concept for a client. Plus, we wanted to use tools like PyPy with our Python application to boost performance. So what was the alternative? Disco? It seemed reasonable. Unfortunately, messages had to be pushed via SSH to the “workers”, making them difficult to organize behind firewalls. And additional workers had to be added manually through the web interface.

Sometimes square pegs just won’t fit into round holes. So, what technology was round enough for our needs? We knew that distributing tasks is a problem naturally solved by queues: jobs go in one end and come out the other, one at a time, until the queue is empty. In the networked world, message queues can spread job information across machines and geography. AMQP allows advanced routing of these messages through exchanges and has client libraries for many languages. Message queuing systems are also designed to be highly fault tolerant and distributed.
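
That “one goes in, one comes out” behaviour is the classic work-queue pattern. The sketch below shows it in a single process with Python's standard `queue` module and threads instead of a message broker; `run_workers` and its parameters are hypothetical names chosen for illustration.

```python
import queue
import threading


def run_workers(jobs, handler, num_workers=3):
    """Distribute jobs to worker threads through a shared FIFO queue."""
    tasks = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            job = tasks.get()
            if job is None:  # sentinel: no more work for this worker
                return
            results.put(handler(job))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for job in jobs:
        tasks.put(job)
    for _ in threads:
        tasks.put(None)  # one sentinel per worker, queued after all real jobs
    for t in threads:
        t.join()

    # Every worker has exited, so draining the results queue is safe.
    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


# Completion order depends on thread scheduling, so sort before printing.
print(sorted(run_workers(range(5), lambda x: x * x)))  # [0, 1, 4, 9, 16]
```

With a broker such as RabbitMQ, the shared queue lives on the server instead of in process memory, which is what lets the workers sit on different machines.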

And it worked pretty well. We used py-amqplib as the client library, talking to RabbitMQ, with some fairly basic code:

from amqplib import client_0_8 as amqp
from dobie.extract import configval
import logging
import uuid
 
log = logging.getLogger(__name__)
 
class Connection(object):
    '''basic connection to mq server'''
 
    def __init__(self):
        self.host = configval("host")
        self.user_id = configval("user_id")
        self.password = configval("password")
        self.port = configval("port")
        self.connect()
 
    def connect(self):
        self.connection = amqp.Connection(
            host='%s:%s' % (self.host, self.port),
            userid=self.user_id, password=self.password
        )
 
class Base(object):
    '''shared code for publisher and consumer'''
 
    def __init__(self, queue, connection=None):
        self.connection = connection or Connection()
        self.channel = self.connection.connection.channel()
        self.ex = configval("exchange")
        self.queue = queue
        # the queue name doubles as the routing key for the direct exchange
        params = (self.queue, self.ex, self.queue)
        log.debug("queue [%s], exchange [%s], routing [%s]", *params)
 
    def setup(self):
        '''configure queue and exchange'''
        self.channel.queue_declare(
            queue=self.queue, durable=True, exclusive=False, auto_delete=False
        )
        self.channel.exchange_declare(
            exchange=self.ex, type='direct', durable=True, auto_delete=False
        )
        self.channel.queue_bind(
            queue=self.queue, exchange=self.ex, routing_key=self.queue
        )
        return self
 
    def close(self):
        '''close connections'''
        # TODO consider pooling
        self.channel.close()
        self.connection.connection.close()
 
    def __enter__(self):
        return self.setup()
 
    def __exit__(self, _type, value, traceback):
        self.close()
 
class Consumer(Base):
    '''consumer of messages'''
 
    def __init__(self, queue, callback, connection=None):
        Base.__init__(self, queue, connection)
        self.callback = callback
        self.cid = str(uuid.uuid4())
 
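    def __enter__(self):
        def _callback(message):
            # hand the consumer to the callback so it can reach self.channel,
            # e.g. to acknowledge the message once it has been processed:
            # consumer.channel.basic_ack(message.delivery_info['delivery_tag'])
            self.callback(self, message)
        Base.__enter__(self)
        # basic_consume defaults to no_ack=False, so unacknowledged messages
        # are requeued if this consumer's channel closes
        self.channel.basic_consume(
            queue=self.queue, callback=_callback, consumer_tag=self.cid
        )
        return self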
 
    def wait(self, count=-1):
        '''wait for messages. if count is positive, return after that many
           deliveries; the default (-1) loops forever'''
        while count != 0:
            self.channel.wait()
            if count > 0:
                count -= 1
 
class Publisher(Base):
    '''publisher of messages'''
 
    def __init__(self, queue, connection=None):
        Base.__init__(self, queue, connection)
        # AMQP delivery mode: 1 = transient, 2 = persistent
        self.delivery_mode = int(configval("delivery_mode"))
 
    def publish(self, message):
        message = amqp.Message(message)
        message.properties['delivery_mode'] = self.delivery_mode
        self.channel.basic_publish(
            message, exchange=self.ex, routing_key=self.queue
        )

There are some potential problems with this code. Connections aren’t pooled (each Publisher or Consumer opens its own connection unless one is passed in), and it only uses a “direct” exchange. However, it suited our needs.
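
For readers unfamiliar with exchange types: a direct exchange delivers a message to every queue whose binding key exactly equals the message's routing key. The code above binds each queue using its own name as the key, so publishing with a queue name as the routing key reaches exactly that queue. Below is a toy in-memory model of that rule; `DirectExchange` is a hypothetical class, not amqplib code. Topic exchanges relax the exact match with wildcard patterns, and fanout exchanges ignore the key entirely.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of AMQP direct-exchange routing (not a real client API)."""

    def __init__(self):
        self.bindings = defaultdict(set)  # binding key -> names of bound queues
        self.queues = defaultdict(list)   # queue name -> messages delivered to it

    def bind(self, queue, binding_key):
        self.bindings[binding_key].add(queue)

    def publish(self, routing_key, body):
        """Deliver body to every queue bound with an identical key.

        Returns the number of queues reached; 0 means the message was
        unroutable and, as with a broker's default behaviour, dropped.
        """
        targets = self.bindings.get(routing_key, set())
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)


ex = DirectExchange()
ex.bind("jobs", "jobs")  # mirrors queue_bind(queue=q, exchange=ex, routing_key=q)
ex.publish("jobs", "task 1")
ex.publish("reports", "task 2")  # no queue is bound with this key
print(dict(ex.queues))  # {'jobs': ['task 1']}
```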

We relied on dictionaries to transfer data. Since we planned to stick with Python, we pickled them before sending them over the wire; RabbitMQ treats message bodies as opaque bytes, so it was happy to accept them.
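
A minimal sketch of that serialization step using only the standard library; `encode_job` and `decode_job` are hypothetical helper names. Because `pickle.loads` can execute code embedded in a crafted payload, this assumes every producer on the queue is trusted, and `json` is the usual substitute if non-Python workers ever need to read the messages.

```python
import pickle


def encode_job(job):
    """Serialize a job dictionary into bytes for the message body."""
    return pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL)


def decode_job(body):
    """Rebuild the dictionary on the consumer side (trusted producers only)."""
    return pickle.loads(body)


job = {"id": 42, "urls": ["http://example.com/a", "http://example.com/b"]}
body = encode_job(job)
print(decode_job(body) == job)  # True
```

With the classes above, this would amount to `publisher.publish(encode_job(job))` on one side and `decode_job(message.body)` inside the consumer callback on the other.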

There is one problem we have yet to resolve: if one consumer is already connected and processing messages, newly connected consumers only receive messages that are published after they join. We’ll post a fix if we figure it out.
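
A likely cause, offered as an untested assumption rather than a confirmed fix: the consumer sets no prefetch limit, so RabbitMQ pushes every ready message to whichever consumers are connected at that moment, where they sit unacknowledged until processed. A late joiner then only sees messages published afterwards. The toy model below (a hypothetical `ToyBroker` class, no network involved) reproduces the symptom and shows how a prefetch limit of one changes it. In amqplib the corresponding call would be `channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)` before `basic_consume`, with each message acknowledged via `channel.basic_ack(...)` once it has been processed.

```python
from collections import deque


class ToyBroker:
    """Toy model of one queue pushing messages to its consumers.

    prefetch=None mimics consumers with no QoS limit: the broker keeps
    pushing until the queue is empty. prefetch=N caps each consumer at N
    delivered-but-unacknowledged messages.
    """

    def __init__(self, messages, prefetch=None):
        self.ready = deque(messages)  # messages still waiting in the queue
        self.prefetch = prefetch
        self.unacked = {}  # consumer name -> messages delivered, not yet acked

    def _has_room(self, name):
        return self.prefetch is None or len(self.unacked[name]) < self.prefetch

    def _dispatch(self):
        # Round-robin one message at a time until nobody can take more.
        delivered = True
        while self.ready and delivered:
            delivered = False
            for name in self.unacked:
                if self.ready and self._has_room(name):
                    self.unacked[name].append(self.ready.popleft())
                    delivered = True

    def connect(self, name):
        self.unacked[name] = []
        self._dispatch()

    def ack(self, name):
        # The consumer finishes its oldest message, freeing a prefetch slot.
        self.unacked[name].pop(0)
        self._dispatch()


# No limit: the first consumer swallows the whole backlog.
greedy = ToyBroker(range(6))
greedy.connect("worker-1")
greedy.connect("worker-2")
print(greedy.unacked)  # {'worker-1': [0, 1, 2, 3, 4, 5], 'worker-2': []}

# prefetch=1: the backlog stays on the broker and is shared as work completes.
fair = ToyBroker(range(6), prefetch=1)
fair.connect("worker-1")
fair.connect("worker-2")
fair.ack("worker-1")
print(fair.unacked)  # {'worker-1': [2], 'worker-2': [1]}
```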
