distributed processing with python and rabbitmq
June 6, 2010 | by rob @ 7:27 pm | filed under [python,web]

Most software developer types are aware of “MapReduce”, made famous by Google. Basically, a large data set is split into pieces and distributed over a network to multiple “workers”, which process the pieces in parallel. The processed data is then returned to a “reducer”, which aggregates the results into a final data set.
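A minimal, single-machine sketch of that flow in Python 3, for illustration only. The input is split into chunks, a pool of threads stands in for the networked “workers” (a real deployment would ship chunks to other machines), and a reduce step merges the partial results. The word-count task and every name here are assumptions, not part of any MapReduce framework.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def map_chunk(lines):
    """Worker ("map") step: count the words in one chunk of the input."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


def word_count(lines, workers=3, chunk_size=2):
    # split the data set into pieces...
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]
    # ...let the workers process the pieces in parallel...
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(map_chunk, chunks))
    # ...and let the reducer merge the partial counts into the final result
    return reduce(lambda total, part: total + part, partials, Counter())


if __name__ == "__main__":
    text = ["the quick brown fox", "jumps over", "the lazy dog", "the end"]
    print(word_count(text)["the"])  # 3
```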

There are several implementations of this strategy, of which Hadoop seems to be the most respected in the open source world. Yahoo has fully embraced the project.

However, Hadoop does have some caveats. Configuring a cluster requires editing a multitude of XML files. The Hadoop Distributed File System (HDFS) has to be installed. The master node is not fault tolerant. Typically the “slave” node locations are stored in a file, though new ones can be added dynamically. Hadoop also requires jobs to be written in languages that run on the JVM.

When evaluating Hadoop, we decided it was a little too complex for our first proof of concept for a client. We also wanted to use tools like PyPy to boost the performance of our Python application. So what was the alternative? Disco seemed reasonable. Unfortunately, messages had to be pushed to the “workers” via SSH, which made them difficult to organize behind firewalls, and additional workers had to be added manually through the web interface.

Sometimes square pegs just won’t fit into round holes. So, what technology was round enough for our needs? We knew that distributing tasks is a problem naturally solved by queues: jobs go in, workers take them out, until the queue is empty. In a networked world, message queues can spread job information across machines and geography. AMQP allows advanced routing of these messages through exchanges and has client libraries in a multitude of languages. Message queuing systems are also generally designed to be fault tolerant and distributed.
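To make the routing idea concrete, here is a toy in-memory model of an AMQP “direct” exchange, the exchange type used by the code later in this post: a message is delivered to every queue bound with exactly its routing key. `DirectExchange` is a hypothetical class for illustration, not part of any AMQP client; real brokers add durability, acknowledgements and much more. In this model a message with no matching binding is simply dropped.

```python
from collections import defaultdict, deque


class DirectExchange(object):
    """Toy model of an AMQP direct exchange (illustration only, not a client)."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound queue names
        self.queues = {}                   # queue name -> pending messages

    def declare_queue(self, name):
        # declaring an existing queue again is harmless, as in AMQP
        self.queues.setdefault(name, deque())

    def bind(self, queue, routing_key):
        if queue not in self.queues:
            raise KeyError("queue %r has not been declared" % queue)
        if queue not in self.bindings[routing_key]:
            self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # exact-match routing: every queue bound with this key gets a copy
        for queue in self.bindings.get(routing_key, []):
            self.queues[queue].append(body)

    def get(self, queue):
        """Pop the next message, or return None when the queue is empty."""
        pending = self.queues[queue]
        return pending.popleft() if pending else None


if __name__ == "__main__":
    ex = DirectExchange()
    ex.declare_queue("jobs")
    ex.bind("jobs", routing_key="jobs")  # routing key == queue name
    ex.publish("jobs", "job-1")
    ex.publish("reports", "lost")        # no binding for this key: dropped
    print(ex.get("jobs"))
```

Binding each queue with a routing key equal to its own name, as the usage does, is the same convention the `Base.setup` method below follows.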

And it worked pretty well. We used py-amqplib as the client, connected to RabbitMQ, with some fairly basic code:

from amqplib import client_0_8 as amqp
from dobie.extract import configval
import logging
import uuid
 
log = logging.getLogger(__name__)
 
class Connection(object):
    '''basic connection to mq server'''
 
    def __init__(self):
        self.host = configval("host")
        self.user_id = configval("user_id")
        self.password = configval("password")
        self.port = configval("port")
        self.connect()
 
    def connect(self):
        self.connection = amqp.Connection(
            host='%s:%s' % (self.host, self.port),
            userid=self.user_id, password=self.password
        )
 
class Base(object):
    '''shared code for publisher and consumer'''
 
    def __init__(self, queue, connection=None):
        self.connection = connection or Connection()
        self.channel = self.connection.connection.channel()
        self.ex = configval("exchange")
        self.queue = queue
        params = (self.queue, self.ex, self.queue)
        log.debug("queue [%s], exchange [%s], routing [%s]" % params)
 
    def setup(self):
        '''configure queue and exchange'''
        self.channel.queue_declare(
            queue=self.queue, durable=True, exclusive=False, auto_delete=False
        )
        self.channel.exchange_declare(
            exchange=self.ex, type='direct', durable=True, auto_delete=False
        )
        self.channel.queue_bind(
            queue=self.queue, exchange=self.ex, routing_key=self.queue
        )
        return self
 
    def close(self):
        '''close connections'''
        # TODO consider pooling
        self.channel.close()
        self.connection.connection.close()
 
    def __enter__(self):
        return self.setup()
 
    def __exit__(self, _type, value, traceback):
        self.close()
 
class Consumer(Base):
    '''consumer of messages'''
 
    def __init__(self, queue, callback, connection=None):
        Base.__init__(self, queue, connection)
        self.callback = callback
        self.cid = str(uuid.uuid4())
 
    def __enter__(self):
        def _callback(message):
            self.callback(self, message)
        Base.__enter__(self)
        self.channel.basic_consume(
            queue=self.queue, callback=_callback, consumer_tag=self.cid
        )
        return self
 
    def wait(self, count=-1):
        '''wait for messages. if count is set, only consume a limited number
           before exiting'''
        while count != 0:
            self.channel.wait()
            if count > 0: count = count - 1
 
class Publisher(Base):
    '''publisher of messages'''
 
    def __init__(self, queue, connection=None):
        Base.__init__(self, queue, connection)
        self.delivery_mode = int(configval("delivery_mode"))
 
    def publish(self, message):
        message = amqp.Message(message)
        message.properties['delivery_mode'] = self.delivery_mode
        self.channel.basic_publish(
            message, exchange=self.ex, routing_key=self.queue
        )

There are some potential problems with this code: connections aren’t pooled, and it only uses a “direct” exchange. However, it suited our needs.

We relied on dictionaries to transfer data. Since we planned to stick with Python, we pickled them before sending them over the wire, and RabbitMQ was happy to accept the resulting bytes.
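A minimal sketch of that round trip, assuming jobs are plain dictionaries of built-in types; `encode_job` and `decode_job` are hypothetical helper names. The bytes from `encode_job` are what would be handed to `Publisher.publish` above, and a consumer callback would call `decode_job` on the message body.

```python
import pickle


def encode_job(job):
    """Serialize a job dictionary into bytes suitable for a message body."""
    # HIGHEST_PROTOCOL is compact, but both ends must run a Python that supports it
    return pickle.dumps(job, protocol=pickle.HIGHEST_PROTOCOL)


def decode_job(body):
    """Rebuild the dictionary on the worker side.

    Only unpickle data from trusted producers: pickle can run arbitrary code
    while loading.
    """
    return pickle.loads(body)


if __name__ == "__main__":
    job = {"id": 7, "rows": [1, 2, 3], "source": "parts.txt"}
    print(decode_job(encode_job(job)) == job)  # True
```

Because `pickle.loads` can execute code, this only makes sense when every producer on the queue is trusted; JSON would be the portable choice if non-Python consumers ever join.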

There is one problem we have yet to resolve: if a client is connected and processing messages, newly connected clients only receive messages that are added later. We’ll post a fix if we figure it out.
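A commonly cited cause of this symptom, offered here as a hypothesis rather than a verified fix, is consumer prefetch. By default RabbitMQ pushes as many queued messages as it can to a consumer as soon as it subscribes, so the first worker ends up holding the whole backlog as unacknowledged messages and a later worker has nothing left to receive. AMQP’s `basic.qos` with a `prefetch_count` caps how many unacknowledged messages each consumer may hold. The simulation below is a toy model of that dispatch (no broker involved); `simulate` and its parameters are hypothetical.

```python
from collections import deque


def simulate(backlog, prefetch=None, acks_before_join=1):
    """Return which messages an early and a late consumer end up handling.

    prefetch=None models the default "no limit"; otherwise it must be a
    positive number of unacknowledged messages allowed per consumer.
    """
    queue = deque(range(backlog))   # messages waiting in the broker
    windows = {"early": deque()}    # unacknowledged messages per consumer
    handled = {"early": [], "late": []}

    def deliver():
        # push messages round-robin to consumers whose window has room
        moved = True
        while queue and moved:
            moved = False
            for window in windows.values():
                if queue and (prefetch is None or len(window) < prefetch):
                    window.append(queue.popleft())
                    moved = True

    def ack(name):
        # the consumer finishes one message and acknowledges it
        handled[name].append(windows[name].popleft())
        deliver()

    deliver()                       # only the early consumer is subscribed
    for _ in range(acks_before_join):
        ack("early")
    windows["late"] = deque()       # a second consumer subscribes
    deliver()
    while any(windows.values()):    # both consumers drain what they hold
        for name in list(windows):
            if windows[name]:
                ack(name)
    return handled


if __name__ == "__main__":
    print(simulate(6))              # late consumer gets nothing
    print(simulate(6, prefetch=1))  # work is shared once prefetch is 1
```

Applied to the `Consumer` class above, the change would presumably be a call such as `self.channel.basic_qos(0, 1, False)` (py-amqplib’s `basic_qos` takes `prefetch_size`, `prefetch_count` and `a_global`) before `basic_consume`, plus a `basic_ack` with `message.delivery_info['delivery_tag']` once the callback finishes. This is untested against the setup described here.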

the disconnect between complex web apps and HTTP
May 14, 2010 | by rob @ 2:35 pm | filed under [web]

fundamental disconnect

Any web application of moderate complexity almost always stumbles across a data transmission problem. Modeling relationships that need to be stored back on the server, such as parent-child links or groups, with flat key-value pairs is not a natural fit. Anyone who worked with Struts in the early part of the decade is very aware of this problem.

On the other hand, the DOM, and XML more generally, offer a natural way to express these associations by nesting tags. For instance, in HTML one could code:

<div id="parent">
  <span class="name">jerry</span>
  <div class="child">
    <input class="name" value="joey"/>
  </div>
</div>

which indicates “joey” is the child of “jerry”. Now imagine trying to transmit this relationship with HTTP parameters. This might work:

parent=jerry&jerrychild=joey

but deciphering such a statement on the receiving end would be complex and fragile. A document database like CouchDB might fit the task, but then you would have to write JavaScript to generate JSON from the child elements. In other words, you would code everything twice: once to generate the page, and again in JavaScript to “serialize” the objects.
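To see why the flat version is fragile, here is what a receiver gets from Python 3’s standard `urllib.parse.parse_qs`, plus a hypothetical decoder, `decode_family`, that relies on the ad hoc “&lt;parent&gt;child” naming used in the example string. Nothing in the data itself says that `jerrychild` belongs to `jerry`; the receiver simply has to know the convention.

```python
from urllib.parse import parse_qs


def decode_family(query):
    """Recover parent -> children from the ad hoc "<parent>child" naming scheme."""
    params = parse_qs(query)  # flat mapping: field name -> list of values
    family = {}
    for parent in params.get("parent", []):
        # the relationship exists only in the field name, by convention
        family[parent] = params.get(parent + "child", [])
    return family


if __name__ == "__main__":
    print(parse_qs("parent=jerry&jerrychild=joey"))
    print(decode_family("parent=jerry&jerrychild=joey&jerrychild=amy"))
```

Each extra level of nesting (grandchildren, groups of children) would need another naming rule that both sides must agree on, whereas nested markup carries that structure itself.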

real world experience

At a recent client, we faced this problem in a dramatic fashion. The group consisted mostly of front-end experts who took XML data from a custom CMS and transformed it with XSL to produce their final content. They couldn’t write the server code to store submitted data back into the database, and they wanted to create very complex relationships in their data models.

modeling data in the DOM

To bridge this skill-set gap, we created a JavaScript library on top of jQuery that generates an XML document from markers embedded in the DOM. The library walks a specified element and its children, looking for specific classes. If an element has the class “complex”, an XML element named after its “id” or “name” attribute is generated before its children are processed. If an element has the class “primitive”, a simple element is generated, named after its “id” or “name” attribute and holding its value. An element with the class “att” becomes an attribute, again named by its “id” or “name” attribute, on the enclosing complex element.

Following our earlier example:

<div id="parent" class="complex">
  <span name="name" class="primitive">jerry</span>
  <div name="child" class="complex">
    <span name="birth-order" class="att">1</span>
    <input name="name" class="primitive" value="joey"/>
  </div>
</div>

would produce:

<parent>
  <name>jerry</name>
  <child birth-order="1">
    <name>joey</name>
  </child>
</parent>
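The same rules can be modeled outside the browser. Below is a rough Python port using the standard library’s ElementTree; it is a sketch, not the jQuery library itself. It assumes the markup is well-formed XHTML, `xml_gen` is a hypothetical name echoing `xmlGen`, and treating unmarked elements as transparent wrappers is an assumption about how the library behaves. Run on the example markup, it produces the XML shown above (without the indentation).

```python
import xml.etree.ElementTree as ET


def _key(el):
    # the generated tag or attribute name comes from "id", falling back to "name"
    return el.get("id") or el.get("name")


def _value(el):
    # form inputs keep their value in an attribute; other elements in their text
    if el.tag == "input":
        return el.get("value", "")
    return (el.text or "").strip()


def _walk(el, out):
    classes = (el.get("class") or "").split()
    if "complex" in classes:
        node = ET.SubElement(out, _key(el))  # container, then its children
        for child in el:
            _walk(child, node)
    elif "primitive" in classes:
        ET.SubElement(out, _key(el)).text = _value(el)
    elif "att" in classes:
        out.set(_key(el), _value(el))        # attribute on the enclosing element
    else:
        # assumption: unmarked elements are skipped but their children searched
        for child in el:
            _walk(child, out)


def xml_gen(dom_root):
    """Return the list of XML elements generated from dom_root."""
    holder = ET.Element("holder")  # temporary parent, discarded afterwards
    _walk(dom_root, holder)
    return list(holder)


if __name__ == "__main__":
    markup = """<div id="parent" class="complex">
  <span name="name" class="primitive">jerry</span>
  <div name="child" class="complex">
    <span name="birth-order" class="att">1</span>
    <input name="name" class="primitive" value="joey"/>
  </div>
</div>"""
    docs = xml_gen(ET.fromstring(markup))
    print(ET.tostring(docs[0], encoding="unicode"))
    # <parent><name>jerry</name><child birth-order="1"><name>joey</name></child></parent>
```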

This data can be submitted (via Ajax) to the backend for processing, updating, validation, whatever. You can download the library here:

http://github.com/greyrl/relationalxml/blob/master/js/jquery.dom.js

You will also need the custom jQuery “class” and “util” packages. After importing the libraries, you can generate XML documents or strings:

// documents
var docs = $("#parent").xmlGen();
// strings
var docStrings = $("#parent").xmlGen().innerXML();

If you want to send that data back to the server, you should also consider Base64-encoding it.
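On the receiving end, decoding such a payload takes only a few lines of Python. This sketch assumes the client Base64-encodes the string produced by `innerXML()` (in the browser, for example with `btoa`) and posts it; `decode_payload` and `children_of` are hypothetical helpers, and the element names follow the parent/child example above.

```python
import base64
import xml.etree.ElementTree as ET


def decode_payload(encoded):
    """Turn a Base64-encoded XML string, as posted by the client, into an Element."""
    return ET.fromstring(base64.b64decode(encoded))


def children_of(parent_el):
    """Collect (child name, birth-order) pairs from a <parent> document."""
    return [(c.findtext("name"), c.get("birth-order"))
            for c in parent_el.findall("child")]


if __name__ == "__main__":
    xml = (b'<parent><name>jerry</name>'
           b'<child birth-order="1"><name>joey</name></child></parent>')
    doc = decode_payload(base64.b64encode(xml).decode("ascii"))
    print(doc.findtext("name"), children_of(doc))
```

Because the relationships travel inside the markup, the server reads them straight off the tree instead of reverse-engineering field names.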

caveats

If the id attribute for a group cannot be unique, this library requires you to bastardize the DOM by adding a “name” attribute, which is not valid on most elements. However, we feel it is a small price to pay for flexibility.

extracting data with selenium
April 26, 2010 | by rob @ 8:15 am | filed under [python,web]

If you’ve spent any time developing for the web as an independent contractor, you’ve probably run into a client who wants to extract data from a web site programmatically. Typically the site is a front end to a database, queried through search terms or configuration options. You might also find that this client needs to extract a variety of data from many different paths. Selenium, traditionally used as a testing tool, can be a great asset in such a situation.

The “development” portion of Selenium (Selenium IDE) is a Firefox add-on that records clicks, paths, and form entries as the user interacts with the browser. Normally, Selenium exports this “recording” as code in a programming language, which is then tweaked for automated, continuous testing during web development. However, exporting a Selenium script to code also gives you all the flexibility of the underlying language. For example, we could use a loop to keep clicking a “next” button while searching for a specific piece of data. That data could then be stored in a database or file, or even sent to a web service.

Here is a very basic example of such functionality:

from selenium import selenium
import unittest

class sample(unittest.TestCase):

    def setUp(self):
        self.verificationErrors = []
        self.selenium = selenium("localhost", 4444, "*chrome", "http://your.site.here")
        self.selenium.start()
        # one part number per line
        with open("parts.txt") as f: self.parts = f.readlines()

    def testSample(self):
        sel = self.selenium
        for part in self.parts:
            sel.open("/database.html")
            sel.type("partnumber", part.strip())
            sel.click("go")
            sel.wait_for_page_to_load("30000")
            # follow the first link whose text is a seven-digit id
            sel.click("link=regexp:[0-9]{7}")
            while True:
                try:
                    sel.wait_for_page_to_load("30000")
                    # read rows 1-7 of the third table before leaving the page
                    ready = []
                    for i in range(1, 8):
                        hit_id = sel.get_table("//table[3].%s.1" % i)
                        status = sel.get_table("//table[3].%s.5" % i)
                        print("id %s, status %s" % (hit_id, status))
                        if status == "READY": ready.append(hit_id)
                    for hit_id in ready:
                        self._processHit(hit_id)
                    sel.click("next")
                except Exception as e:
                    # a missing row or "next" link ends the paging loop
                    print("error or end of processing encountered")
                    #print(e)
                    break

    def _processHit(self, hit_id):
        print("process %s" % hit_id)
        sel = self.selenium
        sel.click("link=%s" % hit_id)
        sel.wait_for_page_to_load("30000")
        with open("result.txt", "a") as f:
            f.write("id %s, name [%s]\n" % (hit_id, sel.get_table("//table[3].6.2")))
        # return to the result page so the caller can keep paging
        sel.go_back()
        sel.wait_for_page_to_load("30000")

    def tearDown(self):
        self.selenium.stop()
        self.assertEqual([], self.verificationErrors)

if __name__ == "__main__":
    unittest.main()

Basically the order of operations is:

  • load the part numbers from a file
  • for each part, go to the “database” page and perform a part search
  • follow the first link in the search results whose text matches the seven-digit regular expression
  • scan the rows of the third table on the result page, looking for items that are “READY”
  • follow each ready link to extract more detail and record it in the “result.txt” file
  • click “next” and repeat until an error (such as a missing “next” link) ends the loop

It probably isn’t a great idea to open and close the “result.txt” file for each result, but this is a pretty basic example.
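One way around that is to separate scraping from storage so the file is opened only once. In this sketch `scrape_hits` is a hypothetical stand-in for the Selenium loop, fed with pages represented as lists of (id, status, name) tuples instead of live browser calls, and `save_hits` writes everything through a single open stream using the same line format as `_processHit`.

```python
import io


def scrape_hits(pages):
    """Yield (id, name) for every READY row; a stand-in for the Selenium loop."""
    for page in pages:
        for hit_id, status, name in page:
            if status == "READY":
                yield hit_id, name


def save_hits(hits, stream):
    """Write all hits through one already-open stream; return how many were written."""
    count = 0
    for hit_id, name in hits:
        stream.write("id %s, name [%s]\n" % (hit_id, name))
        count += 1
    return count


if __name__ == "__main__":
    pages = [
        [("1234567", "READY", "widget"), ("2345678", "PENDING", "gadget")],
        [("3456789", "READY", "gizmo")],
    ]
    buffer = io.StringIO()  # in a real run: open("result.txt", "a")
    print(save_hits(scrape_hits(pages), buffer))
    print(buffer.getvalue())
```

In the test case, that would mean wrapping the whole run in something like `with open("result.txt", "a") as results:` and passing `results` down; a side benefit is that the storage half can be checked with `io.StringIO`, without a browser.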