Show More
@@ -1308,7 +1308,11 b' class Client(HasTraits):' | |||
|
1308 | 1308 | verbose : bool |
|
1309 | 1309 | Whether to return lengths only, or lists of ids for each element |
|
1310 | 1310 | """ |
|
1311 | engine_ids = self._build_targets(targets)[1] | |
|
1311 | if targets == 'all': | |
|
1312 | # allow 'all' to be evaluated on the engine | |
|
1313 | engine_ids = None | |
|
1314 | else: | |
|
1315 | engine_ids = self._build_targets(targets)[1] | |
|
1312 | 1316 | content = dict(targets=engine_ids, verbose=verbose) |
|
1313 | 1317 | self.session.send(self._query_socket, "queue_request", content=content) |
|
1314 | 1318 | idents,msg = self.session.recv(self._query_socket, 0) |
@@ -430,7 +430,7 b' class Hub(SessionFactory):' | |||
|
430 | 430 | """turn any valid targets argument into a list of integer ids""" |
|
431 | 431 | if targets is None: |
|
432 | 432 | # default to all |
|
433 |
|
|
|
433 | return self.ids | |
|
434 | 434 | |
|
435 | 435 | if isinstance(targets, (int,str,unicode)): |
|
436 | 436 | # only one target specified |
@@ -457,7 +457,7 b' class Hub(SessionFactory):' | |||
|
457 | 457 | def dispatch_monitor_traffic(self, msg): |
|
458 | 458 | """all ME and Task queue messages come through here, as well as |
|
459 | 459 | IOPub traffic.""" |
|
460 |
self.log.debug("monitor traffic: %r", msg[ |
|
|
460 | self.log.debug("monitor traffic: %r", msg[0]) | |
|
461 | 461 | switch = msg[0] |
|
462 | 462 | try: |
|
463 | 463 | idents, msg = self.session.feed_identities(msg[1:]) |
@@ -1271,17 +1271,17 b' class Hub(SessionFactory):' | |||
|
1271 | 1271 | buffer_lens = [] if 'buffers' in keys else None |
|
1272 | 1272 | result_buffer_lens = [] if 'result_buffers' in keys else None |
|
1273 | 1273 | else: |
|
1274 |
buffer_lens = |
|
|
1275 |
result_buffer_lens = |
|
|
1274 | buffer_lens = None | |
|
1275 | result_buffer_lens = None | |
|
1276 | 1276 | |
|
1277 | 1277 | for rec in records: |
|
1278 | 1278 | # buffers may be None, so double check |
|
1279 | b = rec.pop('buffers', empty) or empty | |
|
1279 | 1280 | if buffer_lens is not None: |
|
1280 | b = rec.pop('buffers', empty) or empty | |
|
1281 | 1281 | buffer_lens.append(len(b)) |
|
1282 | 1282 | buffers.extend(b) |
|
1283 | rb = rec.pop('result_buffers', empty) or empty | |
|
1283 | 1284 | if result_buffer_lens is not None: |
|
1284 | rb = rec.pop('result_buffers', empty) or empty | |
|
1285 | 1285 | result_buffer_lens.append(len(rb)) |
|
1286 | 1286 | buffers.extend(rb) |
|
1287 | 1287 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, |
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):' | |||
|
191 | 191 | |
|
192 | 192 | def start(self): |
|
193 | 193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
194 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |
|
195 | ||
|
194 | 196 | self._notification_handlers = dict( |
|
195 | 197 | registration_notification = self._register_engine, |
|
196 | 198 | unregistration_notification = self._unregister_engine |
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):' | |||
|
247 | 249 | self.completed[uid] = set() |
|
248 | 250 | self.failed[uid] = set() |
|
249 | 251 | self.pending[uid] = {} |
|
250 | if len(self.targets) == 1: | |
|
251 | self.resume_receiving() | |
|
252 | ||
|
252 | 253 | # rescan the graph: |
|
253 | 254 | self.update_graph(None) |
|
254 | 255 | |
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):' | |||
|
256 | 257 | """Existing engine with ident `uid` became unavailable.""" |
|
257 | 258 | if len(self.targets) == 1: |
|
258 | 259 | # this was our only engine |
|
259 | self.stop_receiving() | |
|
260 | pass | |
|
260 | 261 | |
|
261 | 262 | # handle any potentially finished tasks: |
|
262 | 263 | self.engine_stream.flush() |
@@ -445,6 +446,11 b' class TaskScheduler(SessionFactory):' | |||
|
445 | 446 | |
|
446 | 447 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
447 | 448 | """check location dependencies, and run if they are met.""" |
|
449 | self.log.debug("Attempting to assign task %s", msg_id) | |
|
450 | if not self.targets: | |
|
451 | # no engines, definitely can't run | |
|
452 | return False | |
|
453 | ||
|
448 | 454 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
449 | 455 | if follow or targets or blacklist or self.hwm: |
|
450 | 456 | # we need a can_run filter |
@@ -223,6 +223,14 b' class TestClient(ClusterTestCase):' | |||
|
223 | 223 | for rec in found: |
|
224 | 224 | self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed'])) |
|
225 | 225 | |
|
226 | def test_db_query_default_keys(self): | |
|
227 | """default db_query excludes buffers""" | |
|
228 | found = self.client.db_query({'msg_id': {'$ne' : ''}}) | |
|
229 | for rec in found: | |
|
230 | keys = set(rec.keys()) | |
|
231 | self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys) | |
|
232 | self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys) | |
|
233 | ||
|
226 | 234 | def test_db_query_msg_id(self): |
|
227 | 235 | """ensure msg_id is always in db queries""" |
|
228 | 236 | found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed']) |
General Comments 0
You need to be logged in to leave comments.
Login now