resubmitted tasks are now wholly separate (new msg_ids)...
MinRK
@@ -1,1570 +1,1566 @@
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35
36 36 from IPython.utils.jsonutil import rekey
37 37 from IPython.utils.localinterfaces import LOCAL_IPS
38 38 from IPython.utils.path import get_ipython_dir
39 39 from IPython.utils.py3compat import cast_bytes
40 40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 41 Dict, List, Bool, Set, Any)
42 42 from IPython.external.decorator import decorator
43 43 from IPython.external.ssh import tunnel
44 44
45 45 from IPython.parallel import Reference
46 46 from IPython.parallel import error
47 47 from IPython.parallel import util
48 48
49 49 from IPython.zmq.session import Session, Message
50 50
51 51 from .asyncresult import AsyncResult, AsyncHubResult
52 52 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 53 from .view import DirectView, LoadBalancedView
54 54
55 55 if sys.version_info[0] >= 3:
56 56 # xrange is used in a couple 'isinstance' tests in py2
57 57 # should be just 'range' in 3k
58 58 xrange = range
59 59
60 60 #--------------------------------------------------------------------------
61 61 # Decorators for Client methods
62 62 #--------------------------------------------------------------------------
63 63
64 64 @decorator
65 65 def spin_first(f, self, *args, **kwargs):
66 66 """Call spin() to sync state prior to calling the method."""
67 67 self.spin()
68 68 return f(self, *args, **kwargs)
69 69
70 70
71 71 #--------------------------------------------------------------------------
72 72 # Classes
73 73 #--------------------------------------------------------------------------
74 74
75 75 class Metadata(dict):
76 76 """Subclass of dict for initializing metadata values.
77 77
78 78 Attribute access works on keys.
79 79
80 80 These objects have a strict set of keys - adding a new key raises an
81 81 error.
82 82 """
83 83 def __init__(self, *args, **kwargs):
84 84 dict.__init__(self)
85 85 md = {'msg_id' : None,
86 86 'submitted' : None,
87 87 'started' : None,
88 88 'completed' : None,
89 89 'received' : None,
90 90 'engine_uuid' : None,
91 91 'engine_id' : None,
92 92 'follow' : None,
93 93 'after' : None,
94 94 'status' : None,
95 95
96 96 'pyin' : None,
97 97 'pyout' : None,
98 98 'pyerr' : None,
99 99 'stdout' : '',
100 100 'stderr' : '',
101 101 'outputs' : [],
102 102 }
103 103 self.update(md)
104 104 self.update(dict(*args, **kwargs))
105 105
106 106 def __getattr__(self, key):
107 107 """getattr aliased to getitem"""
108 108 if key in self.iterkeys():
109 109 return self[key]
110 110 else:
111 111 raise AttributeError(key)
112 112
113 113 def __setattr__(self, key, value):
114 114 """setattr aliased to setitem, with strict"""
115 115 if key in self.iterkeys():
116 116 self[key] = value
117 117 else:
118 118 raise AttributeError(key)
119 119
120 120 def __setitem__(self, key, value):
121 121 """strict static key enforcement"""
122 122 if key in self.iterkeys():
123 123 dict.__setitem__(self, key, value)
124 124 else:
125 125 raise KeyError(key)
126 126
127 127
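# A minimal usage sketch (not part of the diff): Metadata aliases attribute
# access to item access and enforces its fixed key set, so typos fail loudly:
#
#     md = Metadata()
#     md.status                # -> None (default for a declared key)
#     md.status = 'ok'         # same as md['status'] = 'ok'
#     md['bogus'] = 1          # raises KeyError: keys are fixed at init
#     md.bogus = 1             # raises AttributeError, likewise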
128 128 class Client(HasTraits):
129 129 """A semi-synchronous client to the IPython ZMQ cluster
130 130
131 131 Parameters
132 132 ----------
133 133
134 134 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
135 135 Connection information for the Hub's registration. If a json connector
136 136 file is given, then likely no further configuration is necessary.
137 137 [Default: use profile]
138 138 profile : bytes
139 139 The name of the Cluster profile to be used to find connector information.
140 140 If run from an IPython application, the default profile will be the same
141 141 as the running application, otherwise it will be 'default'.
142 142 context : zmq.Context
143 143 Pass an existing zmq.Context instance, otherwise the client will create its own.
144 144 debug : bool
145 145 flag for lots of message printing for debug purposes
146 146 timeout : int/float
147 147 time (in seconds) to wait for connection replies from the Hub
148 148 [Default: 10]
149 149
150 150 #-------------- session related args ----------------
151 151
152 152 config : Config object
153 153 If specified, this will be relayed to the Session for configuration
154 154 username : str
155 155 set username for the session object
156 156 packer : str (import_string) or callable
157 157 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
158 158 function to serialize messages. Must support same input as
159 159 JSON, and output must be bytes.
160 160 You can pass a callable directly as `pack`
161 161 unpacker : str (import_string) or callable
162 162 The inverse of packer. Only necessary if packer is specified as *not* one
163 163 of 'json' or 'pickle'.
164 164
165 165 #-------------- ssh related args ----------------
166 166 # These are args for configuring the ssh tunnel to be used
167 167 # credentials are used to forward connections over ssh to the Controller
168 168 # Note that the ip given in `addr` needs to be relative to sshserver
169 169 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
170 170 # and set sshserver as the same machine the Controller is on. However,
171 171 # the only requirement is that sshserver is able to see the Controller
172 172 # (i.e. is within the same trusted network).
173 173
174 174 sshserver : str
175 175 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
176 176 If keyfile or password is specified, and this is not, it will default to
177 177 the ip given in addr.
178 178 sshkey : str; path to ssh private key file
179 179 This specifies a key to be used in ssh login, default None.
180 180 Regular default ssh keys will be used without specifying this argument.
181 181 password : str
182 182 Your ssh password to sshserver. Note that if this is left None,
183 183 you will be prompted for it if passwordless key based login is unavailable.
184 184 paramiko : bool
185 185 flag for whether to use paramiko instead of shell ssh for tunneling.
186 186 [default: True on win32, False otherwise]
187 187
188 188 #-------------- exec authentication args ----------------
189 189 If even localhost is untrusted, you can have some protection against
190 190 unauthorized execution by signing messages with HMAC digests.
191 191 Messages are still sent as cleartext, so if someone can snoop your
192 192 loopback traffic this will not protect your privacy, but will prevent
193 193 unauthorized execution.
194 194
195 195 exec_key : str
196 196 an authentication key or file containing a key
197 197 default: None
198 198
199 199
200 200 Attributes
201 201 ----------
202 202
203 203 ids : list of int engine IDs
204 204 requesting the ids attribute always synchronizes
205 205 the registration state. To request ids without synchronization,
206 206 use the semi-private _ids attribute.
207 207
208 208 history : list of msg_ids
209 209 a list of msg_ids, keeping track of all the execution
210 210 messages you have submitted in order.
211 211
212 212 outstanding : set of msg_ids
213 213 a set of msg_ids that have been submitted, but whose
214 214 results have not yet been received.
215 215
216 216 results : dict
217 217 a dict of all our results, keyed by msg_id
218 218
219 219 block : bool
220 220 determines default behavior when block not specified
221 221 in execution methods
222 222
223 223 Methods
224 224 -------
225 225
226 226 spin
227 227 flushes incoming results and registration state changes
228 228 control methods spin, and requesting `ids` also ensures the state is up to date
229 229
230 230 wait
231 231 wait on one or more msg_ids
232 232
233 233 execution methods
234 234 apply
235 235 legacy: execute, run
236 236
237 237 data movement
238 238 push, pull, scatter, gather
239 239
240 240 query methods
241 241 queue_status, get_result, purge, result_status
242 242
243 243 control methods
244 244 abort, shutdown
245 245
246 246 """
247 247
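# A hedged connection sketch based on the parameters documented above; the
# profile name, file path, and ssh server are placeholders:
#
#     rc = Client()                                    # default profile
#     rc = Client(profile='mycluster')                 # a named profile
#     rc = Client('/path/to/ipcontroller-client.json')
#     rc = Client('/path/to/ipcontroller-client.json',
#                 sshserver='user@gateway.example.com')  # tunnel over ssh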
248 248
249 249 block = Bool(False)
250 250 outstanding = Set()
251 251 results = Instance('collections.defaultdict', (dict,))
252 252 metadata = Instance('collections.defaultdict', (Metadata,))
253 253 history = List()
254 254 debug = Bool(False)
255 255 _spin_thread = Any()
256 256 _stop_spinning = Any()
257 257
258 258 profile=Unicode()
259 259 def _profile_default(self):
260 260 if BaseIPythonApplication.initialized():
261 261 # an IPython app *might* be running, try to get its profile
262 262 try:
263 263 return BaseIPythonApplication.instance().profile
264 264 except (AttributeError, MultipleInstanceError):
265 265 # could be a *different* subclass of config.Application,
266 266 # which would raise one of these two errors.
267 267 return u'default'
268 268 else:
269 269 return u'default'
270 270
271 271
272 272 _outstanding_dict = Instance('collections.defaultdict', (set,))
273 273 _ids = List()
274 274 _connected=Bool(False)
275 275 _ssh=Bool(False)
276 276 _context = Instance('zmq.Context')
277 277 _config = Dict()
278 278 _engines=Instance(util.ReverseDict, (), {})
279 279 # _hub_socket=Instance('zmq.Socket')
280 280 _query_socket=Instance('zmq.Socket')
281 281 _control_socket=Instance('zmq.Socket')
282 282 _iopub_socket=Instance('zmq.Socket')
283 283 _notification_socket=Instance('zmq.Socket')
284 284 _mux_socket=Instance('zmq.Socket')
285 285 _task_socket=Instance('zmq.Socket')
286 286 _task_scheme=Unicode()
287 287 _closed = False
288 288 _ignored_control_replies=Integer(0)
289 289 _ignored_hub_replies=Integer(0)
290 290
291 291 def __new__(self, *args, **kw):
292 292 # don't raise on positional args
293 293 return HasTraits.__new__(self, **kw)
294 294
295 295 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
296 296 context=None, debug=False, exec_key=None,
297 297 sshserver=None, sshkey=None, password=None, paramiko=None,
298 298 timeout=10, **extra_args
299 299 ):
300 300 if profile:
301 301 super(Client, self).__init__(debug=debug, profile=profile)
302 302 else:
303 303 super(Client, self).__init__(debug=debug)
304 304 if context is None:
305 305 context = zmq.Context.instance()
306 306 self._context = context
307 307 self._stop_spinning = Event()
308 308
309 309 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
310 310 if self._cd is not None:
311 311 if url_or_file is None:
312 312 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
313 313 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
314 314 " Please specify at least one of url_or_file or profile."
315 315
316 316 if not util.is_url(url_or_file):
317 317 # it's not a url, try for a file
318 318 if not os.path.exists(url_or_file):
319 319 if self._cd:
320 320 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
321 321 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
322 322 with open(url_or_file) as f:
323 323 cfg = json.loads(f.read())
324 324 else:
325 325 cfg = {'url':url_or_file}
326 326
327 327 # sync defaults from args, json:
328 328 if sshserver:
329 329 cfg['ssh'] = sshserver
330 330 if exec_key:
331 331 cfg['exec_key'] = exec_key
332 332 exec_key = cfg['exec_key']
333 333 location = cfg.setdefault('location', None)
334 334 cfg['url'] = util.disambiguate_url(cfg['url'], location)
335 335 url = cfg['url']
336 336 proto,addr,port = util.split_url(url)
337 337 if location is not None and addr == '127.0.0.1':
338 338 # location specified, and connection is expected to be local
339 339 if location not in LOCAL_IPS and not sshserver:
340 340 # load ssh from JSON *only* if the controller is not on
341 341 # this machine
342 342 sshserver=cfg['ssh']
343 343 if location not in LOCAL_IPS and not sshserver:
344 344 # warn if no ssh specified, but SSH is probably needed
345 345 # This is only a warning, because the most likely cause
346 346 # is a local Controller on a laptop whose IP is dynamic
347 347 warnings.warn("""
348 348 Controller appears to be listening on localhost, but not on this machine.
349 349 If this is true, you should specify Client(...,sshserver='you@%s')
350 350 or instruct your controller to listen on an external IP."""%location,
351 351 RuntimeWarning)
352 352 elif not sshserver:
353 353 # otherwise sync with cfg
354 354 sshserver = cfg['ssh']
355 355
356 356 self._config = cfg
357 357
358 358 self._ssh = bool(sshserver or sshkey or password)
359 359 if self._ssh and sshserver is None:
360 360 # default to ssh via localhost
361 361 sshserver = url.split('://')[1].split(':')[0]
362 362 if self._ssh and password is None:
363 363 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
364 364 password=False
365 365 else:
366 366 password = getpass("SSH Password for %s: "%sshserver)
367 367 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
368 368
369 369 # configure and construct the session
370 370 if exec_key is not None:
371 371 if os.path.isfile(exec_key):
372 372 extra_args['keyfile'] = exec_key
373 373 else:
374 374 exec_key = cast_bytes(exec_key)
375 375 extra_args['key'] = exec_key
376 376 self.session = Session(**extra_args)
377 377
378 378 self._query_socket = self._context.socket(zmq.DEALER)
379 379 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
380 380 if self._ssh:
381 381 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
382 382 else:
383 383 self._query_socket.connect(url)
384 384
385 385 self.session.debug = self.debug
386 386
387 387 self._notification_handlers = {'registration_notification' : self._register_engine,
388 388 'unregistration_notification' : self._unregister_engine,
389 389 'shutdown_notification' : lambda msg: self.close(),
390 390 }
391 391 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
392 392 'apply_reply' : self._handle_apply_reply}
393 393 self._connect(sshserver, ssh_kwargs, timeout)
394 394
395 395 def __del__(self):
396 396 """cleanup sockets, but _not_ context."""
397 397 self.close()
398 398
399 399 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
400 400 if ipython_dir is None:
401 401 ipython_dir = get_ipython_dir()
402 402 if profile_dir is not None:
403 403 try:
404 404 self._cd = ProfileDir.find_profile_dir(profile_dir)
405 405 return
406 406 except ProfileDirError:
407 407 pass
408 408 elif profile is not None:
409 409 try:
410 410 self._cd = ProfileDir.find_profile_dir_by_name(
411 411 ipython_dir, profile)
412 412 return
413 413 except ProfileDirError:
414 414 pass
415 415 self._cd = None
416 416
417 417 def _update_engines(self, engines):
418 418 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
419 419 for k,v in engines.iteritems():
420 420 eid = int(k)
421 421 self._engines[eid] = v
422 422 self._ids.append(eid)
423 423 self._ids = sorted(self._ids)
424 424 if sorted(self._engines.keys()) != range(len(self._engines)) and \
425 425 self._task_scheme == 'pure' and self._task_socket:
426 426 self._stop_scheduling_tasks()
427 427
428 428 def _stop_scheduling_tasks(self):
429 429 """Stop scheduling tasks because an engine has been unregistered
430 430 from a pure ZMQ scheduler.
431 431 """
432 432 self._task_socket.close()
433 433 self._task_socket = None
434 434 msg = "An engine has been unregistered, and we are using pure " +\
435 435 "ZMQ task scheduling. Task farming will be disabled."
436 436 if self.outstanding:
437 437 msg += " If you were running tasks when this happened, " +\
438 438 "some `outstanding` msg_ids may never resolve."
439 439 warnings.warn(msg, RuntimeWarning)
440 440
441 441 def _build_targets(self, targets):
442 442 """Turn valid target IDs or 'all' into two lists:
443 443 (int_ids, uuids).
444 444 """
445 445 if not self._ids:
446 446 # flush notification socket if no engines yet, just in case
447 447 if not self.ids:
448 448 raise error.NoEnginesRegistered("Can't build targets without any engines")
449 449
450 450 if targets is None:
451 451 targets = self._ids
452 452 elif isinstance(targets, basestring):
453 453 if targets.lower() == 'all':
454 454 targets = self._ids
455 455 else:
456 456 raise TypeError("%r not valid str target, must be 'all'"%(targets))
457 457 elif isinstance(targets, int):
458 458 if targets < 0:
459 459 targets = self.ids[targets]
460 460 if targets not in self._ids:
461 461 raise IndexError("No such engine: %i"%targets)
462 462 targets = [targets]
463 463
464 464 if isinstance(targets, slice):
465 465 indices = range(len(self._ids))[targets]
466 466 ids = self.ids
467 467 targets = [ ids[i] for i in indices ]
468 468
469 469 if not isinstance(targets, (tuple, list, xrange)):
470 470 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
471 471
472 472 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
473 473
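# An illustrative sketch of the accepted target forms, assuming engines
# 0-3 are registered (the uuid values are placeholders):
#
#     self._build_targets('all')        # -> ([b'uuid0', ...], [0, 1, 2, 3])
#     self._build_targets(2)            # -> ([b'uuid2'], [2])
#     self._build_targets(-1)           # negative ints index self.ids
#     self._build_targets(slice(0, 2))  # -> ([b'uuid0', b'uuid1'], [0, 1])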
474 474 def _connect(self, sshserver, ssh_kwargs, timeout):
475 475 """setup all our socket connections to the cluster. This is called from
476 476 __init__."""
477 477
478 478 # Maybe allow reconnecting?
479 479 if self._connected:
480 480 return
481 481 self._connected=True
482 482
483 483 def connect_socket(s, url):
484 484 url = util.disambiguate_url(url, self._config['location'])
485 485 if self._ssh:
486 486 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
487 487 else:
488 488 return s.connect(url)
489 489
490 490 self.session.send(self._query_socket, 'connection_request')
491 491 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
492 492 poller = zmq.Poller()
493 493 poller.register(self._query_socket, zmq.POLLIN)
494 494 # poll expects milliseconds, timeout is seconds
495 495 evts = poller.poll(timeout*1000)
496 496 if not evts:
497 497 raise error.TimeoutError("Hub connection request timed out")
498 498 idents,msg = self.session.recv(self._query_socket,mode=0)
499 499 if self.debug:
500 500 pprint(msg)
501 501 msg = Message(msg)
502 502 content = msg.content
503 503 self._config['registration'] = dict(content)
504 504 if content.status == 'ok':
505 505 ident = self.session.bsession
506 506 if content.mux:
507 507 self._mux_socket = self._context.socket(zmq.DEALER)
508 508 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
509 509 connect_socket(self._mux_socket, content.mux)
510 510 if content.task:
511 511 self._task_scheme, task_addr = content.task
512 512 self._task_socket = self._context.socket(zmq.DEALER)
513 513 self._task_socket.setsockopt(zmq.IDENTITY, ident)
514 514 connect_socket(self._task_socket, task_addr)
515 515 if content.notification:
516 516 self._notification_socket = self._context.socket(zmq.SUB)
517 517 connect_socket(self._notification_socket, content.notification)
518 518 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
519 519 # if content.query:
520 520 # self._query_socket = self._context.socket(zmq.DEALER)
521 521 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
522 522 # connect_socket(self._query_socket, content.query)
523 523 if content.control:
524 524 self._control_socket = self._context.socket(zmq.DEALER)
525 525 self._control_socket.setsockopt(zmq.IDENTITY, ident)
526 526 connect_socket(self._control_socket, content.control)
527 527 if content.iopub:
528 528 self._iopub_socket = self._context.socket(zmq.SUB)
529 529 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
530 530 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
531 531 connect_socket(self._iopub_socket, content.iopub)
532 532 self._update_engines(dict(content.engines))
533 533 else:
534 534 self._connected = False
535 535 raise Exception("Failed to connect!")
536 536
537 537 #--------------------------------------------------------------------------
538 538 # handlers and callbacks for incoming messages
539 539 #--------------------------------------------------------------------------
540 540
541 541 def _unwrap_exception(self, content):
542 542 """unwrap exception, and remap engine_id to int."""
543 543 e = error.unwrap_exception(content)
544 544 # print e.traceback
545 545 if e.engine_info:
546 546 e_uuid = e.engine_info['engine_uuid']
547 547 eid = self._engines[e_uuid]
548 548 e.engine_info['engine_id'] = eid
549 549 return e
550 550
551 551 def _extract_metadata(self, header, parent, content):
552 552 md = {'msg_id' : parent['msg_id'],
553 553 'received' : datetime.now(),
554 554 'engine_uuid' : header.get('engine', None),
555 555 'follow' : parent.get('follow', []),
556 556 'after' : parent.get('after', []),
557 557 'status' : content['status'],
558 558 }
559 559
560 560 if md['engine_uuid'] is not None:
561 561 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
562 562
563 563 if 'date' in parent:
564 564 md['submitted'] = parent['date']
565 565 if 'started' in header:
566 566 md['started'] = header['started']
567 567 if 'date' in header:
568 568 md['completed'] = header['date']
569 569 return md
570 570
571 571 def _register_engine(self, msg):
572 572 """Register a new engine, and update our connection info."""
573 573 content = msg['content']
574 574 eid = content['id']
575 575 d = {eid : content['queue']}
576 576 self._update_engines(d)
577 577
578 578 def _unregister_engine(self, msg):
579 579 """Unregister an engine that has died."""
580 580 content = msg['content']
581 581 eid = int(content['id'])
582 582 if eid in self._ids:
583 583 self._ids.remove(eid)
584 584 uuid = self._engines.pop(eid)
585 585
586 586 self._handle_stranded_msgs(eid, uuid)
587 587
588 588 if self._task_socket and self._task_scheme == 'pure':
589 589 self._stop_scheduling_tasks()
590 590
591 591 def _handle_stranded_msgs(self, eid, uuid):
592 592 """Handle messages known to be on an engine when the engine unregisters.
593 593
594 594 It is possible that this will fire prematurely - that is, an engine will
595 595 go down after completing a result, and the client will be notified
596 596 of the unregistration and later receive the successful result.
597 597 """
598 598
599 599 outstanding = self._outstanding_dict[uuid]
600 600
601 601 for msg_id in list(outstanding):
602 602 if msg_id in self.results:
603 603 # we already have this result; nothing to do
604 604 continue
605 605 try:
606 606 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
607 607 except:
608 608 content = error.wrap_exception()
609 609 # build a fake message:
610 610 parent = {}
611 611 header = {}
612 612 parent['msg_id'] = msg_id
613 613 header['engine'] = uuid
614 614 header['date'] = datetime.now()
615 615 msg = dict(parent_header=parent, header=header, content=content)
616 616 self._handle_apply_reply(msg)
617 617
618 618 def _handle_execute_reply(self, msg):
619 619 """Save the reply to an execute_request into our results.
620 620
621 621 execute messages are never actually used. apply is used instead.
622 622 """
623 623
624 624 parent = msg['parent_header']
625 625 msg_id = parent['msg_id']
626 626 if msg_id not in self.outstanding:
627 627 if msg_id in self.history:
628 628 print ("got stale result: %s"%msg_id)
629 629 else:
630 630 print ("got unknown result: %s"%msg_id)
631 631 else:
632 632 self.outstanding.remove(msg_id)
633 633
634 634 content = msg['content']
635 635 header = msg['header']
636 636
637 637 # construct metadata:
638 638 md = self.metadata[msg_id]
639 639 md.update(self._extract_metadata(header, parent, content))
640 640 # is this redundant?
641 641 self.metadata[msg_id] = md
642 642
643 643 e_outstanding = self._outstanding_dict[md['engine_uuid']]
644 644 if msg_id in e_outstanding:
645 645 e_outstanding.remove(msg_id)
646 646
647 647 # construct result:
648 648 if content['status'] == 'ok':
649 649 self.results[msg_id] = content
650 650 elif content['status'] == 'aborted':
651 651 self.results[msg_id] = error.TaskAborted(msg_id)
652 652 elif content['status'] == 'resubmitted':
653 653 # TODO: handle resubmission
654 654 pass
655 655 else:
656 656 self.results[msg_id] = self._unwrap_exception(content)
657 657
658 658 def _handle_apply_reply(self, msg):
659 659 """Save the reply to an apply_request into our results."""
660 660 parent = msg['parent_header']
661 661 msg_id = parent['msg_id']
662 662 if msg_id not in self.outstanding:
663 663 if msg_id in self.history:
664 664 print ("got stale result: %s"%msg_id)
665 665 print self.results[msg_id]
666 666 print msg
667 667 else:
668 668 print ("got unknown result: %s"%msg_id)
669 669 else:
670 670 self.outstanding.remove(msg_id)
671 671 content = msg['content']
672 672 header = msg['header']
673 673
674 674 # construct metadata:
675 675 md = self.metadata[msg_id]
676 676 md.update(self._extract_metadata(header, parent, content))
677 677 # is this redundant?
678 678 self.metadata[msg_id] = md
679 679
680 680 e_outstanding = self._outstanding_dict[md['engine_uuid']]
681 681 if msg_id in e_outstanding:
682 682 e_outstanding.remove(msg_id)
683 683
684 684 # construct result:
685 685 if content['status'] == 'ok':
686 686 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
687 687 elif content['status'] == 'aborted':
688 688 self.results[msg_id] = error.TaskAborted(msg_id)
689 689 elif content['status'] == 'resubmitted':
690 690 # TODO: handle resubmission
691 691 pass
692 692 else:
693 693 self.results[msg_id] = self._unwrap_exception(content)
694 694
695 695 def _flush_notifications(self):
696 696 """Flush notifications of engine registrations waiting
697 697 in ZMQ queue."""
698 698 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
699 699 while msg is not None:
700 700 if self.debug:
701 701 pprint(msg)
702 702 msg_type = msg['header']['msg_type']
703 703 handler = self._notification_handlers.get(msg_type, None)
704 704 if handler is None:
705 705 raise Exception("Unhandled message type: %s"%msg_type)
706 706 else:
707 707 handler(msg)
708 708 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
709 709
710 710 def _flush_results(self, sock):
711 711 """Flush task or queue results waiting in ZMQ queue."""
712 712 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
713 713 while msg is not None:
714 714 if self.debug:
715 715 pprint(msg)
716 716 msg_type = msg['header']['msg_type']
717 717 handler = self._queue_handlers.get(msg_type, None)
718 718 if handler is None:
719 719 raise Exception("Unhandled message type: %s"%msg_type)
720 720 else:
721 721 handler(msg)
722 722 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
723 723
724 724 def _flush_control(self, sock):
725 725 """Flush replies from the control channel waiting
726 726 in the ZMQ queue.
727 727
728 728 Currently: ignore them."""
729 729 if self._ignored_control_replies <= 0:
730 730 return
731 731 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
732 732 while msg is not None:
733 733 self._ignored_control_replies -= 1
734 734 if self.debug:
735 735 pprint(msg)
736 736 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
737 737
738 738 def _flush_ignored_control(self):
739 739 """flush ignored control replies"""
740 740 while self._ignored_control_replies > 0:
741 741 self.session.recv(self._control_socket)
742 742 self._ignored_control_replies -= 1
743 743
744 744 def _flush_ignored_hub_replies(self):
745 745 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
746 746 while msg is not None:
747 747 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
748 748
749 749 def _flush_iopub(self, sock):
750 750 """Flush replies from the iopub channel waiting
751 751 in the ZMQ queue.
752 752 """
753 753 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
754 754 while msg is not None:
755 755 if self.debug:
756 756 pprint(msg)
757 757 parent = msg['parent_header']
758 758 # ignore IOPub messages with no parent.
759 759 # Caused by print statements or warnings from before the first execution.
760 760 if not parent:
761 761 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK); continue  # advance first, or this loops forever
762 762 msg_id = parent['msg_id']
763 763 content = msg['content']
764 764 header = msg['header']
765 765 msg_type = msg['header']['msg_type']
766 766
767 767 # init metadata:
768 768 md = self.metadata[msg_id]
769 769
770 770 if msg_type == 'stream':
771 771 name = content['name']
772 772 s = md[name] or ''
773 773 md[name] = s + content['data']
774 774 elif msg_type == 'pyerr':
775 775 md.update({'pyerr' : self._unwrap_exception(content)})
776 776 elif msg_type == 'pyin':
777 777 md.update({'pyin' : content['code']})
778 778 elif msg_type == 'display_data':
779 779 md['outputs'].append(content.get('data'))
780 780 elif msg_type == 'pyout':
781 781 md['pyout'] = content.get('data')
782 782 else:
783 783 # unhandled msg_type (status, etc.)
784 784 pass
785 785
786 786 # redundant?
787 787 self.metadata[msg_id] = md
788 788
789 789 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
790 790
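# A usage sketch: after spin() drains the iopub socket, per-task metadata
# reflects output captured so far (msg_id is a placeholder from a prior task):
#
#     rc.spin()
#     md = rc.metadata[msg_id]
#     md['stdout']     # accumulated stream output, concatenated in order
#     md['pyout']      # display data of the final result, if any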
791 791 #--------------------------------------------------------------------------
792 792 # len, getitem
793 793 #--------------------------------------------------------------------------
794 794
795 795 def __len__(self):
796 796 """len(client) returns # of engines."""
797 797 return len(self.ids)
798 798
799 799 def __getitem__(self, key):
800 800 """index access returns DirectView multiplexer objects
801 801
802 802 Must be int, slice, or list/tuple/xrange of ints"""
803 803 if not isinstance(key, (int, slice, tuple, list, xrange)):
804 804 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
805 805 else:
806 806 return self.direct_view(key)
807 807
808 808 #--------------------------------------------------------------------------
809 809 # Begin public methods
810 810 #--------------------------------------------------------------------------
811 811
812 812 @property
813 813 def ids(self):
814 814 """Always up-to-date ids property."""
815 815 self._flush_notifications()
816 816 # always copy:
817 817 return list(self._ids)
818 818
819 819 def close(self):
820 820 if self._closed:
821 821 return
822 822 self.stop_spin_thread()
823 823 snames = filter(lambda n: n.endswith('socket'), dir(self))
824 824 for socket in map(lambda name: getattr(self, name), snames):
825 825 if isinstance(socket, zmq.Socket) and not socket.closed:
826 826 socket.close()
827 827 self._closed = True
828 828
829 829 def _spin_every(self, interval=1):
830 830 """target func for use in spin_thread"""
831 831 while True:
832 832 if self._stop_spinning.is_set():
833 833 return
834 834 time.sleep(interval)
835 835 self.spin()
836 836
837 837 def spin_thread(self, interval=1):
838 838 """call Client.spin() in a background thread on some regular interval
839 839
840 840 This helps ensure that messages don't pile up too much in the zmq queue
841 841 while you are working on other things, or just leaving an idle terminal.
842 842
843 843 It also helps limit potential padding of the `received` timestamp
844 844 on AsyncResult objects, used for timings.
845 845
846 846 Parameters
847 847 ----------
848 848
849 849 interval : float, optional
850 850 The interval on which to spin the client in the background thread
851 851 (simply passed to time.sleep).
852 852
853 853 Notes
854 854 -----
855 855
856 856 For precision timing, you may want to use this method to put a bound
857 857 on the jitter (in seconds) in `received` timestamps used
858 858 in AsyncResult.wall_time.
859 859
860 860 """
861 861 if self._spin_thread is not None:
862 862 self.stop_spin_thread()
863 863 self._stop_spinning.clear()
864 864 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
865 865 self._spin_thread.daemon = True
866 866 self._spin_thread.start()
867 867
868 868 def stop_spin_thread(self):
869 869 """stop background spin_thread, if any"""
870 870 if self._spin_thread is not None:
871 871 self._stop_spinning.set()
872 872 self._spin_thread.join()
873 873 self._spin_thread = None
874 874
875 875 def spin(self):
876 876 """Flush any registration notifications and execution results
877 877 waiting in the ZMQ queue.
878 878 """
879 879 if self._notification_socket:
880 880 self._flush_notifications()
881 881 if self._mux_socket:
882 882 self._flush_results(self._mux_socket)
883 883 if self._task_socket:
884 884 self._flush_results(self._task_socket)
885 885 if self._control_socket:
886 886 self._flush_control(self._control_socket)
887 887 if self._iopub_socket:
888 888 self._flush_iopub(self._iopub_socket)
889 889 if self._query_socket:
890 890 self._flush_ignored_hub_replies()
891 891
892 892 def wait(self, jobs=None, timeout=-1):
893 893 """waits on one or more `jobs`, for up to `timeout` seconds.
894 894
895 895 Parameters
896 896 ----------
897 897
898 898 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
899 899 ints are indices to self.history
900 900 strs are msg_ids
901 901 default: wait on all outstanding messages
902 902 timeout : float
903 903 a time in seconds, after which to give up.
904 904 default is -1, which means no timeout
905 905
906 906 Returns
907 907 -------
908 908
909 909 True : when all msg_ids are done
910 910 False : timeout reached, some msg_ids still outstanding
911 911 """
912 912 tic = time.time()
913 913 if jobs is None:
914 914 theids = self.outstanding
915 915 else:
916 916 if isinstance(jobs, (int, basestring, AsyncResult)):
917 917 jobs = [jobs]
918 918 theids = set()
919 919 for job in jobs:
920 920 if isinstance(job, int):
921 921 # index access
922 922 job = self.history[job]
923 923 elif isinstance(job, AsyncResult):
924 924 map(theids.add, job.msg_ids)
925 925 continue
926 926 theids.add(job)
927 927 if not theids.intersection(self.outstanding):
928 928 return True
929 929 self.spin()
930 930 while theids.intersection(self.outstanding):
931 931 if timeout >= 0 and ( time.time()-tic ) > timeout:
932 932 break
933 933 time.sleep(1e-3)
934 934 self.spin()
935 935 return len(theids.intersection(self.outstanding)) == 0
936 936
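# A usage sketch: jobs may be history indices, msg_ids, or AsyncResults,
# and the return value reports whether everything finished in time:
#
#     rc.wait()                             # block until nothing is outstanding
#     rc.wait(ar)                           # wait on a single AsyncResult
#     done = rc.wait([-1, -2], timeout=5)   # last two submissions, up to 5s
#     # done is False if any of them are still outstanding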
937 937 #--------------------------------------------------------------------------
938 938 # Control methods
939 939 #--------------------------------------------------------------------------
940 940
941 941 @spin_first
942 942 def clear(self, targets=None, block=None):
943 943 """Clear the namespace in target(s)."""
944 944 block = self.block if block is None else block
945 945 targets = self._build_targets(targets)[0]
946 946 for t in targets:
947 947 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
948 948 error = False
949 949 if block:
950 950 self._flush_ignored_control()
951 951 for i in range(len(targets)):
952 952 idents,msg = self.session.recv(self._control_socket,0)
953 953 if self.debug:
954 954 pprint(msg)
955 955 if msg['content']['status'] != 'ok':
956 956 error = self._unwrap_exception(msg['content'])
957 957 else:
958 958 self._ignored_control_replies += len(targets)
959 959 if error:
960 960 raise error
961 961
962 962
963 963 @spin_first
964 964 def abort(self, jobs=None, targets=None, block=None):
965 965 """Abort specific jobs from the execution queues of target(s).
966 966
967 967 This is a mechanism to prevent jobs that have already been submitted
968 968 from executing.
969 969
970 970 Parameters
971 971 ----------
972 972
973 973 jobs : msg_id, list of msg_ids, or AsyncResult
974 974 The jobs to be aborted
975 975
976 976 If unspecified/None: abort all outstanding jobs.
977 977
978 978 """
979 979 block = self.block if block is None else block
980 980 jobs = jobs if jobs is not None else list(self.outstanding)
981 981 targets = self._build_targets(targets)[0]
982 982
983 983 msg_ids = []
984 984 if isinstance(jobs, (basestring,AsyncResult)):
985 985 jobs = [jobs]
986 986 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
987 987 if bad_ids:
988 988 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
989 989 for j in jobs:
990 990 if isinstance(j, AsyncResult):
991 991 msg_ids.extend(j.msg_ids)
992 992 else:
993 993 msg_ids.append(j)
994 994 content = dict(msg_ids=msg_ids)
995 995 for t in targets:
996 996 self.session.send(self._control_socket, 'abort_request',
997 997 content=content, ident=t)
998 998 error = False
999 999 if block:
1000 1000 self._flush_ignored_control()
1001 1001 for i in range(len(targets)):
1002 1002 idents,msg = self.session.recv(self._control_socket,0)
1003 1003 if self.debug:
1004 1004 pprint(msg)
1005 1005 if msg['content']['status'] != 'ok':
1006 1006 error = self._unwrap_exception(msg['content'])
1007 1007 else:
1008 1008 self._ignored_control_replies += len(targets)
1009 1009 if error:
1010 1010 raise error
1011 1011
1012 1012 @spin_first
1013 1013 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1014 1014 """Terminates one or more engine processes, optionally including the hub."""
1015 1015 block = self.block if block is None else block
1016 1016 if hub:
1017 1017 targets = 'all'
1018 1018 targets = self._build_targets(targets)[0]
1019 1019 for t in targets:
1020 1020 self.session.send(self._control_socket, 'shutdown_request',
1021 1021 content={'restart':restart},ident=t)
1022 1022 error = False
1023 1023 if block or hub:
1024 1024 self._flush_ignored_control()
1025 1025 for i in range(len(targets)):
1026 1026 idents,msg = self.session.recv(self._control_socket, 0)
1027 1027 if self.debug:
1028 1028 pprint(msg)
1029 1029 if msg['content']['status'] != 'ok':
1030 1030 error = self._unwrap_exception(msg['content'])
1031 1031 else:
1032 1032 self._ignored_control_replies += len(targets)
1033 1033
1034 1034 if hub:
1035 1035 time.sleep(0.25)
1036 1036 self.session.send(self._query_socket, 'shutdown_request')
1037 1037 idents,msg = self.session.recv(self._query_socket, 0)
1038 1038 if self.debug:
1039 1039 pprint(msg)
1040 1040 if msg['content']['status'] != 'ok':
1041 1041 error = self._unwrap_exception(msg['content'])
1042 1042
1043 1043 if error:
1044 1044 raise error
1045 1045
1046 1046 #--------------------------------------------------------------------------
1047 1047 # Execution related methods
1048 1048 #--------------------------------------------------------------------------
1049 1049
1050 1050 def _maybe_raise(self, result):
1051 1051 """wrapper for maybe raising an exception if apply failed."""
1052 1052 if isinstance(result, error.RemoteError):
1053 1053 raise result
1054 1054
1055 1055 return result
1056 1056
1057 1057 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1058 1058 ident=None):
1059 1059 """construct and send an apply message via a socket.
1060 1060
1061 1061 This is the principal method with which all engine execution is performed by views.
1062 1062 """
1063 1063
1064 1064 assert not self._closed, "cannot use me anymore, I'm closed!"
1065 1065 # defaults:
1066 1066 args = args if args is not None else []
1067 1067 kwargs = kwargs if kwargs is not None else {}
1068 1068 subheader = subheader if subheader is not None else {}
1069 1069
1070 1070 # validate arguments
1071 1071 if not callable(f) and not isinstance(f, Reference):
1072 1072 raise TypeError("f must be callable, not %s"%type(f))
1073 1073 if not isinstance(args, (tuple, list)):
1074 1074 raise TypeError("args must be tuple or list, not %s"%type(args))
1075 1075 if not isinstance(kwargs, dict):
1076 1076 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1077 1077 if not isinstance(subheader, dict):
1078 1078 raise TypeError("subheader must be dict, not %s"%type(subheader))
1079 1079
1080 1080 bufs = util.pack_apply_message(f,args,kwargs)
1081 1081
1082 1082 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1083 1083 subheader=subheader, track=track)
1084 1084
1085 1085 msg_id = msg['header']['msg_id']
1086 1086 self.outstanding.add(msg_id)
1087 1087 if ident:
1088 1088 # possibly routed to a specific engine
1089 1089 if isinstance(ident, list):
1090 1090 ident = ident[-1]
1091 1091 if ident in self._engines.values():
1092 1092 # save for later, in case of engine death
1093 1093 self._outstanding_dict[ident].add(msg_id)
1094 1094 self.history.append(msg_id)
1095 1095 self.metadata[msg_id]['submitted'] = datetime.now()
1096 1096
1097 1097 return msg
1098 1098
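# A rough sketch of how views drive this method; _mux_socket and the engine
# identity are client internals, so treat this as illustrative only:
#
#     ident = rc._build_targets(0)[0][0]        # bytes identity of engine 0
#     msg = rc.send_apply_request(rc._mux_socket, f, args=(1, 2), ident=ident)
#     ar = AsyncResult(rc, msg_ids=[msg['header']['msg_id']])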
1099 1099 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1100 1100 """construct and send an execute request via a socket.
1101 1101
1102 1102 """
1103 1103
1104 1104 assert not self._closed, "cannot use me anymore, I'm closed!"
1105 1105 # defaults:
1106 1106 subheader = subheader if subheader is not None else {}
1107 1107
1108 1108 # validate arguments
1109 1109 if not isinstance(code, basestring):
1110 1110 raise TypeError("code must be text, not %s" % type(code))
1111 1111 if not isinstance(subheader, dict):
1112 1112 raise TypeError("subheader must be dict, not %s" % type(subheader))
1113 1113
1114 1114 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1115 1115
1116 1116
1117 1117 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1118 1118 subheader=subheader)
1119 1119
1120 1120 msg_id = msg['header']['msg_id']
1121 1121 self.outstanding.add(msg_id)
1122 1122 if ident:
1123 1123 # possibly routed to a specific engine
1124 1124 if isinstance(ident, list):
1125 1125 ident = ident[-1]
1126 1126 if ident in self._engines.values():
1127 1127 # save for later, in case of engine death
1128 1128 self._outstanding_dict[ident].add(msg_id)
1129 1129 self.history.append(msg_id)
1130 1130 self.metadata[msg_id]['submitted'] = datetime.now()
1131 1131
1132 1132 return msg
1133 1133
1134 1134 #--------------------------------------------------------------------------
1135 1135 # construct a View object
1136 1136 #--------------------------------------------------------------------------
1137 1137
1138 1138 def load_balanced_view(self, targets=None):
1139 1139 """construct a DirectView object.
1140 1140
1141 1141 If no arguments are specified, create a LoadBalancedView
1142 1142 using all engines.
1143 1143
1144 1144 Parameters
1145 1145 ----------
1146 1146
1147 1147 targets: list,slice,int,etc. [default: use all engines]
1148 1148 The subset of engines across which to load-balance
1149 1149 """
1150 1150 if targets == 'all':
1151 1151 targets = None
1152 1152 if targets is not None:
1153 1153 targets = self._build_targets(targets)[1]
1154 1154 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1155 1155
1156 1156 def direct_view(self, targets='all'):
1157 1157 """construct a DirectView object.
1158 1158
1159 1159 If no targets are specified, create a DirectView using all engines.
1160 1160
1161 1161 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1162 1162 evaluate the target engines at each execution, whereas rc[:] will connect to
1163 1163 all *current* engines, and that list will not change.
1164 1164
1165 1165 That is, 'all' will always use all engines, whereas rc[:] will not use
1166 1166 engines added after the DirectView is constructed.
1167 1167
1168 1168 Parameters
1169 1169 ----------
1170 1170
1171 1171 targets: list,slice,int,etc. [default: use all engines]
1172 1172 The engines to use for the View
1173 1173 """
1174 1174 single = isinstance(targets, int)
1175 1175 # allow 'all' to be lazily evaluated at each execution
1176 1176 if targets != 'all':
1177 1177 targets = self._build_targets(targets)[1]
1178 1178 if single:
1179 1179 targets = targets[0]
1180 1180 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1181 1181
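# A sketch of the distinction documented above:
#
#     dv = rc.direct_view('all')   # targets re-evaluated at each execution
#     dv = rc[:]                   # pinned to the engines registered *now*
#     e0 = rc[0]                   # single-engine DirectView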
1182 1182 #--------------------------------------------------------------------------
1183 1183 # Query methods
1184 1184 #--------------------------------------------------------------------------
1185 1185
1186 1186 @spin_first
1187 1187 def get_result(self, indices_or_msg_ids=None, block=None):
1188 1188 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1189 1189
1190 1190 If the client already has the results, no request to the Hub will be made.
1191 1191
1192 1192 This is a convenient way to construct AsyncResult objects, which are wrappers
1193 1193 that include metadata about execution, and allow for awaiting results that
1194 1194 were not submitted by this Client.
1195 1195
1196 1196 It can also be a convenient way to retrieve the metadata associated with
1197 1197 blocking execution, since it always retrieves the metadata as well.
1198 1198
1199 1199 Examples
1200 1200 --------
1201 1201 ::
1202 1202
1203 1203 In [10]: ar = client.get_result(msg_id)  # msg_id from any prior task
1204 1204
1205 1205 Parameters
1206 1206 ----------
1207 1207
1208 1208 indices_or_msg_ids : integer history index, str msg_id, or list of either
1209 1209 The history indices or msg_ids of results to be retrieved
1210 1210
1211 1211 block : bool
1212 1212 Whether to wait for the result to be done
1213 1213
1214 1214 Returns
1215 1215 -------
1216 1216
1217 1217 AsyncResult
1218 1218 A single AsyncResult object will always be returned.
1219 1219
1220 1220 AsyncHubResult
1221 1221 A subclass of AsyncResult that retrieves results from the Hub
1222 1222
1223 1223 """
1224 1224 block = self.block if block is None else block
1225 1225 if indices_or_msg_ids is None:
1226 1226 indices_or_msg_ids = -1
1227 1227
1228 1228 if not isinstance(indices_or_msg_ids, (list,tuple)):
1229 1229 indices_or_msg_ids = [indices_or_msg_ids]
1230 1230
1231 1231 theids = []
1232 1232 for id in indices_or_msg_ids:
1233 1233 if isinstance(id, int):
1234 1234 id = self.history[id]
1235 1235 if not isinstance(id, basestring):
1236 1236 raise TypeError("indices must be str or int, not %r"%id)
1237 1237 theids.append(id)
1238 1238
1239 1239 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1240 1240 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1241 1241
1242 1242 if remote_ids:
1243 1243 ar = AsyncHubResult(self, msg_ids=theids)
1244 1244 else:
1245 1245 ar = AsyncResult(self, msg_ids=theids)
1246 1246
1247 1247 if block:
1248 1248 ar.wait()
1249 1249
1250 1250 return ar
1251 1251
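# A usage sketch; msg_id is a placeholder and may come from another client,
# in which case the result is fetched from the Hub via AsyncHubResult:
#
#     ar = rc.get_result(-1)                    # most recent submission
#     ar = rc.get_result(msg_id)                # any known msg_id
#     ar = rc.get_result([-1, -2], block=True)  # wait for the last two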
1252 1252 @spin_first
1253 1253 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1254 1254 """Resubmit one or more tasks.
1255 1255
1256 1256 In-flight tasks may not be resubmitted.
1257 1257
1258 1258 Parameters
1259 1259 ----------
1260 1260
1261 1261 indices_or_msg_ids : integer history index, str msg_id, or list of either
1262 1262 The history indices or msg_ids of tasks to be resubmitted
1263 1263
1264 1264 block : bool
1265 1265 Whether to wait for the result to be done
1266 1266
1267 1267 Returns
1268 1268 -------
1269 1269
1270 1270 AsyncHubResult
1271 1271 A subclass of AsyncResult that retrieves results from the Hub
1272 1272
1273 1273 """
1274 1274 block = self.block if block is None else block
1275 1275 if indices_or_msg_ids is None:
1276 1276 indices_or_msg_ids = -1
1277 1277
1278 1278 if not isinstance(indices_or_msg_ids, (list,tuple)):
1279 1279 indices_or_msg_ids = [indices_or_msg_ids]
1280 1280
1281 1281 theids = []
1282 1282 for id in indices_or_msg_ids:
1283 1283 if isinstance(id, int):
1284 1284 id = self.history[id]
1285 1285 if not isinstance(id, basestring):
1286 1286 raise TypeError("indices must be str or int, not %r"%id)
1287 1287 theids.append(id)
1288 1288
1289 for msg_id in theids:
1290 self.outstanding.discard(msg_id)
1291 if msg_id in self.history:
1292 self.history.remove(msg_id)
1293 self.results.pop(msg_id, None)
1294 self.metadata.pop(msg_id, None)
1295 1289 content = dict(msg_ids = theids)
1296 1290
1297 1291 self.session.send(self._query_socket, 'resubmit_request', content)
1298 1292
1299 1293 zmq.select([self._query_socket], [], [])
1300 1294 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1301 1295 if self.debug:
1302 1296 pprint(msg)
1303 1297 content = msg['content']
1304 1298 if content['status'] != 'ok':
1305 1299 raise self._unwrap_exception(content)
1300 mapping = content['resubmitted']
1301 new_ids = [ mapping[msg_id] for msg_id in theids ]
1306 1302
1307 ar = AsyncHubResult(self, msg_ids=theids)
1303 ar = AsyncHubResult(self, msg_ids=new_ids)
1308 1304
1309 1305 if block:
1310 1306 ar.wait()
1311 1307
1312 1308 return ar
1313 1309
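# A sketch of the behavior this commit introduces: resubmission now creates
# wholly separate tasks, so the AsyncHubResult tracks *new* msg_ids and the
# original records are left intact (old_msg_id is a placeholder):
#
#     ar = rc.resubmit(old_msg_id)
#     ar.msg_ids != [old_msg_id]     # True: brand-new msg_ids from the Hub
#     rc.get_result(old_msg_id)      # the original result is still available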
1314 1310 @spin_first
1315 1311 def result_status(self, msg_ids, status_only=True):
1316 1312 """Check on the status of the result(s) of the apply request with `msg_ids`.
1317 1313
1318 1314 If status_only is False, then the actual results will be retrieved, else
1319 1315 only the status of the results will be checked.
1320 1316
1321 1317 Parameters
1322 1318 ----------
1323 1319
1324 1320 msg_ids : list of msg_ids
1325 1321 if int:
1326 1322 Passed as index to self.history for convenience.
1327 1323 status_only : bool (default: True)
1328 1324 if False:
1329 1325 Retrieve the actual results of completed tasks.
1330 1326
1331 1327 Returns
1332 1328 -------
1333 1329
1334 1330 results : dict
1335 1331 There will always be the keys 'pending' and 'completed', which will
1336 1332 be lists of msg_ids that are incomplete or complete. If `status_only`
1337 1333 is False, then completed results will be keyed by their `msg_id`.
1338 1334 """
1339 1335 if not isinstance(msg_ids, (list,tuple)):
1340 1336 msg_ids = [msg_ids]
1341 1337
1342 1338 theids = []
1343 1339 for msg_id in msg_ids:
1344 1340 if isinstance(msg_id, int):
1345 1341 msg_id = self.history[msg_id]
1346 1342 if not isinstance(msg_id, basestring):
1347 1343 raise TypeError("msg_ids must be str, not %r"%msg_id)
1348 1344 theids.append(msg_id)
1349 1345
1350 1346 completed = []
1351 1347 local_results = {}
1352 1348
1353 1349 # comment this block out to temporarily disable local shortcut:
1354 1350 for msg_id in theids:
1355 1351 if msg_id in self.results:
1356 1352 completed.append(msg_id)
1357 1353 local_results[msg_id] = self.results[msg_id]
1358 1354 theids.remove(msg_id)
1359 1355
1360 1356 if theids: # some not locally cached
1361 1357 content = dict(msg_ids=theids, status_only=status_only)
1362 1358 msg = self.session.send(self._query_socket, "result_request", content=content)
1363 1359 zmq.select([self._query_socket], [], [])
1364 1360 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1365 1361 if self.debug:
1366 1362 pprint(msg)
1367 1363 content = msg['content']
1368 1364 if content['status'] != 'ok':
1369 1365 raise self._unwrap_exception(content)
1370 1366 buffers = msg['buffers']
1371 1367 else:
1372 1368 content = dict(completed=[],pending=[])
1373 1369
1374 1370 content['completed'].extend(completed)
1375 1371
1376 1372 if status_only:
1377 1373 return content
1378 1374
1379 1375 failures = []
1380 1376 # load cached results into result:
1381 1377 content.update(local_results)
1382 1378
1383 1379 # update cache with results:
1384 1380 for msg_id in sorted(theids):
1385 1381 if msg_id in content['completed']:
1386 1382 rec = content[msg_id]
1387 1383 parent = rec['header']
1388 1384 header = rec['result_header']
1389 1385 rcontent = rec['result_content']
1390 1386 iodict = rec['io']
1391 1387 if isinstance(rcontent, str):
1392 1388 rcontent = self.session.unpack(rcontent)
1393 1389
1394 1390 md = self.metadata[msg_id]
1395 1391 md.update(self._extract_metadata(header, parent, rcontent))
1396 1392 if rec.get('received'):
1397 1393 md['received'] = rec['received']
1398 1394 md.update(iodict)
1399 1395
1400 1396 if rcontent['status'] == 'ok':
1401 1397 res,buffers = util.unserialize_object(buffers)
1402 1398 else:
1403 1399 print rcontent
1404 1400 res = self._unwrap_exception(rcontent)
1405 1401 failures.append(res)
1406 1402
1407 1403 self.results[msg_id] = res
1408 1404 content[msg_id] = res
1409 1405
1410 1406 if len(theids) == 1 and failures:
1411 1407 raise failures[0]
1412 1408
1413 1409 error.collect_exceptions(failures, "result_status")
1414 1410 return content
1415 1411
1416 1412 @spin_first
1417 1413 def queue_status(self, targets='all', verbose=False):
1418 1414 """Fetch the status of engine queues.
1419 1415
1420 1416 Parameters
1421 1417 ----------
1422 1418
1423 1419 targets : int/str/list of ints/strs
1424 1420 the engines whose states are to be queried.
1425 1421 default : all
1426 1422 verbose : bool
1427 1423 Whether to return lengths only, or lists of ids for each element
1428 1424 """
1429 1425 if targets == 'all':
1430 1426 # allow 'all' to be evaluated on the engine
1431 1427 engine_ids = None
1432 1428 else:
1433 1429 engine_ids = self._build_targets(targets)[1]
1434 1430 content = dict(targets=engine_ids, verbose=verbose)
1435 1431 self.session.send(self._query_socket, "queue_request", content=content)
1436 1432 idents,msg = self.session.recv(self._query_socket, 0)
1437 1433 if self.debug:
1438 1434 pprint(msg)
1439 1435 content = msg['content']
1440 1436 status = content.pop('status')
1441 1437 if status != 'ok':
1442 1438 raise self._unwrap_exception(content)
1443 1439 content = rekey(content)
1444 1440 if isinstance(targets, int):
1445 1441 return content[targets]
1446 1442 else:
1447 1443 return content
1448 1444
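# A usage sketch; the exact keys of each engine's status dict come from the
# Hub and are not shown in this file:
#
#     rc.queue_status()              # dict keyed by engine id
#     rc.queue_status(targets=0)     # just engine 0's entry
#     rc.queue_status(verbose=True)  # lists of msg_ids instead of counts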
1449 1445 @spin_first
1450 1446 def purge_results(self, jobs=[], targets=[]):
1451 1447 """Tell the Hub to forget results.
1452 1448
1453 1449 Individual results can be purged by msg_id, or the entire
1454 1450 history of specific targets can be purged.
1455 1451
1456 1452 Use `purge_results('all')` to scrub everything from the Hub's db.
1457 1453
1458 1454 Parameters
1459 1455 ----------
1460 1456
1461 1457 jobs : str or list of str or AsyncResult objects
1462 1458 the msg_ids whose results should be forgotten.
1463 1459 targets : int/str/list of ints/strs
1464 1460 The targets, by int_id, whose entire history is to be purged.
1465 1461
1466 1462 default : None
1467 1463 """
1468 1464 if not targets and not jobs:
1469 1465 raise ValueError("Must specify at least one of `targets` and `jobs`")
1470 1466 if targets:
1471 1467 targets = self._build_targets(targets)[1]
1472 1468
1473 1469 # construct msg_ids from jobs
1474 1470 if jobs == 'all':
1475 1471 msg_ids = jobs
1476 1472 else:
1477 1473 msg_ids = []
1478 1474 if isinstance(jobs, (basestring,AsyncResult)):
1479 1475 jobs = [jobs]
1480 1476 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1481 1477 if bad_ids:
1482 1478 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1483 1479 for j in jobs:
1484 1480 if isinstance(j, AsyncResult):
1485 1481 msg_ids.extend(j.msg_ids)
1486 1482 else:
1487 1483 msg_ids.append(j)
1488 1484
1489 1485 content = dict(engine_ids=targets, msg_ids=msg_ids)
1490 1486 self.session.send(self._query_socket, "purge_request", content=content)
1491 1487 idents, msg = self.session.recv(self._query_socket, 0)
1492 1488 if self.debug:
1493 1489 pprint(msg)
1494 1490 content = msg['content']
1495 1491 if content['status'] != 'ok':
1496 1492 raise self._unwrap_exception(content)
1497 1493
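# A usage sketch (ar is a placeholder AsyncResult):
#
#     rc.purge_results(jobs=ar)         # forget one result set by msg_id
#     rc.purge_results(targets=[0, 1])  # entire history of engines 0 and 1
#     rc.purge_results('all')           # scrub everything from the Hub's db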
1498 1494 @spin_first
1499 1495 def hub_history(self):
1500 1496 """Get the Hub's history
1501 1497
1502 1498 Just like the Client, the Hub has a history, which is a list of msg_ids.
1503 1499 This will contain the history of all clients, and, depending on configuration,
1504 1500 may contain history across multiple cluster sessions.
1505 1501
1506 1502 Any msg_id returned here is a valid argument to `get_result`.
1507 1503
1508 1504 Returns
1509 1505 -------
1510 1506
1511 1507 msg_ids : list of strs
1512 1508 list of all msg_ids, ordered by task submission time.
1513 1509 """
1514 1510
1515 1511 self.session.send(self._query_socket, "history_request", content={})
1516 1512 idents, msg = self.session.recv(self._query_socket, 0)
1517 1513
1518 1514 if self.debug:
1519 1515 pprint(msg)
1520 1516 content = msg['content']
1521 1517 if content['status'] != 'ok':
1522 1518 raise self._unwrap_exception(content)
1523 1519 else:
1524 1520 return content['history']
1525 1521
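As the docstring notes, any msg_id from the Hub's history is a valid argument to `get_result`; a small sketch (assumes `rc` is a connected `Client`):

# sketch: walking the Hub's history (assumes rc = Client())
hist = rc.hub_history()            # every msg_id the Hub has recorded
if hist:
    ar = rc.get_result(hist[-1])   # hub msg_ids are valid here
    print(ar.get())
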
1526 1522 @spin_first
1527 1523 def db_query(self, query, keys=None):
1528 1524 """Query the Hub's TaskRecord database
1529 1525
1530 1526 This will return a list of task record dicts that match `query`
1531 1527
1532 1528 Parameters
1533 1529 ----------
1534 1530
1535 1531 query : mongodb query dict
1536 1532 The search dict. See mongodb query docs for details.
1537 1533 keys : list of strs [optional]
1538 1534 The subset of keys to be returned. The default is to fetch everything but buffers.
1539 1535 'msg_id' will *always* be included.
1540 1536 """
1541 1537 if isinstance(keys, basestring):
1542 1538 keys = [keys]
1543 1539 content = dict(query=query, keys=keys)
1544 1540 self.session.send(self._query_socket, "db_request", content=content)
1545 1541 idents, msg = self.session.recv(self._query_socket, 0)
1546 1542 if self.debug:
1547 1543 pprint(msg)
1548 1544 content = msg['content']
1549 1545 if content['status'] != 'ok':
1550 1546 raise self._unwrap_exception(content)
1551 1547
1552 1548 records = content['records']
1553 1549
1554 1550 buffer_lens = content['buffer_lens']
1555 1551 result_buffer_lens = content['result_buffer_lens']
1556 1552 buffers = msg['buffers']
1557 1553 has_bufs = buffer_lens is not None
1558 1554 has_rbufs = result_buffer_lens is not None
1559 1555 for i,rec in enumerate(records):
1560 1556 # relink buffers
1561 1557 if has_bufs:
1562 1558 blen = buffer_lens[i]
1563 1559 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1564 1560 if has_rbufs:
1565 1561 blen = result_buffer_lens[i]
1566 1562 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1567 1563
1568 1564 return records
1569 1565
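A sketch of a typical query against this interface (assumes `rc` is a connected `Client`; `$gt` is one of the operators supported by the db backends):

# sketch: mongodb-style query of the Hub's TaskRecord db
from datetime import datetime, timedelta

since = datetime.now() - timedelta(hours=1)
recs = rc.db_query({'submitted': {'$gt': since}},
                   keys=['msg_id', 'completed', 'engine_uuid'])
for rec in recs:
    print(rec['msg_id'], rec['completed'])
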
1570 1566 __all__ = [ 'Client' ]
@@ -1,1304 +1,1310 b''
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import sys
22 22 import time
23 23 from datetime import datetime
24 24
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27 from zmq.eventloop.zmqstream import ZMQStream
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 31 from IPython.utils.py3compat import cast_bytes
32 32 from IPython.utils.traitlets import (
33 33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 34 )
35 35
36 36 from IPython.parallel import error, util
37 37 from IPython.parallel.factory import RegistrationFactory
38 38
39 39 from IPython.zmq.session import SessionFactory
40 40
41 41 from .heartmonitor import HeartMonitor
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Code
45 45 #-----------------------------------------------------------------------------
46 46
47 47 def _passer(*args, **kwargs):
48 48 return
49 49
50 50 def _printer(*args, **kwargs):
51 51 print (args)
52 52 print (kwargs)
53 53
54 54 def empty_record():
55 55 """Return an empty dict with all record keys."""
56 56 return {
57 57 'msg_id' : None,
58 58 'header' : None,
59 59 'content': None,
60 60 'buffers': None,
61 61 'submitted': None,
62 62 'client_uuid' : None,
63 63 'engine_uuid' : None,
64 64 'started': None,
65 65 'completed': None,
66 66 'resubmitted': None,
67 67 'received': None,
68 68 'result_header' : None,
69 69 'result_content' : None,
70 70 'result_buffers' : None,
71 71 'queue' : None,
72 72 'pyin' : None,
73 73 'pyout': None,
74 74 'pyerr': None,
75 75 'stdout': '',
76 76 'stderr': '',
77 77 }
78 78
79 79 def init_record(msg):
80 80 """Initialize a TaskRecord based on a request."""
81 81 header = msg['header']
82 82 return {
83 83 'msg_id' : header['msg_id'],
84 84 'header' : header,
85 85 'content': msg['content'],
86 86 'buffers': msg['buffers'],
87 87 'submitted': header['date'],
88 88 'client_uuid' : None,
89 89 'engine_uuid' : None,
90 90 'started': None,
91 91 'completed': None,
92 92 'resubmitted': None,
93 93 'received': None,
94 94 'result_header' : None,
95 95 'result_content' : None,
96 96 'result_buffers' : None,
97 97 'queue' : None,
98 98 'pyin' : None,
99 99 'pyout': None,
100 100 'pyerr': None,
101 101 'stdout': '',
102 102 'stderr': '',
103 103 }
104 104
105 105
106 106 class EngineConnector(HasTraits):
107 107 """A simple object for accessing the various zmq connections of an object.
108 108 Attributes are:
109 109 id (int): engine ID
110 110 uuid (str): uuid (unused?)
111 111 queue (str): identity of queue's XREQ socket
112 112 registration (str): identity of registration XREQ socket
113 113 heartbeat (str): identity of heartbeat XREQ socket
114 114 """
115 115 id=Integer(0)
116 116 queue=CBytes()
117 117 control=CBytes()
118 118 registration=CBytes()
119 119 heartbeat=CBytes()
120 120 pending=Set()
121 121
122 122 class HubFactory(RegistrationFactory):
123 123 """The Configurable for setting up a Hub."""
124 124
125 125 # port-pairs for monitoredqueues:
126 126 hb = Tuple(Integer,Integer,config=True,
127 127 help="""XREQ/SUB Port pair for Engine heartbeats""")
128 128 def _hb_default(self):
129 129 return tuple(util.select_random_ports(2))
130 130
131 131 mux = Tuple(Integer,Integer,config=True,
132 132 help="""Engine/Client Port pair for MUX queue""")
133 133
134 134 def _mux_default(self):
135 135 return tuple(util.select_random_ports(2))
136 136
137 137 task = Tuple(Integer,Integer,config=True,
138 138 help="""Engine/Client Port pair for Task queue""")
139 139 def _task_default(self):
140 140 return tuple(util.select_random_ports(2))
141 141
142 142 control = Tuple(Integer,Integer,config=True,
143 143 help="""Engine/Client Port pair for Control queue""")
144 144
145 145 def _control_default(self):
146 146 return tuple(util.select_random_ports(2))
147 147
148 148 iopub = Tuple(Integer,Integer,config=True,
149 149 help="""Engine/Client Port pair for IOPub relay""")
150 150
151 151 def _iopub_default(self):
152 152 return tuple(util.select_random_ports(2))
153 153
154 154 # single ports:
155 155 mon_port = Integer(config=True,
156 156 help="""Monitor (SUB) port for queue traffic""")
157 157
158 158 def _mon_port_default(self):
159 159 return util.select_random_ports(1)[0]
160 160
161 161 notifier_port = Integer(config=True,
162 162 help="""PUB port for sending engine status notifications""")
163 163
164 164 def _notifier_port_default(self):
165 165 return util.select_random_ports(1)[0]
166 166
167 167 engine_ip = Unicode('127.0.0.1', config=True,
168 168 help="IP on which to listen for engine connections. [default: loopback]")
169 169 engine_transport = Unicode('tcp', config=True,
170 170 help="0MQ transport for engine connections. [default: tcp]")
171 171
172 172 client_ip = Unicode('127.0.0.1', config=True,
173 173 help="IP on which to listen for client connections. [default: loopback]")
174 174 client_transport = Unicode('tcp', config=True,
175 175 help="0MQ transport for client connections. [default : tcp]")
176 176
177 177 monitor_ip = Unicode('127.0.0.1', config=True,
178 178 help="IP on which to listen for monitor messages. [default: loopback]")
179 179 monitor_transport = Unicode('tcp', config=True,
180 180 help="0MQ transport for monitor messages. [default : tcp]")
181 181
182 182 monitor_url = Unicode('')
183 183
184 184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
185 185 config=True, help="""The class to use for the DB backend""")
186 186
187 187 # not configurable
188 188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
189 189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
190 190
191 191 def _ip_changed(self, name, old, new):
192 192 self.engine_ip = new
193 193 self.client_ip = new
194 194 self.monitor_ip = new
195 195 self._update_monitor_url()
196 196
197 197 def _update_monitor_url(self):
198 198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
199 199
200 200 def _transport_changed(self, name, old, new):
201 201 self.engine_transport = new
202 202 self.client_transport = new
203 203 self.monitor_transport = new
204 204 self._update_monitor_url()
205 205
206 206 def __init__(self, **kwargs):
207 207 super(HubFactory, self).__init__(**kwargs)
208 208 self._update_monitor_url()
209 209
210 210
211 211 def construct(self):
212 212 self.init_hub()
213 213
214 214 def start(self):
215 215 self.heartmonitor.start()
216 216 self.log.info("Heartmonitor started")
217 217
218 218 def init_hub(self):
219 219 """construct"""
220 220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
221 221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
222 222
223 223 ctx = self.context
224 224 loop = self.loop
225 225
226 226 # Registrar socket
227 227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
228 228 q.bind(client_iface % self.regport)
229 229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
230 230 if self.client_ip != self.engine_ip:
231 231 q.bind(engine_iface % self.regport)
232 232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
233 233
234 234 ### Engine connections ###
235 235
236 236 # heartbeat
237 237 hpub = ctx.socket(zmq.PUB)
238 238 hpub.bind(engine_iface % self.hb[0])
239 239 hrep = ctx.socket(zmq.ROUTER)
240 240 hrep.bind(engine_iface % self.hb[1])
241 241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
242 242 pingstream=ZMQStream(hpub,loop),
243 243 pongstream=ZMQStream(hrep,loop)
244 244 )
245 245
246 246 ### Client connections ###
247 247 # Notifier socket
248 248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
249 249 n.bind(client_iface%self.notifier_port)
250 250
251 251 ### build and launch the queues ###
252 252
253 253 # monitor socket
254 254 sub = ctx.socket(zmq.SUB)
255 255 sub.setsockopt(zmq.SUBSCRIBE, b"")
256 256 sub.bind(self.monitor_url)
257 257 sub.bind('inproc://monitor')
258 258 sub = ZMQStream(sub, loop)
259 259
260 260 # connect the db
261 261 self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
262 262 # cdir = self.config.Global.cluster_dir
263 263 self.db = import_item(str(self.db_class))(session=self.session.session,
264 264 config=self.config, log=self.log)
265 265 time.sleep(.25)
266 266 try:
267 267 scheme = self.config.TaskScheduler.scheme_name
268 268 except AttributeError:
269 269 from .scheduler import TaskScheduler
270 270 scheme = TaskScheduler.scheme_name.get_default_value()
271 271 # build connection dicts
272 272 self.engine_info = {
273 273 'control' : engine_iface%self.control[1],
274 274 'mux': engine_iface%self.mux[1],
275 275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
276 276 'task' : engine_iface%self.task[1],
277 277 'iopub' : engine_iface%self.iopub[1],
278 278 # 'monitor' : engine_iface%self.mon_port,
279 279 }
280 280
281 281 self.client_info = {
282 282 'control' : client_iface%self.control[0],
283 283 'mux': client_iface%self.mux[0],
284 284 'task' : (scheme, client_iface%self.task[0]),
285 285 'iopub' : client_iface%self.iopub[0],
286 286 'notification': client_iface%self.notifier_port
287 287 }
288 288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 289 self.log.debug("Hub client addrs: %s", self.client_info)
290 290
291 291 # resubmit stream
292 292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
293 293 url = util.disambiguate_url(self.client_info['task'][-1])
294 294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
295 295 r.connect(url)
296 296
297 297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
298 298 query=q, notifier=n, resubmit=r, db=self.db,
299 299 engine_info=self.engine_info, client_info=self.client_info,
300 300 log=self.log)
301 301
302 302
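A rough sketch of standing up a Hub with this factory; in practice `ipcontroller` drives this, so treat it only as an illustration of the construct/start split:

# sketch: building a Hub in-process (normally done by ipcontroller)
from IPython.parallel.controller.hub import HubFactory

hf = HubFactory()        # random loopback ports by default
hf.construct()           # init_hub(): sockets, HeartMonitor, DB backend, Hub
hf.start()               # starts the heart monitor
print(hf.client_info)    # addresses a Client would connect to
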
303 303 class Hub(SessionFactory):
304 304 """The IPython Controller Hub with 0MQ connections
305 305
306 306 Parameters
307 307 ==========
308 308 loop: zmq IOLoop instance
309 309 session: Session object
310 310 <removed> context: zmq context for creating new connections (?)
311 311 queue: ZMQStream for monitoring the command queue (SUB)
312 312 query: ZMQStream for engine registration and client query requests (XREP)
313 313 heartbeat: HeartMonitor object checking the pulse of the engines
314 314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
315 315 db: connection to db for out of memory logging of commands
316 316 NotImplemented
317 317 engine_info: dict of zmq connection information for engines to connect
318 318 to the queues.
319 319 client_info: dict of zmq connection information for clients to connect
320 320 to the queues.
321 321 """
322 322 # internal data structures:
323 323 ids=Set() # engine IDs
324 324 keytable=Dict()
325 325 by_ident=Dict()
326 326 engines=Dict()
327 327 clients=Dict()
328 328 hearts=Dict()
329 329 pending=Set()
330 330 queues=Dict() # pending msg_ids keyed by engine_id
331 331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by engine_id
332 332 completed=Dict() # completed msg_ids keyed by engine_id
333 333 all_completed=Set() # set of all completed msg_ids
334 334 dead_engines=Set() # set of queue uuids of engines that have died
335 335 unassigned=Set() # set of task msg_ids not yet assigned a destination
336 336 incoming_registrations=Dict()
337 337 registration_timeout=Integer()
338 338 _idcounter=Integer(0)
339 339
340 340 # objects from constructor:
341 341 query=Instance(ZMQStream)
342 342 monitor=Instance(ZMQStream)
343 343 notifier=Instance(ZMQStream)
344 344 resubmit=Instance(ZMQStream)
345 345 heartmonitor=Instance(HeartMonitor)
346 346 db=Instance(object)
347 347 client_info=Dict()
348 348 engine_info=Dict()
349 349
350 350
351 351 def __init__(self, **kwargs):
352 352 """
353 353 # universal:
354 354 loop: IOLoop for creating future connections
355 355 session: streamsession for sending serialized data
356 356 # engine:
357 357 queue: ZMQStream for monitoring queue messages
358 358 query: ZMQStream for engine+client registration and client requests
359 359 heartbeat: HeartMonitor object for tracking engines
360 360 # extra:
361 361 db: ZMQStream for db connection (NotImplemented)
362 362 engine_info: zmq address/protocol dict for engine connections
363 363 client_info: zmq address/protocol dict for client connections
364 364 """
365 365
366 366 super(Hub, self).__init__(**kwargs)
367 367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
368 368
369 369 # validate connection dicts:
370 370 for k,v in self.client_info.iteritems():
371 371 if k == 'task':
372 372 util.validate_url_container(v[1])
373 373 else:
374 374 util.validate_url_container(v)
375 375 # util.validate_url_container(self.client_info)
376 376 util.validate_url_container(self.engine_info)
377 377
378 378 # register our callbacks
379 379 self.query.on_recv(self.dispatch_query)
380 380 self.monitor.on_recv(self.dispatch_monitor_traffic)
381 381
382 382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
383 383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
384 384
385 385 self.monitor_handlers = {b'in' : self.save_queue_request,
386 386 b'out': self.save_queue_result,
387 387 b'intask': self.save_task_request,
388 388 b'outtask': self.save_task_result,
389 389 b'tracktask': self.save_task_destination,
390 390 b'incontrol': _passer,
391 391 b'outcontrol': _passer,
392 392 b'iopub': self.save_iopub_message,
393 393 }
394 394
395 395 self.query_handlers = {'queue_request': self.queue_status,
396 396 'result_request': self.get_results,
397 397 'history_request': self.get_history,
398 398 'db_request': self.db_query,
399 399 'purge_request': self.purge_results,
400 400 'load_request': self.check_load,
401 401 'resubmit_request': self.resubmit_task,
402 402 'shutdown_request': self.shutdown_request,
403 403 'registration_request' : self.register_engine,
404 404 'unregistration_request' : self.unregister_engine,
405 405 'connection_request': self.connection_request,
406 406 }
407 407
408 408 # ignore resubmit replies
409 409 self.resubmit.on_recv(lambda msg: None, copy=False)
410 410
411 411 self.log.info("hub::created hub")
412 412
413 413 @property
414 414 def _next_id(self):
415 415 """gemerate a new ID.
416 416
417 417 No longer reuse old ids, just count from 0."""
418 418 newid = self._idcounter
419 419 self._idcounter += 1
420 420 return newid
421 421 # newid = 0
422 422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
423 423 # # print newid, self.ids, self.incoming_registrations
424 424 # while newid in self.ids or newid in incoming:
425 425 # newid += 1
426 426 # return newid
427 427
428 428 #-----------------------------------------------------------------------------
429 429 # message validation
430 430 #-----------------------------------------------------------------------------
431 431
432 432 def _validate_targets(self, targets):
433 433 """turn any valid targets argument into a list of integer ids"""
434 434 if targets is None:
435 435 # default to all
436 436 return self.ids
437 437
438 438 if isinstance(targets, (int,str,unicode)):
439 439 # only one target specified
440 440 targets = [targets]
441 441 _targets = []
442 442 for t in targets:
443 443 # map raw identities to ids
444 444 if isinstance(t, (str,unicode)):
445 445 t = self.by_ident.get(cast_bytes(t), t)
446 446 _targets.append(t)
447 447 targets = _targets
448 448 bad_targets = [ t for t in targets if t not in self.ids ]
449 449 if bad_targets:
450 450 raise IndexError("No Such Engine: %r" % bad_targets)
451 451 if not targets:
452 452 raise IndexError("No Engines Registered")
453 453 return targets
454 454
455 455 #-----------------------------------------------------------------------------
456 456 # dispatch methods (1 per stream)
457 457 #-----------------------------------------------------------------------------
458 458
459 459
460 460 @util.log_errors
461 461 def dispatch_monitor_traffic(self, msg):
462 462 """all ME and Task queue messages come through here, as well as
463 463 IOPub traffic."""
464 464 self.log.debug("monitor traffic: %r", msg[0])
465 465 switch = msg[0]
466 466 try:
467 467 idents, msg = self.session.feed_identities(msg[1:])
468 468 except ValueError:
469 469 idents=[]
470 470 if not idents:
471 471 self.log.error("Monitor message without topic: %r", msg)
472 472 return
473 473 handler = self.monitor_handlers.get(switch, None)
474 474 if handler is not None:
475 475 handler(idents, msg)
476 476 else:
477 477 self.log.error("Unrecognized monitor topic: %r", switch)
478 478
479 479
480 480 @util.log_errors
481 481 def dispatch_query(self, msg):
482 482 """Route registration requests and queries from clients."""
483 483 try:
484 484 idents, msg = self.session.feed_identities(msg)
485 485 except ValueError:
486 486 idents = []
487 487 if not idents:
488 488 self.log.error("Bad Query Message: %r", msg)
489 489 return
490 490 client_id = idents[0]
491 491 try:
492 492 msg = self.session.unserialize(msg, content=True)
493 493 except Exception:
494 494 content = error.wrap_exception()
495 495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
496 496 self.session.send(self.query, "hub_error", ident=client_id,
497 497 content=content)
498 498 return
499 499 # print client_id, header, parent, content
500 500 #switch on message type:
501 501 msg_type = msg['header']['msg_type']
502 502 self.log.info("client::client %r requested %r", client_id, msg_type)
503 503 handler = self.query_handlers.get(msg_type, None)
504 504 try:
505 505 assert handler is not None, "Bad Message Type: %r" % msg_type
506 506 except:
507 507 content = error.wrap_exception()
508 508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
509 509 self.session.send(self.query, "hub_error", ident=client_id,
510 510 content=content)
511 511 return
512 512
513 513 else:
514 514 handler(idents, msg)
515 515
516 516 def dispatch_db(self, msg):
517 517 """"""
518 518 raise NotImplementedError
519 519
520 520 #---------------------------------------------------------------------------
521 521 # handler methods (1 per event)
522 522 #---------------------------------------------------------------------------
523 523
524 524 #----------------------- Heartbeat --------------------------------------
525 525
526 526 def handle_new_heart(self, heart):
527 527 """handler to attach to heartbeater.
528 528 Called when a new heart starts to beat.
529 529 Triggers completion of registration."""
530 530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
531 531 if heart not in self.incoming_registrations:
532 532 self.log.info("heartbeat::ignoring new heart: %r", heart)
533 533 else:
534 534 self.finish_registration(heart)
535 535
536 536
537 537 def handle_heart_failure(self, heart):
538 538 """handler to attach to heartbeater.
539 539 called when a previously registered heart fails to respond to beat request.
540 540 triggers unregistration"""
541 541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
542 542 eid = self.hearts.get(heart, None)
543 543 if eid is None or self.keytable[eid] in self.dead_engines:
544 544 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
545 545 else:
546 546 queue = self.engines[eid].queue
547 547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
548 548
549 549 #----------------------- MUX Queue Traffic ------------------------------
550 550
551 551 def save_queue_request(self, idents, msg):
552 552 if len(idents) < 2:
553 553 self.log.error("invalid identity prefix: %r", idents)
554 554 return
555 555 queue_id, client_id = idents[:2]
556 556 try:
557 557 msg = self.session.unserialize(msg)
558 558 except Exception:
559 559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
560 560 return
561 561
562 562 eid = self.by_ident.get(queue_id, None)
563 563 if eid is None:
564 564 self.log.error("queue::target %r not registered", queue_id)
565 565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
566 566 return
567 567 record = init_record(msg)
568 568 msg_id = record['msg_id']
569 569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
570 570 # Unicode in records
571 571 record['engine_uuid'] = queue_id.decode('ascii')
572 572 record['client_uuid'] = client_id.decode('ascii')
573 573 record['queue'] = 'mux'
574 574
575 575 try:
576 576 # it's possible iopub arrived first:
577 577 existing = self.db.get_record(msg_id)
578 578 for key,evalue in existing.iteritems():
579 579 rvalue = record.get(key, None)
580 580 if evalue and rvalue and evalue != rvalue:
581 581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
582 582 elif evalue and not rvalue:
583 583 record[key] = evalue
584 584 try:
585 585 self.db.update_record(msg_id, record)
586 586 except Exception:
587 587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
588 588 except KeyError:
589 589 try:
590 590 self.db.add_record(msg_id, record)
591 591 except Exception:
592 592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
593 593
594 594
595 595 self.pending.add(msg_id)
596 596 self.queues[eid].append(msg_id)
597 597
598 598 def save_queue_result(self, idents, msg):
599 599 if len(idents) < 2:
600 600 self.log.error("invalid identity prefix: %r", idents)
601 601 return
602 602
603 603 client_id, queue_id = idents[:2]
604 604 try:
605 605 msg = self.session.unserialize(msg)
606 606 except Exception:
607 607 self.log.error("queue::engine %r sent invalid message to %r: %r",
608 608 queue_id, client_id, msg, exc_info=True)
609 609 return
610 610
611 611 eid = self.by_ident.get(queue_id, None)
612 612 if eid is None:
613 613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
614 614 return
615 615
616 616 parent = msg['parent_header']
617 617 if not parent:
618 618 return
619 619 msg_id = parent['msg_id']
620 620 if msg_id in self.pending:
621 621 self.pending.remove(msg_id)
622 622 self.all_completed.add(msg_id)
623 623 self.queues[eid].remove(msg_id)
624 624 self.completed[eid].append(msg_id)
625 625 self.log.info("queue::request %r completed on %s", msg_id, eid)
626 626 elif msg_id not in self.all_completed:
627 627 # it could be a result from a dead engine that died before delivering the
628 628 # result
629 629 self.log.warn("queue:: unknown msg finished %r", msg_id)
630 630 return
631 631 # update record anyway, because the unregistration could have been premature
632 632 rheader = msg['header']
633 633 completed = rheader['date']
634 634 started = rheader.get('started', None)
635 635 result = {
636 636 'result_header' : rheader,
637 637 'result_content': msg['content'],
638 638 'received': datetime.now(),
639 639 'started' : started,
640 640 'completed' : completed
641 641 }
642 642
643 643 result['result_buffers'] = msg['buffers']
644 644 try:
645 645 self.db.update_record(msg_id, result)
646 646 except Exception:
647 647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
648 648
649 649
650 650 #--------------------- Task Queue Traffic ------------------------------
651 651
652 652 def save_task_request(self, idents, msg):
653 653 """Save the submission of a task."""
654 654 client_id = idents[0]
655 655
656 656 try:
657 657 msg = self.session.unserialize(msg)
658 658 except Exception:
659 659 self.log.error("task::client %r sent invalid task message: %r",
660 660 client_id, msg, exc_info=True)
661 661 return
662 662 record = init_record(msg)
663 663
664 664 record['client_uuid'] = client_id.decode('ascii')
665 665 record['queue'] = 'task'
666 666 header = msg['header']
667 667 msg_id = header['msg_id']
668 668 self.pending.add(msg_id)
669 669 self.unassigned.add(msg_id)
670 670 try:
671 671 # it's possible iopub arrived first:
672 672 existing = self.db.get_record(msg_id)
673 673 if existing['resubmitted']:
674 674 for key in ('submitted', 'client_uuid', 'buffers'):
675 675 # don't clobber these keys on resubmit
676 676 # submitted and client_uuid should be different
677 677 # and buffers might be big, and shouldn't have changed
678 678 record.pop(key)
679 679 # still check content and header, which should not change
680 680 # and which are not as expensive to compare as buffers
681 681
682 682 for key,evalue in existing.iteritems():
683 683 if key.endswith('buffers'):
684 684 # don't compare buffers
685 685 continue
686 686 rvalue = record.get(key, None)
687 687 if evalue and rvalue and evalue != rvalue:
688 688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
689 689 elif evalue and not rvalue:
690 690 record[key] = evalue
691 691 try:
692 692 self.db.update_record(msg_id, record)
693 693 except Exception:
694 694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
695 695 except KeyError:
696 696 try:
697 697 self.db.add_record(msg_id, record)
698 698 except Exception:
699 699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
700 700 except Exception:
701 701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
702 702
703 703 def save_task_result(self, idents, msg):
704 704 """save the result of a completed task."""
705 705 client_id = idents[0]
706 706 try:
707 707 msg = self.session.unserialize(msg)
708 708 except Exception:
709 709 self.log.error("task::invalid task result message send to %r: %r",
710 710 client_id, msg, exc_info=True)
711 711 return
712 712
713 713 parent = msg['parent_header']
714 714 if not parent:
715 715 # print msg
716 716 self.log.warn("Task %r had no parent!", msg)
717 717 return
718 718 msg_id = parent['msg_id']
719 719 if msg_id in self.unassigned:
720 720 self.unassigned.remove(msg_id)
721 721
722 722 header = msg['header']
723 723 engine_uuid = header.get('engine', u'')
724 724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725 725
726 726 status = header.get('status', None)
727 727
728 728 if msg_id in self.pending:
729 729 self.log.info("task::task %r finished on %s", msg_id, eid)
730 730 self.pending.remove(msg_id)
731 731 self.all_completed.add(msg_id)
732 732 if eid is not None:
733 733 if status != 'aborted':
734 734 self.completed[eid].append(msg_id)
735 735 if msg_id in self.tasks[eid]:
736 736 self.tasks[eid].remove(msg_id)
737 737 completed = header['date']
738 738 started = header.get('started', None)
739 739 result = {
740 740 'result_header' : header,
741 741 'result_content': msg['content'],
742 742 'started' : started,
743 743 'completed' : completed,
744 744 'received' : datetime.now(),
745 745 'engine_uuid': engine_uuid,
746 746 }
747 747
748 748 result['result_buffers'] = msg['buffers']
749 749 try:
750 750 self.db.update_record(msg_id, result)
751 751 except Exception:
752 752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
753 753
754 754 else:
755 755 self.log.debug("task::unknown task %r finished", msg_id)
756 756
757 757 def save_task_destination(self, idents, msg):
758 758 try:
759 759 msg = self.session.unserialize(msg, content=True)
760 760 except Exception:
761 761 self.log.error("task::invalid task tracking message", exc_info=True)
762 762 return
763 763 content = msg['content']
764 764 # print (content)
765 765 msg_id = content['msg_id']
766 766 engine_uuid = content['engine_id']
767 767 eid = self.by_ident[cast_bytes(engine_uuid)]
768 768
769 769 self.log.info("task::task %r arrived on %r", msg_id, eid)
770 770 if msg_id in self.unassigned:
771 771 self.unassigned.remove(msg_id)
772 772 # else:
773 773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
774 774
775 775 self.tasks[eid].append(msg_id)
776 776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
777 777 try:
778 778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
779 779 except Exception:
780 780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
781 781
782 782
783 783 def mia_task_request(self, idents, msg):
784 784 raise NotImplementedError
785 785 client_id = idents[0]
786 786 # content = dict(mia=self.mia,status='ok')
787 787 # self.session.send('mia_reply', content=content, idents=client_id)
788 788
789 789
790 790 #--------------------- IOPub Traffic ------------------------------
791 791
792 792 def save_iopub_message(self, topics, msg):
793 793 """save an iopub message into the db"""
794 794 # print (topics)
795 795 try:
796 796 msg = self.session.unserialize(msg, content=True)
797 797 except Exception:
798 798 self.log.error("iopub::invalid IOPub message", exc_info=True)
799 799 return
800 800
801 801 parent = msg['parent_header']
802 802 if not parent:
803 803 self.log.error("iopub::invalid IOPub message: %r", msg)
804 804 return
805 805 msg_id = parent['msg_id']
806 806 msg_type = msg['header']['msg_type']
807 807 content = msg['content']
808 808
809 809 # ensure msg_id is in db
810 810 try:
811 811 rec = self.db.get_record(msg_id)
812 812 except KeyError:
813 813 rec = empty_record()
814 814 rec['msg_id'] = msg_id
815 815 self.db.add_record(msg_id, rec)
816 816 # stream
817 817 d = {}
818 818 if msg_type == 'stream':
819 819 name = content['name']
820 820 s = rec[name] or ''
821 821 d[name] = s + content['data']
822 822
823 823 elif msg_type == 'pyerr':
824 824 d['pyerr'] = content
825 825 elif msg_type == 'pyin':
826 826 d['pyin'] = content['code']
827 827 else:
828 828 d[msg_type] = content.get('data', '')
829 829
830 830 try:
831 831 self.db.update_record(msg_id, d)
832 832 except Exception:
833 833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
834 834
835 835
836 836
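The update dict built in `save_iopub_message` follows one rule per msg_type; a standalone restatement with illustrative values (streams accumulate, everything else is stored whole):

# minimal restatement of the per-msg_type update rule used above
rec = {'stdout': 'partial ', 'pyin': None, 'pyerr': None}  # existing record
msg_type, content = 'stream', {'name': 'stdout', 'data': 'output\n'}
d = {}
if msg_type == 'stream':
    name = content['name']
    d[name] = (rec[name] or '') + content['data']  # streams accumulate
elif msg_type == 'pyerr':
    d['pyerr'] = content                           # whole error content
elif msg_type == 'pyin':
    d['pyin'] = content['code']
else:
    d[msg_type] = content.get('data', '')
assert d == {'stdout': 'partial output\n'}
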
837 837 #-------------------------------------------------------------------------
838 838 # Registration requests
839 839 #-------------------------------------------------------------------------
840 840
841 841 def connection_request(self, client_id, msg):
842 842 """Reply with connection addresses for clients."""
843 843 self.log.info("client::client %r connected", client_id)
844 844 content = dict(status='ok')
845 845 content.update(self.client_info)
846 846 jsonable = {}
847 847 for k,v in self.keytable.iteritems():
848 848 if v not in self.dead_engines:
849 849 jsonable[str(k)] = v.decode('ascii')
850 850 content['engines'] = jsonable
851 851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
852 852
853 853 def register_engine(self, reg, msg):
854 854 """Register a new engine."""
855 855 content = msg['content']
856 856 try:
857 857 queue = cast_bytes(content['queue'])
858 858 except KeyError:
859 859 self.log.error("registration::queue not specified", exc_info=True)
860 860 return
861 861 heart = content.get('heartbeat', None)
862 862 if heart:
863 863 heart = cast_bytes(heart)
864 864 """register a new engine, and create the socket(s) necessary"""
865 865 eid = self._next_id
866 866 # print (eid, queue, reg, heart)
867 867
868 868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
869 869
870 870 content = dict(id=eid,status='ok')
871 871 content.update(self.engine_info)
872 872 # check if requesting available IDs:
873 873 if queue in self.by_ident:
874 874 try:
875 875 raise KeyError("queue_id %r in use" % queue)
876 876 except:
877 877 content = error.wrap_exception()
878 878 self.log.error("queue_id %r in use", queue, exc_info=True)
879 879 elif heart in self.hearts: # need to check unique hearts?
880 880 try:
881 881 raise KeyError("heart_id %r in use" % heart)
882 882 except:
883 883 self.log.error("heart_id %r in use", heart, exc_info=True)
884 884 content = error.wrap_exception()
885 885 else:
886 886 for h, pack in self.incoming_registrations.iteritems():
887 887 if heart == h:
888 888 try:
889 889 raise KeyError("heart_id %r in use" % heart)
890 890 except:
891 891 self.log.error("heart_id %r in use", heart, exc_info=True)
892 892 content = error.wrap_exception()
893 893 break
894 894 elif queue == pack[1]:
895 895 try:
896 896 raise KeyError("queue_id %r in use" % queue)
897 897 except:
898 898 self.log.error("queue_id %r in use", queue, exc_info=True)
899 899 content = error.wrap_exception()
900 900 break
901 901
902 902 msg = self.session.send(self.query, "registration_reply",
903 903 content=content,
904 904 ident=reg)
905 905
906 906 if content['status'] == 'ok':
907 907 if heart in self.heartmonitor.hearts:
908 908 # already beating
909 909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
910 910 self.finish_registration(heart)
911 911 else:
912 912 purge = lambda : self._purge_stalled_registration(heart)
913 913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
914 914 dc.start()
915 915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
916 916 else:
917 917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
918 918 return eid
919 919
920 920 def unregister_engine(self, ident, msg):
921 921 """Unregister an engine that explicitly requested to leave."""
922 922 try:
923 923 eid = msg['content']['id']
924 924 except:
925 925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
926 926 return
927 927 self.log.info("registration::unregister_engine(%r)", eid)
928 928 # print (eid)
929 929 uuid = self.keytable[eid]
930 930 content=dict(id=eid, queue=uuid.decode('ascii'))
931 931 self.dead_engines.add(uuid)
932 932 # self.ids.remove(eid)
933 933 # uuid = self.keytable.pop(eid)
934 934 #
935 935 # ec = self.engines.pop(eid)
936 936 # self.hearts.pop(ec.heartbeat)
937 937 # self.by_ident.pop(ec.queue)
938 938 # self.completed.pop(eid)
939 939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
940 940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
941 941 dc.start()
942 942 ############## TODO: HANDLE IT ################
943 943
944 944 if self.notifier:
945 945 self.session.send(self.notifier, "unregistration_notification", content=content)
946 946
947 947 def _handle_stranded_msgs(self, eid, uuid):
948 948 """Handle messages known to be on an engine when the engine unregisters.
949 949
950 950 It is possible that this will fire prematurely - that is, an engine will
951 951 go down after completing a result, and the client will be notified
952 952 that the result failed and later receive the actual result.
953 953 """
954 954
955 955 outstanding = self.queues[eid]
956 956
957 957 for msg_id in outstanding:
958 958 self.pending.remove(msg_id)
959 959 self.all_completed.add(msg_id)
960 960 try:
961 961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
962 962 except:
963 963 content = error.wrap_exception()
964 964 # build a fake header:
965 965 header = {}
966 966 header['engine'] = uuid
967 967 header['date'] = datetime.now()
968 968 rec = dict(result_content=content, result_header=header, result_buffers=[])
969 969 rec['completed'] = header['date']
970 970 rec['engine_uuid'] = uuid
971 971 try:
972 972 self.db.update_record(msg_id, rec)
973 973 except Exception:
974 974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
975 975
976 976
977 977 def finish_registration(self, heart):
978 978 """Second half of engine registration, called after our HeartMonitor
979 979 has received a beat from the Engine's Heart."""
980 980 try:
981 981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
982 982 except KeyError:
983 983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
984 984 return
985 985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
986 986 if purge is not None:
987 987 purge.stop()
988 988 control = queue
989 989 self.ids.add(eid)
990 990 self.keytable[eid] = queue
991 991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
992 992 control=control, heartbeat=heart)
993 993 self.by_ident[queue] = eid
994 994 self.queues[eid] = list()
995 995 self.tasks[eid] = list()
996 996 self.completed[eid] = list()
997 997 self.hearts[heart] = eid
998 998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
999 999 if self.notifier:
1000 1000 self.session.send(self.notifier, "registration_notification", content=content)
1001 1001 self.log.info("engine::Engine Connected: %i", eid)
1002 1002
1003 1003 def _purge_stalled_registration(self, heart):
1004 1004 if heart in self.incoming_registrations:
1005 1005 eid = self.incoming_registrations.pop(heart)[0]
1006 1006 self.log.info("registration::purging stalled registration: %i", eid)
1007 1007 else:
1008 1008 pass
1009 1009
1010 1010 #-------------------------------------------------------------------------
1011 1011 # Client Requests
1012 1012 #-------------------------------------------------------------------------
1013 1013
1014 1014 def shutdown_request(self, client_id, msg):
1015 1015 """handle shutdown request."""
1016 1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1017 1017 # also notify other clients of shutdown
1018 1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1019 1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1020 1020 dc.start()
1021 1021
1022 1022 def _shutdown(self):
1023 1023 self.log.info("hub::hub shutting down.")
1024 1024 time.sleep(0.1)
1025 1025 sys.exit(0)
1026 1026
1027 1027
1028 1028 def check_load(self, client_id, msg):
1029 1029 content = msg['content']
1030 1030 try:
1031 1031 targets = content['targets']
1032 1032 targets = self._validate_targets(targets)
1033 1033 except:
1034 1034 content = error.wrap_exception()
1035 1035 self.session.send(self.query, "hub_error",
1036 1036 content=content, ident=client_id)
1037 1037 return
1038 1038
1039 1039 content = dict(status='ok')
1040 1040 # loads = {}
1041 1041 for t in targets:
1042 1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1043 1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1044 1044
1045 1045
1046 1046 def queue_status(self, client_id, msg):
1047 1047 """Return the Queue status of one or more targets.
1048 1048 if verbose: return the msg_ids
1049 1049 else: return len of each type.
1050 1050 keys: queue (pending MUX jobs)
1051 1051 tasks (pending Task jobs)
1052 1052 completed (finished jobs from both queues)"""
1053 1053 content = msg['content']
1054 1054 targets = content['targets']
1055 1055 try:
1056 1056 targets = self._validate_targets(targets)
1057 1057 except:
1058 1058 content = error.wrap_exception()
1059 1059 self.session.send(self.query, "hub_error",
1060 1060 content=content, ident=client_id)
1061 1061 return
1062 1062 verbose = content.get('verbose', False)
1063 1063 content = dict(status='ok')
1064 1064 for t in targets:
1065 1065 queue = self.queues[t]
1066 1066 completed = self.completed[t]
1067 1067 tasks = self.tasks[t]
1068 1068 if not verbose:
1069 1069 queue = len(queue)
1070 1070 completed = len(completed)
1071 1071 tasks = len(tasks)
1072 1072 content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1073 1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1074 1074 # print (content)
1075 1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1076 1076
1077 1077 def purge_results(self, client_id, msg):
1078 1078 """Purge results from memory. This method is more valuable before we move
1079 1079 to a DB based message storage mechanism."""
1080 1080 content = msg['content']
1081 1081 self.log.info("Dropping records with %s", content)
1082 1082 msg_ids = content.get('msg_ids', [])
1083 1083 reply = dict(status='ok')
1084 1084 if msg_ids == 'all':
1085 1085 try:
1086 1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1087 1087 except Exception:
1088 1088 reply = error.wrap_exception()
1089 1089 else:
1090 1090 pending = filter(lambda m: m in self.pending, msg_ids)
1091 1091 if pending:
1092 1092 try:
1093 1093 raise IndexError("msg pending: %r" % pending[0])
1094 1094 except:
1095 1095 reply = error.wrap_exception()
1096 1096 else:
1097 1097 try:
1098 1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1099 1099 except Exception:
1100 1100 reply = error.wrap_exception()
1101 1101
1102 1102 if reply['status'] == 'ok':
1103 1103 eids = content.get('engine_ids', [])
1104 1104 for eid in eids:
1105 1105 if eid not in self.engines:
1106 1106 try:
1107 1107 raise IndexError("No such engine: %i" % eid)
1108 1108 except:
1109 1109 reply = error.wrap_exception()
1110 1110 break
1111 1111 uid = self.engines[eid].queue
1112 1112 try:
1113 1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1114 1114 except Exception:
1115 1115 reply = error.wrap_exception()
1116 1116 break
1117 1117
1118 1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1119 1119
1120 1120 def resubmit_task(self, client_id, msg):
1121 1121 """Resubmit one or more tasks."""
1122 1122 def finish(reply):
1123 1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1124 1124
1125 1125 content = msg['content']
1126 1126 msg_ids = content['msg_ids']
1127 1127 reply = dict(status='ok')
1128 1128 try:
1129 1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1130 1130 'header', 'content', 'buffers'])
1131 1131 except Exception:
1132 1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1133 1133 return finish(error.wrap_exception())
1134 1134
1135 1135 # validate msg_ids
1136 1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 1138 if len(records) > len(msg_ids):
1139 1139 try:
1139 1139 raise RuntimeError("DB appears to be in an inconsistent state. "
1140 1140 "More matching records were found than should exist")
1142 1142 except Exception:
1143 1143 return finish(error.wrap_exception())
1144 1144 elif len(records) < len(msg_ids):
1145 1145 missing = [ m for m in msg_ids if m not in found_ids ]
1146 1146 try:
1147 1147 raise KeyError("No such msg(s): %r" % missing)
1148 1148 except KeyError:
1149 1149 return finish(error.wrap_exception())
1150 elif invalid_ids:
1151 msg_id = invalid_ids[0]
1150 elif pending_ids:
1151 pass
1152 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = pending_ids[0]
1155 # try:
1156 # raise ValueError("Task(s) %r appear to be in flight" % pending_ids)
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1162
1163 # send the messages
1164 for rec in records:
1165 header = rec['header']
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1170 msg['header'] = header
1171
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1173
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1152 1177 try:
1153 raise ValueError("Task %r appears to be inflight" % msg_id)
1178 self.db.add_record(msg_id, init_record(msg))
1154 1179 except Exception:
1155 return finish(error.wrap_exception())
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1156 1181
1157 # clear the existing records
1158 now = datetime.now()
1159 rec = empty_record()
1160 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1161 rec['resubmitted'] = now
1162 rec['queue'] = 'task'
1163 rec['client_uuid'] = client_id[0]
1164 try:
1165 for msg_id in msg_ids:
1166 self.all_completed.discard(msg_id)
1167 self.db.update_record(msg_id, rec)
1168 except Exception:
1169 self.log.error('db::db error upating record', exc_info=True)
1170 reply = error.wrap_exception()
1171 else:
1172 # send the messages
1173 for rec in records:
1174 header = rec['header']
1175 # include resubmitted in header to prevent digest collision
1176 header['resubmitted'] = now
1177 msg = self.session.msg(header['msg_type'])
1178 msg['content'] = rec['content']
1179 msg['header'] = header
1180 msg['header']['msg_id'] = rec['msg_id']
1181 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1182
1183 finish(dict(status='ok'))
1182 finish(dict(status='ok', resubmitted=resubmitted))
1183
1184 # store the new IDs in the Task DB
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1186 try:
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1188 except Exception:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1184 1190
1185 1191
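With this change, resubmission re-runs tasks under brand-new msg_ids, and the reply's `resubmitted` dict maps old ids to new ones; a hedged client-side sketch (assumes `rc` is a connected `Client` whose `resubmit` consumes that mapping):

# sketch: resubmitting a finished task; it re-runs under a *new* msg_id
ar = rc[0].apply_async(lambda: 42)
ar.get()
new_ar = rc.resubmit(ar.msg_ids)          # AsyncHubResult for the new ids
print(ar.msg_ids, '->', new_ar.msg_ids)   # different after this change
print(new_ar.get())
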
1186 1192 def _extract_record(self, rec):
1187 1193 """decompose a TaskRecord dict into subsection of reply for get_result"""
1188 1194 io_dict = {}
1189 1195 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1190 1196 io_dict[key] = rec[key]
1191 1197 content = { 'result_content': rec['result_content'],
1192 1198 'header': rec['header'],
1193 1199 'result_header' : rec['result_header'],
1194 1200 'received' : rec['received'],
1195 1201 'io' : io_dict,
1196 1202 }
1197 1203 if rec['result_buffers']:
1198 1204 buffers = map(bytes, rec['result_buffers'])
1199 1205 else:
1200 1206 buffers = []
1201 1207
1202 1208 return content, buffers
1203 1209
1204 1210 def get_results(self, client_id, msg):
1205 1211 """Get the result of 1 or more messages."""
1206 1212 content = msg['content']
1207 1213 msg_ids = sorted(set(content['msg_ids']))
1208 1214 statusonly = content.get('status_only', False)
1209 1215 pending = []
1210 1216 completed = []
1211 1217 content = dict(status='ok')
1212 1218 content['pending'] = pending
1213 1219 content['completed'] = completed
1214 1220 buffers = []
1215 1221 if not statusonly:
1216 1222 try:
1217 1223 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1218 1224 # turn match list into dict, for faster lookup
1219 1225 records = {}
1220 1226 for rec in matches:
1221 1227 records[rec['msg_id']] = rec
1222 1228 except Exception:
1223 1229 content = error.wrap_exception()
1224 1230 self.session.send(self.query, "result_reply", content=content,
1225 1231 parent=msg, ident=client_id)
1226 1232 return
1227 1233 else:
1228 1234 records = {}
1229 1235 for msg_id in msg_ids:
1230 1236 if msg_id in self.pending:
1231 1237 pending.append(msg_id)
1232 1238 elif msg_id in self.all_completed:
1233 1239 completed.append(msg_id)
1234 1240 if not statusonly:
1235 1241 c,bufs = self._extract_record(records[msg_id])
1236 1242 content[msg_id] = c
1237 1243 buffers.extend(bufs)
1238 1244 elif msg_id in records:
1239 1245 if records[msg_id]['completed']:
1240 1246 completed.append(msg_id)
1241 1247 c,bufs = self._extract_record(records[msg_id])
1242 1248 content[msg_id] = c
1243 1249 buffers.extend(bufs)
1244 1250 else:
1245 1251 pending.append(msg_id)
1246 1252 else:
1247 1253 try:
1248 1254 raise KeyError('No such message: '+msg_id)
1249 1255 except:
1250 1256 content = error.wrap_exception()
1251 1257 break
1252 1258 self.session.send(self.query, "result_reply", content=content,
1253 1259 parent=msg, ident=client_id,
1254 1260 buffers=buffers)
1255 1261
1256 1262 def get_history(self, client_id, msg):
1257 1263 """Get a list of all msg_ids in our DB records"""
1258 1264 try:
1259 1265 msg_ids = self.db.get_history()
1260 1266 except Exception as e:
1261 1267 content = error.wrap_exception()
1262 1268 else:
1263 1269 content = dict(status='ok', history=msg_ids)
1264 1270
1265 1271 self.session.send(self.query, "history_reply", content=content,
1266 1272 parent=msg, ident=client_id)
1267 1273
1268 1274 def db_query(self, client_id, msg):
1269 1275 """Perform a raw query on the task record database."""
1270 1276 content = msg['content']
1271 1277 query = content.get('query', {})
1272 1278 keys = content.get('keys', None)
1273 1279 buffers = []
1274 1280 empty = list()
1275 1281 try:
1276 1282 records = self.db.find_records(query, keys)
1277 1283 except Exception as e:
1278 1284 content = error.wrap_exception()
1279 1285 else:
1280 1286 # extract buffers from reply content:
1281 1287 if keys is not None:
1282 1288 buffer_lens = [] if 'buffers' in keys else None
1283 1289 result_buffer_lens = [] if 'result_buffers' in keys else None
1284 1290 else:
1285 1291 buffer_lens = None
1286 1292 result_buffer_lens = None
1287 1293
1288 1294 for rec in records:
1289 1295 # buffers may be None, so double check
1290 1296 b = rec.pop('buffers', empty) or empty
1291 1297 if buffer_lens is not None:
1292 1298 buffer_lens.append(len(b))
1293 1299 buffers.extend(b)
1294 1300 rb = rec.pop('result_buffers', empty) or empty
1295 1301 if result_buffer_lens is not None:
1296 1302 result_buffer_lens.append(len(rb))
1297 1303 buffers.extend(rb)
1298 1304 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1299 1305 result_buffer_lens=result_buffer_lens)
1300 1306 # self.log.debug (content)
1301 1307 self.session.send(self.query, "db_reply", content=content,
1302 1308 parent=msg, ident=client_id,
1303 1309 buffers=buffers)
1304 1310
@@ -1,412 +1,412 b''
1 1 """A TaskRecord backend using sqlite3
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 import json
15 15 import os
16 16 import cPickle as pickle
17 17 from datetime import datetime
18 18
19 19 try:
20 20 import sqlite3
21 21 except ImportError:
22 22 sqlite3 = None
23 23
24 24 from zmq.eventloop import ioloop
25 25
26 26 from IPython.utils.traitlets import Unicode, Instance, List, Dict
27 27 from .dictdb import BaseDB
28 28 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # SQLite operators, adapters, and converters
32 32 #-----------------------------------------------------------------------------
33 33
34 34 try:
35 35 buffer
36 36 except NameError:
37 37 # py3k
38 38 buffer = memoryview
39 39
40 40 operators = {
41 41 '$lt' : "<",
42 42 '$gt' : ">",
43 43 # NULL is handled specially with = and !=
44 44 '$eq' : "=",
45 45 '$ne' : "!=",
46 46 '$lte': "<=",
47 47 '$gte': ">=",
48 48 '$in' : ('=', ' OR '),
49 49 '$nin': ('!=', ' AND '),
50 50 # '$all': None,
51 51 # '$mod': None,
52 52 # '$exists' : None
53 53 }
54 54 null_operators = {
55 55 '=' : "IS NULL",
56 56 '!=' : "IS NOT NULL",
57 57 }
58 58
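To make the tables above concrete, here is a simplified sketch of rendering one `{'$op': value}` test into a SQL fragment (illustrative only; the module's real expression builder is not part of this hunk):

# illustrative: rendering one mongodb-style test with the tables above
def render_test(column, op, value):
    sql_op = operators[op]
    if isinstance(sql_op, tuple):    # '$in' / '$nin' expand to OR/AND chains
        test, joiner = sql_op
        expr = joiner.join(['%s %s ?' % (column, test)] * len(value))
        return '(%s)' % expr, list(value)
    if value is None and sql_op in null_operators:
        return '%s %s' % (column, null_operators[sql_op]), []
    return '%s %s ?' % (column, sql_op), [value]

# render_test('completed', '$ne', None)    -> ('completed IS NOT NULL', [])
# render_test('msg_id', '$in', ['a', 'b']) -> ('(msg_id = ? OR msg_id = ?)', ['a', 'b'])
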
59 59 def _adapt_dict(d):
60 60 return json.dumps(d, default=date_default)
61 61
62 62 def _convert_dict(ds):
63 63 if ds is None:
64 64 return ds
65 65 else:
66 66 if isinstance(ds, bytes):
67 67 # If I understand the sqlite doc correctly, this will always be utf8
68 68 ds = ds.decode('utf8')
69 69 return extract_dates(json.loads(ds))
70 70
71 71 def _adapt_bufs(bufs):
72 72 # this is *horrible*
73 73 # copy buffers into single list and pickle it:
74 74 if bufs and isinstance(bufs[0], (bytes, buffer)):
75 75 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
76 76 elif bufs:
77 77 return bufs
78 78 else:
79 79 return None
80 80
81 81 def _convert_bufs(bs):
82 82 if bs is None:
83 83 return []
84 84 else:
85 85 return pickle.loads(bytes(bs))
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # SQLiteDB class
89 89 #-----------------------------------------------------------------------------
90 90
91 91 class SQLiteDB(BaseDB):
92 92 """SQLite3 TaskRecord backend."""
93 93
94 94 filename = Unicode('tasks.db', config=True,
95 95 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
96 96 location = Unicode('', config=True,
97 97 help="""The directory containing the sqlite task database. The default
98 98 is to use the cluster_dir location.""")
99 99 table = Unicode("", config=True,
100 100 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
101 101 a new table will be created with the Hub's IDENT. Specifying the table will result
102 102 in tasks from previous sessions being available via Clients' db_query and
103 103 get_result methods.""")
104 104
105 105 if sqlite3 is not None:
106 106 _db = Instance('sqlite3.Connection')
107 107 else:
108 108 _db = None
109 109 # the ordered list of column names
110 110 _keys = List(['msg_id' ,
111 111 'header' ,
112 112 'content',
113 113 'buffers',
114 114 'submitted',
115 115 'client_uuid' ,
116 116 'engine_uuid' ,
117 117 'started',
118 118 'completed',
119 119 'resubmitted',
120 120 'received',
121 121 'result_header' ,
122 122 'result_content' ,
123 123 'result_buffers' ,
124 124 'queue' ,
125 125 'pyin' ,
126 126 'pyout',
127 127 'pyerr',
128 128 'stdout',
129 129 'stderr',
130 130 ])
131 131 # sqlite datatypes for checking that db is current format
132 132 _types = Dict({'msg_id' : 'text' ,
133 133 'header' : 'dict text',
134 134 'content' : 'dict text',
135 135 'buffers' : 'bufs blob',
136 136 'submitted' : 'timestamp',
137 137 'client_uuid' : 'text',
138 138 'engine_uuid' : 'text',
139 139 'started' : 'timestamp',
140 140 'completed' : 'timestamp',
141 'resubmitted' : 'timestamp',
141 'resubmitted' : 'text',
142 142 'received' : 'timestamp',
143 143 'result_header' : 'dict text',
144 144 'result_content' : 'dict text',
145 145 'result_buffers' : 'bufs blob',
146 146 'queue' : 'text',
147 147 'pyin' : 'text',
148 148 'pyout' : 'text',
149 149 'pyerr' : 'text',
150 150 'stdout' : 'text',
151 151 'stderr' : 'text',
152 152 })
153 153
154 154 def __init__(self, **kwargs):
155 155 super(SQLiteDB, self).__init__(**kwargs)
156 156 if sqlite3 is None:
157 157 raise ImportError("SQLiteDB requires sqlite3")
158 158 if not self.table:
159 159             # use the session id, prefixed with '_', since a table name may not start with a digit
160 160 self.table = '_'+self.session.replace('-','_')
161 161 if not self.location:
162 162 # get current profile
163 163 from IPython.core.application import BaseIPythonApplication
164 164 if BaseIPythonApplication.initialized():
165 165 app = BaseIPythonApplication.instance()
166 166 if app.profile_dir is not None:
167 167 self.location = app.profile_dir.location
168 168 else:
169 169 self.location = u'.'
170 170 else:
171 171 self.location = u'.'
172 172 self._init_db()
173 173
174 174 # register db commit as 2s periodic callback
175 175 # to prevent clogging pipes
176 176 # assumes we are being run in a zmq ioloop app
177 177 loop = ioloop.IOLoop.instance()
178 178 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
179 179 pc.start()
180 180
181 181 def _defaults(self, keys=None):
182 182 """create an empty record"""
183 183 d = {}
184 184 keys = self._keys if keys is None else keys
185 185 for key in keys:
186 186 d[key] = None
187 187 return d
188 188
189 189 def _check_table(self):
190 190         """Check that an existing table matches the current schema.
191 191
192 192         Returns False if a table with this name exists but has an outdated format.
193 193         """
194 194 cursor = self._db.execute("PRAGMA table_info(%s)"%self.table)
195 195 lines = cursor.fetchall()
196 196 if not lines:
197 197 # table does not exist
198 198 return True
199 199 types = {}
200 200 keys = []
201 201 for line in lines:
202 202 keys.append(line[1])
203 203 types[line[1]] = line[2]
204 204 if self._keys != keys:
205 205 # key mismatch
206 206 self.log.warn('keys mismatch')
207 207 return False
208 208 for key in self._keys:
209 209 if types[key] != self._types[key]:
210 210 self.log.warn(
211 211 'type mismatch: %s: %s != %s'%(key,types[key],self._types[key])
212 212 )
213 213 return False
214 214 return True
215 215
216 216 def _init_db(self):
217 217         """Connect to the database and create the task table if necessary."""
218 218 # register adapters
219 219 sqlite3.register_adapter(dict, _adapt_dict)
220 220 sqlite3.register_converter('dict', _convert_dict)
221 221 sqlite3.register_adapter(list, _adapt_bufs)
222 222 sqlite3.register_converter('bufs', _convert_bufs)
223 223 # connect to the db
224 224 dbfile = os.path.join(self.location, self.filename)
225 225 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
226 226 # isolation_level = None)#,
227 227 cached_statements=64)
228 228 # print dir(self._db)
229 229 first_table = previous_table = self.table
230 230 i=0
231 231 while not self._check_table():
232 232 i+=1
233 233 self.table = first_table+'_%i'%i
234 234 self.log.warn(
235 235 "Table %s exists and doesn't match db format, trying %s"%
236 236 (previous_table, self.table)
237 237 )
238 238 previous_table = self.table
239 239
240 240 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
241 241 (msg_id text PRIMARY KEY,
242 242 header dict text,
243 243 content dict text,
244 244 buffers bufs blob,
245 245 submitted timestamp,
246 246 client_uuid text,
247 247 engine_uuid text,
248 248 started timestamp,
249 249 completed timestamp,
250 resubmitted timestamp,
250 resubmitted text,
251 251 received timestamp,
252 252 result_header dict text,
253 253 result_content dict text,
254 254 result_buffers bufs blob,
255 255 queue text,
256 256 pyin text,
257 257 pyout text,
258 258 pyerr text,
259 259 stdout text,
260 260 stderr text)
261 261 """%self.table)
262 262 self._db.commit()
263 263
264 264 def _dict_to_list(self, d):
265 265 """turn a mongodb-style record dict into a list."""
266 266
267 267 return [ d[key] for key in self._keys ]
268 268
269 269 def _list_to_dict(self, line, keys=None):
270 270 """Inverse of dict_to_list"""
271 271 keys = self._keys if keys is None else keys
272 272 d = self._defaults(keys)
273 273 for key,value in zip(keys, line):
274 274 d[key] = value
275 275
276 276 return d
277 277
278 278 def _render_expression(self, check):
279 279 """Turn a mongodb-style search dict into an SQL query."""
280 280 expressions = []
281 281 args = []
282 282
283 283 skeys = set(check.keys())
284 284 skeys.difference_update(set(self._keys))
285 285 skeys.difference_update(set(['buffers', 'result_buffers']))
286 286 if skeys:
287 287 raise KeyError("Illegal testing key(s): %s"%skeys)
288 288
289 289 for name,sub_check in check.iteritems():
290 290 if isinstance(sub_check, dict):
291 291 for test,value in sub_check.iteritems():
292 292 try:
293 293 op = operators[test]
294 294 except KeyError:
295 295 raise KeyError("Unsupported operator: %r"%test)
296 296 if isinstance(op, tuple):
297 297 op, join = op
298 298
299 299 if value is None and op in null_operators:
300 300 expr = "%s %s" % (name, null_operators[op])
301 301 else:
302 302 expr = "%s %s ?"%(name, op)
303 303 if isinstance(value, (tuple,list)):
304 304 if op in null_operators and any([v is None for v in value]):
305 305 # equality tests don't work with NULL
306 306 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
307 307 expr = '( %s )'%( join.join([expr]*len(value)) )
308 308 args.extend(value)
309 309 else:
310 310 args.append(value)
311 311 expressions.append(expr)
312 312 else:
313 313 # it's an equality check
314 314 if sub_check is None:
315 315 expressions.append("%s IS NULL" % name)
316 316 else:
317 317 expressions.append("%s = ?"%name)
318 318 args.append(sub_check)
319 319
320 320 expr = " AND ".join(expressions)
321 321 return expr, args
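        # For illustration: given the operator table at the top of this file,
        # a query dict such as
        #     {'completed': None, 'engine_uuid': {'$in': ['a', 'b']}}
        # renders to roughly
        #     "completed IS NULL AND ( engine_uuid = ? OR engine_uuid = ? )"
        # with args == ['a', 'b'], ready for a parameterized WHERE clause.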
322 322
323 323 def add_record(self, msg_id, rec):
324 324 """Add a new Task Record, by msg_id."""
325 325 d = self._defaults()
326 326 d.update(rec)
327 327 d['msg_id'] = msg_id
328 328 line = self._dict_to_list(d)
329 329 tups = '(%s)'%(','.join(['?']*len(line)))
330 330 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
331 331 # self._db.commit()
332 332
333 333 def get_record(self, msg_id):
334 334 """Get a specific Task Record, by msg_id."""
335 335 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
336 336 line = cursor.fetchone()
337 337 if line is None:
338 338 raise KeyError("No such msg: %r"%msg_id)
339 339 return self._list_to_dict(line)
340 340
341 341 def update_record(self, msg_id, rec):
342 342 """Update the data in an existing record."""
343 343 query = "UPDATE %s SET "%self.table
344 344 sets = []
345 345 keys = sorted(rec.keys())
346 346 values = []
347 347 for key in keys:
348 348 sets.append('%s = ?'%key)
349 349 values.append(rec[key])
350 350 query += ', '.join(sets)
351 351 query += ' WHERE msg_id == ?'
352 352 values.append(msg_id)
353 353 self._db.execute(query, values)
354 354 # self._db.commit()
355 355
356 356 def drop_record(self, msg_id):
357 357 """Remove a record from the DB."""
358 358 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
359 359 # self._db.commit()
360 360
361 361 def drop_matching_records(self, check):
362 362         """Remove all records matching a query dict from the DB."""
363 363 expr,args = self._render_expression(check)
364 364 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
365 365 self._db.execute(query,args)
366 366 # self._db.commit()
367 367
368 368 def find_records(self, check, keys=None):
369 369 """Find records matching a query dict, optionally extracting subset of keys.
370 370
371 371 Returns list of matching records.
372 372
373 373 Parameters
374 374 ----------
375 375
376 376 check: dict
377 377 mongodb-style query argument
378 378 keys: list of strs [optional]
379 379 if specified, the subset of keys to extract. msg_id will *always* be
380 380 included.
381 381 """
382 382 if keys:
383 383 bad_keys = [ key for key in keys if key not in self._keys ]
384 384 if bad_keys:
385 385 raise KeyError("Bad record key(s): %s"%bad_keys)
386 386
387 387 if keys:
388 388 # ensure msg_id is present and first:
389 389 if 'msg_id' in keys:
390 390 keys.remove('msg_id')
391 391 keys.insert(0, 'msg_id')
392 392 req = ', '.join(keys)
393 393 else:
394 394 req = '*'
395 395 expr,args = self._render_expression(check)
396 396 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
397 397 cursor = self._db.execute(query, args)
398 398 matches = cursor.fetchall()
399 399 records = []
400 400 for line in matches:
401 401 rec = self._list_to_dict(line, keys)
402 402 records.append(rec)
403 403 return records
404 404
405 405 def get_history(self):
406 406 """get all msg_ids, ordered by time submitted."""
407 407         query = """SELECT msg_id FROM %s ORDER BY submitted ASC"""%self.table
408 408 cursor = self._db.execute(query)
409 409 # will be a list of length 1 tuples
410 410 return [ tup[0] for tup in cursor.fetchall()]
411 411
412 412 __all__ = ['SQLiteDB'] No newline at end of file
@@ -1,386 +1,387 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 41 self.add_engines(2)
42 42 self.assertEquals(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEquals(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assert_(isinstance(v, DirectView))
53 53 self.assertEquals(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assert_(isinstance(v, DirectView))
57 57 self.assertEquals(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assert_(isinstance(v, DirectView))
60 60 self.assertEquals(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assert_(isinstance(v, DirectView))
63 63 self.assertEquals(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assert_(isinstance(v, DirectView))
66 66 self.assertEquals(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assert_(isinstance(v, DirectView))
69 69 self.assertEquals(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEquals(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEquals(v.targets, None)
80 80
81 81 def test_dview_targets(self):
82 82 """test direct_view targets"""
83 83 v = self.client.direct_view()
84 84 self.assertEquals(v.targets, 'all')
85 85 v = self.client.direct_view('all')
86 86 self.assertEquals(v.targets, 'all')
87 87 v = self.client.direct_view(-1)
88 88 self.assertEquals(v.targets, self.client.ids[-1])
89 89
90 90 def test_lazy_all_targets(self):
91 91 """test lazy evaluation of rc.direct_view('all')"""
92 92 v = self.client.direct_view()
93 93 self.assertEquals(v.targets, 'all')
94 94
95 95 def double(x):
96 96 return x*2
97 97 seq = range(100)
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
105 105 r = v.apply_sync(lambda : 1)
106 106 self.assertEquals(r, [1] * n1)
107 107
108 108 # map goes through remotefunction
109 109 r = v.map_sync(double, seq)
110 110 self.assertEquals(r, ref)
111 111
112 112 # add a couple more engines, and try again
113 113 self.add_engines(2)
114 114 n2 = len(self.client.ids)
115 115 self.assertNotEquals(n2, n1)
116 116
117 117 # apply
118 118 r = v.apply_sync(lambda : 1)
119 119 self.assertEquals(r, [1] * n2)
120 120
121 121 # map
122 122 r = v.map_sync(double, seq)
123 123 self.assertEquals(r, ref)
124 124
125 125 def test_targets(self):
126 126 """test various valid targets arguments"""
127 127 build = self.client._build_targets
128 128 ids = self.client.ids
129 129 idents,targets = build(None)
130 130 self.assertEquals(ids, targets)
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
138 138 v.pull('a')
139 139 id0 = self.client.ids[-1]
140 140 self.client.clear(targets=id0, block=True)
141 141 a = self.client[:-1].get('a')
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 146
147 147 def test_get_result(self):
148 148 """test getting results from the Hub."""
149 149 c = clientmod.Client(profile='iptest')
150 150 t = c.ids[-1]
151 151 ar = c[t].apply_async(wait, 1)
152 152 # give the monitor time to notice the message
153 153 time.sleep(.25)
154 154 ahr = self.client.get_result(ar.msg_ids)
155 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 156 self.assertEquals(ahr.get(), ar.get())
157 157 ar2 = self.client.get_result(ar.msg_ids)
158 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 159 c.close()
160 160
161 161 def test_ids_list(self):
162 162 """test client.ids"""
163 163 ids = self.client.ids
164 164 self.assertEquals(ids, self.client._ids)
165 165 self.assertFalse(ids is self.client._ids)
166 166 ids.remove(ids[-1])
167 167 self.assertNotEquals(ids, self.client._ids)
168 168
169 169 def test_queue_status(self):
170 170 ids = self.client.ids
171 171 id0 = ids[0]
172 172 qs = self.client.queue_status(targets=id0)
173 173 self.assertTrue(isinstance(qs, dict))
174 174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 175 allqs = self.client.queue_status()
176 176 self.assertTrue(isinstance(allqs, dict))
177 177 intkeys = list(allqs.keys())
178 178 intkeys.remove('unassigned')
179 179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 180 unassigned = allqs.pop('unassigned')
181 181 for eid,qs in allqs.items():
182 182 self.assertTrue(isinstance(qs, dict))
183 183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184 184
185 185 def test_shutdown(self):
186 186 ids = self.client.ids
187 187 id0 = ids[0]
188 188 self.client.shutdown(id0, block=True)
189 189 while id0 in self.client.ids:
190 190 time.sleep(0.1)
191 191 self.client.spin()
192 192
193 193 self.assertRaises(IndexError, lambda : self.client[id0])
194 194
195 195 def test_result_status(self):
196 196 pass
197 197 # to be written
198 198
199 199 def test_db_query_dt(self):
200 200 """test db query by date"""
201 201 hist = self.client.hub_history()
202 202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 203 tic = middle['submitted']
204 204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 206 self.assertEquals(len(before)+len(after),len(hist))
207 207 for b in before:
208 208 self.assertTrue(b['submitted'] < tic)
209 209 for a in after:
210 210 self.assertTrue(a['submitted'] >= tic)
211 211 same = self.client.db_query({'submitted' : tic})
212 212 for s in same:
213 213 self.assertTrue(s['submitted'] == tic)
214 214
215 215 def test_db_query_keys(self):
216 216 """test extracting subset of record keys"""
217 217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 218 for rec in found:
219 219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220 220
221 221 def test_db_query_default_keys(self):
222 222 """default db_query excludes buffers"""
223 223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 224 for rec in found:
225 225 keys = set(rec.keys())
226 226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228 228
229 229 def test_db_query_msg_id(self):
230 230 """ensure msg_id is always in db queries"""
231 231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 232 for rec in found:
233 233 self.assertTrue('msg_id' in rec.keys())
234 234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 235 for rec in found:
236 236 self.assertTrue('msg_id' in rec.keys())
237 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 238 for rec in found:
239 239 self.assertTrue('msg_id' in rec.keys())
240 240
241 241 def test_db_query_get_result(self):
242 242 """pop in db_query shouldn't pop from result itself"""
243 243 self.client[:].apply_sync(lambda : 1)
244 244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 245 rc2 = clientmod.Client(profile='iptest')
246 246 # If this bug is not fixed, this call will hang:
247 247 ar = rc2.get_result(self.client.history[-1])
248 248 ar.wait(2)
249 249 self.assertTrue(ar.ready())
250 250 ar.get()
251 251 rc2.close()
252 252
253 253 def test_db_query_in(self):
254 254 """test db query with '$in','$nin' operators"""
255 255 hist = self.client.hub_history()
256 256 even = hist[::2]
257 257 odd = hist[1::2]
258 258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 259 found = [ r['msg_id'] for r in recs ]
260 260 self.assertEquals(set(even), set(found))
261 261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 262 found = [ r['msg_id'] for r in recs ]
263 263 self.assertEquals(set(odd), set(found))
264 264
265 265 def test_hub_history(self):
266 266 hist = self.client.hub_history()
267 267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 268 recdict = {}
269 269 for rec in recs:
270 270 recdict[rec['msg_id']] = rec
271 271
272 272 latest = datetime(1984,1,1)
273 273 for msg_id in hist:
274 274 rec = recdict[msg_id]
275 275 newt = rec['submitted']
276 276 self.assertTrue(newt >= latest)
277 277 latest = newt
278 278 ar = self.client[-1].apply_async(lambda : 1)
279 279 ar.get()
280 280 time.sleep(0.25)
281 281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 282
283 283 def _wait_for_idle(self):
284 284 """wait for an engine to become idle, according to the Hub"""
285 285 rc = self.client
286 286
287 287 # timeout 2s, polling every 100ms
288 288 for i in range(20):
289 289 qs = rc.queue_status()
290 290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 291 time.sleep(0.1)
292 292 else:
293 293 break
294 294
295 295 # ensure Hub up to date:
296 296 qs = rc.queue_status()
297 297 self.assertEquals(qs['unassigned'], 0)
298 298 for eid in rc.ids:
299 299 self.assertEquals(qs[eid]['tasks'], 0)
300 300
301 301
302 302 def test_resubmit(self):
303 303 def f():
304 304 import random
305 305 return random.random()
306 306 v = self.client.load_balanced_view()
307 307 ar = v.apply_async(f)
308 308 r1 = ar.get(1)
309 309 # give the Hub a chance to notice:
310 310 self._wait_for_idle()
311 311 ahr = self.client.resubmit(ar.msg_ids)
312 312 r2 = ahr.get(1)
313 313 self.assertFalse(r1 == r2)
314 314
315 315 def test_resubmit_aborted(self):
316 316 def f():
317 317 import random
318 318 return random.random()
319 319 v = self.client.load_balanced_view()
320 320 # restrict to one engine, so we can put a sleep
321 321 # ahead of the task, so it will get aborted
322 322 eid = self.client.ids[-1]
323 323 v.targets = [eid]
324 324 sleep = v.apply_async(time.sleep, 0.5)
325 325 ar = v.apply_async(f)
326 326 ar.abort()
327 327 self.assertRaises(error.TaskAborted, ar.get)
328 328 # Give the Hub a chance to get up to date:
329 329 self._wait_for_idle()
330 330 ahr = self.client.resubmit(ar.msg_ids)
331 331 r2 = ahr.get(1)
332 332
333 333 def test_resubmit_inflight(self):
334 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
335 335 v = self.client.load_balanced_view()
336 336 ar = v.apply_async(time.sleep,1)
337 337 # give the message a chance to arrive
338 338 time.sleep(0.2)
339 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
340 340 ar.get(2)
341 ahr.get(2)
341 342
342 343 def test_resubmit_badkey(self):
343 344         """ensure KeyError on resubmit of nonexistent task"""
344 345 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
345 346
346 347 def test_purge_results(self):
347 348 # ensure there are some tasks
348 349 for i in range(5):
349 350 self.client[:].apply_sync(lambda : 1)
350 351 # Wait for the Hub to realise the result is done:
351 352 # This prevents a race condition, where we
352 353 # might purge a result the Hub still thinks is pending.
353 354 time.sleep(0.1)
354 355 rc2 = clientmod.Client(profile='iptest')
355 356 hist = self.client.hub_history()
356 357 ahr = rc2.get_result([hist[-1]])
357 358 ahr.wait(10)
358 359 self.client.purge_results(hist[-1])
359 360 newhist = self.client.hub_history()
360 361 self.assertEquals(len(newhist)+1,len(hist))
361 362 rc2.spin()
362 363 rc2.close()
363 364
364 365 def test_purge_all_results(self):
365 366 self.client.purge_results('all')
366 367 hist = self.client.hub_history()
367 368 self.assertEquals(len(hist), 0)
368 369
369 370 def test_spin_thread(self):
370 371 self.client.spin_thread(0.01)
371 372 ar = self.client[-1].apply_async(lambda : 1)
372 373 time.sleep(0.1)
373 374 self.assertTrue(ar.wall_time < 0.1,
374 375 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
375 376 )
376 377
377 378 def test_stop_spin_thread(self):
378 379 self.client.spin_thread(0.01)
379 380 self.client.stop_spin_thread()
380 381 ar = self.client[-1].apply_async(lambda : 1)
381 382 time.sleep(0.15)
382 383 self.assertTrue(ar.wall_time > 0.1,
383 384 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
384 385 )
385 386
386 387
@@ -1,137 +1,137 b''
1 1 .. _parallel_db:
2 2
3 3 =======================
4 4 IPython's Task Database
5 5 =======================
6 6
7 7 The IPython Hub stores all task requests and results in a database. Currently supported backends
8 8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
9 9 this is clients requesting results for tasks they did not submit, via:
10 10
11 11 .. sourcecode:: ipython
12 12
13 13 In [1]: rc.get_result(task_id)
14 14
15 15 However, since we have this DB backend, we provide a direct query method in the :class:`client`
16 16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
17 17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
18 18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
19 19 when using other backends, the interface is emulated and only a subset of queries is possible.
20 20
21 21 .. seealso::
22 22
23 23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
24 24
25 25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
26 26 and values that are either exact values to test, or MongoDB queries, which are dicts of the form:
27 27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument that specifies
28 28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
29 29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
30 30 MongoDB, the `msg_id` key will always be included, whether requested or not.
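For instance, a sketch of such a query (assuming a connected Client ``rc``, as
in the examples below):

.. sourcecode:: ipython

    In [1]: rc.db_query({'completed' : {'$ne' : None}}, keys=['msg_id', 'completed'])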
31 31
32 32 TaskRecord keys:
33 33
34 34 =============== =============== =============
35 35 Key Type Description
36 36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 38 header dict The request header
39 39 content dict The request content (likely empty)
40 40 buffers list(bytes) buffers containing serialized request objects
41 41 submitted datetime timestamp for time of submission (set by client)
42 42 client_uuid uuid(bytes) IDENT of client's socket
43 43 engine_uuid uuid(bytes) IDENT of engine's socket
44 44 started datetime time task began execution on engine
45 45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 47 result_header dict header for result
48 48 result_content dict content for result
49 49 result_buffers  list(bytes)     buffers containing serialized result objects
50 50 queue bytes The name of the queue for the task ('mux' or 'task')
51 51 pyin <unused> Python input (unused)
52 52 pyout <unused> Python output (unused)
53 53 pyerr <unused> Python traceback (unused)
54 54 stdout str Stream of stdout data
55 55 stderr str Stream of stderr data
56 56 =============== =============== =============
57 57
58 58
59 59 MongoDB operators we emulate on all backends:
60 60
61 61 ========== =================
62 62 Operator Python equivalent
63 63 ========== =================
64 64 '$in' in
65 65 '$nin' not in
66 66 '$eq' ==
67 67 '$ne' !=
68 68 '$gt'      >
69 69 '$gte' >=
70 70 '$lt'      <
71 71 '$lte' <=
72 72 ========== =================
73 73
74 74
75 75 The DB Query is useful for two primary cases:
76 76
77 77 1. deep polling of task status or metadata
78 78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit, ...), as sketched below
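A sketch of the second case (assuming a connected Client ``rc`` and a
hypothetical ``cutoff`` datetime): select old completed tasks, then purge
their records in one call:

.. sourcecode:: ipython

    In [1]: stale = rc.db_query({'completed' : {'$lt' : cutoff}}, keys=['msg_id'])

    In [2]: rc.purge_results([ rec['msg_id'] for rec in stale ])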
79 79
80 80 Example Queries
81 81 ===============
82 82
83 83
84 84 To get all msg_ids that are not completed, only retrieving their ID and start time:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88    In [1]: incomplete = rc.db_query({'completed' : None}, keys=['msg_id', 'started'])
89 89
90 90 All jobs started in the last hour by me:
91 91
92 92 .. sourcecode:: ipython
93 93
94 94 In [1]: from datetime import datetime, timedelta
95 95
96 96 In [2]: hourago = datetime.now() - timedelta(1./24)
97 97
98 98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
99 99 'client_uuid' : rc.session.session})
100 100
101 101 All jobs started more than an hour ago, by clients *other than me*:
102 102
103 103 .. sourcecode:: ipython
104 104
105 105    In [3]: recent = rc.db_query({'started' : {'$lt' : hourago },
106 106 'client_uuid' : {'$ne' : rc.session.session}})
107 107
108 108 Result headers for all jobs on engine 3 or 4:
109 109
110 110 .. sourcecode:: ipython
111 111
112 112 In [1]: uuids = map(rc._engines.get, (3,4))
113 113
114 114    In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }}, keys=['result_header'])
115 115
116 116
117 117 Cost
118 118 ====
119 119
120 120 The advantage of the database backends is, of course, that large amounts of
121 121 data can be stored that won't fit in memory. The default 'backend' is actually
122 122 to just store all of this information in a Python dictionary. This is very fast,
123 123 but will quickly run out of memory if you move a lot of data around, or your
124 124 cluster runs for a long time.
125 125
126 126 Unfortunately, the DB backends (SQLite and MongoDB) right now are rather slow,
127 127 and can still consume large amounts of resources, particularly if large tasks
128 128 or results are being created at a high frequency.
129 129
130 130 For this reason, we have added :class:`~.NoDB`, a dummy backend that doesn't
131 131 actually store any information. When you use this database, nothing is stored,
132 132 and any request for results will result in a KeyError. This obviously prevents
133 133 later requests for results and task resubmission from functioning, but
134 134 sometimes those nice features are not as useful as keeping Hub memory under
135 135 control.
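A minimal sketch of selecting this backend (assuming the standard
``ipcontroller_config.py`` configuration file):

.. sourcecode:: python

    # in ipcontroller_config.py
    c = get_config()
    # keep no task records in Hub memory or on disk
    c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.NoDB'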
136 136
137 137