adapt kernel's ipcluster and Launchers to newparallel
MinRK
@@ -0,0 +1,502 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The ipcluster application.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import logging
19 import os
20 import signal
22
23 from zmq.eventloop import ioloop
24
25 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 from IPython.utils.importstring import import_item
27 from IPython.zmq.parallel.clusterdir import (
28 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 ClusterDirError, PIDFileError
30 )
31
32
33 #-----------------------------------------------------------------------------
34 # Module level variables
35 #-----------------------------------------------------------------------------
36
37
38 default_config_file_name = u'ipcluster_config.py'
39
40
41 _description = """\
42 Start an IPython cluster for parallel computing.\n\n
43
44 An IPython cluster consists of 1 controller and 1 or more engines.
45 This command automates the startup of these processes using a wide
46 range of startup methods (SSH, local processes, PBS, mpiexec,
47 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 local host simply do 'ipcluster start -n 4'. For more complex usage
49 you will typically do 'ipcluster create -p mycluster', then edit
50 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 """
52
53
54 # Exit codes for ipcluster
55
56 # This will be the exit code if ipcluster appears to be running because
57 # a .pid file exists.
58 ALREADY_STARTED = 10
59
60
61 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 # file to be found.
63 ALREADY_STOPPED = 11
64
65
66 #-----------------------------------------------------------------------------
67 # Command line options
68 #-----------------------------------------------------------------------------
69
70
71 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72
73 def _add_arguments(self):
74 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 # its defaults on self.parser. Instead, we will put those
76 # default options on our subparsers.
77
78 # This has all the common options that all subcommands use
79 parent_parser1 = ArgumentParser(
80 add_help=False,
81 argument_default=SUPPRESS
82 )
83 self._add_ipython_dir(parent_parser1)
84 self._add_log_level(parent_parser1)
85
86 # This has all the common options that other subcommands use
87 parent_parser2 = ArgumentParser(
88 add_help=False,
89 argument_default=SUPPRESS
90 )
91 self._add_cluster_profile(parent_parser2)
92 self._add_cluster_dir(parent_parser2)
93 self._add_work_dir(parent_parser2)
94 paa = parent_parser2.add_argument
95 paa('--log-to-file',
96 action='store_true', dest='Global.log_to_file',
97 help='Log to a file in the log directory (default is stdout)')
98
99 # Create the object used to create the subparsers.
100 subparsers = self.parser.add_subparsers(
101 dest='Global.subcommand',
102 title='ipcluster subcommands',
103 description=
104 """ipcluster has a variety of subcommands. The general way of
105 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 # help="For more help, type 'ipcluster <cmd> -h'",
108 )
109
110 # The "list" subcommand parser
111 parser_list = subparsers.add_parser(
112 'list',
113 parents=[parent_parser1],
114 argument_default=SUPPRESS,
115 help="List all clusters in cwd and ipython_dir.",
116 description=
117 """List all available clusters, by cluster directory, that can
118 be found in the current working directory or in the ipython
119 directory. Cluster directories are named using the convention
120 'cluster_<profile>'."""
121 )
122
123 # The "create" subcommand parser
124 parser_create = subparsers.add_parser(
125 'create',
126 parents=[parent_parser1, parent_parser2],
127 argument_default=SUPPRESS,
128 help="Create a new cluster directory.",
129 description=
130 """Create an ipython cluster directory by its profile name or
131 cluster directory path. Cluster directories contain
132 configuration, log and security related files and are named
133 using the convention 'cluster_<profile>'. By default they are
134 located in your ipython directory. Once created, you will
135 probably need to edit the configuration files in the cluster
136 directory to configure your cluster. Most users will create a
137 cluster directory by profile name,
138 'ipcluster create -p mycluster', which will put the directory
139 in '<ipython_dir>/cluster_mycluster'.
140 """
141 )
142 paa = parser_create.add_argument
143 paa('--reset-config',
144 dest='Global.reset_config', action='store_true',
145 help=
146 """Recopy the default config files to the cluster directory.
147 You will lose any modifications you have made to these files.""")
148
149 # The "start" subcommand parser
150 parser_start = subparsers.add_parser(
151 'start',
152 parents=[parent_parser1, parent_parser2],
153 argument_default=SUPPRESS,
154 help="Start a cluster.",
155 description=
156 """Start an ipython cluster by its profile name or cluster
157 directory. Cluster directories contain configuration, log and
158 security related files and are named using the convention
159 'cluster_<profile>' and should be created using the 'create'
160 subcommand of 'ipcluster'. If your cluster directory is in
161 the cwd or the ipython directory, you can simply refer to it
162 using its profile name, 'ipcluster start -n 4 -p <profile>',
163 otherwise use the '--cluster-dir' option.
164 """
165 )
166 paa = parser_start.add_argument
167 paa('-n', '--number',
168 type=int, dest='Global.n',
169 help='The number of engines to start.',
170 metavar='Global.n')
171 paa('--clean-logs',
172 dest='Global.clean_logs', action='store_true',
173 help='Delete old log files before starting.')
174 paa('--no-clean-logs',
175 dest='Global.clean_logs', action='store_false',
176 help="Don't delete old log flies before starting.")
177 paa('--daemon',
178 dest='Global.daemonize', action='store_true',
179 help='Daemonize the ipcluster program. This implies --log-to-file')
180 paa('--no-daemon',
181 dest='Global.daemonize', action='store_false',
182 help="Dont't daemonize the ipcluster program.")
183
184 # The "stop" subcommand parser
185 parser_stop = subparsers.add_parser(
186 'stop',
187 parents=[parent_parser1, parent_parser2],
188 argument_default=SUPPRESS,
189 help="Stop a running cluster.",
190 description=
191 """Stop a running ipython cluster by its profile name or cluster
192 directory. Cluster directories are named using the convention
193 'cluster_<profile>'. If your cluster directory is in
194 the cwd or the ipython directory, you can simply refer to it
195 using its profile name, 'ipcluster stop -p <profile>', otherwise
196 use the '--cluster-dir' option.
197 """
198 )
199 paa = parser_stop.add_argument
200 paa('--signal',
201 dest='Global.signal', type=int,
202 help="The signal number to use in stopping the cluster (default=2).",
203 metavar="Global.signal")
204
205
206 #-----------------------------------------------------------------------------
207 # Main application
208 #-----------------------------------------------------------------------------
209
210
211 class IPClusterApp(ApplicationWithClusterDir):
212
213 name = u'ipclusterz'
214 description = _description
215 usage = None
216 command_line_loader = IPClusterAppConfigLoader
217 default_config_file_name = default_config_file_name
218 default_log_level = logging.INFO
219 auto_create_cluster_dir = False
220
221 def create_default_config(self):
222 super(IPClusterApp, self).create_default_config()
223 self.default_config.Global.controller_launcher = \
224 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 self.default_config.Global.engine_launcher = \
226 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 self.default_config.Global.n = 2
228 self.default_config.Global.reset_config = False
229 self.default_config.Global.clean_logs = True
230 self.default_config.Global.signal = 2
231 self.default_config.Global.daemonize = False
232
233 def find_resources(self):
234 subcommand = self.command_line_config.Global.subcommand
235 if subcommand=='list':
236 self.list_cluster_dirs()
237 # Exit immediately because there is nothing left to do.
238 self.exit()
239 elif subcommand=='create':
240 self.auto_create_cluster_dir = True
241 super(IPClusterApp, self).find_resources()
242 elif subcommand=='start' or subcommand=='stop':
243 self.auto_create_cluster_dir = True
244 try:
245 super(IPClusterApp, self).find_resources()
246 except ClusterDirError:
247 raise ClusterDirError(
248 "Could not find a cluster directory. A cluster dir must "
249 "be created before running 'ipcluster start'. Do "
250 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 "information about creating and listing cluster dirs."
252 )
253
254 def list_cluster_dirs(self):
255 # Find the search paths
256 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 if cluster_dir_paths:
258 cluster_dir_paths = cluster_dir_paths.split(':')
259 else:
260 cluster_dir_paths = []
261 try:
262 ipython_dir = self.command_line_config.Global.ipython_dir
263 except AttributeError:
264 ipython_dir = self.default_config.Global.ipython_dir
265 paths = [os.getcwd(), ipython_dir] + \
266 cluster_dir_paths
267 paths = list(set(paths))
268
269 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 for path in paths:
271 files = os.listdir(path)
272 for f in files:
273 full_path = os.path.join(path, f)
274 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 profile = full_path.split('_')[-1]
276 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 print start_cmd + " ==> " + full_path
278
279 def pre_construct(self):
280 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 super(IPClusterApp, self).pre_construct()
282 config = self.master_config
283 try:
284 daemon = config.Global.daemonize
285 if daemon:
286 config.Global.log_to_file = True
287 except AttributeError:
288 pass
289
290 def construct(self):
291 config = self.master_config
292 subcmd = config.Global.subcommand
293 reset = config.Global.reset_config
294 if subcmd == 'list':
295 return
296 if subcmd == 'create':
297 self.log.info('Copying default config files to cluster directory '
298 '[overwrite=%r]' % (reset,))
299 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 if subcmd =='start':
301 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 self.start_logging()
303 self.loop = ioloop.IOLoop.instance()
304 # reactor.callWhenRunning(self.start_launchers)
305 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 dc.start()
307
308 def start_launchers(self):
309 config = self.master_config
310
311 # Create the launchers. In both cases, we set the work_dir of
312 # the launcher to the cluster_dir. This is where the launcher's
313 # subprocesses will be launched. It is not where the controller
314 # and engine will be launched.
315 el_class = import_item(config.Global.engine_launcher)
316 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config
318 )
319 cl_class = import_item(config.Global.controller_launcher)
320 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config
322 )
323
324 # Setup signals
325 signal.signal(signal.SIGINT, self.sigint_handler)
326
327 # Setup the observing of stopping. If the controller dies, shut
328 # everything down as that will be completely fatal for the engines.
329 self.controller_launcher.on_stop(self.stop_launchers)
330 # d1.addCallback(self.stop_launchers)
331 # But, we don't monitor the stopping of engines. An engine dying
332 # is just fine and in principle a user could start a new engine.
333 # Also, if we did monitor engine stopping, it is difficult to
334 # know what to do when only some engines die. Currently, the
335 # observing of engine stopping is inconsistent. Some launchers
336 # might trigger on a single engine stopping, others wait until
337 # all stop. TODO: think more about how to handle this.
338
339 # Start the controller and engines
340 self._stopping = False # Make sure stop_launchers is not called 2x.
341 d = self.start_controller()
342 self.start_engines()
343 self.startup_message()
344 # d.addCallback(self.start_engines)
345 # d.addCallback(self.startup_message)
346 # If the controller or engines fail to start, stop everything
347 # d.addErrback(self.stop_launchers)
348 return d
349
350 def startup_message(self, r=None):
351 logging.info("IPython cluster: started")
352 return r
353
354 def start_controller(self, r=None):
355 # logging.info("In start_controller")
356 config = self.master_config
357 d = self.controller_launcher.start(
358 cluster_dir=config.Global.cluster_dir
359 )
360 return d
361
362 def start_engines(self, r=None):
363 # logging.info("In start_engines")
364 config = self.master_config
365 d = self.engine_launcher.start(
366 config.Global.n,
367 cluster_dir=config.Global.cluster_dir
368 )
369 return d
370
371 def stop_controller(self, r=None):
372 # logging.info("In stop_controller")
373 if self.controller_launcher.running:
374 return self.controller_launcher.stop()
375
376 def stop_engines(self, r=None):
377 # logging.info("In stop_engines")
378 if self.engine_launcher.running:
379 d = self.engine_launcher.stop()
380 # d.addErrback(self.log_err)
381 return d
382 else:
383 return None
384
385 def log_err(self, f):
386 logging.error(f.getTraceback())
387 return None
388
389 def stop_launchers(self, r=None):
390 if not self._stopping:
391 self._stopping = True
392 # if isinstance(r, failure.Failure):
393 # logging.error('Unexpected error in ipcluster:')
394 # logging.info(r.getTraceback())
395 logging.error("IPython cluster: stopping")
396 # These return deferreds. We are not doing anything with them
397 # but we are holding refs to them as a reminder that they
398 # do return deferreds.
399 d1 = self.stop_engines()
400 d2 = self.stop_controller()
401 # Wait a few seconds to let things shut down.
402 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
403 dc.start()
404 # reactor.callLater(4.0, reactor.stop)
405
406 def sigint_handler(self, signum, frame):
407 self.stop_launchers()
408
409 def start_logging(self):
410 # Remove old log files of the controller and engine
411 if self.master_config.Global.clean_logs:
412 log_dir = self.master_config.Global.log_dir
413 for f in os.listdir(log_dir):
414 if f.startswith('ipengine' + '-'):
415 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
416 os.remove(os.path.join(log_dir, f))
417 if f.startswith('ipcontroller' + '-'):
418 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
419 os.remove(os.path.join(log_dir, f))
420 # This will remove old log files for ipcluster itself
421 super(IPClusterApp, self).start_logging()
422
423 def start_app(self):
424 """Start the application, depending on what subcommand is used."""
425 subcmd = self.master_config.Global.subcommand
426 if subcmd=='create' or subcmd=='list':
427 return
428 elif subcmd=='start':
429 self.start_app_start()
430 elif subcmd=='stop':
431 self.start_app_stop()
432
433 def start_app_start(self):
434 """Start the app for the start subcommand."""
435 config = self.master_config
436 # First see if the cluster is already running
437 try:
438 pid = self.get_pid_from_file()
439 except PIDFileError:
440 pass
441 else:
442 self.log.critical(
443 'Cluster is already running with [pid=%s]. '
444 'use "ipcluster stop" to stop the cluster.' % pid
445 )
446 # Here I exit with an unusual exit status that other processes
447 # can watch for to learn how I exited.
448 self.exit(ALREADY_STARTED)
449
450 # Now log and daemonize
451 self.log.info(
452 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
453 )
454 # TODO: Get daemonize working on Windows or as a Windows Server.
455 if config.Global.daemonize:
456 if os.name=='posix':
457 from twisted.scripts._twistd_unix import daemonize
458 daemonize()
459
460 # Now write the new pid file AFTER our new forked pid is active.
461 self.write_pid_file()
462 try:
463 self.loop.start()
464 except KeyboardInterrupt:
465 logging.info("stopping...")
466 self.remove_pid_file()
467
468 def start_app_stop(self):
469 """Start the app for the stop subcommand."""
470 config = self.master_config
471 try:
472 pid = self.get_pid_from_file()
473 except PIDFileError:
474 self.log.critical(
475 'Problem reading pid file, cluster is probably not running.'
476 )
477 # Here I exit with an unusual exit status that other processes
478 # can watch for to learn how I exited.
479 self.exit(ALREADY_STOPPED)
480 else:
481 if os.name=='posix':
482 sig = config.Global.signal
483 self.log.info(
484 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
485 )
486 os.kill(pid, sig)
487 elif os.name=='nt':
488 # As of right now, we don't support daemonize on Windows, so
489 # stop will not do anything. Minimally, it should clean up the
490 # old .pid files.
491 self.remove_pid_file()
492
493
494 def launch_new_instance():
495 """Create and run the IPython cluster."""
496 app = IPClusterApp()
497 app.start()
498
499
500 if __name__ == '__main__':
501 launch_new_instance()
502
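The exit codes defined near the top of this file (ALREADY_STARTED=10, ALREADY_STOPPED=11) exist so other processes can watch how ipcluster exited. A minimal sketch of a wrapper script that checks them, assuming the 'ipcluster' executable name used in the help text above (note the app itself is registered as 'ipclusterz' in this branch, so the installed command name is an assumption):

import subprocess

ALREADY_STARTED = 10   # mirrors the exit codes defined above
ALREADY_STOPPED = 11

# Executable name and flags follow the help text above; adjust for your install.
ret = subprocess.call(['ipcluster', 'start', '-n', '4', '--daemon'])
if ret == ALREADY_STARTED:
    print 'cluster already running; use "ipcluster stop" first'
elif ret != 0:
    print 'ipcluster start failed with exit code %i' % ret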
@@ -0,0 +1,824 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 Facilities for launching IPython processes asynchronously.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import re
20 import sys
21 import logging
22
23 from signal import SIGINT, SIGTERM
24 try:
25 from signal import SIGKILL
26 except ImportError:
27 SIGKILL = SIGTERM  # Windows has no SIGKILL; fall back to SIGTERM
28
29 from subprocess import Popen, PIPE
30
31 from zmq.eventloop import ioloop
32
33 from IPython.config.configurable import Configurable
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 from IPython.utils.path import get_ipython_module_path
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37
38 # from IPython.kernel.winhpcjob import (
39 # IPControllerTask, IPEngineTask,
40 # IPControllerJob, IPEngineSetJob
41 # )
42
43
44 #-----------------------------------------------------------------------------
45 # Paths to the kernel apps
46 #-----------------------------------------------------------------------------
47
48
49 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
50 'IPython.zmq.parallel.ipclusterapp'
51 ))
52
53 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
54 'IPython.zmq.parallel.ipengineapp'
55 ))
56
57 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
58 'IPython.zmq.parallel.ipcontrollerapp'
59 ))
60
61 #-----------------------------------------------------------------------------
62 # Base launchers and errors
63 #-----------------------------------------------------------------------------
64
65
66 class LauncherError(Exception):
67 pass
68
69
70 class ProcessStateError(LauncherError):
71 pass
72
73
74 class UnknownStatus(LauncherError):
75 pass
76
77
78 class BaseLauncher(Configurable):
79 """An asbtraction for starting, stopping and signaling a process."""
80
81 # In all of the launchers, the work_dir is where child processes will be
82 # run. This will usually be the cluster_dir, but may not be. Any work_dir
83 # passed into the __init__ method will override the config value.
84 # This should not be used to set the work_dir for the actual engine
85 # and controller. Instead, use their own config files or the
86 # controller_args, engine_args attributes of the launchers to add
87 # the --work-dir option.
88 work_dir = Unicode(u'.')
89 loop = Instance('zmq.eventloop.ioloop.IOLoop')
90 def _loop_default(self):
91 return ioloop.IOLoop.instance()
92
93 def __init__(self, work_dir=u'.', config=None):
94 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
95 self.state = 'before' # can be before, running, after
96 self.stop_callbacks = []
97 self.start_data = None
98 self.stop_data = None
99
100 @property
101 def args(self):
102 """A list of cmd and args that will be used to start the process.
103
104 This is what is passed to :class:`subprocess.Popen` and the first element
105 will be the process name.
106 """
107 return self.find_args()
108
109 def find_args(self):
110 """The ``.args`` property calls this to find the args list.
111
112 Subclasses should implement this to construct the cmd and args.
113 """
114 raise NotImplementedError('find_args must be implemented in a subclass')
115
116 @property
117 def arg_str(self):
118 """The string form of the program arguments."""
119 return ' '.join(self.args)
120
121 @property
122 def running(self):
123 """Am I running."""
124 if self.state == 'running':
125 return True
126 else:
127 return False
128
129 def start(self):
130 """Start the process.
131
132 This must return a deferred that fires with information about the
133 process starting (like a pid, job id, etc.).
134 """
135 raise NotImplementedError('start must be implemented in a subclass')
136
137 def stop(self):
138 """Stop the process and notify observers of stopping.
139
140 This must return a deferred that fires with information about the
141 processing stopping, like errors that occur while the process is
142 attempting to be shut down. This deferred won't fire when the process
143 actually stops. To observe the actual process stopping, see
144 :func:`on_stop`.
145 """
146 raise NotImplementedError('stop must be implemented in a subclass')
147
148 def on_stop(self, f):
149 """Get a deferred that will fire when the process stops.
150
151 The deferred will fire with data that contains information about
152 the exit status of the process.
153 """
154 if self.state=='after':
155 return f(self.stop_data)
156 else:
157 self.stop_callbacks.append(f)
158
159 def notify_start(self, data):
160 """Call this to trigger startup actions.
161
162 This logs the process startup and sets the state to 'running'. It is
163 a pass-through so it can be used as a callback.
164 """
165
166 logging.info('Process %r started: %r' % (self.args[0], data))
167 self.start_data = data
168 self.state = 'running'
169 return data
170
171 def notify_stop(self, data):
172 """Call this to trigger process stop actions.
173
174 This logs the process stopping and sets the state to 'after'. Call
175 this to trigger all the callbacks registered with :func:`on_stop`."""
176
177 logging.info('Process %r stopped: %r' % (self.args[0], data))
178 self.stop_data = data
179 self.state = 'after'
180 for i in range(len(self.stop_callbacks)):
181 d = self.stop_callbacks.pop()
182 d(data)
183 return data
184
185 def signal(self, sig):
186 """Signal the process.
187
188 Return a semi-meaningless deferred after signaling the process.
189
190 Parameters
191 ----------
192 sig : str or int
193 'KILL', 'INT', etc., or any signal number
194 """
195 raise NotImplementedError('signal must be implemented in a subclass')
196
197
198 #-----------------------------------------------------------------------------
199 # Local process launchers
200 #-----------------------------------------------------------------------------
201
202
203 class LocalProcessLauncher(BaseLauncher):
204 """Start and stop an external process in an asynchronous manner.
205
206 This will launch the external process with a working directory of
207 ``self.work_dir``.
208 """
209
210 # This is used to construct self.args, which is passed to
211 # Popen.
212 cmd_and_args = List([])
213 poll_frequency = Int(100) # in ms
214
215 def __init__(self, work_dir=u'.', config=None):
216 super(LocalProcessLauncher, self).__init__(
217 work_dir=work_dir, config=config
218 )
219 self.process = None
220 self.start_deferred = None
221 self.poller = None
222
223 def find_args(self):
224 return self.cmd_and_args
225
226 def start(self):
227 if self.state == 'before':
228 self.process = Popen(self.args,
229 stdout=PIPE,stderr=PIPE,stdin=PIPE,
230 env=os.environ,
231 cwd=self.work_dir
232 )
233
234 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
235 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
236 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
237 self.poller.start()
238 self.notify_start(self.process.pid)
239 else:
240 s = 'The process was already started and has state: %r' % self.state
241 raise ProcessStateError(s)
242
243 def stop(self):
244 return self.interrupt_then_kill()
245
246 def signal(self, sig):
247 if self.state == 'running':
248 self.process.send_signal(sig)
249
250 def interrupt_then_kill(self, delay=2.0):
251 """Send INT, wait a delay and then send KILL."""
252 self.signal(SIGINT)
253 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
254 self.killer.start()
255
256 # callbacks, etc:
257
258 def handle_stdout(self, fd, events):
259 line = self.process.stdout.readline()
260 # a stopped process will be readable but return empty strings
261 if line:
262 logging.info(line[:-1])
263 else:
264 self.poll()
265
266 def handle_stderr(self, fd, events):
267 line = self.process.stderr.readline()
268 # a stopped process will be readable but return empty strings
269 if line:
270 logging.error(line[:-1])
271 else:
272 self.poll()
273
274 def poll(self):
275 status = self.process.poll()
276 if status is not None:
277 self.poller.stop()
278 self.loop.remove_handler(self.process.stdout.fileno())
279 self.loop.remove_handler(self.process.stderr.fileno())
280 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
281 return status
282
283 class LocalControllerLauncher(LocalProcessLauncher):
284 """Launch a controller as a regular external process."""
285
286 controller_cmd = List(ipcontroller_cmd_argv, config=True)
287 # Command line arguments to ipcontroller.
288 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
289
290 def find_args(self):
291 return self.controller_cmd + self.controller_args
292
293 def start(self, cluster_dir):
294 """Start the controller by cluster_dir."""
295 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 self.cluster_dir = unicode(cluster_dir)
297 logging.info("Starting LocalControllerLauncher: %r" % self.args)
298 return super(LocalControllerLauncher, self).start()
299
300
301 class LocalEngineLauncher(LocalProcessLauncher):
302 """Launch a single engine as a regular externall process."""
303
304 engine_cmd = List(ipengine_cmd_argv, config=True)
305 # Command line arguments for ipengine.
306 engine_args = List(
307 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
308 )
309
310 def find_args(self):
311 return self.engine_cmd + self.engine_args
312
313 def start(self, cluster_dir):
314 """Start the engine by cluster_dir."""
315 self.engine_args.extend(['--cluster-dir', cluster_dir])
316 self.cluster_dir = unicode(cluster_dir)
317 return super(LocalEngineLauncher, self).start()
318
319
320 class LocalEngineSetLauncher(BaseLauncher):
321 """Launch a set of engines as regular external processes."""
322
323 # Command line arguments for ipengine.
324 engine_args = List(
325 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 )
327 # launcher class
328 launcher_class = LocalEngineLauncher
329
330 def __init__(self, work_dir=u'.', config=None):
331 super(LocalEngineSetLauncher, self).__init__(
332 work_dir=work_dir, config=config
333 )
334 self.launchers = {}
335 self.stop_data = {}
336
337 def start(self, n, cluster_dir):
338 """Start n engines by profile or cluster_dir."""
339 self.cluster_dir = unicode(cluster_dir)
340 dlist = []
341 for i in range(n):
342 el = self.launcher_class(work_dir=self.work_dir, config=self.config)
343 # Copy the engine args over to each engine launcher.
344 import copy
345 el.engine_args = copy.deepcopy(self.engine_args)
346 el.on_stop(self._notice_engine_stopped)
347 d = el.start(cluster_dir)
348 if i==0:
349 logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.launchers[i] = el
351 dlist.append(d)
352 self.notify_start(dlist)
353 # The consumeErrors here could be dangerous
354 # dfinal = gatherBoth(dlist, consumeErrors=True)
355 # dfinal.addCallback(self.notify_start)
356 return dlist
357
358 def find_args(self):
359 return ['engine set']
360
361 def signal(self, sig):
362 dlist = []
363 for el in self.launchers.itervalues():
364 d = el.signal(sig)
365 dlist.append(d)
366 # dfinal = gatherBoth(dlist, consumeErrors=True)
367 return dlist
368
369 def interrupt_then_kill(self, delay=1.0):
370 dlist = []
371 for el in self.launchers.itervalues():
372 d = el.interrupt_then_kill(delay)
373 dlist.append(d)
374 # dfinal = gatherBoth(dlist, consumeErrors=True)
375 return dlist
376
377 def stop(self):
378 return self.interrupt_then_kill()
379
380 def _notice_engine_stopped(self, data):
381 print "notice", data
382 pid = data['pid']
383 for idx,el in self.launchers.iteritems():
384 if el.process.pid == pid:
385 break
386 self.launchers.pop(idx)
387 self.stop_data[idx] = data
388 if not self.launchers:
389 self.notify_stop(self.stop_data)
390
391
392 #-----------------------------------------------------------------------------
393 # MPIExec launchers
394 #-----------------------------------------------------------------------------
395
396
397 class MPIExecLauncher(LocalProcessLauncher):
398 """Launch an external process using mpiexec."""
399
400 # The mpiexec command to use in starting the process.
401 mpi_cmd = List(['mpiexec'], config=True)
402 # The command line arguments to pass to mpiexec.
403 mpi_args = List([], config=True)
404 # The program to start using mpiexec.
405 program = List(['date'], config=True)
406 # The command line argument to the program.
407 program_args = List([], config=True)
408 # The number of instances of the program to start.
409 n = Int(1, config=True)
410
411 def find_args(self):
412 """Build self.args using all the fields."""
413 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
414 self.program + self.program_args
415
416 def start(self, n):
417 """Start n instances of the program using mpiexec."""
418 self.n = n
419 return super(MPIExecLauncher, self).start()
420
421
422 class MPIExecControllerLauncher(MPIExecLauncher):
423 """Launch a controller using mpiexec."""
424
425 controller_cmd = List(ipcontroller_cmd_argv, config=True)
426 # Command line arguments to ipcontroller.
427 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
428 n = Int(1, config=False)
429
430 def start(self, cluster_dir):
431 """Start the controller by cluster_dir."""
432 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 self.cluster_dir = unicode(cluster_dir)
434 logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 return super(MPIExecControllerLauncher, self).start(1)
436
437 def find_args(self):
438 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
439 self.controller_cmd + self.controller_args
440
441
442 class MPIExecEngineSetLauncher(MPIExecLauncher):
443
444 engine_cmd = List(ipengine_cmd_argv, config=True)
445 # Command line arguments for ipengine.
446 engine_args = List(
447 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
448 )
449 n = Int(1, config=True)
450
451 def start(self, n, cluster_dir):
452 """Start n engines by profile or cluster_dir."""
453 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
455 self.n = n
456 logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 return super(MPIExecEngineSetLauncher, self).start(n)
458
459 def find_args(self):
460 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
461 self.engine_cmd + self.engine_args
462
463
464 #-----------------------------------------------------------------------------
465 # SSH launchers
466 #-----------------------------------------------------------------------------
467
468 # TODO: Get SSH Launcher working again.
469
470 class SSHLauncher(LocalProcessLauncher):
471 """A minimal launcher for ssh.
472
473 To be useful this will probably have to be extended to use the ``sshx``
474 idea for environment variables. There could be other things this needs
475 as well.
476 """
477
478 ssh_cmd = List(['ssh'], config=True)
479 ssh_args = List([], config=True)
480 program = List(['date'], config=True)
481 program_args = List([], config=True)
482 hostname = Str('', config=True)
483 user = Str(os.environ.get('USER','username'), config=True)
484 location = Str('')
485
486 def _hostname_changed(self, name, old, new):
487 self.location = '%s@%s' % (self.user, new)
488
489 def _user_changed(self, name, old, new):
490 self.location = '%s@%s' % (new, self.hostname)
491
492 def find_args(self):
493 return self.ssh_cmd + self.ssh_args + [self.location] + \
494 self.program + self.program_args
495
496 def start(self, cluster_dir, hostname=None, user=None):
497 if hostname is not None:
498 self.hostname = hostname
499 if user is not None:
500 self.user = user
501 return super(SSHLauncher, self).start()
502
503
504 class SSHControllerLauncher(SSHLauncher):
505
506 program = List(ipcontroller_cmd_argv, config=True)
507 # Command line arguments to ipcontroller.
508 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
509
510
511 class SSHEngineLauncher(SSHLauncher):
512 program = List(ipengine_cmd_argv, config=True)
513 # Command line arguments for ipengine.
514 program_args = List(
515 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
516 )
517
518 class SSHEngineSetLauncher(LocalEngineSetLauncher):
519 launcher_class = SSHEngineLauncher
520
521
522 #-----------------------------------------------------------------------------
523 # Windows HPC Server 2008 scheduler launchers
524 #-----------------------------------------------------------------------------
525
526
527 # # This is only used on Windows.
528 # def find_job_cmd():
529 # if os.name=='nt':
530 # try:
531 # return find_cmd('job')
532 # except FindCmdError:
533 # return 'job'
534 # else:
535 # return 'job'
536 #
537 #
538 # class WindowsHPCLauncher(BaseLauncher):
539 #
540 # # A regular expression used to get the job id from the output of the
541 # # submit_command.
542 # job_id_regexp = Str(r'\d+', config=True)
543 # # The filename of the instantiated job script.
544 # job_file_name = Unicode(u'ipython_job.xml', config=True)
545 # # The full path to the instantiated job script. This gets made dynamically
546 # # by combining the work_dir with the job_file_name.
547 # job_file = Unicode(u'')
548 # # The hostname of the scheduler to submit the job to
549 # scheduler = Str('', config=True)
550 # job_cmd = Str(find_job_cmd(), config=True)
551 #
552 # def __init__(self, work_dir=u'.', config=None):
553 # super(WindowsHPCLauncher, self).__init__(
554 # work_dir=work_dir, config=config
555 # )
556 #
557 # @property
558 # def job_file(self):
559 # return os.path.join(self.work_dir, self.job_file_name)
560 #
561 # def write_job_file(self, n):
562 # raise NotImplementedError("Implement write_job_file in a subclass.")
563 #
564 # def find_args(self):
565 # return ['job.exe']
566 #
567 # def parse_job_id(self, output):
568 # """Take the output of the submit command and return the job id."""
569 # m = re.search(self.job_id_regexp, output)
570 # if m is not None:
571 # job_id = m.group()
572 # else:
573 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 # self.job_id = job_id
575 # logging.info('Job started with job id: %r' % job_id)
576 # return job_id
577 #
578 # @inlineCallbacks
579 # def start(self, n):
580 # """Start n copies of the process using the Win HPC job scheduler."""
581 # self.write_job_file(n)
582 # args = [
583 # 'submit',
584 # '/jobfile:%s' % self.job_file,
585 # '/scheduler:%s' % self.scheduler
586 # ]
587 # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # output = yield getProcessOutput(str(self.job_cmd),
590 # [str(a) for a in args],
591 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
592 # path=self.work_dir
593 # )
594 # job_id = self.parse_job_id(output)
595 # self.notify_start(job_id)
596 # defer.returnValue(job_id)
597 #
598 # @inlineCallbacks
599 # def stop(self):
600 # args = [
601 # 'cancel',
602 # self.job_id,
603 # '/scheduler:%s' % self.scheduler
604 # ]
605 # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # try:
607 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 # output = yield getProcessOutput(str(self.job_cmd),
609 # [str(a) for a in args],
610 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
611 # path=self.work_dir
612 # )
613 # except:
614 # output = 'The job already appears to be stopped: %r' % self.job_id
615 # self.notify_stop(output) # Pass the output of the kill cmd
616 # defer.returnValue(output)
617 #
618 #
619 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
620 #
621 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
622 # extra_args = List([], config=False)
623 #
624 # def write_job_file(self, n):
625 # job = IPControllerJob(config=self.config)
626 #
627 # t = IPControllerTask(config=self.config)
628 # # The task's work directory is *not* the actual work directory of
629 # # the controller. It is used as the base path for the stdout/stderr
630 # # files that the scheduler redirects to.
631 # t.work_directory = self.cluster_dir
632 # # Add the --cluster-dir args from self.start().
633 # t.controller_args.extend(self.extra_args)
634 # job.add_task(t)
635 #
636 # logging.info("Writing job description file: %s" % self.job_file)
637 # job.write(self.job_file)
638 #
639 # @property
640 # def job_file(self):
641 # return os.path.join(self.cluster_dir, self.job_file_name)
642 #
643 # def start(self, cluster_dir):
644 # """Start the controller by cluster_dir."""
645 # self.extra_args = ['--cluster-dir', cluster_dir]
646 # self.cluster_dir = unicode(cluster_dir)
647 # return super(WindowsHPCControllerLauncher, self).start(1)
648 #
649 #
650 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
651 #
652 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
653 # extra_args = List([], config=False)
654 #
655 # def write_job_file(self, n):
656 # job = IPEngineSetJob(config=self.config)
657 #
658 # for i in range(n):
659 # t = IPEngineTask(config=self.config)
660 # # The task's work directory is *not* the actual work directory of
661 # # the engine. It is used as the base path for the stdout/stderr
662 # # files that the scheduler redirects to.
663 # t.work_directory = self.cluster_dir
664 # # Add the --cluster-dir args from self.start().
665 # t.engine_args.extend(self.extra_args)
666 # job.add_task(t)
667 #
668 # logging.info("Writing job description file: %s" % self.job_file)
669 # job.write(self.job_file)
670 #
671 # @property
672 # def job_file(self):
673 # return os.path.join(self.cluster_dir, self.job_file_name)
674 #
675 # def start(self, n, cluster_dir):
676 # """Start the controller by cluster_dir."""
677 # self.extra_args = ['--cluster-dir', cluster_dir]
678 # self.cluster_dir = unicode(cluster_dir)
679 # return super(WindowsHPCEngineSetLauncher, self).start(n)
680 #
681 #
682 # #-----------------------------------------------------------------------------
683 # # Batch (PBS) system launchers
684 # #-----------------------------------------------------------------------------
685 #
686 # # TODO: Get PBS launcher working again.
687 #
688 # class BatchSystemLauncher(BaseLauncher):
689 # """Launch an external process using a batch system.
690 #
691 # This class is designed to work with UNIX batch systems like PBS, LSF,
692 # GridEngine, etc. The overall model is that there are different commands
693 # like qsub, qdel, etc. that handle the starting and stopping of the process.
694 #
695 # This class also has the notion of a batch script. The ``batch_template``
696 # attribute can be set to a string that is a template for the batch script.
697 # This template is instantiated using Itpl. Thus the template can use
698 # ${n} for the number of instances. Subclasses can add additional variables
699 # to the template dict.
700 # """
701 #
702 # # Subclasses must fill these in. See PBSEngineSet
703 # # The name of the command line program used to submit jobs.
704 # submit_command = Str('', config=True)
705 # # The name of the command line program used to delete jobs.
706 # delete_command = Str('', config=True)
707 # # A regular expression used to get the job id from the output of the
708 # # submit_command.
709 # job_id_regexp = Str('', config=True)
710 # # The string that is the batch script template itself.
711 # batch_template = Str('', config=True)
712 # # The filename of the instantiated batch script.
713 # batch_file_name = Unicode(u'batch_script', config=True)
714 # # The full path to the instantiated batch script.
715 # batch_file = Unicode(u'')
716 #
717 # def __init__(self, work_dir=u'.', config=None):
718 # super(BatchSystemLauncher, self).__init__(
719 # work_dir=work_dir, config=config
720 # )
721 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
722 # self.context = {}
723 #
724 # def parse_job_id(self, output):
725 # """Take the output of the submit command and return the job id."""
726 # m = re.match(self.job_id_regexp, output)
727 # if m is not None:
728 # job_id = m.group()
729 # else:
730 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 # self.job_id = job_id
732 # logging.info('Job started with job id: %r' % job_id)
733 # return job_id
734 #
735 # def write_batch_script(self, n):
736 # """Instantiate and write the batch script to the work_dir."""
737 # self.context['n'] = n
738 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # logging.info('Writing instantiated batch script: %s' % self.batch_file)
740 # f = open(self.batch_file, 'w')
741 # f.write(script_as_string)
742 # f.close()
743 #
744 # @inlineCallbacks
745 # def start(self, n):
746 # """Start n copies of the process using a batch system."""
747 # self.write_batch_script(n)
748 # output = yield getProcessOutput(self.submit_command,
749 # [self.batch_file], env=os.environ)
750 # job_id = self.parse_job_id(output)
751 # self.notify_start(job_id)
752 # defer.returnValue(job_id)
753 #
754 # @inlineCallbacks
755 # def stop(self):
756 # output = yield getProcessOutput(self.delete_command,
757 # [self.job_id], env=os.environ
758 # )
759 # self.notify_stop(output) # Pass the output of the kill cmd
760 # defer.returnValue(output)
761 #
762 #
763 # class PBSLauncher(BatchSystemLauncher):
764 # """A BatchSystemLauncher subclass for PBS."""
765 #
766 # submit_command = Str('qsub', config=True)
767 # delete_command = Str('qdel', config=True)
768 # job_id_regexp = Str(r'\d+', config=True)
769 # batch_template = Str('', config=True)
770 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
771 # batch_file = Unicode(u'')
772 #
773 #
774 # class PBSControllerLauncher(PBSLauncher):
775 # """Launch a controller using PBS."""
776 #
777 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
778 #
779 # def start(self, cluster_dir):
780 # """Start the controller by profile or cluster_dir."""
781 # # Here we save profile and cluster_dir in the context so they
782 # # can be used in the batch script template as ${profile} and
783 # # ${cluster_dir}
784 # self.context['cluster_dir'] = cluster_dir
785 # self.cluster_dir = unicode(cluster_dir)
786 # logging.info("Starting PBSControllerLauncher: %r" % self.args)
787 # return super(PBSControllerLauncher, self).start(1)
788 #
789 #
790 # class PBSEngineSetLauncher(PBSLauncher):
791 #
792 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
793 #
794 # def start(self, n, cluster_dir):
795 # """Start n engines by profile or cluster_dir."""
796 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 # self.cluster_dir = unicode(cluster_dir)
798 # logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # return super(PBSEngineSetLauncher, self).start(n)
800
801
802 #-----------------------------------------------------------------------------
803 # A launcher for ipcluster itself!
804 #-----------------------------------------------------------------------------
805
806
807 class IPClusterLauncher(LocalProcessLauncher):
808 """Launch the ipcluster program in an external process."""
809
810 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
811 # Command line arguments to pass to ipcluster.
812 ipcluster_args = List(
813 ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
814 ipcluster_subcommand = Str('start')
815 ipcluster_n = Int(2)
816
817 def find_args(self):
818 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
819 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820
821 def start(self):
822 logging.info("Starting ipcluster: %r" % self.args)
823 return super(IPClusterLauncher, self).start()
824
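The launcher classes above share one small lifecycle: state moves from 'before' to 'running' to 'after', and callbacks registered with on_stop fire with whatever was passed to notify_stop (for local processes, a dict with exit_code and pid). A minimal sketch of driving one launcher directly, assuming the module path used in the ipcluster defaults above; the cluster_dir path is hypothetical:

from zmq.eventloop import ioloop
from IPython.zmq.parallel.launcher import LocalControllerLauncher

loop = ioloop.IOLoop.instance()
cl = LocalControllerLauncher(work_dir=u'.')

def on_controller_stop(data):
    # data is what notify_stop received, e.g. {'exit_code': 0, 'pid': 1234}
    print 'controller stopped:', data
    loop.stop()

cl.on_stop(on_controller_stop)
cl.start(cluster_dir=u'/path/to/cluster_mycluster')  # hypothetical path
loop.start()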
@@ -90,14 +90,6 b' def defaultblock(f, self, *args, **kwargs):'
90 90 # Classes
91 91 #--------------------------------------------------------------------------
92 92
93 class ResultDict(dict):
94 """A subclass of dict that raises errors if it has them."""
95 def __getitem__(self, key):
96 res = dict.__getitem__(self, key)
97 if isinstance(res, error.KernelError):
98 raise res
99 return res
100
101 93 class Metadata(dict):
102 94 """Subclass of dict for initializing metadata values."""
103 95 def __init__(self, *args, **kwargs):
@@ -35,18 +35,6 b' from IPython.utils.path import ('
35 35 from IPython.utils.traitlets import Unicode
36 36
37 37 #-----------------------------------------------------------------------------
38 # Warnings control
39 #-----------------------------------------------------------------------------
40 # Twisted generates annoying warnings with Python 2.6, as will do other code
41 # that imports 'sets' as of today
42 warnings.filterwarnings('ignore', 'the sets module is deprecated',
43 DeprecationWarning )
44
45 # This one also comes from Twisted
46 warnings.filterwarnings('ignore', 'the sha module is deprecated',
47 DeprecationWarning)
48
49 #-----------------------------------------------------------------------------
50 38 # Module errors
51 39 #-----------------------------------------------------------------------------
52 40
@@ -1,7 +1,6 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
3 This is a collection of one Hub and several Schedulers.
5 4 """
6 5 #-----------------------------------------------------------------------------
7 6 # Copyright (C) 2010 The IPython Development Team
@@ -15,75 +14,25 b' and monitors traffic through the various queues.'
15 14 #-----------------------------------------------------------------------------
16 15 from __future__ import print_function
17 16
18 import os
19 import sys
20 import time
21 17 import logging
22 18 from multiprocessing import Process
23 19
24 20 import zmq
25 from zmq.eventloop import ioloop
26 from zmq.eventloop.zmqstream import ZMQStream
27 # from zmq.devices import ProcessMonitoredQueue
28 21
29 22 # internal:
30 23 from IPython.utils.importstring import import_item
31 24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
32 from IPython.zmq.entry_point import bind_port
33 25
34 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
35 connect_logger, parse_url, signal_children, generate_exec_key,
36 local_logger)
26 from entry_point import signal_children
37 27
38 28
39 import streamsession as session
40 import heartmonitor
41 29 from scheduler import launch_scheduler
42 30 from hub import Hub, HubFactory
43 31
44 from dictdb import DictDB
45 try:
46 import pymongo
47 except ImportError:
48 MongoDB=None
49 else:
50 from mongodb import MongoDB
51
52 #-------------------------------------------------------------------------
53 # Entry Point
54 #-------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Configurable
34 #-----------------------------------------------------------------------------
55 35
56 def make_argument_parser():
57 """Make an argument parser"""
58 parser = make_base_argument_parser()
59
60 parser.add_argument('--client', type=int, metavar='PORT', default=0,
61 help='set the XREP port for clients [default: random]')
62 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
63 help='set the PUB socket for registration notification [default: random]')
64 parser.add_argument('--hb', type=str, metavar='PORTS',
65 help='set the 2 ports for heartbeats [default: random]')
66 parser.add_argument('--ping', type=int, default=100,
67 help='set the heartbeat period in ms [default: 100]')
68 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
69 help='set the SUB port for queue monitoring [default: random]')
70 parser.add_argument('--mux', type=str, metavar='PORTS',
71 help='set the XREP ports for the MUX queue [default: random]')
72 parser.add_argument('--task', type=str, metavar='PORTS',
73 help='set the XREP/XREQ ports for the task queue [default: random]')
74 parser.add_argument('--control', type=str, metavar='PORTS',
75 help='set the XREP ports for the control queue [default: random]')
76 parser.add_argument('--iopub', type=str, metavar='PORTS',
77 help='set the PUB/SUB ports for the iopub relay [default: random]')
78 parser.add_argument('--scheduler', type=str, default='lru',
79 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
80 help='select the task scheduler [default: Python LRU]')
81 parser.add_argument('--mongodb', action='store_true',
82 help='Use MongoDB task storage [default: in-memory]')
83 parser.add_argument('--session', type=str, default=None,
84 help='Manually specify the session id.')
85
86 return parser
87 36
88 37 class ControllerFactory(HubFactory):
89 38 """Configurable for setting up a Hub and Schedulers."""
@@ -158,188 +107,4 b' class ControllerFactory(HubFactory):'
158 107 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
159 108 q.daemon=True
160 109 children.append(q)
161
162
163 def main(argv=None):
164 """DO NOT USE ME ANYMORE"""
165
166 parser = make_argument_parser()
167
168 args = parser.parse_args(argv)
169 parse_url(args)
170
171 iface="%s://%s"%(args.transport,args.ip)+':%i'
172
173 random_ports = 0
174 if args.hb:
175 hb = split_ports(args.hb, 2)
176 else:
177 hb = select_random_ports(2)
178 if args.mux:
179 mux = split_ports(args.mux, 2)
180 else:
181 mux = None
182 random_ports += 2
183 if args.iopub:
184 iopub = split_ports(args.iopub, 2)
185 else:
186 iopub = None
187 random_ports += 2
188 if args.task:
189 task = split_ports(args.task, 2)
190 else:
191 task = None
192 random_ports += 2
193 if args.control:
194 control = split_ports(args.control, 2)
195 else:
196 control = None
197 random_ports += 2
198
199 ctx = zmq.Context()
200 loop = ioloop.IOLoop.instance()
201
202
203 # Registrar socket
204 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
205 regport = bind_port(reg, args.ip, args.regport)
206
207 ### Engine connections ###
208
209 # heartbeat
210 hpub = ctx.socket(zmq.PUB)
211 bind_port(hpub, args.ip, hb[0])
212 hrep = ctx.socket(zmq.XREP)
213 bind_port(hrep, args.ip, hb[1])
214
215 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
216 hmon.start()
217
218 ### Client connections ###
219 # Clientele socket
220 c = ZMQStream(ctx.socket(zmq.XREP), loop)
221 cport = bind_port(c, args.ip, args.client)
222 # Notifier socket
223 n = ZMQStream(ctx.socket(zmq.PUB), loop)
224 nport = bind_port(n, args.ip, args.notice)
225
226 ### Key File ###
227 if args.execkey and not os.path.isfile(args.execkey):
228 generate_exec_key(args.execkey)
229
230 thesession = session.StreamSession(username=args.ident or "controller",
231 keyfile=args.execkey, session=args.session)
232
233 ### build and launch the queues ###
234
235 # monitor socket
236 sub = ctx.socket(zmq.SUB)
237 sub.setsockopt(zmq.SUBSCRIBE, "")
238 monport = bind_port(sub, args.ip, args.monitor)
239 sub = ZMQStream(sub, loop)
240
241 ports = select_random_ports(random_ports)
242 children = []
243
244 # IOPub relay (in a Process)
245 if not iopub:
246 iopub = (ports.pop(),ports.pop())
247 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
248 q.bind_in(iface%iopub[1])
249 q.bind_out(iface%iopub[0])
250 q.setsockopt_in(zmq.SUBSCRIBE, '')
251 q.connect_mon(iface%monport)
252 q.daemon=True
253 q.start()
254 children.append(q.launcher)
255
256 # Multiplexer Queue (in a Process)
257 if not mux:
258 mux = (ports.pop(),ports.pop())
259 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
260 q.bind_in(iface%mux[0])
261 q.bind_out(iface%mux[1])
262 q.connect_mon(iface%monport)
263 q.daemon=True
264 q.start()
265 children.append(q.launcher)
266
267 # Control Queue (in a Process)
268 if not control:
269 control = (ports.pop(),ports.pop())
270 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
271 q.bind_in(iface%control[0])
272 q.bind_out(iface%control[1])
273 q.connect_mon(iface%monport)
274 q.daemon=True
275 q.start()
276 children.append(q.launcher)
277 # Task Queue (in a Process)
278 if not task:
279 task = (ports.pop(),ports.pop())
280 if args.scheduler == 'pure':
281 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
282 q.bind_in(iface%task[0])
283 q.bind_out(iface%task[1])
284 q.connect_mon(iface%monport)
285 q.daemon=True
286 q.start()
287 children.append(q.launcher)
288 else:
289 log_addr = iface%args.logport if args.logport else None
290 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
291 log_addr, args.loglevel, args.scheduler)
292 print (sargs)
293 q = Process(target=launch_scheduler, args=sargs)
294 q.daemon=True
295 q.start()
296 children.append(q)
297
298 if args.mongodb:
299 from mongodb import MongoDB
300 db = MongoDB(thesession.session)
301 else:
302 db = DictDB()
303 time.sleep(.25)
304
305 # build connection dicts
306 engine_addrs = {
307 'control' : iface%control[1],
308 'mux': iface%mux[1],
309 'heartbeat': (iface%hb[0], iface%hb[1]),
310 'task' : iface%task[1],
311 'iopub' : iface%iopub[1],
312 'monitor' : iface%monport,
313 }
314
315 client_addrs = {
316 'control' : iface%control[0],
317 'query': iface%cport,
318 'mux': iface%mux[0],
319 'task' : iface%task[0],
320 'iopub' : iface%iopub[0],
321 'notification': iface%nport
322 }
323
324 # setup logging
325 if args.logport:
326 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
327 else:
328 local_logger(args.loglevel)
329
330 # register relay of signals to the children
331 signal_children(children)
332 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
333 registrar=reg, clientele=c, notifier=n, db=db,
334 engine_addrs=engine_addrs, client_addrs=client_addrs)
335
336 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
337 dc.start()
338 try:
339 loop.start()
340 except KeyboardInterrupt:
341 print ("interrupted, exiting...", file=sys.__stderr__)
342
343 110
344 if __name__ == '__main__':
345 main()
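With the hand-rolled main() above removed, the controller is now assembled by ControllerFactory ("Configurable for setting up a Hub and Schedulers"). A minimal sketch of what a caller might look like, assuming the factory follows the construct/start pattern of the other factories in this branch — the method names below are assumptions, not a confirmed API:

from zmq.eventloop import ioloop
from IPython.zmq.parallel.controller import ControllerFactory

loop = ioloop.IOLoop.instance()
cf = ControllerFactory()
cf.construct()   # assumed: build the Hub and scheduler processes
cf.start()       # assumed: start the children and heart monitor
loop.start()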
@@ -6,7 +6,6 b" connected to the Controller's queue(s)."
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 import traceback
10 9 import uuid
11 10 import logging
12 11 from pprint import pprint
@@ -21,12 +20,9 b' from IPython.utils.traitlets import Instance, Str, Dict, Int, Type'
21 20
22 21 from factory import RegistrationFactory
23 22
24 from streamsession import Message, StreamSession
25 from streamkernel import Kernel, make_kernel
23 from streamsession import Message
24 from streamkernel import Kernel
26 25 import heartmonitor
27 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
28 local_logger)
29 # import taskthread
30 26
31 27 def printer(*msg):
32 28 # print (logging.handlers, file=sys.__stdout__)
@@ -107,16 +103,15 b' class EngineFactory(RegistrationFactory):'
107 103 # print (hb_addrs)
108 104
109 105 # # Redirect input streams and set a display hook.
110 # if self.out_stream_factory:
111 # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
112 # sys.stdout.topic = 'engine.%i.stdout'%self.id
113 # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
114 # sys.stderr.topic = 'engine.%i.stderr'%self.id
115 # if self.display_hook_factory:
116 # sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
117 # sys.displayhook.topic = 'engine.%i.pyout'%self.id
106 if self.out_stream_factory:
107 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
108 sys.stdout.topic = 'engine.%i.stdout'%self.id
109 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
110 sys.stderr.topic = 'engine.%i.stderr'%self.id
111 if self.display_hook_factory:
112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
118 114
119 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
120 115 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
121 116 control_stream=control_stream,
122 117 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
@@ -124,6 +119,7 b' class EngineFactory(RegistrationFactory):'
124 119 self.kernel.start()
125 120
126 121 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
122 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
127 123 heart.start()
128 124
129 125
@@ -143,48 +139,3 b' class EngineFactory(RegistrationFactory):'
143 139 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
144 140 dc.start()
145 141
146
147
148 def main(argv=None, user_ns=None):
149 """DO NOT USE ME ANYMORE"""
150 parser = make_base_argument_parser()
151
152 args = parser.parse_args(argv)
153
154 parse_url(args)
155
156 iface="%s://%s"%(args.transport,args.ip)+':%i'
157
158 loop = ioloop.IOLoop.instance()
159 session = StreamSession(keyfile=args.execkey)
160 # print (session.key)
161 ctx = zmq.Context()
162
163 # setup logging
164
165 reg_conn = iface % args.regport
166 print (reg_conn, file=sys.__stdout__)
167 print ("Starting the engine...", file=sys.__stderr__)
168
169 reg = ctx.socket(zmq.PAIR)
170 reg.connect(reg_conn)
171 reg = zmqstream.ZMQStream(reg, loop)
172
173 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
174 ident=args.ident or '', user_ns=user_ns)
175 if args.logport:
176 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
177 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
178 else:
179 local_logger(args.loglevel)
180
181 dc = ioloop.DelayedCallback(e.start, 0, loop)
182 dc.start()
183 try:
184 loop.start()
185 except KeyboardInterrupt:
186 print ("interrupted, exiting...", file=sys.__stderr__)
187
188 # Execution as a script
189 if __name__ == '__main__':
190 main()
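The deprecated main() above still illustrates the startup idiom that EngineFactory keeps: schedule the first action with a DelayedCallback, then block in the loop. Stripped to its essentials (the callback body is a placeholder):

    from __future__ import print_function
    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()

    def begin():
        print("registering with the controller...")  # stands in for e.start()

    dc = ioloop.DelayedCallback(begin, 0, loop)  # fires once the loop runs
    dc.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print("interrupted, exiting...")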
@@ -28,15 +28,6 b' from IPython.core.ultratb import FormattedTB'
28 28 from IPython.external.argparse import ArgumentParser
29 29 from IPython.zmq.log import EnginePUBHandler
30 30
31 def split_ports(s, n):
32 """Parser helper for multiport strings"""
33 if not s:
34 return tuple([0]*n)
35 ports = map(int, s.split(','))
36 if len(ports) != n:
37 raise ValueError
38 return ports
39
40 31 _random_ports = set()
41 32
42 33 def select_random_ports(n):
@@ -57,18 +48,6 b' def select_random_ports(n):'
57 48 _random_ports.add(port)
58 49 return ports
59 50
60 def parse_url(args):
61 """Ensure args.url contains full transport://interface:port"""
62 if args.url:
63 iface = args.url.split('://',1)
64 if len(args) == 2:
65 args.transport,iface = iface
66 iface = iface.split(':')
67 args.ip = iface[0]
68 if iface[1]:
69 args.regport = iface[1]
70 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
71
72 51 def signal_children(children):
73 52 """Relay interupt/term signals to children, for more solid process cleanup."""
74 53 def terminate_children(sig, frame):
@@ -90,35 +69,7 b' def generate_exec_key(keyfile):'
90 69 # set user-only RW permissions (0600)
91 70 # this will have no effect on Windows
92 71 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
93
94
95 def make_base_argument_parser():
96 """ Creates an ArgumentParser for the generic arguments supported by all
97 ipcluster entry points.
98 """
99
100 parser = ArgumentParser()
101 parser.add_argument('--ip', type=str, default='127.0.0.1',
102 help='set the controller\'s IP address [default: local]')
103 parser.add_argument('--transport', type=str, default='tcp',
104 help='set the transport to use [default: tcp]')
105 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
106 help='set the XREP port for registration [default: 10101]')
107 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
108 help='set the PUB port for remote logging [default: log to stdout]')
109 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
110 help='set the log level [default: INFO]')
111 parser.add_argument('--ident', type=str,
112 help='set the ZMQ identity [default: random]')
113 parser.add_argument('--packer', type=str, default='json',
114 choices=['json','pickle'],
115 help='set the message format method [default: json]')
116 parser.add_argument('--url', type=str,
117 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
118 parser.add_argument('--execkey', type=str,
119 help="File containing key for authenticating requests.")
120 72
121 return parser
122 73
123 74 def integer_loglevel(loglevel):
124 75 try:
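For reference, the surviving generate_exec_key (only its chmod tail is visible above) comes down to writing a random secret and restricting the file. A sketch, assuming a uuid4-based key rather than the exact implementation:

    import os
    import stat
    import uuid

    def generate_exec_key(keyfile):
        key = str(uuid.uuid4())  # assumption: any hard-to-guess secret works
        with open(keyfile, 'w') as f:
            f.write(key)
        # user-only RW permissions (0600); no effect on Windows
        os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)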
@@ -158,26 +158,26 b' class HubFactory(RegistrationFactory):'
158 158 subconstructors = List()
159 159 _constructed = Bool(False)
160 160
161 def _ip_changed(self, name, old, new):
162 self.engine_ip = new
163 self.client_ip = new
164 self.monitor_ip = new
165 self._update_monitor_url()
166
161 167 def _update_monitor_url(self):
162 168 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
163 169
164 def _sync_ips(self):
165 self.engine_ip = self.ip
166 self.client_ip = self.ip
167 self.monitor_ip = self.ip
168 self._update_monitor_url()
169
170 def _sync_transports(self):
171 self.engine_transport = self.transport
172 self.client_transport = self.transport
173 self.monitor_transport = self.transport
170 def _transport_changed(self, name, old, new):
171 self.engine_transport = new
172 self.client_transport = new
173 self.monitor_transport = new
174 174 self._update_monitor_url()
175 175
176 176 def __init__(self, **kwargs):
177 177 super(HubFactory, self).__init__(**kwargs)
178 178 self._update_monitor_url()
179 self.on_trait_change(self._sync_ips, 'ip')
180 self.on_trait_change(self._sync_transports, 'transport')
179 # self.on_trait_change(self._sync_ips, 'ip')
180 # self.on_trait_change(self._sync_transports, 'transport')
181 181 self.subconstructors.append(self.construct_hub)
182 182
183 183
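This change leans on traitlets' static notification convention: a method named _<trait>_changed is wired up automatically, which is why the explicit on_trait_change registrations can be commented out. A minimal illustration (the class and trait here are hypothetical):

    from IPython.utils.traitlets import HasTraits, Str

    class AddrSync(HasTraits):
        ip = Str('127.0.0.1')

        def _ip_changed(self, name, old, new):
            # invoked automatically whenever `ip` is assigned
            print('%s: %s -> %s' % (name, old, new))

    sync = AddrSync()
    sync.ip = '0.0.0.0'  # prints: ip: 127.0.0.1 -> 0.0.0.0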
@@ -334,45 +334,11 b' class Hub(HasTraits):'
334 334 """
335 335
336 336 super(Hub, self).__init__(**kwargs)
337 self.ids = set()
338 self.pending = set()
339 # self.keytable={}
340 # self.incoming_registrations={}
341 # self.engines = {}
342 # self.by_ident = {}
343 # self.clients = {}
344 # self.hearts = {}
345 # self.mia = set()
346 337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
347 # this is the stuff that will move to DB:
348 # self.pending = set() # pending messages, keyed by msg_id
349 # self.queues = {} # pending msg_ids keyed by engine_id
350 # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
351 # self.completed = {} # completed msg_ids keyed by engine_id
352 # self.all_completed = set()
353 # self._idcounter = 0
354 # self.sockets = {}
355 # self.loop = loop
356 # self.session = session
357 # self.registrar = registrar
358 # self.clientele = clientele
359 # self.queue = queue
360 # self.heartmonitor = heartbeat
361 # self.notifier = notifier
362 # self.db = db
363 338
364 339 # validate connection dicts:
365 # self.client_addrs = client_addrs
366 340 validate_url_container(self.client_addrs)
367
368 # assert isinstance(self.client_addrs['queue'], str)
369 # assert isinstance(self.client_addrs['control'], str)
370 # self.hb_addrs = hb_addrs
371 341 validate_url_container(self.engine_addrs)
372 # self.engine_addrs = engine_addrs
373 # assert isinstance(self.engine_addrs['queue'], str)
374 # assert isinstance(self.engine_addrs['control'], str)
375 # assert len(engine_addrs['heartbeat']) == 2
376 342
377 343 # register our callbacks
378 344 self.registrar.on_recv(self.dispatch_register_request)
@@ -409,7 +375,9 b' class Hub(HasTraits):'
409 375
410 376 @property
411 377 def _next_id(self):
412 """gemerate a new ID"""
378 """gemerate a new ID.
379
380 No longer reuse old ids, just count from 0."""
413 381 newid = self._idcounter
414 382 self._idcounter += 1
415 383 return newid
@@ -123,6 +123,11 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
123 123 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
124 124 'connections [default: random]',
125 125 metavar='Hub.hb_ports')
126 paa('--ping',
127 type=int, dest='HubFactory.ping',
128 help='The interval (in ms) at which the Hub pings the engines for '
129 'heartbeats [default: 100]',
130 metavar='Hub.ping')
126 131
127 132 # Client config
128 133 paa('--client-ip',
@@ -204,10 +209,10 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
204 209 help='Try to reuse existing execution keys.')
205 210 paa('--no-secure',
206 211 action='store_false', dest='Global.secure',
207 help='Turn off execution keys.')
212 help='Turn off execution keys (default).')
208 213 paa('--secure',
209 214 action='store_true', dest='Global.secure',
210 help='Turn on execution keys (default).')
215 help='Turn on execution keys.')
211 216 paa('--execkey',
212 217 type=str, dest='Global.exec_key',
213 218 help='path to a file containing an execution key.',
@@ -280,6 +285,19 b' class IPControllerApp(ApplicationWithClusterDir):'
280 285 except:
281 286 self.log.error("Couldn't construct the Controller", exc_info=True)
282 287 self.exit(1)
288
289 def save_urls(self):
290 """save the registration urls to files."""
291 c = self.master_config
292
293 sec_dir = c.Global.security_dir
294 cf = self.factory
295
296 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
297 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
298
299 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
300 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
283 301
284 302
285 303 def import_statements(self):
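save_urls gives launchers and clients a file-based way to discover the controller. The consuming side just reads the url back, along these lines (a sketch, not actual engine code; the socket type is illustrative):

    import os
    import zmq

    def connect_registrar(ctx, security_dir):
        # read the url written by save_urls() and connect to the registrar
        with open(os.path.join(security_dir, 'ipcontroller-engine.url')) as f:
            url = f.read().strip()
        s = ctx.socket(zmq.XREQ)
        s.connect(url)
        return s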
@@ -291,19 +309,19 b' class IPControllerApp(ApplicationWithClusterDir):'
291 309 except:
292 310 self.log.msg("Error running statement: %s" % s)
293 311
294 # def start_logging(self):
295 # super(IPControllerApp, self).start_logging()
296 # if self.master_config.Global.log_url:
297 # context = self.factory.context
298 # lsock = context.socket(zmq.PUB)
299 # lsock.connect(self.master_config.Global.log_url)
300 # handler = PUBHandler(lsock)
301 # handler.root_topic = 'controller'
302 # handler.setLevel(self.log_level)
303 # self.log.addHandler(handler)
312 def start_logging(self):
313 super(IPControllerApp, self).start_logging()
314 if self.master_config.Global.log_url:
315 context = self.factory.context
316 lsock = context.socket(zmq.PUB)
317 lsock.connect(self.master_config.Global.log_url)
318 handler = PUBHandler(lsock)
319 handler.root_topic = 'controller'
320 handler.setLevel(self.log_level)
321 self.log.addHandler(handler)
304 322 #
305 323 def start_app(self):
306 # Start the controller service.
324 # Start the subprocesses:
307 325 self.factory.start()
308 326 self.write_pid_file(overwrite=True)
309 327 try:
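With start_logging re-enabled, the controller publishes every log record as a [topic, message] multipart on log_url. The collector at the other end is essentially a bound SUB socket (a sketch; the address stands in for Global.log_url):

    import zmq

    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.bind('tcp://127.0.0.1:20202')            # stands in for Global.log_url
    sub.setsockopt(zmq.SUBSCRIBE, 'controller')  # the root_topic set above
    while True:
        topic, msg = sub.recv_multipart()
        print('%s | %s' % (topic, msg.rstrip()))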
@@ -218,12 +218,13 b" if 'setuptools' in sys.modules:"
218 218 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
219 219 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
220 220 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
221 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
221 'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance',
222 222 'iptest = IPython.testing.iptest:main',
223 223 'irunner = IPython.lib.irunner:main'
224 224 ]
225 225 }
226 226 setup_args['extras_require'] = dict(
227 zmq = 'pyzmq>=2.0.10',
227 228 doc='Sphinx>=0.3',
228 229 test='nose>=0.10.1',
229 230 security='pyOpenSSL>=0.6'
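With zmq listed in extras_require, the pyzmq requirement can be requested at install time under setuptools (e.g. easy_install "IPython[zmq]") instead of being an unconditional dependency of every install.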