pylons dynamic xml/json web service
July 12, 2010 | by rob @ 6:49 pm | comments (0) | filed under [python]

While creating some REST-style web services, I started to notice a pattern. Most of the controller actions generated dictionaries that were then formatted by hand in separate Mako templates. It finally made more sense to generate JSON and XML automatically from the structure of the dictionaries. Here are the functions used to generate JSON output:

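from datetime import datetime

# jsonencode is assumed to be a string-escaping helper defined elsewhere
# (json.dumps would behave the same way for strings)

def jsonarray(array):
    '''create and encode a json array'''
    return "[" + ",".join(jsonvalue(i) for i in array) + "]"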
 
def jsondate(date):
    '''string format date'''
    return jsonencode(date.isoformat())
 
def jsondict(d):
    '''create and encode a json object'''
    pairs = ("%s : %s" % (jsonencode(k), jsonvalue(v)) for k, v in d.items())
    return "{%s}" % ",".join(pairs)
 
def jsonint(i): return str(i)
 
def jsonsimple(i): return jsonencode(str(i))
 
_jsonhandlers = {
    datetime : jsondate,
    dict : jsondict,
    int : jsonint,
    list : jsonarray,
    tuple : jsonarray,
}
 
def jsonvalue(value):
    '''handle json value conversion'''
    return _jsonhandlers.get(type(value), jsonsimple)(value)
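
One thing to watch with a lookup keyed on `type(value)`: anything not listed (bool, float, None) falls through to `jsonsimple` and presumably comes out as a quoted string. As a hedged alternative, here is a minimal sketch that leans on the standard library's `json` module and only supplies a hook for the types it cannot serialize. The names `to_json` and `_default` are made up for this example and are not part of the helpers above.

```python
import json
from datetime import datetime

def _default(value):
    # called by json.dumps only for objects it cannot serialize itself
    if isinstance(value, datetime):
        return value.isoformat()
    # same fallback idea as jsonsimple: stringify anything unknown
    return str(value)

def to_json(data):
    '''serialize dicts, lists, tuples and datetimes to a json string'''
    return json.dumps(data, default=_default)

print(to_json({"when": datetime(2010, 7, 12, 18, 49), "count": 3}))
```

Booleans, floats and None keep their JSON types this way, and tuples are written as arrays.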

and the XML:

def xmlarray(array, key="result"):
    '''create and encode an xml array'''
    vals = "</%s><%s>" % (key, key)
    vals = vals.join((xmlvalue(i) for i in array))
    return "<%s>%s</%s>" % (key, vals, key)
 
def xmldate(date):
    '''string format date'''
    return date.isoformat()
 
def xmldict(d):
    '''create and encode xml elements from a dictionary'''
    return "".join("<%s>%s</%s>" % (k, xmlvalue(v), k) for k, v in d.items())
 
def xmlsimple(i): return str(i)  # join() in xmlarray only accepts strings
 
_xmlhandlers = {
    datetime : xmldate,
    dict : xmldict,
    list : xmlarray,
    tuple : xmlarray,
}
 
def xmlvalue(value):
    '''handle xml value conversion'''
    return _xmlhandlers.get(type(value), xmlsimple)(value)
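
The XML helpers above write values as-is, so a value containing `<` or `&` would produce broken markup. Here is a minimal sketch of the same dictionary walk with escaping added through the standard library's `xml.sax.saxutils.escape`. The function name `to_xml` and its `item_tag` parameter are assumptions for this example, and dictionary keys are still trusted to be valid element names.

```python
from datetime import datetime
from xml.sax.saxutils import escape

def to_xml(value, item_tag="result"):
    '''render dicts, lists/tuples, datetimes and scalars as xml text'''
    if isinstance(value, dict):
        return "".join("<%s>%s</%s>" % (k, to_xml(v), k) for k, v in value.items())
    if isinstance(value, (list, tuple)):
        # every list item is wrapped in the same element name
        return "".join("<%s>%s</%s>" % (item_tag, to_xml(v), item_tag) for v in value)
    if isinstance(value, datetime):
        return value.isoformat()
    # escape &, < and > in everything else
    return escape(str(value))

print(to_xml({"name": "a < b"}))
```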

Calling these functions from the templates is fairly straightforward. Once the output type has been determined (I’ve been using the “Accept” header), the JSON template is:

${h.jsondict(c.result) | n}

and XML:

<result>${h.xmldict(c.result) | n}</result>
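
Picking the output type from the “Accept” header can be as simple as scanning the listed media types. This is only a rough sketch: the function name `pick_format` is made up, the media types checked are an assumption about what clients send, and q-values are ignored (the first recognised type wins).

```python
def pick_format(accept, default="json"):
    '''return "json" or "xml" for an Accept header value'''
    for part in (accept or "").split(","):
        # drop parameters such as ";q=0.9" and normalise case
        media = part.split(";")[0].strip().lower()
        if media in ("application/json", "text/json"):
            return "json"
        if media in ("application/xml", "text/xml"):
            return "xml"
    # nothing recognised (or no header at all)
    return default

print(pick_format("text/html, application/xml;q=0.9"))
```

The controller can then render the JSON or the XML template based on the returned value.
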
simple cron-style batch scheduling
June 19, 2010 | by rob @ 7:22 am | comments (0) | filed under [python]

Some projects require background processes to run on a schedule, independent of the requesting client. For example, statistical data might need to be extracted nightly from a large dataset. There are multiple job scheduling packages well suited for the task, but they might be overkill for simple applications. Also, they are often locked into a particular programming language or platform.

As an avid unix user, I have found the cron daemon to be an irreplaceable tool. The parameters are intuitive, with options to set the minute, hour, day of month, month, and day of week, which together allow almost any pattern of scheduling. It is possible to build a similar simple scheduling system for an application with just two functions: one adds a schedule, the other retrieves the schedules that are currently due. These examples rely on python and mongodb, but can be easily adjusted for a different language and a relational database.

Here is the scheduler with constant definitions for the field keys. I use the required “params” key as a dictionary holding the task configuration. This code of course doesn’t handle all cron options, such as ranges and step values, but should suit many tasks. If a value isn’t specified by a named parameter, it becomes a wildcard match, similar to cron:



# field names stored with each schedule entry, in cron order
_datefields = ["minute", "hour", "day", "month", "dayofweek"]

# Extract.USER and S.PARAMS are key-name constants assumed to be defined
# elsewhere in the application; mongodb is assumed to be an open database handle
def schedule(user, params, minute="*", hour="*", day="*", month="*", dow="*"):
    '''add a schedule item to the database'''
    item = {Extract.USER : user, S.PARAMS : params}
    item.update(zip(_datefields, [minute, hour, day, month, dow]))
    mongodb.schedules.save(item)

And the schedule retrieval code, which returns the entries that are due now. For each time field, it looks for a match on either the ‘*’ pattern or the current value of that field:

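import datetime

def current_schedules():
    '''find schedule items that need to be run'''
    dt = datetime.datetime.today()
    # note: weekday() counts Monday as 0, whereas cron counts Sunday as 0
    vals = [dt.minute, dt.hour, dt.day, dt.month, dt.weekday()]
    # each field must hold either the wildcard or the current value
    look = dict((k, {"$in" : ["*", v]}) for k, v in zip(_datefields, vals))
    return mongodb.schedules.find(look)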
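
To make the matching rule easier to see (and to test without a database), here is a small in-memory sketch of what the query does. The names `FIELDS` and `matches` are made up for this example; entries are plain dictionaries shaped like the documents stored by the scheduler.

```python
import datetime

FIELDS = ["minute", "hour", "day", "month", "dayofweek"]

def matches(entry, when):
    '''true if every field is either the wildcard or the current value'''
    current = [when.minute, when.hour, when.day, when.month, when.weekday()]
    return all(entry[f] in ("*", v) for f, v in zip(FIELDS, current))

# an entry meant to run nightly at 02:30
nightly = {"minute": 30, "hour": 2, "day": "*", "month": "*", "dayofweek": "*"}
print(matches(nightly, datetime.datetime(2010, 6, 19, 2, 30)))
```

Because the comparison is a plain equality check, ranges and step values would need extra parsing, which is the limitation mentioned earlier.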

Here is some simple client code:

import datetime
import logging
import time

_log = logging.getLogger(__name__)

def run():
    try:
        while 1:
            for schedule in current_schedules():
                _log.info("batch item at [%s]" % datetime.datetime.today())
                some_execution_method(schedule["params"])
            # try to wake up one second after the start of the next minute
            time.sleep(60 - datetime.datetime.today().second + 1)
    except KeyboardInterrupt: print("killed...")

if __name__ == "__main__": run()

Of course, this code isn’t thread-safe or “multiple-client-safe”, so running multiple clients is probably not a good idea.

distributed processing with python and rabbitmq
June 6, 2010 | by rob @ 7:27 pm | comments (0) | filed under [python,web]

Most software developer types are aware of “MapReduce”, made famous by Google. Basically, a large set of data is split into pieces and distributed over a network to multiple “workers”, which process the pieces in parallel. The processed data is then returned to a “reducer”, which aggregates the results into a final dataset.
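
For anyone who hasn't seen the pattern, here is a toy, single-machine sketch of split, map, and reduce, using a word count as the job. Threads stand in for networked workers, and all of the function names are made up for this illustration; it is not how Hadoop or any other framework is implemented.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def split_chunks(items, pieces):
    '''cut a list into roughly equal chunks'''
    size = max(1, -(-len(items) // pieces))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

def map_chunk(lines):
    '''worker: count the words in one chunk'''
    return Counter(word for line in lines for word in line.split())

def reduce_counts(partials):
    '''reducer: merge the partial counts into one result'''
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

def word_count(lines, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(map_chunk, split_chunks(lines, workers)))
    return reduce_counts(partials)

print(word_count(["a b a", "b c", "a"]))
```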

There are several implementations of this strategy, of which Hadoop seems to be the most respected in the open source world. Yahoo has fully embraced the project.

However, Hadoop does have some caveats. Configuring a cluster requires editing a multitude of XML files. The Hadoop filesystem has to be installed. The master node is not fault tolerant. Typically the “slave” node locations are stored in a file, though new ones can be added dynamically. Hadoop also expects jobs to be written in languages that run on the JVM; other languages have to go through its streaming interface.

When evaluating Hadoop, we decided it was a little too complex for our first proof-of-concept for a client. Plus, we wanted to use tools like PyPy with our Python application to boost performance. But what was an alternative? Disco? Seemed reasonable. Unfortunately, messages had to be pushed via SSH to “workers”, making them difficult to organize within firewalls. And additional workers had to be added manually through the web interface.

Sometimes square pegs just won’t fit into round holes. So, what technology was round enough for our needs? We knew distributing tasks was naturally solved by queues; one went in, another went out, until the queue was empty. In the networked world, message queues can spread job information across machines and geography. AMQP allows advanced routing of these messages through exchanges and is widely supported by a multitude of languages. Message queuing systems are also designed to be highly fault tolerant and distributed.

And it worked pretty well. We used py-amqplib as a client attached to rabbitmq with some fairly basic code:

from amqplib import client_0_8 as amqp
from dobie.extract import configval
import logging
import uuid
 
log = logging.getLogger(__name__)
 
class Connection(object):
    '''basic connection to mq server'''
 
    def __init__(self):
        self.host = configval("host")
        self.user_id = configval("user_id")
        self.password = configval("password")
        self.port = configval("port")
        self.connect()
 
    def connect(self):
        self.connection = amqp.Connection(
            host='%s:%s' % (self.host, self.port),
            userid=self.user_id, password=self.password
        )
 
class Base(object):
    '''shared code for publisher and consumer'''
 
    def __init__(self, queue, connection=None):
        self.connection = connection or Connection()
        self.channel = self.connection.connection.channel()
        self.ex = configval("exchange")
        self.queue = queue
        params = (self.queue, self.ex, self.queue)
        log.debug("queue [%s], exchange [%s], routing [%s]" % params)
 
    def setup(self):
        '''configure queue and exchange'''
        self.channel.queue_declare(
            queue=self.queue, durable=True, exclusive=False, auto_delete=False
        )
        self.channel.exchange_declare(
            exchange=self.ex, type='direct', durable=True, auto_delete=False
        )
        self.channel.queue_bind(
            queue=self.queue, exchange=self.ex, routing_key=self.queue
        )
        return self
 
    def close(self):
        '''close connections'''
        # TODO consider pooling
        self.channel.close()
        self.connection.connection.close()
 
    def __enter__(self):
        return self.setup()
 
    def __exit__(self, _type, value, traceback):
        self.close()
 
class Consumer(Base):
    '''consumer of messages'''
 
    def __init__(self, queue, callback, connection=None):
        Base.__init__(self, queue, connection)
        self.callback = callback
        self.cid = str(uuid.uuid4())
 
    def __enter__(self):
        def _callback(message):
            self.callback(self, message)
        Base.__enter__(self)
        self.channel.basic_consume(
            queue=self.queue, callback=_callback, consumer_tag=self.cid
        )
        return self
 
    def wait(self, count=-1):
        '''wait for messages. if count is set, only consume a limited number
           before exiting'''
        while count != 0:
            self.channel.wait()
            if count > 0: count = count - 1
 
class Publisher(Base):
    '''publisher of messages'''
 
    def __init__(self, queue, connection=None):
        Base.__init__(self, queue, connection)
        self.delivery_mode = int(configval("delivery_mode"))
 
    def publish(self, message):
        message = amqp.Message(message)
        message.properties['delivery_mode'] = self.delivery_mode
        self.channel.basic_publish(
            message, exchange=self.ex, routing_key=self.queue
        )

There are some potential problems with this code. Connections aren’t pooled and it only uses a “direct” exchange. However, it suited our needs.

We relied on dictionaries to transfer data. Since we planned to stick with python, we pickled them before sending them over the line, which rabbitmq was happy to accept.
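
The pickling step is only a couple of lines. A minimal sketch, with `encode` and `decode` as made-up names for this example: the bytes returned by `encode` are what would be handed to the publisher, and `decode` would be applied to the body of a received message.

```python
import pickle

def encode(payload):
    '''turn a dictionary into bytes suitable for a message body'''
    return pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)

def decode(body):
    '''restore the dictionary on the consuming side'''
    return pickle.loads(body)

job = {"task": "extract", "ids": [1, 2, 3]}
assert decode(encode(job)) == job
```

Keep in mind that unpickling runs arbitrary code if the data is malicious, so this only makes sense when every publisher on the queue is trusted.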

There is one problem which we have yet to resolve: if a client is connected and processing messages, newly connected clients will only receive messages that are added later. Will post a fix if we figure it out.
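
One possible cause, offered as an assumption rather than a confirmed diagnosis: without a prefetch limit, an AMQP broker hands every queued message to the first consumer as soon as it subscribes, so nothing is left in the queue when a second consumer connects. Setting a prefetch count of 1 through AMQP's basic.qos, and acknowledging each message after it is processed, is the usual remedy. The toy simulation below has no broker involved and uses made-up names; it only illustrates the difference in how a backlog gets shared.

```python
from collections import deque

def simulate(n_messages, prefetch=None):
    '''return how many messages each of two consumers ends up handling.
    consumer "a" subscribes first; "b" subscribes right afterwards.'''
    ready = deque(range(n_messages))
    unacked = {"a": 0, "b": 0}
    handled = {"a": 0, "b": 0}
    connected = ["a"]

    def push():
        # broker side: deliver while some consumer still has room
        progress = True
        while ready and progress:
            progress = False
            for name in connected:
                if ready and (prefetch is None or unacked[name] < prefetch):
                    ready.popleft()
                    unacked[name] += 1
                    progress = True

    push()                  # "a" is alone when the backlog is delivered
    connected.append("b")   # "b" subscribes afterwards
    push()
    while any(unacked.values()):
        for name in connected:
            if unacked[name]:
                unacked[name] -= 1   # finish and acknowledge one message
                handled[name] += 1
        push()
    return handled

print(simulate(10))               # no prefetch limit
print(simulate(10, prefetch=1))   # one unacknowledged message per consumer
```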

extracting data with selenium
April 26, 2010 | by rob @ 8:15 am | comments (0) | filed under [python,web]

If you’ve spent any time developing for the web as an independent contractor, you’ve probably run into a client who wants to programmatically extract data from a web site. Typically the site is a front end to a database, modified by search terms or configurations. You might also find this client needs to extract a variety of data from a multitude of paths. Selenium, traditionally used as a testing tool, can be a great asset in such a situation.

The “development” portion of Selenium is a Firefox plugin that records clicks, paths, and form entries as the user interacts with the browser. Normally, this “recording” is converted to a programming language (by Selenium) and tweaked for automated, continuous testing during web development. However, generating code from a Selenium script also opens up all the flexibility of the underlying programming language. For example, we could use loop constructs to keep clicking on a “next” button while searching for a specific piece of data. This data could then be stored in a database or file, or even sent to a web service.

Here is a very basic example of such functionality:

from selenium import selenium
import unittest, time, re
 
class sample(unittest.TestCase):
 
    def setUp(self):
        self.verificationErrors = []
        self.selenium = selenium("localhost", 4444, "*chrome", "http://your.site.here")
        self.selenium.start()
        with open("parts.txt") as f: self.parts = f.readlines()
 
    def testSample(self):
        sel = self.selenium
        for part in self.parts:
            sel.open("/database.html")
            sel.type("partnumber", part.strip())
            sel.click("go")
            sel.wait_for_page_to_load("30000")
            # look for a numeric pattern
            sel.click("link=regexp:[0-9]{7}")
            while 1:
                try: 
                    sel.wait_for_page_to_load("30000")
                    for i in range(1,8):
                        id = sel.get_table("//table[3].%s.1" % i)
                        status = sel.get_table("//table[3].%s.5" % i)
                        print("id %s, status %s" % (id, status))
                        if status == "READY": self._processHit(id)
                    sel.click("next")
                except Exception as e:
                    print("error or end of processing encountered")
                    #print e
                    break
 
    def _processHit(self, id):
        print("process %s" % id)
        sel = self.selenium
        sel.click("link=%s" % id)
        sel.wait_for_page_to_load("30000")
        with open("result.txt", "a") as f:
            f.write("id %s, name [%s]\n" % (id, sel.get_table("//table[3].6.2")))
 
    def tearDown(self):
        self.selenium.stop()
        self.assertEqual([], self.verificationErrors)
 
if __name__ == "__main__":
    unittest.main()

Basically the order of operations is:

  • load the part numbers from a file
  • for each part, go to the “database” page and perform a part search
  • follow the id pattern from the regular expression in the search result
  • search through the tables in the result page, looking for an item that’s “READY”
  • follow a ready link to extract more detail and record information in the “result.txt” file

It probably isn’t a great idea to open and close the “result.txt” file for each result, but this is a pretty basic example.
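
If reopening “result.txt” for every hit becomes a concern, one option is to collect the hits and write them through a single open file object. A minimal sketch, where `write_hits` is a made-up helper and `hits` is assumed to be any iterable of (id, name) pairs gathered during the run:

```python
def write_hits(hits, out):
    '''write one line per (id, name) pair to an already open file object'''
    count = 0
    for hit_id, name in hits:
        out.write("id %s, name [%s]\n" % (hit_id, name))
        count += 1
    return count

# usage: the file is opened once for the whole run
with open("result.txt", "a") as f:
    write_hits([("1234567", "widget")], f)
```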