add 'received' timestamp to DB...
MinRK
@@ -1,1452 +1,1454 @@
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 import time
22 22 import warnings
23 23 from datetime import datetime
24 24 from getpass import getpass
25 25 from pprint import pprint
26 26
27 27 pjoin = os.path.join
28 28
29 29 import zmq
30 30 # from zmq.eventloop import ioloop, zmqstream
31 31
32 32 from IPython.config.configurable import MultipleInstanceError
33 33 from IPython.core.application import BaseIPythonApplication
34 34
35 35 from IPython.utils.jsonutil import rekey
36 36 from IPython.utils.localinterfaces import LOCAL_IPS
37 37 from IPython.utils.path import get_ipython_dir
38 38 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 39 Dict, List, Bool, Set)
40 40 from IPython.external.decorator import decorator
41 41 from IPython.external.ssh import tunnel
42 42
43 43 from IPython.parallel import Reference
44 44 from IPython.parallel import error
45 45 from IPython.parallel import util
46 46
47 47 from IPython.zmq.session import Session, Message
48 48
49 49 from .asyncresult import AsyncResult, AsyncHubResult
50 50 from IPython.core.profiledir import ProfileDir, ProfileDirError
51 51 from .view import DirectView, LoadBalancedView
52 52
53 53 if sys.version_info[0] >= 3:
54 54 # xrange is used in a couple 'isinstance' tests in py2
55 55 # should be just 'range' in 3k
56 56 xrange = range
57 57
58 58 #--------------------------------------------------------------------------
59 59 # Decorators for Client methods
60 60 #--------------------------------------------------------------------------
61 61
62 62 @decorator
63 63 def spin_first(f, self, *args, **kwargs):
64 64 """Call spin() to sync state prior to calling the method."""
65 65 self.spin()
66 66 return f(self, *args, **kwargs)
67 67
68 68
69 69 #--------------------------------------------------------------------------
70 70 # Classes
71 71 #--------------------------------------------------------------------------
72 72
73 73 class Metadata(dict):
74 74 """Subclass of dict for initializing metadata values.
75 75
76 76 Attribute access works on keys.
77 77
78 78 These objects have a strict set of keys - errors will raise if you try
79 79 to add new keys.
80 80 """
81 81 def __init__(self, *args, **kwargs):
82 82 dict.__init__(self)
83 83 md = {'msg_id' : None,
84 84 'submitted' : None,
85 85 'started' : None,
86 86 'completed' : None,
87 87 'received' : None,
88 88 'engine_uuid' : None,
89 89 'engine_id' : None,
90 90 'follow' : None,
91 91 'after' : None,
92 92 'status' : None,
93 93
94 94 'pyin' : None,
95 95 'pyout' : None,
96 96 'pyerr' : None,
97 97 'stdout' : '',
98 98 'stderr' : '',
99 99 }
100 100 self.update(md)
101 101 self.update(dict(*args, **kwargs))
102 102
103 103 def __getattr__(self, key):
104 104 """getattr aliased to getitem"""
105 105 if key in self.iterkeys():
106 106 return self[key]
107 107 else:
108 108 raise AttributeError(key)
109 109
110 110 def __setattr__(self, key, value):
111 111 """setattr aliased to setitem, with strict"""
112 112 if key in self.iterkeys():
113 113 self[key] = value
114 114 else:
115 115 raise AttributeError(key)
116 116
117 117 def __setitem__(self, key, value):
118 118 """strict static key enforcement"""
119 119 if key in self.iterkeys():
120 120 dict.__setitem__(self, key, value)
121 121 else:
122 122 raise KeyError(key)
123 123
124 124
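A short hypothetical session illustrating the strict-key behavior of Metadata above (assumes this module's imports):

    md = Metadata(status='ok')
    md.stdout                         # '' -- attribute access aliases item access
    md['completed'] = datetime.now()  # known key: allowed
    md['nonsense'] = 1                # raises KeyError: the key set is strict
    md.nonsense = 1                   # raises AttributeError
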
125 125 class Client(HasTraits):
126 126 """A semi-synchronous client to the IPython ZMQ cluster
127 127
128 128 Parameters
129 129 ----------
130 130
131 131 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
132 132 Connection information for the Hub's registration. If a json connector
133 133 file is given, then likely no further configuration is necessary.
134 134 [Default: use profile]
135 135 profile : bytes
136 136 The name of the Cluster profile to be used to find connector information.
137 137 If run from an IPython application, the default profile will be the same
138 138 as the running application, otherwise it will be 'default'.
139 139 context : zmq.Context
140 140 Pass an existing zmq.Context instance, otherwise the client will create its own.
141 141 debug : bool
142 142 flag for lots of message printing for debug purposes
143 143 timeout : int/float
144 144 time (in seconds) to wait for connection replies from the Hub
145 145 [Default: 10]
146 146
147 147 #-------------- session related args ----------------
148 148
149 149 config : Config object
150 150 If specified, this will be relayed to the Session for configuration
151 151 username : str
152 152 set username for the session object
153 153 packer : str (import_string) or callable
154 154 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
155 155 function to serialize messages. Must support same input as
156 156 JSON, and output must be bytes.
157 157 You can pass a callable directly as `pack`
158 158 unpacker : str (import_string) or callable
159 159 The inverse of packer. Only necessary if packer is specified as *not* one
160 160 of 'json' or 'pickle'.
161 161
162 162 #-------------- ssh related args ----------------
163 163 # These are args for configuring the ssh tunnel to be used
164 164 # credentials are used to forward connections over ssh to the Controller
165 165 # Note that the ip given in `addr` needs to be relative to sshserver
166 166 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
167 167 # and set sshserver as the same machine the Controller is on. However,
168 168 # the only requirement is that sshserver is able to see the Controller
169 169 # (i.e. is within the same trusted network).
170 170
171 171 sshserver : str
172 172 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
173 173 If keyfile or password is specified, and this is not, it will default to
174 174 the ip given in addr.
175 175 sshkey : str; path to ssh private key file
176 176 This specifies a key to be used in ssh login, default None.
177 177 Regular default ssh keys will be used without specifying this argument.
178 178 password : str
179 179 Your ssh password to sshserver. Note that if this is left None,
180 180 you will be prompted for it if passwordless key based login is unavailable.
181 181 paramiko : bool
182 182 flag for whether to use paramiko instead of shell ssh for tunneling.
183 183 [default: True on win32, False else]
184 184
185 185 #-------------- exec authentication args ----------------
186 186 If even localhost is untrusted, you can have some protection against
187 187 unauthorized execution by signing messages with HMAC digests.
188 188 Messages are still sent as cleartext, so if someone can snoop your
189 189 loopback traffic this will not protect your privacy, but will prevent
190 190 unauthorized execution.
191 191
192 192 exec_key : str
193 193 an authentication key or file containing a key
194 194 default: None
195 195
196 196
197 197 Attributes
198 198 ----------
199 199
200 200 ids : list of int engine IDs
201 201 requesting the ids attribute always synchronizes
202 202 the registration state. To request ids without synchronization,
203 203 use the semi-private _ids attribute.
204 204
205 205 history : list of msg_ids
206 206 a list of msg_ids, keeping track of all the execution
207 207 messages you have submitted in order.
208 208
209 209 outstanding : set of msg_ids
210 210 a set of msg_ids that have been submitted, but whose
211 211 results have not yet been received.
212 212
213 213 results : dict
214 214 a dict of all our results, keyed by msg_id
215 215
216 216 block : bool
217 217 determines default behavior when block not specified
218 218 in execution methods
219 219
220 220 Methods
221 221 -------
222 222
223 223 spin
224 224 flushes incoming results and registration state changes
225 225 control methods also spin, and requesting `ids` ensures the state is up to date
226 226
227 227 wait
228 228 wait on one or more msg_ids
229 229
230 230 execution methods
231 231 apply
232 232 legacy: execute, run
233 233
234 234 data movement
235 235 push, pull, scatter, gather
236 236
237 237 query methods
238 238 queue_status, get_result, purge, result_status
239 239
240 240 control methods
241 241 abort, shutdown
242 242
243 243 """
244 244
245 245
246 246 block = Bool(False)
247 247 outstanding = Set()
248 248 results = Instance('collections.defaultdict', (dict,))
249 249 metadata = Instance('collections.defaultdict', (Metadata,))
250 250 history = List()
251 251 debug = Bool(False)
252 252
253 253 profile=Unicode()
254 254 def _profile_default(self):
255 255 if BaseIPythonApplication.initialized():
256 256 # an IPython app *might* be running, try to get its profile
257 257 try:
258 258 return BaseIPythonApplication.instance().profile
259 259 except (AttributeError, MultipleInstanceError):
260 260 # could be a *different* subclass of config.Application,
261 261 # which would raise one of these two errors.
262 262 return u'default'
263 263 else:
264 264 return u'default'
265 265
266 266
267 267 _outstanding_dict = Instance('collections.defaultdict', (set,))
268 268 _ids = List()
269 269 _connected=Bool(False)
270 270 _ssh=Bool(False)
271 271 _context = Instance('zmq.Context')
272 272 _config = Dict()
273 273 _engines=Instance(util.ReverseDict, (), {})
274 274 # _hub_socket=Instance('zmq.Socket')
275 275 _query_socket=Instance('zmq.Socket')
276 276 _control_socket=Instance('zmq.Socket')
277 277 _iopub_socket=Instance('zmq.Socket')
278 278 _notification_socket=Instance('zmq.Socket')
279 279 _mux_socket=Instance('zmq.Socket')
280 280 _task_socket=Instance('zmq.Socket')
281 281 _task_scheme=Unicode()
282 282 _closed = False
283 283 _ignored_control_replies=Integer(0)
284 284 _ignored_hub_replies=Integer(0)
285 285
286 286 def __new__(self, *args, **kw):
287 287 # don't raise on positional args
288 288 return HasTraits.__new__(self, **kw)
289 289
290 290 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
291 291 context=None, debug=False, exec_key=None,
292 292 sshserver=None, sshkey=None, password=None, paramiko=None,
293 293 timeout=10, **extra_args
294 294 ):
295 295 if profile:
296 296 super(Client, self).__init__(debug=debug, profile=profile)
297 297 else:
298 298 super(Client, self).__init__(debug=debug)
299 299 if context is None:
300 300 context = zmq.Context.instance()
301 301 self._context = context
302 302
303 303 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
304 304 if self._cd is not None:
305 305 if url_or_file is None:
306 306 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
307 307 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
308 308 " Please specify at least one of url_or_file or profile."
309 309
310 310 if not util.is_url(url_or_file):
311 311 # it's not a url, try for a file
312 312 if not os.path.exists(url_or_file):
313 313 if self._cd:
314 314 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
315 315 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
316 316 with open(url_or_file) as f:
317 317 cfg = json.loads(f.read())
318 318 else:
319 319 cfg = {'url':url_or_file}
320 320
321 321 # sync defaults from args, json:
322 322 if sshserver:
323 323 cfg['ssh'] = sshserver
324 324 if exec_key:
325 325 cfg['exec_key'] = exec_key
326 326 exec_key = cfg['exec_key']
327 327 location = cfg.setdefault('location', None)
328 328 cfg['url'] = util.disambiguate_url(cfg['url'], location)
329 329 url = cfg['url']
330 330 proto,addr,port = util.split_url(url)
331 331 if location is not None and addr == '127.0.0.1':
332 332 # location specified, and connection is expected to be local
333 333 if location not in LOCAL_IPS and not sshserver:
334 334 # load ssh from JSON *only* if the controller is not on
335 335 # this machine
336 336 sshserver=cfg['ssh']
337 337 if location not in LOCAL_IPS and not sshserver:
338 338 # warn if no ssh specified, but SSH is probably needed
339 339 # This is only a warning, because the most likely cause
340 340 # is a local Controller on a laptop whose IP is dynamic
341 341 warnings.warn("""
342 342 Controller appears to be listening on localhost, but not on this machine.
343 343 If this is true, you should specify Client(...,sshserver='you@%s')
344 344 or instruct your controller to listen on an external IP."""%location,
345 345 RuntimeWarning)
346 346 elif not sshserver:
347 347 # otherwise sync with cfg
348 348 sshserver = cfg['ssh']
349 349
350 350 self._config = cfg
351 351
352 352 self._ssh = bool(sshserver or sshkey or password)
353 353 if self._ssh and sshserver is None:
354 354 # default to ssh via localhost
355 355 sshserver = url.split('://')[1].split(':')[0]
356 356 if self._ssh and password is None:
357 357 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
358 358 password=False
359 359 else:
360 360 password = getpass("SSH Password for %s: "%sshserver)
361 361 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
362 362
363 363 # configure and construct the session
364 364 if exec_key is not None:
365 365 if os.path.isfile(exec_key):
366 366 extra_args['keyfile'] = exec_key
367 367 else:
368 368 exec_key = util.asbytes(exec_key)
369 369 extra_args['key'] = exec_key
370 370 self.session = Session(**extra_args)
371 371
372 372 self._query_socket = self._context.socket(zmq.DEALER)
373 373 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
374 374 if self._ssh:
375 375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
376 376 else:
377 377 self._query_socket.connect(url)
378 378
379 379 self.session.debug = self.debug
380 380
381 381 self._notification_handlers = {'registration_notification' : self._register_engine,
382 382 'unregistration_notification' : self._unregister_engine,
383 383 'shutdown_notification' : lambda msg: self.close(),
384 384 }
385 385 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
386 386 'apply_reply' : self._handle_apply_reply}
387 387 self._connect(sshserver, ssh_kwargs, timeout)
388 388
389 389 def __del__(self):
390 390 """cleanup sockets, but _not_ context."""
391 391 self.close()
392 392
393 393 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
394 394 if ipython_dir is None:
395 395 ipython_dir = get_ipython_dir()
396 396 if profile_dir is not None:
397 397 try:
398 398 self._cd = ProfileDir.find_profile_dir(profile_dir)
399 399 return
400 400 except ProfileDirError:
401 401 pass
402 402 elif profile is not None:
403 403 try:
404 404 self._cd = ProfileDir.find_profile_dir_by_name(
405 405 ipython_dir, profile)
406 406 return
407 407 except ProfileDirError:
408 408 pass
409 409 self._cd = None
410 410
411 411 def _update_engines(self, engines):
412 412 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
413 413 for k,v in engines.iteritems():
414 414 eid = int(k)
415 415 self._engines[eid] = v
416 416 self._ids.append(eid)
417 417 self._ids = sorted(self._ids)
418 418 if sorted(self._engines.keys()) != range(len(self._engines)) and \
419 419 self._task_scheme == 'pure' and self._task_socket:
420 420 self._stop_scheduling_tasks()
421 421
422 422 def _stop_scheduling_tasks(self):
423 423 """Stop scheduling tasks because an engine has been unregistered
424 424 from a pure ZMQ scheduler.
425 425 """
426 426 self._task_socket.close()
427 427 self._task_socket = None
428 428 msg = "An engine has been unregistered, and we are using pure " +\
429 429 "ZMQ task scheduling. Task farming will be disabled."
430 430 if self.outstanding:
431 431 msg += " If you were running tasks when this happened, " +\
432 432 "some `outstanding` msg_ids may never resolve."
433 433 warnings.warn(msg, RuntimeWarning)
434 434
435 435 def _build_targets(self, targets):
436 436 """Turn valid target IDs or 'all' into two lists:
437 437 (int_ids, uuids).
438 438 """
439 439 if not self._ids:
440 440 # flush notification socket if no engines yet, just in case
441 441 if not self.ids:
442 442 raise error.NoEnginesRegistered("Can't build targets without any engines")
443 443
444 444 if targets is None:
445 445 targets = self._ids
446 446 elif isinstance(targets, basestring):
447 447 if targets.lower() == 'all':
448 448 targets = self._ids
449 449 else:
450 450 raise TypeError("%r not valid str target, must be 'all'"%(targets))
451 451 elif isinstance(targets, int):
452 452 if targets < 0:
453 453 targets = self.ids[targets]
454 454 if targets not in self._ids:
455 455 raise IndexError("No such engine: %i"%targets)
456 456 targets = [targets]
457 457
458 458 if isinstance(targets, slice):
459 459 indices = range(len(self._ids))[targets]
460 460 ids = self.ids
461 461 targets = [ ids[i] for i in indices ]
462 462
463 463 if not isinstance(targets, (tuple, list, xrange)):
464 464 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
465 465
466 466 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
467 467
468 468 def _connect(self, sshserver, ssh_kwargs, timeout):
469 469 """setup all our socket connections to the cluster. This is called from
470 470 __init__."""
471 471
472 472 # Maybe allow reconnecting?
473 473 if self._connected:
474 474 return
475 475 self._connected=True
476 476
477 477 def connect_socket(s, url):
478 478 url = util.disambiguate_url(url, self._config['location'])
479 479 if self._ssh:
480 480 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
481 481 else:
482 482 return s.connect(url)
483 483
484 484 self.session.send(self._query_socket, 'connection_request')
485 485 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
486 486 poller = zmq.Poller()
487 487 poller.register(self._query_socket, zmq.POLLIN)
488 488 # poll expects milliseconds, timeout is seconds
489 489 evts = poller.poll(timeout*1000)
490 490 if not evts:
491 491 raise error.TimeoutError("Hub connection request timed out")
492 492 idents,msg = self.session.recv(self._query_socket,mode=0)
493 493 if self.debug:
494 494 pprint(msg)
495 495 msg = Message(msg)
496 496 content = msg.content
497 497 self._config['registration'] = dict(content)
498 498 if content.status == 'ok':
499 499 ident = self.session.bsession
500 500 if content.mux:
501 501 self._mux_socket = self._context.socket(zmq.DEALER)
502 502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
503 503 connect_socket(self._mux_socket, content.mux)
504 504 if content.task:
505 505 self._task_scheme, task_addr = content.task
506 506 self._task_socket = self._context.socket(zmq.DEALER)
507 507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
508 508 connect_socket(self._task_socket, task_addr)
509 509 if content.notification:
510 510 self._notification_socket = self._context.socket(zmq.SUB)
511 511 connect_socket(self._notification_socket, content.notification)
512 512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
513 513 # if content.query:
514 514 # self._query_socket = self._context.socket(zmq.DEALER)
515 515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
516 516 # connect_socket(self._query_socket, content.query)
517 517 if content.control:
518 518 self._control_socket = self._context.socket(zmq.DEALER)
519 519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
520 520 connect_socket(self._control_socket, content.control)
521 521 if content.iopub:
522 522 self._iopub_socket = self._context.socket(zmq.SUB)
523 523 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
524 524 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
525 525 connect_socket(self._iopub_socket, content.iopub)
526 526 self._update_engines(dict(content.engines))
527 527 else:
528 528 self._connected = False
529 529 raise Exception("Failed to connect!")
530 530
531 531 #--------------------------------------------------------------------------
532 532 # handlers and callbacks for incoming messages
533 533 #--------------------------------------------------------------------------
534 534
535 535 def _unwrap_exception(self, content):
536 536 """unwrap exception, and remap engine_id to int."""
537 537 e = error.unwrap_exception(content)
538 538 # print e.traceback
539 539 if e.engine_info:
540 540 e_uuid = e.engine_info['engine_uuid']
541 541 eid = self._engines[e_uuid]
542 542 e.engine_info['engine_id'] = eid
543 543 return e
544 544
545 545 def _extract_metadata(self, header, parent, content):
546 546 md = {'msg_id' : parent['msg_id'],
547 547 'received' : datetime.now(),
548 548 'engine_uuid' : header.get('engine', None),
549 549 'follow' : parent.get('follow', []),
550 550 'after' : parent.get('after', []),
551 551 'status' : content['status'],
552 552 }
553 553
554 554 if md['engine_uuid'] is not None:
555 555 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
556 556
557 557 if 'date' in parent:
558 558 md['submitted'] = parent['date']
559 559 if 'started' in header:
560 560 md['started'] = header['started']
561 561 if 'date' in header:
562 562 md['completed'] = header['date']
563 563 return md
564 564
565 565 def _register_engine(self, msg):
566 566 """Register a new engine, and update our connection info."""
567 567 content = msg['content']
568 568 eid = content['id']
569 569 d = {eid : content['queue']}
570 570 self._update_engines(d)
571 571
572 572 def _unregister_engine(self, msg):
573 573 """Unregister an engine that has died."""
574 574 content = msg['content']
575 575 eid = int(content['id'])
576 576 if eid in self._ids:
577 577 self._ids.remove(eid)
578 578 uuid = self._engines.pop(eid)
579 579
580 580 self._handle_stranded_msgs(eid, uuid)
581 581
582 582 if self._task_socket and self._task_scheme == 'pure':
583 583 self._stop_scheduling_tasks()
584 584
585 585 def _handle_stranded_msgs(self, eid, uuid):
586 586 """Handle messages known to be on an engine when the engine unregisters.
587 587
588 588 It is possible that this will fire prematurely - that is, an engine will
589 589 go down after completing a result, and the client will be notified
590 590 of the unregistration and later receive the successful result.
591 591 """
592 592
593 593 outstanding = self._outstanding_dict[uuid]
594 594
595 595 for msg_id in list(outstanding):
596 596 if msg_id in self.results:
597 597 # we already have this result
598 598 continue
599 599 try:
600 600 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
601 601 except:
602 602 content = error.wrap_exception()
603 603 # build a fake message:
604 604 parent = {}
605 605 header = {}
606 606 parent['msg_id'] = msg_id
607 607 header['engine'] = uuid
608 608 header['date'] = datetime.now()
609 609 msg = dict(parent_header=parent, header=header, content=content)
610 610 self._handle_apply_reply(msg)
611 611
612 612 def _handle_execute_reply(self, msg):
613 613 """Save the reply to an execute_request into our results.
614 614
615 615 execute messages are never actually used. apply is used instead.
616 616 """
617 617
618 618 parent = msg['parent_header']
619 619 msg_id = parent['msg_id']
620 620 if msg_id not in self.outstanding:
621 621 if msg_id in self.history:
622 622 print ("got stale result: %s"%msg_id)
623 623 else:
624 624 print ("got unknown result: %s"%msg_id)
625 625 else:
626 626 self.outstanding.remove(msg_id)
627 627 self.results[msg_id] = self._unwrap_exception(msg['content'])
628 628
629 629 def _handle_apply_reply(self, msg):
630 630 """Save the reply to an apply_request into our results."""
631 631 parent = msg['parent_header']
632 632 msg_id = parent['msg_id']
633 633 if msg_id not in self.outstanding:
634 634 if msg_id in self.history:
635 635 print ("got stale result: %s"%msg_id)
636 636 print self.results[msg_id]
637 637 print msg
638 638 else:
639 639 print ("got unknown result: %s"%msg_id)
640 640 else:
641 641 self.outstanding.remove(msg_id)
642 642 content = msg['content']
643 643 header = msg['header']
644 644
645 645 # construct metadata:
646 646 md = self.metadata[msg_id]
647 647 md.update(self._extract_metadata(header, parent, content))
648 648 # is this redundant?
649 649 self.metadata[msg_id] = md
650 650
651 651 e_outstanding = self._outstanding_dict[md['engine_uuid']]
652 652 if msg_id in e_outstanding:
653 653 e_outstanding.remove(msg_id)
654 654
655 655 # construct result:
656 656 if content['status'] == 'ok':
657 657 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
658 658 elif content['status'] == 'aborted':
659 659 self.results[msg_id] = error.TaskAborted(msg_id)
660 660 elif content['status'] == 'resubmitted':
661 661 # TODO: handle resubmission
662 662 pass
663 663 else:
664 664 self.results[msg_id] = self._unwrap_exception(content)
665 665
666 666 def _flush_notifications(self):
667 667 """Flush notifications of engine registrations waiting
668 668 in ZMQ queue."""
669 669 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
670 670 while msg is not None:
671 671 if self.debug:
672 672 pprint(msg)
673 673 msg_type = msg['header']['msg_type']
674 674 handler = self._notification_handlers.get(msg_type, None)
675 675 if handler is None:
676 676 raise Exception("Unhandled message type: %s"%msg.msg_type)
677 677 else:
678 678 handler(msg)
679 679 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
680 680
681 681 def _flush_results(self, sock):
682 682 """Flush task or queue results waiting in ZMQ queue."""
683 683 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
684 684 while msg is not None:
685 685 if self.debug:
686 686 pprint(msg)
687 687 msg_type = msg['header']['msg_type']
688 688 handler = self._queue_handlers.get(msg_type, None)
689 689 if handler is None:
690 690 raise Exception("Unhandled message type: %s"%msg.msg_type)
691 691 else:
692 692 handler(msg)
693 693 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
694 694
695 695 def _flush_control(self, sock):
696 696 """Flush replies from the control channel waiting
697 697 in the ZMQ queue.
698 698
699 699 Currently: ignore them."""
700 700 if self._ignored_control_replies <= 0:
701 701 return
702 702 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
703 703 while msg is not None:
704 704 self._ignored_control_replies -= 1
705 705 if self.debug:
706 706 pprint(msg)
707 707 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
708 708
709 709 def _flush_ignored_control(self):
710 710 """flush ignored control replies"""
711 711 while self._ignored_control_replies > 0:
712 712 self.session.recv(self._control_socket)
713 713 self._ignored_control_replies -= 1
714 714
715 715 def _flush_ignored_hub_replies(self):
716 716 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
717 717 while msg is not None:
718 718 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
719 719
720 720 def _flush_iopub(self, sock):
721 721 """Flush replies from the iopub channel waiting
722 722 in the ZMQ queue.
723 723 """
724 724 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
725 725 while msg is not None:
726 726 if self.debug:
727 727 pprint(msg)
728 728 parent = msg['parent_header']
729 729 # ignore IOPub messages with no parent.
730 730 # Caused by print statements or warnings from before the first execution.
731 731 if not parent:
732 732 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK); continue # recv next message before continuing, else this loop never advances
733 733 msg_id = parent['msg_id']
734 734 content = msg['content']
735 735 header = msg['header']
736 736 msg_type = msg['header']['msg_type']
737 737
738 738 # init metadata:
739 739 md = self.metadata[msg_id]
740 740
741 741 if msg_type == 'stream':
742 742 name = content['name']
743 743 s = md[name] or ''
744 744 md[name] = s + content['data']
745 745 elif msg_type == 'pyerr':
746 746 md.update({'pyerr' : self._unwrap_exception(content)})
747 747 elif msg_type == 'pyin':
748 748 md.update({'pyin' : content['code']})
749 749 else:
750 750 md.update({msg_type : content.get('data', '')})
751 751
752 752 # redundant?
753 753 self.metadata[msg_id] = md
754 754
755 755 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
756 756
757 757 #--------------------------------------------------------------------------
758 758 # len, getitem
759 759 #--------------------------------------------------------------------------
760 760
761 761 def __len__(self):
762 762 """len(client) returns # of engines."""
763 763 return len(self.ids)
764 764
765 765 def __getitem__(self, key):
766 766 """index access returns DirectView multiplexer objects
767 767
768 768 Must be int, slice, or list/tuple/xrange of ints"""
769 769 if not isinstance(key, (int, slice, tuple, list, xrange)):
770 770 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
771 771 else:
772 772 return self.direct_view(key)
773 773
774 774 #--------------------------------------------------------------------------
775 775 # Begin public methods
776 776 #--------------------------------------------------------------------------
777 777
778 778 @property
779 779 def ids(self):
780 780 """Always up-to-date ids property."""
781 781 self._flush_notifications()
782 782 # always copy:
783 783 return list(self._ids)
784 784
785 785 def close(self):
786 786 if self._closed:
787 787 return
788 788 snames = filter(lambda n: n.endswith('socket'), dir(self))
789 789 for socket in map(lambda name: getattr(self, name), snames):
790 790 if isinstance(socket, zmq.Socket) and not socket.closed:
791 791 socket.close()
792 792 self._closed = True
793 793
794 794 def spin(self):
795 795 """Flush any registration notifications and execution results
796 796 waiting in the ZMQ queue.
797 797 """
798 798 if self._notification_socket:
799 799 self._flush_notifications()
800 800 if self._mux_socket:
801 801 self._flush_results(self._mux_socket)
802 802 if self._task_socket:
803 803 self._flush_results(self._task_socket)
804 804 if self._control_socket:
805 805 self._flush_control(self._control_socket)
806 806 if self._iopub_socket:
807 807 self._flush_iopub(self._iopub_socket)
808 808 if self._query_socket:
809 809 self._flush_ignored_hub_replies()
810 810
811 811 def wait(self, jobs=None, timeout=-1):
812 812 """waits on one or more `jobs`, for up to `timeout` seconds.
813 813
814 814 Parameters
815 815 ----------
816 816
817 817 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
818 818 ints are indices to self.history
819 819 strs are msg_ids
820 820 default: wait on all outstanding messages
821 821 timeout : float
822 822 a time in seconds, after which to give up.
823 823 default is -1, which means no timeout
824 824
825 825 Returns
826 826 -------
827 827
828 828 True : when all msg_ids are done
829 829 False : timeout reached, some msg_ids still outstanding
830 830 """
831 831 tic = time.time()
832 832 if jobs is None:
833 833 theids = self.outstanding
834 834 else:
835 835 if isinstance(jobs, (int, basestring, AsyncResult)):
836 836 jobs = [jobs]
837 837 theids = set()
838 838 for job in jobs:
839 839 if isinstance(job, int):
840 840 # index access
841 841 job = self.history[job]
842 842 elif isinstance(job, AsyncResult):
843 843 map(theids.add, job.msg_ids)
844 844 continue
845 845 theids.add(job)
846 846 if not theids.intersection(self.outstanding):
847 847 return True
848 848 self.spin()
849 849 while theids.intersection(self.outstanding):
850 850 if timeout >= 0 and ( time.time()-tic ) > timeout:
851 851 break
852 852 time.sleep(1e-3)
853 853 self.spin()
854 854 return len(theids.intersection(self.outstanding)) == 0
855 855
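A hypothetical sketch of the basic submit-then-wait pattern (assumes a running cluster and rc = Client()):

    import time
    ar = rc[:].apply_async(time.time)    # submit to all engines
    if rc.wait(ar, timeout=5):           # block up to 5 seconds on this job only
        print ar.get()
    else:
        print "still outstanding:", rc.outstanding
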
856 856 #--------------------------------------------------------------------------
857 857 # Control methods
858 858 #--------------------------------------------------------------------------
859 859
860 860 @spin_first
861 861 def clear(self, targets=None, block=None):
862 862 """Clear the namespace in target(s)."""
863 863 block = self.block if block is None else block
864 864 targets = self._build_targets(targets)[0]
865 865 for t in targets:
866 866 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
867 867 error = False
868 868 if block:
869 869 self._flush_ignored_control()
870 870 for i in range(len(targets)):
871 871 idents,msg = self.session.recv(self._control_socket,0)
872 872 if self.debug:
873 873 pprint(msg)
874 874 if msg['content']['status'] != 'ok':
875 875 error = self._unwrap_exception(msg['content'])
876 876 else:
877 877 self._ignored_control_replies += len(targets)
878 878 if error:
879 879 raise error
880 880
881 881
882 882 @spin_first
883 883 def abort(self, jobs=None, targets=None, block=None):
884 884 """Abort specific jobs from the execution queues of target(s).
885 885
886 886 This is a mechanism to prevent jobs that have already been submitted
887 887 from executing.
888 888
889 889 Parameters
890 890 ----------
891 891
892 892 jobs : msg_id, list of msg_ids, or AsyncResult
893 893 The jobs to be aborted
894 894
895 895 If unspecified/None: abort all outstanding jobs.
896 896
897 897 """
898 898 block = self.block if block is None else block
899 899 jobs = jobs if jobs is not None else list(self.outstanding)
900 900 targets = self._build_targets(targets)[0]
901 901
902 902 msg_ids = []
903 903 if isinstance(jobs, (basestring,AsyncResult)):
904 904 jobs = [jobs]
905 905 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
906 906 if bad_ids:
907 907 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
908 908 for j in jobs:
909 909 if isinstance(j, AsyncResult):
910 910 msg_ids.extend(j.msg_ids)
911 911 else:
912 912 msg_ids.append(j)
913 913 content = dict(msg_ids=msg_ids)
914 914 for t in targets:
915 915 self.session.send(self._control_socket, 'abort_request',
916 916 content=content, ident=t)
917 917 error = False
918 918 if block:
919 919 self._flush_ignored_control()
920 920 for i in range(len(targets)):
921 921 idents,msg = self.session.recv(self._control_socket,0)
922 922 if self.debug:
923 923 pprint(msg)
924 924 if msg['content']['status'] != 'ok':
925 925 error = self._unwrap_exception(msg['content'])
926 926 else:
927 927 self._ignored_control_replies += len(targets)
928 928 if error:
929 929 raise error
930 930
931 931 @spin_first
932 932 def shutdown(self, targets=None, restart=False, hub=False, block=None):
933 933 """Terminates one or more engine processes, optionally including the hub."""
934 934 block = self.block if block is None else block
935 935 if hub:
936 936 targets = 'all'
937 937 targets = self._build_targets(targets)[0]
938 938 for t in targets:
939 939 self.session.send(self._control_socket, 'shutdown_request',
940 940 content={'restart':restart},ident=t)
941 941 error = False
942 942 if block or hub:
943 943 self._flush_ignored_control()
944 944 for i in range(len(targets)):
945 945 idents,msg = self.session.recv(self._control_socket, 0)
946 946 if self.debug:
947 947 pprint(msg)
948 948 if msg['content']['status'] != 'ok':
949 949 error = self._unwrap_exception(msg['content'])
950 950 else:
951 951 self._ignored_control_replies += len(targets)
952 952
953 953 if hub:
954 954 time.sleep(0.25)
955 955 self.session.send(self._query_socket, 'shutdown_request')
956 956 idents,msg = self.session.recv(self._query_socket, 0)
957 957 if self.debug:
958 958 pprint(msg)
959 959 if msg['content']['status'] != 'ok':
960 960 error = self._unwrap_exception(msg['content'])
961 961
962 962 if error:
963 963 raise error
964 964
965 965 #--------------------------------------------------------------------------
966 966 # Execution related methods
967 967 #--------------------------------------------------------------------------
968 968
969 969 def _maybe_raise(self, result):
970 970 """wrapper for maybe raising an exception if apply failed."""
971 971 if isinstance(result, error.RemoteError):
972 972 raise result
973 973
974 974 return result
975 975
976 976 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
977 977 ident=None):
978 978 """construct and send an apply message via a socket.
979 979
980 980 This is the principal method with which all engine execution is performed by views.
981 981 """
982 982
983 983 assert not self._closed, "cannot use me anymore, I'm closed!"
984 984 # defaults:
985 985 args = args if args is not None else []
986 986 kwargs = kwargs if kwargs is not None else {}
987 987 subheader = subheader if subheader is not None else {}
988 988
989 989 # validate arguments
990 990 if not callable(f) and not isinstance(f, Reference):
991 991 raise TypeError("f must be callable, not %s"%type(f))
992 992 if not isinstance(args, (tuple, list)):
993 993 raise TypeError("args must be tuple or list, not %s"%type(args))
994 994 if not isinstance(kwargs, dict):
995 995 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
996 996 if not isinstance(subheader, dict):
997 997 raise TypeError("subheader must be dict, not %s"%type(subheader))
998 998
999 999 bufs = util.pack_apply_message(f,args,kwargs)
1000 1000
1001 1001 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1002 1002 subheader=subheader, track=track)
1003 1003
1004 1004 msg_id = msg['header']['msg_id']
1005 1005 self.outstanding.add(msg_id)
1006 1006 if ident:
1007 1007 # possibly routed to a specific engine
1008 1008 if isinstance(ident, list):
1009 1009 ident = ident[-1]
1010 1010 if ident in self._engines.values():
1011 1011 # save for later, in case of engine death
1012 1012 self._outstanding_dict[ident].add(msg_id)
1013 1013 self.history.append(msg_id)
1014 1014 self.metadata[msg_id]['submitted'] = datetime.now()
1015 1015
1016 1016 return msg
1017 1017
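Views are the supported interface to send_apply_message; purely for illustration, a hedged sketch of the direct call a LoadBalancedView effectively makes (assumes rc = Client() with a task scheduler running):

    msg = rc.send_apply_message(rc._task_socket, pow, args=(2, 10))
    msg_id = msg['header']['msg_id']
    rc.wait([msg_id])
    print rc.results[msg_id]             # 1024
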
1018 1018 #--------------------------------------------------------------------------
1019 1019 # construct a View object
1020 1020 #--------------------------------------------------------------------------
1021 1021
1022 1022 def load_balanced_view(self, targets=None):
1023 1023 """construct a DirectView object.
1024 1024
1025 1025 If no arguments are specified, create a LoadBalancedView
1026 1026 using all engines.
1027 1027
1028 1028 Parameters
1029 1029 ----------
1030 1030
1031 1031 targets: list,slice,int,etc. [default: use all engines]
1032 1032 The subset of engines across which to load-balance
1033 1033 """
1034 1034 if targets == 'all':
1035 1035 targets = None
1036 1036 if targets is not None:
1037 1037 targets = self._build_targets(targets)[1]
1038 1038 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1039 1039
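A brief usage sketch (assumes rc = Client()):

    lview = rc.load_balanced_view()          # balance across all engines
    sub   = rc.load_balanced_view([0, 1])    # restrict to a subset
    ar = lview.apply_async(pow, 2, 10)
    ar.get()                                 # 1024
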
1040 1040 def direct_view(self, targets='all'):
1041 1041 """construct a DirectView object.
1042 1042
1043 1043 If no targets are specified, create a DirectView using all engines.
1044 1044
1045 1045 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1046 1046 evaluate the target engines at each execution, whereas rc[:] will connect to
1047 1047 all *current* engines, and that list will not change.
1048 1048
1049 1049 That is, 'all' will always use all engines, whereas rc[:] will not use
1050 1050 engines added after the DirectView is constructed.
1051 1051
1052 1052 Parameters
1053 1053 ----------
1054 1054
1055 1055 targets: list,slice,int,etc. [default: use all engines]
1056 1056 The engines to use for the View
1057 1057 """
1058 1058 single = isinstance(targets, int)
1059 1059 # allow 'all' to be lazily evaluated at each execution
1060 1060 if targets != 'all':
1061 1061 targets = self._build_targets(targets)[1]
1062 1062 if single:
1063 1063 targets = targets[0]
1064 1064 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1065 1065
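A sketch of the distinction described above (assumes rc = Client()):

    dv_live = rc.direct_view('all')   # engine list re-evaluated at each execution
    dv_snap = rc[:]                   # fixed to the engines registered right now
    e0 = rc.direct_view(0)            # single-engine view
    e0.apply_sync(lambda: 'hi')       # runs only on engine 0
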
1066 1066 #--------------------------------------------------------------------------
1067 1067 # Query methods
1068 1068 #--------------------------------------------------------------------------
1069 1069
1070 1070 @spin_first
1071 1071 def get_result(self, indices_or_msg_ids=None, block=None):
1072 1072 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1073 1073
1074 1074 If the client already has the results, no request to the Hub will be made.
1075 1075
1076 1076 This is a convenient way to construct AsyncResult objects, which are wrappers
1077 1077 that include metadata about execution, and allow for awaiting results that
1078 1078 were not submitted by this Client.
1079 1079
1080 1080 It can also be a convenient way to retrieve the metadata associated with
1081 1081 blocking execution, since it always retrieves
1082 1082
1083 1083 Examples
1084 1084 --------
1085 1085 ::
1086 1086
1087 1087 In [10]: ar = client.get_result(-1)  # AsyncResult for the most recent task
1088 1088
1089 1089 Parameters
1090 1090 ----------
1091 1091
1092 1092 indices_or_msg_ids : integer history index, str msg_id, or list of either
1093 1093 The history indices or msg_ids of the results to be retrieved
1094 1094
1095 1095 block : bool
1096 1096 Whether to wait for the result to be done
1097 1097
1098 1098 Returns
1099 1099 -------
1100 1100
1101 1101 AsyncResult
1102 1102 A single AsyncResult object will always be returned.
1103 1103
1104 1104 AsyncHubResult
1105 1105 A subclass of AsyncResult that retrieves results from the Hub
1106 1106
1107 1107 """
1108 1108 block = self.block if block is None else block
1109 1109 if indices_or_msg_ids is None:
1110 1110 indices_or_msg_ids = -1
1111 1111
1112 1112 if not isinstance(indices_or_msg_ids, (list,tuple)):
1113 1113 indices_or_msg_ids = [indices_or_msg_ids]
1114 1114
1115 1115 theids = []
1116 1116 for id in indices_or_msg_ids:
1117 1117 if isinstance(id, int):
1118 1118 id = self.history[id]
1119 1119 if not isinstance(id, basestring):
1120 1120 raise TypeError("indices must be str or int, not %r"%id)
1121 1121 theids.append(id)
1122 1122
1123 1123 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1124 1124 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1125 1125
1126 1126 if remote_ids:
1127 1127 ar = AsyncHubResult(self, msg_ids=theids)
1128 1128 else:
1129 1129 ar = AsyncResult(self, msg_ids=theids)
1130 1130
1131 1131 if block:
1132 1132 ar.wait()
1133 1133
1134 1134 return ar
1135 1135
1136 1136 @spin_first
1137 1137 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1138 1138 """Resubmit one or more tasks.
1139 1139
1140 1140 In-flight tasks may not be resubmitted.
1141 1141
1142 1142 Parameters
1143 1143 ----------
1144 1144
1145 1145 indices_or_msg_ids : integer history index, str msg_id, or list of either
1146 1146 The history indices or msg_ids of the results to be retrieved
1147 1147
1148 1148 block : bool
1149 1149 Whether to wait for the result to be done
1150 1150
1151 1151 Returns
1152 1152 -------
1153 1153
1154 1154 AsyncHubResult
1155 1155 A subclass of AsyncResult that retrieves results from the Hub
1156 1156
1157 1157 """
1158 1158 block = self.block if block is None else block
1159 1159 if indices_or_msg_ids is None:
1160 1160 indices_or_msg_ids = -1
1161 1161
1162 1162 if not isinstance(indices_or_msg_ids, (list,tuple)):
1163 1163 indices_or_msg_ids = [indices_or_msg_ids]
1164 1164
1165 1165 theids = []
1166 1166 for id in indices_or_msg_ids:
1167 1167 if isinstance(id, int):
1168 1168 id = self.history[id]
1169 1169 if not isinstance(id, basestring):
1170 1170 raise TypeError("indices must be str or int, not %r"%id)
1171 1171 theids.append(id)
1172 1172
1173 1173 for msg_id in theids:
1174 1174 self.outstanding.discard(msg_id)
1175 1175 if msg_id in self.history:
1176 1176 self.history.remove(msg_id)
1177 1177 self.results.pop(msg_id, None)
1178 1178 self.metadata.pop(msg_id, None)
1179 1179 content = dict(msg_ids = theids)
1180 1180
1181 1181 self.session.send(self._query_socket, 'resubmit_request', content)
1182 1182
1183 1183 zmq.select([self._query_socket], [], [])
1184 1184 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1185 1185 if self.debug:
1186 1186 pprint(msg)
1187 1187 content = msg['content']
1188 1188 if content['status'] != 'ok':
1189 1189 raise self._unwrap_exception(content)
1190 1190
1191 1191 ar = AsyncHubResult(self, msg_ids=theids)
1192 1192
1193 1193 if block:
1194 1194 ar.wait()
1195 1195
1196 1196 return ar
1197 1197
1198 1198 @spin_first
1199 1199 def result_status(self, msg_ids, status_only=True):
1200 1200 """Check on the status of the result(s) of the apply request with `msg_ids`.
1201 1201
1202 1202 If status_only is False, then the actual results will be retrieved, else
1203 1203 only the status of the results will be checked.
1204 1204
1205 1205 Parameters
1206 1206 ----------
1207 1207
1208 1208 msg_ids : list of msg_ids
1209 1209 if int:
1210 1210 Passed as index to self.history for convenience.
1211 1211 status_only : bool (default: True)
1212 1212 if False:
1213 1213 Retrieve the actual results of completed tasks.
1214 1214
1215 1215 Returns
1216 1216 -------
1217 1217
1218 1218 results : dict
1219 1219 There will always be the keys 'pending' and 'completed', which will
1220 1220 be lists of msg_ids that are incomplete or complete. If `status_only`
1221 1221 is False, then completed results will be keyed by their `msg_id`.
1222 1222 """
1223 1223 if not isinstance(msg_ids, (list,tuple)):
1224 1224 msg_ids = [msg_ids]
1225 1225
1226 1226 theids = []
1227 1227 for msg_id in msg_ids:
1228 1228 if isinstance(msg_id, int):
1229 1229 msg_id = self.history[msg_id]
1230 1230 if not isinstance(msg_id, basestring):
1231 1231 raise TypeError("msg_ids must be str, not %r"%msg_id)
1232 1232 theids.append(msg_id)
1233 1233
1234 1234 completed = []
1235 1235 local_results = {}
1236 1236
1237 1237 # comment this block out to temporarily disable local shortcut:
1238 1238 for msg_id in theids:
1239 1239 if msg_id in self.results:
1240 1240 completed.append(msg_id)
1241 1241 local_results[msg_id] = self.results[msg_id]
1242 1242 theids.remove(msg_id)
1243 1243
1244 1244 if theids: # some not locally cached
1245 1245 content = dict(msg_ids=theids, status_only=status_only)
1246 1246 msg = self.session.send(self._query_socket, "result_request", content=content)
1247 1247 zmq.select([self._query_socket], [], [])
1248 1248 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1249 1249 if self.debug:
1250 1250 pprint(msg)
1251 1251 content = msg['content']
1252 1252 if content['status'] != 'ok':
1253 1253 raise self._unwrap_exception(content)
1254 1254 buffers = msg['buffers']
1255 1255 else:
1256 1256 content = dict(completed=[],pending=[])
1257 1257
1258 1258 content['completed'].extend(completed)
1259 1259
1260 1260 if status_only:
1261 1261 return content
1262 1262
1263 1263 failures = []
1264 1264 # load cached results into result:
1265 1265 content.update(local_results)
1266 1266
1267 1267 # update cache with results:
1268 1268 for msg_id in sorted(theids):
1269 1269 if msg_id in content['completed']:
1270 1270 rec = content[msg_id]
1271 1271 parent = rec['header']
1272 1272 header = rec['result_header']
1273 1273 rcontent = rec['result_content']
1274 1274 iodict = rec['io']
1275 1275 if isinstance(rcontent, str):
1276 1276 rcontent = self.session.unpack(rcontent)
1277 1277
1278 1278 md = self.metadata[msg_id]
1279 1279 md.update(self._extract_metadata(header, parent, rcontent))
1280 if rec.get('received'):
1281 md['received'] = rec['received']
1280 1282 md.update(iodict)
1281 1283
1282 1284 if rcontent['status'] == 'ok':
1283 1285 res,buffers = util.unserialize_object(buffers)
1284 1286 else:
1285 1287 print rcontent
1286 1288 res = self._unwrap_exception(rcontent)
1287 1289 failures.append(res)
1288 1290
1289 1291 self.results[msg_id] = res
1290 1292 content[msg_id] = res
1291 1293
1292 1294 if len(theids) == 1 and failures:
1293 1295 raise failures[0]
1294 1296
1295 1297 error.collect_exceptions(failures, "result_status")
1296 1298 return content
1297 1299
1298 1300 @spin_first
1299 1301 def queue_status(self, targets='all', verbose=False):
1300 1302 """Fetch the status of engine queues.
1301 1303
1302 1304 Parameters
1303 1305 ----------
1304 1306
1305 1307 targets : int/str/list of ints/strs
1306 1308 the engines whose states are to be queried.
1307 1309 default : all
1308 1310 verbose : bool
1309 1311 Whether to return lengths only, or lists of ids for each element
1310 1312 """
1311 1313 if targets == 'all':
1312 1314 # allow 'all' to be evaluated on the engine
1313 1315 engine_ids = None
1314 1316 else:
1315 1317 engine_ids = self._build_targets(targets)[1]
1316 1318 content = dict(targets=engine_ids, verbose=verbose)
1317 1319 self.session.send(self._query_socket, "queue_request", content=content)
1318 1320 idents,msg = self.session.recv(self._query_socket, 0)
1319 1321 if self.debug:
1320 1322 pprint(msg)
1321 1323 content = msg['content']
1322 1324 status = content.pop('status')
1323 1325 if status != 'ok':
1324 1326 raise self._unwrap_exception(content)
1325 1327 content = rekey(content)
1326 1328 if isinstance(targets, int):
1327 1329 return content[targets]
1328 1330 else:
1329 1331 return content
1330 1332
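For example (assumes rc = Client(); the dict shapes shown are illustrative):

    rc.queue_status()                 # e.g. {0: {'queue': 0, 'completed': 10, 'tasks': 1}, 'unassigned': 0}
    rc.queue_status(0, verbose=True)  # lists of msg_ids for engine 0
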
1331 1333 @spin_first
1332 1334 def purge_results(self, jobs=[], targets=[]):
1333 1335 """Tell the Hub to forget results.
1334 1336
1335 1337 Individual results can be purged by msg_id, or the entire
1336 1338 history of specific targets can be purged.
1337 1339
1338 1340 Use `purge_results('all')` to scrub everything from the Hub's db.
1339 1341
1340 1342 Parameters
1341 1343 ----------
1342 1344
1343 1345 jobs : str or list of str or AsyncResult objects
1344 1346 the msg_ids whose results should be forgotten.
1345 1347 targets : int/str/list of ints/strs
1346 1348 The targets, by int_id, whose entire history is to be purged.
1347 1349
1348 1350 default : None
1349 1351 """
1350 1352 if not targets and not jobs:
1351 1353 raise ValueError("Must specify at least one of `targets` and `jobs`")
1352 1354 if targets:
1353 1355 targets = self._build_targets(targets)[1]
1354 1356
1355 1357 # construct msg_ids from jobs
1356 1358 if jobs == 'all':
1357 1359 msg_ids = jobs
1358 1360 else:
1359 1361 msg_ids = []
1360 1362 if isinstance(jobs, (basestring,AsyncResult)):
1361 1363 jobs = [jobs]
1362 1364 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1363 1365 if bad_ids:
1364 1366 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1365 1367 for j in jobs:
1366 1368 if isinstance(j, AsyncResult):
1367 1369 msg_ids.extend(j.msg_ids)
1368 1370 else:
1369 1371 msg_ids.append(j)
1370 1372
1371 1373 content = dict(engine_ids=targets, msg_ids=msg_ids)
1372 1374 self.session.send(self._query_socket, "purge_request", content=content)
1373 1375 idents, msg = self.session.recv(self._query_socket, 0)
1374 1376 if self.debug:
1375 1377 pprint(msg)
1376 1378 content = msg['content']
1377 1379 if content['status'] != 'ok':
1378 1380 raise self._unwrap_exception(content)
1379 1381
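Typical calls (assumes rc = Client() and ar is an AsyncResult):

    rc.purge_results(jobs=ar)           # forget the results of one submission
    rc.purge_results(targets=[0, 1])    # drop the entire history of engines 0 and 1
    rc.purge_results('all')             # scrub everything from the Hub's db
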
1380 1382 @spin_first
1381 1383 def hub_history(self):
1382 1384 """Get the Hub's history
1383 1385
1384 1386 Just like the Client, the Hub has a history, which is a list of msg_ids.
1385 1387 This will contain the history of all clients, and, depending on configuration,
1386 1388 may contain history across multiple cluster sessions.
1387 1389
1388 1390 Any msg_id returned here is a valid argument to `get_result`.
1389 1391
1390 1392 Returns
1391 1393 -------
1392 1394
1393 1395 msg_ids : list of strs
1394 1396 list of all msg_ids, ordered by task submission time.
1395 1397 """
1396 1398
1397 1399 self.session.send(self._query_socket, "history_request", content={})
1398 1400 idents, msg = self.session.recv(self._query_socket, 0)
1399 1401
1400 1402 if self.debug:
1401 1403 pprint(msg)
1402 1404 content = msg['content']
1403 1405 if content['status'] != 'ok':
1404 1406 raise self._unwrap_exception(content)
1405 1407 else:
1406 1408 return content['history']
1407 1409
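Because any returned msg_id is a valid argument to get_result, this allows replaying work submitted by other clients; a hypothetical sketch (assumes rc = Client()):

    msg_ids = rc.hub_history()
    ar = rc.get_result(msg_ids[-1])              # most recent task, from any client
    ar.wait()
    print rc.metadata[msg_ids[-1]]['received']   # Hub-side arrival time (the new field)
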
1408 1410 @spin_first
1409 1411 def db_query(self, query, keys=None):
1410 1412 """Query the Hub's TaskRecord database
1411 1413
1412 1414 This will return a list of task record dicts that match `query`
1413 1415
1414 1416 Parameters
1415 1417 ----------
1416 1418
1417 1419 query : mongodb query dict
1418 1420 The search dict. See mongodb query docs for details.
1419 1421 keys : list of strs [optional]
1420 1422 The subset of keys to be returned. The default is to fetch everything but buffers.
1421 1423 'msg_id' will *always* be included.
1422 1424 """
1423 1425 if isinstance(keys, basestring):
1424 1426 keys = [keys]
1425 1427 content = dict(query=query, keys=keys)
1426 1428 self.session.send(self._query_socket, "db_request", content=content)
1427 1429 idents, msg = self.session.recv(self._query_socket, 0)
1428 1430 if self.debug:
1429 1431 pprint(msg)
1430 1432 content = msg['content']
1431 1433 if content['status'] != 'ok':
1432 1434 raise self._unwrap_exception(content)
1433 1435
1434 1436 records = content['records']
1435 1437
1436 1438 buffer_lens = content['buffer_lens']
1437 1439 result_buffer_lens = content['result_buffer_lens']
1438 1440 buffers = msg['buffers']
1439 1441 has_bufs = buffer_lens is not None
1440 1442 has_rbufs = result_buffer_lens is not None
1441 1443 for i,rec in enumerate(records):
1442 1444 # relink buffers
1443 1445 if has_bufs:
1444 1446 blen = buffer_lens[i]
1445 1447 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1446 1448 if has_rbufs:
1447 1449 blen = result_buffer_lens[i]
1448 1450 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1449 1451
1450 1452 return records
1451 1453
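A hedged sketch of inspecting the new 'received' column via db_query (assumes rc = Client() and a db backend that supports these filters, e.g. MongoDB or the default DictDB):

    recs = rc.db_query({'completed': {'$ne': None}},
                       keys=['msg_id', 'submitted', 'completed', 'received'])
    for rec in recs:
        print rec['msg_id'], rec['received']
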
1452 1454 __all__ = [ 'Client' ]
@@ -1,1293 +1,1298 @@
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import sys
22 22 import time
23 23 from datetime import datetime
24 24
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27 from zmq.eventloop.zmqstream import ZMQStream
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 31 from IPython.utils.traitlets import (
32 32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 33 )
34 34
35 35 from IPython.parallel import error, util
36 36 from IPython.parallel.factory import RegistrationFactory
37 37
38 38 from IPython.zmq.session import SessionFactory
39 39
40 40 from .heartmonitor import HeartMonitor
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Code
44 44 #-----------------------------------------------------------------------------
45 45
46 46 def _passer(*args, **kwargs):
47 47 return
48 48
49 49 def _printer(*args, **kwargs):
50 50 print (args)
51 51 print (kwargs)
52 52
53 53 def empty_record():
54 54 """Return an empty dict with all record keys."""
55 55 return {
56 56 'msg_id' : None,
57 57 'header' : None,
58 58 'content': None,
59 59 'buffers': None,
60 60 'submitted': None,
61 61 'client_uuid' : None,
62 62 'engine_uuid' : None,
63 63 'started': None,
64 64 'completed': None,
65 65 'resubmitted': None,
66 'received': None,
66 67 'result_header' : None,
67 68 'result_content' : None,
68 69 'result_buffers' : None,
69 70 'queue' : None,
70 71 'pyin' : None,
71 72 'pyout': None,
72 73 'pyerr': None,
73 74 'stdout': '',
74 75 'stderr': '',
75 76 }
76 77
77 78 def init_record(msg):
78 79 """Initialize a TaskRecord based on a request."""
79 80 header = msg['header']
80 81 return {
81 82 'msg_id' : header['msg_id'],
82 83 'header' : header,
83 84 'content': msg['content'],
84 85 'buffers': msg['buffers'],
85 86 'submitted': header['date'],
86 87 'client_uuid' : None,
87 88 'engine_uuid' : None,
88 89 'started': None,
89 90 'completed': None,
90 91 'resubmitted': None,
92 'received': None,
91 93 'result_header' : None,
92 94 'result_content' : None,
93 95 'result_buffers' : None,
94 96 'queue' : None,
95 97 'pyin' : None,
96 98 'pyout': None,
97 99 'pyerr': None,
98 100 'stdout': '',
99 101 'stderr': '',
100 102 }
101 103
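To make the record lifecycle concrete, here is a hedged sketch of how the new 'received' field gets populated: a record is created at submission with 'received' unset, and the Hub stamps it when the result message actually arrives. `db` and `msg` stand in for the real DB backend and a deserialized task message:

```python
# illustrative only -- the Hub does this internally in save_queue_result
# and save_task_result
from datetime import datetime

rec = init_record(msg)              # 'received' starts out as None
db.add_record(rec['msg_id'], rec)
# ... later, when the result message reaches the Hub:
db.update_record(rec['msg_id'], {'completed': datetime.now(),
                                 'received': datetime.now()})
```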
102 104
103 105 class EngineConnector(HasTraits):
104 106 """A simple object for accessing the various zmq connections of an object.
105 107 Attributes are:
106 108 id (int): engine ID
107 109 uuid (str): uuid (unused?)
108 110 queue (str): identity of queue's XREQ socket
109 111 registration (str): identity of registration XREQ socket
110 112 heartbeat (str): identity of heartbeat XREQ socket
111 113 """
112 114 id=Integer(0)
113 115 queue=CBytes()
114 116 control=CBytes()
115 117 registration=CBytes()
116 118 heartbeat=CBytes()
117 119 pending=Set()
118 120
119 121 class HubFactory(RegistrationFactory):
120 122 """The Configurable for setting up a Hub."""
121 123
122 124 # port-pairs for monitoredqueues:
123 125 hb = Tuple(Integer,Integer,config=True,
124 126 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 127 def _hb_default(self):
126 128 return tuple(util.select_random_ports(2))
127 129
128 130 mux = Tuple(Integer,Integer,config=True,
129 131 help="""Engine/Client Port pair for MUX queue""")
130 132
131 133 def _mux_default(self):
132 134 return tuple(util.select_random_ports(2))
133 135
134 136 task = Tuple(Integer,Integer,config=True,
135 137 help="""Engine/Client Port pair for Task queue""")
136 138 def _task_default(self):
137 139 return tuple(util.select_random_ports(2))
138 140
139 141 control = Tuple(Integer,Integer,config=True,
140 142 help="""Engine/Client Port pair for Control queue""")
141 143
142 144 def _control_default(self):
143 145 return tuple(util.select_random_ports(2))
144 146
145 147 iopub = Tuple(Integer,Integer,config=True,
146 148 help="""Engine/Client Port pair for IOPub relay""")
147 149
148 150 def _iopub_default(self):
149 151 return tuple(util.select_random_ports(2))
150 152
151 153 # single ports:
152 154 mon_port = Integer(config=True,
153 155 help="""Monitor (SUB) port for queue traffic""")
154 156
155 157 def _mon_port_default(self):
156 158 return util.select_random_ports(1)[0]
157 159
158 160 notifier_port = Integer(config=True,
159 161 help="""PUB port for sending engine status notifications""")
160 162
161 163 def _notifier_port_default(self):
162 164 return util.select_random_ports(1)[0]
163 165
164 166 engine_ip = Unicode('127.0.0.1', config=True,
165 167 help="IP on which to listen for engine connections. [default: loopback]")
166 168 engine_transport = Unicode('tcp', config=True,
167 169 help="0MQ transport for engine connections. [default: tcp]")
168 170
169 171 client_ip = Unicode('127.0.0.1', config=True,
170 172 help="IP on which to listen for client connections. [default: loopback]")
171 173 client_transport = Unicode('tcp', config=True,
172 174 help="0MQ transport for client connections. [default : tcp]")
173 175
174 176 monitor_ip = Unicode('127.0.0.1', config=True,
175 177 help="IP on which to listen for monitor messages. [default: loopback]")
176 178 monitor_transport = Unicode('tcp', config=True,
177 179 help="0MQ transport for monitor messages. [default : tcp]")
178 180
179 181 monitor_url = Unicode('')
180 182
181 183 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
182 184 config=True, help="""The class to use for the DB backend""")
183 185
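Since every port pair and the DB backend above are configurable traits, they can be pinned down instead of randomly selected. A minimal sketch, with made-up port numbers, of driving these traits through an IPython config object:

```python
# hypothetical values, shown only to illustrate the configurable traits above
from IPython.config.loader import Config

c = Config()
c.HubFactory.mux = (10101, 10102)      # engine/client pair for the MUX queue
c.HubFactory.task = (10103, 10104)     # engine/client pair for the Task queue
c.HubFactory.db_class = 'IPython.parallel.controller.sqlitedb.SQLiteDB'

factory = HubFactory(config=c)
factory.construct()                    # builds factory.hub
```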
184 186 # not configurable
185 187 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 188 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187 189
188 190 def _ip_changed(self, name, old, new):
189 191 self.engine_ip = new
190 192 self.client_ip = new
191 193 self.monitor_ip = new
192 194 self._update_monitor_url()
193 195
194 196 def _update_monitor_url(self):
195 197 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
196 198
197 199 def _transport_changed(self, name, old, new):
198 200 self.engine_transport = new
199 201 self.client_transport = new
200 202 self.monitor_transport = new
201 203 self._update_monitor_url()
202 204
203 205 def __init__(self, **kwargs):
204 206 super(HubFactory, self).__init__(**kwargs)
205 207 self._update_monitor_url()
206 208
207 209
208 210 def construct(self):
209 211 self.init_hub()
210 212
211 213 def start(self):
212 214 self.heartmonitor.start()
213 215 self.log.info("Heartmonitor started")
214 216
215 217 def init_hub(self):
216 218 """construct"""
217 219 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
218 220 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
219 221
220 222 ctx = self.context
221 223 loop = self.loop
222 224
223 225 # Registrar socket
224 226 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 227 q.bind(client_iface % self.regport)
226 228 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 229 if self.client_ip != self.engine_ip:
228 230 q.bind(engine_iface % self.regport)
229 231 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230 232
231 233 ### Engine connections ###
232 234
233 235 # heartbeat
234 236 hpub = ctx.socket(zmq.PUB)
235 237 hpub.bind(engine_iface % self.hb[0])
236 238 hrep = ctx.socket(zmq.ROUTER)
237 239 hrep.bind(engine_iface % self.hb[1])
238 240 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 241 pingstream=ZMQStream(hpub,loop),
240 242 pongstream=ZMQStream(hrep,loop)
241 243 )
242 244
243 245 ### Client connections ###
244 246 # Notifier socket
245 247 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 248 n.bind(client_iface%self.notifier_port)
247 249
248 250 ### build and launch the queues ###
249 251
250 252 # monitor socket
251 253 sub = ctx.socket(zmq.SUB)
252 254 sub.setsockopt(zmq.SUBSCRIBE, b"")
253 255 sub.bind(self.monitor_url)
254 256 sub.bind('inproc://monitor')
255 257 sub = ZMQStream(sub, loop)
256 258
257 259 # connect the db
258 260 self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
259 261 # cdir = self.config.Global.cluster_dir
260 262 self.db = import_item(str(self.db_class))(session=self.session.session,
261 263 config=self.config, log=self.log)
262 264 time.sleep(.25)
263 265 try:
264 266 scheme = self.config.TaskScheduler.scheme_name
265 267 except AttributeError:
266 268 from .scheduler import TaskScheduler
267 269 scheme = TaskScheduler.scheme_name.get_default_value()
268 270 # build connection dicts
269 271 self.engine_info = {
270 272 'control' : engine_iface%self.control[1],
271 273 'mux': engine_iface%self.mux[1],
272 274 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 275 'task' : engine_iface%self.task[1],
274 276 'iopub' : engine_iface%self.iopub[1],
275 277 # 'monitor' : engine_iface%self.mon_port,
276 278 }
277 279
278 280 self.client_info = {
279 281 'control' : client_iface%self.control[0],
280 282 'mux': client_iface%self.mux[0],
281 283 'task' : (scheme, client_iface%self.task[0]),
282 284 'iopub' : client_iface%self.iopub[0],
283 285 'notification': client_iface%self.notifier_port
284 286 }
285 287 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 288 self.log.debug("Hub client addrs: %s", self.client_info)
287 289
288 290 # resubmit stream
289 291 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 292 url = util.disambiguate_url(self.client_info['task'][-1])
291 293 r.setsockopt(zmq.IDENTITY, self.session.bsession)
292 294 r.connect(url)
293 295
294 296 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 297 query=q, notifier=n, resubmit=r, db=self.db,
296 298 engine_info=self.engine_info, client_info=self.client_info,
297 299 log=self.log)
298 300
299 301
300 302 class Hub(SessionFactory):
301 303 """The IPython Controller Hub with 0MQ connections
302 304
303 305 Parameters
304 306 ==========
305 307 loop: zmq IOLoop instance
306 308 session: Session object
307 309 <removed> context: zmq context for creating new connections (?)
308 310 queue: ZMQStream for monitoring the command queue (SUB)
309 311 query: ZMQStream for engine registration and client queries requests (XREP)
310 312 heartbeat: HeartMonitor object checking the pulse of the engines
311 313 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 314 db: connection to db for out of memory logging of commands
313 315 NotImplemented
314 316 engine_info: dict of zmq connection information for engines to connect
315 317 to the queues.
316 318 client_info: dict of zmq connection information for engines to connect
317 319 to the queues.
318 320 """
319 321 # internal data structures:
320 322 ids=Set() # engine IDs
321 323 keytable=Dict()
322 324 by_ident=Dict()
323 325 engines=Dict()
324 326 clients=Dict()
325 327 hearts=Dict()
326 328 pending=Set()
327 329 queues=Dict() # pending msg_ids keyed by engine_id
328 330 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 331 completed=Dict() # completed msg_ids keyed by engine_id
330 332 all_completed=Set() # set of all completed msg_ids
331 333 dead_engines=Set() # uuids of engines that have died
332 334 unassigned=Set() # set of task msg_ids not yet assigned a destination
333 335 incoming_registrations=Dict()
334 336 registration_timeout=Integer()
335 337 _idcounter=Integer(0)
336 338
337 339 # objects from constructor:
338 340 query=Instance(ZMQStream)
339 341 monitor=Instance(ZMQStream)
340 342 notifier=Instance(ZMQStream)
341 343 resubmit=Instance(ZMQStream)
342 344 heartmonitor=Instance(HeartMonitor)
343 345 db=Instance(object)
344 346 client_info=Dict()
345 347 engine_info=Dict()
346 348
347 349
348 350 def __init__(self, **kwargs):
349 351 """
350 352 # universal:
351 353 loop: IOLoop for creating future connections
352 354 session: streamsession for sending serialized data
353 355 # engine:
354 356 queue: ZMQStream for monitoring queue messages
355 357 query: ZMQStream for engine+client registration and client requests
356 358 heartbeat: HeartMonitor object for tracking engines
357 359 # extra:
358 360 db: ZMQStream for db connection (NotImplemented)
359 361 engine_info: zmq address/protocol dict for engine connections
360 362 client_info: zmq address/protocol dict for client connections
361 363 """
362 364
363 365 super(Hub, self).__init__(**kwargs)
364 366 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365 367
366 368 # validate connection dicts:
367 369 for k,v in self.client_info.iteritems():
368 370 if k == 'task':
369 371 util.validate_url_container(v[1])
370 372 else:
371 373 util.validate_url_container(v)
372 374 # util.validate_url_container(self.client_info)
373 375 util.validate_url_container(self.engine_info)
374 376
375 377 # register our callbacks
376 378 self.query.on_recv(self.dispatch_query)
377 379 self.monitor.on_recv(self.dispatch_monitor_traffic)
378 380
379 381 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 382 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381 383
382 384 self.monitor_handlers = {b'in' : self.save_queue_request,
383 385 b'out': self.save_queue_result,
384 386 b'intask': self.save_task_request,
385 387 b'outtask': self.save_task_result,
386 388 b'tracktask': self.save_task_destination,
387 389 b'incontrol': _passer,
388 390 b'outcontrol': _passer,
389 391 b'iopub': self.save_iopub_message,
390 392 }
391 393
392 394 self.query_handlers = {'queue_request': self.queue_status,
393 395 'result_request': self.get_results,
394 396 'history_request': self.get_history,
395 397 'db_request': self.db_query,
396 398 'purge_request': self.purge_results,
397 399 'load_request': self.check_load,
398 400 'resubmit_request': self.resubmit_task,
399 401 'shutdown_request': self.shutdown_request,
400 402 'registration_request' : self.register_engine,
401 403 'unregistration_request' : self.unregister_engine,
402 404 'connection_request': self.connection_request,
403 405 }
404 406
405 407 # ignore resubmit replies
406 408 self.resubmit.on_recv(lambda msg: None, copy=False)
407 409
408 410 self.log.info("hub::created hub")
409 411
410 412 @property
411 413 def _next_id(self):
412 414 """gemerate a new ID.
413 415
414 416 No longer reuse old ids, just count from 0."""
415 417 newid = self._idcounter
416 418 self._idcounter += 1
417 419 return newid
418 420 # newid = 0
419 421 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 422 # # print newid, self.ids, self.incoming_registrations
421 423 # while newid in self.ids or newid in incoming:
422 424 # newid += 1
423 425 # return newid
424 426
425 427 #-----------------------------------------------------------------------------
426 428 # message validation
427 429 #-----------------------------------------------------------------------------
428 430
429 431 def _validate_targets(self, targets):
430 432 """turn any valid targets argument into a list of integer ids"""
431 433 if targets is None:
432 434 # default to all
433 435 return self.ids
434 436
435 437 if isinstance(targets, (int,str,unicode)):
436 438 # only one target specified
437 439 targets = [targets]
438 440 _targets = []
439 441 for t in targets:
440 442 # map raw identities to ids
441 443 if isinstance(t, (str,unicode)):
442 444 t = self.by_ident.get(t, t)
443 445 _targets.append(t)
444 446 targets = _targets
445 447 bad_targets = [ t for t in targets if t not in self.ids ]
446 448 if bad_targets:
447 449 raise IndexError("No Such Engine: %r" % bad_targets)
448 450 if not targets:
449 451 raise IndexError("No Engines Registered")
450 452 return targets
451 453
452 454 #-----------------------------------------------------------------------------
453 455 # dispatch methods (1 per stream)
454 456 #-----------------------------------------------------------------------------
455 457
456 458
457 459 def dispatch_monitor_traffic(self, msg):
458 460 """all ME and Task queue messages come through here, as well as
459 461 IOPub traffic."""
460 462 self.log.debug("monitor traffic: %r", msg[0])
461 463 switch = msg[0]
462 464 try:
463 465 idents, msg = self.session.feed_identities(msg[1:])
464 466 except ValueError:
465 467 idents=[]
466 468 if not idents:
467 469 self.log.error("Bad Monitor Message: %r", msg)
468 470 return
469 471 handler = self.monitor_handlers.get(switch, None)
470 472 if handler is not None:
471 473 handler(idents, msg)
472 474 else:
473 475 self.log.error("Invalid monitor topic: %r", switch)
474 476
475 477
476 478 def dispatch_query(self, msg):
477 479 """Route registration requests and queries from clients."""
478 480 try:
479 481 idents, msg = self.session.feed_identities(msg)
480 482 except ValueError:
481 483 idents = []
482 484 if not idents:
483 485 self.log.error("Bad Query Message: %r", msg)
484 486 return
485 487 client_id = idents[0]
486 488 try:
487 489 msg = self.session.unserialize(msg, content=True)
488 490 except Exception:
489 491 content = error.wrap_exception()
490 492 self.log.error("Bad Query Message: %r", msg, exc_info=True)
491 493 self.session.send(self.query, "hub_error", ident=client_id,
492 494 content=content)
493 495 return
494 496 # print client_id, header, parent, content
495 497 #switch on message type:
496 498 msg_type = msg['header']['msg_type']
497 499 self.log.info("client::client %r requested %r", client_id, msg_type)
498 500 handler = self.query_handlers.get(msg_type, None)
499 501 try:
500 502 assert handler is not None, "Bad Message Type: %r" % msg_type
501 503 except:
502 504 content = error.wrap_exception()
503 505 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
504 506 self.session.send(self.query, "hub_error", ident=client_id,
505 507 content=content)
506 508 return
507 509
508 510 else:
509 511 handler(idents, msg)
510 512
511 513 def dispatch_db(self, msg):
512 514 """"""
513 515 raise NotImplementedError
514 516
515 517 #---------------------------------------------------------------------------
516 518 # handler methods (1 per event)
517 519 #---------------------------------------------------------------------------
518 520
519 521 #----------------------- Heartbeat --------------------------------------
520 522
521 523 def handle_new_heart(self, heart):
522 524 """handler to attach to heartbeater.
523 525 Called when a new heart starts to beat.
524 526 Triggers completion of registration."""
525 527 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 528 if heart not in self.incoming_registrations:
527 529 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 530 else:
529 531 self.finish_registration(heart)
530 532
531 533
532 534 def handle_heart_failure(self, heart):
533 535 """handler to attach to heartbeater.
534 536 called when a previously registered heart fails to respond to beat request.
535 537 triggers unregistration"""
536 538 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 539 eid = self.hearts.get(heart, None)
538 540 if eid is None or self.keytable[eid] in self.dead_engines:
539 541 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
540 542 else:
541 543 queue = self.engines[eid].queue
542 544 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543 545
544 546 #----------------------- MUX Queue Traffic ------------------------------
545 547
546 548 def save_queue_request(self, idents, msg):
547 549 if len(idents) < 2:
548 550 self.log.error("invalid identity prefix: %r", idents)
549 551 return
550 552 queue_id, client_id = idents[:2]
551 553 try:
552 554 msg = self.session.unserialize(msg)
553 555 except Exception:
554 556 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 557 return
556 558
557 559 eid = self.by_ident.get(queue_id, None)
558 560 if eid is None:
559 561 self.log.error("queue::target %r not registered", queue_id)
560 562 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 563 return
562 564 record = init_record(msg)
563 565 msg_id = record['msg_id']
564 566 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
565 567 # Unicode in records
566 568 record['engine_uuid'] = queue_id.decode('ascii')
567 569 record['client_uuid'] = client_id.decode('ascii')
568 570 record['queue'] = 'mux'
569 571
570 572 try:
571 573 # it's possible iopub arrived first:
572 574 existing = self.db.get_record(msg_id)
573 575 for key,evalue in existing.iteritems():
574 576 rvalue = record.get(key, None)
575 577 if evalue and rvalue and evalue != rvalue:
576 578 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
577 579 elif evalue and not rvalue:
578 580 record[key] = evalue
579 581 try:
580 582 self.db.update_record(msg_id, record)
581 583 except Exception:
582 584 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
583 585 except KeyError:
584 586 try:
585 587 self.db.add_record(msg_id, record)
586 588 except Exception:
587 589 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
588 590
589 591
590 592 self.pending.add(msg_id)
591 593 self.queues[eid].append(msg_id)
592 594
593 595 def save_queue_result(self, idents, msg):
594 596 if len(idents) < 2:
595 597 self.log.error("invalid identity prefix: %r", idents)
596 598 return
597 599
598 600 client_id, queue_id = idents[:2]
599 601 try:
600 602 msg = self.session.unserialize(msg)
601 603 except Exception:
602 604 self.log.error("queue::engine %r sent invalid message to %r: %r",
603 605 queue_id, client_id, msg, exc_info=True)
604 606 return
605 607
606 608 eid = self.by_ident.get(queue_id, None)
607 609 if eid is None:
608 610 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
609 611 return
610 612
611 613 parent = msg['parent_header']
612 614 if not parent:
613 615 return
614 616 msg_id = parent['msg_id']
615 617 if msg_id in self.pending:
616 618 self.pending.remove(msg_id)
617 619 self.all_completed.add(msg_id)
618 620 self.queues[eid].remove(msg_id)
619 621 self.completed[eid].append(msg_id)
620 622 self.log.info("queue::request %r completed on %s", msg_id, eid)
621 623 elif msg_id not in self.all_completed:
622 624 # it could be a result from a dead engine that died before delivering the
623 625 # result
624 626 self.log.warn("queue:: unknown msg finished %r", msg_id)
625 627 return
626 628 # update record anyway, because the unregistration could have been premature
627 629 rheader = msg['header']
628 630 completed = rheader['date']
629 631 started = rheader.get('started', None)
630 632 result = {
631 633 'result_header' : rheader,
632 634 'result_content': msg['content'],
635 'received': datetime.now(),
633 636 'started' : started,
634 637 'completed' : completed
635 638 }
636 639
637 640 result['result_buffers'] = msg['buffers']
638 641 try:
639 642 self.db.update_record(msg_id, result)
640 643 except Exception:
641 644 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
642 645
643 646
644 647 #--------------------- Task Queue Traffic ------------------------------
645 648
646 649 def save_task_request(self, idents, msg):
647 650 """Save the submission of a task."""
648 651 client_id = idents[0]
649 652
650 653 try:
651 654 msg = self.session.unserialize(msg)
652 655 except Exception:
653 656 self.log.error("task::client %r sent invalid task message: %r",
654 657 client_id, msg, exc_info=True)
655 658 return
656 659 record = init_record(msg)
657 660
658 661 record['client_uuid'] = client_id.decode('ascii')
659 662 record['queue'] = 'task'
660 663 header = msg['header']
661 664 msg_id = header['msg_id']
662 665 self.pending.add(msg_id)
663 666 self.unassigned.add(msg_id)
664 667 try:
665 668 # it's possible iopub arrived first:
666 669 existing = self.db.get_record(msg_id)
667 670 if existing['resubmitted']:
668 671 for key in ('submitted', 'client_uuid', 'buffers'):
669 672 # don't clobber these keys on resubmit
670 673 # submitted and client_uuid should be different
671 674 # and buffers might be big, and shouldn't have changed
672 675 record.pop(key)
673 676 # still check content and header, which should not change
674 677 # but are not as expensive to compare as buffers
675 678
676 679 for key,evalue in existing.iteritems():
677 680 if key.endswith('buffers'):
678 681 # don't compare buffers
679 682 continue
680 683 rvalue = record.get(key, None)
681 684 if evalue and rvalue and evalue != rvalue:
682 685 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
683 686 elif evalue and not rvalue:
684 687 record[key] = evalue
685 688 try:
686 689 self.db.update_record(msg_id, record)
687 690 except Exception:
688 691 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
689 692 except KeyError:
690 693 try:
691 694 self.db.add_record(msg_id, record)
692 695 except Exception:
693 696 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
694 697 except Exception:
695 698 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
696 699
697 700 def save_task_result(self, idents, msg):
698 701 """save the result of a completed task."""
699 702 client_id = idents[0]
700 703 try:
701 704 msg = self.session.unserialize(msg)
702 705 except Exception:
703 706 self.log.error("task::invalid task result message send to %r: %r",
704 707 client_id, msg, exc_info=True)
705 708 return
706 709
707 710 parent = msg['parent_header']
708 711 if not parent:
709 712 # print msg
710 713 self.log.warn("Task %r had no parent!", msg)
711 714 return
712 715 msg_id = parent['msg_id']
713 716 if msg_id in self.unassigned:
714 717 self.unassigned.remove(msg_id)
715 718
716 719 header = msg['header']
717 720 engine_uuid = header.get('engine', None)
718 721 eid = self.by_ident.get(engine_uuid, None)
719 722
720 723 if msg_id in self.pending:
721 724 self.log.info("task::task %r finished on %s", msg_id, eid)
722 725 self.pending.remove(msg_id)
723 726 self.all_completed.add(msg_id)
724 727 if eid is not None:
725 728 self.completed[eid].append(msg_id)
726 729 if msg_id in self.tasks[eid]:
727 730 self.tasks[eid].remove(msg_id)
728 731 completed = header['date']
729 732 started = header.get('started', None)
730 733 result = {
731 734 'result_header' : header,
732 735 'result_content': msg['content'],
733 736 'started' : started,
734 737 'completed' : completed,
735 'engine_uuid': engine_uuid
738 'received' : datetime.now(),
739 'engine_uuid': engine_uuid,
736 740 }
737 741
738 742 result['result_buffers'] = msg['buffers']
739 743 try:
740 744 self.db.update_record(msg_id, result)
741 745 except Exception:
742 746 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
743 747
744 748 else:
745 749 self.log.debug("task::unknown task %r finished", msg_id)
746 750
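The 'received' stamp recorded above makes it possible to measure how long a finished result sat in transit before the Hub saw it. A hedged sketch against any BaseDB backend; the `db` instance and `msg_id` are assumptions:

```python
# estimate Hub-side delivery latency from a TaskRecord
rec = db.get_record(msg_id)
if rec['received'] and rec['completed']:
    lag = rec['received'] - rec['completed']   # datetime.timedelta
    print("result arrived %.3fs after completion" % lag.total_seconds())
```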
747 751 def save_task_destination(self, idents, msg):
748 752 try:
749 753 msg = self.session.unserialize(msg, content=True)
750 754 except Exception:
751 755 self.log.error("task::invalid task tracking message", exc_info=True)
752 756 return
753 757 content = msg['content']
754 758 # print (content)
755 759 msg_id = content['msg_id']
756 760 engine_uuid = content['engine_id']
757 761 eid = self.by_ident[util.asbytes(engine_uuid)]
758 762
759 763 self.log.info("task::task %r arrived on %r", msg_id, eid)
760 764 if msg_id in self.unassigned:
761 765 self.unassigned.remove(msg_id)
762 766 # else:
763 767 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
764 768
765 769 self.tasks[eid].append(msg_id)
766 770 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
767 771 try:
768 772 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
769 773 except Exception:
770 774 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
771 775
772 776
773 777 def mia_task_request(self, idents, msg):
774 778 raise NotImplementedError
775 779 client_id = idents[0]
776 780 # content = dict(mia=self.mia,status='ok')
777 781 # self.session.send('mia_reply', content=content, idents=client_id)
778 782
779 783
780 784 #--------------------- IOPub Traffic ------------------------------
781 785
782 786 def save_iopub_message(self, topics, msg):
783 787 """save an iopub message into the db"""
784 788 # print (topics)
785 789 try:
786 790 msg = self.session.unserialize(msg, content=True)
787 791 except Exception:
788 792 self.log.error("iopub::invalid IOPub message", exc_info=True)
789 793 return
790 794
791 795 parent = msg['parent_header']
792 796 if not parent:
793 797 self.log.error("iopub::invalid IOPub message: %r", msg)
794 798 return
795 799 msg_id = parent['msg_id']
796 800 msg_type = msg['header']['msg_type']
797 801 content = msg['content']
798 802
799 803 # ensure msg_id is in db
800 804 try:
801 805 rec = self.db.get_record(msg_id)
802 806 except KeyError:
803 807 rec = empty_record()
804 808 rec['msg_id'] = msg_id
805 809 self.db.add_record(msg_id, rec)
806 810 # stream
807 811 d = {}
808 812 if msg_type == 'stream':
809 813 name = content['name']
810 814 s = rec[name] or ''
811 815 d[name] = s + content['data']
812 816
813 817 elif msg_type == 'pyerr':
814 818 d['pyerr'] = content
815 819 elif msg_type == 'pyin':
816 820 d['pyin'] = content['code']
817 821 else:
818 822 d[msg_type] = content.get('data', '')
819 823
820 824 try:
821 825 self.db.update_record(msg_id, d)
822 826 except Exception:
823 827 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
824 828
825 829
826 830
827 831 #-------------------------------------------------------------------------
828 832 # Registration requests
829 833 #-------------------------------------------------------------------------
830 834
831 835 def connection_request(self, client_id, msg):
832 836 """Reply with connection addresses for clients."""
833 837 self.log.info("client::client %r connected", client_id)
834 838 content = dict(status='ok')
835 839 content.update(self.client_info)
836 840 jsonable = {}
837 841 for k,v in self.keytable.iteritems():
838 842 if v not in self.dead_engines:
839 843 jsonable[str(k)] = v.decode('ascii')
840 844 content['engines'] = jsonable
841 845 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
842 846
843 847 def register_engine(self, reg, msg):
844 848 """Register a new engine."""
845 849 content = msg['content']
846 850 try:
847 851 queue = util.asbytes(content['queue'])
848 852 except KeyError:
849 853 self.log.error("registration::queue not specified", exc_info=True)
850 854 return
851 855 heart = content.get('heartbeat', None)
852 856 if heart:
853 857 heart = util.asbytes(heart)
854 858 """register a new engine, and create the socket(s) necessary"""
855 859 eid = self._next_id
856 860 # print (eid, queue, reg, heart)
857 861
858 862 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
859 863
860 864 content = dict(id=eid,status='ok')
861 865 content.update(self.engine_info)
862 866 # check if requesting available IDs:
863 867 if queue in self.by_ident:
864 868 try:
865 869 raise KeyError("queue_id %r in use" % queue)
866 870 except:
867 871 content = error.wrap_exception()
868 872 self.log.error("queue_id %r in use", queue, exc_info=True)
869 873 elif heart in self.hearts: # need to check unique hearts?
870 874 try:
871 875 raise KeyError("heart_id %r in use" % heart)
872 876 except:
873 877 self.log.error("heart_id %r in use", heart, exc_info=True)
874 878 content = error.wrap_exception()
875 879 else:
876 880 for h, pack in self.incoming_registrations.iteritems():
877 881 if heart == h:
878 882 try:
879 883 raise KeyError("heart_id %r in use" % heart)
880 884 except:
881 885 self.log.error("heart_id %r in use", heart, exc_info=True)
882 886 content = error.wrap_exception()
883 887 break
884 888 elif queue == pack[1]:
885 889 try:
886 890 raise KeyError("queue_id %r in use" % queue)
887 891 except:
888 892 self.log.error("queue_id %r in use", queue, exc_info=True)
889 893 content = error.wrap_exception()
890 894 break
891 895
892 896 msg = self.session.send(self.query, "registration_reply",
893 897 content=content,
894 898 ident=reg)
895 899
896 900 if content['status'] == 'ok':
897 901 if heart in self.heartmonitor.hearts:
898 902 # already beating
899 903 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
900 904 self.finish_registration(heart)
901 905 else:
902 906 purge = lambda : self._purge_stalled_registration(heart)
903 907 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
904 908 dc.start()
905 909 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
906 910 else:
907 911 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
908 912 return eid
909 913
910 914 def unregister_engine(self, ident, msg):
911 915 """Unregister an engine that explicitly requested to leave."""
912 916 try:
913 917 eid = msg['content']['id']
914 918 except:
915 919 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
916 920 return
917 921 self.log.info("registration::unregister_engine(%r)", eid)
918 922 # print (eid)
919 923 uuid = self.keytable[eid]
920 924 content=dict(id=eid, queue=uuid.decode('ascii'))
921 925 self.dead_engines.add(uuid)
922 926 # self.ids.remove(eid)
923 927 # uuid = self.keytable.pop(eid)
924 928 #
925 929 # ec = self.engines.pop(eid)
926 930 # self.hearts.pop(ec.heartbeat)
927 931 # self.by_ident.pop(ec.queue)
928 932 # self.completed.pop(eid)
929 933 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
930 934 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
931 935 dc.start()
932 936 ############## TODO: HANDLE IT ################
933 937
934 938 if self.notifier:
935 939 self.session.send(self.notifier, "unregistration_notification", content=content)
936 940
937 941 def _handle_stranded_msgs(self, eid, uuid):
938 942 """Handle messages known to be on an engine when the engine unregisters.
939 943
940 944 It is possible that this will fire prematurely - that is, an engine will
941 945 go down after completing a result, and the client will be notified
942 946 that the result failed and later receive the actual result.
943 947 """
944 948
945 949 outstanding = self.queues[eid]
946 950
947 951 for msg_id in outstanding:
948 952 self.pending.remove(msg_id)
949 953 self.all_completed.add(msg_id)
950 954 try:
951 955 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
952 956 except:
953 957 content = error.wrap_exception()
954 958 # build a fake header:
955 959 header = {}
956 960 header['engine'] = uuid
957 961 header['date'] = datetime.now()
958 962 rec = dict(result_content=content, result_header=header, result_buffers=[])
959 963 rec['completed'] = header['date']
960 964 rec['engine_uuid'] = uuid
961 965 try:
962 966 self.db.update_record(msg_id, rec)
963 967 except Exception:
964 968 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
965 969
966 970
967 971 def finish_registration(self, heart):
968 972 """Second half of engine registration, called after our HeartMonitor
969 973 has received a beat from the Engine's Heart."""
970 974 try:
971 975 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
972 976 except KeyError:
973 977 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
974 978 return
975 979 self.log.info("registration::finished registering engine %i:%r", eid, queue)
976 980 if purge is not None:
977 981 purge.stop()
978 982 control = queue
979 983 self.ids.add(eid)
980 984 self.keytable[eid] = queue
981 985 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
982 986 control=control, heartbeat=heart)
983 987 self.by_ident[queue] = eid
984 988 self.queues[eid] = list()
985 989 self.tasks[eid] = list()
986 990 self.completed[eid] = list()
987 991 self.hearts[heart] = eid
988 992 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
989 993 if self.notifier:
990 994 self.session.send(self.notifier, "registration_notification", content=content)
991 995 self.log.info("engine::Engine Connected: %i", eid)
992 996
993 997 def _purge_stalled_registration(self, heart):
994 998 if heart in self.incoming_registrations:
995 999 eid = self.incoming_registrations.pop(heart)[0]
996 1000 self.log.info("registration::purging stalled registration: %i", eid)
997 1001 else:
998 1002 pass
999 1003
1000 1004 #-------------------------------------------------------------------------
1001 1005 # Client Requests
1002 1006 #-------------------------------------------------------------------------
1003 1007
1004 1008 def shutdown_request(self, client_id, msg):
1005 1009 """handle shutdown request."""
1006 1010 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1007 1011 # also notify other clients of shutdown
1008 1012 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1009 1013 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1010 1014 dc.start()
1011 1015
1012 1016 def _shutdown(self):
1013 1017 self.log.info("hub::hub shutting down.")
1014 1018 time.sleep(0.1)
1015 1019 sys.exit(0)
1016 1020
1017 1021
1018 1022 def check_load(self, client_id, msg):
1019 1023 content = msg['content']
1020 1024 try:
1021 1025 targets = content['targets']
1022 1026 targets = self._validate_targets(targets)
1023 1027 except:
1024 1028 content = error.wrap_exception()
1025 1029 self.session.send(self.query, "hub_error",
1026 1030 content=content, ident=client_id)
1027 1031 return
1028 1032
1029 1033 content = dict(status='ok')
1030 1034 # loads = {}
1031 1035 for t in targets:
1032 1036 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1033 1037 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1034 1038
1035 1039
1036 1040 def queue_status(self, client_id, msg):
1037 1041 """Return the Queue status of one or more targets.
1038 1042 if verbose: return the msg_ids
1039 1043 else: return len of each type.
1040 1044 keys: queue (pending MUX jobs)
1041 1045 tasks (pending Task jobs)
1042 1046 completed (finished jobs from both queues)"""
1043 1047 content = msg['content']
1044 1048 targets = content['targets']
1045 1049 try:
1046 1050 targets = self._validate_targets(targets)
1047 1051 except:
1048 1052 content = error.wrap_exception()
1049 1053 self.session.send(self.query, "hub_error",
1050 1054 content=content, ident=client_id)
1051 1055 return
1052 1056 verbose = content.get('verbose', False)
1053 1057 content = dict(status='ok')
1054 1058 for t in targets:
1055 1059 queue = self.queues[t]
1056 1060 completed = self.completed[t]
1057 1061 tasks = self.tasks[t]
1058 1062 if not verbose:
1059 1063 queue = len(queue)
1060 1064 completed = len(completed)
1061 1065 tasks = len(tasks)
1062 1066 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1063 1067 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1064 1068 # print (content)
1065 1069 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1066 1070
1067 1071 def purge_results(self, client_id, msg):
1068 1072 """Purge results from memory. This method is more valuable before we move
1069 1073 to a DB based message storage mechanism."""
1070 1074 content = msg['content']
1071 1075 self.log.info("Dropping records with %s", content)
1072 1076 msg_ids = content.get('msg_ids', [])
1073 1077 reply = dict(status='ok')
1074 1078 if msg_ids == 'all':
1075 1079 try:
1076 1080 self.db.drop_matching_records(dict(completed={'$ne':None}))
1077 1081 except Exception:
1078 1082 reply = error.wrap_exception()
1079 1083 else:
1080 1084 pending = filter(lambda m: m in self.pending, msg_ids)
1081 1085 if pending:
1082 1086 try:
1083 1087 raise IndexError("msg pending: %r" % pending[0])
1084 1088 except:
1085 1089 reply = error.wrap_exception()
1086 1090 else:
1087 1091 try:
1088 1092 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1089 1093 except Exception:
1090 1094 reply = error.wrap_exception()
1091 1095
1092 1096 if reply['status'] == 'ok':
1093 1097 eids = content.get('engine_ids', [])
1094 1098 for eid in eids:
1095 1099 if eid not in self.engines:
1096 1100 try:
1097 1101 raise IndexError("No such engine: %i" % eid)
1098 1102 except:
1099 1103 reply = error.wrap_exception()
1100 1104 break
1101 1105 uid = self.engines[eid].queue
1102 1106 try:
1103 1107 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1104 1108 except Exception:
1105 1109 reply = error.wrap_exception()
1106 1110 break
1107 1111
1108 1112 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1109 1113
1110 1114 def resubmit_task(self, client_id, msg):
1111 1115 """Resubmit one or more tasks."""
1112 1116 def finish(reply):
1113 1117 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1114 1118
1115 1119 content = msg['content']
1116 1120 msg_ids = content['msg_ids']
1117 1121 reply = dict(status='ok')
1118 1122 try:
1119 1123 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1120 1124 'header', 'content', 'buffers'])
1121 1125 except Exception:
1122 1126 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1123 1127 return finish(error.wrap_exception())
1124 1128
1125 1129 # validate msg_ids
1126 1130 found_ids = [ rec['msg_id'] for rec in records ]
1127 1131 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1128 1132 if len(records) > len(msg_ids):
1129 1133 try:
1130 1134 raise RuntimeError("DB appears to be in an inconsistent state."
1131 1135 "More matching records were found than should exist")
1132 1136 except Exception:
1133 1137 return finish(error.wrap_exception())
1134 1138 elif len(records) < len(msg_ids):
1135 1139 missing = [ m for m in msg_ids if m not in found_ids ]
1136 1140 try:
1137 1141 raise KeyError("No such msg(s): %r" % missing)
1138 1142 except KeyError:
1139 1143 return finish(error.wrap_exception())
1140 1144 elif invalid_ids:
1141 1145 msg_id = invalid_ids[0]
1142 1146 try:
1143 1147 raise ValueError("Task %r appears to be inflight" % msg_id)
1144 1148 except Exception:
1145 1149 return finish(error.wrap_exception())
1146 1150
1147 1151 # clear the existing records
1148 1152 now = datetime.now()
1149 1153 rec = empty_record()
1150 1154 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1151 1155 rec['resubmitted'] = now
1152 1156 rec['queue'] = 'task'
1153 1157 rec['client_uuid'] = client_id[0]
1154 1158 try:
1155 1159 for msg_id in msg_ids:
1156 1160 self.all_completed.discard(msg_id)
1157 1161 self.db.update_record(msg_id, rec)
1158 1162 except Exception:
1159 1163 self.log.error('db::db error updating record', exc_info=True)
1160 1164 reply = error.wrap_exception()
1161 1165 else:
1162 1166 # send the messages
1163 1167 for rec in records:
1164 1168 header = rec['header']
1165 1169 # include resubmitted in header to prevent digest collision
1166 1170 header['resubmitted'] = now
1167 1171 msg = self.session.msg(header['msg_type'])
1168 1172 msg['content'] = rec['content']
1169 1173 msg['header'] = header
1170 1174 msg['header']['msg_id'] = rec['msg_id']
1171 1175 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1172 1176
1173 1177 finish(dict(status='ok'))
1174 1178
1175 1179
1176 1180 def _extract_record(self, rec):
1177 1181 """decompose a TaskRecord dict into subsection of reply for get_result"""
1178 1182 io_dict = {}
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1183 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1180 1184 io_dict[key] = rec[key]
1181 1185 content = { 'result_content': rec['result_content'],
1182 1186 'header': rec['header'],
1183 1187 'result_header' : rec['result_header'],
1188 'received' : rec['received'],
1184 1189 'io' : io_dict,
1185 1190 }
1186 1191 if rec['result_buffers']:
1187 1192 buffers = map(bytes, rec['result_buffers'])
1188 1193 else:
1189 1194 buffers = []
1190 1195
1191 1196 return content, buffers
1192 1197
1193 1198 def get_results(self, client_id, msg):
1194 1199 """Get the result of 1 or more messages."""
1195 1200 content = msg['content']
1196 1201 msg_ids = sorted(set(content['msg_ids']))
1197 1202 statusonly = content.get('status_only', False)
1198 1203 pending = []
1199 1204 completed = []
1200 1205 content = dict(status='ok')
1201 1206 content['pending'] = pending
1202 1207 content['completed'] = completed
1203 1208 buffers = []
1204 1209 if not statusonly:
1205 1210 try:
1206 1211 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1207 1212 # turn match list into dict, for faster lookup
1208 1213 records = {}
1209 1214 for rec in matches:
1210 1215 records[rec['msg_id']] = rec
1211 1216 except Exception:
1212 1217 content = error.wrap_exception()
1213 1218 self.session.send(self.query, "result_reply", content=content,
1214 1219 parent=msg, ident=client_id)
1215 1220 return
1216 1221 else:
1217 1222 records = {}
1218 1223 for msg_id in msg_ids:
1219 1224 if msg_id in self.pending:
1220 1225 pending.append(msg_id)
1221 1226 elif msg_id in self.all_completed:
1222 1227 completed.append(msg_id)
1223 1228 if not statusonly:
1224 1229 c,bufs = self._extract_record(records[msg_id])
1225 1230 content[msg_id] = c
1226 1231 buffers.extend(bufs)
1227 1232 elif msg_id in records:
1228 1233 if records[msg_id]['completed']:
1229 1234 completed.append(msg_id)
1230 1235 c,bufs = self._extract_record(records[msg_id])
1231 1236 content[msg_id] = c
1232 1237 buffers.extend(bufs)
1233 1238 else:
1234 1239 pending.append(msg_id)
1235 1240 else:
1236 1241 try:
1237 1242 raise KeyError('No such message: '+msg_id)
1238 1243 except:
1239 1244 content = error.wrap_exception()
1240 1245 break
1241 1246 self.session.send(self.query, "result_reply", content=content,
1242 1247 parent=msg, ident=client_id,
1243 1248 buffers=buffers)
1244 1249
1245 1250 def get_history(self, client_id, msg):
1246 1251 """Get a list of all msg_ids in our DB records"""
1247 1252 try:
1248 1253 msg_ids = self.db.get_history()
1249 1254 except Exception as e:
1250 1255 content = error.wrap_exception()
1251 1256 else:
1252 1257 content = dict(status='ok', history=msg_ids)
1253 1258
1254 1259 self.session.send(self.query, "history_reply", content=content,
1255 1260 parent=msg, ident=client_id)
1256 1261
1257 1262 def db_query(self, client_id, msg):
1258 1263 """Perform a raw query on the task record database."""
1259 1264 content = msg['content']
1260 1265 query = content.get('query', {})
1261 1266 keys = content.get('keys', None)
1262 1267 buffers = []
1263 1268 empty = list()
1264 1269 try:
1265 1270 records = self.db.find_records(query, keys)
1266 1271 except Exception as e:
1267 1272 content = error.wrap_exception()
1268 1273 else:
1269 1274 # extract buffers from reply content:
1270 1275 if keys is not None:
1271 1276 buffer_lens = [] if 'buffers' in keys else None
1272 1277 result_buffer_lens = [] if 'result_buffers' in keys else None
1273 1278 else:
1274 1279 buffer_lens = None
1275 1280 result_buffer_lens = None
1276 1281
1277 1282 for rec in records:
1278 1283 # buffers may be None, so double check
1279 1284 b = rec.pop('buffers', empty) or empty
1280 1285 if buffer_lens is not None:
1281 1286 buffer_lens.append(len(b))
1282 1287 buffers.extend(b)
1283 1288 rb = rec.pop('result_buffers', empty) or empty
1284 1289 if result_buffer_lens is not None:
1285 1290 result_buffer_lens.append(len(rb))
1286 1291 buffers.extend(rb)
1287 1292 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1288 1293 result_buffer_lens=result_buffer_lens)
1289 1294 # self.log.debug (content)
1290 1295 self.session.send(self.query, "db_reply", content=content,
1291 1296 parent=msg, ident=client_id,
1292 1297 buffers=buffers)
1293 1298
@@ -1,408 +1,411 b''
1 1 """A TaskRecord backend using sqlite3
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 import json
15 15 import os
16 16 import cPickle as pickle
17 17 from datetime import datetime
18 18
19 19 try:
20 20 import sqlite3
21 21 except ImportError:
22 22 sqlite3 = None
23 23
24 24 from zmq.eventloop import ioloop
25 25
26 26 from IPython.utils.traitlets import Unicode, Instance, List, Dict
27 27 from .dictdb import BaseDB
28 28 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # SQLite operators, adapters, and converters
32 32 #-----------------------------------------------------------------------------
33 33
34 34 try:
35 35 buffer
36 36 except NameError:
37 37 # py3k
38 38 buffer = memoryview
39 39
40 40 operators = {
41 41 '$lt' : "<",
42 42 '$gt' : ">",
43 43 # null is handled weird with ==,!=
44 44 '$eq' : "=",
45 45 '$ne' : "!=",
46 46 '$lte': "<=",
47 47 '$gte': ">=",
48 48 '$in' : ('=', ' OR '),
49 49 '$nin': ('!=', ' AND '),
50 50 # '$all': None,
51 51 # '$mod': None,
52 52 # '$exists' : None
53 53 }
54 54 null_operators = {
55 55 '=' : "IS NULL",
56 56 '!=' : "IS NOT NULL",
57 57 }
58 58
59 59 def _adapt_dict(d):
60 60 return json.dumps(d, default=date_default)
61 61
62 62 def _convert_dict(ds):
63 63 if ds is None:
64 64 return ds
65 65 else:
66 66 if isinstance(ds, bytes):
67 67 # If I understand the sqlite doc correctly, this will always be utf8
68 68 ds = ds.decode('utf8')
69 69 return extract_dates(json.loads(ds))
70 70
71 71 def _adapt_bufs(bufs):
72 72 # this is *horrible*
73 73 # copy buffers into single list and pickle it:
74 74 if bufs and isinstance(bufs[0], (bytes, buffer)):
75 75 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
76 76 elif bufs:
77 77 return bufs
78 78 else:
79 79 return None
80 80
81 81 def _convert_bufs(bs):
82 82 if bs is None:
83 83 return []
84 84 else:
85 85 return pickle.loads(bytes(bs))
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # SQLiteDB class
89 89 #-----------------------------------------------------------------------------
90 90
91 91 class SQLiteDB(BaseDB):
92 92 """SQLite3 TaskRecord backend."""
93 93
94 94 filename = Unicode('tasks.db', config=True,
95 95 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
96 96 location = Unicode('', config=True,
97 97 help="""The directory containing the sqlite task database. The default
98 98 is to use the cluster_dir location.""")
99 99 table = Unicode("", config=True,
100 100 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
101 101 a new table will be created with the Hub's IDENT. Specifying the table will result
102 102 in tasks from previous sessions being available via Clients' db_query and
103 103 get_result methods.""")
104 104
105 105 if sqlite3 is not None:
106 106 _db = Instance('sqlite3.Connection')
107 107 else:
108 108 _db = None
109 109 # the ordered list of column names
110 110 _keys = List(['msg_id' ,
111 111 'header' ,
112 112 'content',
113 113 'buffers',
114 114 'submitted',
115 115 'client_uuid' ,
116 116 'engine_uuid' ,
117 117 'started',
118 118 'completed',
119 119 'resubmitted',
120 'received',
120 121 'result_header' ,
121 122 'result_content' ,
122 123 'result_buffers' ,
123 124 'queue' ,
124 125 'pyin' ,
125 126 'pyout',
126 127 'pyerr',
127 128 'stdout',
128 129 'stderr',
129 130 ])
130 131 # sqlite datatypes for checking that db is current format
131 132 _types = Dict({'msg_id' : 'text' ,
132 133 'header' : 'dict text',
133 134 'content' : 'dict text',
134 135 'buffers' : 'bufs blob',
135 136 'submitted' : 'timestamp',
136 137 'client_uuid' : 'text',
137 138 'engine_uuid' : 'text',
138 139 'started' : 'timestamp',
139 140 'completed' : 'timestamp',
140 141 'resubmitted' : 'timestamp',
142 'received' : 'timestamp',
141 143 'result_header' : 'dict text',
142 144 'result_content' : 'dict text',
143 145 'result_buffers' : 'bufs blob',
144 146 'queue' : 'text',
145 147 'pyin' : 'text',
146 148 'pyout' : 'text',
147 149 'pyerr' : 'text',
148 150 'stdout' : 'text',
149 151 'stderr' : 'text',
150 152 })
151 153
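One consequence of adding 'received' to `_keys` and `_types`: `_check_table` will reject task tables created before this change, and `_init_db` falls back to a fresh table with a numeric suffix. A quick way to see the mismatch, assuming an initialized SQLiteDB instance `db`:

```python
# sketch: compare an existing table's columns against the current schema
cursor = db._db.execute("PRAGMA table_info(%s)" % db.table)
existing = [row[1] for row in cursor.fetchall()]
if existing and existing != db._keys:
    print("old schema, missing columns: %s" % (set(db._keys) - set(existing)))
```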
152 154 def __init__(self, **kwargs):
153 155 super(SQLiteDB, self).__init__(**kwargs)
154 156 if sqlite3 is None:
155 157 raise ImportError("SQLiteDB requires sqlite3")
156 158 if not self.table:
157 159 # use session, and prefix _, since starting with # is illegal
158 160 self.table = '_'+self.session.replace('-','_')
159 161 if not self.location:
160 162 # get current profile
161 163 from IPython.core.application import BaseIPythonApplication
162 164 if BaseIPythonApplication.initialized():
163 165 app = BaseIPythonApplication.instance()
164 166 if app.profile_dir is not None:
165 167 self.location = app.profile_dir.location
166 168 else:
167 169 self.location = u'.'
168 170 else:
169 171 self.location = u'.'
170 172 self._init_db()
171 173
172 174 # register db commit as 2s periodic callback
173 175 # to prevent clogging pipes
174 176 # assumes we are being run in a zmq ioloop app
175 177 loop = ioloop.IOLoop.instance()
176 178 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
177 179 pc.start()
178 180
179 181 def _defaults(self, keys=None):
180 182 """create an empty record"""
181 183 d = {}
182 184 keys = self._keys if keys is None else keys
183 185 for key in keys:
184 186 d[key] = None
185 187 return d
186 188
187 189 def _check_table(self):
188 190 """Ensure that an incorrect table doesn't exist
189 191
190 192 If a bad (old) table does exist, return False
191 193 """
192 194 cursor = self._db.execute("PRAGMA table_info(%s)"%self.table)
193 195 lines = cursor.fetchall()
194 196 if not lines:
195 197 # table does not exist
196 198 return True
197 199 types = {}
198 200 keys = []
199 201 for line in lines:
200 202 keys.append(line[1])
201 203 types[line[1]] = line[2]
202 204 if self._keys != keys:
203 205 # key mismatch
204 206 self.log.warn('keys mismatch')
205 207 return False
206 208 for key in self._keys:
207 209 if types[key] != self._types[key]:
208 210 self.log.warn(
209 211 'type mismatch: %s: %s != %s'%(key,types[key],self._types[key])
210 212 )
211 213 return False
212 214 return True
213 215
214 216 def _init_db(self):
215 217 """Connect to the database and get new session number."""
216 218 # register adapters
217 219 sqlite3.register_adapter(dict, _adapt_dict)
218 220 sqlite3.register_converter('dict', _convert_dict)
219 221 sqlite3.register_adapter(list, _adapt_bufs)
220 222 sqlite3.register_converter('bufs', _convert_bufs)
221 223 # connect to the db
222 224 dbfile = os.path.join(self.location, self.filename)
223 225 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
224 226 # isolation_level = None)#,
225 227 cached_statements=64)
226 228 # print dir(self._db)
227 229 first_table = self.table
228 230 i=0
229 231 while not self._check_table():
230 232 i+=1
231 233 self.table = first_table+'_%i'%i
232 234 self.log.warn(
233 235 "Table %s exists and doesn't match db format, trying %s"%
234 236 (first_table,self.table)
235 237 )
236 238
237 239 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
238 240 (msg_id text PRIMARY KEY,
239 241 header dict text,
240 242 content dict text,
241 243 buffers bufs blob,
242 244 submitted timestamp,
243 245 client_uuid text,
244 246 engine_uuid text,
245 247 started timestamp,
246 248 completed timestamp,
247 249 resubmitted timestamp,
250 received timestamp,
248 251 result_header dict text,
249 252 result_content dict text,
250 253 result_buffers bufs blob,
251 254 queue text,
252 255 pyin text,
253 256 pyout text,
254 257 pyerr text,
255 258 stdout text,
256 259 stderr text)
257 260 """%self.table)
258 261 self._db.commit()
259 262
260 263 def _dict_to_list(self, d):
261 264 """turn a mongodb-style record dict into a list."""
262 265
263 266 return [ d[key] for key in self._keys ]
264 267
265 268 def _list_to_dict(self, line, keys=None):
266 269 """Inverse of dict_to_list"""
267 270 keys = self._keys if keys is None else keys
268 271 d = self._defaults(keys)
269 272 for key,value in zip(keys, line):
270 273 d[key] = value
271 274
272 275 return d
273 276
274 277 def _render_expression(self, check):
275 278 """Turn a mongodb-style search dict into an SQL query."""
276 279 expressions = []
277 280 args = []
278 281
279 282 skeys = set(check.keys())
280 283 skeys.difference_update(set(self._keys))
281 284 skeys.difference_update(set(['buffers', 'result_buffers']))
282 285 if skeys:
283 286 raise KeyError("Illegal testing key(s): %s"%skeys)
284 287
285 288 for name,sub_check in check.iteritems():
286 289 if isinstance(sub_check, dict):
287 290 for test,value in sub_check.iteritems():
288 291 try:
289 292 op = operators[test]
290 293 except KeyError:
291 294 raise KeyError("Unsupported operator: %r"%test)
292 295 if isinstance(op, tuple):
293 296 op, join = op
294 297
295 298 if value is None and op in null_operators:
296 299 expr = "%s %s" % (name, null_operators[op])
297 300 else:
298 301 expr = "%s %s ?"%(name, op)
299 302 if isinstance(value, (tuple,list)):
300 303 if op in null_operators and any([v is None for v in value]):
301 304 # equality tests don't work with NULL
302 305 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
303 306 expr = '( %s )'%( join.join([expr]*len(value)) )
304 307 args.extend(value)
305 308 else:
306 309 args.append(value)
307 310 expressions.append(expr)
308 311 else:
309 312 # it's an equality check
310 313 if sub_check is None:
311 314 expressions.append("%s IS NULL" % name)
312 315 else:
313 316 expressions.append("%s = ?"%name)
314 317 args.append(sub_check)
315 318
316 319 expr = " AND ".join(expressions)
317 320 return expr, args
318 321
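To illustrate `_render_expression`, here is a mongodb-style check and the SQL it produces; the values are made up, and since py2 dict iteration order is unspecified, the clause order may vary:

```python
# note that NULL tests render as IS / IS NOT rather than = / !=
expr, args = db._render_expression({'completed': {'$ne': None},
                                    'engine_uuid': 'abc123'})
# expr -> "completed IS NOT NULL AND engine_uuid = ?"   (order may differ)
# args -> ['abc123']
```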
319 322 def add_record(self, msg_id, rec):
320 323 """Add a new Task Record, by msg_id."""
321 324 d = self._defaults()
322 325 d.update(rec)
323 326 d['msg_id'] = msg_id
324 327 line = self._dict_to_list(d)
325 328 tups = '(%s)'%(','.join(['?']*len(line)))
326 329 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
327 330 # self._db.commit()
328 331
329 332 def get_record(self, msg_id):
330 333 """Get a specific Task Record, by msg_id."""
331 334 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
332 335 line = cursor.fetchone()
333 336 if line is None:
334 337 raise KeyError("No such msg: %r"%msg_id)
335 338 return self._list_to_dict(line)
336 339
337 340 def update_record(self, msg_id, rec):
338 341 """Update the data in an existing record."""
339 342 query = "UPDATE %s SET "%self.table
340 343 sets = []
341 344 keys = sorted(rec.keys())
342 345 values = []
343 346 for key in keys:
344 347 sets.append('%s = ?'%key)
345 348 values.append(rec[key])
346 349 query += ', '.join(sets)
347 350 query += ' WHERE msg_id == ?'
348 351 values.append(msg_id)
349 352 self._db.execute(query, values)
350 353 # self._db.commit()
351 354
352 355 def drop_record(self, msg_id):
353 356 """Remove a record from the DB."""
354 357 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
355 358 # self._db.commit()
356 359
357 360 def drop_matching_records(self, check):
358 361 """Remove a record from the DB."""
359 362 expr,args = self._render_expression(check)
360 363 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
361 364 self._db.execute(query,args)
362 365 # self._db.commit()
363 366
364 367 def find_records(self, check, keys=None):
365 368 """Find records matching a query dict, optionally extracting subset of keys.
366 369
367 370 Returns list of matching records.
368 371
369 372 Parameters
370 373 ----------
371 374
372 375 check: dict
373 376 mongodb-style query argument
374 377 keys: list of strs [optional]
375 378 if specified, the subset of keys to extract. msg_id will *always* be
376 379 included.
377 380 """
378 381 if keys:
379 382 bad_keys = [ key for key in keys if key not in self._keys ]
380 383 if bad_keys:
381 384 raise KeyError("Bad record key(s): %s"%bad_keys)
382 385
383 386 if keys:
384 387 # ensure msg_id is present and first:
385 388 if 'msg_id' in keys:
386 389 keys.remove('msg_id')
387 390 keys.insert(0, 'msg_id')
388 391 req = ', '.join(keys)
389 392 else:
390 393 req = '*'
391 394 expr,args = self._render_expression(check)
392 395 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
393 396 cursor = self._db.execute(query, args)
394 397 matches = cursor.fetchall()
395 398 records = []
396 399 for line in matches:
397 400 rec = self._list_to_dict(line, keys)
398 401 records.append(rec)
399 402 return records
400 403
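With the new 'received' column in place, `find_records` can select on it like any other timestamp. A usage sketch; the `db` instance and the five-minute window are assumptions:

```python
from datetime import datetime, timedelta

# results the Hub received in the last five minutes (msg_id always included)
cutoff = datetime.now() - timedelta(minutes=5)
recent = db.find_records({'received': {'$gte': cutoff}},
                         keys=['completed', 'received'])
for rec in recent:
    print("%s lag: %s" % (rec['msg_id'], rec['received'] - rec['completed']))
```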
401 404 def get_history(self):
402 405 """get all msg_ids, ordered by time submitted."""
403 406 query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
404 407 cursor = self._db.execute(query)
405 408 # will be a list of length 1 tuples
406 409 return [ tup[0] for tup in cursor.fetchall()]
407 410
408 411 __all__ = ['SQLiteDB'] No newline at end of file