simple cron-style batch scheduling
June 19, 2010 | by rob @ 7:22 am | comments (0) | filed under [python]

Some projects require background processes to run on a schedule, independent of the requesting client. For example, statistical data might need to be extracted nightly from a large dataset. There are multiple job scheduling packages well suited for the task, but they might be overkill for simple applications. Also, they are often locked into a particular programming language or platform.

As an avid Unix user, I have found the cron daemon an irreplaceable tool. Its parameters are intuitive: fields for the minute, hour, day of month, month, and day of week allow almost any scheduling pattern. It is possible to build a similar, simple scheduling system into an application with just two functions: one that adds a schedule and one that retrieves the schedules that are currently due. These examples rely on Python and MongoDB, but they can easily be adapted to a different language and a relational database.
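To make the field order concrete, here is a small helper that is not part of the original code. It splits a five-field crontab expression into keyword arguments for the schedule() function below. The name parse_cron_fields is hypothetical. Like the scheduler, it only understands “*” and plain numbers:

```python
# names of schedule()'s keyword arguments, in crontab field order
_cron_params = ["minute", "hour", "day", "month", "dow"]


def parse_cron_fields(expression):
    '''Split a five-field crontab expression such as "30 2 * * *" into
    keyword arguments for schedule(). Only "*" and plain numbers are
    handled; anything else (ranges, steps, lists) raises ValueError.'''
    parts = expression.split()
    if len(parts) != len(_cron_params):
        raise ValueError("expected 5 fields, got %d" % len(parts))
    # numbers become ints so they compare equal to datetime fields
    return {name: (part if part == "*" else int(part))
            for name, part in zip(_cron_params, parts)}
```

Suppose you call schedule("rob", {"report": "nightly"}, **parse_cron_fields("30 2 * * *")), with a made-up user and params. That would store an entry that runs every day at 02:30.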

Here is the scheduler with constant definitions for the field keys. I use the required “params” key as a dictionary holding the task configuration. This code of course doesn’t handle all cron options, such as ranges and step values, but should suit many tasks. If a value isn’t specified by a named parameter, it becomes a wildcard match, similar to cron:

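from pymongo import MongoClient

mongodb = MongoClient().scheduler  # assumes a local server; database name is illustrative
USER, PARAMS = "user", "params"   # document keys
_datefields = ["minute", "hour", "day", "month", "dayofweek"]

def schedule(user, params, minute="*", hour="*", day="*", month="*", dow="*"):
    '''add a schedule item; numeric fields must be ints, "*" matches anything'''
    entry = {USER: user, PARAMS: params}
    entry.update(zip(_datefields, [minute, hour, day, month, dow]))
    mongodb.schedules.insert_one(entry)  # save() was removed in PyMongo 4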

Next is the retrieval code, which returns the entries due to run now. For each field, it matches either the ‘*’ wildcard or the corresponding value of the current time:

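import datetime

def current_schedules():
    '''find schedule items that need to be run this minute'''
    dt = datetime.datetime.today()
    # cron numbers days of the week from Sunday = 0; weekday() starts at Monday = 0
    dow = (dt.weekday() + 1) % 7
    vals = [dt.minute, dt.hour, dt.day, dt.month, dow]
    # each field must be either "*" or the current value
    look = {k: {"$in": ["*", v]} for k, v in zip(_datefields, vals)}
    return mongodb.schedules.find(look)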
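The $in query can only match exact values, which is why ranges and step values aren’t supported. If the schedule collection is small, one alternative is to fetch every entry (for example with mongodb.schedules.find()) and do the matching in Python. The following sketch is an illustration rather than the code above, and the function names are hypothetical. It numbers days of the week the cron way (Sunday = 0). Like the query above, it requires all five fields to match. Real cron differs here: when both day of month and day of week are restricted, it runs a job if either one matches.

```python
import datetime

_datefields = ["minute", "hour", "day", "month", "dayofweek"]

# allowed range of each field, as in cron (dayofweek: 0 = Sunday)
_bounds = {"minute": (0, 59), "hour": (0, 23), "day": (1, 31),
           "month": (1, 12), "dayofweek": (0, 6)}


def field_matches(field, pattern, value):
    '''True if value satisfies a cron-style pattern for the given field.
    Understands "*", plain numbers, ranges "a-b", steps "*/n" and "a-b/n",
    and comma-separated lists of these.'''
    low, high = _bounds[field]
    for part in str(pattern).split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = [int(x) for x in base.split("-", 1)]
        else:
            start = end = int(base)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_weekday(dt):
    '''cron numbers days from Sunday = 0; Python's weekday() starts at Monday = 0'''
    return (dt.weekday() + 1) % 7


def is_due(entry, dt):
    '''True if every field of a schedule entry matches dt (all five must match)'''
    values = [dt.minute, dt.hour, dt.day, dt.month, cron_weekday(dt)]
    return all(field_matches(f, entry[f], v) for f, v in zip(_datefields, values))


def due_entries(entries, dt):
    '''filter already-fetched schedule documents down to the ones due at dt'''
    return [entry for entry in entries if is_due(entry, dt)]
```

Because plain ints are converted with str() before parsing, entries stored by schedule() (ints and "*") still match. A call such as due_entries(mongodb.schedules.find(), datetime.datetime.today()) would replace current_schedules().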

Here is some simple client code:

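import datetime
import logging
import time

_log = logging.getLogger(__name__)

def some_execution_method(params):
    '''placeholder: replace with the code that actually runs the task'''
    _log.info("running task with params %r", params)

def run():
    try:
        while True:
            for entry in current_schedules():
                _log.info("batch item at [%s]", datetime.datetime.today())
                some_execution_method(entry["params"])
            # sleep until about one second past the next minute boundary
            time.sleep(60 - datetime.datetime.today().second + 1)
    except KeyboardInterrupt:
        print("killed...")

if __name__ == "__main__":
    run()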

Of course, this code isn’t thread-safe, and it isn’t safe to run from multiple clients either. Each client would find the same due schedules and run every task once per client, so stick to a single client.
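The client loop above has another weakness. If a batch takes longer than a minute, the sleep lands past the next minute boundary, and the minutes in between are never checked. Here is a sketch of one way to catch up. It assumes current_schedules() is changed to accept the datetime to check instead of calling today() itself; that parameter is hypothetical, as is the helper name:

```python
import datetime


def minutes_to_check(last_checked, now):
    '''Return each whole minute after last_checked, up to and including the
    minute that contains now. This catches up on minutes a long batch slept
    through, without returning any minute twice.'''
    step = datetime.timedelta(minutes=1)
    current = last_checked.replace(second=0, microsecond=0) + step
    end = now.replace(second=0, microsecond=0)
    minutes = []
    while current <= end:
        minutes.append(current)
        current += step
    return minutes
```

Inside run(), each pass would loop over minutes_to_check(last, datetime.datetime.today()) and call current_schedules(minute) for each result. It would then set last to the final minute processed. Start last one minute in the past so the current minute is checked on the first pass.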

distributed processing with python and rabbitmq
June 6, 2010 | by rob @ 7:27 pm | comments (0) | filed under [python,web]

Most software developers are aware of “MapReduce”, made famous by Google. Basically, a large set of data is split into pieces, which are distributed over a network to multiple “workers” that process them in parallel. The processed data is then returned to a “reducer”, which aggregates the results into a final dataset.
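Here is a toy illustration of that flow. It is our own sketch, not Google’s or Hadoop’s code: a word count in plain Python, with threads standing in for networked workers. Real MapReduce also shuffles and sorts the mapped output by key before reducing; this sketch folds that step into a single reducer:

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(lines, size):
    '''split the input into pieces that can be handed to separate workers'''
    return [lines[i:i + size] for i in range(0, len(lines), size)]


def map_chunk(chunk):
    '''worker step: count the words in one piece of the data'''
    counts = Counter()
    for line in chunk:
        counts.update(line.split())
    return counts


def reduce_counts(partials):
    '''reducer step: merge the partial results into the final dataset'''
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


def word_count(lines, chunk_size=2, workers=4):
    chunks = split_into_chunks(lines, chunk_size)
    # the threads process chunks in parallel, like workers on other machines
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(map_chunk, chunks))
    return reduce_counts(partials)
```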

There are several implementations of this strategy, of which Hadoop seems to be the most respected in the open source world; Yahoo has fully embraced the project.

However, Hadoop does have some caveats. Configuring a cluster requires editing a multitude of XML files. The Hadoop Distributed File System (HDFS) has to be set up. The master node is not fault tolerant. The “slave” node locations are typically stored in a file, though new ones can be added dynamically. Finally, Hadoop jobs are normally written in languages that run on the JVM; other languages have to go through Hadoop Streaming.

When evaluating Hadoop, we decided it was a little too complex for our first proof of concept for a client. We also wanted to use tools like PyPy to boost the performance of our Python application. So what was the alternative? Disco seemed reasonable. Unfortunately, messages had to be pushed to its “workers” via SSH, which made the workers difficult to organize behind firewalls. Additional workers also had to be added manually through the web interface.

Sometimes square pegs just won’t fit into round holes. So what technology was round enough for our needs? We knew that queues naturally solve the problem of distributing tasks: tasks go in at one end, and workers take them off the other until the queue is empty. In the networked world, message queues can spread job information across machines and geography. AMQP allows advanced routing of these messages through exchanges and has client libraries for many languages. Message queuing systems are also designed to be fault tolerant and distributed.
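Here is the same idea inside a single Python process, as a sketch. Python’s standard queue.Queue holds the tasks, and a few threads act as competing workers, each taking the next task until the queue is empty. Across machines, a message broker such as RabbitMQ plays the role of the queue. The run_workers name and its parameters are only for illustration:

```python
import queue
import threading


def run_workers(tasks, handler, worker_count=3):
    '''Put every task on a shared queue and let worker threads pull from it
    until it is empty; each task is taken by exactly one worker.'''
    work = queue.Queue()
    for task in tasks:
        work.put(task)
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                task = work.get_nowait()
            except queue.Empty:
                return  # queue drained: this worker is done
            result = handler(task)
            with lock:
                results.append(result)

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results  # order depends on thread scheduling
```

All tasks are queued before the workers start, so an empty queue means the work is finished. A networked version would need a sentinel message or a timeout instead.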

The approach worked pretty well. We used py-amqplib as the client library, connected to RabbitMQ, with some fairly basic code:

from amqplib import client_0_8 as amqp
from dobie.extract import configval  # project-specific configuration lookup
import logging
import uuid
 
log = logging.getLogger(__name__)
 
class Connection(object):
    '''basic connection to mq server'''
 
    def __init__(self):
        self.host = configval("host")
        self.user_id = configval("user_id")
        self.password = configval("password")
        self.port = configval("port")
        self.connect()
 
    def connect(self):
        self.connection = amqp.Connection(
            host='%s:%s' % (self.host, self.port),
            userid=self.user_id, password=self.password
        )
 
class Base(object):
    '''shared code for publisher and consumer'''
 
    def __init__(self, queue, connection=None):
        self.connection = connection or Connection()
        self.channel = self.connection.connection.channel()
        self.ex = configval("exchange")
        self.queue = queue
        params = (self.queue, self.ex, self.queue)
        log.debug("queue [%s], exchange [%s], routing [%s]" % params)
 
    def setup(self):
        '''configure queue and exchange'''
        self.channel.queue_declare(
            queue=self.queue, durable=True, exclusive=False, auto_delete=False
        )
        self.channel.exchange_declare(
            exchange=self.ex, type='direct', durable=True, auto_delete=False
        )
        self.channel.queue_bind(
            queue=self.queue, exchange=self.ex, routing_key=self.queue
        )
        return self
 
    def close(self):
        '''close connections'''
        # TODO consider pooling
        self.channel.close()
        self.connection.connection.close()
 
    def __enter__(self):
        return self.setup()
 
    def __exit__(self, _type, value, traceback):
        self.close()
 
class Consumer(Base):
    '''consumer of messages'''
 
    def __init__(self, queue, callback, connection=None):
        Base.__init__(self, queue, connection)
        self.callback = callback
        self.cid = str(uuid.uuid4())
 
    def __enter__(self):
        def _callback(message):
            self.callback(self, message)
        Base.__enter__(self)
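        # no_ack is left at its default (False), so the callback is expected to
        # ack each message; unacked messages are requeued when the channel closes
        self.channel.basic_consume(
            queue=self.queue, callback=_callback, consumer_tag=self.cid
        )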
        return self
 
    def wait(self, count=-1):
        '''wait for messages. if count is set, only consume a limited number
           before exiting'''
        while count != 0:
            self.channel.wait()
            if count > 0: count = count - 1
 
class Publisher(Base):
    '''publisher of messages'''
 
    def __init__(self, queue, connection=None):
        Base.__init__(self, queue, connection)
        self.delivery_mode = int(configval("delivery_mode"))
 
    def publish(self, message):
        message = amqp.Message(message)
        message.properties['delivery_mode'] = self.delivery_mode
        self.channel.basic_publish(
            message, exchange=self.ex, routing_key=self.queue
        )

There are some potential problems with this code. Connections aren’t pooled and it only uses a “direct” exchange. However, it suited our needs.

We used dictionaries to transfer data. Since we planned to stick with Python on both ends, we pickled them before sending them over the wire. RabbitMQ treats message bodies as opaque bytes, so it was happy to accept them.
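Below is a minimal sketch of that round trip, plus a callback in the shape the Consumer class above expects: callback(consumer, message). The helper names are hypothetical, and the explicit ack is our addition. Because basic_consume is called without no_ack=True, the broker expects each message to be acknowledged. Note that unpickling data can execute arbitrary code, so this approach only makes sense when every publisher is trusted.

```python
import pickle


def encode_task(task):
    '''serialize a task dictionary into a message body (bytes)'''
    return pickle.dumps(task, protocol=pickle.HIGHEST_PROTOCOL)


def decode_task(body):
    '''rebuild the task dictionary; only use with bodies from trusted publishers'''
    return pickle.loads(body)


def make_callback(process):
    '''wrap a task-processing function (hypothetical) as a Consumer callback'''
    def callback(consumer, message):
        task = decode_task(message.body)
        process(task)
        # ack only after the work succeeds, so the broker can redeliver
        # the message if this worker dies mid-task
        consumer.channel.basic_ack(message.delivery_info["delivery_tag"])
    return callback
```

On the publishing side, that would be publisher.publish(encode_task(task)).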

There is one problem we have yet to resolve. If a client is already connected and processing messages, newly connected clients only receive messages that are added afterwards. We will post a fix if we figure it out.
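One likely explanation, offered as an assumption rather than a tested fix, is RabbitMQ’s default of unlimited prefetch (AMQP’s basic.qos). Without a limit, the broker pushes the queued backlog to the first consumer as soon as it subscribes, and those messages stay with it until acknowledged. A consumer that joins later therefore only sees new messages. The toy model below is plain Python, not RabbitMQ, and all of its names are hypothetical. It illustrates the difference a prefetch limit of 1 makes. With amqplib, the equivalent would presumably be a call like channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False) before basic_consume, combined with explicit acks; treat that as untested.

```python
from collections import deque


class ToyBroker(object):
    '''A toy model of how a broker hands queued messages to consumers.

    This is not RabbitMQ. It only mimics the rule that matters here:
    a message is pushed to a consumer as soon as that consumer has room
    (fewer unacked messages than its prefetch limit), and it stays with
    that consumer until acked.'''

    def __init__(self, messages):
        self.queue = deque(messages)
        self.consumers = []  # dicts with name, prefetch limit, unacked messages

    def connect(self, name, prefetch=None):
        '''subscribe a consumer; prefetch=None means no limit'''
        self.consumers.append({"name": name, "prefetch": prefetch, "unacked": []})
        self.dispatch()

    def _has_room(self, consumer):
        limit = consumer["prefetch"]
        return limit is None or len(consumer["unacked"]) < limit

    def dispatch(self):
        '''push messages round-robin to every consumer that has room'''
        delivered = True
        while self.queue and delivered:
            delivered = False
            for consumer in self.consumers:
                if self.queue and self._has_room(consumer):
                    consumer["unacked"].append(self.queue.popleft())
                    delivered = True

    def ack(self, name):
        '''the named consumer finishes its oldest message; refill consumers'''
        for consumer in self.consumers:
            if consumer["name"] == name and consumer["unacked"]:
                consumer["unacked"].pop(0)
        self.dispatch()

    def holding(self, name):
        '''messages delivered to, but not yet acked by, the named consumer'''
        for consumer in self.consumers:
            if consumer["name"] == name:
                return list(consumer["unacked"])
        return []
```

With no limit, the first consumer ends up holding the whole backlog and a second consumer gets nothing. With a limit of 1, the backlog stays in the queue and is shared out as each message is acked.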