diff --git a/IPython/zmq/forward.py b/IPython/zmq/forward.py new file mode 100644 index 0000000..d39ce97 --- /dev/null +++ b/IPython/zmq/forward.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python + +# +# This file is adapted from a paramiko demo, and thus LGPL 2.1. +# Original Copyright (C) 2003-2007 Robey Pointer +# Edits Copyright (C) 2010 The IPython Team +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Sample script showing how to do local port forwarding over paramiko. + +This script connects to the requested SSH server and sets up local port +forwarding (the openssh -L option) from a local port through a tunneled +connection to a destination reachable from the SSH server machine. +""" + +from __future__ import print_function + +import getpass +import os +import socket +import select +import SocketServer +import sys +from optparse import OptionParser + +import paramiko + +SSH_PORT = 22 +DEFAULT_PORT = 4000 + +g_verbose = False + + +class ForwardServer (SocketServer.ThreadingTCPServer): + daemon_threads = True + allow_reuse_address = True + + +class Handler (SocketServer.BaseRequestHandler): + + def handle(self): + try: + chan = self.ssh_transport.open_channel('direct-tcpip', + (self.chain_host, self.chain_port), + self.request.getpeername()) + except Exception, e: + verbose('Incoming request to %s:%d failed: %s' % (self.chain_host, + self.chain_port, + repr(e))) + return + if chan is None: + verbose('Incoming request to %s:%d was rejected by the SSH server.' % + (self.chain_host, self.chain_port)) + return + + verbose('Connected! Tunnel open %r -> %r -> %r' % (self.request.getpeername(), + chan.getpeername(), (self.chain_host, self.chain_port))) + while True: + r, w, x = select.select([self.request, chan], [], []) + if self.request in r: + data = self.request.recv(1024) + if len(data) == 0: + break + chan.send(data) + if chan in r: + data = chan.recv(1024) + if len(data) == 0: + break + self.request.send(data) + chan.close() + self.request.close() + verbose('Tunnel closed from %r' % (self.request.getpeername(),)) + + +def forward_tunnel(local_port, remote_host, remote_port, transport): + # this is a little convoluted, but lets me configure things for the Handler + # object. (SocketServer doesn't give Handlers any way to access the outer + # server normally.) + class SubHander (Handler): + chain_host = remote_host + chain_port = remote_port + ssh_transport = transport + ForwardServer(('', local_port), SubHander).serve_forever() + + +def verbose(s): + if g_verbose: + print (s) + + +HELP = """\ +Set up a forward tunnel across an SSH server, using paramiko. A local port +(given with -p) is forwarded across an SSH session to an address:port from +the SSH server. This is similar to the openssh -L option. +""" + + +def get_host_port(spec, default_port): + "parse 'hostname:22' into a host and port, with the port optional" + args = (spec.split(':', 1) + [default_port])[:2] + args[1] = int(args[1]) + return args[0], args[1] + + +def parse_options(): + global g_verbose + + parser = OptionParser(usage='usage: %prog [options] [:]', + version='%prog 1.0', description=HELP) + parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True, + help='squelch all informational output') + parser.add_option('-p', '--local-port', action='store', type='int', dest='port', + default=DEFAULT_PORT, + help='local port to forward (default: %d)' % DEFAULT_PORT) + parser.add_option('-u', '--user', action='store', type='string', dest='user', + default=getpass.getuser(), + help='username for SSH authentication (default: %s)' % getpass.getuser()) + parser.add_option('-K', '--key', action='store', type='string', dest='keyfile', + default=None, + help='private key file to use for SSH authentication') + parser.add_option('', '--no-key', action='store_false', dest='look_for_keys', default=True, + help='don\'t look for or use a private key file') + parser.add_option('-P', '--password', action='store_true', dest='readpass', default=False, + help='read password (for key or password auth) from stdin') + parser.add_option('-r', '--remote', action='store', type='string', dest='remote', default=None, metavar='host:port', + help='remote host and port to forward to') + options, args = parser.parse_args() + + if len(args) != 1: + parser.error('Incorrect number of arguments.') + if options.remote is None: + parser.error('Remote address required (-r).') + + g_verbose = options.verbose + server_host, server_port = get_host_port(args[0], SSH_PORT) + remote_host, remote_port = get_host_port(options.remote, SSH_PORT) + return options, (server_host, server_port), (remote_host, remote_port) + + +def main(): + options, server, remote = parse_options() + + password = None + if options.readpass: + password = getpass.getpass('Enter SSH password: ') + + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy()) + + verbose('Connecting to ssh host %s:%d ...' % (server[0], server[1])) + try: + client.connect(server[0], server[1], username=options.user, key_filename=options.keyfile, + look_for_keys=options.look_for_keys, password=password) + except Exception as e: + print ('*** Failed to connect to %s:%d: %r' % (server[0], server[1], e)) + sys.exit(1) + + verbose('Now forwarding port %d to %s:%d ...' % (options.port, remote[0], remote[1])) + + try: + forward_tunnel(options.port, remote[0], remote[1], client.get_transport()) + except KeyboardInterrupt: + print ('C-c: Port forwarding stopped.') + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/IPython/zmq/parallel/dependency.py b/IPython/zmq/parallel/dependency.py index a5c2fcc..964a823 100644 --- a/IPython/zmq/parallel/dependency.py +++ b/IPython/zmq/parallel/dependency.py @@ -11,15 +11,7 @@ ANYWHERE = 1 << 3 class UnmetDependency(Exception): pass -class depend2(object): - """dependency decorator""" - def __init__(self, f, *args, **kwargs): - self.dependency = (f,args,kwargs) - - def __call__(self, f, *args, **kwargs): - f._dependency = self.dependency - return decorator(_depend_wrapper, f) - + class depend(object): """Dependency decorator, for use with tasks.""" def __init__(self, f, *args, **kwargs): diff --git a/IPython/zmq/parallel/kernelstarter.py b/IPython/zmq/parallel/kernelstarter.py new file mode 100644 index 0000000..bb3a6bc --- /dev/null +++ b/IPython/zmq/parallel/kernelstarter.py @@ -0,0 +1,217 @@ +"""KernelStarter class that intercepts Control Queue messages, and handles process management.""" + +from zmq.eventloop import ioloop +from streamsession import StreamSession + +class KernelStarter(object): + """Object for resetting/killing the Kernel.""" + + + def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs): + self.session = session + self.upstream = upstream + self.downstream = downstream + self.kernel_args = kernel_args + self.kernel_kwargs = kernel_kwargs + self.handlers = {} + for method in 'shutdown_request shutdown_reply'.split(): + self.handlers[method] = getattr(self, method) + + def start(self): + self.upstream.on_recv(self.dispatch_request) + self.downstream.on_recv(self.dispatch_reply) + + #-------------------------------------------------------------------------- + # Dispatch methods + #-------------------------------------------------------------------------- + + def dispatch_request(self, raw_msg): + idents, msg = self.session.feed_identities() + try: + msg = self.session.unpack_message(msg, content=False) + except: + print ("bad msg: %s"%msg) + + msgtype = msg['msg_type'] + handler = self.handlers.get(msgtype, None) + if handler is None: + self.downstream.send_multipart(raw_msg) + else: + handler(msg) + + def dispatch_reply(self, raw_msg): + idents, msg = self.session.feed_identities() + try: + msg = self.session.unpack_message(msg, content=False) + except: + print ("bad msg: %s"%msg) + + msgtype = msg['msg_type'] + handler = self.handlers.get(msgtype, None) + if handler is None: + self.upstream.send_multipart(raw_msg) + else: + handler(msg) + + #-------------------------------------------------------------------------- + # Handlers + #-------------------------------------------------------------------------- + + def shutdown_request(self, msg): + + + #-------------------------------------------------------------------------- + # Kernel process management methods, from KernelManager: + #-------------------------------------------------------------------------- + + def _check_local(addr): + if isinstance(addr, tuple): + addr = addr[0] + return addr in LOCAL_IPS + + def start_kernel(self, **kw): + """Starts a kernel process and configures the manager to use it. + + If random ports (port=0) are being used, this method must be called + before the channels are created. + + Parameters: + ----------- + ipython : bool, optional (default True) + Whether to use an IPython kernel instead of a plain Python kernel. + """ + self.kernel = Process(target=make_kernel, args=self.kernel_args, + kwargs=self.kernel_kwargs) + + def shutdown_kernel(self, restart=False): + """ Attempts to the stop the kernel process cleanly. If the kernel + cannot be stopped, it is killed, if possible. + """ + # FIXME: Shutdown does not work on Windows due to ZMQ errors! + if sys.platform == 'win32': + self.kill_kernel() + return + + # Don't send any additional kernel kill messages immediately, to give + # the kernel a chance to properly execute shutdown actions. Wait for at + # most 1s, checking every 0.1s. + self.xreq_channel.shutdown(restart=restart) + for i in range(10): + if self.is_alive: + time.sleep(0.1) + else: + break + else: + # OK, we've waited long enough. + if self.has_kernel: + self.kill_kernel() + + def restart_kernel(self, now=False): + """Restarts a kernel with the same arguments that were used to launch + it. If the old kernel was launched with random ports, the same ports + will be used for the new kernel. + + Parameters + ---------- + now : bool, optional + If True, the kernel is forcefully restarted *immediately*, without + having a chance to do any cleanup action. Otherwise the kernel is + given 1s to clean up before a forceful restart is issued. + + In all cases the kernel is restarted, the only difference is whether + it is given a chance to perform a clean shutdown or not. + """ + if self._launch_args is None: + raise RuntimeError("Cannot restart the kernel. " + "No previous call to 'start_kernel'.") + else: + if self.has_kernel: + if now: + self.kill_kernel() + else: + self.shutdown_kernel(restart=True) + self.start_kernel(**self._launch_args) + + # FIXME: Messages get dropped in Windows due to probable ZMQ bug + # unless there is some delay here. + if sys.platform == 'win32': + time.sleep(0.2) + + @property + def has_kernel(self): + """Returns whether a kernel process has been specified for the kernel + manager. + """ + return self.kernel is not None + + def kill_kernel(self): + """ Kill the running kernel. """ + if self.has_kernel: + # Pause the heart beat channel if it exists. + if self._hb_channel is not None: + self._hb_channel.pause() + + # Attempt to kill the kernel. + try: + self.kernel.kill() + except OSError, e: + # In Windows, we will get an Access Denied error if the process + # has already terminated. Ignore it. + if not (sys.platform == 'win32' and e.winerror == 5): + raise + self.kernel = None + else: + raise RuntimeError("Cannot kill kernel. No kernel is running!") + + def interrupt_kernel(self): + """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is + well supported on all platforms. + """ + if self.has_kernel: + if sys.platform == 'win32': + from parentpoller import ParentPollerWindows as Poller + Poller.send_interrupt(self.kernel.win32_interrupt_event) + else: + self.kernel.send_signal(signal.SIGINT) + else: + raise RuntimeError("Cannot interrupt kernel. No kernel is running!") + + def signal_kernel(self, signum): + """ Sends a signal to the kernel. Note that since only SIGTERM is + supported on Windows, this function is only useful on Unix systems. + """ + if self.has_kernel: + self.kernel.send_signal(signum) + else: + raise RuntimeError("Cannot signal kernel. No kernel is running!") + + @property + def is_alive(self): + """Is the kernel process still running?""" + # FIXME: not using a heartbeat means this method is broken for any + # remote kernel, it's only capable of handling local kernels. + if self.has_kernel: + if self.kernel.poll() is None: + return True + else: + return False + else: + # We didn't start the kernel with this KernelManager so we don't + # know if it is running. We should use a heartbeat for this case. + return True + + +def make_starter(up_addr, down_addr, *args, **kwargs): + """entry point script for launching a kernelstarter in a subprocess""" + loop = ioloop.IOLoop.instance() + ctx = zmq.Context() + session = StreamSession() + upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) + upstream.connect(up_addr) + downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) + downstream.connect(down_addr) + + starter = KernelStarter(session, upstream, downstream, *args, **kwargs) + starter.start() + loop.start() + \ No newline at end of file diff --git a/IPython/zmq/tunnel.py b/IPython/zmq/tunnel.py new file mode 100644 index 0000000..b6b1a36 --- /dev/null +++ b/IPython/zmq/tunnel.py @@ -0,0 +1,123 @@ + + +#----------------------------------------- +# Imports +#----------------------------------------- + +from __future__ import print_function + +import os,sys +from multiprocessing import Process +from getpass import getpass, getuser + +try: + import paramiko +except ImportError: + paramiko = None +else: + from forward import forward_tunnel + +from IPython.external import pexpect + + +def launch_ssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, timeout=15): + """Create an ssh tunnel using command-line ssh that connects port lport + on this machine to localhost:rport on server. The tunnel + will automatically close when not in use, remaining open + for a minimum of timeout seconds for an initial connection. + """ + ssh="ssh " + if keyfile: + ssh += "-i " + keyfile + cmd = ssh + " -f -L %i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout) + tunnel = pexpect.spawn(cmd) + failed = False + while True: + try: + tunnel.expect('[Pp]assword:', timeout=.1) + except pexpect.TIMEOUT: + continue + except pexpect.EOF: + if tunnel.exitstatus: + print (tunnel.exitstatus) + print (tunnel.before) + print (tunnel.after) + raise RuntimeError("tunnel '%s' failed to start"%(cmd)) + else: + return tunnel.pid + else: + if failed: + print("Password rejected, try again") + tunnel.sendline(getpass()) + failed = True + +def _split_server(server): + if '@' in server: + username,server = server.split('@', 1) + else: + username = getuser() + if ':' in server: + server, port = server.split(':') + port = int(port) + else: + port = 22 + return username, server, port + +def launch_paramiko_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None): + """launch a tunner with paramiko in a subprocess""" + if paramiko is None: + raise ImportError("Paramiko not available") + server = _split_server(server) + if keyfile is None: + passwd = getpass("%s@%s's password: "%(server[0], server[1])) + else: + passwd = None + p = Process(target=_paramiko_tunnel, + args=(lport, rport, server, remoteip), + kwargs=dict(keyfile=keyfile, password=passwd)) + p.daemon=False + p.start() + return p + + +def _paramiko_tunnel(lport, rport, server, remoteip, keyfile=None, password=None): + """function for actually starting a paramiko tunnel, to be passed + to multiprocessing.Process(target=this). + """ + username, server, port = server + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy()) + + try: + client.connect(server, port, username=username, key_filename=keyfile, + look_for_keys=True, password=password) + except Exception as e: + print ('*** Failed to connect to %s:%d: %r' % (server, port, e)) + sys.exit(1) + + print ('Now forwarding port %d to %s:%d ...' % (lport, server, rport)) + + try: + forward_tunnel(lport, remoteip, rport, client.get_transport()) + except KeyboardInterrupt: + print ('C-c: Port forwarding stopped.') + sys.exit(0) + + +__all__ = ['launch_ssh_tunnel', 'launch_paramiko_tunnel'] + + + + + + + + + + + + + + +