Show More
@@ -1308,7 +1308,11 b' class Client(HasTraits):' | |||||
1308 | verbose : bool |
|
1308 | verbose : bool | |
1309 | Whether to return lengths only, or lists of ids for each element |
|
1309 | Whether to return lengths only, or lists of ids for each element | |
1310 | """ |
|
1310 | """ | |
1311 | engine_ids = self._build_targets(targets)[1] |
|
1311 | if targets == 'all': | |
|
1312 | # allow 'all' to be evaluated on the engine | |||
|
1313 | engine_ids = None | |||
|
1314 | else: | |||
|
1315 | engine_ids = self._build_targets(targets)[1] | |||
1312 | content = dict(targets=engine_ids, verbose=verbose) |
|
1316 | content = dict(targets=engine_ids, verbose=verbose) | |
1313 | self.session.send(self._query_socket, "queue_request", content=content) |
|
1317 | self.session.send(self._query_socket, "queue_request", content=content) | |
1314 | idents,msg = self.session.recv(self._query_socket, 0) |
|
1318 | idents,msg = self.session.recv(self._query_socket, 0) |
@@ -430,7 +430,7 b' class Hub(SessionFactory):' | |||||
430 | """turn any valid targets argument into a list of integer ids""" |
|
430 | """turn any valid targets argument into a list of integer ids""" | |
431 | if targets is None: |
|
431 | if targets is None: | |
432 | # default to all |
|
432 | # default to all | |
433 |
|
|
433 | return self.ids | |
434 |
|
434 | |||
435 | if isinstance(targets, (int,str,unicode)): |
|
435 | if isinstance(targets, (int,str,unicode)): | |
436 | # only one target specified |
|
436 | # only one target specified | |
@@ -457,7 +457,7 b' class Hub(SessionFactory):' | |||||
457 | def dispatch_monitor_traffic(self, msg): |
|
457 | def dispatch_monitor_traffic(self, msg): | |
458 | """all ME and Task queue messages come through here, as well as |
|
458 | """all ME and Task queue messages come through here, as well as | |
459 | IOPub traffic.""" |
|
459 | IOPub traffic.""" | |
460 |
self.log.debug("monitor traffic: %r", msg[ |
|
460 | self.log.debug("monitor traffic: %r", msg[0]) | |
461 | switch = msg[0] |
|
461 | switch = msg[0] | |
462 | try: |
|
462 | try: | |
463 | idents, msg = self.session.feed_identities(msg[1:]) |
|
463 | idents, msg = self.session.feed_identities(msg[1:]) | |
@@ -1271,17 +1271,17 b' class Hub(SessionFactory):' | |||||
1271 | buffer_lens = [] if 'buffers' in keys else None |
|
1271 | buffer_lens = [] if 'buffers' in keys else None | |
1272 | result_buffer_lens = [] if 'result_buffers' in keys else None |
|
1272 | result_buffer_lens = [] if 'result_buffers' in keys else None | |
1273 | else: |
|
1273 | else: | |
1274 |
buffer_lens = |
|
1274 | buffer_lens = None | |
1275 |
result_buffer_lens = |
|
1275 | result_buffer_lens = None | |
1276 |
|
1276 | |||
1277 | for rec in records: |
|
1277 | for rec in records: | |
1278 | # buffers may be None, so double check |
|
1278 | # buffers may be None, so double check | |
|
1279 | b = rec.pop('buffers', empty) or empty | |||
1279 | if buffer_lens is not None: |
|
1280 | if buffer_lens is not None: | |
1280 | b = rec.pop('buffers', empty) or empty |
|
|||
1281 | buffer_lens.append(len(b)) |
|
1281 | buffer_lens.append(len(b)) | |
1282 | buffers.extend(b) |
|
1282 | buffers.extend(b) | |
|
1283 | rb = rec.pop('result_buffers', empty) or empty | |||
1283 | if result_buffer_lens is not None: |
|
1284 | if result_buffer_lens is not None: | |
1284 | rb = rec.pop('result_buffers', empty) or empty |
|
|||
1285 | result_buffer_lens.append(len(rb)) |
|
1285 | result_buffer_lens.append(len(rb)) | |
1286 | buffers.extend(rb) |
|
1286 | buffers.extend(rb) | |
1287 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, |
|
1287 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, |
@@ -191,6 +191,8 b' class TaskScheduler(SessionFactory):' | |||||
191 |
|
191 | |||
192 | def start(self): |
|
192 | def start(self): | |
193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
193 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
|
194 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |||
|
195 | ||||
194 | self._notification_handlers = dict( |
|
196 | self._notification_handlers = dict( | |
195 | registration_notification = self._register_engine, |
|
197 | registration_notification = self._register_engine, | |
196 | unregistration_notification = self._unregister_engine |
|
198 | unregistration_notification = self._unregister_engine | |
@@ -247,8 +249,7 b' class TaskScheduler(SessionFactory):' | |||||
247 | self.completed[uid] = set() |
|
249 | self.completed[uid] = set() | |
248 | self.failed[uid] = set() |
|
250 | self.failed[uid] = set() | |
249 | self.pending[uid] = {} |
|
251 | self.pending[uid] = {} | |
250 | if len(self.targets) == 1: |
|
252 | ||
251 | self.resume_receiving() |
|
|||
252 | # rescan the graph: |
|
253 | # rescan the graph: | |
253 | self.update_graph(None) |
|
254 | self.update_graph(None) | |
254 |
|
255 | |||
@@ -256,7 +257,7 b' class TaskScheduler(SessionFactory):' | |||||
256 | """Existing engine with ident `uid` became unavailable.""" |
|
257 | """Existing engine with ident `uid` became unavailable.""" | |
257 | if len(self.targets) == 1: |
|
258 | if len(self.targets) == 1: | |
258 | # this was our only engine |
|
259 | # this was our only engine | |
259 | self.stop_receiving() |
|
260 | pass | |
260 |
|
261 | |||
261 | # handle any potentially finished tasks: |
|
262 | # handle any potentially finished tasks: | |
262 | self.engine_stream.flush() |
|
263 | self.engine_stream.flush() | |
@@ -445,6 +446,11 b' class TaskScheduler(SessionFactory):' | |||||
445 |
|
446 | |||
446 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
447 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | |
447 | """check location dependencies, and run if they are met.""" |
|
448 | """check location dependencies, and run if they are met.""" | |
|
449 | self.log.debug("Attempting to assign task %s", msg_id) | |||
|
450 | if not self.targets: | |||
|
451 | # no engines, definitely can't run | |||
|
452 | return False | |||
|
453 | ||||
448 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
454 | blacklist = self.blacklist.setdefault(msg_id, set()) | |
449 | if follow or targets or blacklist or self.hwm: |
|
455 | if follow or targets or blacklist or self.hwm: | |
450 | # we need a can_run filter |
|
456 | # we need a can_run filter |
@@ -223,6 +223,14 b' class TestClient(ClusterTestCase):' | |||||
223 | for rec in found: |
|
223 | for rec in found: | |
224 | self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed'])) |
|
224 | self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed'])) | |
225 |
|
225 | |||
|
226 | def test_db_query_default_keys(self): | |||
|
227 | """default db_query excludes buffers""" | |||
|
228 | found = self.client.db_query({'msg_id': {'$ne' : ''}}) | |||
|
229 | for rec in found: | |||
|
230 | keys = set(rec.keys()) | |||
|
231 | self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys) | |||
|
232 | self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys) | |||
|
233 | ||||
226 | def test_db_query_msg_id(self): |
|
234 | def test_db_query_msg_id(self): | |
227 | """ensure msg_id is always in db queries""" |
|
235 | """ensure msg_id is always in db queries""" | |
228 | found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed']) |
|
236 | found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed']) |
General Comments 0
You need to be logged in to leave comments.
Login now