##// END OF EJS Templates
serve local files as `files/foo`
serve local files as `files/foo`

File last commit:

r5344:293d3eed
r5826:e1508187
Show More
streamkernel.py
439 lines | 15.6 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 """
Kernel adapted from kernel.py to use ZMQ Streams
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
* Brian Granger
* Fernando Perez
* Evan Patterson
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 #-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports.
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
MinRK
resort imports in a cleaner order
r3631
from code import CommandCompiler
MinRK
add timestamps all messages; fix reply on wrong channel bug.
r3578 from datetime import datetime
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # System library imports.
MinRK
prep newparallel for rebase...
r3539 import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # Local imports.
MinRK
add Integer traitlet...
r5344 from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes
MinRK
added dependency decorator
r3546 from IPython.zmq.completer import KernelCompleter
MinRK
propagate iopub to clients
r3602
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.error import wrap_exception
from IPython.parallel.factory import SessionFactory
MinRK
cleanup per review...
r4161 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
MinRK
prep newparallel for rebase...
r3539
MinRK
control channel progress
r3540 def printer(*args):
MinRK
propagate iopub to clients
r3602 pprint(args, stream=sys.__stdout__)
MinRK
control channel progress
r3540
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
add '-s' for startup script in ipengine...
r3684 class _Passer(zmqstream.ZMQStream):
"""Empty class that implements `send()` that does nothing.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 Subclass ZMQStream for Session typechecking
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add '-s' for startup script in ipengine...
r3684 """
def __init__(self, *args, **kwargs):
pass
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def send(self, *args, **kwargs):
pass
send_multipart = send
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 #-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 class Kernel(SessionFactory):
MinRK
prep newparallel for rebase...
r3539
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 #---------------------------------------------------------------------------
# Kernel interface
#---------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # kwargs:
MinRK
cleanup parallel traits...
r3988 exec_lines = List(Unicode, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="List of lines to execute")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel code for py3k...
r4155 # identities:
MinRK
add Integer traitlet...
r5344 int_id = Integer(-1)
MinRK
update parallel code for py3k...
r4155 bident = CBytes()
ident = Unicode()
def _ident_changed(self, name, old, new):
MinRK
cleanup per review...
r4161 self.bident = asbytes(new)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 control_stream = Instance(zmqstream.ZMQStream)
task_stream = Instance(zmqstream.ZMQStream)
iopub_stream = Instance(zmqstream.ZMQStream)
MinRK
organize IPython.parallel into subpackages
r3673 client = Instance('IPython.parallel.Client')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # internals
shell_streams = List()
compiler = Instance(CommandCompiler, (), {})
completer = Instance(KernelCompleter)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 aborted = Set()
shell_handlers = Dict()
control_handlers = Dict()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _set_prefix(self):
self.prefix = "engine.%s"%self.int_id
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _connect_completer(self):
self.completer = KernelCompleter(self.user_ns)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
MinRK
Refactor newparallel to use Config system...
r3604 self._set_prefix()
self._connect_completer()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 self.on_trait_change(self._set_prefix, 'id')
self.on_trait_change(self._connect_completer, 'user_ns')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 # Build dict of handlers for message types
Bernardo B. Marques
remove all trailling spaces
r4872 for msg_type in ['execute_request', 'complete_request', 'apply_request',
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 'clear_request']:
self.shell_handlers[msg_type] = getattr(self, msg_type)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
MinRK
prep newparallel for rebase...
r3539 self.control_handlers[msg_type] = getattr(self, msg_type)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 self._initial_exec_lines()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 def _wrap_exception(self, method=None):
MinRK
testing fixes
r3641 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 content=wrap_exception(e_info)
return content
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _initial_exec_lines(self):
s = _Passer()
content = dict(silent=True, user_variable=[],user_expressions=[])
for line in self.exec_lines:
MinRK
rework logging connections
r3610 self.log.debug("executing initialization: %s"%line)
MinRK
Refactor newparallel to use Config system...
r3604 content.update({'code':line})
msg = self.session.msg('execute_request', content)
self.execute_request(s, [], msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 #-------------------- control handlers -----------------------------
MinRK
control channel progress
r3540 def abort_queues(self):
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 for stream in self.shell_streams:
MinRK
control channel progress
r3540 if stream:
self.abort_queue(stream)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def abort_queue(self, stream):
while True:
MinRK
properly handle nothing to recv in StreamKernel.abort
r4011 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
if msg is None:
return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 self.log.info("Aborting:")
self.log.info(str(msg))
Brian E. Granger
Fixing code to assume msg_type and msg_id are top-level....
r4230 msg_type = msg['header']['msg_type']
MinRK
prep newparallel for rebase...
r3539 reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
# self.reply_socket.send_json(reply_msg)
Bernardo B. Marques
remove all trailling spaces
r4872 reply_msg = self.session.send(stream, reply_type,
MinRK
fix typo in StreamKernel.abort_queue...
r4137 content={'status' : 'aborted'}, parent=msg, ident=idents)
MinRK
rework logging connections
r3610 self.log.debug(str(reply_msg))
MinRK
prep newparallel for rebase...
r3539 # We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.05)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def abort_request(self, stream, ident, parent):
MinRK
control channel progress
r3540 """abort a specifig msg by id"""
MinRK
prep newparallel for rebase...
r3539 msg_ids = parent['content'].get('msg_ids', None)
MinRK
control channel progress
r3540 if isinstance(msg_ids, basestring):
msg_ids = [msg_ids]
MinRK
prep newparallel for rebase...
r3539 if not msg_ids:
MinRK
control channel progress
r3540 self.abort_queues()
MinRK
prep newparallel for rebase...
r3539 for mid in msg_ids:
MinRK
control channel progress
r3540 self.aborted.add(str(mid))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 content = dict(status='ok')
Bernardo B. Marques
remove all trailling spaces
r4872 reply_msg = self.session.send(stream, 'abort_reply', content=content,
MinRK
update API after sagedays29...
r3664 parent=parent, ident=ident)
MinRK
rework logging connections
r3610 self.log.debug(str(reply_msg))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 def shutdown_request(self, stream, ident, parent):
MinRK
use wrap_exception in controller, fix clear on kernel
r3560 """kill ourself. This should really be handled in an external process"""
MinRK
Clients can now shutdown the controller.
r3580 try:
self.abort_queues()
except:
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 content = self._wrap_exception('shutdown')
MinRK
Clients can now shutdown the controller.
r3580 else:
content = dict(parent['content'])
content['status'] = 'ok'
MinRK
added exec_key and fixed client.shutdown
r3575 msg = self.session.send(stream, 'shutdown_reply',
content=content, parent=parent, ident=ident)
MinRK
update API after sagedays29...
r3664 self.log.debug(str(msg))
MinRK
Clients can now shutdown the controller.
r3580 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
dc.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
MinRK
added exec_key and fixed client.shutdown
r3575 try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg, content=True, copy=False)
MinRK
added exec_key and fixed client.shutdown
r3575 except:
MinRK
rework logging connections
r3610 self.log.error("Invalid Message", exc_info=True)
MinRK
added exec_key and fixed client.shutdown
r3575 return
MinRK
update parallel code for py3k...
r4155 else:
self.log.debug("Control received, %s", msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 header = msg['header']
msg_id = header['msg_id']
Brian E. Granger
Adding temp refs to msg_type to prevent nested dict gets.
r4236 msg_type = header['msg_type']
handler = self.control_handlers.get(msg_type, None)
MinRK
prep newparallel for rebase...
r3539 if handler is None:
Brian E. Granger
Adding temp refs to msg_type to prevent nested dict gets.
r4236 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
MinRK
prep newparallel for rebase...
r3539 else:
MinRK
control channel progress
r3540 handler(self.control_stream, idents, msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539
#-------------------- queue helpers ------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def check_dependencies(self, dependencies):
if not dependencies:
return True
if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
anyorall = dependencies[0]
dependencies = dependencies[1]
else:
anyorall = 'all'
results = self.client.get_results(dependencies,status_only=True)
if results['status'] != 'ok':
return False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 if anyorall == 'any':
if not results['completed']:
return False
else:
if results['pending']:
return False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 return True
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
control channel progress
r3540 def check_aborted(self, msg_id):
return msg_id in self.aborted
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 #-------------------- queue handlers -----------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 def clear_request(self, stream, idents, parent):
"""Clear our namespace."""
self.user_ns = {}
Bernardo B. Marques
remove all trailling spaces
r4872 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 content = dict(status='ok'))
MinRK
Refactor newparallel to use Config system...
r3604 self._initial_exec_lines()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def execute_request(self, stream, ident, parent):
MinRK
rework logging connections
r3610 self.log.debug('execute request %s'%parent)
MinRK
prep newparallel for rebase...
r3539 try:
code = parent[u'content'][u'code']
except:
MinRK
rework logging connections
r3610 self.log.error("Got bad msg: %s"%parent, exc_info=True)
MinRK
prep newparallel for rebase...
r3539 return
MinRK
propagate iopub to clients
r3602 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
MinRK
cleanup per review...
r4161 ident=asbytes('%s.pyin'%self.prefix))
MinRK
handle datetime objects in Session...
r4008 started = datetime.now()
MinRK
prep newparallel for rebase...
r3539 try:
comp_code = self.compiler(code, '<zmq-kernel>')
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
MinRK
propagate iopub to clients
r3602 sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
MinRK
prep newparallel for rebase...
r3539 exec comp_code in self.user_ns, self.user_ns
except:
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 exc_content = self._wrap_exception('execute')
MinRK
prep newparallel for rebase...
r3539 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
MinRK
propagate iopub to clients
r3602 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
MinRK
cleanup per review...
r4161 ident=asbytes('%s.pyerr'%self.prefix))
MinRK
prep newparallel for rebase...
r3539 reply_content = exc_content
else:
reply_content = {'status' : 'ok'}
Bernardo B. Marques
remove all trailling spaces
r4872
reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
MinRK
add timestamps all messages; fix reply on wrong channel bug.
r3578 ident=ident, subheader = dict(started=started))
MinRK
rework logging connections
r3610 self.log.debug(str(reply_msg))
MinRK
prep newparallel for rebase...
r3539 if reply_msg['content']['status'] == u'error':
MinRK
control channel progress
r3540 self.abort_queues()
MinRK
prep newparallel for rebase...
r3539
def complete_request(self, stream, ident, parent):
matches = {'matches' : self.complete(parent),
'status' : 'ok'}
completion_msg = self.session.send(stream, 'complete_reply',
matches, parent, ident)
# print >> sys.__stdout__, completion_msg
def complete(self, msg):
return self.completer.complete(msg.content.line, msg.content.text)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def apply_request(self, stream, ident, parent):
MinRK
support iterating through map results as they arrive
r3627 # flush previous reply, so this request won't block it
stream.flush(zmq.POLLOUT)
MinRK
prep newparallel for rebase...
r3539 try:
content = parent[u'content']
bufs = parent[u'buffers']
msg_id = parent['header']['msg_id']
MinRK
update API after sagedays29...
r3664 # bound = parent['header'].get('bound', False)
MinRK
prep newparallel for rebase...
r3539 except:
MinRK
rework logging connections
r3610 self.log.error("Got bad msg: %s"%parent, exc_info=True)
MinRK
prep newparallel for rebase...
r3539 return
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # self.iopub_stream.send(pyin_msg)
# self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
MinRK
Refactor newparallel to use Config system...
r3604 sub = {'dependencies_met' : True, 'engine' : self.ident,
MinRK
handle datetime objects in Session...
r4008 'started': datetime.now()}
MinRK
prep newparallel for rebase...
r3539 try:
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
MinRK
propagate iopub to clients
r3602 sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
MinRK
prep newparallel for rebase...
r3539 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
MinRK
reflect revised apply_bound pattern
r3655 working = self.user_ns
Bernardo B. Marques
remove all trailling spaces
r4872 # suffix =
MinRK
reflect revised apply_bound pattern
r3655 prefix = "_"+str(msg_id).replace("-","")+"_"
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
MinRK
update API after sagedays29...
r3664 # if bound:
# bound_ns = Namespace(working)
# args = [bound_ns]+list(args)
MinRK
Improvements to dependency handling...
r3607 fname = getattr(f, '__name__', 'f')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
reflect revised apply_bound pattern
r3655 fname = prefix+"f"
argname = prefix+"args"
kwargname = prefix+"kwargs"
resultname = prefix+"result"
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
reflect revised apply_bound pattern
r3655 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
MinRK
prep newparallel for rebase...
r3539 # print ns
working.update(ns)
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
MinRK
reflect revised apply_bound pattern
r3655 try:
exec code in working,working
result = working.get(resultname)
finally:
MinRK
prep newparallel for rebase...
r3539 for key in ns.iterkeys():
MinRK
reflect revised apply_bound pattern
r3655 working.pop(key)
MinRK
update API after sagedays29...
r3664 # if bound:
# working.update(bound_ns)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
except:
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 exc_content = self._wrap_exception('apply')
MinRK
prep newparallel for rebase...
r3539 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
MinRK
propagate iopub to clients
r3602 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
MinRK
cleanup per review...
r4161 ident=asbytes('%s.pyerr'%self.prefix))
MinRK
prep newparallel for rebase...
r3539 reply_content = exc_content
result_buf = []
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Improvements to dependency handling...
r3607 if exc_content['ename'] == 'UnmetDependency':
MinRK
use wrap_exception in controller, fix clear on kernel
r3560 sub['dependencies_met'] = False
MinRK
prep newparallel for rebase...
r3539 else:
reply_content = {'status' : 'ok'}
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Improvements to dependency handling...
r3607 # put 'ok'/'error' status in header, for scheduler introspection:
sub['status'] = reply_content['status']
Bernardo B. Marques
remove all trailling spaces
r4872
reply_msg = self.session.send(stream, u'apply_reply', reply_content,
MinRK
added dependency decorator
r3546 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 # flush i/o
Bernardo B. Marques
remove all trailling spaces
r4872 # should this be before reply_msg is sent, like in the single-kernel code,
MinRK
update API after sagedays29...
r3664 # or should nothing get in the way of real results?
sys.stdout.flush()
sys.stderr.flush()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def dispatch_queue(self, stream, msg):
MinRK
use new stream.flush()
r3541 self.control_stream.flush()
MinRK
prep newparallel for rebase...
r3539 idents,msg = self.session.feed_identities(msg, copy=False)
MinRK
added exec_key and fixed client.shutdown
r3575 try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg, content=True, copy=False)
MinRK
added exec_key and fixed client.shutdown
r3575 except:
MinRK
rework logging connections
r3610 self.log.error("Invalid Message", exc_info=True)
MinRK
added exec_key and fixed client.shutdown
r3575 return
MinRK
update parallel code for py3k...
r4155 else:
self.log.debug("Message received, %s", msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 header = msg['header']
msg_id = header['msg_id']
Brian E. Granger
Fixing another bug in msg_type refactoring.
r4238 msg_type = msg['header']['msg_type']
MinRK
prep newparallel for rebase...
r3539 if self.check_aborted(msg_id):
MinRK
control channel progress
r3540 self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
Brian E. Granger
Adding temp refs to msg_type to prevent nested dict gets.
r4236 reply_type = msg_type.split('_')[0] + '_reply'
MinRK
better handle aborted/unschedulers tasks
r3687 status = {'status' : 'aborted'}
reply_msg = self.session.send(stream, reply_type, subheader=status,
content=status, parent=msg, ident=idents)
MinRK
control channel progress
r3540 return
Brian E. Granger
Adding temp refs to msg_type to prevent nested dict gets.
r4236 handler = self.shell_handlers.get(msg_type, None)
MinRK
prep newparallel for rebase...
r3539 if handler is None:
Brian E. Granger
Adding temp refs to msg_type to prevent nested dict gets.
r4236 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
MinRK
prep newparallel for rebase...
r3539 else:
handler(stream, idents, msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def start(self):
#### stream mode:
if self.control_stream:
self.control_stream.on_recv(self.dispatch_control, copy=False)
MinRK
control channel progress
r3540 self.control_stream.on_err(printer)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add timestamps all messages; fix reply on wrong channel bug.
r3578 def make_dispatcher(stream):
def dispatcher(msg):
return self.dispatch_queue(stream, msg)
return dispatcher
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 for s in self.shell_streams:
MinRK
add timestamps all messages; fix reply on wrong channel bug.
r3578 s.on_recv(make_dispatcher(s), copy=False)
MinRK
Improvements to dependency handling...
r3607 s.on_err(printer)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 if self.iopub_stream:
self.iopub_stream.on_err(printer)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 #### while True mode:
# while True:
# idle = True
# try:
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # msg = self.shell_stream.socket.recv_multipart(
MinRK
prep newparallel for rebase...
r3539 # zmq.NOBLOCK, copy=False)
# except zmq.ZMQError, e:
# if e.errno != zmq.EAGAIN:
# raise e
# else:
# idle=False
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # self.dispatch_queue(self.shell_stream, msg)
Bernardo B. Marques
remove all trailling spaces
r4872 #
MinRK
prep newparallel for rebase...
r3539 # if not self.task_stream.empty():
# idle=False
# msg = self.task_stream.recv_multipart()
# self.dispatch_queue(self.task_stream, msg)
# if idle:
# # don't busywait
# time.sleep(1e-3)