Show More
@@ -0,0 +1,120 | |||||
|
1 | """test LoadBalancedView objects""" | |||
|
2 | # -*- coding: utf-8 -*- | |||
|
3 | #------------------------------------------------------------------------------- | |||
|
4 | # Copyright (C) 2011 The IPython Development Team | |||
|
5 | # | |||
|
6 | # Distributed under the terms of the BSD License. The full license is in | |||
|
7 | # the file COPYING, distributed as part of this software. | |||
|
8 | #------------------------------------------------------------------------------- | |||
|
9 | ||||
|
10 | #------------------------------------------------------------------------------- | |||
|
11 | # Imports | |||
|
12 | #------------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | import sys | |||
|
15 | import time | |||
|
16 | ||||
|
17 | import zmq | |||
|
18 | ||||
|
19 | from IPython import parallel as pmod | |||
|
20 | from IPython.parallel import error | |||
|
21 | ||||
|
22 | from IPython.parallel.tests import add_engines | |||
|
23 | ||||
|
24 | from .clienttest import ClusterTestCase, crash, wait, skip_without | |||
|
25 | ||||
|
26 | def setup(): | |||
|
27 | add_engines(3) | |||
|
28 | ||||
|
29 | class TestLoadBalancedView(ClusterTestCase): | |||
|
30 | ||||
|
31 | def setUp(self): | |||
|
32 | ClusterTestCase.setUp(self) | |||
|
33 | self.view = self.client.load_balanced_view() | |||
|
34 | ||||
|
35 | def test_z_crash_task(self): | |||
|
36 | """test graceful handling of engine death (balanced)""" | |||
|
37 | # self.add_engines(1) | |||
|
38 | ar = self.view.apply_async(crash) | |||
|
39 | self.assertRaisesRemote(error.EngineError, ar.get, 10) | |||
|
40 | eid = ar.engine_id | |||
|
41 | tic = time.time() | |||
|
42 | while eid in self.client.ids and time.time()-tic < 5: | |||
|
43 | time.sleep(.01) | |||
|
44 | self.client.spin() | |||
|
45 | self.assertFalse(eid in self.client.ids, "Engine should have died") | |||
|
46 | ||||
|
47 | def test_map(self): | |||
|
48 | def f(x): | |||
|
49 | return x**2 | |||
|
50 | data = range(16) | |||
|
51 | r = self.view.map_sync(f, data) | |||
|
52 | self.assertEquals(r, map(f, data)) | |||
|
53 | ||||
|
54 | def test_abort(self): | |||
|
55 | view = self.view | |||
|
56 | ar = self.client[:].apply_async(time.sleep, .5) | |||
|
57 | ar2 = view.apply_async(lambda : 2) | |||
|
58 | ar3 = view.apply_async(lambda : 3) | |||
|
59 | view.abort(ar2) | |||
|
60 | view.abort(ar3.msg_ids) | |||
|
61 | self.assertRaises(error.TaskAborted, ar2.get) | |||
|
62 | self.assertRaises(error.TaskAborted, ar3.get) | |||
|
63 | ||||
|
64 | def test_retries(self): | |||
|
65 | add_engines(3) | |||
|
66 | view = self.view | |||
|
67 | view.timeout = 1 # prevent hang if this doesn't behave | |||
|
68 | def fail(): | |||
|
69 | assert False | |||
|
70 | for r in range(len(self.client)-1): | |||
|
71 | with view.temp_flags(retries=r): | |||
|
72 | self.assertRaisesRemote(AssertionError, view.apply_sync, fail) | |||
|
73 | ||||
|
74 | with view.temp_flags(retries=len(self.client), timeout=0.25): | |||
|
75 | self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail) | |||
|
76 | ||||
|
77 | def test_invalid_dependency(self): | |||
|
78 | view = self.view | |||
|
79 | with view.temp_flags(after='12345'): | |||
|
80 | self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1) | |||
|
81 | ||||
|
82 | def test_impossible_dependency(self): | |||
|
83 | if len(self.client) < 2: | |||
|
84 | add_engines(2) | |||
|
85 | view = self.client.load_balanced_view() | |||
|
86 | ar1 = view.apply_async(lambda : 1) | |||
|
87 | ar1.get() | |||
|
88 | e1 = ar1.engine_id | |||
|
89 | e2 = e1 | |||
|
90 | while e2 == e1: | |||
|
91 | ar2 = view.apply_async(lambda : 1) | |||
|
92 | ar2.get() | |||
|
93 | e2 = ar2.engine_id | |||
|
94 | ||||
|
95 | with view.temp_flags(follow=[ar1, ar2]): | |||
|
96 | self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1) | |||
|
97 | ||||
|
98 | ||||
|
99 | def test_follow(self): | |||
|
100 | ar = self.view.apply_async(lambda : 1) | |||
|
101 | ar.get() | |||
|
102 | ars = [] | |||
|
103 | first_id = ar.engine_id | |||
|
104 | ||||
|
105 | self.view.follow = ar | |||
|
106 | for i in range(5): | |||
|
107 | ars.append(self.view.apply_async(lambda : 1)) | |||
|
108 | self.view.wait(ars) | |||
|
109 | for ar in ars: | |||
|
110 | self.assertEquals(ar.engine_id, first_id) | |||
|
111 | ||||
|
112 | def test_after(self): | |||
|
113 | view = self.view | |||
|
114 | ar = view.apply_async(time.sleep, 0.5) | |||
|
115 | with view.temp_flags(after=ar): | |||
|
116 | ar2 = view.apply_async(lambda : 1) | |||
|
117 | ||||
|
118 | ar.wait() | |||
|
119 | ar2.wait() | |||
|
120 | self.assertTrue(ar2.started > ar.completed) |
@@ -0,0 +1,37 | |||||
|
1 | """Tests for mongodb backend""" | |||
|
2 | ||||
|
3 | #------------------------------------------------------------------------------- | |||
|
4 | # Copyright (C) 2011 The IPython Development Team | |||
|
5 | # | |||
|
6 | # Distributed under the terms of the BSD License. The full license is in | |||
|
7 | # the file COPYING, distributed as part of this software. | |||
|
8 | #------------------------------------------------------------------------------- | |||
|
9 | ||||
|
10 | #------------------------------------------------------------------------------- | |||
|
11 | # Imports | |||
|
12 | #------------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | from nose import SkipTest | |||
|
15 | ||||
|
16 | from pymongo import Connection | |||
|
17 | from IPython.parallel.controller.mongodb import MongoDB | |||
|
18 | ||||
|
19 | from . import test_db | |||
|
20 | ||||
|
21 | try: | |||
|
22 | c = Connection() | |||
|
23 | except Exception: | |||
|
24 | c=None | |||
|
25 | ||||
|
26 | class TestMongoBackend(test_db.TestDictBackend): | |||
|
27 | """MongoDB backend tests""" | |||
|
28 | ||||
|
29 | def create_db(self): | |||
|
30 | try: | |||
|
31 | return MongoDB(database='iptestdb', _connection=c) | |||
|
32 | except Exception: | |||
|
33 | raise SkipTest("Couldn't connect to mongodb") | |||
|
34 | ||||
|
35 | def teardown(self): | |||
|
36 | if c is not None: | |||
|
37 | c.drop_database('iptestdb') |
@@ -0,0 +1,114 | |||||
|
1 | .. _parallel_db: | |||
|
2 | ||||
|
3 | ======================= | |||
|
4 | IPython's Task Database | |||
|
5 | ======================= | |||
|
6 | ||||
|
7 | The IPython Hub stores all task requests and results in a database. Currently supported backends | |||
|
8 | are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for | |||
|
9 | this is clients requesting results for tasks they did not submit, via: | |||
|
10 | ||||
|
11 | .. sourcecode:: ipython | |||
|
12 | ||||
|
13 | In [1]: rc.get_result(task_id) | |||
|
14 | ||||
|
15 | However, since we have this DB backend, we provide a direct query method in the :class:`Client` | |||
|
16 | for users who want deeper introspection into their task history. The :meth:`db_query` method of | |||
|
17 | the Client is modeled after MongoDB queries, so if you have used MongoDB it should look | |||
|
18 | familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However, | |||
|
19 | when using other backends, the interface is emulated and only a subset of queries is possible. | |||
|
20 | ||||
|
21 | .. seealso:: | |||
|
22 | ||||
|
23 | MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying | |||
|
24 | ||||
|
25 | :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list, | |||
|
26 | and values of either exact values to test, or MongoDB queries, which are dicts of the form: | |||
|
27 | ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, that specifies | |||
|
28 | which subset of keys should be retrieved. The default is to retrieve all keys excluding the | |||
|
29 | request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like | |||
|
30 | MongoDB, the `msg_id` key will always be included, whether requested or not. | |||
|
31 | ||||
|
32 | TaskRecord keys: | |||
|
33 | ||||
|
34 | =============== =============== ============= | |||
|
35 | Key Type Description | |||
|
36 | =============== =============== ============= | |||
|
37 | msg_id uuid(bytes) The msg ID | |||
|
38 | header dict The request header | |||
|
39 | content dict The request content (likely empty) | |||
|
40 | buffers list(bytes) buffers containing serialized request objects | |||
|
41 | submitted datetime timestamp for time of submission (set by client) | |||
|
42 | client_uuid uuid(bytes) IDENT of client's socket | |||
|
43 | engine_uuid uuid(bytes) IDENT of engine's socket | |||
|
44 | started datetime time task began execution on engine | |||
|
45 | completed datetime time task finished execution (success or failure) on engine | |||
|
46 | resubmitted datetime time of resubmission (if applicable) | |||
|
47 | result_header dict header for result | |||
|
48 | result_content dict content for result | |||
|
49 | result_buffers list(bytes) buffers containing serialized request objects | |||
|
50 | queue bytes The name of the queue for the task ('mux' or 'task') | |||
|
51 | pyin <unused> Python input (unused) | |||
|
52 | pyout <unused> Python output (unused) | |||
|
53 | pyerr <unused> Python traceback (unused) | |||
|
54 | stdout str Stream of stdout data | |||
|
55 | stderr str Stream of stderr data | |||
|
56 | ||||
|
57 | =============== =============== ============= | |||
|
58 | ||||
|
59 | MongoDB operators we emulate on all backends: | |||
|
60 | ||||
|
61 | ========== ================= | |||
|
62 | Operator Python equivalent | |||
|
63 | ========== ================= | |||
|
64 | '$in' in | |||
|
65 | '$nin' not in | |||
|
66 | '$eq' == | |||
|
67 | '$ne' != | |||
|
68 | '$gt' > | |||
|
69 | '$gte' >= | |||
|
70 | '$lt' < | |||
|
71 | '$lte' <= | |||
|
72 | ========== ================= | |||
|
73 | ||||
|
74 | ||||
|
75 | The DB Query is useful for two primary cases: | |||
|
76 | ||||
|
77 | 1. deep polling of task status or metadata | |||
|
78 | 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...) | |||
|
79 | ||||
|
80 | Example Queries | |||
|
81 | =============== | |||
|
82 | ||||
|
83 | ||||
|
84 | To get all msg_ids that are not completed, only retrieving their ID and start time: | |||
|
85 | ||||
|
86 | .. sourcecode:: ipython | |||
|
87 | ||||
|
88 | In [1]: incomplete = rc.db_query({'completed' : None}, keys=['msg_id', 'started']) | |||
|
89 | ||||
|
90 | All jobs started in the last hour by me: | |||
|
91 | ||||
|
92 | .. sourcecode:: ipython | |||
|
93 | ||||
|
94 | In [1]: from datetime import datetime, timedelta | |||
|
95 | ||||
|
96 | In [2]: hourago = datetime.now() - timedelta(1./24) | |||
|
97 | ||||
|
98 | In [3]: recent = rc.db_query({'started' : {'$gte' : hourago }, | |||
|
99 | 'client_uuid' : rc.session.session}) | |||
|
100 | ||||
|
101 | All jobs started more than an hour ago, by clients *other than me*: | |||
|
102 | ||||
|
103 | .. sourcecode:: ipython | |||
|
104 | ||||
|
105 | In [3]: recent = rc.db_query({'started' : {'$lt' : hourago }, | |||
|
106 | 'client_uuid' : {'$ne' : rc.session.session}}) | |||
|
107 | ||||
|
108 | Result headers for all jobs on engine 3 or 4: | |||
|
109 | ||||
|
110 | .. sourcecode:: ipython | |||
|
111 | ||||
|
112 | In [1]: uuids = map(rc._engines.get, (3,4)) | |||
|
113 | ||||
|
114 | In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }}, keys='result_header')
@@ -1041,6 +1041,68 class Client(HasTraits): | |||||
1041 | ar.wait() |
|
1041 | ar.wait() | |
1042 |
|
1042 | |||
1043 | return ar |
|
1043 | return ar | |
|
1044 | ||||
|
1045 | @spin_first | |||
|
1046 | def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None): | |||
|
1047 | """Resubmit one or more tasks. | |||
|
1048 | ||||
|
1049 | in-flight tasks may not be resubmitted. | |||
|
1050 | ||||
|
1051 | Parameters | |||
|
1052 | ---------- | |||
|
1053 | ||||
|
1054 | indices_or_msg_ids : integer history index, str msg_id, or list of either | |||
|
1055 | The indices or msg_ids of indices to be retrieved | |||
|
1056 | ||||
|
1057 | block : bool | |||
|
1058 | Whether to wait for the result to be done | |||
|
1059 | ||||
|
1060 | Returns | |||
|
1061 | ------- | |||
|
1062 | ||||
|
1063 | AsyncHubResult | |||
|
1064 | A subclass of AsyncResult that retrieves results from the Hub | |||
|
1065 | ||||
|
1066 | """ | |||
|
1067 | block = self.block if block is None else block | |||
|
1068 | if indices_or_msg_ids is None: | |||
|
1069 | indices_or_msg_ids = -1 | |||
|
1070 | ||||
|
1071 | if not isinstance(indices_or_msg_ids, (list,tuple)): | |||
|
1072 | indices_or_msg_ids = [indices_or_msg_ids] | |||
|
1073 | ||||
|
1074 | theids = [] | |||
|
1075 | for id in indices_or_msg_ids: | |||
|
1076 | if isinstance(id, int): | |||
|
1077 | id = self.history[id] | |||
|
1078 | if not isinstance(id, str): | |||
|
1079 | raise TypeError("indices must be str or int, not %r"%id) | |||
|
1080 | theids.append(id) | |||
|
1081 | ||||
|
1082 | for msg_id in theids: | |||
|
1083 | self.outstanding.discard(msg_id) | |||
|
1084 | if msg_id in self.history: | |||
|
1085 | self.history.remove(msg_id) | |||
|
1086 | self.results.pop(msg_id, None) | |||
|
1087 | self.metadata.pop(msg_id, None) | |||
|
1088 | content = dict(msg_ids = theids) | |||
|
1089 | ||||
|
1090 | self.session.send(self._query_socket, 'resubmit_request', content) | |||
|
1091 | ||||
|
1092 | zmq.select([self._query_socket], [], []) | |||
|
1093 | idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK) | |||
|
1094 | if self.debug: | |||
|
1095 | pprint(msg) | |||
|
1096 | content = msg['content'] | |||
|
1097 | if content['status'] != 'ok': | |||
|
1098 | raise self._unwrap_exception(content) | |||
|
1099 | ||||
|
1100 | ar = AsyncHubResult(self, msg_ids=theids) | |||
|
1101 | ||||
|
1102 | if block: | |||
|
1103 | ar.wait() | |||
|
1104 | ||||
|
1105 | return ar | |||
1044 |
|
1106 | |||
1045 | @spin_first |
|
1107 | @spin_first | |
1046 | def result_status(self, msg_ids, status_only=True): |
|
1108 | def result_status(self, msg_ids, status_only=True): | |
@@ -1255,9 +1317,11 class Client(HasTraits): | |||||
1255 | query : mongodb query dict |
|
1317 | query : mongodb query dict | |
1256 | The search dict. See mongodb query docs for details. |
|
1318 | The search dict. See mongodb query docs for details. | |
1257 | keys : list of strs [optional] |
|
1319 | keys : list of strs [optional] | |
1258 |
T |
|
1320 | The subset of keys to be returned. The default is to fetch everything but buffers. | |
1259 | 'msg_id' will *always* be included. |
|
1321 | 'msg_id' will *always* be included. | |
1260 | """ |
|
1322 | """ | |
|
1323 | if isinstance(keys, basestring): | |||
|
1324 | keys = [keys] | |||
1261 | content = dict(query=query, keys=keys) |
|
1325 | content = dict(query=query, keys=keys) | |
1262 | self.session.send(self._query_socket, "db_request", content=content) |
|
1326 | self.session.send(self._query_socket, "db_request", content=content) | |
1263 | idents, msg = self.session.recv(self._query_socket, 0) |
|
1327 | idents, msg = self.session.recv(self._query_socket, 0) |
@@ -19,7 +19,7 from types import ModuleType | |||||
19 | import zmq |
|
19 | import zmq | |
20 |
|
20 | |||
21 | from IPython.testing import decorators as testdec |
|
21 | from IPython.testing import decorators as testdec | |
22 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat |
|
22 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt | |
23 |
|
23 | |||
24 | from IPython.external.decorator import decorator |
|
24 | from IPython.external.decorator import decorator | |
25 |
|
25 | |||
@@ -791,9 +791,10 class LoadBalancedView(View): | |||||
791 | follow=Any() |
|
791 | follow=Any() | |
792 | after=Any() |
|
792 | after=Any() | |
793 | timeout=CFloat() |
|
793 | timeout=CFloat() | |
|
794 | retries = CInt(0) | |||
794 |
|
795 | |||
795 | _task_scheme = Any() |
|
796 | _task_scheme = Any() | |
796 | _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout']) |
|
797 | _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']) | |
797 |
|
798 | |||
798 | def __init__(self, client=None, socket=None, **flags): |
|
799 | def __init__(self, client=None, socket=None, **flags): | |
799 | super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags) |
|
800 | super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags) | |
@@ -851,7 +852,7 class LoadBalancedView(View): | |||||
851 | whether to create a MessageTracker to allow the user to |
|
852 | whether to create a MessageTracker to allow the user to | |
852 | safely edit after arrays and buffers during non-copying |
|
853 | safely edit after arrays and buffers during non-copying | |
853 | sends. |
|
854 | sends. | |
854 | # |
|
855 | ||
855 | after : Dependency or collection of msg_ids |
|
856 | after : Dependency or collection of msg_ids | |
856 | Only for load-balanced execution (targets=None) |
|
857 | Only for load-balanced execution (targets=None) | |
857 | Specify a list of msg_ids as a time-based dependency. |
|
858 | Specify a list of msg_ids as a time-based dependency. | |
@@ -869,6 +870,9 class LoadBalancedView(View): | |||||
869 | Specify an amount of time (in seconds) for the scheduler to |
|
870 | Specify an amount of time (in seconds) for the scheduler to | |
870 | wait for dependencies to be met before failing with a |
|
871 | wait for dependencies to be met before failing with a | |
871 | DependencyTimeout. |
|
872 | DependencyTimeout. | |
|
873 | ||||
|
874 | retries : int | |||
|
875 | Number of times a task will be retried on failure. | |||
872 | """ |
|
876 | """ | |
873 |
|
877 | |||
874 | super(LoadBalancedView, self).set_flags(**kwargs) |
|
878 | super(LoadBalancedView, self).set_flags(**kwargs) | |
@@ -892,7 +896,7 class LoadBalancedView(View): | |||||
892 | @save_ids |
|
896 | @save_ids | |
893 | def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, |
|
897 | def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, | |
894 | after=None, follow=None, timeout=None, |
|
898 | after=None, follow=None, timeout=None, | |
895 | targets=None): |
|
899 | targets=None, retries=None): | |
896 | """calls f(*args, **kwargs) on a remote engine, returning the result. |
|
900 | """calls f(*args, **kwargs) on a remote engine, returning the result. | |
897 |
|
901 | |||
898 | This method temporarily sets all of `apply`'s flags for a single call. |
|
902 | This method temporarily sets all of `apply`'s flags for a single call. | |
@@ -933,10 +937,11 class LoadBalancedView(View): | |||||
933 | raise RuntimeError(msg) |
|
937 | raise RuntimeError(msg) | |
934 |
|
938 | |||
935 | if self._task_scheme == 'pure': |
|
939 | if self._task_scheme == 'pure': | |
936 |
# pure zmq scheme doesn't support |
|
940 | # pure zmq scheme doesn't support extra features | |
937 |
msg = "Pure ZMQ scheduler doesn't support |
|
941 | msg = "Pure ZMQ scheduler doesn't support the following flags:" | |
938 | if (follow or after): |
|
942 | "follow, after, retries, targets, timeout" | |
939 | # hard fail on DAG dependencies |
|
943 | if (follow or after or retries or targets or timeout): | |
|
944 | # hard fail on Scheduler flags | |||
940 | raise RuntimeError(msg) |
|
945 | raise RuntimeError(msg) | |
941 | if isinstance(f, dependent): |
|
946 | if isinstance(f, dependent): | |
942 | # soft warn on functional dependencies |
|
947 | # soft warn on functional dependencies | |
@@ -948,10 +953,14 class LoadBalancedView(View): | |||||
948 | block = self.block if block is None else block |
|
953 | block = self.block if block is None else block | |
949 | track = self.track if track is None else track |
|
954 | track = self.track if track is None else track | |
950 | after = self.after if after is None else after |
|
955 | after = self.after if after is None else after | |
|
956 | retries = self.retries if retries is None else retries | |||
951 | follow = self.follow if follow is None else follow |
|
957 | follow = self.follow if follow is None else follow | |
952 | timeout = self.timeout if timeout is None else timeout |
|
958 | timeout = self.timeout if timeout is None else timeout | |
953 | targets = self.targets if targets is None else targets |
|
959 | targets = self.targets if targets is None else targets | |
954 |
|
960 | |||
|
961 | if not isinstance(retries, int): | |||
|
962 | raise TypeError('retries must be int, not %r'%type(retries)) | |||
|
963 | ||||
955 | if targets is None: |
|
964 | if targets is None: | |
956 | idents = [] |
|
965 | idents = [] | |
957 | else: |
|
966 | else: | |
@@ -959,7 +968,7 class LoadBalancedView(View): | |||||
959 |
|
968 | |||
960 | after = self._render_dependency(after) |
|
969 | after = self._render_dependency(after) | |
961 | follow = self._render_dependency(follow) |
|
970 | follow = self._render_dependency(follow) | |
962 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) |
|
971 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | |
963 |
|
972 | |||
964 | msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, |
|
973 | msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, | |
965 | subheader=subheader) |
|
974 | subheader=subheader) |
@@ -146,7 +146,7 class DictDB(BaseDB): | |||||
146 | """Remove a record from the DB.""" |
|
146 | """Remove a record from the DB.""" | |
147 | matches = self._match(check) |
|
147 | matches = self._match(check) | |
148 | for m in matches: |
|
148 | for m in matches: | |
149 | del self._records[m] |
|
149 | del self._records[m['msg_id']] | |
150 |
|
150 | |||
151 | def drop_record(self, msg_id): |
|
151 | def drop_record(self, msg_id): | |
152 | """Remove a record from the DB.""" |
|
152 | """Remove a record from the DB.""" |
@@ -268,8 +268,15 class HubFactory(RegistrationFactory): | |||||
268 | } |
|
268 | } | |
269 | self.log.debug("Hub engine addrs: %s"%self.engine_info) |
|
269 | self.log.debug("Hub engine addrs: %s"%self.engine_info) | |
270 | self.log.debug("Hub client addrs: %s"%self.client_info) |
|
270 | self.log.debug("Hub client addrs: %s"%self.client_info) | |
|
271 | ||||
|
272 | # resubmit stream | |||
|
273 | r = ZMQStream(ctx.socket(zmq.XREQ), loop) | |||
|
274 | url = util.disambiguate_url(self.client_info['task'][-1]) | |||
|
275 | r.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
276 | r.connect(url) | |||
|
277 | ||||
271 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
278 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
272 | query=q, notifier=n, db=self.db, |
|
279 | query=q, notifier=n, resubmit=r, db=self.db, | |
273 | engine_info=self.engine_info, client_info=self.client_info, |
|
280 | engine_info=self.engine_info, client_info=self.client_info, | |
274 | logname=self.log.name) |
|
281 | logname=self.log.name) | |
275 |
|
282 | |||
@@ -315,8 +322,9 class Hub(LoggingFactory): | |||||
315 | loop=Instance(ioloop.IOLoop) |
|
322 | loop=Instance(ioloop.IOLoop) | |
316 | query=Instance(ZMQStream) |
|
323 | query=Instance(ZMQStream) | |
317 | monitor=Instance(ZMQStream) |
|
324 | monitor=Instance(ZMQStream) | |
318 | heartmonitor=Instance(HeartMonitor) |
|
|||
319 | notifier=Instance(ZMQStream) |
|
325 | notifier=Instance(ZMQStream) | |
|
326 | resubmit=Instance(ZMQStream) | |||
|
327 | heartmonitor=Instance(HeartMonitor) | |||
320 | db=Instance(object) |
|
328 | db=Instance(object) | |
321 | client_info=Dict() |
|
329 | client_info=Dict() | |
322 | engine_info=Dict() |
|
330 | engine_info=Dict() | |
@@ -379,6 +387,9 class Hub(LoggingFactory): | |||||
379 | 'connection_request': self.connection_request, |
|
387 | 'connection_request': self.connection_request, | |
380 | } |
|
388 | } | |
381 |
|
389 | |||
|
390 | # ignore resubmit replies | |||
|
391 | self.resubmit.on_recv(lambda msg: None, copy=False) | |||
|
392 | ||||
382 | self.log.info("hub::created hub") |
|
393 | self.log.info("hub::created hub") | |
383 |
|
394 | |||
384 | @property |
|
395 | @property | |
@@ -452,31 +463,31 class Hub(LoggingFactory): | |||||
452 | def dispatch_monitor_traffic(self, msg): |
|
463 | def dispatch_monitor_traffic(self, msg): | |
453 | """all ME and Task queue messages come through here, as well as |
|
464 | """all ME and Task queue messages come through here, as well as | |
454 | IOPub traffic.""" |
|
465 | IOPub traffic.""" | |
455 |
self.log.debug("monitor traffic: % |
|
466 | self.log.debug("monitor traffic: %r"%msg[:2]) | |
456 | switch = msg[0] |
|
467 | switch = msg[0] | |
457 | idents, msg = self.session.feed_identities(msg[1:]) |
|
468 | idents, msg = self.session.feed_identities(msg[1:]) | |
458 | if not idents: |
|
469 | if not idents: | |
459 |
self.log.error("Bad Monitor Message: % |
|
470 | self.log.error("Bad Monitor Message: %r"%msg) | |
460 | return |
|
471 | return | |
461 | handler = self.monitor_handlers.get(switch, None) |
|
472 | handler = self.monitor_handlers.get(switch, None) | |
462 | if handler is not None: |
|
473 | if handler is not None: | |
463 | handler(idents, msg) |
|
474 | handler(idents, msg) | |
464 | else: |
|
475 | else: | |
465 |
self.log.error("Invalid monitor topic: % |
|
476 | self.log.error("Invalid monitor topic: %r"%switch) | |
466 |
|
477 | |||
467 |
|
478 | |||
468 | def dispatch_query(self, msg): |
|
479 | def dispatch_query(self, msg): | |
469 | """Route registration requests and queries from clients.""" |
|
480 | """Route registration requests and queries from clients.""" | |
470 | idents, msg = self.session.feed_identities(msg) |
|
481 | idents, msg = self.session.feed_identities(msg) | |
471 | if not idents: |
|
482 | if not idents: | |
472 |
self.log.error("Bad Query Message: % |
|
483 | self.log.error("Bad Query Message: %r"%msg) | |
473 | return |
|
484 | return | |
474 | client_id = idents[0] |
|
485 | client_id = idents[0] | |
475 | try: |
|
486 | try: | |
476 | msg = self.session.unpack_message(msg, content=True) |
|
487 | msg = self.session.unpack_message(msg, content=True) | |
477 | except: |
|
488 | except: | |
478 | content = error.wrap_exception() |
|
489 | content = error.wrap_exception() | |
479 |
self.log.error("Bad Query Message: % |
|
490 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) | |
480 | self.session.send(self.query, "hub_error", ident=client_id, |
|
491 | self.session.send(self.query, "hub_error", ident=client_id, | |
481 | content=content) |
|
492 | content=content) | |
482 | return |
|
493 | return | |
@@ -484,16 +495,17 class Hub(LoggingFactory): | |||||
484 | # print client_id, header, parent, content |
|
495 | # print client_id, header, parent, content | |
485 | #switch on message type: |
|
496 | #switch on message type: | |
486 | msg_type = msg['msg_type'] |
|
497 | msg_type = msg['msg_type'] | |
487 |
self.log.info("client::client % |
|
498 | self.log.info("client::client %r requested %r"%(client_id, msg_type)) | |
488 | handler = self.query_handlers.get(msg_type, None) |
|
499 | handler = self.query_handlers.get(msg_type, None) | |
489 | try: |
|
500 | try: | |
490 |
assert handler is not None, "Bad Message Type: % |
|
501 | assert handler is not None, "Bad Message Type: %r"%msg_type | |
491 | except: |
|
502 | except: | |
492 | content = error.wrap_exception() |
|
503 | content = error.wrap_exception() | |
493 |
self.log.error("Bad Message Type: % |
|
504 | self.log.error("Bad Message Type: %r"%msg_type, exc_info=True) | |
494 | self.session.send(self.query, "hub_error", ident=client_id, |
|
505 | self.session.send(self.query, "hub_error", ident=client_id, | |
495 | content=content) |
|
506 | content=content) | |
496 | return |
|
507 | return | |
|
508 | ||||
497 | else: |
|
509 | else: | |
498 | handler(idents, msg) |
|
510 | handler(idents, msg) | |
499 |
|
511 | |||
@@ -560,9 +572,9 class Hub(LoggingFactory): | |||||
560 | # it's posible iopub arrived first: |
|
572 | # it's posible iopub arrived first: | |
561 | existing = self.db.get_record(msg_id) |
|
573 | existing = self.db.get_record(msg_id) | |
562 | for key,evalue in existing.iteritems(): |
|
574 | for key,evalue in existing.iteritems(): | |
563 |
rvalue = record |
|
575 | rvalue = record.get(key, None) | |
564 | if evalue and rvalue and evalue != rvalue: |
|
576 | if evalue and rvalue and evalue != rvalue: | |
565 |
self.log. |
|
577 | self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) | |
566 | elif evalue and not rvalue: |
|
578 | elif evalue and not rvalue: | |
567 | record[key] = evalue |
|
579 | record[key] = evalue | |
568 | self.db.update_record(msg_id, record) |
|
580 | self.db.update_record(msg_id, record) | |
@@ -648,10 +660,22 class Hub(LoggingFactory): | |||||
648 | try: |
|
660 | try: | |
649 | # it's posible iopub arrived first: |
|
661 | # it's posible iopub arrived first: | |
650 | existing = self.db.get_record(msg_id) |
|
662 | existing = self.db.get_record(msg_id) | |
|
663 | if existing['resubmitted']: | |||
|
664 | for key in ('submitted', 'client_uuid', 'buffers'): | |||
|
665 | # don't clobber these keys on resubmit | |||
|
666 | # submitted and client_uuid should be different | |||
|
667 | # and buffers might be big, and shouldn't have changed | |||
|
668 | record.pop(key) | |||
|
669 | # still check content,header which should not change | |||
|
670 | # but are not expensive to compare as buffers | |||
|
671 | ||||
651 | for key,evalue in existing.iteritems(): |
|
672 | for key,evalue in existing.iteritems(): | |
652 | rvalue = record[key] |
|
673 | if key.endswith('buffers'): | |
|
674 | # don't compare buffers | |||
|
675 | continue | |||
|
676 | rvalue = record.get(key, None) | |||
653 | if evalue and rvalue and evalue != rvalue: |
|
677 | if evalue and rvalue and evalue != rvalue: | |
654 |
self.log. |
|
678 | self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) | |
655 | elif evalue and not rvalue: |
|
679 | elif evalue and not rvalue: | |
656 | record[key] = evalue |
|
680 | record[key] = evalue | |
657 | self.db.update_record(msg_id, record) |
|
681 | self.db.update_record(msg_id, record) | |
@@ -1042,42 +1066,99 class Hub(LoggingFactory): | |||||
1042 | except Exception: |
|
1066 | except Exception: | |
1043 | reply = error.wrap_exception() |
|
1067 | reply = error.wrap_exception() | |
1044 | else: |
|
1068 | else: | |
1045 | for msg_id in msg_ids: |
|
1069 | pending = filter(lambda m: m in self.pending, msg_ids) | |
1046 | if msg_id in self.all_completed: |
|
1070 | if pending: | |
1047 | self.db.drop_record(msg_id) |
|
1071 | try: | |
1048 | else: |
|
1072 | raise IndexError("msg pending: %r"%pending[0]) | |
1049 | if msg_id in self.pending: |
|
1073 | except: | |
1050 | try: |
|
1074 | reply = error.wrap_exception() | |
1051 | raise IndexError("msg pending: %r"%msg_id) |
|
1075 | else: | |
1052 |
|
|
1076 | try: | |
1053 | reply = error.wrap_exception() |
|
1077 | self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) | |
1054 |
|
|
1078 | except Exception: | |
|
1079 | reply = error.wrap_exception() | |||
|
1080 | ||||
|
1081 | if reply['status'] == 'ok': | |||
|
1082 | eids = content.get('engine_ids', []) | |||
|
1083 | for eid in eids: | |||
|
1084 | if eid not in self.engines: | |||
1055 | try: |
|
1085 | try: | |
1056 |
raise IndexError("No such |
|
1086 | raise IndexError("No such engine: %i"%eid) | |
1057 | except: |
|
1087 | except: | |
1058 | reply = error.wrap_exception() |
|
1088 | reply = error.wrap_exception() | |
1059 | break |
|
1089 | break | |
1060 | eids = content.get('engine_ids', []) |
|
1090 | msg_ids = self.completed.pop(eid) | |
1061 | for eid in eids: |
|
1091 | uid = self.engines[eid].queue | |
1062 | if eid not in self.engines: |
|
|||
1063 | try: |
|
1092 | try: | |
1064 | raise IndexError("No such engine: %i"%eid) |
|
1093 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1065 | except: |
|
1094 | except Exception: | |
1066 | reply = error.wrap_exception() |
|
1095 | reply = error.wrap_exception() | |
1067 | break |
|
1096 | break | |
1068 | msg_ids = self.completed.pop(eid) |
|
|||
1069 | uid = self.engines[eid].queue |
|
|||
1070 | try: |
|
|||
1071 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
|||
1072 | except Exception: |
|
|||
1073 | reply = error.wrap_exception() |
|
|||
1074 | break |
|
|||
1075 |
|
1097 | |||
1076 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) |
|
1098 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | |
1077 |
|
1099 | |||
1078 |
def resubmit_task(self, client_id, msg |
|
1100 | def resubmit_task(self, client_id, msg): | |
1079 |
"""Resubmit |
|
1101 | """Resubmit one or more tasks.""" | |
1080 | raise NotImplementedError |
|
1102 | def finish(reply): | |
|
1103 | self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id) | |||
|
1104 | ||||
|
1105 | content = msg['content'] | |||
|
1106 | msg_ids = content['msg_ids'] | |||
|
1107 | reply = dict(status='ok') | |||
|
1108 | try: | |||
|
1109 | records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[ | |||
|
1110 | 'header', 'content', 'buffers']) | |||
|
1111 | except Exception: | |||
|
1112 | self.log.error('db::db error finding tasks to resubmit', exc_info=True) | |||
|
1113 | return finish(error.wrap_exception()) | |||
|
1114 | ||||
|
1115 | # validate msg_ids | |||
|
1116 | found_ids = [ rec['msg_id'] for rec in records ] | |||
|
1117 | invalid_ids = filter(lambda m: m in self.pending, found_ids) | |||
|
1118 | if len(records) > len(msg_ids): | |||
|
1119 | try: | |||
|
1120 | raise RuntimeError("DB appears to be in an inconsistent state." | |||
|
1121 | "More matching records were found than should exist") | |||
|
1122 | except Exception: | |||
|
1123 | return finish(error.wrap_exception()) | |||
|
1124 | elif len(records) < len(msg_ids): | |||
|
1125 | missing = [ m for m in msg_ids if m not in found_ids ] | |||
|
1126 | try: | |||
|
1127 | raise KeyError("No such msg(s): %s"%missing) | |||
|
1128 | except KeyError: | |||
|
1129 | return finish(error.wrap_exception()) | |||
|
1130 | elif invalid_ids: | |||
|
1131 | msg_id = invalid_ids[0] | |||
|
1132 | try: | |||
|
1133 | raise ValueError("Task %r appears to be inflight"%(msg_id)) | |||
|
1134 | except Exception: | |||
|
1135 | return finish(error.wrap_exception()) | |||
|
1136 | ||||
|
1137 | # clear the existing records | |||
|
1138 | rec = empty_record() | |||
|
1139 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) | |||
|
1140 | rec['resubmitted'] = datetime.now() | |||
|
1141 | rec['queue'] = 'task' | |||
|
1142 | rec['client_uuid'] = client_id[0] | |||
|
1143 | try: | |||
|
1144 | for msg_id in msg_ids: | |||
|
1145 | self.all_completed.discard(msg_id) | |||
|
1146 | self.db.update_record(msg_id, rec) | |||
|
1147 | except Exception: | |||
|
1148 | self.log.error('db::db error upating record', exc_info=True) | |||
|
1149 | reply = error.wrap_exception() | |||
|
1150 | else: | |||
|
1151 | # send the messages | |||
|
1152 | for rec in records: | |||
|
1153 | header = rec['header'] | |||
|
1154 | msg = self.session.msg(header['msg_type']) | |||
|
1155 | msg['content'] = rec['content'] | |||
|
1156 | msg['header'] = header | |||
|
1157 | msg['msg_id'] = rec['msg_id'] | |||
|
1158 | self.session.send(self.resubmit, msg, buffers=rec['buffers']) | |||
|
1159 | ||||
|
1160 | finish(dict(status='ok')) | |||
|
1161 | ||||
1081 |
|
1162 | |||
1082 | def _extract_record(self, rec): |
|
1163 | def _extract_record(self, rec): | |
1083 | """decompose a TaskRecord dict into subsection of reply for get_result""" |
|
1164 | """decompose a TaskRecord dict into subsection of reply for get_result""" | |
@@ -1124,12 +1205,20 class Hub(LoggingFactory): | |||||
1124 | for msg_id in msg_ids: |
|
1205 | for msg_id in msg_ids: | |
1125 | if msg_id in self.pending: |
|
1206 | if msg_id in self.pending: | |
1126 | pending.append(msg_id) |
|
1207 | pending.append(msg_id) | |
1127 |
elif msg_id in self.all_completed |
|
1208 | elif msg_id in self.all_completed: | |
1128 | completed.append(msg_id) |
|
1209 | completed.append(msg_id) | |
1129 | if not statusonly: |
|
1210 | if not statusonly: | |
1130 | c,bufs = self._extract_record(records[msg_id]) |
|
1211 | c,bufs = self._extract_record(records[msg_id]) | |
1131 | content[msg_id] = c |
|
1212 | content[msg_id] = c | |
1132 | buffers.extend(bufs) |
|
1213 | buffers.extend(bufs) | |
|
1214 | elif msg_id in records: | |||
|
1215 | if rec['completed']: | |||
|
1216 | completed.append(msg_id) | |||
|
1217 | c,bufs = self._extract_record(records[msg_id]) | |||
|
1218 | content[msg_id] = c | |||
|
1219 | buffers.extend(bufs) | |||
|
1220 | else: | |||
|
1221 | pending.append(msg_id) | |||
1133 | else: |
|
1222 | else: | |
1134 | try: |
|
1223 | try: | |
1135 | raise KeyError('No such message: '+msg_id) |
|
1224 | raise KeyError('No such message: '+msg_id) |
@@ -6,12 +6,10 | |||||
6 | # the file COPYING, distributed as part of this software. |
|
6 | # the file COPYING, distributed as part of this software. | |
7 | #----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 |
|
8 | |||
9 | from datetime import datetime |
|
|||
10 |
|
||||
11 | from pymongo import Connection |
|
9 | from pymongo import Connection | |
12 | from pymongo.binary import Binary |
|
10 | from pymongo.binary import Binary | |
13 |
|
11 | |||
14 | from IPython.utils.traitlets import Dict, List, CUnicode |
|
12 | from IPython.utils.traitlets import Dict, List, CUnicode, CStr, Instance | |
15 |
|
13 | |||
16 | from .dictdb import BaseDB |
|
14 | from .dictdb import BaseDB | |
17 |
|
15 | |||
@@ -25,15 +23,20 class MongoDB(BaseDB): | |||||
25 | connection_args = List(config=True) # args passed to pymongo.Connection |
|
23 | connection_args = List(config=True) # args passed to pymongo.Connection | |
26 | connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection |
|
24 | connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection | |
27 | database = CUnicode(config=True) # name of the mongodb database |
|
25 | database = CUnicode(config=True) # name of the mongodb database | |
28 | _table = Dict() |
|
26 | ||
|
27 | _connection = Instance(Connection) # pymongo connection | |||
29 |
|
28 | |||
30 | def __init__(self, **kwargs): |
|
29 | def __init__(self, **kwargs): | |
31 | super(MongoDB, self).__init__(**kwargs) |
|
30 | super(MongoDB, self).__init__(**kwargs) | |
32 | self._connection = Connection(*self.connection_args, **self.connection_kwargs) |
|
31 | if self._connection is None: | |
|
32 | self._connection = Connection(*self.connection_args, **self.connection_kwargs) | |||
33 | if not self.database: |
|
33 | if not self.database: | |
34 | self.database = self.session |
|
34 | self.database = self.session | |
35 | self._db = self._connection[self.database] |
|
35 | self._db = self._connection[self.database] | |
36 | self._records = self._db['task_records'] |
|
36 | self._records = self._db['task_records'] | |
|
37 | self._records.ensure_index('msg_id', unique=True) | |||
|
38 | self._records.ensure_index('submitted') # for sorting history | |||
|
39 | # for rec in self._records.find | |||
37 |
|
40 | |||
38 | def _binary_buffers(self, rec): |
|
41 | def _binary_buffers(self, rec): | |
39 | for key in ('buffers', 'result_buffers'): |
|
42 | for key in ('buffers', 'result_buffers'): | |
@@ -45,18 +48,21 class MongoDB(BaseDB): | |||||
45 | """Add a new Task Record, by msg_id.""" |
|
48 | """Add a new Task Record, by msg_id.""" | |
46 | # print rec |
|
49 | # print rec | |
47 | rec = self._binary_buffers(rec) |
|
50 | rec = self._binary_buffers(rec) | |
48 |
|
|
51 | self._records.insert(rec) | |
49 | self._table[msg_id] = obj_id |
|
|||
50 |
|
52 | |||
51 | def get_record(self, msg_id): |
|
53 | def get_record(self, msg_id): | |
52 | """Get a specific Task Record, by msg_id.""" |
|
54 | """Get a specific Task Record, by msg_id.""" | |
53 |
r |
|
55 | r = self._records.find_one({'msg_id': msg_id}) | |
|
56 | if not r: | |||
|
57 | # r will be '' if nothing is found | |||
|
58 | raise KeyError(msg_id) | |||
|
59 | return r | |||
54 |
|
60 | |||
55 | def update_record(self, msg_id, rec): |
|
61 | def update_record(self, msg_id, rec): | |
56 | """Update the data in an existing record.""" |
|
62 | """Update the data in an existing record.""" | |
57 | rec = self._binary_buffers(rec) |
|
63 | rec = self._binary_buffers(rec) | |
58 | obj_id = self._table[msg_id] |
|
64 | ||
59 |
self._records.update({'_id': |
|
65 | self._records.update({'msg_id':msg_id}, {'$set': rec}) | |
60 |
|
66 | |||
61 | def drop_matching_records(self, check): |
|
67 | def drop_matching_records(self, check): | |
62 | """Remove a record from the DB.""" |
|
68 | """Remove a record from the DB.""" | |
@@ -64,8 +70,7 class MongoDB(BaseDB): | |||||
64 |
|
70 | |||
65 | def drop_record(self, msg_id): |
|
71 | def drop_record(self, msg_id): | |
66 | """Remove a record from the DB.""" |
|
72 | """Remove a record from the DB.""" | |
67 | obj_id = self._table.pop(msg_id) |
|
73 | self._records.remove({'msg_id':msg_id}) | |
68 | self._records.remove(obj_id) |
|
|||
69 |
|
74 | |||
70 | def find_records(self, check, keys=None): |
|
75 | def find_records(self, check, keys=None): | |
71 | """Find records matching a query dict, optionally extracting subset of keys. |
|
76 | """Find records matching a query dict, optionally extracting subset of keys. |
@@ -137,6 +137,7 class TaskScheduler(SessionFactory): | |||||
137 |
|
137 | |||
138 | # internals: |
|
138 | # internals: | |
139 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
139 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
|
140 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) | |||
140 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM |
|
141 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM | |
141 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) |
|
142 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) | |
142 | pending = Dict() # dict by engine_uuid of submitted tasks |
|
143 | pending = Dict() # dict by engine_uuid of submitted tasks | |
@@ -205,6 +206,8 class TaskScheduler(SessionFactory): | |||||
205 | self.pending[uid] = {} |
|
206 | self.pending[uid] = {} | |
206 | if len(self.targets) == 1: |
|
207 | if len(self.targets) == 1: | |
207 | self.resume_receiving() |
|
208 | self.resume_receiving() | |
|
209 | # rescan the graph: | |||
|
210 | self.update_graph(None) | |||
208 |
|
211 | |||
209 | def _unregister_engine(self, uid): |
|
212 | def _unregister_engine(self, uid): | |
210 | """Existing engine with ident `uid` became unavailable.""" |
|
213 | """Existing engine with ident `uid` became unavailable.""" | |
@@ -215,11 +218,11 class TaskScheduler(SessionFactory): | |||||
215 | # handle any potentially finished tasks: |
|
218 | # handle any potentially finished tasks: | |
216 | self.engine_stream.flush() |
|
219 | self.engine_stream.flush() | |
217 |
|
220 | |||
218 | self.completed.pop(uid) |
|
221 | # don't pop destinations, because they might be used later | |
219 | self.failed.pop(uid) |
|
|||
220 | # don't pop destinations, because it might be used later |
|
|||
221 | # map(self.destinations.pop, self.completed.pop(uid)) |
|
222 | # map(self.destinations.pop, self.completed.pop(uid)) | |
222 | # map(self.destinations.pop, self.failed.pop(uid)) |
|
223 | # map(self.destinations.pop, self.failed.pop(uid)) | |
|
224 | ||||
|
225 | # prevent this engine from receiving work | |||
223 | idx = self.targets.index(uid) |
|
226 | idx = self.targets.index(uid) | |
224 | self.targets.pop(idx) |
|
227 | self.targets.pop(idx) | |
225 | self.loads.pop(idx) |
|
228 | self.loads.pop(idx) | |
@@ -229,28 +232,40 class TaskScheduler(SessionFactory): | |||||
229 | if self.pending[uid]: |
|
232 | if self.pending[uid]: | |
230 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) |
|
233 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) | |
231 | dc.start() |
|
234 | dc.start() | |
|
235 | else: | |||
|
236 | self.completed.pop(uid) | |||
|
237 | self.failed.pop(uid) | |||
|
238 | ||||
232 |
|
239 | |||
233 | @logged |
|
240 | @logged | |
234 | def handle_stranded_tasks(self, engine): |
|
241 | def handle_stranded_tasks(self, engine): | |
235 | """Deal with jobs resident in an engine that died.""" |
|
242 | """Deal with jobs resident in an engine that died.""" | |
236 |
lost = self.pending |
|
243 | lost = self.pending[engine] | |
237 |
|
244 | for msg_id in lost.keys(): | ||
238 | for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems(): |
|
245 | if msg_id not in self.pending[engine]: | |
239 | self.all_failed.add(msg_id) |
|
246 | # prevent double-handling of messages | |
240 | self.all_done.add(msg_id) |
|
247 | continue | |
|
248 | ||||
|
249 | raw_msg = lost[msg_id][0] | |||
|
250 | ||||
241 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
251 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
242 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
252 | msg = self.session.unpack_message(msg, copy=False, content=False) | |
243 | parent = msg['header'] |
|
253 | parent = msg['header'] | |
244 |
idents = [idents[0] |
|
254 | idents = [engine, idents[0]] | |
245 | # print (idents) |
|
255 | ||
|
256 | # build fake error reply | |||
246 | try: |
|
257 | try: | |
247 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) |
|
258 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | |
248 | except: |
|
259 | except: | |
249 | content = error.wrap_exception() |
|
260 | content = error.wrap_exception() | |
250 |
msg = self.session. |
|
261 | msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'}) | |
251 | parent=parent, ident=idents) |
|
262 | raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) | |
252 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) |
|
263 | # and dispatch it | |
253 |
self. |
|
264 | self.dispatch_result(raw_reply) | |
|
265 | ||||
|
266 | # finally scrub completed/failed lists | |||
|
267 | self.completed.pop(engine) | |||
|
268 | self.failed.pop(engine) | |||
254 |
|
269 | |||
255 |
|
270 | |||
256 | #----------------------------------------------------------------------- |
|
271 | #----------------------------------------------------------------------- | |
@@ -277,6 +292,8 class TaskScheduler(SessionFactory): | |||||
277 |
|
292 | |||
278 | # targets |
|
293 | # targets | |
279 | targets = set(header.get('targets', [])) |
|
294 | targets = set(header.get('targets', [])) | |
|
295 | retries = header.get('retries', 0) | |||
|
296 | self.retries[msg_id] = retries | |||
280 |
|
297 | |||
281 | # time dependencies |
|
298 | # time dependencies | |
282 | after = Dependency(header.get('after', [])) |
|
299 | after = Dependency(header.get('after', [])) | |
@@ -315,7 +332,9 class TaskScheduler(SessionFactory): | |||||
315 | # time deps already met, try to run |
|
332 | # time deps already met, try to run | |
316 | if not self.maybe_run(msg_id, *args): |
|
333 | if not self.maybe_run(msg_id, *args): | |
317 | # can't run yet |
|
334 | # can't run yet | |
318 | self.save_unmet(msg_id, *args) |
|
335 | if msg_id not in self.all_failed: | |
|
336 | # could have failed as unreachable | |||
|
337 | self.save_unmet(msg_id, *args) | |||
319 | else: |
|
338 | else: | |
320 | self.save_unmet(msg_id, *args) |
|
339 | self.save_unmet(msg_id, *args) | |
321 |
|
340 | |||
@@ -328,7 +347,7 class TaskScheduler(SessionFactory): | |||||
328 | if msg_id in self.depending: |
|
347 | if msg_id in self.depending: | |
329 | raw,after,targets,follow,timeout = self.depending[msg_id] |
|
348 | raw,after,targets,follow,timeout = self.depending[msg_id] | |
330 | if timeout and timeout < now: |
|
349 | if timeout and timeout < now: | |
331 |
self.fail_unreachable(msg_id, |
|
350 | self.fail_unreachable(msg_id, error.TaskTimeout) | |
332 |
|
351 | |||
333 | @logged |
|
352 | @logged | |
334 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
353 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | |
@@ -369,7 +388,7 class TaskScheduler(SessionFactory): | |||||
369 | # we need a can_run filter |
|
388 | # we need a can_run filter | |
370 | def can_run(idx): |
|
389 | def can_run(idx): | |
371 | # check hwm |
|
390 | # check hwm | |
372 | if self.loads[idx] == self.hwm: |
|
391 | if self.hwm and self.loads[idx] == self.hwm: | |
373 | return False |
|
392 | return False | |
374 | target = self.targets[idx] |
|
393 | target = self.targets[idx] | |
375 | # check blacklist |
|
394 | # check blacklist | |
@@ -382,6 +401,7 class TaskScheduler(SessionFactory): | |||||
382 | return follow.check(self.completed[target], self.failed[target]) |
|
401 | return follow.check(self.completed[target], self.failed[target]) | |
383 |
|
402 | |||
384 | indices = filter(can_run, range(len(self.targets))) |
|
403 | indices = filter(can_run, range(len(self.targets))) | |
|
404 | ||||
385 | if not indices: |
|
405 | if not indices: | |
386 | # couldn't run |
|
406 | # couldn't run | |
387 | if follow.all: |
|
407 | if follow.all: | |
@@ -395,12 +415,14 class TaskScheduler(SessionFactory): | |||||
395 | for m in follow.intersection(relevant): |
|
415 | for m in follow.intersection(relevant): | |
396 | dests.add(self.destinations[m]) |
|
416 | dests.add(self.destinations[m]) | |
397 | if len(dests) > 1: |
|
417 | if len(dests) > 1: | |
|
418 | self.depending[msg_id] = (raw_msg, targets, after, follow, timeout) | |||
398 | self.fail_unreachable(msg_id) |
|
419 | self.fail_unreachable(msg_id) | |
399 | return False |
|
420 | return False | |
400 | if targets: |
|
421 | if targets: | |
401 | # check blacklist+targets for impossibility |
|
422 | # check blacklist+targets for impossibility | |
402 | targets.difference_update(blacklist) |
|
423 | targets.difference_update(blacklist) | |
403 | if not targets or not targets.intersection(self.targets): |
|
424 | if not targets or not targets.intersection(self.targets): | |
|
425 | self.depending[msg_id] = (raw_msg, targets, after, follow, timeout) | |||
404 | self.fail_unreachable(msg_id) |
|
426 | self.fail_unreachable(msg_id) | |
405 | return False |
|
427 | return False | |
406 | return False |
|
428 | return False | |
@@ -454,20 +476,34 class TaskScheduler(SessionFactory): | |||||
454 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
476 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
455 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
477 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
456 | engine = idents[0] |
|
478 | engine = idents[0] | |
457 | idx = self.targets.index(engine) |
|
479 | try: | |
458 | self.finish_job(idx) |
|
480 | idx = self.targets.index(engine) | |
|
481 | except ValueError: | |||
|
482 | pass # skip load-update for dead engines | |||
|
483 | else: | |||
|
484 | self.finish_job(idx) | |||
459 | except Exception: |
|
485 | except Exception: | |
460 | self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) |
|
486 | self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) | |
461 | return |
|
487 | return | |
462 |
|
488 | |||
463 | header = msg['header'] |
|
489 | header = msg['header'] | |
|
490 | parent = msg['parent_header'] | |||
464 | if header.get('dependencies_met', True): |
|
491 | if header.get('dependencies_met', True): | |
465 | success = (header['status'] == 'ok') |
|
492 | success = (header['status'] == 'ok') | |
466 | self.handle_result(idents, msg['parent_header'], raw_msg, success) |
|
493 | msg_id = parent['msg_id'] | |
467 | # send to Hub monitor |
|
494 | retries = self.retries[msg_id] | |
468 | self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) |
|
495 | if not success and retries > 0: | |
|
496 | # failed | |||
|
497 | self.retries[msg_id] = retries - 1 | |||
|
498 | self.handle_unmet_dependency(idents, parent) | |||
|
499 | else: | |||
|
500 | del self.retries[msg_id] | |||
|
501 | # relay to client and update graph | |||
|
502 | self.handle_result(idents, parent, raw_msg, success) | |||
|
503 | # send to Hub monitor | |||
|
504 | self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) | |||
469 | else: |
|
505 | else: | |
470 |
self.handle_unmet_dependency(idents, |
|
506 | self.handle_unmet_dependency(idents, parent) | |
471 |
|
507 | |||
472 | @logged |
|
508 | @logged | |
473 | def handle_result(self, idents, parent, raw_msg, success=True): |
|
509 | def handle_result(self, idents, parent, raw_msg, success=True): | |
@@ -511,13 +547,19 class TaskScheduler(SessionFactory): | |||||
511 | self.depending[msg_id] = args |
|
547 | self.depending[msg_id] = args | |
512 | self.fail_unreachable(msg_id) |
|
548 | self.fail_unreachable(msg_id) | |
513 | elif not self.maybe_run(msg_id, *args): |
|
549 | elif not self.maybe_run(msg_id, *args): | |
514 |
# resubmit failed |
|
550 | # resubmit failed | |
515 | self.save_unmet(msg_id, *args) |
|
551 | if msg_id not in self.all_failed: | |
|
552 | # put it back in our dependency tree | |||
|
553 | self.save_unmet(msg_id, *args) | |||
516 |
|
554 | |||
517 | if self.hwm: |
|
555 | if self.hwm: | |
518 | idx = self.targets.index(engine) |
|
556 | try: | |
519 | if self.loads[idx] == self.hwm-1: |
|
557 | idx = self.targets.index(engine) | |
520 | self.update_graph(None) |
|
558 | except ValueError: | |
|
559 | pass # skip load-update for dead engines | |||
|
560 | else: | |||
|
561 | if self.loads[idx] == self.hwm-1: | |||
|
562 | self.update_graph(None) | |||
521 |
|
563 | |||
522 |
|
564 | |||
523 |
|
565 | |||
@@ -526,7 +568,7 class TaskScheduler(SessionFactory): | |||||
526 | """dep_id just finished. Update our dependency |
|
568 | """dep_id just finished. Update our dependency | |
527 | graph and submit any jobs that just became runable. |
|
569 | graph and submit any jobs that just became runable. | |
528 |
|
570 | |||
529 | Called with dep_id=None to update graph for hwm, but without finishing |
|
571 | Called with dep_id=None to update entire graph for hwm, but without finishing | |
530 | a task. |
|
572 | a task. | |
531 | """ |
|
573 | """ | |
532 | # print ("\n\n***********") |
|
574 | # print ("\n\n***********") | |
@@ -538,9 +580,11 class TaskScheduler(SessionFactory): | |||||
538 | # print ("\n\n***********\n\n") |
|
580 | # print ("\n\n***********\n\n") | |
539 | # update any jobs that depended on the dependency |
|
581 | # update any jobs that depended on the dependency | |
540 | jobs = self.graph.pop(dep_id, []) |
|
582 | jobs = self.graph.pop(dep_id, []) | |
541 | # if we have HWM and an engine just become no longer full |
|
583 | ||
542 |
# recheck *all* jobs |
|
584 | # recheck *all* jobs if | |
543 | if self.hwm and any( [ load==self.hwm-1 for load in self.loads]): |
|
585 | # a) we have HWM and an engine just become no longer full | |
|
586 | # or b) dep_id was given as None | |||
|
587 | if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): | |||
544 | jobs = self.depending.keys() |
|
588 | jobs = self.depending.keys() | |
545 |
|
589 | |||
546 | for msg_id in jobs: |
|
590 | for msg_id in jobs: |
@@ -27,16 +27,20 operators = { | |||||
27 | '$lt' : "<", |
|
27 | '$lt' : "<", | |
28 | '$gt' : ">", |
|
28 | '$gt' : ">", | |
29 | # null is handled weird with ==,!= |
|
29 | # null is handled weird with ==,!= | |
30 |
'$eq' : " |
|
30 | '$eq' : "=", | |
31 |
'$ne' : " |
|
31 | '$ne' : "!=", | |
32 | '$lte': "<=", |
|
32 | '$lte': "<=", | |
33 | '$gte': ">=", |
|
33 | '$gte': ">=", | |
34 |
'$in' : (' |
|
34 | '$in' : ('=', ' OR '), | |
35 |
'$nin': (' |
|
35 | '$nin': ('!=', ' AND '), | |
36 | # '$all': None, |
|
36 | # '$all': None, | |
37 | # '$mod': None, |
|
37 | # '$mod': None, | |
38 | # '$exists' : None |
|
38 | # '$exists' : None | |
39 | } |
|
39 | } | |
|
40 | null_operators = { | |||
|
41 | '=' : "IS NULL", | |||
|
42 | '!=' : "IS NOT NULL", | |||
|
43 | } | |||
40 |
|
44 | |||
41 | def _adapt_datetime(dt): |
|
45 | def _adapt_datetime(dt): | |
42 | return dt.strftime(ISO8601) |
|
46 | return dt.strftime(ISO8601) | |
@@ -205,17 +209,27 class SQLiteDB(BaseDB): | |||||
205 | raise KeyError("Unsupported operator: %r"%test) |
|
209 | raise KeyError("Unsupported operator: %r"%test) | |
206 | if isinstance(op, tuple): |
|
210 | if isinstance(op, tuple): | |
207 | op, join = op |
|
211 | op, join = op | |
208 | expr = "%s %s ?"%(name, op) |
|
212 | ||
209 | if isinstance(value, (tuple,list)): |
|
213 | if value is None and op in null_operators: | |
210 |
expr = |
|
214 | expr = "%s %s"%null_operators[op] | |
211 | args.extend(value) |
|
|||
212 | else: |
|
215 | else: | |
213 | args.append(value) |
|
216 | expr = "%s %s ?"%(name, op) | |
|
217 | if isinstance(value, (tuple,list)): | |||
|
218 | if op in null_operators and any([v is None for v in value]): | |||
|
219 | # equality tests don't work with NULL | |||
|
220 | raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test) | |||
|
221 | expr = '( %s )'%( join.join([expr]*len(value)) ) | |||
|
222 | args.extend(value) | |||
|
223 | else: | |||
|
224 | args.append(value) | |||
214 | expressions.append(expr) |
|
225 | expressions.append(expr) | |
215 | else: |
|
226 | else: | |
216 | # it's an equality check |
|
227 | # it's an equality check | |
217 | expressions.append("%s IS ?"%name) |
|
228 | if sub_check is None: | |
218 |
|
|
229 | expressions.append("%s IS NULL") | |
|
230 | else: | |||
|
231 | expressions.append("%s = ?"%name) | |||
|
232 | args.append(sub_check) | |||
219 |
|
233 | |||
220 | expr = " AND ".join(expressions) |
|
234 | expr = " AND ".join(expressions) | |
221 | return expr, args |
|
235 | return expr, args |
@@ -176,6 +176,37 class StreamSession(object): | |||||
176 | header = extract_header(msg_or_header) |
|
176 | header = extract_header(msg_or_header) | |
177 | return header.get('key', None) == self.key |
|
177 | return header.get('key', None) == self.key | |
178 |
|
178 | |||
|
179 | ||||
|
180 | def serialize(self, msg, ident=None): | |||
|
181 | content = msg.get('content', {}) | |||
|
182 | if content is None: | |||
|
183 | content = self.none | |||
|
184 | elif isinstance(content, dict): | |||
|
185 | content = self.pack(content) | |||
|
186 | elif isinstance(content, bytes): | |||
|
187 | # content is already packed, as in a relayed message | |||
|
188 | pass | |||
|
189 | elif isinstance(content, unicode): | |||
|
190 | # should be bytes, but JSON often spits out unicode | |||
|
191 | content = content.encode('utf8') | |||
|
192 | else: | |||
|
193 | raise TypeError("Content incorrect type: %s"%type(content)) | |||
|
194 | ||||
|
195 | to_send = [] | |||
|
196 | ||||
|
197 | if isinstance(ident, list): | |||
|
198 | # accept list of idents | |||
|
199 | to_send.extend(ident) | |||
|
200 | elif ident is not None: | |||
|
201 | to_send.append(ident) | |||
|
202 | to_send.append(DELIM) | |||
|
203 | if self.key is not None: | |||
|
204 | to_send.append(self.key) | |||
|
205 | to_send.append(self.pack(msg['header'])) | |||
|
206 | to_send.append(self.pack(msg['parent_header'])) | |||
|
207 | to_send.append(content) | |||
|
208 | ||||
|
209 | return to_send | |||
179 |
|
210 | |||
180 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False): |
|
211 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False): | |
181 | """Build and send a message via stream or socket. |
|
212 | """Build and send a message via stream or socket. | |
@@ -221,33 +252,11 class StreamSession(object): | |||||
221 | # we got a Message, not a msg_type |
|
252 | # we got a Message, not a msg_type | |
222 | # don't build a new Message |
|
253 | # don't build a new Message | |
223 | msg = msg_or_type |
|
254 | msg = msg_or_type | |
224 | content = msg['content'] |
|
|||
225 | else: |
|
255 | else: | |
226 | msg = self.msg(msg_or_type, content, parent, subheader) |
|
256 | msg = self.msg(msg_or_type, content, parent, subheader) | |
227 |
|
257 | |||
228 | buffers = [] if buffers is None else buffers |
|
258 | buffers = [] if buffers is None else buffers | |
229 | to_send = [] |
|
259 | to_send = self.serialize(msg, ident) | |
230 | if isinstance(ident, list): |
|
|||
231 | # accept list of idents |
|
|||
232 | to_send.extend(ident) |
|
|||
233 | elif ident is not None: |
|
|||
234 | to_send.append(ident) |
|
|||
235 | to_send.append(DELIM) |
|
|||
236 | if self.key is not None: |
|
|||
237 | to_send.append(self.key) |
|
|||
238 | to_send.append(self.pack(msg['header'])) |
|
|||
239 | to_send.append(self.pack(msg['parent_header'])) |
|
|||
240 |
|
||||
241 | if content is None: |
|
|||
242 | content = self.none |
|
|||
243 | elif isinstance(content, dict): |
|
|||
244 | content = self.pack(content) |
|
|||
245 | elif isinstance(content, bytes): |
|
|||
246 | # content is already packed, as in a relayed message |
|
|||
247 | pass |
|
|||
248 | else: |
|
|||
249 | raise TypeError("Content incorrect type: %s"%type(content)) |
|
|||
250 | to_send.append(content) |
|
|||
251 | flag = 0 |
|
260 | flag = 0 | |
252 | if buffers: |
|
261 | if buffers: | |
253 | flag = zmq.SNDMORE |
|
262 | flag = zmq.SNDMORE |
@@ -48,7 +48,7 class TestProcessLauncher(LocalProcessLauncher): | |||||
48 | def setup(): |
|
48 | def setup(): | |
49 | cp = TestProcessLauncher() |
|
49 | cp = TestProcessLauncher() | |
50 | cp.cmd_and_args = ipcontroller_cmd_argv + \ |
|
50 | cp.cmd_and_args = ipcontroller_cmd_argv + \ | |
51 |
['--profile', 'iptest', '--log-level', '99', '-r' |
|
51 | ['--profile', 'iptest', '--log-level', '99', '-r'] | |
52 | cp.start() |
|
52 | cp.start() | |
53 | launchers.append(cp) |
|
53 | launchers.append(cp) | |
54 | cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest') |
|
54 | cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest') |
@@ -212,3 +212,33 class TestClient(ClusterTestCase): | |||||
212 | time.sleep(0.25) |
|
212 | time.sleep(0.25) | |
213 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) |
|
213 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) | |
214 |
|
214 | |||
|
215 | def test_resubmit(self): | |||
|
216 | def f(): | |||
|
217 | import random | |||
|
218 | return random.random() | |||
|
219 | v = self.client.load_balanced_view() | |||
|
220 | ar = v.apply_async(f) | |||
|
221 | r1 = ar.get(1) | |||
|
222 | ahr = self.client.resubmit(ar.msg_ids) | |||
|
223 | r2 = ahr.get(1) | |||
|
224 | self.assertFalse(r1 == r2) | |||
|
225 | ||||
|
226 | def test_resubmit_inflight(self): | |||
|
227 | """ensure ValueError on resubmit of inflight task""" | |||
|
228 | v = self.client.load_balanced_view() | |||
|
229 | ar = v.apply_async(time.sleep,1) | |||
|
230 | # give the message a chance to arrive | |||
|
231 | time.sleep(0.2) | |||
|
232 | self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids) | |||
|
233 | ar.get(2) | |||
|
234 | ||||
|
235 | def test_resubmit_badkey(self): | |||
|
236 | """ensure KeyError on resubmit of nonexistant task""" | |||
|
237 | self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) | |||
|
238 | ||||
|
239 | def test_purge_results(self): | |||
|
240 | hist = self.client.hub_history() | |||
|
241 | self.client.purge_results(hist) | |||
|
242 | newhist = self.client.hub_history() | |||
|
243 | self.assertTrue(len(newhist) == 0) | |||
|
244 |
@@ -15,10 +15,7 | |||||
15 | import tempfile |
|
15 | import tempfile | |
16 | import time |
|
16 | import time | |
17 |
|
17 | |||
18 | import uuid |
|
|||
19 |
|
||||
20 | from datetime import datetime, timedelta |
|
18 | from datetime import datetime, timedelta | |
21 | from random import choice, randint |
|
|||
22 | from unittest import TestCase |
|
19 | from unittest import TestCase | |
23 |
|
20 | |||
24 | from nose import SkipTest |
|
21 | from nose import SkipTest | |
@@ -157,6 +154,13 class TestDictBackend(TestCase): | |||||
157 | self.db.update_record(msg_id, dict(completed=datetime.now())) |
|
154 | self.db.update_record(msg_id, dict(completed=datetime.now())) | |
158 | rec = self.db.get_record(msg_id) |
|
155 | rec = self.db.get_record(msg_id) | |
159 | self.assertTrue(isinstance(rec['completed'], datetime)) |
|
156 | self.assertTrue(isinstance(rec['completed'], datetime)) | |
|
157 | ||||
|
158 | def test_drop_matching(self): | |||
|
159 | msg_ids = self.load_records(10) | |||
|
160 | query = {'msg_id' : {'$in':msg_ids}} | |||
|
161 | self.db.drop_matching_records(query) | |||
|
162 | recs = self.db.find_records(query) | |||
|
163 | self.assertTrue(len(recs)==0) | |||
160 |
|
164 | |||
161 | class TestSQLiteBackend(TestDictBackend): |
|
165 | class TestSQLiteBackend(TestDictBackend): | |
162 | def create_db(self): |
|
166 | def create_db(self): | |
@@ -164,19 +168,3 class TestSQLiteBackend(TestDictBackend): | |||||
164 |
|
168 | |||
165 | def tearDown(self): |
|
169 | def tearDown(self): | |
166 | self.db._db.close() |
|
170 | self.db._db.close() | |
167 |
|
||||
168 | # optional MongoDB test |
|
|||
169 | try: |
|
|||
170 | from IPython.parallel.controller.mongodb import MongoDB |
|
|||
171 | except ImportError: |
|
|||
172 | pass |
|
|||
173 | else: |
|
|||
174 | class TestMongoBackend(TestDictBackend): |
|
|||
175 | def create_db(self): |
|
|||
176 | try: |
|
|||
177 | return MongoDB(database='iptestdb') |
|
|||
178 | except Exception: |
|
|||
179 | raise SkipTest("Couldn't connect to mongodb instance") |
|
|||
180 |
|
||||
181 | def tearDown(self): |
|
|||
182 | self.db._connection.drop_database('iptestdb') |
|
@@ -21,7 +21,7 import zmq | |||||
21 | from IPython import parallel as pmod |
|
21 | from IPython import parallel as pmod | |
22 | from IPython.parallel import error |
|
22 | from IPython.parallel import error | |
23 | from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult |
|
23 | from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult | |
24 |
from IPython.parallel import |
|
24 | from IPython.parallel import DirectView | |
25 | from IPython.parallel.util import interactive |
|
25 | from IPython.parallel.util import interactive | |
26 |
|
26 | |||
27 | from IPython.parallel.tests import add_engines |
|
27 | from IPython.parallel.tests import add_engines | |
@@ -33,18 +33,6 def setup(): | |||||
33 |
|
33 | |||
34 | class TestView(ClusterTestCase): |
|
34 | class TestView(ClusterTestCase): | |
35 |
|
35 | |||
36 | def test_z_crash_task(self): |
|
|||
37 | """test graceful handling of engine death (balanced)""" |
|
|||
38 | # self.add_engines(1) |
|
|||
39 | ar = self.client[-1].apply_async(crash) |
|
|||
40 | self.assertRaisesRemote(error.EngineError, ar.get) |
|
|||
41 | eid = ar.engine_id |
|
|||
42 | tic = time.time() |
|
|||
43 | while eid in self.client.ids and time.time()-tic < 5: |
|
|||
44 | time.sleep(.01) |
|
|||
45 | self.client.spin() |
|
|||
46 | self.assertFalse(eid in self.client.ids, "Engine should have died") |
|
|||
47 |
|
||||
48 | def test_z_crash_mux(self): |
|
36 | def test_z_crash_mux(self): | |
49 | """test graceful handling of engine death (direct)""" |
|
37 | """test graceful handling of engine death (direct)""" | |
50 | # self.add_engines(1) |
|
38 | # self.add_engines(1) |
@@ -199,6 +199,7 def make_exclude(): | |||||
199 |
|
199 | |||
200 | if not have['pymongo']: |
|
200 | if not have['pymongo']: | |
201 | exclusions.append(ipjoin('parallel', 'controller', 'mongodb')) |
|
201 | exclusions.append(ipjoin('parallel', 'controller', 'mongodb')) | |
|
202 | exclusions.append(ipjoin('parallel', 'tests', 'test_mongodb')) | |||
202 |
|
203 | |||
203 | if not have['matplotlib']: |
|
204 | if not have['matplotlib']: | |
204 | exclusions.extend([ipjoin('lib', 'pylabtools'), |
|
205 | exclusions.extend([ipjoin('lib', 'pylabtools'), |
@@ -12,6 +12,7 Using IPython for parallel computing | |||||
12 | parallel_multiengine.txt |
|
12 | parallel_multiengine.txt | |
13 | parallel_task.txt |
|
13 | parallel_task.txt | |
14 | parallel_mpi.txt |
|
14 | parallel_mpi.txt | |
|
15 | parallel_db.txt | |||
15 | parallel_security.txt |
|
16 | parallel_security.txt | |
16 | parallel_winhpc.txt |
|
17 | parallel_winhpc.txt | |
17 | parallel_demos.txt |
|
18 | parallel_demos.txt |
@@ -292,6 +292,7 you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje | |||||
292 |
|
292 | |||
293 |
|
293 | |||
294 |
|
294 | |||
|
295 | ||||
295 | Impossible Dependencies |
|
296 | Impossible Dependencies | |
296 | *********************** |
|
297 | *********************** | |
297 |
|
298 | |||
@@ -313,6 +314,27 The basic cases that are checked: | |||||
313 | This analysis has not been proven to be rigorous, so it is likely possible for tasks |
|
314 | This analysis has not been proven to be rigorous, so it is likely possible for tasks | |
314 | to become impossible to run in obscure situations, so a timeout may be a good choice. |
|
315 | to become impossible to run in obscure situations, so a timeout may be a good choice. | |
315 |
|
316 | |||
|
317 | ||||
|
318 | Retries and Resubmit | |||
|
319 | ==================== | |||
|
320 | ||||
|
321 | Retries | |||
|
322 | ------- | |||
|
323 | ||||
|
324 | Another flag for tasks is `retries`. This is an integer, specifying how many times | |||
|
325 | a task should be resubmitted after failure. This is useful for tasks that should still run | |||
|
326 | if their engine was shutdown, or may have some statistical chance of failing. The default | |||
|
327 | is to not retry tasks. | |||
|
328 | ||||
|
329 | Resubmit | |||
|
330 | -------- | |||
|
331 | ||||
|
332 | Sometimes you may want to re-run a task. This could be because it failed for some reason, and | |||
|
333 | you have fixed the error, or because you want to restore the cluster to an interrupted state. | |||
|
334 | For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more | |||
|
335 | msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit | |||
|
336 | a task that is pending - only those that have finished, either successful or unsuccessful. | |||
|
337 | ||||
316 | .. _parallel_schedulers: |
|
338 | .. _parallel_schedulers: | |
317 |
|
339 | |||
318 | Schedulers |
|
340 | Schedulers | |
@@ -391,6 +413,8 Disabled features when using the ZMQ Scheduler: | |||||
391 | TODO: performance comparisons |
|
413 | TODO: performance comparisons | |
392 |
|
414 | |||
393 |
|
415 | |||
|
416 | ||||
|
417 | ||||
394 | More details |
|
418 | More details | |
395 | ============ |
|
419 | ============ | |
396 |
|
420 |
General Comments 0
You need to be logged in to leave comments.
Login now