##// END OF EJS Templates
Merge pull request #1391 from minrk/no_engines...
Min RK -
r6104:821fac24 merge
parent child Browse files
Show More
@@ -1308,7 +1308,11 b' class Client(HasTraits):'
1308 verbose : bool
1308 verbose : bool
1309 Whether to return lengths only, or lists of ids for each element
1309 Whether to return lengths only, or lists of ids for each element
1310 """
1310 """
1311 engine_ids = self._build_targets(targets)[1]
1311 if targets == 'all':
1312 # allow 'all' to be evaluated on the engine
1313 engine_ids = None
1314 else:
1315 engine_ids = self._build_targets(targets)[1]
1312 content = dict(targets=engine_ids, verbose=verbose)
1316 content = dict(targets=engine_ids, verbose=verbose)
1313 self.session.send(self._query_socket, "queue_request", content=content)
1317 self.session.send(self._query_socket, "queue_request", content=content)
1314 idents,msg = self.session.recv(self._query_socket, 0)
1318 idents,msg = self.session.recv(self._query_socket, 0)
@@ -430,7 +430,7 b' class Hub(SessionFactory):'
430 """turn any valid targets argument into a list of integer ids"""
430 """turn any valid targets argument into a list of integer ids"""
431 if targets is None:
431 if targets is None:
432 # default to all
432 # default to all
433 targets = self.ids
433 return self.ids
434
434
435 if isinstance(targets, (int,str,unicode)):
435 if isinstance(targets, (int,str,unicode)):
436 # only one target specified
436 # only one target specified
@@ -457,7 +457,7 b' class Hub(SessionFactory):'
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r", msg[:2])
460 self.log.debug("monitor traffic: %r", msg[0])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
@@ -1271,17 +1271,17 b' class Hub(SessionFactory):'
1271 buffer_lens = [] if 'buffers' in keys else None
1271 buffer_lens = [] if 'buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1273 else:
1273 else:
1274 buffer_lens = []
1274 buffer_lens = None
1275 result_buffer_lens = []
1275 result_buffer_lens = None
1276
1276
1277 for rec in records:
1277 for rec in records:
1278 # buffers may be None, so double check
1278 # buffers may be None, so double check
1279 b = rec.pop('buffers', empty) or empty
1279 if buffer_lens is not None:
1280 if buffer_lens is not None:
1280 b = rec.pop('buffers', empty) or empty
1281 buffer_lens.append(len(b))
1281 buffer_lens.append(len(b))
1282 buffers.extend(b)
1282 buffers.extend(b)
1283 rb = rec.pop('result_buffers', empty) or empty
1283 if result_buffer_lens is not None:
1284 if result_buffer_lens is not None:
1284 rb = rec.pop('result_buffers', empty) or empty
1285 result_buffer_lens.append(len(rb))
1285 result_buffer_lens.append(len(rb))
1286 buffers.extend(rb)
1286 buffers.extend(rb)
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):'
191
191
192 def start(self):
192 def start(self):
193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
194 self.client_stream.on_recv(self.dispatch_submission, copy=False)
195
194 self._notification_handlers = dict(
196 self._notification_handlers = dict(
195 registration_notification = self._register_engine,
197 registration_notification = self._register_engine,
196 unregistration_notification = self._unregister_engine
198 unregistration_notification = self._unregister_engine
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):'
247 self.completed[uid] = set()
249 self.completed[uid] = set()
248 self.failed[uid] = set()
250 self.failed[uid] = set()
249 self.pending[uid] = {}
251 self.pending[uid] = {}
250 if len(self.targets) == 1:
252
251 self.resume_receiving()
252 # rescan the graph:
253 # rescan the graph:
253 self.update_graph(None)
254 self.update_graph(None)
254
255
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):'
256 """Existing engine with ident `uid` became unavailable."""
257 """Existing engine with ident `uid` became unavailable."""
257 if len(self.targets) == 1:
258 if len(self.targets) == 1:
258 # this was our only engine
259 # this was our only engine
259 self.stop_receiving()
260 pass
260
261
261 # handle any potentially finished tasks:
262 # handle any potentially finished tasks:
262 self.engine_stream.flush()
263 self.engine_stream.flush()
@@ -445,6 +446,11 b' class TaskScheduler(SessionFactory):'
445
446
446 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
447 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
447 """check location dependencies, and run if they are met."""
448 """check location dependencies, and run if they are met."""
449 self.log.debug("Attempting to assign task %s", msg_id)
450 if not self.targets:
451 # no engines, definitely can't run
452 return False
453
448 blacklist = self.blacklist.setdefault(msg_id, set())
454 blacklist = self.blacklist.setdefault(msg_id, set())
449 if follow or targets or blacklist or self.hwm:
455 if follow or targets or blacklist or self.hwm:
450 # we need a can_run filter
456 # we need a can_run filter
@@ -223,6 +223,14 b' class TestClient(ClusterTestCase):'
223 for rec in found:
223 for rec in found:
224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
225
225
226 def test_db_query_default_keys(self):
227 """default db_query excludes buffers"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}})
229 for rec in found:
230 keys = set(rec.keys())
231 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
232 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
233
226 def test_db_query_msg_id(self):
234 def test_db_query_msg_id(self):
227 """ensure msg_id is always in db queries"""
235 """ensure msg_id is always in db queries"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
236 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
General Comments 0
You need to be logged in to leave comments. Login now