From c5a35798c5a052670a0744778ba3f19a89b0210c 2011-06-20 23:40:09 From: MinRK Date: 2011-06-20 23:40:09 Subject: [PATCH] zmq kernels now started via newapp --- diff --git a/IPython/frontend/qt/console/ipythonqt.py b/IPython/frontend/qt/console/ipythonqt.py index 3607b9a..03034e2 100644 --- a/IPython/frontend/qt/console/ipythonqt.py +++ b/IPython/frontend/qt/console/ipythonqt.py @@ -217,9 +217,12 @@ def main(): if args.pure: kwargs['ipython']=False else: - kwargs['colors']=colors + extra = [] + if colors: + extra.append("colors=%s"%colors) if args.pylab: - kwargs['pylab']=args.pylab + extra.append("pylab=%s"%args.pylab) + kwargs['extra_arguments'] = extra kernel_manager.start_kernel(**kwargs) kernel_manager.start_channels() diff --git a/IPython/zmq/entry_point.py b/IPython/zmq/entry_point.py index 47c3b4a..ab444c8 100644 --- a/IPython/zmq/entry_point.py +++ b/IPython/zmq/entry_point.py @@ -9,159 +9,14 @@ import socket from subprocess import Popen, PIPE import sys -# System library imports. -import zmq - # Local imports. -from IPython.core.ultratb import FormattedTB -from IPython.external.argparse import ArgumentParser -from IPython.utils import io -from IPython.utils.localinterfaces import LOCALHOST -from displayhook import DisplayHook -from heartbeat import Heartbeat -from iostream import OutStream -from parentpoller import ParentPollerUnix, ParentPollerWindows -from session import Session - - -def bind_port(socket, ip, port): - """ Binds the specified ZMQ socket. If the port is zero, a random port is - chosen. Returns the port that was bound. - """ - connection = 'tcp://%s' % ip - if port <= 0: - port = socket.bind_to_random_port(connection) - else: - connection += ':%i' % port - socket.bind(connection) - return port - - -def make_argument_parser(): - """ Creates an ArgumentParser for the generic arguments supported by all - kernel entry points. - """ - parser = ArgumentParser() - parser.add_argument('--ip', type=str, default=LOCALHOST, - help='set the kernel\'s IP address [default: local]') - parser.add_argument('--xrep', type=int, metavar='PORT', default=0, - help='set the XREP channel port [default: random]') - parser.add_argument('--pub', type=int, metavar='PORT', default=0, - help='set the PUB channel port [default: random]') - parser.add_argument('--req', type=int, metavar='PORT', default=0, - help='set the REQ channel port [default: random]') - parser.add_argument('--hb', type=int, metavar='PORT', default=0, - help='set the heartbeat port [default: random]') - parser.add_argument('--no-stdout', action='store_true', - help='redirect stdout to the null device') - parser.add_argument('--no-stderr', action='store_true', - help='redirect stderr to the null device') - - if sys.platform == 'win32': - parser.add_argument('--interrupt', type=int, metavar='HANDLE', - default=0, help='interrupt this process when ' - 'HANDLE is signaled') - parser.add_argument('--parent', type=int, metavar='HANDLE', - default=0, help='kill this process if the process ' - 'with HANDLE dies') - else: - parser.add_argument('--parent', action='store_true', - help='kill this process if its parent dies') - - return parser - - -def make_kernel(namespace, kernel_factory, - out_stream_factory=None, display_hook_factory=None): - """ Creates a kernel, redirects stdout/stderr, and installs a display hook - and exception handler. - """ - # Re-direct stdout/stderr, if necessary. - if namespace.no_stdout or namespace.no_stderr: - blackhole = file(os.devnull, 'w') - if namespace.no_stdout: - sys.stdout = sys.__stdout__ = blackhole - if namespace.no_stderr: - sys.stderr = sys.__stderr__ = blackhole - - # Install minimal exception handling - sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor', - ostream=sys.__stdout__) +from parentpoller import ParentPollerWindows - # Create a context, a session, and the kernel sockets. - io.raw_print("Starting the kernel at pid:", os.getpid()) - context = zmq.Context() - # Uncomment this to try closing the context. - # atexit.register(context.close) - session = Session(username=u'kernel') - reply_socket = context.socket(zmq.XREP) - xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep) - io.raw_print("XREP Channel on port", xrep_port) - pub_socket = context.socket(zmq.PUB) - pub_port = bind_port(pub_socket, namespace.ip, namespace.pub) - io.raw_print("PUB Channel on port", pub_port) - - req_socket = context.socket(zmq.XREQ) - req_port = bind_port(req_socket, namespace.ip, namespace.req) - io.raw_print("REQ Channel on port", req_port) - - hb = Heartbeat(context, (namespace.ip, namespace.hb)) - hb.start() - hb_port = hb.port - io.raw_print("Heartbeat REP Channel on port", hb_port) - - # Helper to make it easier to connect to an existing kernel, until we have - # single-port connection negotiation fully implemented. - io.raw_print("To connect another client to this kernel, use:") - io.raw_print("-e --xreq {0} --sub {1} --rep {2} --hb {3}".format( - xrep_port, pub_port, req_port, hb_port)) - - # Redirect input streams and set a display hook. - if out_stream_factory: - sys.stdout = out_stream_factory(session, pub_socket, u'stdout') - sys.stderr = out_stream_factory(session, pub_socket, u'stderr') - if display_hook_factory: - sys.displayhook = display_hook_factory(session, pub_socket) - - # Create the kernel. - kernel = kernel_factory(session=session, reply_socket=reply_socket, - pub_socket=pub_socket, req_socket=req_socket) - kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port, - req_port=req_port, hb_port=hb_port) - return kernel - - -def start_kernel(namespace, kernel): - """ Starts a kernel. - """ - # Configure this kernel process to poll the parent process, if necessary. - if sys.platform == 'win32': - if namespace.interrupt or namespace.parent: - poller = ParentPollerWindows(namespace.interrupt, namespace.parent) - poller.start() - elif namespace.parent: - poller = ParentPollerUnix() - poller.start() - - # Start the kernel mainloop. - kernel.start() - - -def make_default_main(kernel_factory): - """ Creates the simplest possible kernel entry point. - """ - def main(): - namespace = make_argument_parser().parse_args() - kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook) - start_kernel(namespace, kernel) - return main - - -def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, - stdin=None, stdout=None, stderr=None, - executable=None, independent=False, extra_arguments=[]): +def base_launch_kernel(code, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0, + ip=None, stdin=None, stdout=None, stderr=None, + executable=None, independent=False, extra_arguments=[]): """ Launches a localhost kernel, binding to the specified ports. Parameters @@ -169,18 +24,21 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, code : str, A string of Python code that imports and executes a kernel entry point. - xrep_port : int, optional + shell_port : int, optional The port to use for XREP channel. - pub_port : int, optional + iopub_port : int, optional The port to use for the SUB channel. - req_port : int, optional + stdin_port : int, optional The port to use for the REQ (raw input) channel. hb_port : int, optional The port to use for the hearbeat REP channel. + ip : str, optional + The ip address the kernel will bind to. + stdin, stdout, stderr : optional (default None) Standards streams, as defined in subprocess.Popen. @@ -199,13 +57,13 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, Returns ------- A tuple of form: - (kernel_process, xrep_port, pub_port, req_port) + (kernel_process, shell_port, iopub_port, stdin_port, hb_port) where kernel_process is a Popen object and the ports are integers. """ # Find open ports as necessary. ports = [] - ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \ - int(req_port <= 0) + int(hb_port <= 0) + ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \ + int(stdin_port <= 0) + int(hb_port <= 0) for i in xrange(ports_needed): sock = socket.socket() sock.bind(('', 0)) @@ -214,28 +72,31 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, port = sock.getsockname()[1] sock.close() ports[i] = port - if xrep_port <= 0: - xrep_port = ports.pop(0) - if pub_port <= 0: - pub_port = ports.pop(0) - if req_port <= 0: - req_port = ports.pop(0) + if shell_port <= 0: + shell_port = ports.pop(0) + if iopub_port <= 0: + iopub_port = ports.pop(0) + if stdin_port <= 0: + stdin_port = ports.pop(0) if hb_port <= 0: hb_port = ports.pop(0) # Build the kernel launch command. if executable is None: executable = sys.executable - arguments = [ executable, '-c', code, '--xrep', str(xrep_port), - '--pub', str(pub_port), '--req', str(req_port), - '--hb', str(hb_port) ] + arguments = [ executable, '-c', code, 'shell=%i'%shell_port, + 'iopub=%i'%iopub_port, 'stdin=%i'%stdin_port, + 'hb=%i'%hb_port + ] + if ip is not None: + arguments.append('ip=%s'%ip) arguments.extend(extra_arguments) # Spawn a kernel. if sys.platform == 'win32': # Create a Win32 event for interrupting the kernel. interrupt_event = ParentPollerWindows.create_interrupt_event() - arguments += [ '--interrupt', str(int(interrupt_event)) ] + arguments += [ 'interrupt=%i'%interrupt_event ] # If this process in running on pythonw, stdin, stdout, and stderr are # invalid. Popen will fail unless they are suitably redirected. We don't @@ -273,7 +134,7 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, handle = DuplicateHandle(pid, pid, pid, 0, True, # Inheritable by new processes. DUPLICATE_SAME_ACCESS) - proc = Popen(arguments + ['--parent', str(int(handle))], + proc = Popen(arguments + ['parent=%i'%int(handle)], stdin=_stdin, stdout=_stdout, stderr=_stderr) # Attach the interrupt event to the Popen objet so it can be used later. @@ -293,7 +154,7 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, proc = Popen(arguments, preexec_fn=lambda: os.setsid(), stdin=stdin, stdout=stdout, stderr=stderr) else: - proc = Popen(arguments + ['--parent'], + proc = Popen(arguments + ['parent=1'], stdin=stdin, stdout=stdout, stderr=stderr) - return proc, xrep_port, pub_port, req_port, hb_port + return proc, shell_port, iopub_port, stdin_port, hb_port diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 12587a4..73dc8dc 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -27,37 +27,21 @@ import zmq # Local imports. from IPython.config.configurable import Configurable +from IPython.config.application import boolean_flag +from IPython.core.newapplication import ProfileDir from IPython.utils import io from IPython.utils.jsonutil import json_clean from IPython.lib import pylabtools -from IPython.utils.traitlets import Instance, Float -from entry_point import (base_launch_kernel, make_argument_parser, make_kernel, - start_kernel) +from IPython.utils.traitlets import ( + List, Instance, Float, Dict, Bool, Int, Unicode, CaselessStrEnum +) +from entry_point import base_launch_kernel +from kernelapp import KernelApp, kernel_flags, kernel_aliases from iostream import OutStream from session import Session, Message from zmqshell import ZMQInteractiveShell -#----------------------------------------------------------------------------- -# Globals -#----------------------------------------------------------------------------- - -# Module-level logger -logger = logging.getLogger(__name__) -# FIXME: this needs to be done more cleanly later, once we have proper -# configuration support. This is a library, so it shouldn't set a stream -# handler, see: -# http://docs.python.org/library/logging.html#configuring-logging-for-a-library -# But this lets us at least do developer debugging for now by manually turning -# it on/off. And once we have full config support, the client entry points -# will select their logging handlers, as well as passing to this library the -# logging level. - -if 0: # dbg - set to 1 to actually see the messages. - logger.addHandler(logging.StreamHandler()) - logger.setLevel(logging.DEBUG) - -# /FIXME #----------------------------------------------------------------------------- # Main kernel class @@ -71,9 +55,10 @@ class Kernel(Configurable): shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') session = Instance(Session) - reply_socket = Instance('zmq.Socket') - pub_socket = Instance('zmq.Socket') - req_socket = Instance('zmq.Socket') + shell_socket = Instance('zmq.Socket') + iopub_socket = Instance('zmq.Socket') + stdin_socket = Instance('zmq.Socket') + log = Instance(logging.Logger) # Private interface @@ -100,7 +85,8 @@ class Kernel(Configurable): # This is a dict of port number that the kernel is listening on. It is set # by record_ports and used by connect_request. - _recorded_ports = None + _recorded_ports = Dict() + def __init__(self, **kwargs): @@ -111,11 +97,11 @@ class Kernel(Configurable): atexit.register(self._at_shutdown) # Initialize the InteractiveShell subclass - self.shell = ZMQInteractiveShell.instance() + self.shell = ZMQInteractiveShell.instance(config=self.config) self.shell.displayhook.session = self.session - self.shell.displayhook.pub_socket = self.pub_socket + self.shell.displayhook.pub_socket = self.iopub_socket self.shell.display_pub.session = self.session - self.shell.display_pub.pub_socket = self.pub_socket + self.shell.display_pub.pub_socket = self.iopub_socket # TMP - hack while developing self.shell._reply_content = None @@ -131,7 +117,7 @@ class Kernel(Configurable): def do_one_iteration(self): """Do one iteration of the kernel's evaluation loop. """ - ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) + ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) if msg is None: return @@ -143,21 +129,20 @@ class Kernel(Configurable): # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each # handler prints its message at the end. - # Eventually we'll move these from stdout to a logger. - logger.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***') - logger.debug(' Content: '+str(msg['content'])+'\n --->\n ') + self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***') + self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ') # Find and call actual handler for message handler = self.handlers.get(msg['msg_type'], None) if handler is None: - logger.error("UNKNOWN MESSAGE TYPE:" +str(msg)) + self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg)) else: handler(ident, msg) # Check whether we should exit, in case the incoming message set the # exit flag on if self.shell.exit_now: - logger.debug('\nExiting IPython kernel...') + self.log.debug('\nExiting IPython kernel...') # We do a normal, clean exit, which allows any actions registered # via atexit (such as history saving) to take place. sys.exit(0) @@ -166,26 +151,27 @@ class Kernel(Configurable): def start(self): """ Start the kernel main loop. """ + poller = zmq.Poller() + poller.register(self.shell_socket, zmq.POLLIN) while True: try: - time.sleep(self._poll_interval) + # scale by extra factor of 10, because there is no + # reason for this to be anything less than ~ 0.1s + # since it is a real poller and will respond + # to events immediately + poller.poll(10*1000*self._poll_interval) self.do_one_iteration() except KeyboardInterrupt: # Ctrl-C shouldn't crash the kernel io.raw_print("KeyboardInterrupt caught in kernel") - def record_ports(self, xrep_port, pub_port, req_port, hb_port): + def record_ports(self, ports): """Record the ports that this kernel is using. The creator of the Kernel instance must call this methods if they want the :meth:`connect_request` method to return the port numbers. """ - self._recorded_ports = { - 'xrep_port' : xrep_port, - 'pub_port' : pub_port, - 'req_port' : req_port, - 'hb_port' : hb_port - } + self._recorded_ports = ports #--------------------------------------------------------------------------- # Kernel request handlers @@ -194,11 +180,11 @@ class Kernel(Configurable): def _publish_pyin(self, code, parent): """Publish the code request on the pyin stream.""" - pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) + pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent) def execute_request(self, ident, parent): - status_msg = self.session.send(self.pub_socket, + status_msg = self.session.send(self.iopub_socket, u'status', {u'execution_state':u'busy'}, parent=parent @@ -209,8 +195,8 @@ class Kernel(Configurable): code = content[u'code'] silent = content[u'silent'] except: - logger.error("Got bad msg: ") - logger.error(str(Message(parent))) + self.log.error("Got bad msg: ") + self.log.error(str(Message(parent))) return shell = self.shell # we'll need this a lot here @@ -298,14 +284,14 @@ class Kernel(Configurable): time.sleep(self._execute_sleep) # Send the reply. - reply_msg = self.session.send(self.reply_socket, u'execute_reply', + reply_msg = self.session.send(self.shell_socket, u'execute_reply', reply_content, parent, ident=ident) - logger.debug(str(reply_msg)) + self.log.debug(str(reply_msg)) if reply_msg['content']['status'] == u'error': self._abort_queue() - status_msg = self.session.send(self.pub_socket, + status_msg = self.session.send(self.iopub_socket, u'status', {u'execution_state':u'idle'}, parent=parent @@ -316,17 +302,17 @@ class Kernel(Configurable): matches = {'matches' : matches, 'matched_text' : txt, 'status' : 'ok'} - completion_msg = self.session.send(self.reply_socket, 'complete_reply', + completion_msg = self.session.send(self.shell_socket, 'complete_reply', matches, parent, ident) - logger.debug(str(completion_msg)) + self.log.debug(str(completion_msg)) def object_info_request(self, ident, parent): object_info = self.shell.object_inspect(parent['content']['oname']) # Before we send this object over, we scrub it for JSON usage oinfo = json_clean(object_info) - msg = self.session.send(self.reply_socket, 'object_info_reply', + msg = self.session.send(self.shell_socket, 'object_info_reply', oinfo, parent, ident) - logger.debug(msg) + self.log.debug(msg) def history_request(self, ident, parent): # We need to pull these out, as passing **kwargs doesn't work with @@ -353,18 +339,18 @@ class Kernel(Configurable): else: hist = [] content = {'history' : list(hist)} - msg = self.session.send(self.reply_socket, 'history_reply', + msg = self.session.send(self.shell_socket, 'history_reply', content, parent, ident) - logger.debug(str(msg)) + self.log.debug(str(msg)) def connect_request(self, ident, parent): if self._recorded_ports is not None: content = self._recorded_ports.copy() else: content = {} - msg = self.session.send(self.reply_socket, 'connect_reply', + msg = self.session.send(self.shell_socket, 'connect_reply', content, parent, ident) - logger.debug(msg) + self.log.debug(msg) def shutdown_request(self, ident, parent): self.shell.exit_now = True @@ -377,19 +363,19 @@ class Kernel(Configurable): def _abort_queue(self): while True: - ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) + ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) if msg is None: break else: assert ident is not None, \ "Unexpected missing message part." - logger.debug("Aborting:\n"+str(Message(msg))) + self.log.debug("Aborting:\n"+str(Message(msg))) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.send(self.reply_socket, reply_type, + reply_msg = self.session.send(self.shell_socket, reply_type, {'status' : 'aborted'}, msg, ident=ident) - logger.debug(reply_msg) + self.log.debug(reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.1) @@ -401,15 +387,15 @@ class Kernel(Configurable): # Send the input request. content = dict(prompt=prompt) - msg = self.session.send(self.req_socket, u'input_request', content, parent) + msg = self.session.send(self.stdin_socket, u'input_request', content, parent) # Await a response. - ident, reply = self.session.recv(self.req_socket, 0) + ident, reply = self.session.recv(self.stdin_socket, 0) try: value = reply['content']['value'] except: - logger.error("Got bad raw_input reply: ") - logger.error(str(Message(parent))) + self.log.error("Got bad raw_input reply: ") + self.log.error(str(Message(parent))) value = '' return value @@ -461,9 +447,9 @@ class Kernel(Configurable): """ # io.rprint("Kernel at_shutdown") # dbg if self._shutdown_message is not None: - self.session.send(self.reply_socket, self._shutdown_message) - self.session.send(self.pub_socket, self._shutdown_message) - logger.debug(str(self._shutdown_message)) + self.session.send(self.shell_socket, self._shutdown_message) + self.session.send(self.iopub_socket, self._shutdown_message) + self.log.debug(str(self._shutdown_message)) # A very short sleep to give zmq time to flush its message buffers # before Python truly shuts down. time.sleep(0.01) @@ -569,120 +555,191 @@ class GTKKernel(Kernel): #----------------------------------------------------------------------------- -# Kernel main and launch functions +# Aliases and Flags for the IPKernelApp #----------------------------------------------------------------------------- -def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0, - stdin=None, stdout=None, stderr=None, - executable=None, independent=False, pylab=False, colors=None): - """Launches a localhost kernel, binding to the specified ports. +flags = dict(kernel_flags) + +addflag = lambda *args: flags.update(boolean_flag(*args)) +addflag('automagic', 'InteractiveShell.automagic', + """Turn on the auto calling of magic commands. Type %%magic at the + IPython prompt for more information.""", + 'Turn off the auto calling of magic commands.' +) +addflag('banner', 'InteractiveShell.display_banner', + "Display a banner upon starting IPython.", + "Don't display a banner upon starting IPython." +) +addflag('pdb', 'InteractiveShell.pdb', + "Enable auto calling the pdb debugger after every exception.", + "Disable auto calling the pdb debugger after every exception." +) +addflag('pprint', 'PlainTextFormatter.pprint', + "Enable auto pretty printing of results.", + "Disable auto auto pretty printing of results." +) +addflag('color-info', 'InteractiveShell.color_info', + """IPython can display information about objects via a set of func- + tions, and optionally can use colors for this, syntax highlighting + source code and various other elements. However, because this + information is passed through a pager (like 'less') and many pagers get + confused with color codes, this option is off by default. You can test + it and turn it on permanently in your ipython_config.py file if it + works for you. Test it and turn it on permanently if it works with + your system. The magic function %%color_info allows you to toggle this + inter- actively for testing.""", + "Disable using colors for info related things." +) +addflag('deep-reload', 'InteractiveShell.deep_reload', + """Enable deep (recursive) reloading by default. IPython can use the + deep_reload module which reloads changes in modules recursively (it + replaces the reload() function, so you don't need to change anything to + use it). deep_reload() forces a full reload of modules whose code may + have changed, which the default reload() function does not. When + deep_reload is off, IPython will use the normal reload(), but + deep_reload will still be available as dreload(). This fea- ture is off + by default [which means that you have both normal reload() and + dreload()].""", + "Disable deep (recursive) reloading by default." +) +addflag('readline', 'InteractiveShell.readline_use', + "Enable readline for command line usage.", + "Disable readline for command line usage." +) + +flags['pylab'] = ( + {'IPKernelApp' : {'pylab' : 'auto'}}, + """Pre-load matplotlib and numpy for interactive use with + the default matplotlib backend.""" +) + +aliases = dict(kernel_aliases) + +# it's possible we don't want short aliases for *all* of these: +aliases.update(dict( + autocall='InteractiveShell.autocall', + cache_size='InteractiveShell.cache_size', + colors='InteractiveShell.colors', + logfile='InteractiveShell.logfile', + log_append='InteractiveShell.logappend', + pi1='InteractiveShell.prompt_in1', + pi2='InteractiveShell.prompt_in2', + po='InteractiveShell.prompt_out', + si='InteractiveShell.separate_in', + so='InteractiveShell.separate_out', + so2='InteractiveShell.separate_out2', + xmode='InteractiveShell.xmode', + c='IPKernelApp.code_to_run', + ext='IPKernelApp.extra_extension', + pylab='IPKernelApp.pylab', +)) - Parameters - ---------- - ip : str, optional - The ip address the kernel will bind to. - - xrep_port : int, optional - The port to use for XREP channel. +#----------------------------------------------------------------------------- +# The IPKernelApp class +#----------------------------------------------------------------------------- - pub_port : int, optional - The port to use for the SUB channel. +class IPKernelApp(KernelApp): + name = 'ipkernel' + + aliases = Dict(aliases) + flags = Dict(flags) + classes = [Kernel, ZMQInteractiveShell, ProfileDir] + # configurables + pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'], + config=True, + help="""Pre-load matplotlib and numpy for interactive use, + selecting a particular matplotlib backend and loop integration. + """ + ) + extensions = List(Unicode, config=True, + help="A list of dotted module names of IPython extensions to load." + ) + extra_extension = Unicode('', config=True, + help="dotted module name of an IPython extension to load." + ) + def _extra_extension_changed(self, name, old, new): + if new: + # add to self.extensions + self.extensions.append(new) + + exec_files = List(Unicode, config=True, + help="""List of files to run at IPython startup.""" + ) + file_to_run = Unicode('', config=True, + help="""A file to be run""") + def _file_to_run_changed(self, name, old, new): + self.exec_files.append(new) + + exec_lines = List(Unicode, config=True, + help="""lines of code to run at IPython startup.""" + ) + code_to_run = Unicode('', config=True, + help="Execute the given command string." + ) + def _code_to_run_changed(self, name, old, new): + self.exec_lines.append(new) + + def init_kernel(self): + kernel_factory = Kernel + + kernel_map = { + 'qt' : QtKernel, + 'qt4': QtKernel, + 'inline': Kernel, + 'osx': TkKernel, + 'wx' : WxKernel, + 'tk' : TkKernel, + 'gtk': GTKKernel, + } - req_port : int, optional - The port to use for the REQ (raw input) channel. + if self.pylab: + key = None if self.pylab == 'auto' else self.pylab + gui, backend = pylabtools.find_gui_and_backend(key) + kernel_factory = kernel_map.get(gui) + if kernel_factory is None: + raise ValueError('GUI is not supported: %r' % gui) + pylabtools.activate_matplotlib(backend) + + kernel = kernel_factory(config=self.config, session=self.session, + shell_socket=self.shell_socket, + iopub_socket=self.iopub_socket, + stdin_socket=self.stdin_socket, + log=self.log + ) + self.kernel = kernel + kernel.record_ports(self.ports) - hb_port : int, optional - The port to use for the hearbeat REP channel. + if self.pylab: + pylabtools.import_pylab(kernel.shell.user_ns, backend, + shell=kernel.shell) - stdin, stdout, stderr : optional (default None) - Standards streams, as defined in subprocess.Popen. - executable : str, optional (default sys.executable) - The Python executable to use for the kernel process. - independent : bool, optional (default False) - If set, the kernel process is guaranteed to survive if this process - dies. If not set, an effort is made to ensure that the kernel is killed - when this process dies. Note that in this case it is still good practice - to kill kernels manually before exiting. +#----------------------------------------------------------------------------- +# Kernel main and launch functions +#----------------------------------------------------------------------------- - pylab : bool or string, optional (default False) - If not False, the kernel will be launched with pylab enabled. If a - string is passed, matplotlib will use the specified backend. Otherwise, - matplotlib's default backend will be used. +def launch_kernel(*args, **kwargs): + """Launches a localhost IPython kernel, binding to the specified ports. - colors : None or string, optional (default None) - If not None, specify the color scheme. One of (NoColor, LightBG, Linux) + This function simply calls entry_point.base_launch_kernel with the right first + command to start an ipkernel. See base_launch_kernel for arguments. Returns ------- A tuple of form: - (kernel_process, xrep_port, pub_port, req_port) + (kernel_process, shell_port, iopub_port, stdin_port, hb_port) where kernel_process is a Popen object and the ports are integers. """ - extra_arguments = [] - if pylab: - extra_arguments.append('--pylab') - if isinstance(pylab, basestring): - extra_arguments.append(pylab) - if ip is not None: - extra_arguments.append('--ip') - if isinstance(ip, basestring): - extra_arguments.append(ip) - if colors is not None: - extra_arguments.append('--colors') - extra_arguments.append(colors) return base_launch_kernel('from IPython.zmq.ipkernel import main; main()', - xrep_port, pub_port, req_port, hb_port, - stdin, stdout, stderr, - executable, independent, extra_arguments) + *args, **kwargs) def main(): - """ The IPython kernel main entry point. - """ - parser = make_argument_parser() - parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?', - const='auto', help = \ -"Pre-load matplotlib and numpy for interactive use. If GUI is not \ -given, the GUI backend is matplotlib's, otherwise use one of: \ -['tk', 'gtk', 'qt', 'wx', 'osx', 'inline'].") - parser.add_argument('--colors', - type=str, dest='colors', - help="Set the color scheme (NoColor, Linux, and LightBG).", - metavar='ZMQInteractiveShell.colors') - namespace = parser.parse_args() - - kernel_class = Kernel - - kernel_classes = { - 'qt' : QtKernel, - 'qt4': QtKernel, - 'inline': Kernel, - 'osx': TkKernel, - 'wx' : WxKernel, - 'tk' : TkKernel, - 'gtk': GTKKernel, - } - if namespace.pylab: - if namespace.pylab == 'auto': - gui, backend = pylabtools.find_gui_and_backend() - else: - gui, backend = pylabtools.find_gui_and_backend(namespace.pylab) - kernel_class = kernel_classes.get(gui) - if kernel_class is None: - raise ValueError('GUI is not supported: %r' % gui) - pylabtools.activate_matplotlib(backend) - if namespace.colors: - ZMQInteractiveShell.colors=namespace.colors - - kernel = make_kernel(namespace, kernel_class, OutStream) - - if namespace.pylab: - pylabtools.import_pylab(kernel.shell.user_ns, backend, - shell=kernel.shell) - - start_kernel(namespace, kernel) + """Run a PyKernel as an application""" + app = IPKernelApp() + app.initialize() + app.start() if __name__ == '__main__': diff --git a/IPython/zmq/kernelapp.py b/IPython/zmq/kernelapp.py new file mode 100644 index 0000000..8e21a2b --- /dev/null +++ b/IPython/zmq/kernelapp.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python +"""An Application for launching a kernel + +Authors +------- +* MinRK +""" +#----------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING.txt, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# Standard library imports. +import os +import sys + +# System library imports. +import zmq + +# IPython imports. +from IPython.core.ultratb import FormattedTB +from IPython.core.newapplication import ( + BaseIPythonApplication, base_flags, base_aliases +) +from IPython.utils import io +from IPython.utils.localinterfaces import LOCALHOST +from IPython.utils.traitlets import Any, Instance, Dict, Unicode, Int, Bool +from IPython.utils.importstring import import_item +# local imports +from IPython.zmq.heartbeat import Heartbeat +from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows +from IPython.zmq.session import Session + + +#----------------------------------------------------------------------------- +# Flags and Aliases +#----------------------------------------------------------------------------- + +kernel_aliases = dict(base_aliases) +kernel_aliases.update({ + 'ip' : 'KernelApp.ip', + 'hb' : 'KernelApp.hb_port', + 'shell' : 'KernelApp.shell_port', + 'iopub' : 'KernelApp.iopub_port', + 'stdin' : 'KernelApp.stdin_port', + 'parent': 'KernelApp.parent', +}) +if sys.platform.startswith('win'): + kernel_aliases['interrupt'] = 'KernelApp.interrupt' + +kernel_flags = dict(base_flags) +kernel_flags.update({ + 'no-stdout' : ( + {'KernelApp' : {'no_stdout' : True}}, + "redirect stdout to the null device"), + 'no-stderr' : ( + {'KernelApp' : {'no_stderr' : True}}, + "redirect stderr to the null device"), +}) + + +#----------------------------------------------------------------------------- +# Application class for starting a Kernel +#----------------------------------------------------------------------------- + +class KernelApp(BaseIPythonApplication): + name='pykernel' + aliases = Dict(kernel_aliases) + flags = Dict(kernel_flags) + + # the kernel class, as an importstring + kernel_class = Unicode('IPython.zmq.pykernel.Kernel') + kernel = Any() + poller = Any() # don't restrict this even though current pollers are all Threads + heartbeat = Instance(Heartbeat) + session = Instance('IPython.zmq.session.Session') + ports = Dict() + + # connection info: + ip = Unicode(LOCALHOST, config=True, + help="Set the IP or interface on which the kernel will listen.") + hb_port = Int(0, config=True, help="set the heartbeat port [default: random]") + shell_port = Int(0, config=True, help="set the shell (XREP) port [default: random]") + iopub_port = Int(0, config=True, help="set the iopub (PUB) port [default: random]") + stdin_port = Int(0, config=True, help="set the stdin (XREQ) port [default: random]") + + # streams, etc. + no_stdout = Bool(False, config=True, help="redirect stdout to the null device") + no_stderr = Bool(False, config=True, help="redirect stderr to the null device") + outstream_class = Unicode('IPython.zmq.iostream.OutStream', config=True, + help="The importstring for the OutStream factory") + displayhook_class = Unicode('IPython.zmq.displayhook.DisplayHook', config=True, + help="The importstring for the DisplayHook factory") + + # polling + parent = Int(0, config=True, + help="""kill this process if its parent dies. On Windows, the argument + specifies the HANDLE of the parent process, otherwise it is simply boolean. + """) + interrupt = Int(0, config=True, + help="""ONLY USED ON WINDOWS + Interrupt this process when the parent is signalled. + """) + + def init_crash_handler(self): + # Install minimal exception handling + sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor', + ostream=sys.__stdout__) + + def init_poller(self): + if sys.platform == 'win32': + if self.interrupt or self.parent: + self.poller = ParentPollerWindows(self.interrupt, self.parent) + elif self.parent: + self.poller = ParentPollerUnix() + + def _bind_socket(self, s, port): + iface = 'tcp://%s' % self.ip + if port <= 0: + port = s.bind_to_random_port(iface) + else: + s.bind(iface + ':%i'%port) + return port + + def init_sockets(self): + # Create a context, a session, and the kernel sockets. + io.raw_print("Starting the kernel at pid:", os.getpid()) + context = zmq.Context.instance() + # Uncomment this to try closing the context. + # atexit.register(context.term) + + self.shell_socket = context.socket(zmq.XREP) + self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) + self.log.debug("shell XREP Channel on port: %i"%self.shell_port) + + self.iopub_socket = context.socket(zmq.PUB) + self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) + self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port) + + self.stdin_socket = context.socket(zmq.XREQ) + self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) + self.log.debug("stdin XREQ Channel on port: %i"%self.stdin_port) + + self.heartbeat = Heartbeat(context, (self.ip, self.hb_port)) + self.hb_port = self.heartbeat.port + self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) + + # Helper to make it easier to connect to an existing kernel, until we have + # single-port connection negotiation fully implemented. + self.log.info("To connect another client to this kernel, use:") + self.log.info("--external shell={0} iopub={1} stdin={2} hb={3}".format( + self.shell_port, self.iopub_port, self.stdin_port, self.hb_port)) + + + self.ports = dict(shell=self.shell_port, iopub=self.iopub_port, + stdin=self.stdin_port, hb=self.hb_port) + + def init_session(self): + """create our session object""" + self.session = Session(username=u'kernel') + + def init_io(self): + """redirects stdout/stderr, and installs a display hook""" + # Re-direct stdout/stderr, if necessary. + if self.no_stdout or self.no_stderr: + blackhole = file(os.devnull, 'w') + if self.no_stdout: + sys.stdout = sys.__stdout__ = blackhole + if self.no_stderr: + sys.stderr = sys.__stderr__ = blackhole + + # Redirect input streams and set a display hook. + + if self.outstream_class: + outstream_factory = import_item(str(self.outstream_class)) + sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout') + sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr') + if self.displayhook_class: + displayhook_factory = import_item(str(self.displayhook_class)) + sys.displayhook = displayhook_factory(self.session, self.iopub_socket) + + def init_kernel(self): + """Create the Kernel object itself""" + kernel_factory = import_item(str(self.kernel_class)) + self.kernel = kernel_factory(config=self.config, session=self.session, + shell_socket=self.shell_socket, + iopub_socket=self.iopub_socket, + stdin_socket=self.stdin_socket, + ) + self.kernel.record_ports(self.ports) + + def initialize(self, argv=None): + super(KernelApp, self).initialize(argv) + self.init_session() + self.init_poller() + self.init_sockets() + self.init_io() + self.init_kernel() + + def start(self): + self.heartbeat.start() + if self.poller is not None: + self.poller.start() + try: + self.kernel.start() + except KeyboardInterrupt: + pass diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 4663aee..40801f8 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -782,8 +782,8 @@ class KernelManager(HasTraits): else: from pykernel import launch_kernel self.kernel, xrep, pub, req, _hb = launch_kernel( - xrep_port=xreq[1], pub_port=sub[1], - req_port=rep[1], hb_port=hb[1], **kw) + shell_port=xreq[1], iopub_port=sub[1], + stdin_port=rep[1], hb_port=hb[1], **kw) self.xreq_address = (xreq[0], xrep) self.sub_address = (sub[0], pub) self.rep_address = (rep[0], req) diff --git a/IPython/zmq/logtopics.rst b/IPython/zmq/logtopics.rst new file mode 100644 index 0000000..e8d9f51 --- /dev/null +++ b/IPython/zmq/logtopics.rst @@ -0,0 +1,13 @@ +======================= +Log Topic Specification +======================= + +we use pyzmq to broadcast log events over a PUB socket. Engines, Controllers, etc. can all +broadcast. SUB sockets can be used to view the logs, and ZMQ topics are used to help +select out what to follow. + +the PUBHandler object that emits the logs can ascribe topics to log messages. The order is: + +..[.] + +root_topic is specified as an attribute diff --git a/IPython/zmq/pykernel.py b/IPython/zmq/pykernel.py index 01e1c9d..7efc825 100755 --- a/IPython/zmq/pykernel.py +++ b/IPython/zmq/pykernel.py @@ -25,10 +25,11 @@ import traceback import zmq # Local imports. -from IPython.utils.traitlets import HasTraits, Instance, Float +from IPython.utils.traitlets import HasTraits, Instance, Dict, Float from completer import KernelCompleter -from entry_point import base_launch_kernel, make_default_main +from entry_point import base_launch_kernel from session import Session, Message +from kernelapp import KernelApp #----------------------------------------------------------------------------- # Main kernel class @@ -49,16 +50,16 @@ class Kernel(HasTraits): # This is a dict of port number that the kernel is listening on. It is set # by record_ports and used by connect_request. - _recorded_ports = None + _recorded_ports = Dict() #--------------------------------------------------------------------------- # Kernel interface #--------------------------------------------------------------------------- session = Instance(Session) - reply_socket = Instance('zmq.Socket') - pub_socket = Instance('zmq.Socket') - req_socket = Instance('zmq.Socket') + shell_socket = Instance('zmq.Socket') + iopub_socket = Instance('zmq.Socket') + stdin_socket = Instance('zmq.Socket') def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) @@ -78,7 +79,7 @@ class Kernel(HasTraits): """ Start the kernel main loop. """ while True: - ident,msg = self.session.recv(self.reply_socket,0) + ident,msg = self.session.recv(self.shell_socket,0) assert ident is not None, "Missing message part." omsg = Message(msg) print>>sys.__stdout__ @@ -89,18 +90,13 @@ class Kernel(HasTraits): else: handler(ident, omsg) - def record_ports(self, xrep_port, pub_port, req_port, hb_port): + def record_ports(self, ports): """Record the ports that this kernel is using. The creator of the Kernel instance must call this methods if they want the :meth:`connect_request` method to return the port numbers. """ - self._recorded_ports = { - 'xrep_port' : xrep_port, - 'pub_port' : pub_port, - 'req_port' : req_port, - 'hb_port' : hb_port - } + self._recorded_ports = ports #--------------------------------------------------------------------------- # Kernel request handlers @@ -113,7 +109,7 @@ class Kernel(HasTraits): print>>sys.__stderr__, "Got bad msg: " print>>sys.__stderr__, Message(parent) return - pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) + pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent) try: comp_code = self.compiler(code, '') @@ -138,7 +134,7 @@ class Kernel(HasTraits): u'ename' : unicode(etype.__name__), u'evalue' : unicode(evalue) } - exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent) + exc_msg = self.session.send(self.iopub_socket, u'pyerr', exc_content, parent) reply_content = exc_content else: reply_content = { 'status' : 'ok', 'payload' : {} } @@ -153,7 +149,7 @@ class Kernel(HasTraits): time.sleep(self._execute_sleep) # Send the reply. - reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) + reply_msg = self.session.send(self.shell_socket, u'execute_reply', reply_content, parent, ident=ident) print>>sys.__stdout__, Message(reply_msg) if reply_msg['content']['status'] == u'error': self._abort_queue() @@ -161,22 +157,22 @@ class Kernel(HasTraits): def complete_request(self, ident, parent): matches = {'matches' : self._complete(parent), 'status' : 'ok'} - completion_msg = self.session.send(self.reply_socket, 'complete_reply', + completion_msg = self.session.send(self.shell_socket, 'complete_reply', matches, parent, ident) print >> sys.__stdout__, completion_msg def object_info_request(self, ident, parent): context = parent['content']['oname'].split('.') object_info = self._object_info(context) - msg = self.session.send(self.reply_socket, 'object_info_reply', + msg = self.session.send(self.shell_socket, 'object_info_reply', object_info, parent, ident) print >> sys.__stdout__, msg def shutdown_request(self, ident, parent): content = dict(parent['content']) - msg = self.session.send(self.reply_socket, 'shutdown_reply', + msg = self.session.send(self.shell_socket, 'shutdown_reply', content, parent, ident) - msg = self.session.send(self.pub_socket, 'shutdown_reply', + msg = self.session.send(self.iopub_socket, 'shutdown_reply', content, parent, ident) print >> sys.__stdout__, msg time.sleep(0.1) @@ -197,7 +193,7 @@ class Kernel(HasTraits): print>>sys.__stdout__, Message(msg) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident) + reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident) print>>sys.__stdout__, Message(reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. @@ -210,10 +206,10 @@ class Kernel(HasTraits): # Send the input request. content = dict(prompt=prompt) - msg = self.session.send(self.req_socket, u'input_request', content, parent) + msg = self.session.send(self.stdin_socket, u'input_request', content, parent) # Await a response. - ident,reply = self.session.recv(self.req_socket, 0) + ident,reply = self.session.recv(self.stdin_socket, 0) try: value = reply['content']['value'] except: @@ -259,58 +255,26 @@ class Kernel(HasTraits): # Kernel main and launch functions #----------------------------------------------------------------------------- -def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0, - stdin=None, stdout=None, stderr=None, - executable=None, independent=False): - """ Launches a localhost kernel, binding to the specified ports. - - Parameters - ---------- - ip : str, optional - The ip address the kernel will bind to. +def launch_kernel(*args, **kwargs): + """ Launches a simple Python kernel, binding to the specified ports. - xrep_port : int, optional - The port to use for XREP channel. - - pub_port : int, optional - The port to use for the SUB channel. - - req_port : int, optional - The port to use for the REQ (raw input) channel. - - hb_port : int, optional - The port to use for the hearbeat REP channel. - - stdin, stdout, stderr : optional (default None) - Standards streams, as defined in subprocess.Popen. - - executable : str, optional (default sys.executable) - The Python executable to use for the kernel process. - - independent : bool, optional (default False) - If set, the kernel process is guaranteed to survive if this process - dies. If not set, an effort is made to ensure that the kernel is killed - when this process dies. Note that in this case it is still good practice - to kill kernels manually before exiting. + This function simply calls entry_point.base_launch_kernel with the right first + command to start a pykernel. See base_launch_kernel for arguments. Returns ------- A tuple of form: - (kernel_process, xrep_port, pub_port, req_port) + (kernel_process, xrep_port, pub_port, req_port, hb_port) where kernel_process is a Popen object and the ports are integers. """ - extra_arguments = [] - if ip is not None: - extra_arguments.append('--ip') - if isinstance(ip, basestring): - extra_arguments.append(ip) - return base_launch_kernel('from IPython.zmq.pykernel import main; main()', - xrep_port, pub_port, req_port, hb_port, - stdin, stdout, stderr, - executable, independent, extra_arguments) + *args, **kwargs) -main = make_default_main(Kernel) +def main(): + """Run a PyKernel as an application""" + app = KernelApp() + app.initialize() + app.start() if __name__ == '__main__': main() diff --git a/IPython/zmq/tests/test_session.py b/IPython/zmq/tests/test_session.py new file mode 100644 index 0000000..051c11c --- /dev/null +++ b/IPython/zmq/tests/test_session.py @@ -0,0 +1,111 @@ +"""test building messages with streamsession""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +import os +import uuid +import zmq + +from zmq.tests import BaseZMQTestCase +from zmq.eventloop.zmqstream import ZMQStream +# from IPython.zmq.tests import SessionTestCase +from IPython.parallel import streamsession as ss + +class SessionTestCase(BaseZMQTestCase): + + def setUp(self): + BaseZMQTestCase.setUp(self) + self.session = ss.StreamSession() + +class TestSession(SessionTestCase): + + def test_msg(self): + """message format""" + msg = self.session.msg('execute') + thekeys = set('header msg_id parent_header msg_type content'.split()) + s = set(msg.keys()) + self.assertEquals(s, thekeys) + self.assertTrue(isinstance(msg['content'],dict)) + self.assertTrue(isinstance(msg['header'],dict)) + self.assertTrue(isinstance(msg['parent_header'],dict)) + self.assertEquals(msg['msg_type'], 'execute') + + + + def test_args(self): + """initialization arguments for StreamSession""" + s = self.session + self.assertTrue(s.pack is ss.default_packer) + self.assertTrue(s.unpack is ss.default_unpacker) + self.assertEquals(s.username, os.environ.get('USER', 'username')) + + s = ss.StreamSession() + self.assertEquals(s.username, os.environ.get('USER', 'username')) + + self.assertRaises(TypeError, ss.StreamSession, pack='hi') + self.assertRaises(TypeError, ss.StreamSession, unpack='hi') + u = str(uuid.uuid4()) + s = ss.StreamSession(username='carrot', session=u) + self.assertEquals(s.session, u) + self.assertEquals(s.username, 'carrot') + + def test_tracking(self): + """test tracking messages""" + a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + s = self.session + stream = ZMQStream(a) + msg = s.send(a, 'hello', track=False) + self.assertTrue(msg['tracker'] is None) + msg = s.send(a, 'hello', track=True) + self.assertTrue(isinstance(msg['tracker'], zmq.MessageTracker)) + M = zmq.Message(b'hi there', track=True) + msg = s.send(a, 'hello', buffers=[M], track=True) + t = msg['tracker'] + self.assertTrue(isinstance(t, zmq.MessageTracker)) + self.assertRaises(zmq.NotDone, t.wait, .1) + del M + t.wait(1) # this will raise + + + # def test_rekey(self): + # """rekeying dict around json str keys""" + # d = {'0': uuid.uuid4(), 0:uuid.uuid4()} + # self.assertRaises(KeyError, ss.rekey, d) + # + # d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} + # d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} + # rd = ss.rekey(d) + # self.assertEquals(d2,rd) + # + # d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} + # d2 = {1.5:d['1.5'],1:d['1']} + # rd = ss.rekey(d) + # self.assertEquals(d2,rd) + # + # d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} + # self.assertRaises(KeyError, ss.rekey, d) + # + def test_unique_msg_ids(self): + """test that messages receive unique ids""" + ids = set() + for i in range(2**12): + h = self.session.msg_header('test') + msg_id = h['msg_id'] + self.assertTrue(msg_id not in ids) + ids.add(msg_id) + + def test_feed_identities(self): + """scrub the front for zmq IDENTITIES""" + theids = "engine client other".split() + content = dict(code='whoda',stuff=object()) + themsg = self.session.msg('execute',content=content) + pmsg = theids