##// END OF EJS Templates
Merge pull request #1391 from minrk/no_engines...
Min RK -
r6104:821fac24 merge
parent child Browse files
Show More
@@ -1308,7 +1308,11 b' class Client(HasTraits):'
1308 1308 verbose : bool
1309 1309 Whether to return lengths only, or lists of ids for each element
1310 1310 """
1311 engine_ids = self._build_targets(targets)[1]
1311 if targets == 'all':
1312 # allow 'all' to be evaluated on the engine
1313 engine_ids = None
1314 else:
1315 engine_ids = self._build_targets(targets)[1]
1312 1316 content = dict(targets=engine_ids, verbose=verbose)
1313 1317 self.session.send(self._query_socket, "queue_request", content=content)
1314 1318 idents,msg = self.session.recv(self._query_socket, 0)
@@ -430,7 +430,7 b' class Hub(SessionFactory):'
430 430 """turn any valid targets argument into a list of integer ids"""
431 431 if targets is None:
432 432 # default to all
433 targets = self.ids
433 return self.ids
434 434
435 435 if isinstance(targets, (int,str,unicode)):
436 436 # only one target specified
@@ -457,7 +457,7 b' class Hub(SessionFactory):'
457 457 def dispatch_monitor_traffic(self, msg):
458 458 """all ME and Task queue messages come through here, as well as
459 459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r", msg[:2])
460 self.log.debug("monitor traffic: %r", msg[0])
461 461 switch = msg[0]
462 462 try:
463 463 idents, msg = self.session.feed_identities(msg[1:])
@@ -1271,17 +1271,17 b' class Hub(SessionFactory):'
1271 1271 buffer_lens = [] if 'buffers' in keys else None
1272 1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1273 1273 else:
1274 buffer_lens = []
1275 result_buffer_lens = []
1274 buffer_lens = None
1275 result_buffer_lens = None
1276 1276
1277 1277 for rec in records:
1278 1278 # buffers may be None, so double check
1279 b = rec.pop('buffers', empty) or empty
1279 1280 if buffer_lens is not None:
1280 b = rec.pop('buffers', empty) or empty
1281 1281 buffer_lens.append(len(b))
1282 1282 buffers.extend(b)
1283 rb = rec.pop('result_buffers', empty) or empty
1283 1284 if result_buffer_lens is not None:
1284 rb = rec.pop('result_buffers', empty) or empty
1285 1285 result_buffer_lens.append(len(rb))
1286 1286 buffers.extend(rb)
1287 1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):'
191 191
192 192 def start(self):
193 193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
194 self.client_stream.on_recv(self.dispatch_submission, copy=False)
195
194 196 self._notification_handlers = dict(
195 197 registration_notification = self._register_engine,
196 198 unregistration_notification = self._unregister_engine
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):'
247 249 self.completed[uid] = set()
248 250 self.failed[uid] = set()
249 251 self.pending[uid] = {}
250 if len(self.targets) == 1:
251 self.resume_receiving()
252
252 253 # rescan the graph:
253 254 self.update_graph(None)
254 255
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):'
256 257 """Existing engine with ident `uid` became unavailable."""
257 258 if len(self.targets) == 1:
258 259 # this was our only engine
259 self.stop_receiving()
260 pass
260 261
261 262 # handle any potentially finished tasks:
262 263 self.engine_stream.flush()
@@ -445,6 +446,11 b' class TaskScheduler(SessionFactory):'
445 446
446 447 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
447 448 """check location dependencies, and run if they are met."""
449 self.log.debug("Attempting to assign task %s", msg_id)
450 if not self.targets:
451 # no engines, definitely can't run
452 return False
453
448 454 blacklist = self.blacklist.setdefault(msg_id, set())
449 455 if follow or targets or blacklist or self.hwm:
450 456 # we need a can_run filter
@@ -223,6 +223,14 b' class TestClient(ClusterTestCase):'
223 223 for rec in found:
224 224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
225 225
226 def test_db_query_default_keys(self):
227 """default db_query excludes buffers"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}})
229 for rec in found:
230 keys = set(rec.keys())
231 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
232 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
233
226 234 def test_db_query_msg_id(self):
227 235 """ensure msg_id is always in db queries"""
228 236 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
General Comments 0
You need to be logged in to leave comments. Login now