adapt kernel's ipcluster and Launchers to newparallel
MinRK
@@ -0,0 +1,502
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The ipcluster application.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import logging
19 import os
20 import signal
21
22
23 from zmq.eventloop import ioloop
24
25 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 from IPython.utils.importstring import import_item
27 from IPython.zmq.parallel.clusterdir import (
28 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 ClusterDirError, PIDFileError
30 )
31
32
33 #-----------------------------------------------------------------------------
34 # Module level variables
35 #-----------------------------------------------------------------------------
36
37
38 default_config_file_name = u'ipcluster_config.py'
39
40
41 _description = """\
42 Start an IPython cluster for parallel computing.\n\n
43
44 An IPython cluster consists of 1 controller and 1 or more engines.
45 This command automates the startup of these processes using a wide
46 range of startup methods (SSH, local processes, PBS, mpiexec,
47 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 local host simply do 'ipcluster start -n 4'. For more complex usage
49 you will typically do 'ipcluster create -p mycluster', then edit
50 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 """
52
53
54 # Exit codes for ipcluster
55
56 # This will be the exit code if the ipcluster appears to be running because
57 # a .pid file exists
58 ALREADY_STARTED = 10
59
60
61 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 # file to be found.
63 ALREADY_STOPPED = 11
64
65
66 #-----------------------------------------------------------------------------
67 # Command line options
68 #-----------------------------------------------------------------------------
69
70
71 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72
73 def _add_arguments(self):
74 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 # its defaults on self.parser. Instead, we will put those
76 # default options on our subparsers.
77
78 # This has all the common options that all subcommands use
79 parent_parser1 = ArgumentParser(
80 add_help=False,
81 argument_default=SUPPRESS
82 )
83 self._add_ipython_dir(parent_parser1)
84 self._add_log_level(parent_parser1)
85
86 # This has all the common options that other subcommands use
87 parent_parser2 = ArgumentParser(
88 add_help=False,
89 argument_default=SUPPRESS
90 )
91 self._add_cluster_profile(parent_parser2)
92 self._add_cluster_dir(parent_parser2)
93 self._add_work_dir(parent_parser2)
94 paa = parent_parser2.add_argument
95 paa('--log-to-file',
96 action='store_true', dest='Global.log_to_file',
97 help='Log to a file in the log directory (default is stdout)')
98
99 # Create the object used to create the subparsers.
100 subparsers = self.parser.add_subparsers(
101 dest='Global.subcommand',
102 title='ipcluster subcommands',
103 description=
104 """ipcluster has a variety of subcommands. The general way of
105 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 # help="For more help, type 'ipcluster <cmd> -h'",
108 )
109
110 # The "list" subcommand parser
111 parser_list = subparsers.add_parser(
112 'list',
113 parents=[parent_parser1],
114 argument_default=SUPPRESS,
115 help="List all clusters in cwd and ipython_dir.",
116 description=
117 """List all available clusters, by cluster directory, that can
118 be found in the current working directory or in the ipython
119 directory. Cluster directories are named using the convention
120 'cluster_<profile>'."""
121 )
122
123 # The "create" subcommand parser
124 parser_create = subparsers.add_parser(
125 'create',
126 parents=[parent_parser1, parent_parser2],
127 argument_default=SUPPRESS,
128 help="Create a new cluster directory.",
129 description=
130 """Create an ipython cluster directory by its profile name or
131 cluster directory path. Cluster directories contain
132 configuration, log and security related files and are named
133 using the convention 'cluster_<profile>'. By default they are
134 located in your ipython directory. Once created, you will
135 probably need to edit the configuration files in the cluster
136 directory to configure your cluster. Most users will create a
137 cluster directory by profile name,
138 'ipcluster create -p mycluster', which will put the directory
139 in '<ipython_dir>/cluster_mycluster'.
140 """
141 )
142 paa = parser_create.add_argument
143 paa('--reset-config',
144 dest='Global.reset_config', action='store_true',
145 help=
146 """Recopy the default config files to the cluster directory.
147 You will lose any modifications you have made to these files.""")
148
149 # The "start" subcommand parser
150 parser_start = subparsers.add_parser(
151 'start',
152 parents=[parent_parser1, parent_parser2],
153 argument_default=SUPPRESS,
154 help="Start a cluster.",
155 description=
156 """Start an ipython cluster by its profile name or cluster
157 directory. Cluster directories contain configuration, log and
158 security related files and are named using the convention
159 'cluster_<profile>' and should be created using the 'create'
160 subcommand of 'ipcluster'. If your cluster directory is in
161 the cwd or the ipython directory, you can simply refer to it
162 using its profile name, 'ipcluster start -n 4 -p <profile>',
163 otherwise use the '--cluster-dir' option.
164 """
165 )
166 paa = parser_start.add_argument
167 paa('-n', '--number',
168 type=int, dest='Global.n',
169 help='The number of engines to start.',
170 metavar='Global.n')
171 paa('--clean-logs',
172 dest='Global.clean_logs', action='store_true',
173 help='Delete old log files before starting.')
174 paa('--no-clean-logs',
175 dest='Global.clean_logs', action='store_false',
176 help="Don't delete old log flies before starting.")
177 paa('--daemon',
178 dest='Global.daemonize', action='store_true',
179 help='Daemonize the ipcluster program. This implies --log-to-file')
180 paa('--no-daemon',
181 dest='Global.daemonize', action='store_false',
182 help="Dont't daemonize the ipcluster program.")
183
184 # The "stop" subcommand parser
185 parser_stop = subparsers.add_parser(
186 'stop',
187 parents=[parent_parser1, parent_parser2],
188 argument_default=SUPPRESS,
189 help="Stop a running cluster.",
190 description=
191 """Stop a running ipython cluster by its profile name or cluster
192 directory. Cluster directories are named using the convention
193 'cluster_<profile>'. If your cluster directory is in
194 the cwd or the ipython directory, you can simply refer to it
195 using its profile name, 'ipcluster stop -p <profile>', otherwise
196 use the '--cluster-dir' option.
197 """
198 )
199 paa = parser_stop.add_argument
200 paa('--signal',
201 dest='Global.signal', type=int,
202 help="The signal number to use in stopping the cluster (default=2).",
203 metavar="Global.signal")
204
205
206 #-----------------------------------------------------------------------------
207 # Main application
208 #-----------------------------------------------------------------------------
209
210
211 class IPClusterApp(ApplicationWithClusterDir):
212
213 name = u'ipclusterz'
214 description = _description
215 usage = None
216 command_line_loader = IPClusterAppConfigLoader
217 default_config_file_name = default_config_file_name
218 default_log_level = logging.INFO
219 auto_create_cluster_dir = False
220
221 def create_default_config(self):
222 super(IPClusterApp, self).create_default_config()
223 self.default_config.Global.controller_launcher = \
224 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 self.default_config.Global.engine_launcher = \
226 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 self.default_config.Global.n = 2
228 self.default_config.Global.reset_config = False
229 self.default_config.Global.clean_logs = True
230 self.default_config.Global.signal = 2
231 self.default_config.Global.daemonize = False
232
233 def find_resources(self):
234 subcommand = self.command_line_config.Global.subcommand
235 if subcommand=='list':
236 self.list_cluster_dirs()
237 # Exit immediately because there is nothing left to do.
238 self.exit()
239 elif subcommand=='create':
240 self.auto_create_cluster_dir = True
241 super(IPClusterApp, self).find_resources()
242 elif subcommand=='start' or subcommand=='stop':
243 self.auto_create_cluster_dir = True
244 try:
245 super(IPClusterApp, self).find_resources()
246 except ClusterDirError:
247 raise ClusterDirError(
248 "Could not find a cluster directory. A cluster dir must "
249 "be created before running 'ipcluster start'. Do "
250 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 "information about creating and listing cluster dirs."
252 )
253
254 def list_cluster_dirs(self):
255 # Find the search paths
256 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 if cluster_dir_paths:
258 cluster_dir_paths = cluster_dir_paths.split(':')
259 else:
260 cluster_dir_paths = []
261 try:
262 ipython_dir = self.command_line_config.Global.ipython_dir
263 except AttributeError:
264 ipython_dir = self.default_config.Global.ipython_dir
265 paths = [os.getcwd(), ipython_dir] + \
266 cluster_dir_paths
267 paths = list(set(paths))
268
269 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 for path in paths:
271 files = os.listdir(path)
272 for f in files:
273 full_path = os.path.join(path, f)
274 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 profile = full_path.split('_')[-1]
276 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 print start_cmd + " ==> " + full_path
278
279 def pre_construct(self):
280 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 super(IPClusterApp, self).pre_construct()
282 config = self.master_config
283 try:
284 daemon = config.Global.daemonize
285 if daemon:
286 config.Global.log_to_file = True
287 except AttributeError:
288 pass
289
290 def construct(self):
291 config = self.master_config
292 subcmd = config.Global.subcommand
293 reset = config.Global.reset_config
294 if subcmd == 'list':
295 return
296 if subcmd == 'create':
297 self.log.info('Copying default config files to cluster directory '
298 '[overwrite=%r]' % (reset,))
299 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 if subcmd =='start':
301 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 self.start_logging()
303 self.loop = ioloop.IOLoop.instance()
304 # reactor.callWhenRunning(self.start_launchers)
305 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 dc.start()
307
308 def start_launchers(self):
309 config = self.master_config
310
311 # Create the launchers. In both cases, we set the work_dir of
312 # the launcher to the cluster_dir. This is where the launcher's
313 # subprocesses will be launched. It is not where the controller
314 # and engine will be launched.
315 el_class = import_item(config.Global.engine_launcher)
316 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config
318 )
319 cl_class = import_item(config.Global.controller_launcher)
320 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config
322 )
323
324 # Setup signals
325 signal.signal(signal.SIGINT, self.sigint_handler)
326
327 # Setup the observing of stopping. If the controller dies, shut
328 # everything down as that will be completely fatal for the engines.
329 self.controller_launcher.on_stop(self.stop_launchers)
330 # d1.addCallback(self.stop_launchers)
331 # But, we don't monitor the stopping of engines. An engine dying
332 # is just fine and in principle a user could start a new engine.
333 # Also, if we did monitor engine stopping, it is difficult to
334 # know what to do when only some engines die. Currently, the
335 # observing of engine stopping is inconsistent. Some launchers
336 # might trigger on a single engine stopping, others wait until
337 # all stop. TODO: think more about how to handle this.
338
339 # Start the controller and engines
340 self._stopping = False # Make sure stop_launchers is not called 2x.
341 d = self.start_controller()
342 self.start_engines()
343 self.startup_message()
344 # d.addCallback(self.start_engines)
345 # d.addCallback(self.startup_message)
346 # If the controller or engines fail to start, stop everything
347 # d.addErrback(self.stop_launchers)
348 return d
349
350 def startup_message(self, r=None):
351 logging.info("IPython cluster: started")
352 return r
353
354 def start_controller(self, r=None):
355 # logging.info("In start_controller")
356 config = self.master_config
357 d = self.controller_launcher.start(
358 cluster_dir=config.Global.cluster_dir
359 )
360 return d
361
362 def start_engines(self, r=None):
363 # logging.info("In start_engines")
364 config = self.master_config
365 d = self.engine_launcher.start(
366 config.Global.n,
367 cluster_dir=config.Global.cluster_dir
368 )
369 return d
370
371 def stop_controller(self, r=None):
372 # logging.info("In stop_controller")
373 if self.controller_launcher.running:
374 return self.controller_launcher.stop()
375
376 def stop_engines(self, r=None):
377 # logging.info("In stop_engines")
378 if self.engine_launcher.running:
379 d = self.engine_launcher.stop()
380 # d.addErrback(self.log_err)
381 return d
382 else:
383 return None
384
385 def log_err(self, f):
386 logging.error(f.getTraceback())
387 return None
388
389 def stop_launchers(self, r=None):
390 if not self._stopping:
391 self._stopping = True
392 # if isinstance(r, failure.Failure):
393 # logging.error('Unexpected error in ipcluster:')
394 # logging.info(r.getTraceback())
395 logging.error("IPython cluster: stopping")
396 # These return deferreds. We are not doing anything with them
397 # but we are holding refs to them as a reminder that they
398 # do return deferreds.
399 d1 = self.stop_engines()
400 d2 = self.stop_controller()
401 # Wait a few seconds to let things shut down.
402 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
403 dc.start()
404 # reactor.callLater(4.0, reactor.stop)
405
406 def sigint_handler(self, signum, frame):
407 self.stop_launchers()
408
409 def start_logging(self):
410 # Remove old log files of the controller and engine
411 if self.master_config.Global.clean_logs:
412 log_dir = self.master_config.Global.log_dir
413 for f in os.listdir(log_dir):
414 if f.startswith('ipengine' + '-'):
415 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
416 os.remove(os.path.join(log_dir, f))
417 if f.startswith('ipcontroller' + '-'):
418 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
419 os.remove(os.path.join(log_dir, f))
420 # This will remove old log files for ipcluster itself
421 super(IPClusterApp, self).start_logging()
422
423 def start_app(self):
424 """Start the application, depending on what subcommand is used."""
425 subcmd = self.master_config.Global.subcommand
426 if subcmd=='create' or subcmd=='list':
427 return
428 elif subcmd=='start':
429 self.start_app_start()
430 elif subcmd=='stop':
431 self.start_app_stop()
432
433 def start_app_start(self):
434 """Start the app for the start subcommand."""
435 config = self.master_config
436 # First see if the cluster is already running
437 try:
438 pid = self.get_pid_from_file()
439 except PIDFileError:
440 pass
441 else:
442 self.log.critical(
443 'Cluster is already running with [pid=%s]. '
444 'use "ipcluster stop" to stop the cluster.' % pid
445 )
446 # Here I exit with an unusual exit status that other processes
447 # can watch for to learn how I exited.
448 self.exit(ALREADY_STARTED)
449
450 # Now log and daemonize
451 self.log.info(
452 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
453 )
454 # TODO: Get daemonize working on Windows or as a Windows Server.
455 if config.Global.daemonize:
456 if os.name=='posix':
457 from twisted.scripts._twistd_unix import daemonize
458 daemonize()
459
460 # Now write the new pid file AFTER our new forked pid is active.
461 self.write_pid_file()
462 try:
463 self.loop.start()
464 except:
465 logging.info("stopping...")
466 self.remove_pid_file()
467
468 def start_app_stop(self):
469 """Start the app for the stop subcommand."""
470 config = self.master_config
471 try:
472 pid = self.get_pid_from_file()
473 except PIDFileError:
474 self.log.critical(
475 'Problem reading pid file, cluster is probably not running.'
476 )
477 # Here I exit with an unusual exit status that other processes
478 # can watch for to learn how I exited.
479 self.exit(ALREADY_STOPPED)
480 else:
481 if os.name=='posix':
482 sig = config.Global.signal
483 self.log.info(
484 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
485 )
486 os.kill(pid, sig)
487 elif os.name=='nt':
488 # As of right now, we don't support daemonize on Windows, so
489 # stop will not do anything. Minimally, it should clean up the
490 # old .pid files.
491 self.remove_pid_file()
492
493
494 def launch_new_instance():
495 """Create and run the IPython cluster."""
496 app = IPClusterApp()
497 app.start()
498
499
500 if __name__ == '__main__':
501 launch_new_instance()
502
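
Reviewer note: the construct() step above ports Twisted's reactor.callWhenRunning to pyzmq by wrapping start_launchers in a zero-delay DelayedCallback, so it only fires once the IOLoop is actually running. A minimal, self-contained sketch of that scheduling pattern follows; the callback body is illustrative, not the real launcher startup:

from zmq.eventloop import ioloop

def start_launchers():
    # stand-in for the real controller/engine startup
    print("launchers would start here")
    loop.stop()  # stop again so this sketch terminates

loop = ioloop.IOLoop.instance()
# a zero-delay DelayedCallback fires once, as soon as the loop starts
dc = ioloop.DelayedCallback(start_launchers, 0, loop)
dc.start()
loop.start()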
@@ -0,0 +1,824
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 Facilities for launching IPython processes asynchronously.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import re
20 import sys
21 import logging
22
23 from signal import SIGINT, SIGTERM
24 try:
25 from signal import SIGKILL
26 except ImportError:
27 SIGKILL = SIGTERM # Windows has no SIGKILL; fall back to SIGTERM
28
29 from subprocess import Popen, PIPE
30
31 from zmq.eventloop import ioloop
32
33 from IPython.config.configurable import Configurable
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 from IPython.utils.path import get_ipython_module_path
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37
38 # from IPython.kernel.winhpcjob import (
39 # IPControllerTask, IPEngineTask,
40 # IPControllerJob, IPEngineSetJob
41 # )
42
43
44 #-----------------------------------------------------------------------------
45 # Paths to the kernel apps
46 #-----------------------------------------------------------------------------
47
48
49 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
50 'IPython.zmq.parallel.ipclusterapp'
51 ))
52
53 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
54 'IPython.zmq.parallel.ipengineapp'
55 ))
56
57 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
58 'IPython.zmq.parallel.ipcontrollerapp'
59 ))
60
61 #-----------------------------------------------------------------------------
62 # Base launchers and errors
63 #-----------------------------------------------------------------------------
64
65
66 class LauncherError(Exception):
67 pass
68
69
70 class ProcessStateError(LauncherError):
71 pass
72
73
74 class UnknownStatus(LauncherError):
75 pass
76
77
78 class BaseLauncher(Configurable):
79 """An asbtraction for starting, stopping and signaling a process."""
80
81 # In all of the launchers, the work_dir is where child processes will be
82 # run. This will usually be the cluster_dir, but may not be. Any work_dir
83 # passed into the __init__ method will override the config value.
84 # This should not be used to set the work_dir for the actual engine
85 # and controller. Instead, use their own config files or the
86 # controller_args, engine_args attributes of the launchers to add
87 # the --work-dir option.
88 work_dir = Unicode(u'.')
89 loop = Instance('zmq.eventloop.ioloop.IOLoop')
90 def _loop_default(self):
91 return ioloop.IOLoop.instance()
92
93 def __init__(self, work_dir=u'.', config=None):
94 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
95 self.state = 'before' # can be before, running, after
96 self.stop_callbacks = []
97 self.start_data = None
98 self.stop_data = None
99
100 @property
101 def args(self):
102 """A list of cmd and args that will be used to start the process.
103
104 This is what is passed to :class:`subprocess.Popen` and the first element
105 will be the process name.
106 """
107 return self.find_args()
108
109 def find_args(self):
110 """The ``.args`` property calls this to find the args list.
111
112 Subclasses should implement this to construct the cmd and args.
113 """
114 raise NotImplementedError('find_args must be implemented in a subclass')
115
116 @property
117 def arg_str(self):
118 """The string form of the program arguments."""
119 return ' '.join(self.args)
120
121 @property
122 def running(self):
123 """Am I running."""
124 if self.state == 'running':
125 return True
126 else:
127 return False
128
129 def start(self):
130 """Start the process.
131
132 This must return a deferred that fires with information about the
133 process starting (like a pid, job id, etc.).
134 """
135 raise NotImplementedError('start must be implemented in a subclass')
136
137 def stop(self):
138 """Stop the process and notify observers of stopping.
139
140 This must return a deferred that fires with information about the
141 processing stopping, like errors that occur while the process is
142 attempting to be shut down. This deferred won't fire when the process
143 actually stops. To observe the actual process stopping, see
144 :func:`on_stop`.
145 """
146 raise NotImplementedError('stop must be implemented in a subclass')
147
148 def on_stop(self, f):
149 """Get a deferred that will fire when the process stops.
150
151 The deferred will fire with data that contains information about
152 the exit status of the process.
153 """
154 if self.state=='after':
155 return f(self.stop_data)
156 else:
157 self.stop_callbacks.append(f)
158
159 def notify_start(self, data):
160 """Call this to trigger startup actions.
161
162 This logs the process startup and sets the state to 'running'. It is
163 a pass-through so it can be used as a callback.
164 """
165
166 logging.info('Process %r started: %r' % (self.args[0], data))
167 self.start_data = data
168 self.state = 'running'
169 return data
170
171 def notify_stop(self, data):
172 """Call this to trigger process stop actions.
173
174 This logs the process stopping and sets the state to 'after'. Call
175 this to trigger the callbacks registered with :func:`on_stop`."""
176
177 logging.info('Process %r stopped: %r' % (self.args[0], data))
178 self.stop_data = data
179 self.state = 'after'
180 for i in range(len(self.stop_callbacks)):
181 d = self.stop_callbacks.pop()
182 d(data)
183 return data
184
185 def signal(self, sig):
186 """Signal the process.
187
188 Return a semi-meaningless deferred after signaling the process.
189
190 Parameters
191 ----------
192 sig : str or int
193 'KILL', 'INT', etc., or any signal number
194 """
195 raise NotImplementedError('signal must be implemented in a subclass')
196
197
198 #-----------------------------------------------------------------------------
199 # Local process launchers
200 #-----------------------------------------------------------------------------
201
202
203 class LocalProcessLauncher(BaseLauncher):
204 """Start and stop an external process in an asynchronous manner.
205
206 This will launch the external process with a working directory of
207 ``self.work_dir``.
208 """
209
210 # This is used to construct self.args, which is passed to
211 # subprocess.Popen.
212 cmd_and_args = List([])
213 poll_frequency = Int(100) # in ms
214
215 def __init__(self, work_dir=u'.', config=None):
216 super(LocalProcessLauncher, self).__init__(
217 work_dir=work_dir, config=config
218 )
219 self.process = None
220 self.start_deferred = None
221 self.poller = None
222
223 def find_args(self):
224 return self.cmd_and_args
225
226 def start(self):
227 if self.state == 'before':
228 self.process = Popen(self.args,
229 stdout=PIPE,stderr=PIPE,stdin=PIPE,
230 env=os.environ,
231 cwd=self.work_dir
232 )
233
234 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
235 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
236 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
237 self.poller.start()
238 self.notify_start(self.process.pid)
239 else:
240 s = 'The process was already started and has state: %r' % self.state
241 raise ProcessStateError(s)
242
243 def stop(self):
244 return self.interrupt_then_kill()
245
246 def signal(self, sig):
247 if self.state == 'running':
248 self.process.send_signal(sig)
249
250 def interrupt_then_kill(self, delay=2.0):
251 """Send INT, wait a delay and then send KILL."""
252 self.signal(SIGINT)
253 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
254 self.killer.start()
255
256 # callbacks, etc:
257
258 def handle_stdout(self, fd, events):
259 line = self.process.stdout.readline()
260 # a stopped process will be readable but return empty strings
261 if line:
262 logging.info(line[:-1])
263 else:
264 self.poll()
265
266 def handle_stderr(self, fd, events):
267 line = self.process.stderr.readline()
268 # a stopped process will be readable but return empty strings
269 if line:
270 logging.error(line[:-1])
271 else:
272 self.poll()
273
274 def poll(self):
275 status = self.process.poll()
276 if status is not None:
277 self.poller.stop()
278 self.loop.remove_handler(self.process.stdout.fileno())
279 self.loop.remove_handler(self.process.stderr.fileno())
280 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
281 return status
282
283 class LocalControllerLauncher(LocalProcessLauncher):
284 """Launch a controller as a regular external process."""
285
286 controller_cmd = List(ipcontroller_cmd_argv, config=True)
287 # Command line arguments to ipcontroller.
288 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
289
290 def find_args(self):
291 return self.controller_cmd + self.controller_args
292
293 def start(self, cluster_dir):
294 """Start the controller by cluster_dir."""
295 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 self.cluster_dir = unicode(cluster_dir)
297 logging.info("Starting LocalControllerLauncher: %r" % self.args)
298 return super(LocalControllerLauncher, self).start()
299
300
301 class LocalEngineLauncher(LocalProcessLauncher):
302 """Launch a single engine as a regular externall process."""
303
304 engine_cmd = List(ipengine_cmd_argv, config=True)
305 # Command line arguments for ipengine.
306 engine_args = List(
307 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
308 )
309
310 def find_args(self):
311 return self.engine_cmd + self.engine_args
312
313 def start(self, cluster_dir):
314 """Start the engine by cluster_dir."""
315 self.engine_args.extend(['--cluster-dir', cluster_dir])
316 self.cluster_dir = unicode(cluster_dir)
317 return super(LocalEngineLauncher, self).start()
318
319
320 class LocalEngineSetLauncher(BaseLauncher):
321 """Launch a set of engines as regular external processes."""
322
323 # Command line arguments for ipengine.
324 engine_args = List(
325 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 )
327 # launcher class
328 launcher_class = LocalEngineLauncher
329
330 def __init__(self, work_dir=u'.', config=None):
331 super(LocalEngineSetLauncher, self).__init__(
332 work_dir=work_dir, config=config
333 )
334 self.launchers = {}
335 self.stop_data = {}
336
337 def start(self, n, cluster_dir):
338 """Start n engines by profile or cluster_dir."""
339 self.cluster_dir = unicode(cluster_dir)
340 dlist = []
341 for i in range(n):
342 el = self.launcher_class(work_dir=self.work_dir, config=self.config)
343 # Copy the engine args over to each engine launcher.
344 import copy
345 el.engine_args = copy.deepcopy(self.engine_args)
346 el.on_stop(self._notice_engine_stopped)
347 d = el.start(cluster_dir)
348 if i==0:
349 logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.launchers[i] = el
351 dlist.append(d)
352 self.notify_start(dlist)
353 # The consumeErrors here could be dangerous
354 # dfinal = gatherBoth(dlist, consumeErrors=True)
355 # dfinal.addCallback(self.notify_start)
356 return dlist
357
358 def find_args(self):
359 return ['engine set']
360
361 def signal(self, sig):
362 dlist = []
363 for el in self.launchers.itervalues():
364 d = el.signal(sig)
365 dlist.append(d)
366 # dfinal = gatherBoth(dlist, consumeErrors=True)
367 return dlist
368
369 def interrupt_then_kill(self, delay=1.0):
370 dlist = []
371 for el in self.launchers.itervalues():
372 d = el.interrupt_then_kill(delay)
373 dlist.append(d)
374 # dfinal = gatherBoth(dlist, consumeErrors=True)
375 return dlist
376
377 def stop(self):
378 return self.interrupt_then_kill()
379
380 def _notice_engine_stopped(self, data):
381 print "notice", data
382 pid = data['pid']
383 for idx,el in self.launchers.iteritems():
384 if el.process.pid == pid:
385 break
386 self.launchers.pop(idx)
387 self.stop_data[idx] = data
388 if not self.launchers:
389 self.notify_stop(self.stop_data)
390
391
392 #-----------------------------------------------------------------------------
393 # MPIExec launchers
394 #-----------------------------------------------------------------------------
395
396
397 class MPIExecLauncher(LocalProcessLauncher):
398 """Launch an external process using mpiexec."""
399
400 # The mpiexec command to use in starting the process.
401 mpi_cmd = List(['mpiexec'], config=True)
402 # The command line arguments to pass to mpiexec.
403 mpi_args = List([], config=True)
404 # The program to start using mpiexec.
405 program = List(['date'], config=True)
406 # The command line argument to the program.
407 program_args = List([], config=True)
408 # The number of instances of the program to start.
409 n = Int(1, config=True)
410
411 def find_args(self):
412 """Build self.args using all the fields."""
413 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
414 self.program + self.program_args
415
416 def start(self, n):
417 """Start n instances of the program using mpiexec."""
418 self.n = n
419 return super(MPIExecLauncher, self).start()
420
421
422 class MPIExecControllerLauncher(MPIExecLauncher):
423 """Launch a controller using mpiexec."""
424
425 controller_cmd = List(ipcontroller_cmd_argv, config=True)
426 # Command line arguments to ipcontroller.
427 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
428 n = Int(1, config=False)
429
430 def start(self, cluster_dir):
431 """Start the controller by cluster_dir."""
432 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 self.cluster_dir = unicode(cluster_dir)
434 logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 return super(MPIExecControllerLauncher, self).start(1)
436
437 def find_args(self):
438 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
439 self.controller_cmd + self.controller_args
440
441
442 class MPIExecEngineSetLauncher(MPIExecLauncher):
443
444 engine_cmd = List(ipengine_cmd_argv, config=True)
445 # Command line arguments for ipengine.
446 engine_args = List(
447 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
448 )
449 n = Int(1, config=True)
450
451 def start(self, n, cluster_dir):
452 """Start n engines by profile or cluster_dir."""
453 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
455 self.n = n
456 logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 return super(MPIExecEngineSetLauncher, self).start(n)
458
459 def find_args(self):
460 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
461 self.engine_cmd + self.engine_args
462
463
464 #-----------------------------------------------------------------------------
465 # SSH launchers
466 #-----------------------------------------------------------------------------
467
468 # TODO: Get SSH Launcher working again.
469
470 class SSHLauncher(LocalProcessLauncher):
471 """A minimal launcher for ssh.
472
473 To be useful this will probably have to be extended to use the ``sshx``
474 idea for environment variables. There could be other things this needs
475 as well.
476 """
477
478 ssh_cmd = List(['ssh'], config=True)
479 ssh_args = List([], config=True)
480 program = List(['date'], config=True)
481 program_args = List([], config=True)
482 hostname = Str('', config=True)
483 user = Str(os.environ.get('USER','username'), config=True)
484 location = Str('')
485
486 def _hostname_changed(self, name, old, new):
487 self.location = '%s@%s' % (self.user, new)
488
489 def _user_changed(self, name, old, new):
490 self.location = '%s@%s' % (new, self.hostname)
491
492 def find_args(self):
493 return self.ssh_cmd + self.ssh_args + [self.location] + \
494 self.program + self.program_args
495
496 def start(self, cluster_dir, hostname=None, user=None):
497 if hostname is not None:
498 self.hostname = hostname
499 if user is not None:
500 self.user = user
501 return super(SSHLauncher, self).start()
502
503
504 class SSHControllerLauncher(SSHLauncher):
505
506 program = List(ipcontroller_cmd_argv, config=True)
507 # Command line arguments to ipcontroller.
508 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
509
510
511 class SSHEngineLauncher(SSHLauncher):
512 program = List(ipengine_cmd_argv, config=True)
513 # Command line arguments for ipengine.
514 program_args = List(
515 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
516 )
517
518 class SSHEngineSetLauncher(LocalEngineSetLauncher):
519 launcher_class = SSHEngineLauncher
520
521
522 #-----------------------------------------------------------------------------
523 # Windows HPC Server 2008 scheduler launchers
524 #-----------------------------------------------------------------------------
525
526
527 # # This is only used on Windows.
528 # def find_job_cmd():
529 # if os.name=='nt':
530 # try:
531 # return find_cmd('job')
532 # except FindCmdError:
533 # return 'job'
534 # else:
535 # return 'job'
536 #
537 #
538 # class WindowsHPCLauncher(BaseLauncher):
539 #
540 # # A regular expression used to get the job id from the output of the
541 # # submit_command.
542 # job_id_regexp = Str(r'\d+', config=True)
543 # # The filename of the instantiated job script.
544 # job_file_name = Unicode(u'ipython_job.xml', config=True)
545 # # The full path to the instantiated job script. This gets made dynamically
546 # # by combining the work_dir with the job_file_name.
547 # job_file = Unicode(u'')
548 # # The hostname of the scheduler to submit the job to
549 # scheduler = Str('', config=True)
550 # job_cmd = Str(find_job_cmd(), config=True)
551 #
552 # def __init__(self, work_dir=u'.', config=None):
553 # super(WindowsHPCLauncher, self).__init__(
554 # work_dir=work_dir, config=config
555 # )
556 #
557 # @property
558 # def job_file(self):
559 # return os.path.join(self.work_dir, self.job_file_name)
560 #
561 # def write_job_file(self, n):
562 # raise NotImplementedError("Implement write_job_file in a subclass.")
563 #
564 # def find_args(self):
565 # return ['job.exe']
566 #
567 # def parse_job_id(self, output):
568 # """Take the output of the submit command and return the job id."""
569 # m = re.search(self.job_id_regexp, output)
570 # if m is not None:
571 # job_id = m.group()
572 # else:
573 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 # self.job_id = job_id
575 # logging.info('Job started with job id: %r' % job_id)
576 # return job_id
577 #
578 # @inlineCallbacks
579 # def start(self, n):
580 # """Start n copies of the process using the Win HPC job scheduler."""
581 # self.write_job_file(n)
582 # args = [
583 # 'submit',
584 # '/jobfile:%s' % self.job_file,
585 # '/scheduler:%s' % self.scheduler
586 # ]
587 # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # output = yield getProcessOutput(str(self.job_cmd),
590 # [str(a) for a in args],
591 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
592 # path=self.work_dir
593 # )
594 # job_id = self.parse_job_id(output)
595 # self.notify_start(job_id)
596 # defer.returnValue(job_id)
597 #
598 # @inlineCallbacks
599 # def stop(self):
600 # args = [
601 # 'cancel',
602 # self.job_id,
603 # '/scheduler:%s' % self.scheduler
604 # ]
605 # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # try:
607 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 # output = yield getProcessOutput(str(self.job_cmd),
609 # [str(a) for a in args],
610 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
611 # path=self.work_dir
612 # )
613 # except:
614 # output = 'The job already appears to be stopped: %r' % self.job_id
615 # self.notify_stop(output) # Pass the output of the kill cmd
616 # defer.returnValue(output)
617 #
618 #
619 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
620 #
621 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
622 # extra_args = List([], config=False)
623 #
624 # def write_job_file(self, n):
625 # job = IPControllerJob(config=self.config)
626 #
627 # t = IPControllerTask(config=self.config)
628 # # The task's work directory is *not* the actual work directory of
629 # # the controller. It is used as the base path for the stdout/stderr
630 # # files that the scheduler redirects to.
631 # t.work_directory = self.cluster_dir
632 # # Add the --cluster-dir args from self.start().
633 # t.controller_args.extend(self.extra_args)
634 # job.add_task(t)
635 #
636 # logging.info("Writing job description file: %s" % self.job_file)
637 # job.write(self.job_file)
638 #
639 # @property
640 # def job_file(self):
641 # return os.path.join(self.cluster_dir, self.job_file_name)
642 #
643 # def start(self, cluster_dir):
644 # """Start the controller by cluster_dir."""
645 # self.extra_args = ['--cluster-dir', cluster_dir]
646 # self.cluster_dir = unicode(cluster_dir)
647 # return super(WindowsHPCControllerLauncher, self).start(1)
648 #
649 #
650 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
651 #
652 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
653 # extra_args = List([], config=False)
654 #
655 # def write_job_file(self, n):
656 # job = IPEngineSetJob(config=self.config)
657 #
658 # for i in range(n):
659 # t = IPEngineTask(config=self.config)
660 # # The task's work directory is *not* the actual work directory of
661 # # the engine. It is used as the base path for the stdout/stderr
662 # # files that the scheduler redirects to.
663 # t.work_directory = self.cluster_dir
664 # # Add the --cluster-dir args from self.start().
665 # t.engine_args.extend(self.extra_args)
666 # job.add_task(t)
667 #
668 # logging.info("Writing job description file: %s" % self.job_file)
669 # job.write(self.job_file)
670 #
671 # @property
672 # def job_file(self):
673 # return os.path.join(self.cluster_dir, self.job_file_name)
674 #
675 # def start(self, n, cluster_dir):
676 # """Start the controller by cluster_dir."""
677 # self.extra_args = ['--cluster-dir', cluster_dir]
678 # self.cluster_dir = unicode(cluster_dir)
679 # return super(WindowsHPCEngineSetLauncher, self).start(n)
680 #
681 #
682 # #-----------------------------------------------------------------------------
683 # # Batch (PBS) system launchers
684 # #-----------------------------------------------------------------------------
685 #
686 # # TODO: Get PBS launcher working again.
687 #
688 # class BatchSystemLauncher(BaseLauncher):
689 # """Launch an external process using a batch system.
690 #
691 # This class is designed to work with UNIX batch systems like PBS, LSF,
692 # GridEngine, etc. The overall model is that there are different commands
693 # like qsub, qdel, etc. that handle the starting and stopping of the process.
694 #
695 # This class also has the notion of a batch script. The ``batch_template``
696 # attribute can be set to a string that is a template for the batch script.
697 # This template is instantiated using Itpl. Thus the template can use
698 # ${n} for the number of instances. Subclasses can add additional variables
699 # to the template dict.
700 # """
701 #
702 # # Subclasses must fill these in. See PBSEngineSet
703 # # The name of the command line program used to submit jobs.
704 # submit_command = Str('', config=True)
705 # # The name of the command line program used to delete jobs.
706 # delete_command = Str('', config=True)
707 # # A regular expression used to get the job id from the output of the
708 # # submit_command.
709 # job_id_regexp = Str('', config=True)
710 # # The string that is the batch script template itself.
711 # batch_template = Str('', config=True)
712 # # The filename of the instantiated batch script.
713 # batch_file_name = Unicode(u'batch_script', config=True)
714 # # The full path to the instantiated batch script.
715 # batch_file = Unicode(u'')
716 #
717 # def __init__(self, work_dir=u'.', config=None):
718 # super(BatchSystemLauncher, self).__init__(
719 # work_dir=work_dir, config=config
720 # )
721 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
722 # self.context = {}
723 #
724 # def parse_job_id(self, output):
725 # """Take the output of the submit command and return the job id."""
726 # m = re.match(self.job_id_regexp, output)
727 # if m is not None:
728 # job_id = m.group()
729 # else:
730 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 # self.job_id = job_id
732 # logging.info('Job started with job id: %r' % job_id)
733 # return job_id
734 #
735 # def write_batch_script(self, n):
736 # """Instantiate and write the batch script to the work_dir."""
737 # self.context['n'] = n
738 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # logging.info('Writing instantiated batch script: %s' % self.batch_file)
740 # f = open(self.batch_file, 'w')
741 # f.write(script_as_string)
742 # f.close()
743 #
744 # @inlineCallbacks
745 # def start(self, n):
746 # """Start n copies of the process using a batch system."""
747 # self.write_batch_script(n)
748 # output = yield getProcessOutput(self.submit_command,
749 # [self.batch_file], env=os.environ)
750 # job_id = self.parse_job_id(output)
751 # self.notify_start(job_id)
752 # defer.returnValue(job_id)
753 #
754 # @inlineCallbacks
755 # def stop(self):
756 # output = yield getProcessOutput(self.delete_command,
757 # [self.job_id], env=os.environ
758 # )
759 # self.notify_stop(output) # Pass the output of the kill cmd
760 # defer.returnValue(output)
761 #
762 #
763 # class PBSLauncher(BatchSystemLauncher):
764 # """A BatchSystemLauncher subclass for PBS."""
765 #
766 # submit_command = Str('qsub', config=True)
767 # delete_command = Str('qdel', config=True)
768 # job_id_regexp = Str(r'\d+', config=True)
769 # batch_template = Str('', config=True)
770 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
771 # batch_file = Unicode(u'')
772 #
773 #
774 # class PBSControllerLauncher(PBSLauncher):
775 # """Launch a controller using PBS."""
776 #
777 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
778 #
779 # def start(self, cluster_dir):
780 # """Start the controller by profile or cluster_dir."""
781 # # Here we save profile and cluster_dir in the context so they
782 # # can be used in the batch script template as ${profile} and
783 # # ${cluster_dir}
784 # self.context['cluster_dir'] = cluster_dir
785 # self.cluster_dir = unicode(cluster_dir)
786 # logging.info("Starting PBSControllerLauncher: %r" % self.args)
787 # return super(PBSControllerLauncher, self).start(1)
788 #
789 #
790 # class PBSEngineSetLauncher(PBSLauncher):
791 #
792 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
793 #
794 # def start(self, n, cluster_dir):
795 # """Start n engines by profile or cluster_dir."""
796 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 # self.cluster_dir = unicode(cluster_dir)
798 # logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # return super(PBSEngineSetLauncher, self).start(n)
800
801
802 #-----------------------------------------------------------------------------
803 # A launcher for ipcluster itself!
804 #-----------------------------------------------------------------------------
805
806
807 class IPClusterLauncher(LocalProcessLauncher):
808 """Launch the ipcluster program in an external process."""
809
810 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
811 # Command line arguments to pass to ipcluster.
812 ipcluster_args = List(
813 ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
814 ipcluster_subcommand = Str('start')
815 ipcluster_n = Int(2)
816
817 def find_args(self):
818 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
819 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820
821 def start(self):
822 logging.info("Starting ipcluster: %r" % self.args)
823 return super(IPClusterLauncher, self).start()
824
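
Reviewer note: where the Twisted launchers returned deferreds, BaseLauncher above uses a small observer pattern instead: on_stop() either fires the callback immediately (if the process already exited) or queues it, and notify_stop() drains the queue. A self-contained sketch of just that mechanism, with hypothetical MiniLauncher/report names rather than the real classes:

class MiniLauncher(object):
    def __init__(self):
        self.state = 'before'  # before -> running -> after
        self.stop_callbacks = []
        self.stop_data = None

    def on_stop(self, f):
        # fire immediately if the process already stopped, else queue
        if self.state == 'after':
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_stop(self, data):
        self.stop_data = data
        self.state = 'after'
        while self.stop_callbacks:
            self.stop_callbacks.pop()(data)
        return data

def report(data):
    print("stopped: %r" % (data,))

ml = MiniLauncher()
ml.on_stop(report)                             # queued until notify_stop
ml.notify_stop({'exit_code': 0, 'pid': 1234})  # drains the queue
ml.on_stop(report)                             # fires immediately now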
@@ -1,1172 +1,1164
1 1 """A semi-synchronous Client for the ZMQ controller"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import time
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17 from datetime import datetime
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 21
22 22 from IPython.external.decorator import decorator
23 23 from IPython.zmq import tunnel
24 24
25 25 import streamsession as ss
26 26 # from remotenamespace import RemoteNamespace
27 27 from view import DirectView, LoadBalancedView
28 28 from dependency import Dependency, depend, require
29 29 import error
30 30 import map as Map
31 31 from asyncresult import AsyncResult, AsyncMapResult
32 32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 33 from util import ReverseDict
34 34
35 35 #--------------------------------------------------------------------------
36 36 # helpers for implementing old MEC API via client.apply
37 37 #--------------------------------------------------------------------------
38 38
39 39 def _push(ns):
40 40 """helper method for implementing `client.push` via `client.apply`"""
41 41 globals().update(ns)
42 42
43 43 def _pull(keys):
44 44 """helper method for implementing `client.pull` via `client.apply`"""
45 45 g = globals()
46 46 if isinstance(keys, (list,tuple, set)):
47 47 for key in keys:
48 48 if not g.has_key(key):
49 49 raise NameError("name '%s' is not defined"%key)
50 50 return map(g.get, keys)
51 51 else:
52 52 if not g.has_key(keys):
53 53 raise NameError("name '%s' is not defined"%keys)
54 54 return g.get(keys)
55 55
56 56 def _clear():
57 57 """helper method for implementing `client.clear` via `client.apply`"""
58 58 globals().clear()
59 59
60 60 def _execute(code):
61 61 """helper method for implementing `client.execute` via `client.apply`"""
62 62 exec code in globals()
63 63
64 64
65 65 #--------------------------------------------------------------------------
66 66 # Decorators for Client methods
67 67 #--------------------------------------------------------------------------
68 68
69 69 @decorator
70 70 def spinfirst(f, self, *args, **kwargs):
71 71 """Call spin() to sync state prior to calling the method."""
72 72 self.spin()
73 73 return f(self, *args, **kwargs)
74 74
75 75 @decorator
76 76 def defaultblock(f, self, *args, **kwargs):
77 77 """Default to self.block; preserve self.block."""
78 78 block = kwargs.get('block',None)
79 79 block = self.block if block is None else block
80 80 saveblock = self.block
81 81 self.block = block
82 82 try:
83 83 ret = f(self, *args, **kwargs)
84 84 finally:
85 85 self.block = saveblock
86 86 return ret
87 87
88 88
89 89 #--------------------------------------------------------------------------
90 90 # Classes
91 91 #--------------------------------------------------------------------------
92 92
93 class ResultDict(dict):
94 """A subclass of dict that raises errors if it has them."""
95 def __getitem__(self, key):
96 res = dict.__getitem__(self, key)
97 if isinstance(res, error.KernelError):
98 raise res
99 return res
100
101 93 class Metadata(dict):
102 94 """Subclass of dict for initializing metadata values."""
103 95 def __init__(self, *args, **kwargs):
104 96 dict.__init__(self)
105 97 md = {'msg_id' : None,
106 98 'submitted' : None,
107 99 'started' : None,
108 100 'completed' : None,
109 101 'received' : None,
110 102 'engine_uuid' : None,
111 103 'engine_id' : None,
112 104 'follow' : None,
113 105 'after' : None,
114 106 'status' : None,
115 107
116 108 'pyin' : None,
117 109 'pyout' : None,
118 110 'pyerr' : None,
119 111 'stdout' : '',
120 112 'stderr' : '',
121 113 }
122 114 self.update(md)
123 115 self.update(dict(*args, **kwargs))
124 116
125 117
126 118
127 119 class Client(object):
128 120 """A semi-synchronous client to the IPython ZMQ controller
129 121
130 122 Parameters
131 123 ----------
132 124
133 125 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
134 126 The address of the controller's registration socket.
135 127 [Default: 'tcp://127.0.0.1:10101']
136 128 context : zmq.Context
137 129 Pass an existing zmq.Context instance, otherwise the client will create its own
138 130 username : bytes
139 131 set username to be passed to the Session object
140 132 debug : bool
141 133 flag for lots of message printing for debug purposes
142 134
143 135 #-------------- ssh related args ----------------
144 136 # These are args for configuring the ssh tunnel to be used
145 137 # credentials are used to forward connections over ssh to the Controller
146 138 # Note that the ip given in `addr` needs to be relative to sshserver
147 139 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
148 140 # and set sshserver as the same machine the Controller is on. However,
149 141 # the only requirement is that sshserver is able to see the Controller
150 142 # (i.e. is within the same trusted network).
151 143
152 144 sshserver : str
153 145 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
154 146 If keyfile or password is specified, and this is not, it will default to
155 147 the ip given in addr.
156 148 sshkey : str; path to public ssh key file
157 149 This specifies a key to be used in ssh login, default None.
158 150 Regular default ssh keys will be used without specifying this argument.
159 151 password : str;
160 152 Your ssh password to sshserver. Note that if this is left None,
161 153 you will be prompted for it if passwordless key based login is unavailable.
162 154
163 155 #------- exec authentication args -------
164 156 # If even localhost is untrusted, you can have some protection against
165 157 # unauthorized execution by using a key. Messages are still sent
166 158 # as cleartext, so if someone can snoop your loopback traffic this will
167 159 # not help anything.
168 160
169 161 exec_key : str
170 162 an authentication key or file containing a key
171 163 default: None
172 164
173 165
174 166 Attributes
175 167 ----------
176 168 ids : set of int engine IDs
177 169 requesting the ids attribute always synchronizes
178 170 the registration state. To request ids without synchronization,
179 171 use semi-private _ids attributes.
180 172
181 173 history : list of msg_ids
182 174 a list of msg_ids, keeping track of all the execution
183 175 messages you have submitted in order.
184 176
185 177 outstanding : set of msg_ids
186 178 a set of msg_ids that have been submitted, but whose
187 179 results have not yet been received.
188 180
189 181 results : dict
190 182 a dict of all our results, keyed by msg_id
191 183
192 184 block : bool
193 185 determines default behavior when block not specified
194 186 in execution methods
195 187
196 188 Methods
197 189 -------
198 190 spin : flushes incoming results and registration state changes
199 191 control methods spin, and requesting `ids` also ensures up to date
200 192
201 193 barrier : wait on one or more msg_ids
202 194
203 195 execution methods: apply/apply_bound/apply_to/apply_bound
204 196 legacy: execute, run
205 197
206 198 query methods: queue_status, get_result, purge
207 199
208 200 control methods: abort, kill
209 201
210 202 """
211 203
212 204
213 205 _connected=False
214 206 _ssh=False
215 207 _engines=None
216 208 _addr='tcp://127.0.0.1:10101'
217 209 _registration_socket=None
218 210 _query_socket=None
219 211 _control_socket=None
220 212 _iopub_socket=None
221 213 _notification_socket=None
222 214 _mux_socket=None
223 215 _task_socket=None
224 216 block = False
225 217 outstanding=None
226 218 results = None
227 219 history = None
228 220 debug = False
229 221 targets = None
230 222
231 223 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
232 224 sshserver=None, sshkey=None, password=None, paramiko=None,
233 225 exec_key=None,):
234 226 if context is None:
235 227 context = zmq.Context()
236 228 self.context = context
237 229 self.targets = 'all'
238 230 self._addr = addr
239 231 self._ssh = bool(sshserver or sshkey or password)
240 232 if self._ssh and sshserver is None:
241 233 # default to the same
242 234 sshserver = addr.split('://')[1].split(':')[0]
243 235 if self._ssh and password is None:
244 236 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
245 237 password=False
246 238 else:
247 239 password = getpass("SSH Password for %s: "%sshserver)
248 240 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
249 241
250 242 if exec_key is not None and os.path.isfile(exec_key):
251 243 arg = 'keyfile'
252 244 else:
253 245 arg = 'key'
254 246 key_arg = {arg:exec_key}
255 247 if username is None:
256 248 self.session = ss.StreamSession(**key_arg)
257 249 else:
258 250 self.session = ss.StreamSession(username, **key_arg)
259 251 self._registration_socket = self.context.socket(zmq.XREQ)
260 252 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
261 253 if self._ssh:
262 254 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
263 255 else:
264 256 self._registration_socket.connect(addr)
265 257 self._engines = ReverseDict()
266 258 self._ids = set()
267 259 self.outstanding=set()
268 260 self.results = {}
269 261 self.metadata = {}
270 262 self.history = []
271 263 self.debug = debug
272 264 self.session.debug = debug
273 265
274 266 self._notification_handlers = {'registration_notification' : self._register_engine,
275 267 'unregistration_notification' : self._unregister_engine,
276 268 }
277 269 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
278 270 'apply_reply' : self._handle_apply_reply}
279 271 self._connect(sshserver, ssh_kwargs)
280 272
281 273
282 274 @property
283 275 def ids(self):
284 276 """Always up to date ids property."""
285 277 self._flush_notifications()
286 278 return self._ids
287 279
288 280 def _update_engines(self, engines):
289 281 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
290 282 for k,v in engines.iteritems():
291 283 eid = int(k)
292 284 self._engines[eid] = bytes(v) # force not unicode
293 285 self._ids.add(eid)
294 286
295 287 def _build_targets(self, targets):
296 288 """Turn valid target IDs or 'all' into two lists:
297 289 (int_ids, uuids).
298 290 """
299 291 if targets is None:
300 292 targets = self._ids
301 293 elif isinstance(targets, str):
302 294 if targets.lower() == 'all':
303 295 targets = self._ids
304 296 else:
305 297 raise TypeError("%r is not a valid str target; the only valid str target is 'all'"%(targets,))
306 298 elif isinstance(targets, int):
307 299 targets = [targets]
308 300 return [self._engines[t] for t in targets], list(targets)
309 301
310 302 def _connect(self, sshserver, ssh_kwargs):
311 303 """setup all our socket connections to the controller. This is called from
312 304 __init__."""
313 305 if self._connected:
314 306 return
315 307 self._connected=True
316 308
317 309 def connect_socket(s, addr):
318 310 if self._ssh:
319 311 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
320 312 else:
321 313 return s.connect(addr)
322 314
323 315 self.session.send(self._registration_socket, 'connection_request')
324 316 idents,msg = self.session.recv(self._registration_socket,mode=0)
325 317 if self.debug:
326 318 pprint(msg)
327 319 msg = ss.Message(msg)
328 320 content = msg.content
329 321 if content.status == 'ok':
330 322 if content.mux:
331 323 self._mux_socket = self.context.socket(zmq.PAIR)
332 324 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
333 325 connect_socket(self._mux_socket, content.mux)
334 326 if content.task:
335 327 self._task_socket = self.context.socket(zmq.PAIR)
336 328 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
337 329 connect_socket(self._task_socket, content.task)
338 330 if content.notification:
339 331 self._notification_socket = self.context.socket(zmq.SUB)
340 332 connect_socket(self._notification_socket, content.notification)
341 333 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
342 334 if content.query:
343 335 self._query_socket = self.context.socket(zmq.PAIR)
344 336 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
345 337 connect_socket(self._query_socket, content.query)
346 338 if content.control:
347 339 self._control_socket = self.context.socket(zmq.PAIR)
348 340 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
349 341 connect_socket(self._control_socket, content.control)
350 342 if content.iopub:
351 343 self._iopub_socket = self.context.socket(zmq.SUB)
352 344 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
353 345 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
354 346 connect_socket(self._iopub_socket, content.iopub)
355 347 self._update_engines(dict(content.engines))
356 348
357 349 else:
358 350 self._connected = False
359 351 raise Exception("Failed to connect!")
360 352
361 353 #--------------------------------------------------------------------------
362 354 # handlers and callbacks for incoming messages
363 355 #--------------------------------------------------------------------------
364 356
365 357 def _register_engine(self, msg):
366 358 """Register a new engine, and update our connection info."""
367 359 content = msg['content']
368 360 eid = content['id']
369 361 d = {eid : content['queue']}
370 362 self._update_engines(d)
371 363 self._ids.add(int(eid))
372 364
373 365 def _unregister_engine(self, msg):
374 366 """Unregister an engine that has died."""
375 367 content = msg['content']
376 368 eid = int(content['id'])
377 369 if eid in self._ids:
378 370 self._ids.remove(eid)
379 371 self._engines.pop(eid)
380 372
381 373 def _extract_metadata(self, header, parent, content):
382 374 md = {'msg_id' : parent['msg_id'],
383 375 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
384 376 'started' : datetime.strptime(header['started'], ss.ISO8601),
385 377 'completed' : datetime.strptime(header['date'], ss.ISO8601),
386 378 'received' : datetime.now(),
387 379 'engine_uuid' : header['engine'],
388 380 'engine_id' : self._engines.get(header['engine'], None),
389 381 'follow' : parent['follow'],
390 382 'after' : parent['after'],
391 383 'status' : content['status'],
392 384 }
393 385 return md
394 386
395 387 def _handle_execute_reply(self, msg):
396 388 """Save the reply to an execute_request into our results.
397 389
398 390 execute messages are never actually used. apply is used instead.
399 391 """
400 392
401 393 parent = msg['parent_header']
402 394 msg_id = parent['msg_id']
403 395 if msg_id not in self.outstanding:
404 396 print("got unknown result: %s"%msg_id)
405 397 else:
406 398 self.outstanding.remove(msg_id)
407 399 self.results[msg_id] = ss.unwrap_exception(msg['content'])
408 400
409 401 def _handle_apply_reply(self, msg):
410 402 """Save the reply to an apply_request into our results."""
411 403 parent = msg['parent_header']
412 404 msg_id = parent['msg_id']
413 405 if msg_id not in self.outstanding:
414 406 print("got unknown result: %s"%msg_id)
415 407 else:
416 408 self.outstanding.remove(msg_id)
417 409 content = msg['content']
418 410 header = msg['header']
419 411
420 412 # construct metadata:
421 413 md = self.metadata.setdefault(msg_id, Metadata())
422 414 md.update(self._extract_metadata(header, parent, content))
423 415 self.metadata[msg_id] = md
424 416
425 417 # construct result:
426 418 if content['status'] == 'ok':
427 419 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
428 420 elif content['status'] == 'aborted':
429 421 self.results[msg_id] = error.AbortedTask(msg_id)
430 422 elif content['status'] == 'resubmitted':
431 423 # TODO: handle resubmission
432 424 pass
433 425 else:
434 426 e = ss.unwrap_exception(content)
435 427 e_uuid = e.engine_info['engineid']
436 428 eid = self._engines[e_uuid]
437 429 e.engine_info['engineid'] = eid
438 430 self.results[msg_id] = e
439 431
440 432 def _flush_notifications(self):
441 433 """Flush notifications of engine registrations waiting
442 434 in ZMQ queue."""
443 435 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
444 436 while msg is not None:
445 437 if self.debug:
446 438 pprint(msg)
447 439 msg = msg[-1]
448 440 msg_type = msg['msg_type']
449 441 handler = self._notification_handlers.get(msg_type, None)
450 442 if handler is None:
451 443 raise Exception("Unhandled message type: %s"%msg_type)
452 444 else:
453 445 handler(msg)
454 446 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
455 447
456 448 def _flush_results(self, sock):
457 449 """Flush task or queue results waiting in ZMQ queue."""
458 450 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
459 451 while msg is not None:
460 452 if self.debug:
461 453 pprint(msg)
462 454 msg = msg[-1]
463 455 msg_type = msg['msg_type']
464 456 handler = self._queue_handlers.get(msg_type, None)
465 457 if handler is None:
466 458 raise Exception("Unhandled message type: %s"%msg_type)
467 459 else:
468 460 handler(msg)
469 461 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
470 462
471 463 def _flush_control(self, sock):
472 464 """Flush replies from the control channel waiting
473 465 in the ZMQ queue.
474 466
475 467 Currently: ignore them."""
476 468 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
477 469 while msg is not None:
478 470 if self.debug:
479 471 pprint(msg)
480 472 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
481 473
482 474 def _flush_iopub(self, sock):
483 475 """Flush replies from the iopub channel waiting
484 476 in the ZMQ queue.
485 477 """
486 478 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
487 479 while msg is not None:
488 480 if self.debug:
489 481 pprint(msg)
490 482 msg = msg[-1]
491 483 parent = msg['parent_header']
492 484 msg_id = parent['msg_id']
493 485 content = msg['content']
494 486 header = msg['header']
495 487 msg_type = msg['msg_type']
496 488
497 489 # init metadata:
498 490 md = self.metadata.setdefault(msg_id, Metadata())
499 491
500 492 if msg_type == 'stream':
501 493 name = content['name']
502 494 s = md[name] or ''
503 495 md[name] = s + content['data']
504 496 elif msg_type == 'pyerr':
505 497 md.update({'pyerr' : ss.unwrap_exception(content)})
506 498 else:
507 499 md.update({msg_type : content['data']})
508 500
509 501 self.metadata[msg_id] = md
510 502
511 503 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
512 504
513 505 #--------------------------------------------------------------------------
514 506 # getitem
515 507 #--------------------------------------------------------------------------
516 508
517 509 def __getitem__(self, key):
518 510 """Dict access returns DirectView multiplexer objects or,
519 511 if key is None, a LoadBalancedView."""
520 512 if key is None:
521 513 return LoadBalancedView(self)
522 514 if isinstance(key, int):
523 515 if key not in self.ids:
524 516 raise IndexError("No such engine: %i"%key)
525 517 return DirectView(self, key)
526 518
527 519 if isinstance(key, slice):
528 520 indices = range(len(self.ids))[key]
529 521 ids = sorted(self._ids)
530 522 key = [ ids[i] for i in indices ]
532 524
533 525 if isinstance(key, (tuple, list, xrange)):
534 526 _,targets = self._build_targets(list(key))
535 527 return DirectView(self, targets)
536 528 else:
537 529 raise TypeError("keys must be an int, a slice, or an iterable of ints, not %s"%(type(key)))
538 530
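# Indexing sketch (illustrative; `c` is a connected Client): each key form
# maps onto a view class as described above:
#
#   c[0]         # DirectView on engine 0
#   c[::2]       # DirectView on every other registered engine
#   c[[0, 2]]    # DirectView on engines 0 and 2
#   c[None]      # LoadBalancedView, submitting via the task scheduler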
539 531 #--------------------------------------------------------------------------
540 532 # Begin public methods
541 533 #--------------------------------------------------------------------------
542 534
543 535 @property
544 536 def remote(self):
545 537 """property for convenient RemoteFunction generation.
546 538
547 539 >>> @client.remote
548 540 ... def f():
549 541 ...     import os
550 542 ...     print(os.getpid())
551 543 """
552 544 return remote(self, block=self.block)
553 545
554 546 def spin(self):
555 547 """Flush any registration notifications and execution results
556 548 waiting in the ZMQ queue.
557 549 """
558 550 if self._notification_socket:
559 551 self._flush_notifications()
560 552 if self._mux_socket:
561 553 self._flush_results(self._mux_socket)
562 554 if self._task_socket:
563 555 self._flush_results(self._task_socket)
564 556 if self._control_socket:
565 557 self._flush_control(self._control_socket)
566 558 if self._iopub_socket:
567 559 self._flush_iopub(self._iopub_socket)
568 560
569 561 def barrier(self, msg_ids=None, timeout=-1):
570 562 """waits on one or more `msg_ids`, for up to `timeout` seconds.
571 563
572 564 Parameters
573 565 ----------
574 566 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
575 567 ints are indices to self.history
576 568 strs are msg_ids
577 569 default: wait on all outstanding messages
578 570 timeout : float
579 571 a time in seconds, after which to give up.
580 572 default is -1, which means no timeout
581 573
582 574 Returns
583 575 -------
584 576 True : when all msg_ids are done
585 577 False : timeout reached, some msg_ids still outstanding
586 578 """
587 579 tic = time.time()
588 580 if msg_ids is None:
589 581 theids = self.outstanding
590 582 else:
591 583 if isinstance(msg_ids, (int, str, AsyncResult)):
592 584 msg_ids = [msg_ids]
593 585 theids = set()
594 586 for msg_id in msg_ids:
595 587 if isinstance(msg_id, int):
596 588 msg_id = self.history[msg_id]
597 589 elif isinstance(msg_id, AsyncResult):
598 590 map(theids.add, msg_id.msg_ids)
599 591 continue
600 592 theids.add(msg_id)
601 593 if not theids.intersection(self.outstanding):
602 594 return True
603 595 self.spin()
604 596 while theids.intersection(self.outstanding):
605 597 if timeout >= 0 and ( time.time()-tic ) > timeout:
606 598 break
607 599 time.sleep(1e-3)
608 600 self.spin()
609 601 return len(theids.intersection(self.outstanding)) == 0
610 602
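# Barrier sketch (illustrative; `c` is a connected Client). Submit without
# blocking, then wait up to five seconds for the result to arrive:
#
#   ar = c.apply(max, ([1, 2, 3],), block=False)
#   if c.barrier(ar, timeout=5.0):          # True once nothing is outstanding
#       print(c.results[ar.msg_ids[0]])     # -> 3
#   else:
#       print("still outstanding after 5s")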
611 603 #--------------------------------------------------------------------------
612 604 # Control methods
613 605 #--------------------------------------------------------------------------
614 606
615 607 @spinfirst
616 608 @defaultblock
617 609 def clear(self, targets=None, block=None):
618 610 """Clear the namespace in target(s)."""
619 611 targets = self._build_targets(targets)[0]
620 612 for t in targets:
621 613 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
622 614 error = False
623 615 if self.block:
624 616 for i in range(len(targets)):
625 617 idents,msg = self.session.recv(self._control_socket,0)
626 618 if self.debug:
627 619 pprint(msg)
628 620 if msg['content']['status'] != 'ok':
629 621 error = ss.unwrap_exception(msg['content'])
630 622 if error:
631 623 return error
632 624
633 625
634 626 @spinfirst
635 627 @defaultblock
636 628 def abort(self, msg_ids = None, targets=None, block=None):
637 629 """Abort the execution queues of target(s)."""
638 630 targets = self._build_targets(targets)[0]
639 631 if isinstance(msg_ids, basestring):
640 632 msg_ids = [msg_ids]
641 633 content = dict(msg_ids=msg_ids)
642 634 for t in targets:
643 635 self.session.send(self._control_socket, 'abort_request',
644 636 content=content, ident=t)
645 637 error = False
646 638 if self.block:
647 639 for i in range(len(targets)):
648 640 idents,msg = self.session.recv(self._control_socket,0)
649 641 if self.debug:
650 642 pprint(msg)
651 643 if msg['content']['status'] != 'ok':
652 644 error = ss.unwrap_exception(msg['content'])
653 645 if error:
654 646 return error
655 647
656 648 @spinfirst
657 649 @defaultblock
658 650 def shutdown(self, targets=None, restart=False, controller=False, block=None):
659 651 """Terminates one or more engine processes, optionally including the controller."""
660 652 if controller:
661 653 targets = 'all'
662 654 targets = self._build_targets(targets)[0]
663 655 for t in targets:
664 656 self.session.send(self._control_socket, 'shutdown_request',
665 657 content={'restart':restart},ident=t)
666 658 error = False
667 659 if block or controller:
668 660 for i in range(len(targets)):
669 661 idents,msg = self.session.recv(self._control_socket,0)
670 662 if self.debug:
671 663 pprint(msg)
672 664 if msg['content']['status'] != 'ok':
673 665 error = ss.unwrap_exception(msg['content'])
674 666
675 667 if controller:
676 668 time.sleep(0.25)
677 669 self.session.send(self._query_socket, 'shutdown_request')
678 670 idents,msg = self.session.recv(self._query_socket, 0)
679 671 if self.debug:
680 672 pprint(msg)
681 673 if msg['content']['status'] != 'ok':
682 674 error = ss.unwrap_exception(msg['content'])
683 675
684 676 if error:
685 677 raise error
686 678
687 679 #--------------------------------------------------------------------------
688 680 # Execution methods
689 681 #--------------------------------------------------------------------------
690 682
691 683 @defaultblock
692 684 def execute(self, code, targets='all', block=None):
693 685 """Executes `code` on `targets` in blocking or nonblocking manner.
694 686
695 687 ``execute`` is always `bound` (affects engine namespace)
696 688
697 689 Parameters
698 690 ----------
699 691 code : str
700 692 the code string to be executed
701 693 targets : int/str/list of ints/strs
702 694 the engines on which to execute
703 695 default : all
704 696 block : bool
705 697 whether or not to wait until done to return
706 698 default: self.block
707 699 """
708 700 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
709 701 return result
710 702
711 703 def run(self, code, block=None):
712 704 """Runs `code` on an engine.
713 705
714 706 Calls to this are load-balanced.
715 707
716 708 ``run`` is never `bound` (no effect on engine namespace)
717 709
718 710 Parameters
719 711 ----------
720 712 code : str
721 713 the code string to be executed
722 714 block : bool
723 715 whether or not to wait until done
724 716
725 717 """
726 718 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
727 719 return result
728 720
729 721 def _maybe_raise(self, result):
730 722 """wrapper for maybe raising an exception if apply failed."""
731 723 if isinstance(result, error.RemoteError):
732 724 raise result
733 725
734 726 return result
735 727
736 728 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
737 729 after=None, follow=None):
738 730 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
739 731
740 732 This is the central execution command for the client.
741 733
742 734 Parameters
743 735 ----------
744 736
745 737 f : function
746 738 The function to be called remotely
747 739 args : tuple/list
748 740 The positional arguments passed to `f`
749 741 kwargs : dict
750 742 The keyword arguments passed to `f`
751 743 bound : bool (default: True)
752 744 Whether to execute in the Engine(s) namespace, or in a clean
753 745 namespace not affecting the engine.
754 746 block : bool (default: self.block)
755 747 Whether to wait for the result, or return immediately.
756 748 False:
757 749 returns msg_id(s)
758 750 if multiple targets:
759 751 list of ids
760 752 True:
761 753 returns actual result(s) of f(*args, **kwargs)
762 754 if multiple targets:
763 755 dict of results, by engine ID
764 756 targets : int,list of ints, 'all', None
765 757 Specify the destination of the job.
766 758 if None:
767 759 Submit via Task queue for load-balancing.
768 760 if 'all':
769 761 Run on all active engines
770 762 if list:
771 763 Run on each specified engine
772 764 if int:
773 765 Run on single engine
774 766
775 767 after : Dependency or collection of msg_ids
776 768 Only for load-balanced execution (targets=None)
777 769 Specify a list of msg_ids as a time-based dependency.
778 770 This job will only be run *after* the dependencies
779 771 have been met.
780 772
781 773 follow : Dependency or collection of msg_ids
782 774 Only for load-balanced execution (targets=None)
783 775 Specify a list of msg_ids as a location-based dependency.
784 776 This job will only be run on an engine where this dependency
785 777 is met.
786 778
787 779 Returns
788 780 -------
789 781 if block is False:
790 782 if single target:
791 783 return msg_id
792 784 else:
793 785 return list of msg_ids
794 786 (open question: should this be a dict keyed by engine, like block=True?)
795 787 else:
796 788 if single target:
797 789 return result of f(*args, **kwargs)
798 790 else:
799 791 return dict of results, keyed by engine
800 792 """
801 793
802 794 # defaults:
803 795 block = block if block is not None else self.block
804 796 args = args if args is not None else []
805 797 kwargs = kwargs if kwargs is not None else {}
806 798
807 799 # enforce types of f, args, kwargs
808 800 if not callable(f):
809 801 raise TypeError("f must be callable, not %s"%type(f))
810 802 if not isinstance(args, (tuple, list)):
811 803 raise TypeError("args must be tuple or list, not %s"%type(args))
812 804 if not isinstance(kwargs, dict):
813 805 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
814 806
815 807 if isinstance(after, Dependency):
816 808 after = after.as_dict()
817 809 elif isinstance(after, AsyncResult):
818 810 after=after.msg_ids
819 811 elif after is None:
820 812 after = []
821 813 if isinstance(follow, Dependency):
822 814 follow = follow.as_dict()
823 815 elif isinstance(follow, AsyncResult):
824 816 follow=follow.msg_ids
825 817 elif follow is None:
826 818 follow = []
827 819 options = dict(bound=bound, block=block, after=after, follow=follow)
828 820
829 821 if targets is None:
830 822 return self._apply_balanced(f, args, kwargs, **options)
831 823 else:
832 824 return self._apply_direct(f, args, kwargs, targets=targets, **options)
833 825
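# Submission sketch (illustrative; `c` is a connected Client with at least
# two registered engines), showing how `targets` selects the path:
#
#   def mul(a, b):
#       return a * b
#
#   c.apply(mul, (2, 3))                        # load-balanced: targets=None
#   c.apply(mul, (2, 3), targets=[0, 1])        # direct: one request per engine
#   ar = c.apply(mul, (2, 3), targets=0, block=False)
#   ar.get()                                    # -> 6, blocking until done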
834 826 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
835 827 after=None, follow=None):
836 828 """The underlying method for applying functions in a load balanced
837 829 manner, via the task queue."""
838 830
839 831 subheader = dict(after=after, follow=follow)
840 832 bufs = ss.pack_apply_message(f,args,kwargs)
841 833 content = dict(bound=bound)
842 834
843 835 msg = self.session.send(self._task_socket, "apply_request",
844 836 content=content, buffers=bufs, subheader=subheader)
845 837 msg_id = msg['msg_id']
846 838 self.outstanding.add(msg_id)
847 839 self.history.append(msg_id)
848 840 ar = AsyncResult(self, [msg_id], fname=f.__name__)
849 841 if block:
850 842 return ar.get()
851 843 else:
852 844 return ar
853 845
854 846 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
855 847 after=None, follow=None):
856 848 """Then underlying method for applying functions to specific engines
857 849 via the MUX queue."""
858 850
859 851 queues,targets = self._build_targets(targets)
860 852
861 853 subheader = dict(after=after, follow=follow)
862 854 content = dict(bound=bound)
863 855 bufs = ss.pack_apply_message(f,args,kwargs)
864 856
865 857 msg_ids = []
866 858 for queue in queues:
867 859 msg = self.session.send(self._mux_socket, "apply_request",
868 860 content=content, buffers=bufs,ident=queue, subheader=subheader)
869 861 msg_id = msg['msg_id']
870 862 self.outstanding.add(msg_id)
871 863 self.history.append(msg_id)
872 864 msg_ids.append(msg_id)
873 865 ar = AsyncResult(self, msg_ids, fname=f.__name__)
874 866 if block:
875 867 return ar.get()
876 868 else:
877 869 return ar
878 870
879 871 #--------------------------------------------------------------------------
880 872 # Map and decorators
881 873 #--------------------------------------------------------------------------
882 874
883 875 def map(self, f, *sequences):
884 876 """Parallel version of builtin `map`, using all our engines."""
885 877 pf = ParallelFunction(self, f, block=self.block,
886 878 bound=True, targets='all')
887 879 return pf.map(*sequences)
888 880
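# Map sketch (illustrative; `c` is a connected Client): the sequence is
# partitioned across all engines and the results reassembled in order:
#
#   def square(x):
#       return x * x
#
#   c.block = True
#   c.map(square, range(8))     # -> [0, 1, 4, 9, 16, 25, 36, 49]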
889 881 def parallel(self, bound=True, targets='all', block=True):
890 882 """Decorator for making a ParallelFunction."""
891 883 return parallel(self, bound=bound, targets=targets, block=block)
892 884
893 885 def remote(self, bound=True, targets='all', block=True):
894 886 """Decorator for making a RemoteFunction."""
895 887 return remote(self, bound=bound, targets=targets, block=block)
896 888
897 889 #--------------------------------------------------------------------------
898 890 # Data movement
899 891 #--------------------------------------------------------------------------
900 892
901 893 @defaultblock
902 894 def push(self, ns, targets='all', block=None):
903 895 """Push the contents of `ns` into the namespace on `target`"""
904 896 if not isinstance(ns, dict):
905 897 raise TypeError("Must be a dict, not %s"%type(ns))
906 898 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
907 899 return result
908 900
909 901 @defaultblock
910 902 def pull(self, keys, targets='all', block=None):
911 903 """Pull objects from `target`'s namespace by `keys`"""
912 904 if isinstance(keys, (list, tuple, set)):
913 905 for key in keys:
914 906 if not isinstance(key, str):
915 907 raise TypeError("keys must be strs, not %r"%(key,))
916 908 elif not isinstance(keys, str):
917 909 raise TypeError("keys must be str or a collection of strs, not %r"%(keys,))
918 910 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
919 911 return result
920 912
921 913 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
922 914 """
923 915 Partition a Python sequence and send the partitions to a set of engines.
924 916 """
925 917 block = block if block is not None else self.block
926 918 targets = self._build_targets(targets)[-1]
927 919 mapObject = Map.dists[dist]()
928 920 nparts = len(targets)
929 921 msg_ids = []
930 922 for index, engineid in enumerate(targets):
931 923 partition = mapObject.getPartition(seq, index, nparts)
932 924 if flatten and len(partition) == 1:
933 925 r = self.push({key: partition[0]}, targets=engineid, block=False)
934 926 else:
935 927 r = self.push({key: partition}, targets=engineid, block=False)
936 928 msg_ids.extend(r.msg_ids)
937 929 r = AsyncResult(self, msg_ids, fname='scatter')
938 930 if block:
939 931 return r.get()
940 932 else:
941 933 return r
942 934
943 935 def gather(self, key, dist='b', targets='all', block=None):
944 936 """
945 937 Gather a partitioned sequence on a set of engines as a single local seq.
946 938 """
947 939 block = block if block is not None else self.block
948 940
949 941 targets = self._build_targets(targets)[-1]
950 942 mapObject = Map.dists[dist]()
951 943 msg_ids = []
952 944 for index, engineid in enumerate(targets):
953 945 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
954 946
955 947 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
956 948 if block:
957 949 return r.get()
958 950 else:
959 951 return r
960 952
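# Scatter/gather sketch (illustrative; `c` is a connected Client): a round
# trip through the engines' namespaces with the default dist='b' partitioning:
#
#   c.scatter('a', range(16), block=True)   # each engine gets a slice as `a`
#   c.gather('a', block=True)               # -> [0, 1, ..., 15], reassembled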
961 953 #--------------------------------------------------------------------------
962 954 # Query methods
963 955 #--------------------------------------------------------------------------
964 956
965 957 @spinfirst
966 958 def get_results(self, msg_ids, status_only=False):
967 959 """Returns the result of the execute or task request with `msg_ids`.
968 960
969 961 Parameters
970 962 ----------
971 963 msg_ids : list of ints or msg_ids
972 964 if int:
973 965 Passed as index to self.history for convenience.
974 966 status_only : bool (default: False)
975 967 if False:
976 968 return the actual results
977 969
978 970 Returns
979 971 -------
980 972
981 973 results : dict
982 974 There will always be the keys 'pending' and 'completed', which will
983 975 be lists of msg_ids.
984 976 """
985 977 if not isinstance(msg_ids, (list,tuple)):
986 978 msg_ids = [msg_ids]
987 979 theids = []
988 980 for msg_id in msg_ids:
989 981 if isinstance(msg_id, int):
990 982 msg_id = self.history[msg_id]
991 983 if not isinstance(msg_id, str):
992 984 raise TypeError("msg_ids must be str, not %r"%msg_id)
993 985 theids.append(msg_id)
994 986
995 987 completed = []
996 988 local_results = {}
997 989 # temporarily disable local shortcut
998 990 # for msg_id in list(theids):
999 991 # if msg_id in self.results:
1000 992 # completed.append(msg_id)
1001 993 # local_results[msg_id] = self.results[msg_id]
1002 994 # theids.remove(msg_id)
1003 995
1004 996 if theids: # some not locally cached
1005 997 content = dict(msg_ids=theids, status_only=status_only)
1006 998 msg = self.session.send(self._query_socket, "result_request", content=content)
1007 999 zmq.select([self._query_socket], [], [])
1008 1000 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1009 1001 if self.debug:
1010 1002 pprint(msg)
1011 1003 content = msg['content']
1012 1004 if content['status'] != 'ok':
1013 1005 raise ss.unwrap_exception(content)
1014 1006 buffers = msg['buffers']
1015 1007 else:
1016 1008 content = dict(completed=[],pending=[])
1017 1009
1018 1010 content['completed'].extend(completed)
1019 1011
1020 1012 if status_only:
1021 1013 return content
1022 1014
1023 1015 failures = []
1024 1016 # load cached results into result:
1025 1017 content.update(local_results)
1026 1018 # update cache with results:
1027 1019 for msg_id in sorted(theids):
1028 1020 if msg_id in content['completed']:
1029 1021 rec = content[msg_id]
1030 1022 parent = rec['header']
1031 1023 header = rec['result_header']
1032 1024 rcontent = rec['result_content']
1033 1025 iodict = rec['io']
1034 1026 if isinstance(rcontent, str):
1035 1027 rcontent = self.session.unpack(rcontent)
1036 1028
1037 1029 md = self.metadata.setdefault(msg_id, Metadata())
1038 1030 md.update(self._extract_metadata(header, parent, rcontent))
1039 1031 md.update(iodict)
1040 1032
1041 1033 if rcontent['status'] == 'ok':
1042 1034 res,buffers = ss.unserialize_object(buffers)
1043 1035 else:
1044 1036 res = ss.unwrap_exception(rcontent)
1045 1037 failures.append(res)
1046 1038
1047 1039 self.results[msg_id] = res
1048 1040 content[msg_id] = res
1049 1041
1050 1042 error.collect_exceptions(failures, "get_results")
1051 1043 return content
1052 1044
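# Query sketch (illustrative; `c` is a connected Client): a cheap status
# poll over everything this client has submitted so far:
#
#   stat = c.get_results(c.history, status_only=True)
#   stat['pending'], stat['completed']      # lists of msg_ids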
1053 1045 @spinfirst
1054 1046 def queue_status(self, targets=None, verbose=False):
1055 1047 """Fetch the status of engine queues.
1056 1048
1057 1049 Parameters
1058 1050 ----------
1059 1051 targets : int/str/list of ints/strs
1060 1052 the engines on which to execute
1061 1053 default : all
1062 1054 verbose : bool
1063 1055 Whether to return lengths only, or lists of ids for each element
1064 1056 """
1065 1057 targets = self._build_targets(targets)[1]
1066 1058 content = dict(targets=targets, verbose=verbose)
1067 1059 self.session.send(self._query_socket, "queue_request", content=content)
1068 1060 idents,msg = self.session.recv(self._query_socket, 0)
1069 1061 if self.debug:
1070 1062 pprint(msg)
1071 1063 content = msg['content']
1072 1064 status = content.pop('status')
1073 1065 if status != 'ok':
1074 1066 raise ss.unwrap_exception(content)
1075 1067 return ss.rekey(content)
1076 1068
1077 1069 @spinfirst
1078 1070 def purge_results(self, msg_ids=[], targets=[]):
1079 1071 """Tell the controller to forget results.
1080 1072
1081 1073 Individual results can be purged by msg_id, or the entire
1082 1074 history of specific targets can be purged.
1083 1075
1084 1076 Parameters
1085 1077 ----------
1086 1078 msg_ids : str or list of strs
1087 1079 the msg_ids whose results should be forgotten.
1088 1080 targets : int/str/list of ints/strs
1089 1081 The targets, by uuid or int_id, whose entire history is to be purged.
1090 1082 Use `targets='all'` to scrub everything from the controller's memory.
1091 1083
1092 1084 default : None
1093 1085 """
1094 1086 if not targets and not msg_ids:
1095 1087 raise ValueError("must specify at least one of msg_ids or targets")
1096 1088 if targets:
1097 1089 targets = self._build_targets(targets)[1]
1098 1090 content = dict(targets=targets, msg_ids=msg_ids)
1099 1091 self.session.send(self._query_socket, "purge_request", content=content)
1100 1092 idents, msg = self.session.recv(self._query_socket, 0)
1101 1093 if self.debug:
1102 1094 pprint(msg)
1103 1095 content = msg['content']
1104 1096 if content['status'] != 'ok':
1105 1097 raise ss.unwrap_exception(content)
1106 1098
1107 1099 #----------------------------------------
1108 1100 # activate for %px,%autopx magics
1109 1101 #----------------------------------------
1110 1102 def activate(self):
1111 1103 """Make this `View` active for parallel magic commands.
1112 1104
1113 1105 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1114 1106 In a given IPython session there is a single active one. While
1115 1107 there can be many clients created and used by the user,
1116 1108 there is only one active one. The active client is used whenever
1117 1109 the magic commands %px and %autopx are used.
1118 1110
1119 1111 The activate() method is called on a given client to make it
1120 1112 active. Once this has been done, the magic commands can be used.
1121 1113 """
1122 1114
1123 1115 try:
1124 1116 # This is injected into __builtins__.
1125 1117 ip = get_ipython()
1126 1118 except NameError:
1127 1119 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1128 1120 else:
1129 1121 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1130 1122 if pmagic is not None:
1131 1123 pmagic.active_multiengine_client = self
1132 1124 else:
1133 1125 print "You must first load the parallelmagic extension " \
1134 1126 "by doing '%load_ext parallelmagic'"
1135 1127
1136 1128 class AsyncClient(Client):
1137 1129 """An asynchronous client, using the Tornado event loop.
1138 1130 !!!unfinished!!!"""
1139 1131 io_loop = None
1140 1132 _queue_stream = None
1141 1133 _notifier_stream = None
1142 1134 _task_stream = None
1143 1135 _control_stream = None
1144 1136
1145 1137 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1146 1138 Client.__init__(self, addr, context, username, debug)
1147 1139 if io_loop is None:
1148 1140 io_loop = ioloop.IOLoop.instance()
1149 1141 self.io_loop = io_loop
1150 1142
1151 1143 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1152 1144 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1153 1145 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1154 1146 self._notifier_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1155 1147
1156 1148 def spin(self):
1157 1149 for stream in (self._queue_stream, self._notifier_stream,
1158 1150 self._task_stream, self._control_stream):
1159 1151 stream.flush()
1160 1152
1161 1153 __all__ = [ 'Client',
1162 1154 'depend',
1163 1155 'require',
1164 1156 'remote',
1165 1157 'parallel',
1166 1158 'RemoteFunction',
1167 1159 'ParallelFunction',
1168 1160 'DirectView',
1169 1161 'LoadBalancedView',
1170 1162 'AsyncResult',
1171 1163 'AsyncMapResult'
1172 1164 ]
@@ -1,549 +1,537
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import shutil
22 22 import sys
23 23 import logging
24 24 import warnings
25 25
26 26 from IPython.config.loader import PyFileConfigLoader
27 27 from IPython.core.application import Application, BaseAppConfigLoader
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.core.crashhandler import CrashHandler
30 30 from IPython.core import release
31 31 from IPython.utils.path import (
32 32 get_ipython_package_dir,
33 33 expand_path
34 34 )
35 35 from IPython.utils.traitlets import Unicode
36 36
37 37 #-----------------------------------------------------------------------------
38 # Warnings control
39 #-----------------------------------------------------------------------------
40 # Twisted generates annoying warnings with Python 2.6, as will do other code
41 # that imports 'sets' as of today
42 warnings.filterwarnings('ignore', 'the sets module is deprecated',
43 DeprecationWarning )
44
45 # This one also comes from Twisted
46 warnings.filterwarnings('ignore', 'the sha module is deprecated',
47 DeprecationWarning)
48
49 #-----------------------------------------------------------------------------
50 38 # Module errors
51 39 #-----------------------------------------------------------------------------
52 40
53 41 class ClusterDirError(Exception):
54 42 pass
55 43
56 44
57 45 class PIDFileError(Exception):
58 46 pass
59 47
60 48
61 49 #-----------------------------------------------------------------------------
62 50 # Class for managing cluster directories
63 51 #-----------------------------------------------------------------------------
64 52
65 53 class ClusterDir(Configurable):
66 54 """An object to manage the cluster directory and its resources.
67 55
68 56 The cluster directory is used by :command:`ipengine`,
69 57 :command:`ipcontroller` and :command:`ipcluster` to manage the
70 58 configuration, logging and security of these applications.
71 59
72 60 This object knows how to find, create and manage these directories. This
73 61 should be used by any code that wants to handle cluster directories.
74 62 """
75 63
76 64 security_dir_name = Unicode('security')
77 65 log_dir_name = Unicode('log')
78 66 pid_dir_name = Unicode('pid')
79 67 security_dir = Unicode(u'')
80 68 log_dir = Unicode(u'')
81 69 pid_dir = Unicode(u'')
82 70 location = Unicode(u'')
83 71
84 72 def __init__(self, location=u''):
85 73 super(ClusterDir, self).__init__(location=location)
86 74
87 75 def _location_changed(self, name, old, new):
88 76 if not os.path.isdir(new):
89 77 os.makedirs(new)
90 78 self.security_dir = os.path.join(new, self.security_dir_name)
91 79 self.log_dir = os.path.join(new, self.log_dir_name)
92 80 self.pid_dir = os.path.join(new, self.pid_dir_name)
93 81 self.check_dirs()
94 82
95 83 def _log_dir_changed(self, name, old, new):
96 84 self.check_log_dir()
97 85
98 86 def check_log_dir(self):
99 87 if not os.path.isdir(self.log_dir):
100 88 os.mkdir(self.log_dir)
101 89
102 90 def _security_dir_changed(self, name, old, new):
103 91 self.check_security_dir()
104 92
105 93 def check_security_dir(self):
106 94 if not os.path.isdir(self.security_dir):
107 95 os.mkdir(self.security_dir, 0700)
108 96 os.chmod(self.security_dir, 0700)
109 97
110 98 def _pid_dir_changed(self, name, old, new):
111 99 self.check_pid_dir()
112 100
113 101 def check_pid_dir(self):
114 102 if not os.path.isdir(self.pid_dir):
115 103 os.mkdir(self.pid_dir, 0700)
116 104 os.chmod(self.pid_dir, 0700)
117 105
118 106 def check_dirs(self):
119 107 self.check_security_dir()
120 108 self.check_log_dir()
121 109 self.check_pid_dir()
122 110
123 111 def load_config_file(self, filename):
124 112 """Load a config file from the top level of the cluster dir.
125 113
126 114 Parameters
127 115 ----------
128 116 filename : unicode or str
129 117 The filename only of the config file that must be located in
130 118 the top-level of the cluster directory.
131 119 """
132 120 loader = PyFileConfigLoader(filename, self.location)
133 121 return loader.load_config()
134 122
135 123 def copy_config_file(self, config_file, path=None, overwrite=False):
136 124 """Copy a default config file into the active cluster directory.
137 125
138 126 Default configuration files are kept in :mod:`IPython.config.default`.
139 127 This function moves these from that location to the working cluster
140 128 directory.
141 129 """
142 130 if path is None:
143 131 import IPython.config.default
144 132 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
145 133 path = os.path.sep.join(path)
146 134 src = os.path.join(path, config_file)
147 135 dst = os.path.join(self.location, config_file)
148 136 if not os.path.isfile(dst) or overwrite:
149 137 shutil.copy(src, dst)
150 138
151 139 def copy_all_config_files(self, path=None, overwrite=False):
152 140 """Copy all config files into the active cluster directory."""
153 141 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
154 142 u'ipcluster_config.py']:
155 143 self.copy_config_file(f, path=path, overwrite=overwrite)
156 144
157 145 @classmethod
158 146 def create_cluster_dir(cls, cluster_dir):
159 147 """Create a new cluster directory given a full path.
160 148
161 149 Parameters
162 150 ----------
163 151 cluster_dir : str
164 152 The full path to the cluster directory. If it does exist, it will
165 153 be used. If not, it will be created.
166 154 """
167 155 return ClusterDir(location=cluster_dir)
168 156
169 157 @classmethod
170 158 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
171 159 """Create a cluster dir by profile name and path.
172 160
173 161 Parameters
174 162 ----------
175 163 path : str
176 164 The path (directory) to put the cluster directory in.
177 165 profile : str
178 166 The name of the profile. The name of the cluster directory will
179 167 be "clusterz_<profile>".
180 168 """
181 169 if not os.path.isdir(path):
182 170 raise ClusterDirError('Directory not found: %s' % path)
183 171 cluster_dir = os.path.join(path, u'clusterz_' + profile)
184 172 return ClusterDir(location=cluster_dir)
185 173
186 174 @classmethod
187 175 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
188 176 """Find an existing cluster dir by profile name, return its ClusterDir.
189 177
190 178 This searches through a sequence of paths for a cluster dir. If it
191 179 is not found, a :class:`ClusterDirError` exception will be raised.
192 180
193 181 The search path algorithm is:
194 182 1. ``os.getcwd()``
195 183 2. ``ipython_dir``
196 184 3. The directories found in the ":" separated
197 185 :env:`IPCLUSTER_DIR_PATH` environment variable.
198 186
199 187 Parameters
200 188 ----------
201 189 ipython_dir : unicode or str
202 190 The IPython directory to use.
203 191 profile : unicode or str
204 192 The name of the profile. The name of the cluster directory
205 193 will be "clusterz_<profile>".
206 194 """
207 195 dirname = u'clusterz_' + profile
208 196 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
209 197 if cluster_dir_paths:
210 198 cluster_dir_paths = cluster_dir_paths.split(':')
211 199 else:
212 200 cluster_dir_paths = []
213 201 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
214 202 for p in paths:
215 203 cluster_dir = os.path.join(p, dirname)
216 204 if os.path.isdir(cluster_dir):
217 205 return ClusterDir(location=cluster_dir)
218 206 else:
219 207 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
220 208
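# Search sketch (illustrative; the paths and environment value are
# hypothetical). With profile 'mycluster', this looks for a directory named
# 'clusterz_mycluster' first in the cwd, then in ipython_dir, then in each
# ':'-separated entry of IPCLUSTER_DIR_PATH:
#
#   os.environ['IPCLUSTER_DIR_PATH'] = '/opt/clusters:/srv/ipython'
#   cd = ClusterDir.find_cluster_dir_by_profile(u'/home/me/.ipython',
#                                               u'mycluster')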
221 209 @classmethod
222 210 def find_cluster_dir(cls, cluster_dir):
223 211 """Find/create a cluster dir and return its ClusterDir.
224 212
225 213 This will create the cluster directory if it doesn't exist.
226 214
227 215 Parameters
228 216 ----------
229 217 cluster_dir : unicode or str
230 218 The path of the cluster directory. This is expanded using
231 219 :func:`IPython.utils.genutils.expand_path`.
232 220 """
233 221 cluster_dir = expand_path(cluster_dir)
234 222 if not os.path.isdir(cluster_dir):
235 223 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
236 224 return ClusterDir(location=cluster_dir)
237 225
238 226
239 227 #-----------------------------------------------------------------------------
240 228 # Command line options
241 229 #-----------------------------------------------------------------------------
242 230
243 231 class ClusterDirConfigLoader(BaseAppConfigLoader):
244 232
245 233 def _add_cluster_profile(self, parser):
246 234 paa = parser.add_argument
247 235 paa('-p', '--profile',
248 236 dest='Global.profile',type=unicode,
249 237 help=
250 238 """The string name of the profile to be used. This determines the name
251 239 of the cluster dir as: cluster_<profile>. The default profile is named
252 240 'default'. The cluster directory is resolve this way if the
253 241 --cluster-dir option is not used.""",
254 242 metavar='Global.profile')
255 243
256 244 def _add_cluster_dir(self, parser):
257 245 paa = parser.add_argument
258 246 paa('--cluster-dir',
259 247 dest='Global.cluster_dir',type=unicode,
260 248 help="""Set the cluster dir. This overrides the logic used by the
261 249 --profile option.""",
262 250 metavar='Global.cluster_dir')
263 251
264 252 def _add_work_dir(self, parser):
265 253 paa = parser.add_argument
266 254 paa('--work-dir',
267 255 dest='Global.work_dir',type=unicode,
268 256 help='Set the working dir for the process.',
269 257 metavar='Global.work_dir')
270 258
271 259 def _add_clean_logs(self, parser):
272 260 paa = parser.add_argument
273 261 paa('--clean-logs',
274 262 dest='Global.clean_logs', action='store_true',
275 263 help='Delete old log files before starting.')
276 264
277 265 def _add_no_clean_logs(self, parser):
278 266 paa = parser.add_argument
279 267 paa('--no-clean-logs',
280 268 dest='Global.clean_logs', action='store_false',
281 269 help="Don't Delete old log flies before starting.")
282 270
283 271 def _add_arguments(self):
284 272 super(ClusterDirConfigLoader, self)._add_arguments()
285 273 self._add_cluster_profile(self.parser)
286 274 self._add_cluster_dir(self.parser)
287 275 self._add_work_dir(self.parser)
288 276 self._add_clean_logs(self.parser)
289 277 self._add_no_clean_logs(self.parser)
290 278
291 279
292 280 #-----------------------------------------------------------------------------
293 281 # Crash handler for this application
294 282 #-----------------------------------------------------------------------------
295 283
296 284
297 285 _message_template = """\
298 286 Oops, $self.app_name crashed. We do our best to make it stable, but...
299 287
300 288 A crash report was automatically generated with the following information:
301 289 - A verbatim copy of the crash traceback.
302 290 - Data on your current $self.app_name configuration.
303 291
304 292 It was left in the file named:
305 293 \t'$self.crash_report_fname'
306 294 If you can email this file to the developers, the information in it will help
307 295 them in understanding and correcting the problem.
308 296
309 297 You can mail it to: $self.contact_name at $self.contact_email
310 298 with the subject '$self.app_name Crash Report'.
311 299
312 300 If you want to do it now, the following command will work (under Unix):
313 301 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
314 302
315 303 To ensure accurate tracking of this issue, please file a report about it at:
316 304 $self.bug_tracker
317 305 """
318 306
319 307 class ClusterDirCrashHandler(CrashHandler):
320 308 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
321 309
322 310 message_template = _message_template
323 311
324 312 def __init__(self, app):
325 313 contact_name = release.authors['Brian'][0]
326 314 contact_email = release.authors['Brian'][1]
327 315 bug_tracker = 'http://github.com/ipython/ipython/issues'
328 316 super(ClusterDirCrashHandler,self).__init__(
329 317 app, contact_name, contact_email, bug_tracker
330 318 )
331 319
332 320
333 321 #-----------------------------------------------------------------------------
334 322 # Main application
335 323 #-----------------------------------------------------------------------------
336 324
337 325 class ApplicationWithClusterDir(Application):
338 326 """An application that puts everything into a cluster directory.
339 327
340 328 Instead of looking for things in the ipython_dir, this type of application
341 329 will use its own private directory called the "cluster directory"
342 330 for things like config files, log files, etc.
343 331
344 332 The cluster directory is resolved as follows:
345 333
346 334 * If the ``--cluster-dir`` option is given, it is used.
347 335 * If ``--cluster-dir`` is not given, the application directory is
348 336 resolve using the profile name as ``cluster_<profile>``. The search
349 337 path for this directory is then i) cwd if it is found there
350 338 and ii) in ipython_dir otherwise.
351 339
352 340 The config file for the application is to be put in the cluster
353 341 dir and named the value of the ``config_file_name`` class attribute.
354 342 """
355 343
356 344 command_line_loader = ClusterDirConfigLoader
357 345 crash_handler_class = ClusterDirCrashHandler
358 346 auto_create_cluster_dir = True
359 347 # temporarily override default_log_level to DEBUG
360 348 default_log_level = logging.DEBUG
361 349
362 350 def create_default_config(self):
363 351 super(ApplicationWithClusterDir, self).create_default_config()
364 352 self.default_config.Global.profile = u'default'
365 353 self.default_config.Global.cluster_dir = u''
366 354 self.default_config.Global.work_dir = os.getcwd()
367 355 self.default_config.Global.log_to_file = False
368 356 self.default_config.Global.log_url = None
369 357 self.default_config.Global.clean_logs = False
370 358
371 359 def find_resources(self):
372 360 """This resolves the cluster directory.
373 361
374 362 This tries to find the cluster directory and if successful, it will
375 363 have:
376 364 * Set ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
377 365 the application.
378 366 * Set ``self.cluster_dir`` attribute of the application and config
379 367 objects.
380 368
381 369 The algorithm used for this is as follows:
382 370 1. Try ``Global.cluster_dir``.
383 371 2. Try using ``Global.profile``.
384 372 3. If both of these fail and ``self.auto_create_cluster_dir`` is
385 373 ``True``, then create the new cluster dir in the IPython directory.
386 374 4. If all fails, then raise :class:`ClusterDirError`.
387 375 """
388 376
389 377 try:
390 378 cluster_dir = self.command_line_config.Global.cluster_dir
391 379 except AttributeError:
392 380 cluster_dir = self.default_config.Global.cluster_dir
393 381 cluster_dir = expand_path(cluster_dir)
394 382 try:
395 383 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
396 384 except ClusterDirError:
397 385 pass
398 386 else:
399 387 self.log.info('Using existing cluster dir: %s' % \
400 388 self.cluster_dir_obj.location
401 389 )
402 390 self.finish_cluster_dir()
403 391 return
404 392
405 393 try:
406 394 self.profile = self.command_line_config.Global.profile
407 395 except AttributeError:
408 396 self.profile = self.default_config.Global.profile
409 397 try:
410 398 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 399 self.ipython_dir, self.profile)
412 400 except ClusterDirError:
413 401 pass
414 402 else:
415 403 self.log.info('Using existing cluster dir: %s' % \
416 404 self.cluster_dir_obj.location
417 405 )
418 406 self.finish_cluster_dir()
419 407 return
420 408
421 409 if self.auto_create_cluster_dir:
422 410 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
423 411 self.ipython_dir, self.profile
424 412 )
425 413 self.log.info('Creating new cluster dir: %s' % \
426 414 self.cluster_dir_obj.location
427 415 )
428 416 self.finish_cluster_dir()
429 417 else:
430 418 raise ClusterDirError('Could not find a valid cluster directory.')
431 419
432 420 def finish_cluster_dir(self):
433 421 # Set the cluster directory
434 422 self.cluster_dir = self.cluster_dir_obj.location
435 423
436 424 # These have to be set because they could be different from the one
437 425 # that we just computed. Because command line has the highest
438 426 # priority, this will always end up in the master_config.
439 427 self.default_config.Global.cluster_dir = self.cluster_dir
440 428 self.command_line_config.Global.cluster_dir = self.cluster_dir
441 429
442 430 def find_config_file_name(self):
443 431 """Find the config file name for this application."""
444 432 # For this type of Application it should be set as a class attribute.
445 433 if not hasattr(self, 'default_config_file_name'):
446 434 self.log.critical("No config filename found")
447 435 else:
448 436 self.config_file_name = self.default_config_file_name
449 437
450 438 def find_config_file_paths(self):
451 439 # Set the search path to the cluster directory. We should NOT
452 440 # include IPython.config.default here as the default config files
453 441 # are ALWAYS automatically moved to the cluster directory.
454 442 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
455 443 self.config_file_paths = (self.cluster_dir,)
456 444
457 445 def pre_construct(self):
458 446 # The log and security dirs were set earlier, but here we put them
459 447 # into the config and log them.
460 448 config = self.master_config
461 449 sdir = self.cluster_dir_obj.security_dir
462 450 self.security_dir = config.Global.security_dir = sdir
463 451 ldir = self.cluster_dir_obj.log_dir
464 452 self.log_dir = config.Global.log_dir = ldir
465 453 pdir = self.cluster_dir_obj.pid_dir
466 454 self.pid_dir = config.Global.pid_dir = pdir
467 455 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
468 456 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
469 457 # Change to the working directory. We do this just before construct
470 458 # is called so all the components there have the right working dir.
471 459 self.to_work_dir()
472 460
473 461 def to_work_dir(self):
474 462 wd = self.master_config.Global.work_dir
475 463 if unicode(wd) != unicode(os.getcwd()):
476 464 os.chdir(wd)
477 465 self.log.info("Changing to working dir: %s" % wd)
478 466
479 467 def start_logging(self):
480 468 # Remove old log files
481 469 if self.master_config.Global.clean_logs:
482 470 log_dir = self.master_config.Global.log_dir
483 471 for f in os.listdir(log_dir):
484 472 if f.startswith(self.name + u'-') and f.endswith('.log'):
485 473 os.remove(os.path.join(log_dir, f))
486 474 # Start logging to the new log file
487 475 if self.master_config.Global.log_to_file:
488 476 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
489 477 logfile = os.path.join(self.log_dir, log_filename)
490 478 open_log_file = open(logfile, 'w')
491 479 elif self.master_config.Global.log_url:
492 480 open_log_file = None
493 481 else:
494 482 open_log_file = sys.stdout
495 483 logger = logging.getLogger()
496 484 level = self.log_level
497 485 self.log = logger
498 486 # since we've reconnected the logger, we need to re-apply the log level
499 487 self.log_level = level
500 488 if open_log_file is not None and self._log_handler not in self.log.handlers:
501 489 self.log.addHandler(self._log_handler)
503 491
504 492 def write_pid_file(self, overwrite=False):
505 493 """Create a .pid file in the pid_dir with my pid.
506 494
507 495 This must be called after pre_construct, which sets `self.pid_dir`.
508 496 This raises :exc:`PIDFileError` if the pid file exists already.
509 497 """
510 498 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
511 499 if os.path.isfile(pid_file):
512 500 pid = self.get_pid_from_file()
513 501 if not overwrite:
514 502 raise PIDFileError(
515 503 'The pid file [%s] already exists. \nThis could mean that this '
516 504 'server is already running with [pid=%s].' % (pid_file, pid)
517 505 )
518 506 with open(pid_file, 'w') as f:
519 507 self.log.info("Creating pid file: %s" % pid_file)
520 508 f.write(repr(os.getpid())+'\n')
521 509
522 510 def remove_pid_file(self):
523 511 """Remove the pid file.
524 512
525 513 This should be called at shutdown by registering a callback with
526 514 :func:`reactor.addSystemEventTrigger`. This needs to return
527 515 ``None``.
528 516 """
529 517 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
530 518 if os.path.isfile(pid_file):
531 519 try:
532 520 self.log.info("Removing pid file: %s" % pid_file)
533 521 os.remove(pid_file)
534 522 except OSError:
535 523 self.log.warn("Error removing the pid file: %s" % pid_file)
536 524
537 525 def get_pid_from_file(self):
538 526 """Get the pid from the pid file.
539 527
540 528 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
541 529 """
542 530 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
543 531 if os.path.isfile(pid_file):
544 532 with open(pid_file, 'r') as f:
545 533 pid = int(f.read().strip())
546 534 return pid
547 535 else:
548 536 raise PIDFileError('pid file not found: %s' % pid_file)
549 537
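# Startup-guard sketch (illustrative; `app` is a hypothetical instance of
# ApplicationWithClusterDir whose pid_dir has already been set):
#
#   import os
#   try:
#       pid = app.get_pid_from_file()
#   except PIDFileError:
#       app.write_pid_file()                    # no pid file: safe to start
#   else:
#       try:
#           os.kill(pid, 0)                     # signal 0 only checks existence
#       except OSError:
#           app.write_pid_file(overwrite=True)  # stale pid file: reclaim it
#       else:
#           raise PIDFileError('already running with pid=%s' % pid)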
@@ -1,345 +1,110
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
3 This is a collection of one Hub and several Schedulers.
5 4 """
6 5 #-----------------------------------------------------------------------------
7 6 # Copyright (C) 2010 The IPython Development Team
8 7 #
9 8 # Distributed under the terms of the BSD License. The full license is in
10 9 # the file COPYING, distributed as part of this software.
11 10 #-----------------------------------------------------------------------------
12 11
13 12 #-----------------------------------------------------------------------------
14 13 # Imports
15 14 #-----------------------------------------------------------------------------
16 15 from __future__ import print_function
17 16
18 import os
19 import sys
20 import time
21 17 import logging
22 18 from multiprocessing import Process
23 19
24 20 import zmq
25 from zmq.eventloop import ioloop
26 from zmq.eventloop.zmqstream import ZMQStream
27 # from zmq.devices import ProcessMonitoredQueue
28 21
29 22 # internal:
30 23 from IPython.utils.importstring import import_item
31 24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
32 from IPython.zmq.entry_point import bind_port
33 25
34 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
35 connect_logger, parse_url, signal_children, generate_exec_key,
36 local_logger)
26 from entry_point import signal_children
37 27
38 28
39 import streamsession as session
40 import heartmonitor
41 29 from scheduler import launch_scheduler
42 30 from hub import Hub, HubFactory
43 31
44 from dictdb import DictDB
45 try:
46 import pymongo
47 except ImportError:
48 MongoDB=None
49 else:
50 from mongodb import MongoDB
51
52 #-------------------------------------------------------------------------
53 # Entry Point
54 #-------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Configurable
34 #-----------------------------------------------------------------------------
55 35
56 def make_argument_parser():
57 """Make an argument parser"""
58 parser = make_base_argument_parser()
59
60 parser.add_argument('--client', type=int, metavar='PORT', default=0,
61 help='set the XREP port for clients [default: random]')
62 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
63 help='set the PUB socket for registration notification [default: random]')
64 parser.add_argument('--hb', type=str, metavar='PORTS',
65 help='set the 2 ports for heartbeats [default: random]')
66 parser.add_argument('--ping', type=int, default=100,
67 help='set the heartbeat period in ms [default: 100]')
68 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
69 help='set the SUB port for queue monitoring [default: random]')
70 parser.add_argument('--mux', type=str, metavar='PORTS',
71 help='set the XREP ports for the MUX queue [default: random]')
72 parser.add_argument('--task', type=str, metavar='PORTS',
73 help='set the XREP/XREQ ports for the task queue [default: random]')
74 parser.add_argument('--control', type=str, metavar='PORTS',
75 help='set the XREP ports for the control queue [default: random]')
76 parser.add_argument('--iopub', type=str, metavar='PORTS',
77 help='set the PUB/SUB ports for the iopub relay [default: random]')
78 parser.add_argument('--scheduler', type=str, default='lru',
79 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
80 help='select the task scheduler [default: Python LRU]')
81 parser.add_argument('--mongodb', action='store_true',
82 help='Use MongoDB task storage [default: in-memory]')
83 parser.add_argument('--session', type=str, default=None,
84 help='Manually specify the session id.')
85
86 return parser
87 36
88 37 class ControllerFactory(HubFactory):
89 38 """Configurable for setting up a Hub and Schedulers."""
90 39
91 40 scheme = Str('pure', config=True)
92 41 usethreads = Bool(False, config=True)
93 42
94 43 # internal
95 44 children = List()
96 45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
97 46
98 47 def _update_mq(self):
99 48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
100 49
101 50 def __init__(self, **kwargs):
102 51 super(ControllerFactory, self).__init__(**kwargs)
103 52 self.subconstructors.append(self.construct_schedulers)
104 53 self._update_mq()
105 54 self.on_trait_change(self._update_mq, 'usethreads')
106 55
107 56 def start(self):
108 57 super(ControllerFactory, self).start()
109 58 for child in self.children:
110 59 child.start()
111 60 if not self.usethreads:
112 61 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
113 62
114 63
115 64 def construct_schedulers(self):
116 65 children = self.children
117 66 mq = import_item(self.mq_class)
118 67
119 68 # IOPub relay (in a Process)
120 69 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
121 70 q.bind_in(self.client_addrs['iopub'])
122 71 q.bind_out(self.engine_addrs['iopub'])
123 72 q.setsockopt_out(zmq.SUBSCRIBE, '')
124 73 q.connect_mon(self.monitor_url)
125 74 q.daemon=True
126 75 children.append(q)
127 76
128 77 # Multiplexer Queue (in a Process)
129 78 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
130 79 q.bind_in(self.client_addrs['mux'])
131 80 q.bind_out(self.engine_addrs['mux'])
132 81 q.connect_mon(self.monitor_url)
133 82 q.daemon=True
134 83 children.append(q)
135 84
136 85 # Control Queue (in a Process)
137 86 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
138 87 q.bind_in(self.client_addrs['control'])
139 88 q.bind_out(self.engine_addrs['control'])
140 89 q.connect_mon(self.monitor_url)
141 90 q.daemon=True
142 91 children.append(q)
143 92 # Task Queue (in a Process)
144 93 if self.scheme == 'pure':
145 94 logging.warn("task::using pure XREQ Task scheduler")
146 95 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
147 96 q.bind_in(self.client_addrs['task'])
148 97 q.bind_out(self.engine_addrs['task'])
149 98 q.connect_mon(self.monitor_url)
150 99 q.daemon=True
151 100 children.append(q)
152 101 elif self.scheme == 'none':
153 102 logging.warn("task::using no Task scheduler")
154 103
155 104 else:
156 105 logging.warn("task::using Python %s Task scheduler"%self.scheme)
157 106 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
158 107 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
159 108 q.daemon=True
160 109 children.append(q)
161
162
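# Minimal usage sketch (assumed, based on the construct()/start() protocol of
# HubFactory shown later in this diff): build the Hub and schedulers, then
# hand control to the event loop.
#
#     from zmq.eventloop import ioloop
#     cf = ControllerFactory(scheme='lru', usethreads=False)
#     cf.construct()   # runs construct_hub() and construct_schedulers()
#     cf.start()       # starts the heartmonitor and all queue children
#     ioloop.IOLoop.instance().start()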
163 def main(argv=None):
164 """DO NOT USE ME ANYMORE"""
165
166 parser = make_argument_parser()
167
168 args = parser.parse_args(argv)
169 parse_url(args)
170
171 iface="%s://%s"%(args.transport,args.ip)+':%i'
172
173 random_ports = 0
174 if args.hb:
175 hb = split_ports(args.hb, 2)
176 else:
177 hb = select_random_ports(2)
178 if args.mux:
179 mux = split_ports(args.mux, 2)
180 else:
181 mux = None
182 random_ports += 2
183 if args.iopub:
184 iopub = split_ports(args.iopub, 2)
185 else:
186 iopub = None
187 random_ports += 2
188 if args.task:
189 task = split_ports(args.task, 2)
190 else:
191 task = None
192 random_ports += 2
193 if args.control:
194 control = split_ports(args.control, 2)
195 else:
196 control = None
197 random_ports += 2
198
199 ctx = zmq.Context()
200 loop = ioloop.IOLoop.instance()
201
202
203 # Registrar socket
204 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
205 regport = bind_port(reg, args.ip, args.regport)
206
207 ### Engine connections ###
208
209 # heartbeat
210 hpub = ctx.socket(zmq.PUB)
211 bind_port(hpub, args.ip, hb[0])
212 hrep = ctx.socket(zmq.XREP)
213 bind_port(hrep, args.ip, hb[1])
214
215 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
216 hmon.start()
217
218 ### Client connections ###
219 # Clientele socket
220 c = ZMQStream(ctx.socket(zmq.XREP), loop)
221 cport = bind_port(c, args.ip, args.client)
222 # Notifier socket
223 n = ZMQStream(ctx.socket(zmq.PUB), loop)
224 nport = bind_port(n, args.ip, args.notice)
225
226 ### Key File ###
227 if args.execkey and not os.path.isfile(args.execkey):
228 generate_exec_key(args.execkey)
229
230 thesession = session.StreamSession(username=args.ident or "controller",
231 keyfile=args.execkey, session=args.session)
232
233 ### build and launch the queues ###
234
235 # monitor socket
236 sub = ctx.socket(zmq.SUB)
237 sub.setsockopt(zmq.SUBSCRIBE, "")
238 monport = bind_port(sub, args.ip, args.monitor)
239 sub = ZMQStream(sub, loop)
240
241 ports = select_random_ports(random_ports)
242 children = []
243
244 # IOPub relay (in a Process)
245 if not iopub:
246 iopub = (ports.pop(),ports.pop())
247 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
248 q.bind_in(iface%iopub[1])
249 q.bind_out(iface%iopub[0])
250 q.setsockopt_in(zmq.SUBSCRIBE, '')
251 q.connect_mon(iface%monport)
252 q.daemon=True
253 q.start()
254 children.append(q.launcher)
255
256 # Multiplexer Queue (in a Process)
257 if not mux:
258 mux = (ports.pop(),ports.pop())
259 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
260 q.bind_in(iface%mux[0])
261 q.bind_out(iface%mux[1])
262 q.connect_mon(iface%monport)
263 q.daemon=True
264 q.start()
265 children.append(q.launcher)
266
267 # Control Queue (in a Process)
268 if not control:
269 control = (ports.pop(),ports.pop())
270 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
271 q.bind_in(iface%control[0])
272 q.bind_out(iface%control[1])
273 q.connect_mon(iface%monport)
274 q.daemon=True
275 q.start()
276 children.append(q.launcher)
277 # Task Queue (in a Process)
278 if not task:
279 task = (ports.pop(),ports.pop())
280 if args.scheduler == 'pure':
281 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
282 q.bind_in(iface%task[0])
283 q.bind_out(iface%task[1])
284 q.connect_mon(iface%monport)
285 q.daemon=True
286 q.start()
287 children.append(q.launcher)
288 else:
289 log_addr = iface%args.logport if args.logport else None
290 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
291 log_addr, args.loglevel, args.scheduler)
292 print (sargs)
293 q = Process(target=launch_scheduler, args=sargs)
294 q.daemon=True
295 q.start()
296 children.append(q)
297
298 if args.mongodb:
299 from mongodb import MongoDB
300 db = MongoDB(thesession.session)
301 else:
302 db = DictDB()
303 time.sleep(.25)
304
305 # build connection dicts
306 engine_addrs = {
307 'control' : iface%control[1],
308 'mux': iface%mux[1],
309 'heartbeat': (iface%hb[0], iface%hb[1]),
310 'task' : iface%task[1],
311 'iopub' : iface%iopub[1],
312 'monitor' : iface%monport,
313 }
314
315 client_addrs = {
316 'control' : iface%control[0],
317 'query': iface%cport,
318 'mux': iface%mux[0],
319 'task' : iface%task[0],
320 'iopub' : iface%iopub[0],
321 'notification': iface%nport
322 }
323
324 # setup logging
325 if args.logport:
326 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
327 else:
328 local_logger(args.loglevel)
329
330 # register relay of signals to the children
331 signal_children(children)
332 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
333 registrar=reg, clientele=c, notifier=n, db=db,
334 engine_addrs=engine_addrs, client_addrs=client_addrs)
335
336 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
337 dc.start()
338 try:
339 loop.start()
340 except KeyboardInterrupt:
341 print ("interrupted, exiting...", file=sys.__stderr__)
342
343 110
344 if __name__ == '__main__':
345 main()
@@ -1,190 +1,141
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 It handles registration, etc., and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 import traceback
10 9 import uuid
11 10 import logging
12 11 from pprint import pprint
13 12
14 13 import zmq
15 14 from zmq.eventloop import ioloop, zmqstream
16 15
17 16 # internal
18 17 from IPython.config.configurable import Configurable
19 18 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type
20 19 # from IPython.utils.localinterfaces import LOCALHOST
21 20
22 21 from factory import RegistrationFactory
23 22
24 from streamsession import Message, StreamSession
25 from streamkernel import Kernel, make_kernel
23 from streamsession import Message
24 from streamkernel import Kernel
26 25 import heartmonitor
27 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
28 local_logger)
29 # import taskthread
30 26
31 27 def printer(*msg):
32 28 # print (logging.handlers, file=sys.__stdout__)
33 29 logging.info(str(msg))
34 30
35 31 class EngineFactory(RegistrationFactory):
36 32 """IPython engine"""
37 33
38 34 # configurables:
39 35 user_ns=Dict(config=True)
40 36 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
41 37 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
42 38
43 39 # not configurable:
44 40 id=Int(allow_none=True)
45 41 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
46 42 kernel=Instance(Kernel)
47 43
48 44
49 45 def __init__(self, **kwargs):
50 46 super(EngineFactory, self).__init__(**kwargs)
51 47 ctx = self.context
52 48
53 49 reg = ctx.socket(zmq.PAIR)
54 50 reg.setsockopt(zmq.IDENTITY, self.ident)
55 51 reg.connect(self.url)
56 52 self.registrar = zmqstream.ZMQStream(reg, self.loop)
57 53
58 54 def register(self):
59 55 """send the registration_request"""
60 56
61 57 logging.info("registering")
62 58 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
63 59 self.registrar.on_recv(self.complete_registration)
64 60 # print (self.session.key)
65 61 self.session.send(self.registrar, "registration_request",content=content)
66 62
67 63 def complete_registration(self, msg):
68 64 # print msg
69 65 ctx = self.context
70 66 loop = self.loop
71 67 identity = self.ident
72 68 print (identity)
73 69
74 70 idents,msg = self.session.feed_identities(msg)
75 71 msg = Message(self.session.unpack_message(msg))
76 72
77 73 if msg.content.status == 'ok':
78 74 self.id = int(msg.content.id)
79 75
80 76 # create Shell Streams (MUX, Task, etc.):
81 77 queue_addr = msg.content.mux
82 78 shell_addrs = [ str(queue_addr) ]
83 79 task_addr = msg.content.task
84 80 if task_addr:
85 81 shell_addrs.append(str(task_addr))
86 82 shell_streams = []
87 83 for addr in shell_addrs:
88 84 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
89 85 stream.setsockopt(zmq.IDENTITY, identity)
90 86 stream.connect(addr)
91 87 shell_streams.append(stream)
92 88
93 89 # control stream:
94 90 control_addr = str(msg.content.control)
95 91 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 92 control_stream.setsockopt(zmq.IDENTITY, identity)
97 93 control_stream.connect(control_addr)
98 94
99 95 # create iopub stream:
100 96 iopub_addr = msg.content.iopub
101 97 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
102 98 iopub_stream.setsockopt(zmq.IDENTITY, identity)
103 99 iopub_stream.connect(iopub_addr)
104 100
105 101 # launch heartbeat
106 102 hb_addrs = msg.content.heartbeat
107 103 # print (hb_addrs)
108 104
109 105 # # Redirect input streams and set a display hook.
110 # if self.out_stream_factory:
111 # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
112 # sys.stdout.topic = 'engine.%i.stdout'%self.id
113 # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
114 # sys.stderr.topic = 'engine.%i.stderr'%self.id
115 # if self.display_hook_factory:
116 # sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
117 # sys.displayhook.topic = 'engine.%i.pyout'%self.id
106 if self.out_stream_factory:
107 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
108 sys.stdout.topic = 'engine.%i.stdout'%self.id
109 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
110 sys.stderr.topic = 'engine.%i.stderr'%self.id
111 if self.display_hook_factory:
112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
118 114
119 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
120 115 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
121 116 control_stream=control_stream,
122 117 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
123 118 user_ns = self.user_ns, config=self.config)
124 119 self.kernel.start()
125 120
126 121 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
122 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
127 123 heart.start()
128 124
129 125
130 126 else:
131 127 logging.error("Registration Failed: %s"%msg)
132 128 raise Exception("Registration Failed: %s"%msg)
133 129
134 130 logging.info("Completed registration with id %i"%self.id)
135 131
136 132
137 133 def unregister(self):
138 134 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
139 135 time.sleep(1)
140 136 sys.exit(0)
141 137
142 138 def start(self):
143 139 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
144 140 dc.start()
145 141
146
147
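# Minimal usage sketch (assumed; the constructor arguments, including the
# registration url, come from the RegistrationFactory base class, which is
# outside this diff):
#
#     from zmq.eventloop import ioloop
#     ef = EngineFactory(url='tcp://127.0.0.1:10101')  # hypothetical kwarg
#     ef.start()                        # schedules register() on the loop
#     ioloop.IOLoop.instance().start()  # complete_registration() runs on reply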
148 def main(argv=None, user_ns=None):
149 """DO NOT USE ME ANYMORE"""
150 parser = make_base_argument_parser()
151
152 args = parser.parse_args(argv)
153
154 parse_url(args)
155
156 iface="%s://%s"%(args.transport,args.ip)+':%i'
157
158 loop = ioloop.IOLoop.instance()
159 session = StreamSession(keyfile=args.execkey)
160 # print (session.key)
161 ctx = zmq.Context()
162
163 # setup logging
164
165 reg_conn = iface % args.regport
166 print (reg_conn, file=sys.__stdout__)
167 print ("Starting the engine...", file=sys.__stderr__)
168
169 reg = ctx.socket(zmq.PAIR)
170 reg.connect(reg_conn)
171 reg = zmqstream.ZMQStream(reg, loop)
172
173 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
174 ident=args.ident or '', user_ns=user_ns)
175 if args.logport:
176 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
177 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
178 else:
179 local_logger(args.loglevel)
180
181 dc = ioloop.DelayedCallback(e.start, 0, loop)
182 dc.start()
183 try:
184 loop.start()
185 except KeyboardInterrupt:
186 print ("interrupted, exiting...", file=sys.__stderr__)
187
188 # Execution as a script
189 if __name__ == '__main__':
190 main()
@@ -1,167 +1,118
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3
4 4 ************
5 5 NOTE: Most of this module has been deprecated by moving to Configurables
6 6 ************
7 7 """
8 8
9 9 # Standard library imports.
10 10 import logging
11 11 import atexit
12 12 import sys
13 13 import os
14 14 import stat
15 15 import socket
16 16 from subprocess import Popen, PIPE
17 17 from signal import signal, SIGINT, SIGABRT, SIGTERM
18 18 try:
19 19 from signal import SIGKILL
20 20 except ImportError:
21 21 SIGKILL=None
22 22
23 23 # System library imports.
24 24 import zmq
25 25 from zmq.log import handlers
26 26 # Local imports.
27 27 from IPython.core.ultratb import FormattedTB
28 28 from IPython.external.argparse import ArgumentParser
29 29 from IPython.zmq.log import EnginePUBHandler
30 30
31 def split_ports(s, n):
32 """Parser helper for multiport strings"""
33 if not s:
34 return tuple([0]*n)
35 ports = map(int, s.split(','))
36 if len(ports) != n:
37 raise ValueError
38 return ports
39
40 31 _random_ports = set()
41 32
42 33 def select_random_ports(n):
43 34 """Selects and return n random ports that are available."""
44 35 ports = []
45 36 for i in xrange(n):
46 37 sock = socket.socket()
47 38 sock.bind(('', 0))
48 39 while sock.getsockname()[1] in _random_ports:
49 40 sock.close()
50 41 sock = socket.socket()
51 42 sock.bind(('', 0))
52 43 ports.append(sock)
53 44 for i, sock in enumerate(ports):
54 45 port = sock.getsockname()[1]
55 46 sock.close()
56 47 ports[i] = port
57 48 _random_ports.add(port)
58 49 return ports
59 50
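# Example (illustrative): reserve two distinct free TCP ports. Each socket is
# closed before its port number is returned, so the module-level _random_ports
# set is what prevents handing out the same port twice within one process.
#
#     hb = select_random_ports(2)   # e.g. [53712, 53713]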
60 def parse_url(args):
61 """Ensure args.url contains full transport://interface:port"""
62 if args.url:
63 iface = args.url.split('://',1)
64 if len(iface) == 2:
65 args.transport,iface = iface
66 iface = iface.split(':')
67 args.ip = iface[0]
68 if iface[1]:
69 args.regport = iface[1]
70 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
71
72 51 def signal_children(children):
73 52 """Relay interupt/term signals to children, for more solid process cleanup."""
74 53 def terminate_children(sig, frame):
75 54 logging.critical("Got signal %i, terminating children..."%sig)
76 55 for child in children:
77 56 child.terminate()
78 57
79 58 sys.exit(sig != SIGINT)
80 59 # sys.exit(sig)
81 60 for sig in (SIGINT, SIGABRT, SIGTERM):
82 61 signal(sig, terminate_children)
83 62
84 63 def generate_exec_key(keyfile):
85 64 import uuid
86 65 newkey = str(uuid.uuid4())
87 66 with open(keyfile, 'w') as f:
88 67 # f.write('ipython-key ')
89 68 f.write(newkey+'\n')
90 69 # set user-only RW permissions (0600)
91 70 # this will have no effect on Windows
92 71 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
93
94
95 def make_base_argument_parser():
96 """ Creates an ArgumentParser for the generic arguments supported by all
97 ipcluster entry points.
98 """
99
100 parser = ArgumentParser()
101 parser.add_argument('--ip', type=str, default='127.0.0.1',
102 help='set the controller\'s IP address [default: local]')
103 parser.add_argument('--transport', type=str, default='tcp',
104 help='set the transport to use [default: tcp]')
105 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
106 help='set the XREP port for registration [default: 10101]')
107 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
108 help='set the PUB port for remote logging [default: log to stdout]')
109 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
110 help='set the log level [default: INFO]')
111 parser.add_argument('--ident', type=str,
112 help='set the ZMQ identity [default: random]')
113 parser.add_argument('--packer', type=str, default='json',
114 choices=['json','pickle'],
115 help='set the message format method [default: json]')
116 parser.add_argument('--url', type=str,
117 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
118 parser.add_argument('--execkey', type=str,
119 help="File containing key for authenticating requests.")
120 72
121 return parser
122 73
123 74 def integer_loglevel(loglevel):
124 75 try:
125 76 loglevel = int(loglevel)
126 77 except ValueError:
127 78 if isinstance(loglevel, str):
128 79 loglevel = getattr(logging, loglevel)
129 80 return loglevel
130 81
131 82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
132 83 logger = logging.getLogger()
133 84 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
134 85 # don't add a second PUBHandler
135 86 return
136 87 loglevel = integer_loglevel(loglevel)
137 88 lsock = context.socket(zmq.PUB)
138 89 lsock.connect(iface)
139 90 handler = handlers.PUBHandler(lsock)
140 91 handler.setLevel(loglevel)
141 92 handler.root_topic = root
142 93 logger.addHandler(handler)
143 94 logger.setLevel(loglevel)
144 95
145 96 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
146 97 logger = logging.getLogger()
147 98 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
148 99 # don't add a second PUBHandler
149 100 return
150 101 loglevel = integer_loglevel(loglevel)
151 102 lsock = context.socket(zmq.PUB)
152 103 lsock.connect(iface)
153 104 handler = EnginePUBHandler(engine, lsock)
154 105 handler.setLevel(loglevel)
155 106 logger.addHandler(handler)
156 107 logger.setLevel(loglevel)
157 108
158 109 def local_logger(loglevel=logging.DEBUG):
159 110 loglevel = integer_loglevel(loglevel)
160 111 logger = logging.getLogger()
161 112 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
162 113 # don't add a second StreamHandler
163 114 return
164 115 handler = logging.StreamHandler()
165 116 handler.setLevel(loglevel)
166 117 logger.addHandler(handler)
167 118 logger.setLevel(loglevel)
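# Sketch of intended usage (assumed) for the logging helpers above: a process
# either forwards its log records to a central PUB socket, or logs locally.
#
#     import logging
#     import zmq
#     ctx = zmq.Context()
#     url = 'tcp://127.0.0.1:20202'   # hypothetical --logport address
#     connect_logger(ctx, url, root="controller", loglevel=logging.INFO)
#     # ...or, when no log port is given:
#     local_logger(logging.INFO)
#     logging.info("goes to the PUB socket, or to stderr via StreamHandler")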
@@ -1,1079 +1,1047
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 21 import logging
22 22
23 23 import zmq
24 24 from zmq.eventloop import ioloop
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 # internal:
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
30 30 from IPython.utils.importstring import import_item
31 31
32 32 from entry_point import select_random_ports
33 33 from factory import RegistrationFactory
34 34
35 35 from streamsession import Message, wrap_exception, ISO8601
36 36 from heartmonitor import HeartMonitor
37 37 from util import validate_url_container
38 38
39 39 try:
40 40 from pymongo.binary import Binary
41 41 except ImportError:
42 42 MongoDB=None
43 43 else:
44 44 from mongodb import MongoDB
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Code
48 48 #-----------------------------------------------------------------------------
49 49
50 50 def _passer(*args, **kwargs):
51 51 return
52 52
53 53 def _printer(*args, **kwargs):
54 54 print (args)
55 55 print (kwargs)
56 56
57 57 def init_record(msg):
58 58 """Initialize a TaskRecord based on a request."""
59 59 header = msg['header']
60 60 return {
61 61 'msg_id' : header['msg_id'],
62 62 'header' : header,
63 63 'content': msg['content'],
64 64 'buffers': msg['buffers'],
65 65 'submitted': datetime.strptime(header['date'], ISO8601),
66 66 'client_uuid' : None,
67 67 'engine_uuid' : None,
68 68 'started': None,
69 69 'completed': None,
70 70 'resubmitted': None,
71 71 'result_header' : None,
72 72 'result_content' : None,
73 73 'result_buffers' : None,
74 74 'queue' : None,
75 75 'pyin' : None,
76 76 'pyout': None,
77 77 'pyerr': None,
78 78 'stdout': '',
79 79 'stderr': '',
80 80 }
81 81
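# For illustration, the minimal message shape init_record() expects; the field
# names come from the code above, the values are made up:
#
#     msg = {
#         'header' : {'msg_id': 'abc123',
#                     'date': '2010-01-01T00:00:00.000000'},  # ISO8601 format
#         'content': {},
#         'buffers': [],
#     }
#     rec = init_record(msg)   # rec['submitted'] becomes a datetime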
82 82
83 83 class EngineConnector(HasTraits):
84 84 """A simple object for accessing the various zmq connections of an object.
85 85 Attributes are:
86 86 id (int): engine ID
87 87 uuid (str): uuid (unused?)
88 88 queue (str): identity of queue's XREQ socket
89 89 registration (str): identity of registration XREQ socket
90 90 heartbeat (str): identity of heartbeat XREQ socket
91 91 """
92 92 id=Int(0)
93 93 queue=Str()
94 94 control=Str()
95 95 registration=Str()
96 96 heartbeat=Str()
97 97 pending=Set()
98 98
99 99 def __init__(self, **kwargs):
100 100 super(EngineConnector, self).__init__(**kwargs)
101 101 logging.info("engine::Engine Connected: %i"%self.id)
102 102
103 103 class HubFactory(RegistrationFactory):
104 104 """The Configurable for setting up a Hub."""
105 105
106 106 # port-pairs for monitoredqueues:
107 107 hb = Instance(list, config=True)
108 108 def _hb_default(self):
109 109 return select_random_ports(2)
110 110
111 111 mux = Instance(list, config=True)
112 112 def _mux_default(self):
113 113 return select_random_ports(2)
114 114
115 115 task = Instance(list, config=True)
116 116 def _task_default(self):
117 117 return select_random_ports(2)
118 118
119 119 control = Instance(list, config=True)
120 120 def _control_default(self):
121 121 return select_random_ports(2)
122 122
123 123 iopub = Instance(list, config=True)
124 124 def _iopub_default(self):
125 125 return select_random_ports(2)
126 126
127 127 # single ports:
128 128 mon_port = Instance(int, config=True)
129 129 def _mon_port_default(self):
130 130 return select_random_ports(1)[0]
131 131
132 132 query_port = Instance(int, config=True)
133 133 def _query_port_default(self):
134 134 return select_random_ports(1)[0]
135 135
136 136 notifier_port = Instance(int, config=True)
137 137 def _notifier_port_default(self):
138 138 return select_random_ports(1)[0]
139 139
140 140 ping = Int(1000, config=True) # ping frequency
141 141
142 142 engine_ip = Str('127.0.0.1', config=True)
143 143 engine_transport = Str('tcp', config=True)
144 144
145 145 client_ip = Str('127.0.0.1', config=True)
146 146 client_transport = Str('tcp', config=True)
147 147
148 148 monitor_ip = Str('127.0.0.1', config=True)
149 149 monitor_transport = Str('tcp', config=True)
150 150
151 151 monitor_url = Str('')
152 152
153 153 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
154 154
155 155 # not configurable
156 156 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
157 157 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
158 158 subconstructors = List()
159 159 _constructed = Bool(False)
160 160
161 def _ip_changed(self, name, old, new):
162 self.engine_ip = new
163 self.client_ip = new
164 self.monitor_ip = new
165 self._update_monitor_url()
166
161 167 def _update_monitor_url(self):
162 168 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
163 169
164 def _sync_ips(self):
165 self.engine_ip = self.ip
166 self.client_ip = self.ip
167 self.monitor_ip = self.ip
168 self._update_monitor_url()
169
170 def _sync_transports(self):
171 self.engine_transport = self.transport
172 self.client_transport = self.transport
173 self.monitor_transport = self.transport
170 def _transport_changed(self, name, old, new):
171 self.engine_transport = new
172 self.client_transport = new
173 self.monitor_transport = new
174 174 self._update_monitor_url()
175 175
176 176 def __init__(self, **kwargs):
177 177 super(HubFactory, self).__init__(**kwargs)
178 178 self._update_monitor_url()
179 self.on_trait_change(self._sync_ips, 'ip')
180 self.on_trait_change(self._sync_transports, 'transport')
179 # self.on_trait_change(self._sync_ips, 'ip')
180 # self.on_trait_change(self._sync_transports, 'transport')
181 181 self.subconstructors.append(self.construct_hub)
182 182
183 183
184 184 def construct(self):
185 185 assert not self._constructed, "already constructed!"
186 186
187 187 for subc in self.subconstructors:
188 188 subc()
189 189
190 190 self._constructed = True
191 191
192 192
193 193 def start(self):
194 194 assert self._constructed, "must be constructed by self.construct() first!"
195 195 self.heartmonitor.start()
196 196 logging.info("Heartmonitor started")
197 197
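# Configuration sketch (assumed; any of the config=True traits above can be
# set the same way) pinning ports instead of taking random ones:
#
#     from IPython.config.loader import Config   # assumed import path
#     c = Config()
#     c.HubFactory.hb = [10201, 10202]     # heartbeat PUB/XREP port-pair
#     c.HubFactory.engine_ip = '0.0.0.0'
#     hf = HubFactory(config=c)
#     hf.construct()
#     hf.start()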
198 198 def construct_hub(self):
199 199 """construct"""
200 200 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
201 201 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
202 202
203 203 ctx = self.context
204 204 loop = self.loop
205 205
206 206 # Registrar socket
207 207 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 208 reg.bind(client_iface % self.regport)
209 209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 210 if self.client_ip != self.engine_ip:
211 211 reg.bind(engine_iface % self.regport)
212 212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213 213
214 214 ### Engine connections ###
215 215
216 216 # heartbeat
217 217 hpub = ctx.socket(zmq.PUB)
218 218 hpub.bind(engine_iface % self.hb[0])
219 219 hrep = ctx.socket(zmq.XREP)
220 220 hrep.bind(engine_iface % self.hb[1])
221 221
222 222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
223 223
224 224 ### Client connections ###
225 225 # Clientele socket
226 226 c = ZMQStream(ctx.socket(zmq.XREP), loop)
227 227 c.bind(client_iface%self.query_port)
228 228 # Notifier socket
229 229 n = ZMQStream(ctx.socket(zmq.PUB), loop)
230 230 n.bind(client_iface%self.notifier_port)
231 231
232 232 ### build and launch the queues ###
233 233
234 234 # monitor socket
235 235 sub = ctx.socket(zmq.SUB)
236 236 sub.setsockopt(zmq.SUBSCRIBE, "")
237 237 sub.bind(self.monitor_url)
238 238 sub = ZMQStream(sub, loop)
239 239
240 240 # connect the db
241 241 self.db = import_item(self.db_class)()
242 242 time.sleep(.25)
243 243
244 244 # build connection dicts
245 245 self.engine_addrs = {
246 246 'control' : engine_iface%self.control[1],
247 247 'mux': engine_iface%self.mux[1],
248 248 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
249 249 'task' : engine_iface%self.task[1],
250 250 'iopub' : engine_iface%self.iopub[1],
251 251 # 'monitor' : engine_iface%self.mon_port,
252 252 }
253 253
254 254 self.client_addrs = {
255 255 'control' : client_iface%self.control[0],
256 256 'query': client_iface%self.query_port,
257 257 'mux': client_iface%self.mux[0],
258 258 'task' : client_iface%self.task[0],
259 259 'iopub' : client_iface%self.iopub[0],
260 260 'notification': client_iface%self.notifier_port
261 261 }
262 262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 264 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 265 registrar=reg, clientele=c, notifier=n, db=self.db,
266 266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
267 267
268 268
269 269 class Hub(HasTraits):
270 270 """The IPython Controller Hub with 0MQ connections
271 271
272 272 Parameters
273 273 ==========
274 274 loop: zmq IOLoop instance
275 275 session: StreamSession object
276 276 <removed> context: zmq context for creating new connections (?)
277 277 queue: ZMQStream for monitoring the command queue (SUB)
278 278 registrar: ZMQStream for engine registration requests (XREP)
279 279 heartbeat: HeartMonitor object checking the pulse of the engines
280 280 clientele: ZMQStream for client connections (XREP)
281 281 not used for jobs, only query/control commands
282 282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
283 283 db: connection to db for out of memory logging of commands
284 284 NotImplemented
285 285 engine_addrs: dict of zmq connection information for engines to connect
286 286 to the queues.
287 287 client_addrs: dict of zmq connection information for clients to connect
288 288 to the queues.
289 289 """
290 290 # internal data structures:
291 291 ids=Set() # engine IDs
292 292 keytable=Dict()
293 293 by_ident=Dict()
294 294 engines=Dict()
295 295 clients=Dict()
296 296 hearts=Dict()
297 297 pending=Set()
298 298 queues=Dict() # pending msg_ids keyed by engine_id
299 299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by engine_id
300 300 completed=Dict() # completed msg_ids keyed by engine_id
301 301 all_completed=Set() # set of all completed msg_ids, across all engines
302 302 # mia=None
303 303 incoming_registrations=Dict()
304 304 registration_timeout=Int()
305 305 _idcounter=Int(0)
306 306
307 307 # objects from constructor:
308 308 loop=Instance(ioloop.IOLoop)
309 309 registrar=Instance(ZMQStream)
310 310 clientele=Instance(ZMQStream)
311 311 monitor=Instance(ZMQStream)
312 312 heartmonitor=Instance(HeartMonitor)
313 313 notifier=Instance(ZMQStream)
314 314 db=Instance(object)
315 315 client_addrs=Dict()
316 316 engine_addrs=Dict()
317 317
318 318
319 319 def __init__(self, **kwargs):
320 320 """
321 321 # universal:
322 322 loop: IOLoop for creating future connections
323 323 session: streamsession for sending serialized data
324 324 # engine:
325 325 queue: ZMQStream for monitoring queue messages
326 326 registrar: ZMQStream for engine registration
327 327 heartbeat: HeartMonitor object for tracking engines
328 328 # client:
329 329 clientele: ZMQStream for client connections
330 330 # extra:
331 331 db: ZMQStream for db connection (NotImplemented)
332 332 engine_addrs: zmq address/protocol dict for engine connections
333 333 client_addrs: zmq address/protocol dict for client connections
334 334 """
335 335
336 336 super(Hub, self).__init__(**kwargs)
337 self.ids = set()
338 self.pending = set()
339 # self.keytable={}
340 # self.incoming_registrations={}
341 # self.engines = {}
342 # self.by_ident = {}
343 # self.clients = {}
344 # self.hearts = {}
345 # self.mia = set()
346 337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
347 # this is the stuff that will move to DB:
348 # self.pending = set() # pending messages, keyed by msg_id
349 # self.queues = {} # pending msg_ids keyed by engine_id
350 # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
351 # self.completed = {} # completed msg_ids keyed by engine_id
352 # self.all_completed = set()
353 # self._idcounter = 0
354 # self.sockets = {}
355 # self.loop = loop
356 # self.session = session
357 # self.registrar = registrar
358 # self.clientele = clientele
359 # self.queue = queue
360 # self.heartmonitor = heartbeat
361 # self.notifier = notifier
362 # self.db = db
363 338
364 339 # validate connection dicts:
365 # self.client_addrs = client_addrs
366 340 validate_url_container(self.client_addrs)
367
368 # assert isinstance(self.client_addrs['queue'], str)
369 # assert isinstance(self.client_addrs['control'], str)
370 # self.hb_addrs = hb_addrs
371 341 validate_url_container(self.engine_addrs)
372 # self.engine_addrs = engine_addrs
373 # assert isinstance(self.engine_addrs['queue'], str)
374 # assert isinstance(self.engine_addrs['control'], str)
375 # assert len(engine_addrs['heartbeat']) == 2
376 342
377 343 # register our callbacks
378 344 self.registrar.on_recv(self.dispatch_register_request)
379 345 self.clientele.on_recv(self.dispatch_client_msg)
380 346 self.monitor.on_recv(self.dispatch_monitor_traffic)
381 347
382 348 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
383 349 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
384 350
385 351 self.monitor_handlers = { 'in' : self.save_queue_request,
386 352 'out': self.save_queue_result,
387 353 'intask': self.save_task_request,
388 354 'outtask': self.save_task_result,
389 355 'tracktask': self.save_task_destination,
390 356 'incontrol': _passer,
391 357 'outcontrol': _passer,
392 358 'iopub': self.save_iopub_message,
393 359 }
394 360
395 361 self.client_handlers = {'queue_request': self.queue_status,
396 362 'result_request': self.get_results,
397 363 'purge_request': self.purge_results,
398 364 'load_request': self.check_load,
399 365 'resubmit_request': self.resubmit_task,
400 366 'shutdown_request': self.shutdown_request,
401 367 }
402 368
403 369 self.registrar_handlers = {'registration_request' : self.register_engine,
404 370 'unregistration_request' : self.unregister_engine,
405 371 'connection_request': self.connection_request,
406 372 }
407 373
408 374 logging.info("hub::created hub")
409 375
410 376 @property
411 377 def _next_id(self):
412 """gemerate a new ID"""
378 """gemerate a new ID.
379
380 No longer reuse old ids, just count from 0."""
413 381 newid = self._idcounter
414 382 self._idcounter += 1
415 383 return newid
416 384 # newid = 0
417 385 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
418 386 # # print newid, self.ids, self.incoming_registrations
419 387 # while newid in self.ids or newid in incoming:
420 388 # newid += 1
421 389 # return newid
422 390
423 391 #-----------------------------------------------------------------------------
424 392 # message validation
425 393 #-----------------------------------------------------------------------------
426 394
427 395 def _validate_targets(self, targets):
428 396 """turn any valid targets argument into a list of integer ids"""
429 397 if targets is None:
430 398 # default to all
431 399 targets = self.ids
432 400
433 401 if isinstance(targets, (int,str,unicode)):
434 402 # only one target specified
435 403 targets = [targets]
436 404 _targets = []
437 405 for t in targets:
438 406 # map raw identities to ids
439 407 if isinstance(t, (str,unicode)):
440 408 t = self.by_ident.get(t, t)
441 409 _targets.append(t)
442 410 targets = _targets
443 411 bad_targets = [ t for t in targets if t not in self.ids ]
444 412 if bad_targets:
445 413 raise IndexError("No Such Engine: %r"%bad_targets)
446 414 if not targets:
447 415 raise IndexError("No Engines Registered")
448 416 return targets
449 417
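# Illustrative behavior (values made up): ints pass through, raw zmq
# identities map through by_ident, and None means "all registered engines";
# anything unknown raises IndexError.
#
#     hub._validate_targets(None)          # -> all registered ids
#     hub._validate_targets(0)             # -> [0]
#     hub._validate_targets('\x00\x81')    # raw identity -> [mapped id]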
450 418 def _validate_client_msg(self, msg):
451 419 """validates and unpacks headers of a message. Returns False if invalid,
452 420 (ident, header, parent, content)"""
453 421 client_id = msg[0]
454 422 try:
455 423 msg = self.session.unpack_message(msg[1:], content=True)
456 424 except:
457 425 logging.error("client::Invalid Message %s"%msg, exc_info=True)
458 426 return False
459 427
460 428 msg_type = msg.get('msg_type', None)
461 429 if msg_type is None:
462 430 return False
463 431 header = msg.get('header')
464 432 # session doesn't handle split content for now:
465 433 return client_id, msg
466 434
467 435
468 436 #-----------------------------------------------------------------------------
469 437 # dispatch methods (1 per stream)
470 438 #-----------------------------------------------------------------------------
471 439
472 440 def dispatch_register_request(self, msg):
473 441 """"""
474 442 logging.debug("registration::dispatch_register_request(%s)"%msg)
475 443 idents,msg = self.session.feed_identities(msg)
476 444 if not idents:
477 445 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
478 446 return
479 447 try:
480 448 msg = self.session.unpack_message(msg,content=True)
481 449 except:
482 450 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
483 451 return
484 452
485 453 msg_type = msg['msg_type']
486 454 content = msg['content']
487 455
488 456 handler = self.registrar_handlers.get(msg_type, None)
489 457 if handler is None:
490 458 logging.error("registration::got bad registration message: %s"%msg)
491 459 else:
492 460 handler(idents, msg)
493 461
494 462 def dispatch_monitor_traffic(self, msg):
495 463 """all ME and Task queue messages come through here, as well as
496 464 IOPub traffic."""
497 465 logging.debug("monitor traffic: %s"%msg[:2])
498 466 switch = msg[0]
499 467 idents, msg = self.session.feed_identities(msg[1:])
500 468 if not idents:
501 469 logging.error("Bad Monitor Message: %s"%msg)
502 470 return
503 471 handler = self.monitor_handlers.get(switch, None)
504 472 if handler is not None:
505 473 handler(idents, msg)
506 474 else:
507 475 logging.error("Invalid monitor topic: %s"%switch)
508 476
509 477
510 478 def dispatch_client_msg(self, msg):
511 479 """Route messages from clients"""
512 480 idents, msg = self.session.feed_identities(msg)
513 481 if not idents:
514 482 logging.error("Bad Client Message: %s"%msg)
515 483 return
516 484 client_id = idents[0]
517 485 try:
518 486 msg = self.session.unpack_message(msg, content=True)
519 487 except:
520 488 content = wrap_exception()
521 489 logging.error("Bad Client Message: %s"%msg, exc_info=True)
522 490 self.session.send(self.clientele, "hub_error", ident=client_id,
523 491 content=content)
524 492 return
525 493
526 494 # print client_id, header, parent, content
527 495 #switch on message type:
528 496 msg_type = msg['msg_type']
529 497 logging.info("client:: client %s requested %s"%(client_id, msg_type))
530 498 handler = self.client_handlers.get(msg_type, None)
531 499 try:
532 500 assert handler is not None, "Bad Message Type: %s"%msg_type
533 501 except:
534 502 content = wrap_exception()
535 503 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
536 504 self.session.send(self.clientele, "hub_error", ident=client_id,
537 505 content=content)
538 506 return
539 507 else:
540 508 handler(client_id, msg)
541 509
542 510 def dispatch_db(self, msg):
543 511 """"""
544 512 raise NotImplementedError
545 513
546 514 #---------------------------------------------------------------------------
547 515 # handler methods (1 per event)
548 516 #---------------------------------------------------------------------------
549 517
550 518 #----------------------- Heartbeat --------------------------------------
551 519
552 520 def handle_new_heart(self, heart):
553 521 """handler to attach to heartbeater.
554 522 Called when a new heart starts to beat.
555 523 Triggers completion of registration."""
556 524 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
557 525 if heart not in self.incoming_registrations:
558 526 logging.info("heartbeat::ignoring new heart: %r"%heart)
559 527 else:
560 528 self.finish_registration(heart)
561 529
562 530
563 531 def handle_heart_failure(self, heart):
564 532 """handler to attach to heartbeater.
565 533 Called when a previously registered heart fails to respond to a beat request;
566 534 triggers unregistration.
567 535 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
568 536 eid = self.hearts.get(heart, None)
569 537 if eid is None:
570 538 logging.info("heartbeat::ignoring heart failure %r"%heart)
571 539 else:
572 540 queue = self.engines[eid].queue
573 541 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
574 542
575 543 #----------------------- MUX Queue Traffic ------------------------------
576 544
577 545 def save_queue_request(self, idents, msg):
578 546 if len(idents) < 2:
579 547 logging.error("invalid identity prefix: %s"%idents)
580 548 return
581 549 queue_id, client_id = idents[:2]
582 550 try:
583 551 msg = self.session.unpack_message(msg, content=False)
584 552 except:
585 553 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
586 554 return
587 555
588 556 eid = self.by_ident.get(queue_id, None)
589 557 if eid is None:
590 558 logging.error("queue::target %r not registered"%queue_id)
591 559 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
592 560 return
593 561
594 562 header = msg['header']
595 563 msg_id = header['msg_id']
596 564 record = init_record(msg)
597 565 record['engine_uuid'] = queue_id
598 566 record['client_uuid'] = client_id
599 567 record['queue'] = 'mux'
600 568 if MongoDB is not None and isinstance(self.db, MongoDB):
601 569 record['buffers'] = map(Binary, record['buffers'])
602 570 self.pending.add(msg_id)
603 571 self.queues[eid].append(msg_id)
604 572 self.db.add_record(msg_id, record)
605 573
606 574 def save_queue_result(self, idents, msg):
607 575 if len(idents) < 2:
608 576 logging.error("invalid identity prefix: %s"%idents)
609 577 return
610 578
611 579 client_id, queue_id = idents[:2]
612 580 try:
613 581 msg = self.session.unpack_message(msg, content=False)
614 582 except:
615 583 logging.error("queue::engine %r sent invalid message to %r: %s"%(
616 584 queue_id,client_id, msg), exc_info=True)
617 585 return
618 586
619 587 eid = self.by_ident.get(queue_id, None)
620 588 if eid is None:
621 589 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
622 590 logging.debug("queue:: %s"%msg[2:])
623 591 return
624 592
625 593 parent = msg['parent_header']
626 594 if not parent:
627 595 return
628 596 msg_id = parent['msg_id']
629 597 if msg_id in self.pending:
630 598 self.pending.remove(msg_id)
631 599 self.all_completed.add(msg_id)
632 600 self.queues[eid].remove(msg_id)
633 601 self.completed[eid].append(msg_id)
634 602 rheader = msg['header']
635 603 completed = datetime.strptime(rheader['date'], ISO8601)
636 604 started = rheader.get('started', None)
637 605 if started is not None:
638 606 started = datetime.strptime(started, ISO8601)
639 607 result = {
640 608 'result_header' : rheader,
641 609 'result_content': msg['content'],
642 610 'started' : started,
643 611 'completed' : completed
644 612 }
645 613 if MongoDB is not None and isinstance(self.db, MongoDB):
646 614 result['result_buffers'] = map(Binary, msg['buffers'])
647 615 else:
648 616 result['result_buffers'] = msg['buffers']
649 617 self.db.update_record(msg_id, result)
650 618 else:
651 619 logging.debug("queue:: unknown msg finished %s"%msg_id)
652 620
653 621 #--------------------- Task Queue Traffic ------------------------------
654 622
655 623 def save_task_request(self, idents, msg):
656 624 """Save the submission of a task."""
657 625 client_id = idents[0]
658 626
659 627 try:
660 628 msg = self.session.unpack_message(msg, content=False)
661 629 except:
662 630 logging.error("task::client %r sent invalid task message: %s"%(
663 631 client_id, msg), exc_info=True)
664 632 return
665 633 record = init_record(msg)
666 634 if MongoDB is not None and isinstance(self.db, MongoDB):
667 635 record['buffers'] = map(Binary, record['buffers'])
668 636 record['client_uuid'] = client_id
669 637 record['queue'] = 'task'
670 638 header = msg['header']
671 639 msg_id = header['msg_id']
672 640 self.pending.add(msg_id)
673 641 self.db.add_record(msg_id, record)
674 642
675 643 def save_task_result(self, idents, msg):
676 644 """save the result of a completed task."""
677 645 client_id = idents[0]
678 646 try:
679 647 msg = self.session.unpack_message(msg, content=False)
680 648 except:
681 649 logging.error("task::invalid task result message send to %r: %s"%(
682 650 client_id, msg), exc_info=True)
683 651 raise
684 652 return
685 653
686 654 parent = msg['parent_header']
687 655 if not parent:
688 656 # print msg
689 657 logging.warn("Task %r had no parent!"%msg)
690 658 return
691 659 msg_id = parent['msg_id']
692 660
693 661 header = msg['header']
694 662 engine_uuid = header.get('engine', None)
695 663 eid = self.by_ident.get(engine_uuid, None)
696 664
697 665 if msg_id in self.pending:
698 666 self.pending.remove(msg_id)
699 667 self.all_completed.add(msg_id)
700 668 if eid is not None:
701 669 self.completed[eid].append(msg_id)
702 670 if msg_id in self.tasks[eid]:
703 671 self.tasks[eid].remove(msg_id)
704 672 completed = datetime.strptime(header['date'], ISO8601)
705 673 started = header.get('started', None)
706 674 if started is not None:
707 675 started = datetime.strptime(started, ISO8601)
708 676 result = {
709 677 'result_header' : header,
710 678 'result_content': msg['content'],
711 679 'started' : started,
712 680 'completed' : completed,
713 681 'engine_uuid': engine_uuid
714 682 }
715 683 if MongoDB is not None and isinstance(self.db, MongoDB):
716 684 result['result_buffers'] = map(Binary, msg['buffers'])
717 685 else:
718 686 result['result_buffers'] = msg['buffers']
719 687 self.db.update_record(msg_id, result)
720 688
721 689 else:
722 690 logging.debug("task::unknown task %s finished"%msg_id)
723 691
724 692 def save_task_destination(self, idents, msg):
725 693 try:
726 694 msg = self.session.unpack_message(msg, content=True)
727 695 except:
728 696 logging.error("task::invalid task tracking message", exc_info=True)
729 697 return
730 698 content = msg['content']
731 699 logging.debug("task::task destination: %s"%content)
732 700 msg_id = content['msg_id']
733 701 engine_uuid = content['engine_id']
734 702 eid = self.by_ident[engine_uuid]
735 703
736 704 logging.info("task::task %s arrived on %s"%(msg_id, eid))
737 705 # if msg_id in self.mia:
738 706 # self.mia.remove(msg_id)
739 707 # else:
740 708 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
741 709
742 710 self.tasks[eid].append(msg_id)
743 711 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
744 712 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
745 713
746 714 def mia_task_request(self, idents, msg):
747 715 raise NotImplementedError
748 716 client_id = idents[0]
749 717 # content = dict(mia=self.mia,status='ok')
750 718 # self.session.send('mia_reply', content=content, idents=client_id)
751 719
752 720
753 721 #--------------------- IOPub Traffic ------------------------------
754 722
755 723 def save_iopub_message(self, topics, msg):
756 724 """save an iopub message into the db"""
757 725 logging.debug("iopub::save_iopub_message(%s)"%topics)
758 726 try:
759 727 msg = self.session.unpack_message(msg, content=True)
760 728 except:
761 729 logging.error("iopub::invalid IOPub message", exc_info=True)
762 730 return
763 731
764 732 parent = msg['parent_header']
765 733 if not parent:
766 734 logging.error("iopub::invalid IOPub message: %s"%msg)
767 735 return
768 736 msg_id = parent['msg_id']
769 737 msg_type = msg['msg_type']
770 738 content = msg['content']
771 739
772 740 # ensure msg_id is in db
773 741 try:
774 742 rec = self.db.get_record(msg_id)
775 743 except:
776 744 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
777 745 return
778 746 # stream
779 747 d = {}
780 748 if msg_type == 'stream':
781 749 name = content['name']
782 750 s = rec[name] or ''
783 751 d[name] = s + content['data']
784 752
785 753 elif msg_type == 'pyerr':
786 754 d['pyerr'] = content
787 755 else:
788 756 d[msg_type] = content['data']
789 757
790 758 self.db.update_record(msg_id, d)
791 759
792 760
793 761
794 762 #-------------------------------------------------------------------------
795 763 # Registration requests
796 764 #-------------------------------------------------------------------------
797 765
798 766 def connection_request(self, client_id, msg):
799 767 """Reply with connection addresses for clients."""
800 768 logging.info("client::client %s connected"%client_id)
801 769 content = dict(status='ok')
802 770 content.update(self.client_addrs)
803 771 jsonable = {}
804 772 for k,v in self.keytable.iteritems():
805 773 jsonable[str(k)] = v
806 774 content['engines'] = jsonable
807 775 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
808 776
809 777 def register_engine(self, reg, msg):
810 778 """Register a new engine."""
811 779 content = msg['content']
812 780 try:
813 781 queue = content['queue']
814 782 except KeyError:
815 783 logging.error("registration::queue not specified", exc_info=True)
816 784 return
817 785 heart = content.get('heartbeat', None)
818 786 """register a new engine, and create the socket(s) necessary"""
819 787 eid = self._next_id
820 788 # print (eid, queue, reg, heart)
821 789
822 790 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
823 791
824 792 content = dict(id=eid,status='ok')
825 793 content.update(self.engine_addrs)
826 794 # check if requesting available IDs:
827 795 if queue in self.by_ident:
828 796 try:
829 797 raise KeyError("queue_id %r in use"%queue)
830 798 except:
831 799 content = wrap_exception()
832 800 logging.error("queue_id %r in use"%queue, exc_info=True)
833 801 elif heart in self.hearts: # need to check unique hearts?
834 802 try:
835 803 raise KeyError("heart_id %r in use"%heart)
836 804 except:
837 805 logging.error("heart_id %r in use"%heart, exc_info=True)
838 806 content = wrap_exception()
839 807 else:
840 808 for h, pack in self.incoming_registrations.iteritems():
841 809 if heart == h:
842 810 try:
843 811 raise KeyError("heart_id %r in use"%heart)
844 812 except:
845 813 logging.error("heart_id %r in use"%heart, exc_info=True)
846 814 content = wrap_exception()
847 815 break
848 816 elif queue == pack[1]:
849 817 try:
850 818 raise KeyError("queue_id %r in use"%queue)
851 819 except:
852 820 logging.error("queue_id %r in use"%queue, exc_info=True)
853 821 content = wrap_exception()
854 822 break
855 823
856 824 msg = self.session.send(self.registrar, "registration_reply",
857 825 content=content,
858 826 ident=reg)
859 827
860 828 if content['status'] == 'ok':
861 829 if heart in self.heartmonitor.hearts:
862 830 # already beating
863 831 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
864 832 self.finish_registration(heart)
865 833 else:
866 834 purge = lambda : self._purge_stalled_registration(heart)
867 835 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
868 836 dc.start()
869 837 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
870 838 else:
871 839 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
872 840 return eid
873 841
874 842 def unregister_engine(self, ident, msg):
875 843 """Unregister an engine that explicitly requested to leave."""
876 844 try:
877 845 eid = msg['content']['id']
878 846 except:
879 847 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
880 848 return
881 849 logging.info("registration::unregister_engine(%s)"%eid)
882 850 content=dict(id=eid, queue=self.engines[eid].queue)
883 851 self.ids.remove(eid)
884 852 self.keytable.pop(eid)
885 853 ec = self.engines.pop(eid)
886 854 self.hearts.pop(ec.heartbeat)
887 855 self.by_ident.pop(ec.queue)
888 856 self.completed.pop(eid)
889 857 for msg_id in self.queues.pop(eid):
890 858 msg = self.pending.remove(msg_id)
891 859 ############## TODO: HANDLE IT ################
892 860
893 861 if self.notifier:
894 862 self.session.send(self.notifier, "unregistration_notification", content=content)
895 863
896 864 def finish_registration(self, heart):
897 865 """Second half of engine registration, called after our HeartMonitor
898 866 has received a beat from the Engine's Heart."""
899 867 try:
900 868 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
901 869 except KeyError:
902 870 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
903 871 return
904 872 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
905 873 if purge is not None:
906 874 purge.stop()
907 875 control = queue
908 876 self.ids.add(eid)
909 877 self.keytable[eid] = queue
910 878 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
911 879 control=control, heartbeat=heart)
912 880 self.by_ident[queue] = eid
913 881 self.queues[eid] = list()
914 882 self.tasks[eid] = list()
915 883 self.completed[eid] = list()
916 884 self.hearts[heart] = eid
917 885 content = dict(id=eid, queue=self.engines[eid].queue)
918 886 if self.notifier:
919 887 self.session.send(self.notifier, "registration_notification", content=content)
920 888
921 889 def _purge_stalled_registration(self, heart):
922 890 if heart in self.incoming_registrations:
923 891 eid = self.incoming_registrations.pop(heart)[0]
924 892 logging.info("registration::purging stalled registration: %i"%eid)
925 893 else:
926 894 pass
927 895
928 896 #-------------------------------------------------------------------------
929 897 # Client Requests
930 898 #-------------------------------------------------------------------------
931 899
932 900 def shutdown_request(self, client_id, msg):
933 901 """Handle a shutdown request."""
934 902 # s = self.context.socket(zmq.XREQ)
935 903 # s.connect(self.client_connections['mux'])
936 904 # time.sleep(0.1)
937 905 # for eid,ec in self.engines.iteritems():
938 906 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
939 907 # time.sleep(1)
940 908 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
941 909 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
942 910 dc.start()
943 911
944 912 def _shutdown(self):
945 913 logging.info("hub::hub shutting down.")
946 914 time.sleep(0.1)
947 915 sys.exit(0)
948 916
949 917
950 918 def check_load(self, client_id, msg):
951 919 content = msg['content']
952 920 try:
953 921 targets = content['targets']
954 922 targets = self._validate_targets(targets)
955 923 except:
956 924 content = wrap_exception()
957 925 self.session.send(self.clientele, "hub_error",
958 926 content=content, ident=client_id)
959 927 return
960 928
961 929 content = dict(status='ok')
962 930 # loads = {}
963 931 for t in targets:
964 932 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
965 933 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
966 934
967 935
968 936 def queue_status(self, client_id, msg):
969 937 """Return the Queue status of one or more targets.
970 938 if verbose: return the msg_ids
971 939 else: return len of each type.
972 940 keys: queue (pending MUX jobs)
973 941 tasks (pending Task jobs)
974 942 completed (finished jobs from both queues)"""
975 943 content = msg['content']
976 944 targets = content['targets']
977 945 try:
978 946 targets = self._validate_targets(targets)
979 947 except:
980 948 content = wrap_exception()
981 949 self.session.send(self.clientele, "hub_error",
982 950 content=content, ident=client_id)
983 951 return
984 952 verbose = content.get('verbose', False)
985 953 content = dict(status='ok')
986 954 for t in targets:
987 955 queue = self.queues[t]
988 956 completed = self.completed[t]
989 957 tasks = self.tasks[t]
990 958 if not verbose:
991 959 queue = len(queue)
992 960 completed = len(completed)
993 961 tasks = len(tasks)
994 962 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
995 963 # pending
996 964 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
997 965
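A hedged sketch of reading the queue_reply built above: the content carries a 'status' key plus one entry per engine id, with ints when non-verbose and msg_id lists when verbose; the helper itself is illustrative:

    def summarize_queue_reply(content):
        if content.get('status') != 'ok':
            raise RuntimeError(content.get('evalue', 'unknown hub error'))
        count = lambda v: v if isinstance(v, int) else len(v)
        summary = {}
        for eid, val in content.items():
            if eid == 'status':
                continue
            # works for both verbose (lists) and non-verbose (ints) replies
            summary[eid] = dict((k, count(v)) for k, v in val.items())
        return summary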
998 966 def purge_results(self, client_id, msg):
999 967 """Purge results from memory. This method is more valuable before we move
1000 968 to a DB-based message storage mechanism."""
1001 969 content = msg['content']
1002 970 msg_ids = content.get('msg_ids', [])
1003 971 reply = dict(status='ok')
1004 972 if msg_ids == 'all':
1005 973 self.db.drop_matching_records(dict(completed={'$ne':None}))
1006 974 else:
1007 975 for msg_id in msg_ids:
1008 976 if msg_id in self.all_completed:
1009 977 self.db.drop_record(msg_id)
1010 978 else:
1011 979 if msg_id in self.pending:
1012 980 try:
1013 981 raise IndexError("msg pending: %r"%msg_id)
1014 982 except:
1015 983 reply = wrap_exception()
1016 984 else:
1017 985 try:
1018 986 raise IndexError("No such msg: %r"%msg_id)
1019 987 except:
1020 988 reply = wrap_exception()
1021 989 break
1022 990 eids = content.get('engine_ids', [])
1023 991 for eid in eids:
1024 992 if eid not in self.engines:
1025 993 try:
1026 994 raise IndexError("No such engine: %i"%eid)
1027 995 except:
1028 996 reply = wrap_exception()
1029 997 break
1030 998 msg_ids = self.completed.pop(eid)
1031 999 uid = self.engines[eid].queue
1032 1000 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1033 1001
1034 1002 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1035 1003
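purge_results leans on a Mongo-style query: completed={'$ne': None} matches every finished record. A rough sketch of how an in-memory backend might evaluate that one operator (the actual backends are the default in-memory store and the MongoDB class wired up by --mongodb):

    def matches(record, query):
        # plain equality, plus the single '$ne' operator used above
        for key, test in query.items():
            value = record.get(key)
            if isinstance(test, dict) and '$ne' in test:
                if value == test['$ne']:
                    return False
            elif value != test:
                return False
        return True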
1036 1004 def resubmit_task(self, client_id, msg, buffers):
1037 1005 """Resubmit a task."""
1038 1006 raise NotImplementedError
1039 1007
1040 1008 def get_results(self, client_id, msg):
1041 1009 """Get the result of 1 or more messages."""
1042 1010 content = msg['content']
1043 1011 msg_ids = sorted(set(content['msg_ids']))
1044 1012 statusonly = content.get('status_only', False)
1045 1013 pending = []
1046 1014 completed = []
1047 1015 content = dict(status='ok')
1048 1016 content['pending'] = pending
1049 1017 content['completed'] = completed
1050 1018 buffers = []
1051 1019 if not statusonly:
1052 1020 content['results'] = {}
1053 1021 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1054 1022 for msg_id in msg_ids:
1055 1023 if msg_id in self.pending:
1056 1024 pending.append(msg_id)
1057 1025 elif msg_id in self.all_completed:
1058 1026 completed.append(msg_id)
1059 1027 if not statusonly:
1060 1028 rec = records[msg_id]
1061 1029 io_dict = {}
1062 1030 for key in 'pyin pyout pyerr stdout stderr'.split():
1063 1031 io_dict[key] = rec[key]
1064 1032 content[msg_id] = { 'result_content': rec['result_content'],
1065 1033 'header': rec['header'],
1066 1034 'result_header' : rec['result_header'],
1067 1035 'io' : io_dict,
1068 1036 }
1069 1037 buffers.extend(map(str, rec['result_buffers']))
1070 1038 else:
1071 1039 try:
1072 1040 raise KeyError('No such message: '+msg_id)
1073 1041 except:
1074 1042 content = wrap_exception()
1075 1043 break
1076 1044 self.session.send(self.clientele, "result_reply", content=content,
1077 1045 parent=msg, ident=client_id,
1078 1046 buffers=buffers)
1079 1047
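And a short sketch of splitting the result_reply assembled by get_results: 'status', 'pending', 'completed', and the per-msg_id record dicts come from the handler above, while the helper is hypothetical:

    def split_result_reply(content):
        if content['status'] != 'ok':
            raise RuntimeError(content.get('evalue', 'hub error'))
        finished = {}
        for msg_id in content['completed']:
            # per-id records are absent when the request was status_only
            if msg_id in content:
                finished[msg_id] = content[msg_id]['result_content']
        return content['pending'], finished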
@@ -1,322 +1,340
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import sys
22 22 import os
23 23 import logging
24 24 # from twisted.application import service
25 25 # from twisted.internet import reactor
26 26 # from twisted.python import log
27 27
28 28 import zmq
29 29 from zmq.log.handlers import PUBHandler
30 30
31 31 from IPython.config.loader import Config
32 32 from IPython.zmq.parallel import factory
33 33 from IPython.zmq.parallel.controller import ControllerFactory
34 34 from IPython.zmq.parallel.clusterdir import (
35 35 ApplicationWithClusterDir,
36 36 ClusterDirConfigLoader
37 37 )
38 38 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
39 39 from IPython.utils.traitlets import Instance, Unicode
40 40
41 41 from entry_point import generate_exec_key
42 42
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # Module level variables
46 46 #-----------------------------------------------------------------------------
47 47
48 48
49 49 #: The default config file name for this application
50 50 default_config_file_name = u'ipcontroller_config.py'
51 51
52 52
53 53 _description = """Start the IPython controller for parallel computing.
54 54
55 55 The IPython controller provides a gateway between the IPython engines and
56 56 clients. The controller needs to be started before the engines and can be
57 57 configured using command line options or using a cluster directory. Cluster
58 58 directories contain config, log and security files and are usually located in
59 59 your .ipython directory and named as "cluster_<profile>". See the --profile
60 60 and --cluster-dir options for details.
61 61 """
62 62
63 63 #-----------------------------------------------------------------------------
64 64 # Default interfaces
65 65 #-----------------------------------------------------------------------------
66 66
67 67 # The default client interfaces for FCClientServiceFactory.interfaces
68 68 default_client_interfaces = Config()
69 69 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
70 70
71 71 # Make this a dict we can pass to Config.__init__ for the default
72 72 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
73 73
74 74
75 75
76 76 # The default engine interfaces for FCEngineServiceFactory.interfaces
77 77 default_engine_interfaces = Config()
78 78 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
79 79
80 80 # Make this a dict we can pass to Config.__init__ for the default
81 81 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82 82
83 83
84 84 #-----------------------------------------------------------------------------
85 85 # Service factories
86 86 #-----------------------------------------------------------------------------
87 87
88 88 #
89 89 # class FCClientServiceFactory(FCServiceFactory):
90 90 # """A Foolscap implementation of the client services."""
91 91 #
92 92 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 93 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 94 # allow_none=False, config=True)
95 95 #
96 96 #
97 97 # class FCEngineServiceFactory(FCServiceFactory):
98 98 # """A Foolscap implementation of the engine services."""
99 99 #
100 100 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 101 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 102 # allow_none=False, config=True)
103 103 #
104 104
105 105 #-----------------------------------------------------------------------------
106 106 # Command line options
107 107 #-----------------------------------------------------------------------------
108 108
109 109
110 110 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111 111
112 112 def _add_arguments(self):
113 113 super(IPControllerAppConfigLoader, self)._add_arguments()
114 114 paa = self.parser.add_argument
115 115
116 116 ## Hub Config:
117 117 paa('--mongodb',
118 118 dest='HubFactory.db_class', action='store_const',
119 119 const='IPython.zmq.parallel.mongodb.MongoDB',
120 120 help='Use MongoDB task storage [default: in-memory]')
121 121 paa('--hb',
122 122 type=int, dest='HubFactory.hb', nargs=2,
123 123 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
124 124 'connections [default: random]',
125 125 metavar='Hub.hb_ports')
126 paa('--ping',
127 type=int, dest='HubFactory.ping',
128 help='The frequency at which the Hub pings the engines for heartbeats '
129 '(in ms) [default: 100]',
130 metavar='Hub.ping')
126 131
127 132 # Client config
128 133 paa('--client-ip',
129 134 type=str, dest='HubFactory.client_ip',
130 135 help='The IP address or hostname the Hub will listen on for '
131 136 'client connections. Both engine-ip and client-ip can be set simultaneously '
132 137 'via --ip [default: loopback]',
133 138 metavar='Hub.client_ip')
134 139 paa('--client-transport',
135 140 type=str, dest='HubFactory.client_transport',
136 141 help='The ZeroMQ transport the Hub will use for '
137 142 'client connections. Both engine-transport and client-transport can be set simultaneously '
138 143 'via --transport [default: tcp]',
139 144 metavar='Hub.client_transport')
140 145 paa('--query',
141 146 type=int, dest='HubFactory.query_port',
142 147 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
143 148 metavar='Hub.query_port')
144 149 paa('--notifier',
145 150 type=int, dest='HubFactory.notifier_port',
146 151 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
147 152 metavar='Hub.notifier_port')
148 153
149 154 # Engine config
150 155 paa('--engine-ip',
151 156 type=str, dest='HubFactory.engine_ip',
152 157 help='The IP address or hostname the Hub will listen on for '
153 158 'engine connections. This applies to the Hub and its schedulers'
154 159 'engine-ip and client-ip can be set simultaneously '
155 160 'via --ip [default: loopback]',
156 161 metavar='Hub.engine_ip')
157 162 paa('--engine-transport',
158 163 type=str, dest='HubFactory.engine_transport',
159 164 help='The ZeroMQ transport the Hub will use for '
160 165 'engine connections. Both engine-transport and client-transport can be set simultaneously '
161 166 'via --transport [default: tcp]',
162 167 metavar='Hub.engine_transport')
163 168
164 169 # Scheduler config
165 170 paa('--mux',
166 171 type=int, dest='ControllerFactory.mux', nargs=2,
167 172 help='The (2) ports the MUX scheduler will listen on for client,engine '
168 173 'connections, respectively [default: random]',
169 174 metavar='Scheduler.mux_ports')
170 175 paa('--task',
171 176 type=int, dest='ControllerFactory.task', nargs=2,
172 177 help='The (2) ports the Task scheduler will listen on for client,engine '
173 178 'connections, respectively [default: random]',
174 179 metavar='Scheduler.task_ports')
175 180 paa('--control',
176 181 type=int, dest='ControllerFactory.control', nargs=2,
177 182 help='The (2) ports the Control scheduler will listen on for client,engine '
178 183 'connections, respectively [default: random]',
179 184 metavar='Scheduler.control_ports')
180 185 paa('--iopub',
181 186 type=int, dest='ControllerFactory.iopub', nargs=2,
182 187 help='The (2) ports the IOPub scheduler will listen on for client,engine '
183 188 'connections, respectively [default: random]',
184 189 metavar='Scheduler.iopub_ports')
185 190 paa('--scheme',
186 191 type=str, dest='ControllerFactory.scheme',
187 192 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'],
188 193 help='select the task scheduler scheme [default: Python LRU]',
189 194 metavar='Scheduler.scheme')
190 195 paa('--usethreads',
191 196 dest='ControllerFactory.usethreads', action="store_true",
192 197 help='Use threads instead of processes for the schedulers',
193 198 )
194 199
195 200 ## Global config
196 201 paa('--log-to-file',
197 202 action='store_true', dest='Global.log_to_file',
198 203 help='Log to a file in the log directory (default is stdout)')
199 204 paa('--log-url',
200 205 type=str, dest='Global.log_url',
201 206 help='Broadcast logs to an iploggerz process [default: disabled]')
202 207 paa('-r','--reuse-key',
203 208 action='store_true', dest='Global.reuse_key',
204 209 help='Try to reuse existing execution keys.')
205 210 paa('--no-secure',
206 211 action='store_false', dest='Global.secure',
207 help='Turn off execution keys.')
212 help='Turn off execution keys (default).')
208 213 paa('--secure',
209 214 action='store_true', dest='Global.secure',
210 help='Turn on execution keys (default).')
215 help='Turn on execution keys.')
211 216 paa('--execkey',
212 217 type=str, dest='Global.exec_key',
213 218 help='path to a file containing an execution key.',
214 219 metavar='keyfile')
215 220 factory.add_session_arguments(self.parser)
216 221 factory.add_registration_arguments(self.parser)
217 222
218 223
219 224 #-----------------------------------------------------------------------------
220 225 # The main application
221 226 #-----------------------------------------------------------------------------
222 227
223 228
224 229 class IPControllerApp(ApplicationWithClusterDir):
225 230
226 231 name = u'ipcontrollerz'
227 232 description = _description
228 233 command_line_loader = IPControllerAppConfigLoader
229 234 default_config_file_name = default_config_file_name
230 235 auto_create_cluster_dir = True
231 236
232 237 def create_default_config(self):
233 238 super(IPControllerApp, self).create_default_config()
234 239 # Don't set defaults for Global.secure or Global.reuse_furls
235 240 # as those are set in a component.
236 241 self.default_config.Global.import_statements = []
237 242 self.default_config.Global.clean_logs = True
238 243 self.default_config.Global.secure = False
239 244 self.default_config.Global.reuse_key = False
240 245 self.default_config.Global.exec_key = "exec_key.key"
241 246
242 247 def pre_construct(self):
243 248 super(IPControllerApp, self).pre_construct()
244 249 c = self.master_config
245 250 # The defaults for these are set in FCClientServiceFactory and
246 251 # FCEngineServiceFactory, so we only set them here if the global
247 252 # options have be set to override the class level defaults.
248 253
249 254 # if hasattr(c.Global, 'reuse_furls'):
250 255 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
251 256 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
252 257 # del c.Global.reuse_furls
253 258 # if hasattr(c.Global, 'secure'):
254 259 # c.FCClientServiceFactory.secure = c.Global.secure
255 260 # c.FCEngineServiceFactory.secure = c.Global.secure
256 261 # del c.Global.secure
257 262
258 263 def construct(self):
259 264 # This is the working dir by now.
260 265 sys.path.insert(0, '')
261 266 c = self.master_config
262 267
263 268 self.import_statements()
264 269
265 270 if c.Global.secure:
266 271 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
267 272 if not c.Global.reuse_key or not os.path.exists(keyfile):
268 273 generate_exec_key(keyfile)
269 274 c.SessionFactory.exec_key = keyfile
270 275 else:
271 276 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
272 277 if os.path.exists(keyfile):
273 278 os.remove(keyfile)
274 279 c.SessionFactory.exec_key = ''
275 280
276 281 try:
277 282 self.factory = ControllerFactory(config=c)
278 283 self.start_logging()
279 284 self.factory.construct()
280 285 except:
281 286 self.log.error("Couldn't construct the Controller", exc_info=True)
282 287 self.exit(1)
288
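generate_exec_key (imported from entry_point) writes a fresh key to the given path for the secure branch above. A rough sketch of the idea only, not the actual implementation:

    import os, uuid

    def generate_exec_key_sketch(keyfile):
        # write a random token and restrict the file to its owner
        with open(keyfile, 'w') as f:
            f.write(uuid.uuid4().hex)
        os.chmod(keyfile, 0600)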
289 def save_urls(self):
290 """save the registration urls to files."""
291 c = self.master_config
292
293 sec_dir = c.Global.security_dir
294 cf = self.factory
295
296 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
297 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
298
299 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
300 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
283 301
284 302
285 303 def import_statements(self):
286 304 statements = self.master_config.Global.import_statements
287 305 for s in statements:
288 306 try:
289 307 self.log.info("Executing statement: '%s'" % s)
290 308 exec s in globals(), locals()
291 309 except:
292 310 self.log.error("Error running statement: %s" % s, exc_info=True)
293 311
294 # def start_logging(self):
295 # super(IPControllerApp, self).start_logging()
296 # if self.master_config.Global.log_url:
297 # context = self.factory.context
298 # lsock = context.socket(zmq.PUB)
299 # lsock.connect(self.master_config.Global.log_url)
300 # handler = PUBHandler(lsock)
301 # handler.root_topic = 'controller'
302 # handler.setLevel(self.log_level)
303 # self.log.addHandler(handler)
312 def start_logging(self):
313 super(IPControllerApp, self).start_logging()
314 if self.master_config.Global.log_url:
315 context = self.factory.context
316 lsock = context.socket(zmq.PUB)
317 lsock.connect(self.master_config.Global.log_url)
318 handler = PUBHandler(lsock)
319 handler.root_topic = 'controller'
320 handler.setLevel(self.log_level)
321 self.log.addHandler(handler)
304 322 #
305 323 def start_app(self):
306 # Start the controller service.
324 # Start the subprocesses:
307 325 self.factory.start()
308 326 self.write_pid_file(overwrite=True)
309 327 try:
310 328 self.factory.loop.start()
311 329 except KeyboardInterrupt:
312 330 self.log.critical("Interrupted, Exiting...\n")
313 331
314 332
315 333 def launch_new_instance():
316 334 """Create and run the IPython controller"""
317 335 app = IPControllerApp()
318 336 app.start()
319 337
320 338
321 339 if __name__ == '__main__':
322 340 launch_new_instance()
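save_urls() writes plain 'transport://ip:port' strings into the cluster's security directory. A hedged sketch of the consumer side; the helper name and arguments are illustrative:

    import os

    def read_registration_url(security_dir, role='engine'):
        # role is 'engine' or 'client', matching ipcontroller-<role>.url
        fname = os.path.join(security_dir, 'ipcontroller-%s.url' % role)
        with open(fname) as f:
            return f.read().strip()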
@@ -1,251 +1,252
1 1 #!/usr/bin/env python
2 2 # -*- coding: utf-8 -*-
3 3 """Setup script for IPython.
4 4
5 5 Under Posix environments it works like a typical setup.py script.
6 6 Under Windows, the command sdist is not supported, since IPython
7 7 requires utilities which are not available under Windows."""
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (c) 2008-2010, IPython Development Team.
11 11 # Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
12 12 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
13 13 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
14 14 #
15 15 # Distributed under the terms of the Modified BSD License.
16 16 #
17 17 # The full license is in the file COPYING.txt, distributed with this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Minimal Python version sanity check
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import sys
25 25
26 26 # This check is also made in IPython/__init__, don't forget to update both when
27 27 # changing Python version requirements.
28 28 if sys.version[0:3] < '2.6':
29 29 error = """\
30 30 ERROR: 'IPython requires Python Version 2.6 or above.'
31 31 Exiting."""
32 32 print >> sys.stderr, error
33 33 sys.exit(1)
34 34
35 35 # At least we're on the python version we need, move on.
36 36
37 37 #-------------------------------------------------------------------------------
38 38 # Imports
39 39 #-------------------------------------------------------------------------------
40 40
41 41 # Stdlib imports
42 42 import os
43 43 import shutil
44 44
45 45 from glob import glob
46 46
47 47 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
48 48 # update it when the contents of directories change.
49 49 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
50 50
51 51 from distutils.core import setup
52 52
53 53 # Our own imports
54 54 from IPython.utils.path import target_update
55 55
56 56 from setupbase import (
57 57 setup_args,
58 58 find_packages,
59 59 find_package_data,
60 60 find_scripts,
61 61 find_data_files,
62 62 check_for_dependencies,
63 63 record_commit_info,
64 64 )
65 65
66 66 isfile = os.path.isfile
67 67 pjoin = os.path.join
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Function definitions
71 71 #-----------------------------------------------------------------------------
72 72
73 73 def cleanup():
74 74 """Clean up the junk left around by the build process"""
75 75 if "develop" not in sys.argv:
76 76 try:
77 77 shutil.rmtree('ipython.egg-info')
78 78 except:
79 79 try:
80 80 os.unlink('ipython.egg-info')
81 81 except:
82 82 pass
83 83
84 84 #-------------------------------------------------------------------------------
85 85 # Handle OS specific things
86 86 #-------------------------------------------------------------------------------
87 87
88 88 if os.name == 'posix':
89 89 os_name = 'posix'
90 90 elif os.name in ['nt','dos']:
91 91 os_name = 'windows'
92 92 else:
93 93 print 'Unsupported operating system:',os.name
94 94 sys.exit(1)
95 95
96 96 # Under Windows, 'sdist' has not been supported. Now that the docs build with
97 97 # Sphinx it might work, but let's not turn it on until someone confirms that it
98 98 # actually works.
99 99 if os_name == 'windows' and 'sdist' in sys.argv:
100 100 print 'The sdist command is not available under Windows. Exiting.'
101 101 sys.exit(1)
102 102
103 103 #-------------------------------------------------------------------------------
104 104 # Things related to the IPython documentation
105 105 #-------------------------------------------------------------------------------
106 106
107 107 # update the manuals when building a source dist
108 108 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
109 109 import textwrap
110 110
111 111 # List of things to be updated. Each entry is a triplet of args for
112 112 # target_update()
113 113 to_update = [
114 114 # FIXME - Disabled for now: we need to redo an automatic way
115 115 # of generating the magic info inside the rst.
116 116 #('docs/magic.tex',
117 117 #['IPython/Magic.py'],
118 118 #"cd doc && ./update_magic.sh" ),
119 119
120 120 ('docs/man/ipcluster.1.gz',
121 121 ['docs/man/ipcluster.1'],
122 122 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
123 123
124 124 ('docs/man/ipcontroller.1.gz',
125 125 ['docs/man/ipcontroller.1'],
126 126 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
127 127
128 128 ('docs/man/ipengine.1.gz',
129 129 ['docs/man/ipengine.1'],
130 130 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
131 131
132 132 ('docs/man/ipython.1.gz',
133 133 ['docs/man/ipython.1'],
134 134 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
135 135
136 136 ('docs/man/ipython-wx.1.gz',
137 137 ['docs/man/ipython-wx.1'],
138 138 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
139 139
140 140 ('docs/man/ipythonx.1.gz',
141 141 ['docs/man/ipythonx.1'],
142 142 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
143 143
144 144 ('docs/man/irunner.1.gz',
145 145 ['docs/man/irunner.1'],
146 146 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
147 147
148 148 ('docs/man/pycolor.1.gz',
149 149 ['docs/man/pycolor.1'],
150 150 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
151 151 ]
152 152
153 153 # Only build the docs if sphinx is present
154 154 try:
155 155 import sphinx
156 156 except ImportError:
157 157 pass
158 158 else:
159 159 # The Makefile calls the do_sphinx scripts to build html and pdf, so
160 160 # just one target is enough to cover all manual generation
161 161
162 162 # First, compute all the dependencies that can force us to rebuild the
163 163 # docs. Start with the main release file that contains metadata
164 164 docdeps = ['IPython/core/release.py']
165 165 # Include all the reST sources
166 166 pjoin = os.path.join
167 167 for dirpath,dirnames,filenames in os.walk('docs/source'):
168 168 if dirpath in ['_static','_templates']:
169 169 continue
170 170 docdeps += [ pjoin(dirpath,f) for f in filenames
171 171 if f.endswith('.txt') ]
172 172 # and the examples
173 173 for dirpath,dirnames,filenames in os.walk('docs/example'):
174 174 docdeps += [ pjoin(dirpath,f) for f in filenames
175 175 if not f.endswith('~') ]
176 176 # then, make them all dependencies for the main PDF (the html will get
177 177 # auto-generated as well).
178 178 to_update.append(
179 179 ('docs/dist/ipython.pdf',
180 180 docdeps,
181 181 "cd docs && make dist")
182 182 )
183 183
184 184 [ target_update(*t) for t in to_update ]
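Each to_update entry is the (target, deps, cmd) triplet consumed by target_update(): run cmd whenever any dependency is newer than the target. The real implementation lives in IPython.utils.path; this is a rough sketch of the idea:

    import os

    def target_update_sketch(target, deps, cmd):
        # rebuild 'target' with 'cmd' if any dependency is newer
        def mtime(path):
            return os.path.getmtime(path) if os.path.exists(path) else 0
        if any(mtime(d) > mtime(target) for d in deps):
            os.system(cmd)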
185 185
186 186 #---------------------------------------------------------------------------
187 187 # Find all the packages, package data, scripts and data_files
188 188 #---------------------------------------------------------------------------
189 189
190 190 packages = find_packages()
191 191 package_data = find_package_data()
192 192 scripts = find_scripts()
193 193 data_files = find_data_files()
194 194
195 195 #---------------------------------------------------------------------------
196 196 # Handle dependencies and setuptools specific things
197 197 #---------------------------------------------------------------------------
198 198
199 199 # For some commands, use setuptools. Note that we do NOT list install here!
200 200 # If you want a setuptools-enhanced install, just run 'setupegg.py install'
201 201 if len(set(('develop', 'sdist', 'release', 'bdist_egg', 'bdist_rpm',
202 202 'bdist', 'bdist_dumb', 'bdist_wininst', 'install_egg_info',
203 203 'build_sphinx', 'egg_info', 'easy_install', 'upload',
204 204 )).intersection(sys.argv)) > 0:
205 205 import setuptools
206 206
207 207 # This dict is used for passing extra arguments that are setuptools
208 208 # specific to setup
209 209 setuptools_extra_args = {}
210 210
211 211 if 'setuptools' in sys.modules:
212 212 setuptools_extra_args['zip_safe'] = False
213 213 setuptools_extra_args['entry_points'] = {
214 214 'console_scripts': [
215 215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 217 'pycolor = IPython.utils.PyColorize:main',
218 218 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
219 219 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
220 220 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
221 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
221 'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance',
222 222 'iptest = IPython.testing.iptest:main',
223 223 'irunner = IPython.lib.irunner:main'
224 224 ]
225 225 }
226 226 setup_args['extras_require'] = dict(
227 zmq = 'pyzmq>=2.0.10',
227 228 doc='Sphinx>=0.3',
228 229 test='nose>=0.10.1',
229 230 security='pyOpenSSL>=0.6'
230 231 )
231 232 else:
232 233 # If we are running without setuptools, call this function which will
233 234 # check for dependencies and inform the user what is needed. This is
234 235 # just to make life easy for users.
235 236 check_for_dependencies()
236 237
237 238 #---------------------------------------------------------------------------
238 239 # Do the actual setup now
239 240 #---------------------------------------------------------------------------
240 241
241 242 setup_args['cmdclass'] = {'build_py': record_commit_info('IPython')}
242 243 setup_args['packages'] = packages
243 244 setup_args['package_data'] = package_data
244 245 setup_args['scripts'] = scripts
245 246 setup_args['data_files'] = data_files
246 247 setup_args.update(setuptools_extra_args)
247 248
248 249
249 250 if __name__ == '__main__':
250 251 setup(**setup_args)
251 252 cleanup()
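Each console_scripts entry above names a module path and a callable ('ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance'); setuptools generates a wrapper script that imports the module and calls the function. A minimal sketch of that resolution step, with a hypothetical helper:

    def run_entry_point(spec):
        # e.g. 'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance'
        name, target = [part.strip() for part in spec.split('=', 1)]
        module_path, func_name = target.split(':')
        module = __import__(module_path, fromlist=[func_name])
        return getattr(module, func_name)()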