Async
Func's Client() API can be invoked in asynchronous mode. This means that API calls do not hold network connections open, so tasks can be very long running. Async mode can be used seamlessly in parallel with the Multiplexer feature to address a large number of systems both quickly and efficiently.
While there is a lot of black magic behind the scenes on both the overlord and minion side, the API to do this remains very simple. This can be useful when building an app (such as a web app) on top of Func that still needs to stay responsive. You can even get Ajax-style partial results as they come in!
The API is illustrated by tests/async_test.py in the source directory. Though this API may change (see the source for more details), here's a bit of it:
from func.overlord.client import Client
import func.jobthing as jobthing

# nforks: number of worker processes used to fan out the call
# async=True: return a job_id immediately instead of blocking for results
client = Client("*", nforks=10, async=True)
job_id = client.module_name.method_name(1, 2, 3, 4)
(return_code, results) = client.job_status(job_id)
The return code is one of:

jobthing.JOB_ID_RUNNING = 0
jobthing.JOB_ID_FINISHED = 1
jobthing.JOB_ID_LOST_IN_SPACE = 2
jobthing.JOB_ID_PARTIAL = 3
If the return code is JOB_ID_LOST_IN_SPACE or JOB_ID_RUNNING, the result is undefined.
If the return code is JOB_ID_PARTIAL or JOB_ID_FINISHED, the results look like this:
{
"hostname1" : result_from_that_minion,
"hostname2" : result_from_that_minion
}
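As a sketch of how a caller might consume this API, the loop below polls job_status() until the job completes. The wait_for_job helper is hypothetical (not part of Func); the JOB_ID_* values mirror the func.jobthing constants listed above, and client is assumed to be any object exposing job_status() as described.

```python
import time

# Status codes mirroring func.jobthing, as listed above.
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
JOB_ID_LOST_IN_SPACE = 2
JOB_ID_PARTIAL = 3

def wait_for_job(client, job_id, poll_interval=2.0, timeout=300.0):
    """Poll job_status() until the job finishes, is lost, or times out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        (code, results) = client.job_status(job_id)
        if code == JOB_ID_FINISHED:
            return results                # complete per-minion result dict
        if code == JOB_ID_LOST_IN_SPACE:
            raise RuntimeError("job %s was lost" % job_id)
        # JOB_ID_RUNNING / JOB_ID_PARTIAL: keep waiting
        time.sleep(poll_interval)
    raise RuntimeError("timed out waiting for job %s" % job_id)
```

A real caller might instead act on JOB_ID_PARTIAL results as they arrive, which is exactly the "ajaxy" use case mentioned above.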
Task Deletion
Job status is deleted after some amount of time X has expired, or once more than Y jobs have accumulated, for arbitrary X and Y. Basically, if you start a job, you should do something with the status when you get it back and not expect to be able to retrieve it indefinitely.
Assume data will be around for at least 60 minutes.
How It Works
Caution: There be dragons!
OVERLORD
[ func/overlord/client.py ]
Our journey starts in the Overlord() class, which has a self.async variable indicating whether we are running in async or normal (sync) mode. Its run() method is invoked each time you call any method that is not directly defined in the Overlord() class (through __getattr__() magic). Because of all the delegation involved, the actual work on the final overlord is done in the run_direct() method. If self.async is set, jobs are not called directly; jobthing.batch_run() is used instead.
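The __getattr__() delegation can be pictured with a toy model. OverlordSketch and ModuleProxy below are invented names, run() just records what would be dispatched rather than making a real call, and the attribute is named run_async here only because async is a reserved word in modern Python (the historical code uses self.async):

```python
class OverlordSketch(object):
    """Toy model of the attribute-based dispatch described above."""

    def __init__(self, run_async=False):
        self.run_async = run_async

    def run(self, module, method, args):
        # The real run() routes the call (directly or via jobthing.batch_run());
        # here we just report what would happen.
        mode = "async" if self.run_async else "sync"
        return (mode, module, method, args)

    def __getattr__(self, name):
        # Any undefined attribute becomes a module proxy whose attributes
        # in turn become method calls routed through run().
        outer = self

        class ModuleProxy(object):
            def __getattr__(self, meth):
                return lambda *args: outer.run(name, meth, args)

        return ModuleProxy()
```

So client.module_name.method_name(1, 2) never needs module_name or method_name to be defined anywhere; both lookups fall through to __getattr__().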
[ func/jobthing.py ]
The first thing batch_run() does is generate a job_id. It is a string built from the time the job was called, the module, the method, the arguments used, and the glob value used to address the job. It's basically created like this:
job_id = "".join([glob,"-",module,"-",method,"-",pprint.pformat(time.time())])
At this moment, the status for this job_id is set to JOB_ID_RUNNING in the *overlord* status file (currently a dbm file). The process is then forked; the parent goes back to its duties while the child takes care of the rest of the work. This shouldn't take much time, however, since all the child does is call forkbomb.batch_run(), which returns the job_ids returned *from the minions*. These job_ids are then written to the overlord status file, the status is changed to JOB_ID_PARTIAL, and the child process exits.
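The sequence above can be sketched with a plain dict standing in for the dbm status file. overlord_batch_run and run_on_minions are hypothetical stand-ins, and no real fork happens here; the point is only the order of the status-file writes:

```python
# Status codes as listed earlier in this document.
JOB_ID_RUNNING, JOB_ID_FINISHED, JOB_ID_LOST_IN_SPACE, JOB_ID_PARTIAL = 0, 1, 2, 3

# A plain dict stands in for the overlord's dbm status file.
overlord_status = {}

def overlord_batch_run(job_id, run_on_minions):
    # 1. Before forking, record that the job is running.
    overlord_status[job_id] = (JOB_ID_RUNNING, None)
    # 2. In the real code a forked child calls forkbomb.batch_run();
    #    run_on_minions() stands in for that call and returns the
    #    job_ids handed back *from the minions*.
    minion_job_ids = run_on_minions()
    # 3. The child stores those minion job_ids and marks the job partial.
    overlord_status[job_id] = (JOB_ID_PARTIAL, minion_job_ids)
    return job_id
```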
[ func/forkbomb.py ]
The batch_run() method in forkbomb takes the list of minions we want to run the job on (the pool) and divides it into NFORKS smaller lists (buckets), one bucket for each fork that will be used. The __forkbomb() method then creates NFORKS processes (using recursion) and passes one bucket to each of them. Each worker process calls __with_my_bucket(), which iterates over its bucket and runs the job on each minion assigned to it. Remote methods are called in almost the same way as normal (non-async) ones; the only change is the "async." prefix added to the method name.
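The bucketing step can be sketched as a simple round-robin split. partition_into_buckets is an invented helper, and the real forkbomb code may slice the pool differently; this only shows the idea of one bucket per fork:

```python
def partition_into_buckets(minions, nforks):
    """Split the minion pool into at most nforks buckets, round-robin,
    so each forked worker gets roughly the same number of minions."""
    buckets = [[] for _ in range(nforks)]
    for i, minion in enumerate(minions):
        buckets[i % nforks].append(minion)
    return [b for b in buckets if b]   # drop empty buckets
```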
MINION
[ func/minion/server.py ]
The minion tale starts in the _dispatch() method of the FuncSSLXMLRPCServer class. If the method name starts with "async.", the prefix is removed and the method is invoked via jobthing.minion_async_run() instead of a direct call.
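The branch in _dispatch() amounts to a prefix check, roughly like the hypothetical sketch below (dispatch, call_direct, and call_async are invented names standing in for the real handlers):

```python
ASYNC_PREFIX = "async."

def dispatch(method, params, call_direct, call_async):
    # Async calls arrive with an "async." prefix on the method name;
    # strip it and hand off to the async runner, otherwise call directly.
    if method.startswith(ASYNC_PREFIX):
        return call_async(method[len(ASYNC_PREFIX):], params)
    return call_direct(method, params)
```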
[ func/jobthing.py ]
The first thing done in minion_async_run() is job_id generation. A minion-side job_id has a different format from the overlord one; it's created like this: job_id = "%s-minion" % pprint.pformat(time.time())
Now the status of the job (in the *minion* status file) is set to JOB_ID_RUNNING with a result equal to -1. The process is daemonized with the double-fork technique. The parent goes back to its normal duties while the daemonized fork runs the job independently. After the work is done, the daemonized process writes the job result and status (JOB_ID_FINISHED) to the minion status file and exits.
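The double-fork step can be sketched as below (POSIX only). run_detached and write_status are hypothetical stand-ins; the real minion_async_run additionally records JOB_ID_RUNNING before forking and writes JOB_ID_FINISHED along with the result:

```python
import os

def run_detached(job, write_status):
    """Daemonize `job` with the classic double fork (POSIX only)."""
    pid = os.fork()
    if pid > 0:
        os.waitpid(pid, 0)   # reap the short-lived middle child
        return               # parent: back to normal duties immediately
    os.setsid()              # middle child: new session, no controlling tty
    if os.fork() > 0:
        os._exit(0)          # middle child exits; worker is reparented
    # Daemonized worker: run the job, record the result, and exit
    # without ever returning into the caller's code.
    result = job()
    write_status(result)
    os._exit(0)
```

The second fork is what lets the worker outlive the request: the XML-RPC call returns as soon as the parent does, while the orphaned grandchild keeps running the job.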
There are two things that are worth noting:
- There are two kinds of job_ids. There is one overlord job_id for each async call; this call can result in many minion job_ids (one on each minion).
- There are two *separate* status files: one on the overlord and one on the minion. They store DIFFERENT information. The minion status file stores the status (and returned result) of the running job. The overlord status file is used to map the overlord job_id to the minion job_ids on each minion. After a job is finished, the overlord status file contains the result itself, since we don't need the minion job_ids anymore; this lets us return the result without doing any remote call (it's Denis's idea, and I really think it's a good one). What was wrong in Denis's patch was that it also updated the overlord state file when there were only partial results, overwriting the remote job_ids and making it impossible to ask for the rest of the results.
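The map-then-cache behavior can be sketched like this. overlord_job_status and poll_minion are invented stand-ins for the real job_status() plumbing; the key point is that the minion job_id map is only replaced once *all* minions have finished:

```python
JOB_ID_FINISHED, JOB_ID_PARTIAL = 1, 3   # values as listed earlier

def overlord_job_status(status_db, job_id, poll_minion):
    """Resolve an overlord job_id against its per-minion job_ids."""
    (code, data) = status_db[job_id]
    if code != JOB_ID_PARTIAL:
        return (code, data)   # finished results were already cached locally
    results = {}
    all_done = True
    for host, minion_job_id in data.items():
        (mcode, mresult) = poll_minion(host, minion_job_id)
        if mcode == JOB_ID_FINISHED:
            results[host] = mresult
        else:
            all_done = False
    if all_done:
        # Replace the minion job_id map with the results themselves so
        # later calls need no remote traffic at all.
        status_db[job_id] = (JOB_ID_FINISHED, results)
        return (JOB_ID_FINISHED, results)
    # While results are only partial, keep the minion job_ids intact --
    # overwriting them here is exactly the bug described above.
    return (JOB_ID_PARTIAL, results)
```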
Here's how job status should change over time in case of successful call:
| | OVERLORD | MINION1 | MINION2 |
| step 1: | JOB_ID_RUNNING | - | - |
| step 2: | JOB_ID_RUNNING | JOB_ID_RUNNING | - |
| step 3: | JOB_ID_PARTIAL | JOB_ID_RUNNING | JOB_ID_RUNNING |
| step 4: | JOB_ID_PARTIAL | JOB_ID_FINISHED | JOB_ID_RUNNING |
| step 5: | JOB_ID_PARTIAL | JOB_ID_FINISHED | JOB_ID_FINISHED |
| step 6: | JOB_ID_FINISHED | JOB_ID_FINISHED | JOB_ID_FINISHED |
