Merge PR #413...
MinRK
r3882:50deb546 merge
@@ -0,0 +1,120 b''
1 """test LoadBalancedView objects"""
2 # -*- coding: utf-8 -*-
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import sys
15 import time
16
17 import zmq
18
19 from IPython import parallel as pmod
20 from IPython.parallel import error
21
22 from IPython.parallel.tests import add_engines
23
24 from .clienttest import ClusterTestCase, crash, wait, skip_without
25
26 def setup():
27 add_engines(3)
28
29 class TestLoadBalancedView(ClusterTestCase):
30
31 def setUp(self):
32 ClusterTestCase.setUp(self)
33 self.view = self.client.load_balanced_view()
34
35 def test_z_crash_task(self):
36 """test graceful handling of engine death (balanced)"""
37 # self.add_engines(1)
38 ar = self.view.apply_async(crash)
39 self.assertRaisesRemote(error.EngineError, ar.get, 10)
40 eid = ar.engine_id
41 tic = time.time()
42 while eid in self.client.ids and time.time()-tic < 5:
43 time.sleep(.01)
44 self.client.spin()
45 self.assertFalse(eid in self.client.ids, "Engine should have died")
46
47 def test_map(self):
48 def f(x):
49 return x**2
50 data = range(16)
51 r = self.view.map_sync(f, data)
52 self.assertEquals(r, map(f, data))
53
54 def test_abort(self):
55 view = self.view
56 ar = self.client[:].apply_async(time.sleep, .5)
57 ar2 = view.apply_async(lambda : 2)
58 ar3 = view.apply_async(lambda : 3)
59 view.abort(ar2)
60 view.abort(ar3.msg_ids)
61 self.assertRaises(error.TaskAborted, ar2.get)
62 self.assertRaises(error.TaskAborted, ar3.get)
63
64 def test_retries(self):
65 add_engines(3)
66 view = self.view
67 view.timeout = 1 # prevent hang if this doesn't behave
68 def fail():
69 assert False
70 for r in range(len(self.client)-1):
71 with view.temp_flags(retries=r):
72 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
73
74 with view.temp_flags(retries=len(self.client), timeout=0.25):
75 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
76
77 def test_invalid_dependency(self):
78 view = self.view
79 with view.temp_flags(after='12345'):
80 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
81
82 def test_impossible_dependency(self):
83 if len(self.client) < 2:
84 add_engines(2)
85 view = self.client.load_balanced_view()
86 ar1 = view.apply_async(lambda : 1)
87 ar1.get()
88 e1 = ar1.engine_id
89 e2 = e1
90 while e2 == e1:
91 ar2 = view.apply_async(lambda : 1)
92 ar2.get()
93 e2 = ar2.engine_id
94
95 with view.temp_flags(follow=[ar1, ar2]):
96 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
97
98
99 def test_follow(self):
100 ar = self.view.apply_async(lambda : 1)
101 ar.get()
102 ars = []
103 first_id = ar.engine_id
104
105 self.view.follow = ar
106 for i in range(5):
107 ars.append(self.view.apply_async(lambda : 1))
108 self.view.wait(ars)
109 for ar in ars:
110 self.assertEquals(ar.engine_id, first_id)
111
112 def test_after(self):
113 view = self.view
114 ar = view.apply_async(time.sleep, 0.5)
115 with view.temp_flags(after=ar):
116 ar2 = view.apply_async(lambda : 1)
117
118 ar.wait()
119 ar2.wait()
120 self.assertTrue(ar2.started > ar.completed)
@@ -0,0 +1,37 b''
1 """Tests for mongodb backend"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 from nose import SkipTest
15
16 from pymongo import Connection
17 from IPython.parallel.controller.mongodb import MongoDB
18
19 from . import test_db
20
21 try:
22 c = Connection()
23 except Exception:
24 c=None
25
26 class TestMongoBackend(test_db.TestDictBackend):
27 """MongoDB backend tests"""
28
29 def create_db(self):
30 try:
31 return MongoDB(database='iptestdb', _connection=c)
32 except Exception:
33 raise SkipTest("Couldn't connect to mongodb")
34
35 def teardown(self):
36 if c is not None:
37 c.drop_database('iptestdb')
@@ -0,0 +1,114 b''
1 .. _parallel_db:
2
3 =======================
4 IPython's Task Database
5 =======================
6
7 The IPython Hub stores all task requests and results in a database. Currently supported backends
8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use of the
9 database is for clients to fetch results of tasks they did not submit, via:
10
11 .. sourcecode:: ipython
12
13 In [1]: rc.get_result(task_id)
14
15 However, since we have this DB backend, we provide a direct query method in the :class:`Client`
16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
19 when using other backends, the interface is emulated and only a subset of queries is possible.
20
21 .. seealso::
22
23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
24
25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
26 and values of either exact values to test, or MongoDB queries, which are dicts of the form:
27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument that specifies
28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
30 MongoDB, the `msg_id` key will always be included, whether requested or not.
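
For instance, a minimal sketch (assuming ``msg_id`` holds the id of a finished task) that
retrieves just the timing fields of that one record:

.. sourcecode:: ipython

    In [2]: rc.db_query({'msg_id' : msg_id}, keys=['started', 'completed'])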
31
32 TaskRecord keys:
33
34 =============== =============== =============
35 Key Type Description
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
38 header dict The request header
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
41 submitted datetime timestamp for time of submission (set by client)
42 client_uuid uuid(bytes) IDENT of client's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
47 result_header dict header for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized result objects
50 queue bytes The name of the queue for the task ('mux' or 'task')
51 pyin <unused> Python input (unused)
52 pyout <unused> Python output (unused)
53 pyerr <unused> Python traceback (unused)
54 stdout str Stream of stdout data
55 stderr str Stream of stderr data
56 =============== =============== =============
58
59 MongoDB operators we emulate on all backends:
60
61 ========== =================
62 Operator Python equivalent
63 ========== =================
64 '$in' in
65 '$nin' not in
66 '$eq' ==
67 '$ne' !=
68 '$gt' >
69 '$gte' >=
70 '$lt' <
71 '$lte' <=
72 ========== =================
73
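For example, a sketch (assuming ``uuids`` is a list of engine idents, as built in the last
example below) selecting every task that ran on neither engine; this works on all backends
because ``'$nin'`` is emulated:

.. sourcecode:: ipython

    In [1]: rc.db_query({'engine_uuid' : {'$nin' : uuids}})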
74
75 The DB Query is useful for two primary cases:
76
77 1. deep polling of task status or metadata
78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
79
80 Example Queries
81 ===============
82
83
84 To get all msg_ids that are not completed, only retrieving their ID and start time:
85
86 .. sourcecode:: ipython
87
88 In [1]: incomplete = rc.db_query({'completed' : None}, keys=['msg_id', 'started'])
89
90 All jobs started in the last hour by me:
91
92 .. sourcecode:: ipython
93
94 In [1]: from datetime import datetime, timedelta
95
96 In [2]: hourago = datetime.now() - timedelta(1./24)
97
98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
99 'client_uuid' : rc.session.session})
100
101 All jobs started more than an hour ago, by clients *other than me*:
102
103 .. sourcecode:: ipython
104
105 In [3]: recent = rc.db_query({'started' : {'$lt' : hourago },
106 'client_uuid' : {'$ne' : rc.session.session}})
107
108 Result headers for all jobs on engine 3 or 4:
109
110 .. sourcecode:: ipython
111
112 In [1]: uuids = map(rc._engines.get, (3,4))
113
114 In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }}, keys='result_header')
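
Since :meth:`db_query` returns a list of TaskRecord dicts, its output feeds directly into
the bulk operations listed above. A sketch (assuming week-old records are safe to discard)
that purges everything completed more than seven days ago:

.. sourcecode:: ipython

    In [3]: weekago = datetime.now() - timedelta(7)

    In [4]: old = rc.db_query({'completed' : {'$lt' : weekago}}, keys=['msg_id'])

    In [5]: rc.purge_results([ rec['msg_id'] for rec in old ])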
@@ -1041,6 +1041,68 b' class Client(HasTraits):'
1041 ar.wait()
1041 ar.wait()
1042
1042
1043 return ar
1043 return ar
1044
1045 @spin_first
1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1047 """Resubmit one or more tasks.
1048
1049 In-flight tasks may not be resubmitted.
1050
1051 Parameters
1052 ----------
1053
1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1055 The indices or msg_ids of the tasks to be resubmitted
1056
1057 block : bool
1058 Whether to wait for the result to be done
1059
1060 Returns
1061 -------
1062
1063 AsyncHubResult
1064 A subclass of AsyncResult that retrieves results from the Hub
1065
1066 """
1067 block = self.block if block is None else block
1068 if indices_or_msg_ids is None:
1069 indices_or_msg_ids = -1
1070
1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1072 indices_or_msg_ids = [indices_or_msg_ids]
1073
1074 theids = []
1075 for id in indices_or_msg_ids:
1076 if isinstance(id, int):
1077 id = self.history[id]
1078 if not isinstance(id, str):
1079 raise TypeError("indices must be str or int, not %r"%id)
1080 theids.append(id)
1081
1082 for msg_id in theids:
1083 self.outstanding.discard(msg_id)
1084 if msg_id in self.history:
1085 self.history.remove(msg_id)
1086 self.results.pop(msg_id, None)
1087 self.metadata.pop(msg_id, None)
1088 content = dict(msg_ids = theids)
1089
1090 self.session.send(self._query_socket, 'resubmit_request', content)
1091
1092 zmq.select([self._query_socket], [], [])
1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1094 if self.debug:
1095 pprint(msg)
1096 content = msg['content']
1097 if content['status'] != 'ok':
1098 raise self._unwrap_exception(content)
1099
1100 ar = AsyncHubResult(self, msg_ids=theids)
1101
1102 if block:
1103 ar.wait()
1104
1105 return ar
1044
1106
1045 @spin_first
1107 @spin_first
1046 def result_status(self, msg_ids, status_only=True):
1108 def result_status(self, msg_ids, status_only=True):
@@ -1255,9 +1317,11 b' class Client(HasTraits):'
1255 query : mongodb query dict
1317 query : mongodb query dict
1256 The search dict. See mongodb query docs for details.
1318 The search dict. See mongodb query docs for details.
1257 keys : list of strs [optional]
1319 keys : list of strs [optional]
1258 THe subset of keys to be returned. The default is to fetch everything.
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1259 'msg_id' will *always* be included.
1321 'msg_id' will *always* be included.
1260 """
1322 """
1323 if isinstance(keys, basestring):
1324 keys = [keys]
1261 content = dict(query=query, keys=keys)
1325 content = dict(query=query, keys=keys)
1262 self.session.send(self._query_socket, "db_request", content=content)
1326 self.session.send(self._query_socket, "db_request", content=content)
1263 idents, msg = self.session.recv(self._query_socket, 0)
1327 idents, msg = self.session.recv(self._query_socket, 0)
@@ -19,7 +19,7 b' from types import ModuleType'
19 import zmq
19 import zmq
20
20
21 from IPython.testing import decorators as testdec
21 from IPython.testing import decorators as testdec
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25
25
@@ -791,9 +791,10 b' class LoadBalancedView(View):'
791 follow=Any()
791 follow=Any()
792 after=Any()
792 after=Any()
793 timeout=CFloat()
793 timeout=CFloat()
794 retries = CInt(0)
794
795
795 _task_scheme = Any()
796 _task_scheme = Any()
796 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])
797 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
797
798
798 def __init__(self, client=None, socket=None, **flags):
799 def __init__(self, client=None, socket=None, **flags):
799 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
800 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
@@ -851,7 +852,7 b' class LoadBalancedView(View):'
851 whether to create a MessageTracker to allow the user to
852 whether to create a MessageTracker to allow the user to
852 safely edit after arrays and buffers during non-copying
853 safely edit after arrays and buffers during non-copying
853 sends.
854 sends.
854 #
855
855 after : Dependency or collection of msg_ids
856 after : Dependency or collection of msg_ids
856 Only for load-balanced execution (targets=None)
857 Only for load-balanced execution (targets=None)
857 Specify a list of msg_ids as a time-based dependency.
858 Specify a list of msg_ids as a time-based dependency.
@@ -869,6 +870,9 b' class LoadBalancedView(View):'
869 Specify an amount of time (in seconds) for the scheduler to
870 Specify an amount of time (in seconds) for the scheduler to
870 wait for dependencies to be met before failing with a
871 wait for dependencies to be met before failing with a
871 DependencyTimeout.
872 DependencyTimeout.
873
874 retries : int
875 Number of times a task will be retried on failure.
872 """
876 """
873
877
874 super(LoadBalancedView, self).set_flags(**kwargs)
878 super(LoadBalancedView, self).set_flags(**kwargs)
@@ -892,7 +896,7 b' class LoadBalancedView(View):'
892 @save_ids
896 @save_ids
893 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
897 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
894 after=None, follow=None, timeout=None,
898 after=None, follow=None, timeout=None,
895 targets=None):
899 targets=None, retries=None):
896 """calls f(*args, **kwargs) on a remote engine, returning the result.
900 """calls f(*args, **kwargs) on a remote engine, returning the result.
897
901
898 This method temporarily sets all of `apply`'s flags for a single call.
902 This method temporarily sets all of `apply`'s flags for a single call.
@@ -933,10 +937,11 b' class LoadBalancedView(View):'
933 raise RuntimeError(msg)
937 raise RuntimeError(msg)
934
938
935 if self._task_scheme == 'pure':
939 if self._task_scheme == 'pure':
936 # pure zmq scheme doesn't support dependencies
940 # pure zmq scheme doesn't support extra features
937 msg = "Pure ZMQ scheduler doesn't support dependencies"
941 msg = "Pure ZMQ scheduler doesn't support the following flags:"
938 if (follow or after):
942 "follow, after, retries, targets, timeout"
939 # hard fail on DAG dependencies
943 if (follow or after or retries or targets or timeout):
944 # hard fail on Scheduler flags
940 raise RuntimeError(msg)
945 raise RuntimeError(msg)
941 if isinstance(f, dependent):
946 if isinstance(f, dependent):
942 # soft warn on functional dependencies
947 # soft warn on functional dependencies
@@ -948,10 +953,14 b' class LoadBalancedView(View):'
948 block = self.block if block is None else block
953 block = self.block if block is None else block
949 track = self.track if track is None else track
954 track = self.track if track is None else track
950 after = self.after if after is None else after
955 after = self.after if after is None else after
956 retries = self.retries if retries is None else retries
951 follow = self.follow if follow is None else follow
957 follow = self.follow if follow is None else follow
952 timeout = self.timeout if timeout is None else timeout
958 timeout = self.timeout if timeout is None else timeout
953 targets = self.targets if targets is None else targets
959 targets = self.targets if targets is None else targets
954
960
961 if not isinstance(retries, int):
962 raise TypeError('retries must be int, not %r'%type(retries))
963
955 if targets is None:
964 if targets is None:
956 idents = []
965 idents = []
957 else:
966 else:
@@ -959,7 +968,7 b' class LoadBalancedView(View):'
959
968
960 after = self._render_dependency(after)
969 after = self._render_dependency(after)
961 follow = self._render_dependency(follow)
970 follow = self._render_dependency(follow)
962 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
971 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
963
972
964 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
973 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
965 subheader=subheader)
974 subheader=subheader)
@@ -146,7 +146,7 b' class DictDB(BaseDB):'
146 """Remove a record from the DB."""
146 """Remove a record from the DB."""
147 matches = self._match(check)
147 matches = self._match(check)
148 for m in matches:
148 for m in matches:
149 del self._records[m]
149 del self._records[m['msg_id']]
150
150
151 def drop_record(self, msg_id):
151 def drop_record(self, msg_id):
152 """Remove a record from the DB."""
152 """Remove a record from the DB."""
@@ -268,8 +268,15 b' class HubFactory(RegistrationFactory):'
268 }
268 }
269 self.log.debug("Hub engine addrs: %s"%self.engine_info)
269 self.log.debug("Hub engine addrs: %s"%self.engine_info)
270 self.log.debug("Hub client addrs: %s"%self.client_info)
270 self.log.debug("Hub client addrs: %s"%self.client_info)
271
272 # resubmit stream
273 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
274 url = util.disambiguate_url(self.client_info['task'][-1])
275 r.setsockopt(zmq.IDENTITY, self.session.session)
276 r.connect(url)
277
271 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
278 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
272 query=q, notifier=n, db=self.db,
279 query=q, notifier=n, resubmit=r, db=self.db,
273 engine_info=self.engine_info, client_info=self.client_info,
280 engine_info=self.engine_info, client_info=self.client_info,
274 logname=self.log.name)
281 logname=self.log.name)
275
282
@@ -315,8 +322,9 b' class Hub(LoggingFactory):'
315 loop=Instance(ioloop.IOLoop)
322 loop=Instance(ioloop.IOLoop)
316 query=Instance(ZMQStream)
323 query=Instance(ZMQStream)
317 monitor=Instance(ZMQStream)
324 monitor=Instance(ZMQStream)
318 heartmonitor=Instance(HeartMonitor)
319 notifier=Instance(ZMQStream)
325 notifier=Instance(ZMQStream)
326 resubmit=Instance(ZMQStream)
327 heartmonitor=Instance(HeartMonitor)
320 db=Instance(object)
328 db=Instance(object)
321 client_info=Dict()
329 client_info=Dict()
322 engine_info=Dict()
330 engine_info=Dict()
@@ -379,6 +387,9 b' class Hub(LoggingFactory):'
379 'connection_request': self.connection_request,
387 'connection_request': self.connection_request,
380 }
388 }
381
389
390 # ignore resubmit replies
391 self.resubmit.on_recv(lambda msg: None, copy=False)
392
382 self.log.info("hub::created hub")
393 self.log.info("hub::created hub")
383
394
384 @property
395 @property
@@ -452,31 +463,31 b' class Hub(LoggingFactory):'
452 def dispatch_monitor_traffic(self, msg):
463 def dispatch_monitor_traffic(self, msg):
453 """all ME and Task queue messages come through here, as well as
464 """all ME and Task queue messages come through here, as well as
454 IOPub traffic."""
465 IOPub traffic."""
455 self.log.debug("monitor traffic: %s"%msg[:2])
466 self.log.debug("monitor traffic: %r"%msg[:2])
456 switch = msg[0]
467 switch = msg[0]
457 idents, msg = self.session.feed_identities(msg[1:])
468 idents, msg = self.session.feed_identities(msg[1:])
458 if not idents:
469 if not idents:
459 self.log.error("Bad Monitor Message: %s"%msg)
470 self.log.error("Bad Monitor Message: %r"%msg)
460 return
471 return
461 handler = self.monitor_handlers.get(switch, None)
472 handler = self.monitor_handlers.get(switch, None)
462 if handler is not None:
473 if handler is not None:
463 handler(idents, msg)
474 handler(idents, msg)
464 else:
475 else:
465 self.log.error("Invalid monitor topic: %s"%switch)
476 self.log.error("Invalid monitor topic: %r"%switch)
466
477
467
478
468 def dispatch_query(self, msg):
479 def dispatch_query(self, msg):
469 """Route registration requests and queries from clients."""
480 """Route registration requests and queries from clients."""
470 idents, msg = self.session.feed_identities(msg)
481 idents, msg = self.session.feed_identities(msg)
471 if not idents:
482 if not idents:
472 self.log.error("Bad Query Message: %s"%msg)
483 self.log.error("Bad Query Message: %r"%msg)
473 return
484 return
474 client_id = idents[0]
485 client_id = idents[0]
475 try:
486 try:
476 msg = self.session.unpack_message(msg, content=True)
487 msg = self.session.unpack_message(msg, content=True)
477 except:
488 except:
478 content = error.wrap_exception()
489 content = error.wrap_exception()
479 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
490 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
480 self.session.send(self.query, "hub_error", ident=client_id,
491 self.session.send(self.query, "hub_error", ident=client_id,
481 content=content)
492 content=content)
482 return
493 return
@@ -484,16 +495,17 b' class Hub(LoggingFactory):'
484 # print client_id, header, parent, content
495 # print client_id, header, parent, content
485 #switch on message type:
496 #switch on message type:
486 msg_type = msg['msg_type']
497 msg_type = msg['msg_type']
487 self.log.info("client::client %s requested %s"%(client_id, msg_type))
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
488 handler = self.query_handlers.get(msg_type, None)
499 handler = self.query_handlers.get(msg_type, None)
489 try:
500 try:
490 assert handler is not None, "Bad Message Type: %s"%msg_type
501 assert handler is not None, "Bad Message Type: %r"%msg_type
491 except:
502 except:
492 content = error.wrap_exception()
503 content = error.wrap_exception()
493 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
504 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
494 self.session.send(self.query, "hub_error", ident=client_id,
505 self.session.send(self.query, "hub_error", ident=client_id,
495 content=content)
506 content=content)
496 return
507 return
508
497 else:
509 else:
498 handler(idents, msg)
510 handler(idents, msg)
499
511
@@ -560,9 +572,9 b' class Hub(LoggingFactory):'
560 # it's possible iopub arrived first:
572 # it's possible iopub arrived first:
561 existing = self.db.get_record(msg_id)
573 existing = self.db.get_record(msg_id)
562 for key,evalue in existing.iteritems():
574 for key,evalue in existing.iteritems():
563 rvalue = record[key]
575 rvalue = record.get(key, None)
564 if evalue and rvalue and evalue != rvalue:
576 if evalue and rvalue and evalue != rvalue:
565 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
577 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
566 elif evalue and not rvalue:
578 elif evalue and not rvalue:
567 record[key] = evalue
579 record[key] = evalue
568 self.db.update_record(msg_id, record)
580 self.db.update_record(msg_id, record)
@@ -648,10 +660,22 b' class Hub(LoggingFactory):'
648 try:
660 try:
649 # it's possible iopub arrived first:
661 # it's possible iopub arrived first:
650 existing = self.db.get_record(msg_id)
662 existing = self.db.get_record(msg_id)
663 if existing['resubmitted']:
664 for key in ('submitted', 'client_uuid', 'buffers'):
665 # don't clobber these keys on resubmit
666 # submitted and client_uuid should be different
667 # and buffers might be big, and shouldn't have changed
668 record.pop(key)
669 # still check content,header which should not change
670 # but are not expensive to compare as buffers
671
651 for key,evalue in existing.iteritems():
672 for key,evalue in existing.iteritems():
652 rvalue = record[key]
673 if key.endswith('buffers'):
674 # don't compare buffers
675 continue
676 rvalue = record.get(key, None)
653 if evalue and rvalue and evalue != rvalue:
677 if evalue and rvalue and evalue != rvalue:
654 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
678 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
655 elif evalue and not rvalue:
679 elif evalue and not rvalue:
656 record[key] = evalue
680 record[key] = evalue
657 self.db.update_record(msg_id, record)
681 self.db.update_record(msg_id, record)
@@ -1042,42 +1066,99 b' class Hub(LoggingFactory):'
1042 except Exception:
1066 except Exception:
1043 reply = error.wrap_exception()
1067 reply = error.wrap_exception()
1044 else:
1068 else:
1045 for msg_id in msg_ids:
1069 pending = filter(lambda m: m in self.pending, msg_ids)
1046 if msg_id in self.all_completed:
1070 if pending:
1047 self.db.drop_record(msg_id)
1071 try:
1048 else:
1072 raise IndexError("msg pending: %r"%pending[0])
1049 if msg_id in self.pending:
1073 except:
1050 try:
1074 reply = error.wrap_exception()
1051 raise IndexError("msg pending: %r"%msg_id)
1075 else:
1052 except:
1076 try:
1053 reply = error.wrap_exception()
1077 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1054 else:
1078 except Exception:
1079 reply = error.wrap_exception()
1080
1081 if reply['status'] == 'ok':
1082 eids = content.get('engine_ids', [])
1083 for eid in eids:
1084 if eid not in self.engines:
1055 try:
1085 try:
1056 raise IndexError("No such msg: %r"%msg_id)
1086 raise IndexError("No such engine: %i"%eid)
1057 except:
1087 except:
1058 reply = error.wrap_exception()
1088 reply = error.wrap_exception()
1059 break
1089 break
1060 eids = content.get('engine_ids', [])
1090 msg_ids = self.completed.pop(eid)
1061 for eid in eids:
1091 uid = self.engines[eid].queue
1062 if eid not in self.engines:
1063 try:
1092 try:
1064 raise IndexError("No such engine: %i"%eid)
1093 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1065 except:
1094 except Exception:
1066 reply = error.wrap_exception()
1095 reply = error.wrap_exception()
1067 break
1096 break
1068 msg_ids = self.completed.pop(eid)
1069 uid = self.engines[eid].queue
1070 try:
1071 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1072 except Exception:
1073 reply = error.wrap_exception()
1074 break
1075
1097
1076 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1098 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1077
1099
1078 def resubmit_task(self, client_id, msg, buffers):
1100 def resubmit_task(self, client_id, msg):
1079 """Resubmit a task."""
1101 """Resubmit one or more tasks."""
1080 raise NotImplementedError
1102 def finish(reply):
1103 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1104
1105 content = msg['content']
1106 msg_ids = content['msg_ids']
1107 reply = dict(status='ok')
1108 try:
1109 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1110 'header', 'content', 'buffers'])
1111 except Exception:
1112 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1113 return finish(error.wrap_exception())
1114
1115 # validate msg_ids
1116 found_ids = [ rec['msg_id'] for rec in records ]
1117 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1118 if len(records) > len(msg_ids):
1119 try:
1120 raise RuntimeError("DB appears to be in an inconsistent state. "
1121 "More matching records were found than should exist")
1122 except Exception:
1123 return finish(error.wrap_exception())
1124 elif len(records) < len(msg_ids):
1125 missing = [ m for m in msg_ids if m not in found_ids ]
1126 try:
1127 raise KeyError("No such msg(s): %s"%missing)
1128 except KeyError:
1129 return finish(error.wrap_exception())
1130 elif invalid_ids:
1131 msg_id = invalid_ids[0]
1132 try:
1133 raise ValueError("Task %r appears to be inflight"%(msg_id))
1134 except Exception:
1135 return finish(error.wrap_exception())
1136
1137 # clear the existing records
1138 rec = empty_record()
1139 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1140 rec['resubmitted'] = datetime.now()
1141 rec['queue'] = 'task'
1142 rec['client_uuid'] = client_id[0]
1143 try:
1144 for msg_id in msg_ids:
1145 self.all_completed.discard(msg_id)
1146 self.db.update_record(msg_id, rec)
1147 except Exception:
1148 self.log.error('db::db error updating record', exc_info=True)
1149 reply = error.wrap_exception()
1150 else:
1151 # send the messages
1152 for rec in records:
1153 header = rec['header']
1154 msg = self.session.msg(header['msg_type'])
1155 msg['content'] = rec['content']
1156 msg['header'] = header
1157 msg['msg_id'] = rec['msg_id']
1158 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1159
1160 finish(dict(status='ok'))
1161
1081
1162
1082 def _extract_record(self, rec):
1163 def _extract_record(self, rec):
1083 """decompose a TaskRecord dict into subsection of reply for get_result"""
1164 """decompose a TaskRecord dict into subsection of reply for get_result"""
@@ -1124,12 +1205,20 b' class Hub(LoggingFactory):'
1124 for msg_id in msg_ids:
1205 for msg_id in msg_ids:
1125 if msg_id in self.pending:
1206 if msg_id in self.pending:
1126 pending.append(msg_id)
1207 pending.append(msg_id)
1127 elif msg_id in self.all_completed or msg_id in records:
1208 elif msg_id in self.all_completed:
1128 completed.append(msg_id)
1209 completed.append(msg_id)
1129 if not statusonly:
1210 if not statusonly:
1130 c,bufs = self._extract_record(records[msg_id])
1211 c,bufs = self._extract_record(records[msg_id])
1131 content[msg_id] = c
1212 content[msg_id] = c
1132 buffers.extend(bufs)
1213 buffers.extend(bufs)
1214 elif msg_id in records:
1215 if rec['completed']:
1216 completed.append(msg_id)
1217 c,bufs = self._extract_record(records[msg_id])
1218 content[msg_id] = c
1219 buffers.extend(bufs)
1220 else:
1221 pending.append(msg_id)
1133 else:
1222 else:
1134 try:
1223 try:
1135 raise KeyError('No such message: '+msg_id)
1224 raise KeyError('No such message: '+msg_id)
@@ -6,12 +6,10 b''
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 from datetime import datetime
10
11 from pymongo import Connection
9 from pymongo import Connection
12 from pymongo.binary import Binary
10 from pymongo.binary import Binary
13
11
14 from IPython.utils.traitlets import Dict, List, CUnicode
12 from IPython.utils.traitlets import Dict, List, CUnicode, CStr, Instance
15
13
16 from .dictdb import BaseDB
14 from .dictdb import BaseDB
17
15
@@ -25,15 +23,20 b' class MongoDB(BaseDB):'
25 connection_args = List(config=True) # args passed to pymongo.Connection
23 connection_args = List(config=True) # args passed to pymongo.Connection
26 connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection
24 connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection
27 database = CUnicode(config=True) # name of the mongodb database
25 database = CUnicode(config=True) # name of the mongodb database
28 _table = Dict()
26
27 _connection = Instance(Connection) # pymongo connection
29
28
30 def __init__(self, **kwargs):
29 def __init__(self, **kwargs):
31 super(MongoDB, self).__init__(**kwargs)
30 super(MongoDB, self).__init__(**kwargs)
32 self._connection = Connection(*self.connection_args, **self.connection_kwargs)
31 if self._connection is None:
32 self._connection = Connection(*self.connection_args, **self.connection_kwargs)
33 if not self.database:
33 if not self.database:
34 self.database = self.session
34 self.database = self.session
35 self._db = self._connection[self.database]
35 self._db = self._connection[self.database]
36 self._records = self._db['task_records']
36 self._records = self._db['task_records']
37 self._records.ensure_index('msg_id', unique=True)
38 self._records.ensure_index('submitted') # for sorting history
39 # for rec in self._records.find
37
40
38 def _binary_buffers(self, rec):
41 def _binary_buffers(self, rec):
39 for key in ('buffers', 'result_buffers'):
42 for key in ('buffers', 'result_buffers'):
@@ -45,18 +48,21 b' class MongoDB(BaseDB):'
45 """Add a new Task Record, by msg_id."""
48 """Add a new Task Record, by msg_id."""
46 # print rec
49 # print rec
47 rec = self._binary_buffers(rec)
50 rec = self._binary_buffers(rec)
48 obj_id = self._records.insert(rec)
51 self._records.insert(rec)
49 self._table[msg_id] = obj_id
50
52
51 def get_record(self, msg_id):
53 def get_record(self, msg_id):
52 """Get a specific Task Record, by msg_id."""
54 """Get a specific Task Record, by msg_id."""
53 return self._records.find_one(self._table[msg_id])
55 r = self._records.find_one({'msg_id': msg_id})
56 if not r:
57 # r will be '' if nothing is found
58 raise KeyError(msg_id)
59 return r
54
60
55 def update_record(self, msg_id, rec):
61 def update_record(self, msg_id, rec):
56 """Update the data in an existing record."""
62 """Update the data in an existing record."""
57 rec = self._binary_buffers(rec)
63 rec = self._binary_buffers(rec)
58 obj_id = self._table[msg_id]
64
59 self._records.update({'_id':obj_id}, {'$set': rec})
65 self._records.update({'msg_id':msg_id}, {'$set': rec})
60
66
61 def drop_matching_records(self, check):
67 def drop_matching_records(self, check):
62 """Remove a record from the DB."""
68 """Remove a record from the DB."""
@@ -64,8 +70,7 b' class MongoDB(BaseDB):'
64
70
65 def drop_record(self, msg_id):
71 def drop_record(self, msg_id):
66 """Remove a record from the DB."""
72 """Remove a record from the DB."""
67 obj_id = self._table.pop(msg_id)
73 self._records.remove({'msg_id':msg_id})
68 self._records.remove(obj_id)
69
74
70 def find_records(self, check, keys=None):
75 def find_records(self, check, keys=None):
71 """Find records matching a query dict, optionally extracting subset of keys.
76 """Find records matching a query dict, optionally extracting subset of keys.
@@ -137,6 +137,7 b' class TaskScheduler(SessionFactory):'
137
137
138 # internals:
138 # internals:
139 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
139 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
140 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
140 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
141 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
141 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
142 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
142 pending = Dict() # dict by engine_uuid of submitted tasks
143 pending = Dict() # dict by engine_uuid of submitted tasks
@@ -205,6 +206,8 b' class TaskScheduler(SessionFactory):'
205 self.pending[uid] = {}
206 self.pending[uid] = {}
206 if len(self.targets) == 1:
207 if len(self.targets) == 1:
207 self.resume_receiving()
208 self.resume_receiving()
209 # rescan the graph:
210 self.update_graph(None)
208
211
209 def _unregister_engine(self, uid):
212 def _unregister_engine(self, uid):
210 """Existing engine with ident `uid` became unavailable."""
213 """Existing engine with ident `uid` became unavailable."""
@@ -215,11 +218,11 b' class TaskScheduler(SessionFactory):'
215 # handle any potentially finished tasks:
218 # handle any potentially finished tasks:
216 self.engine_stream.flush()
219 self.engine_stream.flush()
217
220
218 self.completed.pop(uid)
221 # don't pop destinations, because they might be used later
219 self.failed.pop(uid)
220 # don't pop destinations, because it might be used later
221 # map(self.destinations.pop, self.completed.pop(uid))
222 # map(self.destinations.pop, self.completed.pop(uid))
222 # map(self.destinations.pop, self.failed.pop(uid))
223 # map(self.destinations.pop, self.failed.pop(uid))
224
225 # prevent this engine from receiving work
223 idx = self.targets.index(uid)
226 idx = self.targets.index(uid)
224 self.targets.pop(idx)
227 self.targets.pop(idx)
225 self.loads.pop(idx)
228 self.loads.pop(idx)
@@ -229,28 +232,40 b' class TaskScheduler(SessionFactory):'
229 if self.pending[uid]:
232 if self.pending[uid]:
230 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
233 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
231 dc.start()
234 dc.start()
235 else:
236 self.completed.pop(uid)
237 self.failed.pop(uid)
238
232
239
233 @logged
240 @logged
234 def handle_stranded_tasks(self, engine):
241 def handle_stranded_tasks(self, engine):
235 """Deal with jobs resident in an engine that died."""
242 """Deal with jobs resident in an engine that died."""
236 lost = self.pending.pop(engine)
243 lost = self.pending[engine]
237
244 for msg_id in lost.keys():
238 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
245 if msg_id not in self.pending[engine]:
239 self.all_failed.add(msg_id)
246 # prevent double-handling of messages
240 self.all_done.add(msg_id)
247 continue
248
249 raw_msg = lost[msg_id][0]
250
241 idents,msg = self.session.feed_identities(raw_msg, copy=False)
251 idents,msg = self.session.feed_identities(raw_msg, copy=False)
242 msg = self.session.unpack_message(msg, copy=False, content=False)
252 msg = self.session.unpack_message(msg, copy=False, content=False)
243 parent = msg['header']
253 parent = msg['header']
244 idents = [idents[0],engine]+idents[1:]
254 idents = [engine, idents[0]]
245 # print (idents)
255
256 # build fake error reply
246 try:
257 try:
247 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
258 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
248 except:
259 except:
249 content = error.wrap_exception()
260 content = error.wrap_exception()
250 msg = self.session.send(self.client_stream, 'apply_reply', content,
261 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
251 parent=parent, ident=idents)
262 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
252 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
263 # and dispatch it
253 self.update_graph(msg_id)
264 self.dispatch_result(raw_reply)
265
266 # finally scrub completed/failed lists
267 self.completed.pop(engine)
268 self.failed.pop(engine)
254
269
255
270
256 #-----------------------------------------------------------------------
271 #-----------------------------------------------------------------------
@@ -277,6 +292,8 b' class TaskScheduler(SessionFactory):'
277
292
278 # targets
293 # targets
279 targets = set(header.get('targets', []))
294 targets = set(header.get('targets', []))
295 retries = header.get('retries', 0)
296 self.retries[msg_id] = retries
280
297
281 # time dependencies
298 # time dependencies
282 after = Dependency(header.get('after', []))
299 after = Dependency(header.get('after', []))
@@ -315,7 +332,9 b' class TaskScheduler(SessionFactory):'
315 # time deps already met, try to run
332 # time deps already met, try to run
316 if not self.maybe_run(msg_id, *args):
333 if not self.maybe_run(msg_id, *args):
317 # can't run yet
334 # can't run yet
318 self.save_unmet(msg_id, *args)
335 if msg_id not in self.all_failed:
336 # could have failed as unreachable
337 self.save_unmet(msg_id, *args)
319 else:
338 else:
320 self.save_unmet(msg_id, *args)
339 self.save_unmet(msg_id, *args)
321
340
@@ -328,7 +347,7 b' class TaskScheduler(SessionFactory):'
328 if msg_id in self.depending:
347 if msg_id in self.depending:
329 raw,after,targets,follow,timeout = self.depending[msg_id]
348 raw,after,targets,follow,timeout = self.depending[msg_id]
330 if timeout and timeout < now:
349 if timeout and timeout < now:
331 self.fail_unreachable(msg_id, timeout=True)
350 self.fail_unreachable(msg_id, error.TaskTimeout)
332
351
333 @logged
352 @logged
334 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
353 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
@@ -369,7 +388,7 b' class TaskScheduler(SessionFactory):'
369 # we need a can_run filter
388 # we need a can_run filter
370 def can_run(idx):
389 def can_run(idx):
371 # check hwm
390 # check hwm
372 if self.loads[idx] == self.hwm:
391 if self.hwm and self.loads[idx] == self.hwm:
373 return False
392 return False
374 target = self.targets[idx]
393 target = self.targets[idx]
375 # check blacklist
394 # check blacklist
@@ -382,6 +401,7 b' class TaskScheduler(SessionFactory):'
382 return follow.check(self.completed[target], self.failed[target])
401 return follow.check(self.completed[target], self.failed[target])
383
402
384 indices = filter(can_run, range(len(self.targets)))
403 indices = filter(can_run, range(len(self.targets)))
404
385 if not indices:
405 if not indices:
386 # couldn't run
406 # couldn't run
387 if follow.all:
407 if follow.all:
@@ -395,12 +415,14 b' class TaskScheduler(SessionFactory):'
395 for m in follow.intersection(relevant):
415 for m in follow.intersection(relevant):
396 dests.add(self.destinations[m])
416 dests.add(self.destinations[m])
397 if len(dests) > 1:
417 if len(dests) > 1:
418 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
398 self.fail_unreachable(msg_id)
419 self.fail_unreachable(msg_id)
399 return False
420 return False
400 if targets:
421 if targets:
401 # check blacklist+targets for impossibility
422 # check blacklist+targets for impossibility
402 targets.difference_update(blacklist)
423 targets.difference_update(blacklist)
403 if not targets or not targets.intersection(self.targets):
424 if not targets or not targets.intersection(self.targets):
425 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
404 self.fail_unreachable(msg_id)
426 self.fail_unreachable(msg_id)
405 return False
427 return False
406 return False
428 return False
@@ -454,20 +476,34 b' class TaskScheduler(SessionFactory):'
454 idents,msg = self.session.feed_identities(raw_msg, copy=False)
476 idents,msg = self.session.feed_identities(raw_msg, copy=False)
455 msg = self.session.unpack_message(msg, content=False, copy=False)
477 msg = self.session.unpack_message(msg, content=False, copy=False)
456 engine = idents[0]
478 engine = idents[0]
457 idx = self.targets.index(engine)
479 try:
458 self.finish_job(idx)
480 idx = self.targets.index(engine)
481 except ValueError:
482 pass # skip load-update for dead engines
483 else:
484 self.finish_job(idx)
459 except Exception:
485 except Exception:
460 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
486 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
461 return
487 return
462
488
463 header = msg['header']
489 header = msg['header']
490 parent = msg['parent_header']
464 if header.get('dependencies_met', True):
491 if header.get('dependencies_met', True):
465 success = (header['status'] == 'ok')
492 success = (header['status'] == 'ok')
466 self.handle_result(idents, msg['parent_header'], raw_msg, success)
493 msg_id = parent['msg_id']
467 # send to Hub monitor
494 retries = self.retries[msg_id]
468 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
495 if not success and retries > 0:
496 # failed
497 self.retries[msg_id] = retries - 1
498 self.handle_unmet_dependency(idents, parent)
499 else:
500 del self.retries[msg_id]
501 # relay to client and update graph
502 self.handle_result(idents, parent, raw_msg, success)
503 # send to Hub monitor
504 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
469 else:
505 else:
470 self.handle_unmet_dependency(idents, msg['parent_header'])
506 self.handle_unmet_dependency(idents, parent)
471
507
472 @logged
508 @logged
473 def handle_result(self, idents, parent, raw_msg, success=True):
509 def handle_result(self, idents, parent, raw_msg, success=True):
@@ -511,13 +547,19 b' class TaskScheduler(SessionFactory):'
511 self.depending[msg_id] = args
547 self.depending[msg_id] = args
512 self.fail_unreachable(msg_id)
548 self.fail_unreachable(msg_id)
513 elif not self.maybe_run(msg_id, *args):
549 elif not self.maybe_run(msg_id, *args):
514 # resubmit failed, put it back in our dependency tree
550 # resubmit failed
515 self.save_unmet(msg_id, *args)
551 if msg_id not in self.all_failed:
552 # put it back in our dependency tree
553 self.save_unmet(msg_id, *args)
516
554
517 if self.hwm:
555 if self.hwm:
518 idx = self.targets.index(engine)
556 try:
519 if self.loads[idx] == self.hwm-1:
557 idx = self.targets.index(engine)
520 self.update_graph(None)
558 except ValueError:
559 pass # skip load-update for dead engines
560 else:
561 if self.loads[idx] == self.hwm-1:
562 self.update_graph(None)
521
563
522
564
523
565
@@ -526,7 +568,7 b' class TaskScheduler(SessionFactory):'
526 """dep_id just finished. Update our dependency
568 """dep_id just finished. Update our dependency
527 graph and submit any jobs that just became runable.
569 graph and submit any jobs that just became runable.
528
570
529 Called with dep_id=None to update graph for hwm, but without finishing
571 Called with dep_id=None to update entire graph for hwm, but without finishing
530 a task.
572 a task.
531 """
573 """
532 # print ("\n\n***********")
574 # print ("\n\n***********")
@@ -538,9 +580,11 b' class TaskScheduler(SessionFactory):'
538 # print ("\n\n***********\n\n")
580 # print ("\n\n***********\n\n")
539 # update any jobs that depended on the dependency
581 # update any jobs that depended on the dependency
540 jobs = self.graph.pop(dep_id, [])
582 jobs = self.graph.pop(dep_id, [])
541 # if we have HWM and an engine just become no longer full
583
542 # recheck *all* jobs:
584 # recheck *all* jobs if
543 if self.hwm and any( [ load==self.hwm-1 for load in self.loads]):
585 # a) we have HWM and an engine just become no longer full
586 # or b) dep_id was given as None
587 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
544 jobs = self.depending.keys()
588 jobs = self.depending.keys()
545
589
546 for msg_id in jobs:
590 for msg_id in jobs:
@@ -27,16 +27,20 b' operators = {'
27 '$lt' : "<",
27 '$lt' : "<",
28 '$gt' : ">",
28 '$gt' : ">",
29 # null is handled weird with ==,!=
29 # null is handled weird with ==,!=
30 '$eq' : "IS",
30 '$eq' : "=",
31 '$ne' : "IS NOT",
31 '$ne' : "!=",
32 '$lte': "<=",
32 '$lte': "<=",
33 '$gte': ">=",
33 '$gte': ">=",
34 '$in' : ('IS', ' OR '),
34 '$in' : ('=', ' OR '),
35 '$nin': ('IS NOT', ' AND '),
35 '$nin': ('!=', ' AND '),
36 # '$all': None,
36 # '$all': None,
37 # '$mod': None,
37 # '$mod': None,
38 # '$exists' : None
38 # '$exists' : None
39 }
39 }
40 null_operators = {
41 '=' : "IS NULL",
42 '!=' : "IS NOT NULL",
43 }
40
44
41 def _adapt_datetime(dt):
45 def _adapt_datetime(dt):
42 return dt.strftime(ISO8601)
46 return dt.strftime(ISO8601)
@@ -205,17 +209,27 b' class SQLiteDB(BaseDB):'
205 raise KeyError("Unsupported operator: %r"%test)
209 raise KeyError("Unsupported operator: %r"%test)
206 if isinstance(op, tuple):
210 if isinstance(op, tuple):
207 op, join = op
211 op, join = op
208 expr = "%s %s ?"%(name, op)
212
209 if isinstance(value, (tuple,list)):
213 if value is None and op in null_operators:
210 expr = '( %s )'%( join.join([expr]*len(value)) )
214 expr = "%s %s"%null_operators[op]
211 args.extend(value)
212 else:
215 else:
213 args.append(value)
216 expr = "%s %s ?"%(name, op)
217 if isinstance(value, (tuple,list)):
218 if op in null_operators and any([v is None for v in value]):
219 # equality tests don't work with NULL
220 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
221 expr = '( %s )'%( join.join([expr]*len(value)) )
222 args.extend(value)
223 else:
224 args.append(value)
214 expressions.append(expr)
225 expressions.append(expr)
215 else:
226 else:
216 # it's an equality check
227 # it's an equality check
217 expressions.append("%s IS ?"%name)
228 if sub_check is None:
218 args.append(sub_check)
229 expressions.append("%s IS NULL"%name)
230 else:
231 expressions.append("%s = ?"%name)
232 args.append(sub_check)
219
233
220 expr = " AND ".join(expressions)
234 expr = " AND ".join(expressions)
221 return expr, args
235 return expr, args
@@ -176,6 +176,37 b' class StreamSession(object):'
176 header = extract_header(msg_or_header)
176 header = extract_header(msg_or_header)
177 return header.get('key', None) == self.key
177 return header.get('key', None) == self.key
178
178
179
180 def serialize(self, msg, ident=None):
181 content = msg.get('content', {})
182 if content is None:
183 content = self.none
184 elif isinstance(content, dict):
185 content = self.pack(content)
186 elif isinstance(content, bytes):
187 # content is already packed, as in a relayed message
188 pass
189 elif isinstance(content, unicode):
190 # should be bytes, but JSON often spits out unicode
191 content = content.encode('utf8')
192 else:
193 raise TypeError("Content incorrect type: %s"%type(content))
194
195 to_send = []
196
197 if isinstance(ident, list):
198 # accept list of idents
199 to_send.extend(ident)
200 elif ident is not None:
201 to_send.append(ident)
202 to_send.append(DELIM)
203 if self.key is not None:
204 to_send.append(self.key)
205 to_send.append(self.pack(msg['header']))
206 to_send.append(self.pack(msg['parent_header']))
207 to_send.append(content)
208
209 return to_send
179
210
180 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
211 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
181 """Build and send a message via stream or socket.
212 """Build and send a message via stream or socket.
@@ -221,33 +252,11 b' class StreamSession(object):'
221 # we got a Message, not a msg_type
252 # we got a Message, not a msg_type
222 # don't build a new Message
253 # don't build a new Message
223 msg = msg_or_type
254 msg = msg_or_type
224 content = msg['content']
225 else:
255 else:
226 msg = self.msg(msg_or_type, content, parent, subheader)
256 msg = self.msg(msg_or_type, content, parent, subheader)
227
257
228 buffers = [] if buffers is None else buffers
258 buffers = [] if buffers is None else buffers
229 to_send = []
259 to_send = self.serialize(msg, ident)
230 if isinstance(ident, list):
231 # accept list of idents
232 to_send.extend(ident)
233 elif ident is not None:
234 to_send.append(ident)
235 to_send.append(DELIM)
236 if self.key is not None:
237 to_send.append(self.key)
238 to_send.append(self.pack(msg['header']))
239 to_send.append(self.pack(msg['parent_header']))
240
241 if content is None:
242 content = self.none
243 elif isinstance(content, dict):
244 content = self.pack(content)
245 elif isinstance(content, bytes):
246 # content is already packed, as in a relayed message
247 pass
248 else:
249 raise TypeError("Content incorrect type: %s"%type(content))
250 to_send.append(content)
251 flag = 0
260 flag = 0
252 if buffers:
261 if buffers:
253 flag = zmq.SNDMORE
262 flag = zmq.SNDMORE
@@ -48,7 +48,7 b' class TestProcessLauncher(LocalProcessLauncher):'
48 def setup():
48 def setup():
49 cp = TestProcessLauncher()
49 cp = TestProcessLauncher()
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
51 ['--profile', 'iptest', '--log-level', '99', '-r']
52 cp.start()
52 cp.start()
53 launchers.append(cp)
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
@@ -212,3 +212,33 b' class TestClient(ClusterTestCase):'
212 time.sleep(0.25)
212 time.sleep(0.25)
213 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
213 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
214
214
215 def test_resubmit(self):
216 def f():
217 import random
218 return random.random()
219 v = self.client.load_balanced_view()
220 ar = v.apply_async(f)
221 r1 = ar.get(1)
222 ahr = self.client.resubmit(ar.msg_ids)
223 r2 = ahr.get(1)
224 self.assertFalse(r1 == r2)
225
226 def test_resubmit_inflight(self):
227 """ensure ValueError on resubmit of inflight task"""
228 v = self.client.load_balanced_view()
229 ar = v.apply_async(time.sleep,1)
230 # give the message a chance to arrive
231 time.sleep(0.2)
232 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
233 ar.get(2)
234
235 def test_resubmit_badkey(self):
236 """ensure KeyError on resubmit of nonexistant task"""
237 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
238
239 def test_purge_results(self):
240 hist = self.client.hub_history()
241 self.client.purge_results(hist)
242 newhist = self.client.hub_history()
243 self.assertTrue(len(newhist) == 0)
244
@@ -15,10 +15,7 b''
15 import tempfile
15 import tempfile
16 import time
16 import time
17
17
18 import uuid
19
20 from datetime import datetime, timedelta
18 from datetime import datetime, timedelta
21 from random import choice, randint
22 from unittest import TestCase
19 from unittest import TestCase
23
20
24 from nose import SkipTest
21 from nose import SkipTest
@@ -157,6 +154,13 b' class TestDictBackend(TestCase):'
157 self.db.update_record(msg_id, dict(completed=datetime.now()))
154 self.db.update_record(msg_id, dict(completed=datetime.now()))
158 rec = self.db.get_record(msg_id)
155 rec = self.db.get_record(msg_id)
159 self.assertTrue(isinstance(rec['completed'], datetime))
156 self.assertTrue(isinstance(rec['completed'], datetime))
157
158 def test_drop_matching(self):
159 msg_ids = self.load_records(10)
160 query = {'msg_id' : {'$in':msg_ids}}
161 self.db.drop_matching_records(query)
162 recs = self.db.find_records(query)
163 self.assertTrue(len(recs)==0)
160
164
161 class TestSQLiteBackend(TestDictBackend):
165 class TestSQLiteBackend(TestDictBackend):
162 def create_db(self):
166 def create_db(self):
@@ -164,19 +168,3 b' class TestSQLiteBackend(TestDictBackend):'
164
168
165 def tearDown(self):
169 def tearDown(self):
166 self.db._db.close()
170 self.db._db.close()
167
168 # optional MongoDB test
169 try:
170 from IPython.parallel.controller.mongodb import MongoDB
171 except ImportError:
172 pass
173 else:
174 class TestMongoBackend(TestDictBackend):
175 def create_db(self):
176 try:
177 return MongoDB(database='iptestdb')
178 except Exception:
179 raise SkipTest("Couldn't connect to mongodb instance")
180
181 def tearDown(self):
182 self.db._connection.drop_database('iptestdb')
@@ -21,7 +21,7 b' import zmq'
21 from IPython import parallel as pmod
21 from IPython import parallel as pmod
22 from IPython.parallel import error
22 from IPython.parallel import error
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
24 from IPython.parallel import LoadBalancedView, DirectView
24 from IPython.parallel import DirectView
25 from IPython.parallel.util import interactive
25 from IPython.parallel.util import interactive
26
26
27 from IPython.parallel.tests import add_engines
27 from IPython.parallel.tests import add_engines
@@ -33,18 +33,6 b' def setup():'
33
33
34 class TestView(ClusterTestCase):
34 class TestView(ClusterTestCase):
35
35
36 def test_z_crash_task(self):
37 """test graceful handling of engine death (balanced)"""
38 # self.add_engines(1)
39 ar = self.client[-1].apply_async(crash)
40 self.assertRaisesRemote(error.EngineError, ar.get)
41 eid = ar.engine_id
42 tic = time.time()
43 while eid in self.client.ids and time.time()-tic < 5:
44 time.sleep(.01)
45 self.client.spin()
46 self.assertFalse(eid in self.client.ids, "Engine should have died")
47
48 def test_z_crash_mux(self):
36 def test_z_crash_mux(self):
49 """test graceful handling of engine death (direct)"""
37 """test graceful handling of engine death (direct)"""
50 # self.add_engines(1)
38 # self.add_engines(1)
@@ -199,6 +199,7 b' def make_exclude():'
199
199
200 if not have['pymongo']:
200 if not have['pymongo']:
201 exclusions.append(ipjoin('parallel', 'controller', 'mongodb'))
201 exclusions.append(ipjoin('parallel', 'controller', 'mongodb'))
202 exclusions.append(ipjoin('parallel', 'tests', 'test_mongodb'))
202
203
203 if not have['matplotlib']:
204 if not have['matplotlib']:
204 exclusions.extend([ipjoin('lib', 'pylabtools'),
205 exclusions.extend([ipjoin('lib', 'pylabtools'),
@@ -12,6 +12,7 b' Using IPython for parallel computing'
12 parallel_multiengine.txt
12 parallel_multiengine.txt
13 parallel_task.txt
13 parallel_task.txt
14 parallel_mpi.txt
14 parallel_mpi.txt
15 parallel_db.txt
15 parallel_security.txt
16 parallel_security.txt
16 parallel_winhpc.txt
17 parallel_winhpc.txt
17 parallel_demos.txt
18 parallel_demos.txt
@@ -292,6 +292,7 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
292
292
293
293
294
294
295
295 Impossible Dependencies
296 Impossible Dependencies
296 ***********************
297 ***********************
297
298
@@ -313,6 +314,27 b' The basic cases that are checked:'
313 This analysis has not been proven to be rigorous, so it is likely possible for tasks
314 This analysis has not been proven to be rigorous, so it is likely possible for tasks
314 to become impossible to run in obscure situations, so a timeout may be a good choice.
315 to become impossible to run in obscure situations, so a timeout may be a good choice.
315
316
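For instance, a sketch (where ``view`` is a :class:`LoadBalancedView` and ``ar`` is any
earlier :class:`AsyncResult`) that gives the scheduler ten seconds to satisfy a dependency
before failing the task:

.. sourcecode:: ipython

    In [1]: with view.temp_flags(after=ar, timeout=10):
       ...:     ar2 = view.apply_async(lambda : 1)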
317
318 Retries and Resubmit
319 ====================
320
321 Retries
322 -------
323
324 Another flag for tasks is `retries`. This is an integer, specifying how many times
325 a task should be resubmitted after failure. This is useful for tasks that should still run
326 if their engine was shut down, or that have some statistical chance of failing. The default
327 is not to retry tasks.
328
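A sketch (``view`` is a :class:`LoadBalancedView`; ``flaky`` stands in for any function
with a chance of raising) showing both the persistent flag and the per-call form:

.. sourcecode:: ipython

    In [1]: view.retries = 5  # all subsequent tasks get up to 5 retries

    In [2]: ar = view.apply_async(flaky)

    In [3]: with view.temp_flags(retries=2):  # or just for a single call
       ...:     ar2 = view.apply_async(flaky)
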
329 Resubmit
330 --------
331
332 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
333 you have fixed the error, or because you want to restore the cluster to an interrupted state.
334 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
335 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
336 a task that is pending - only those that have finished, whether successfully or not.
337
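A sketch re-running the task behind an earlier :class:`AsyncResult` ``ar`` and blocking
on the fresh result:

.. sourcecode:: ipython

    In [1]: ahr = rc.resubmit(ar.msg_ids)

    In [2]: ahr.get()
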
316 .. _parallel_schedulers:
338 .. _parallel_schedulers:
317
339
318 Schedulers
340 Schedulers
@@ -391,6 +413,8 b' Disabled features when using the ZMQ Scheduler:'
391 TODO: performance comparisons
413 TODO: performance comparisons
392
414
393
415
416
417
394 More details
418 More details
395 ============
419 ============
396
420