diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index ef2f246..c17434f 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -1317,9 +1317,11 @@ class Client(HasTraits):
         query : mongodb query dict
             The search dict. See mongodb query docs for details.
         keys : list of strs [optional]
-            THe subset of keys to be returned. The default is to fetch everything.
+            The subset of keys to be returned. The default is to fetch everything but buffers.
             'msg_id' will *always* be included.
         """
+        if isinstance(keys, basestring):
+            keys = [keys]
         content = dict(query=query, keys=keys)
         self.session.send(self._query_socket, "db_request", content=content)
         idents, msg = self.session.recv(self._query_socket, 0)
diff --git a/docs/source/parallel/index.txt b/docs/source/parallel/index.txt
index 0446207..ef81e70 100644
--- a/docs/source/parallel/index.txt
+++ b/docs/source/parallel/index.txt
@@ -12,6 +12,7 @@ Using IPython for parallel computing
    parallel_multiengine.txt
    parallel_task.txt
    parallel_mpi.txt
+   parallel_db.txt
    parallel_security.txt
    parallel_winhpc.txt
    parallel_demos.txt
diff --git a/docs/source/parallel/parallel_db.txt b/docs/source/parallel/parallel_db.txt
new file mode 100644
index 0000000..f3dea61
--- /dev/null
+++ b/docs/source/parallel/parallel_db.txt
@@ -0,0 +1,114 @@
+.. _parallel_db:
+
+=======================
+IPython's Task Database
+=======================
+
+The IPython Hub stores all task requests and results in a database. Currently supported backends
+are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
+this is clients requesting results for tasks they did not submit, via:
+
+.. sourcecode:: ipython
+
+    In [1]: rc.get_result(task_id)
+
+However, since we have this DB backend, we provide a direct query method in the :class:`Client`
+for users who want deeper introspection into their task history. The :meth:`db_query` method of
+the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
+familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
+when using other backends, the interface is emulated and only a subset of queries is possible.
+
+.. seealso::
+
+    MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
+
+:meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
+and values of either exact values to test, or MongoDB queries, which are dicts of the form
+``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, which specifies
+the subset of keys that should be retrieved. The default is to retrieve all keys excluding the
+request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
+MongoDB, the `msg_id` key will always be included, whether requested or not.
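+
+For example, to fetch just the timing info for the single task above (reusing ``task_id``,
+which stands in for the msg_id of a task that has already run):
+
+.. sourcecode:: ipython
+
+    In [2]: records = rc.db_query({'msg_id' : task_id}, keys=['started', 'completed'])
+
+    In [3]: sorted(records[0].keys())  # msg_id comes back even though we didn't ask for it
+    Out[3]: ['completed', 'msg_id', 'started']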
+
+TaskRecord keys:
+
+=============== =============== =============
+Key             Type            Description
+=============== =============== =============
+msg_id          uuid(bytes)     The msg ID
+header          dict            The request header
+content         dict            The request content (likely empty)
+buffers         list(bytes)     buffers containing serialized request objects
+submitted       datetime        timestamp for time of submission (set by client)
+client_uuid     uuid(bytes)     IDENT of client's socket
+engine_uuid     uuid(bytes)     IDENT of engine's socket
+started         datetime        time task began execution on engine
+completed       datetime        time task finished execution (success or failure) on engine
+resubmitted     datetime        time of resubmission (if applicable)
+result_header   dict            header for result
+result_content  dict            content for result
+result_buffers  list(bytes)     buffers containing serialized result objects
+queue           bytes           The name of the queue for the task ('mux' or 'task')
+pyin                            Python input (unused)
+pyout                           Python output (unused)
+pyerr                           Python traceback (unused)
+stdout          str             Stream of stdout data
+stderr          str             Stream of stderr data
+=============== =============== =============
+
+MongoDB operators we emulate on all backends:
+
+==========  =================
+Operator    Python equivalent
+==========  =================
+'$in'       in
+'$nin'      not in
+'$eq'       ==
+'$ne'       !=
+'$gt'       >
+'$gte'      >=
+'$lt'       <
+'$lte'      <=
+==========  =================
+
+The DB query is useful for two primary cases:
+
+1. deep polling of task status or metadata
+2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on their
+   results, purge the records, resubmit them, ...)
+
+Example Queries
+===============
+
+To get all msg_ids that are not completed, only retrieving their ID and start time:
+
+.. sourcecode:: ipython
+
+    In [1]: incomplete = rc.db_query({'completed' : None}, keys=['msg_id', 'started'])
+
+All jobs started in the last hour by me:
+
+.. sourcecode:: ipython
+
+    In [1]: from datetime import datetime, timedelta
+
+    In [2]: hourago = datetime.now() - timedelta(1./24)
+
+    In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
+                                  'client_uuid' : rc.session.session})
+
+All jobs started more than an hour ago, by clients *other than me*:
+
+.. sourcecode:: ipython
+
+    In [3]: recent = rc.db_query({'started' : {'$lt' : hourago },
+                                  'client_uuid' : {'$ne' : rc.session.session}})
+
+Result headers for all jobs on engine 3 or 4:
+
+.. sourcecode:: ipython
+
+    In [1]: uuids = map(rc._engines.get, (3,4))
+
+    In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids}}, keys='result_header')
diff --git a/docs/source/parallel/parallel_task.txt b/docs/source/parallel/parallel_task.txt
index 9c32abd..e819d63 100644
--- a/docs/source/parallel/parallel_task.txt
+++ b/docs/source/parallel/parallel_task.txt
@@ -292,6 +292,7 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje
+
 Impossible Dependencies
 ***********************
@@ -313,6 +314,27 @@ The basic cases that are checked:
 
 This analysis has not been proven to be rigorous, so it is likely possible for tasks
 to become impossible to run in obscure situations, so a timeout may be a good choice.
+
+Retries and Resubmit
+====================
+
+Retries
+-------
+
+Another flag for tasks is `retries`. This is an integer, specifying how many times
+a task should be resubmitted after failure. This is useful for tasks that should still run
+if their engine was shut down, or that have some statistical chance of failing. The default
+is not to retry tasks.
+
+Resubmit
+--------
+
+Sometimes you may want to re-run a task. This could be because it failed for some reason, and
+you have fixed the error, or because you want to restore the cluster to an interrupted state.
+For this, the :class:`Client` provides a :meth:`resubmit` method. This simply takes one or more
+msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
+a task that is pending - only those that have finished, whether successful or unsuccessful.
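+
+For example, a quick sketch - ``failed_ids`` here stands in for a list of msg_ids of tasks
+that have already finished (e.g. collected with :meth:`db_query`):
+
+.. sourcecode:: ipython
+
+    In [1]: ahr = rc.resubmit(failed_ids)
+
+    In [2]: ahr.get()  # wait for, and return, the resubmitted results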
+
 .. _parallel_schedulers:
 
 Schedulers
 ==========
@@ -391,6 +413,8 @@ Disabled features when using the ZMQ Scheduler:
 
 TODO: performance comparisons
 
+
+
 More details
 ============