Show More
This diff has been collapsed as it changes many lines, (502 lines changed) Show them Hide them | |||
@@ -0,0 +1,502 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | # encoding: utf-8 | |
|
3 | """ | |
|
4 | The ipcluster application. | |
|
5 | """ | |
|
6 | ||
|
7 | #----------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
9 | # | |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
|
11 | # the file COPYING, distributed as part of this software. | |
|
12 | #----------------------------------------------------------------------------- | |
|
13 | ||
|
14 | #----------------------------------------------------------------------------- | |
|
15 | # Imports | |
|
16 | #----------------------------------------------------------------------------- | |
|
17 | ||
|
18 | import logging | |
|
19 | import os | |
|
20 | import signal | |
|
21 | import logging | |
|
22 | ||
|
23 | from zmq.eventloop import ioloop | |
|
24 | ||
|
25 | from IPython.external.argparse import ArgumentParser, SUPPRESS | |
|
26 | from IPython.utils.importstring import import_item | |
|
27 | from IPython.zmq.parallel.clusterdir import ( | |
|
28 | ApplicationWithClusterDir, ClusterDirConfigLoader, | |
|
29 | ClusterDirError, PIDFileError | |
|
30 | ) | |
|
31 | ||
|
32 | ||
|
33 | #----------------------------------------------------------------------------- | |
|
34 | # Module level variables | |
|
35 | #----------------------------------------------------------------------------- | |
|
36 | ||
|
37 | ||
|
38 | default_config_file_name = u'ipcluster_config.py' | |
|
39 | ||
|
40 | ||
|
41 | _description = """\ | |
|
42 | Start an IPython cluster for parallel computing.\n\n | |
|
43 | ||
|
44 | An IPython cluster consists of 1 controller and 1 or more engines. | |
|
45 | This command automates the startup of these processes using a wide | |
|
46 | range of startup methods (SSH, local processes, PBS, mpiexec, | |
|
47 | Windows HPC Server 2008). To start a cluster with 4 engines on your | |
|
48 | local host simply do 'ipcluster start -n 4'. For more complex usage | |
|
49 | you will typically do 'ipcluster create -p mycluster', then edit | |
|
50 | configuration files, followed by 'ipcluster start -p mycluster -n 4'. | |
|
51 | """ | |
|
52 | ||
|
53 | ||
|
54 | # Exit codes for ipcluster | |
|
55 | ||
|
56 | # This will be the exit code if the ipcluster appears to be running because | |
|
57 | # a .pid file exists | |
|
58 | ALREADY_STARTED = 10 | |
|
59 | ||
|
60 | ||
|
61 | # This will be the exit code if ipcluster stop is run, but there is not .pid | |
|
62 | # file to be found. | |
|
63 | ALREADY_STOPPED = 11 | |
|
64 | ||
|
65 | ||
|
66 | #----------------------------------------------------------------------------- | |
|
67 | # Command line options | |
|
68 | #----------------------------------------------------------------------------- | |
|
69 | ||
|
70 | ||
|
71 | class IPClusterAppConfigLoader(ClusterDirConfigLoader): | |
|
72 | ||
|
73 | def _add_arguments(self): | |
|
74 | # Don't call ClusterDirConfigLoader._add_arguments as we don't want | |
|
75 | # its defaults on self.parser. Instead, we will put those on | |
|
76 | # default options on our subparsers. | |
|
77 | ||
|
78 | # This has all the common options that all subcommands use | |
|
79 | parent_parser1 = ArgumentParser( | |
|
80 | add_help=False, | |
|
81 | argument_default=SUPPRESS | |
|
82 | ) | |
|
83 | self._add_ipython_dir(parent_parser1) | |
|
84 | self._add_log_level(parent_parser1) | |
|
85 | ||
|
86 | # This has all the common options that other subcommands use | |
|
87 | parent_parser2 = ArgumentParser( | |
|
88 | add_help=False, | |
|
89 | argument_default=SUPPRESS | |
|
90 | ) | |
|
91 | self._add_cluster_profile(parent_parser2) | |
|
92 | self._add_cluster_dir(parent_parser2) | |
|
93 | self._add_work_dir(parent_parser2) | |
|
94 | paa = parent_parser2.add_argument | |
|
95 | paa('--log-to-file', | |
|
96 | action='store_true', dest='Global.log_to_file', | |
|
97 | help='Log to a file in the log directory (default is stdout)') | |
|
98 | ||
|
99 | # Create the object used to create the subparsers. | |
|
100 | subparsers = self.parser.add_subparsers( | |
|
101 | dest='Global.subcommand', | |
|
102 | title='ipcluster subcommands', | |
|
103 | description= | |
|
104 | """ipcluster has a variety of subcommands. The general way of | |
|
105 | running ipcluster is 'ipcluster <cmd> [options]'. To get help | |
|
106 | on a particular subcommand do 'ipcluster <cmd> -h'.""" | |
|
107 | # help="For more help, type 'ipcluster <cmd> -h'", | |
|
108 | ) | |
|
109 | ||
|
110 | # The "list" subcommand parser | |
|
111 | parser_list = subparsers.add_parser( | |
|
112 | 'list', | |
|
113 | parents=[parent_parser1], | |
|
114 | argument_default=SUPPRESS, | |
|
115 | help="List all clusters in cwd and ipython_dir.", | |
|
116 | description= | |
|
117 | """List all available clusters, by cluster directory, that can | |
|
118 | be found in the current working directly or in the ipython | |
|
119 | directory. Cluster directories are named using the convention | |
|
120 | 'cluster_<profile>'.""" | |
|
121 | ) | |
|
122 | ||
|
123 | # The "create" subcommand parser | |
|
124 | parser_create = subparsers.add_parser( | |
|
125 | 'create', | |
|
126 | parents=[parent_parser1, parent_parser2], | |
|
127 | argument_default=SUPPRESS, | |
|
128 | help="Create a new cluster directory.", | |
|
129 | description= | |
|
130 | """Create an ipython cluster directory by its profile name or | |
|
131 | cluster directory path. Cluster directories contain | |
|
132 | configuration, log and security related files and are named | |
|
133 | using the convention 'cluster_<profile>'. By default they are | |
|
134 | located in your ipython directory. Once created, you will | |
|
135 | probably need to edit the configuration files in the cluster | |
|
136 | directory to configure your cluster. Most users will create a | |
|
137 | cluster directory by profile name, | |
|
138 | 'ipcluster create -p mycluster', which will put the directory | |
|
139 | in '<ipython_dir>/cluster_mycluster'. | |
|
140 | """ | |
|
141 | ) | |
|
142 | paa = parser_create.add_argument | |
|
143 | paa('--reset-config', | |
|
144 | dest='Global.reset_config', action='store_true', | |
|
145 | help= | |
|
146 | """Recopy the default config files to the cluster directory. | |
|
147 | You will loose any modifications you have made to these files.""") | |
|
148 | ||
|
149 | # The "start" subcommand parser | |
|
150 | parser_start = subparsers.add_parser( | |
|
151 | 'start', | |
|
152 | parents=[parent_parser1, parent_parser2], | |
|
153 | argument_default=SUPPRESS, | |
|
154 | help="Start a cluster.", | |
|
155 | description= | |
|
156 | """Start an ipython cluster by its profile name or cluster | |
|
157 | directory. Cluster directories contain configuration, log and | |
|
158 | security related files and are named using the convention | |
|
159 | 'cluster_<profile>' and should be creating using the 'start' | |
|
160 | subcommand of 'ipcluster'. If your cluster directory is in | |
|
161 | the cwd or the ipython directory, you can simply refer to it | |
|
162 | using its profile name, 'ipcluster start -n 4 -p <profile>`, | |
|
163 | otherwise use the '--cluster-dir' option. | |
|
164 | """ | |
|
165 | ) | |
|
166 | paa = parser_start.add_argument | |
|
167 | paa('-n', '--number', | |
|
168 | type=int, dest='Global.n', | |
|
169 | help='The number of engines to start.', | |
|
170 | metavar='Global.n') | |
|
171 | paa('--clean-logs', | |
|
172 | dest='Global.clean_logs', action='store_true', | |
|
173 | help='Delete old log flies before starting.') | |
|
174 | paa('--no-clean-logs', | |
|
175 | dest='Global.clean_logs', action='store_false', | |
|
176 | help="Don't delete old log flies before starting.") | |
|
177 | paa('--daemon', | |
|
178 | dest='Global.daemonize', action='store_true', | |
|
179 | help='Daemonize the ipcluster program. This implies --log-to-file') | |
|
180 | paa('--no-daemon', | |
|
181 | dest='Global.daemonize', action='store_false', | |
|
182 | help="Dont't daemonize the ipcluster program.") | |
|
183 | ||
|
184 | # The "stop" subcommand parser | |
|
185 | parser_stop = subparsers.add_parser( | |
|
186 | 'stop', | |
|
187 | parents=[parent_parser1, parent_parser2], | |
|
188 | argument_default=SUPPRESS, | |
|
189 | help="Stop a running cluster.", | |
|
190 | description= | |
|
191 | """Stop a running ipython cluster by its profile name or cluster | |
|
192 | directory. Cluster directories are named using the convention | |
|
193 | 'cluster_<profile>'. If your cluster directory is in | |
|
194 | the cwd or the ipython directory, you can simply refer to it | |
|
195 | using its profile name, 'ipcluster stop -p <profile>`, otherwise | |
|
196 | use the '--cluster-dir' option. | |
|
197 | """ | |
|
198 | ) | |
|
199 | paa = parser_stop.add_argument | |
|
200 | paa('--signal', | |
|
201 | dest='Global.signal', type=int, | |
|
202 | help="The signal number to use in stopping the cluster (default=2).", | |
|
203 | metavar="Global.signal") | |
|
204 | ||
|
205 | ||
|
206 | #----------------------------------------------------------------------------- | |
|
207 | # Main application | |
|
208 | #----------------------------------------------------------------------------- | |
|
209 | ||
|
210 | ||
|
211 | class IPClusterApp(ApplicationWithClusterDir): | |
|
212 | ||
|
213 | name = u'ipclusterz' | |
|
214 | description = _description | |
|
215 | usage = None | |
|
216 | command_line_loader = IPClusterAppConfigLoader | |
|
217 | default_config_file_name = default_config_file_name | |
|
218 | default_log_level = logging.INFO | |
|
219 | auto_create_cluster_dir = False | |
|
220 | ||
|
221 | def create_default_config(self): | |
|
222 | super(IPClusterApp, self).create_default_config() | |
|
223 | self.default_config.Global.controller_launcher = \ | |
|
224 | 'IPython.zmq.parallel.launcher.LocalControllerLauncher' | |
|
225 | self.default_config.Global.engine_launcher = \ | |
|
226 | 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher' | |
|
227 | self.default_config.Global.n = 2 | |
|
228 | self.default_config.Global.reset_config = False | |
|
229 | self.default_config.Global.clean_logs = True | |
|
230 | self.default_config.Global.signal = 2 | |
|
231 | self.default_config.Global.daemonize = False | |
|
232 | ||
|
233 | def find_resources(self): | |
|
234 | subcommand = self.command_line_config.Global.subcommand | |
|
235 | if subcommand=='list': | |
|
236 | self.list_cluster_dirs() | |
|
237 | # Exit immediately because there is nothing left to do. | |
|
238 | self.exit() | |
|
239 | elif subcommand=='create': | |
|
240 | self.auto_create_cluster_dir = True | |
|
241 | super(IPClusterApp, self).find_resources() | |
|
242 | elif subcommand=='start' or subcommand=='stop': | |
|
243 | self.auto_create_cluster_dir = True | |
|
244 | try: | |
|
245 | super(IPClusterApp, self).find_resources() | |
|
246 | except ClusterDirError: | |
|
247 | raise ClusterDirError( | |
|
248 | "Could not find a cluster directory. A cluster dir must " | |
|
249 | "be created before running 'ipcluster start'. Do " | |
|
250 | "'ipcluster create -h' or 'ipcluster list -h' for more " | |
|
251 | "information about creating and listing cluster dirs." | |
|
252 | ) | |
|
253 | ||
|
254 | def list_cluster_dirs(self): | |
|
255 | # Find the search paths | |
|
256 | cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','') | |
|
257 | if cluster_dir_paths: | |
|
258 | cluster_dir_paths = cluster_dir_paths.split(':') | |
|
259 | else: | |
|
260 | cluster_dir_paths = [] | |
|
261 | try: | |
|
262 | ipython_dir = self.command_line_config.Global.ipython_dir | |
|
263 | except AttributeError: | |
|
264 | ipython_dir = self.default_config.Global.ipython_dir | |
|
265 | paths = [os.getcwd(), ipython_dir] + \ | |
|
266 | cluster_dir_paths | |
|
267 | paths = list(set(paths)) | |
|
268 | ||
|
269 | self.log.info('Searching for cluster dirs in paths: %r' % paths) | |
|
270 | for path in paths: | |
|
271 | files = os.listdir(path) | |
|
272 | for f in files: | |
|
273 | full_path = os.path.join(path, f) | |
|
274 | if os.path.isdir(full_path) and f.startswith('cluster_'): | |
|
275 | profile = full_path.split('_')[-1] | |
|
276 | start_cmd = 'ipcluster start -p %s -n 4' % profile | |
|
277 | print start_cmd + " ==> " + full_path | |
|
278 | ||
|
279 | def pre_construct(self): | |
|
280 | # IPClusterApp.pre_construct() is where we cd to the working directory. | |
|
281 | super(IPClusterApp, self).pre_construct() | |
|
282 | config = self.master_config | |
|
283 | try: | |
|
284 | daemon = config.Global.daemonize | |
|
285 | if daemon: | |
|
286 | config.Global.log_to_file = True | |
|
287 | except AttributeError: | |
|
288 | pass | |
|
289 | ||
|
290 | def construct(self): | |
|
291 | config = self.master_config | |
|
292 | subcmd = config.Global.subcommand | |
|
293 | reset = config.Global.reset_config | |
|
294 | if subcmd == 'list': | |
|
295 | return | |
|
296 | if subcmd == 'create': | |
|
297 | self.log.info('Copying default config files to cluster directory ' | |
|
298 | '[overwrite=%r]' % (reset,)) | |
|
299 | self.cluster_dir_obj.copy_all_config_files(overwrite=reset) | |
|
300 | if subcmd =='start': | |
|
301 | self.cluster_dir_obj.copy_all_config_files(overwrite=False) | |
|
302 | self.start_logging() | |
|
303 | self.loop = ioloop.IOLoop.instance() | |
|
304 | # reactor.callWhenRunning(self.start_launchers) | |
|
305 | dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) | |
|
306 | dc.start() | |
|
307 | ||
|
308 | def start_launchers(self): | |
|
309 | config = self.master_config | |
|
310 | ||
|
311 | # Create the launchers. In both bases, we set the work_dir of | |
|
312 | # the launcher to the cluster_dir. This is where the launcher's | |
|
313 | # subprocesses will be launched. It is not where the controller | |
|
314 | # and engine will be launched. | |
|
315 | el_class = import_item(config.Global.engine_launcher) | |
|
316 | self.engine_launcher = el_class( | |
|
317 | work_dir=self.cluster_dir, config=config | |
|
318 | ) | |
|
319 | cl_class = import_item(config.Global.controller_launcher) | |
|
320 | self.controller_launcher = cl_class( | |
|
321 | work_dir=self.cluster_dir, config=config | |
|
322 | ) | |
|
323 | ||
|
324 | # Setup signals | |
|
325 | signal.signal(signal.SIGINT, self.sigint_handler) | |
|
326 | ||
|
327 | # Setup the observing of stopping. If the controller dies, shut | |
|
328 | # everything down as that will be completely fatal for the engines. | |
|
329 | self.controller_launcher.on_stop(self.stop_launchers) | |
|
330 | # d1.addCallback(self.stop_launchers) | |
|
331 | # But, we don't monitor the stopping of engines. An engine dying | |
|
332 | # is just fine and in principle a user could start a new engine. | |
|
333 | # Also, if we did monitor engine stopping, it is difficult to | |
|
334 | # know what to do when only some engines die. Currently, the | |
|
335 | # observing of engine stopping is inconsistent. Some launchers | |
|
336 | # might trigger on a single engine stopping, other wait until | |
|
337 | # all stop. TODO: think more about how to handle this. | |
|
338 | ||
|
339 | # Start the controller and engines | |
|
340 | self._stopping = False # Make sure stop_launchers is not called 2x. | |
|
341 | d = self.start_controller() | |
|
342 | self.start_engines() | |
|
343 | self.startup_message() | |
|
344 | # d.addCallback(self.start_engines) | |
|
345 | # d.addCallback(self.startup_message) | |
|
346 | # If the controller or engines fail to start, stop everything | |
|
347 | # d.addErrback(self.stop_launchers) | |
|
348 | return d | |
|
349 | ||
|
350 | def startup_message(self, r=None): | |
|
351 | logging.info("IPython cluster: started") | |
|
352 | return r | |
|
353 | ||
|
354 | def start_controller(self, r=None): | |
|
355 | # logging.info("In start_controller") | |
|
356 | config = self.master_config | |
|
357 | d = self.controller_launcher.start( | |
|
358 | cluster_dir=config.Global.cluster_dir | |
|
359 | ) | |
|
360 | return d | |
|
361 | ||
|
362 | def start_engines(self, r=None): | |
|
363 | # logging.info("In start_engines") | |
|
364 | config = self.master_config | |
|
365 | d = self.engine_launcher.start( | |
|
366 | config.Global.n, | |
|
367 | cluster_dir=config.Global.cluster_dir | |
|
368 | ) | |
|
369 | return d | |
|
370 | ||
|
371 | def stop_controller(self, r=None): | |
|
372 | # logging.info("In stop_controller") | |
|
373 | if self.controller_launcher.running: | |
|
374 | return self.controller_launcher.stop() | |
|
375 | ||
|
376 | def stop_engines(self, r=None): | |
|
377 | # logging.info("In stop_engines") | |
|
378 | if self.engine_launcher.running: | |
|
379 | d = self.engine_launcher.stop() | |
|
380 | # d.addErrback(self.log_err) | |
|
381 | return d | |
|
382 | else: | |
|
383 | return None | |
|
384 | ||
|
385 | def log_err(self, f): | |
|
386 | logging.error(f.getTraceback()) | |
|
387 | return None | |
|
388 | ||
|
389 | def stop_launchers(self, r=None): | |
|
390 | if not self._stopping: | |
|
391 | self._stopping = True | |
|
392 | # if isinstance(r, failure.Failure): | |
|
393 | # logging.error('Unexpected error in ipcluster:') | |
|
394 | # logging.info(r.getTraceback()) | |
|
395 | logging.error("IPython cluster: stopping") | |
|
396 | # These return deferreds. We are not doing anything with them | |
|
397 | # but we are holding refs to them as a reminder that they | |
|
398 | # do return deferreds. | |
|
399 | d1 = self.stop_engines() | |
|
400 | d2 = self.stop_controller() | |
|
401 | # Wait a few seconds to let things shut down. | |
|
402 | dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop) | |
|
403 | dc.start() | |
|
404 | # reactor.callLater(4.0, reactor.stop) | |
|
405 | ||
|
406 | def sigint_handler(self, signum, frame): | |
|
407 | self.stop_launchers() | |
|
408 | ||
|
409 | def start_logging(self): | |
|
410 | # Remove old log files of the controller and engine | |
|
411 | if self.master_config.Global.clean_logs: | |
|
412 | log_dir = self.master_config.Global.log_dir | |
|
413 | for f in os.listdir(log_dir): | |
|
414 | if f.startswith('ipengine' + '-'): | |
|
415 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |
|
416 | os.remove(os.path.join(log_dir, f)) | |
|
417 | if f.startswith('ipcontroller' + '-'): | |
|
418 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |
|
419 | os.remove(os.path.join(log_dir, f)) | |
|
420 | # This will remote old log files for ipcluster itself | |
|
421 | super(IPClusterApp, self).start_logging() | |
|
422 | ||
|
423 | def start_app(self): | |
|
424 | """Start the application, depending on what subcommand is used.""" | |
|
425 | subcmd = self.master_config.Global.subcommand | |
|
426 | if subcmd=='create' or subcmd=='list': | |
|
427 | return | |
|
428 | elif subcmd=='start': | |
|
429 | self.start_app_start() | |
|
430 | elif subcmd=='stop': | |
|
431 | self.start_app_stop() | |
|
432 | ||
|
433 | def start_app_start(self): | |
|
434 | """Start the app for the start subcommand.""" | |
|
435 | config = self.master_config | |
|
436 | # First see if the cluster is already running | |
|
437 | try: | |
|
438 | pid = self.get_pid_from_file() | |
|
439 | except PIDFileError: | |
|
440 | pass | |
|
441 | else: | |
|
442 | self.log.critical( | |
|
443 | 'Cluster is already running with [pid=%s]. ' | |
|
444 | 'use "ipcluster stop" to stop the cluster.' % pid | |
|
445 | ) | |
|
446 | # Here I exit with a unusual exit status that other processes | |
|
447 | # can watch for to learn how I existed. | |
|
448 | self.exit(ALREADY_STARTED) | |
|
449 | ||
|
450 | # Now log and daemonize | |
|
451 | self.log.info( | |
|
452 | 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize | |
|
453 | ) | |
|
454 | # TODO: Get daemonize working on Windows or as a Windows Server. | |
|
455 | if config.Global.daemonize: | |
|
456 | if os.name=='posix': | |
|
457 | from twisted.scripts._twistd_unix import daemonize | |
|
458 | daemonize() | |
|
459 | ||
|
460 | # Now write the new pid file AFTER our new forked pid is active. | |
|
461 | self.write_pid_file() | |
|
462 | try: | |
|
463 | self.loop.start() | |
|
464 | except: | |
|
465 | logging.info("stopping...") | |
|
466 | self.remove_pid_file() | |
|
467 | ||
|
468 | def start_app_stop(self): | |
|
469 | """Start the app for the stop subcommand.""" | |
|
470 | config = self.master_config | |
|
471 | try: | |
|
472 | pid = self.get_pid_from_file() | |
|
473 | except PIDFileError: | |
|
474 | self.log.critical( | |
|
475 | 'Problem reading pid file, cluster is probably not running.' | |
|
476 | ) | |
|
477 | # Here I exit with a unusual exit status that other processes | |
|
478 | # can watch for to learn how I existed. | |
|
479 | self.exit(ALREADY_STOPPED) | |
|
480 | else: | |
|
481 | if os.name=='posix': | |
|
482 | sig = config.Global.signal | |
|
483 | self.log.info( | |
|
484 | "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig) | |
|
485 | ) | |
|
486 | os.kill(pid, sig) | |
|
487 | elif os.name=='nt': | |
|
488 | # As of right now, we don't support daemonize on Windows, so | |
|
489 | # stop will not do anything. Minimally, it should clean up the | |
|
490 | # old .pid files. | |
|
491 | self.remove_pid_file() | |
|
492 | ||
|
493 | ||
|
494 | def launch_new_instance(): | |
|
495 | """Create and run the IPython cluster.""" | |
|
496 | app = IPClusterApp() | |
|
497 | app.start() | |
|
498 | ||
|
499 | ||
|
500 | if __name__ == '__main__': | |
|
501 | launch_new_instance() | |
|
502 |
This diff has been collapsed as it changes many lines, (824 lines changed) Show them Hide them | |||
@@ -0,0 +1,824 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | # encoding: utf-8 | |
|
3 | """ | |
|
4 | Facilities for launching IPython processes asynchronously. | |
|
5 | """ | |
|
6 | ||
|
7 | #----------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |
|
9 | # | |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
|
11 | # the file COPYING, distributed as part of this software. | |
|
12 | #----------------------------------------------------------------------------- | |
|
13 | ||
|
14 | #----------------------------------------------------------------------------- | |
|
15 | # Imports | |
|
16 | #----------------------------------------------------------------------------- | |
|
17 | ||
|
18 | import os | |
|
19 | import re | |
|
20 | import sys | |
|
21 | import logging | |
|
22 | ||
|
23 | from signal import SIGINT | |
|
24 | try: | |
|
25 | from signal import SIGKILL | |
|
26 | except ImportError: | |
|
27 | SIGKILL=SIGTERM | |
|
28 | ||
|
29 | from subprocess import Popen, PIPE | |
|
30 | ||
|
31 | from zmq.eventloop import ioloop | |
|
32 | ||
|
33 | from IPython.config.configurable import Configurable | |
|
34 | from IPython.utils.traitlets import Str, Int, List, Unicode, Instance | |
|
35 | from IPython.utils.path import get_ipython_module_path | |
|
36 | from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError | |
|
37 | ||
|
38 | # from IPython.kernel.winhpcjob import ( | |
|
39 | # IPControllerTask, IPEngineTask, | |
|
40 | # IPControllerJob, IPEngineSetJob | |
|
41 | # ) | |
|
42 | ||
|
43 | ||
|
44 | #----------------------------------------------------------------------------- | |
|
45 | # Paths to the kernel apps | |
|
46 | #----------------------------------------------------------------------------- | |
|
47 | ||
|
48 | ||
|
49 | ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
50 | 'IPython.zmq.parallel.ipclusterapp' | |
|
51 | )) | |
|
52 | ||
|
53 | ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
54 | 'IPython.zmq.parallel.ipengineapp' | |
|
55 | )) | |
|
56 | ||
|
57 | ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
58 | 'IPython.zmq.parallel.ipcontrollerapp' | |
|
59 | )) | |
|
60 | ||
|
61 | #----------------------------------------------------------------------------- | |
|
62 | # Base launchers and errors | |
|
63 | #----------------------------------------------------------------------------- | |
|
64 | ||
|
65 | ||
|
66 | class LauncherError(Exception): | |
|
67 | pass | |
|
68 | ||
|
69 | ||
|
70 | class ProcessStateError(LauncherError): | |
|
71 | pass | |
|
72 | ||
|
73 | ||
|
74 | class UnknownStatus(LauncherError): | |
|
75 | pass | |
|
76 | ||
|
77 | ||
|
78 | class BaseLauncher(Configurable): | |
|
79 | """An asbtraction for starting, stopping and signaling a process.""" | |
|
80 | ||
|
81 | # In all of the launchers, the work_dir is where child processes will be | |
|
82 | # run. This will usually be the cluster_dir, but may not be. any work_dir | |
|
83 | # passed into the __init__ method will override the config value. | |
|
84 | # This should not be used to set the work_dir for the actual engine | |
|
85 | # and controller. Instead, use their own config files or the | |
|
86 | # controller_args, engine_args attributes of the launchers to add | |
|
87 | # the --work-dir option. | |
|
88 | work_dir = Unicode(u'.') | |
|
89 | loop = Instance('zmq.eventloop.ioloop.IOLoop') | |
|
90 | def _loop_default(self): | |
|
91 | return ioloop.IOLoop.instance() | |
|
92 | ||
|
93 | def __init__(self, work_dir=u'.', config=None): | |
|
94 | super(BaseLauncher, self).__init__(work_dir=work_dir, config=config) | |
|
95 | self.state = 'before' # can be before, running, after | |
|
96 | self.stop_callbacks = [] | |
|
97 | self.start_data = None | |
|
98 | self.stop_data = None | |
|
99 | ||
|
100 | @property | |
|
101 | def args(self): | |
|
102 | """A list of cmd and args that will be used to start the process. | |
|
103 | ||
|
104 | This is what is passed to :func:`spawnProcess` and the first element | |
|
105 | will be the process name. | |
|
106 | """ | |
|
107 | return self.find_args() | |
|
108 | ||
|
109 | def find_args(self): | |
|
110 | """The ``.args`` property calls this to find the args list. | |
|
111 | ||
|
112 | Subcommand should implement this to construct the cmd and args. | |
|
113 | """ | |
|
114 | raise NotImplementedError('find_args must be implemented in a subclass') | |
|
115 | ||
|
116 | @property | |
|
117 | def arg_str(self): | |
|
118 | """The string form of the program arguments.""" | |
|
119 | return ' '.join(self.args) | |
|
120 | ||
|
121 | @property | |
|
122 | def running(self): | |
|
123 | """Am I running.""" | |
|
124 | if self.state == 'running': | |
|
125 | return True | |
|
126 | else: | |
|
127 | return False | |
|
128 | ||
|
129 | def start(self): | |
|
130 | """Start the process. | |
|
131 | ||
|
132 | This must return a deferred that fires with information about the | |
|
133 | process starting (like a pid, job id, etc.). | |
|
134 | """ | |
|
135 | raise NotImplementedError('start must be implemented in a subclass') | |
|
136 | ||
|
137 | def stop(self): | |
|
138 | """Stop the process and notify observers of stopping. | |
|
139 | ||
|
140 | This must return a deferred that fires with information about the | |
|
141 | processing stopping, like errors that occur while the process is | |
|
142 | attempting to be shut down. This deferred won't fire when the process | |
|
143 | actually stops. To observe the actual process stopping, see | |
|
144 | :func:`observe_stop`. | |
|
145 | """ | |
|
146 | raise NotImplementedError('stop must be implemented in a subclass') | |
|
147 | ||
|
148 | def on_stop(self, f): | |
|
149 | """Get a deferred that will fire when the process stops. | |
|
150 | ||
|
151 | The deferred will fire with data that contains information about | |
|
152 | the exit status of the process. | |
|
153 | """ | |
|
154 | if self.state=='after': | |
|
155 | return f(self.stop_data) | |
|
156 | else: | |
|
157 | self.stop_callbacks.append(f) | |
|
158 | ||
|
159 | def notify_start(self, data): | |
|
160 | """Call this to trigger startup actions. | |
|
161 | ||
|
162 | This logs the process startup and sets the state to 'running'. It is | |
|
163 | a pass-through so it can be used as a callback. | |
|
164 | """ | |
|
165 | ||
|
166 | logging.info('Process %r started: %r' % (self.args[0], data)) | |
|
167 | self.start_data = data | |
|
168 | self.state = 'running' | |
|
169 | return data | |
|
170 | ||
|
171 | def notify_stop(self, data): | |
|
172 | """Call this to trigger process stop actions. | |
|
173 | ||
|
174 | This logs the process stopping and sets the state to 'after'. Call | |
|
175 | this to trigger all the deferreds from :func:`observe_stop`.""" | |
|
176 | ||
|
177 | logging.info('Process %r stopped: %r' % (self.args[0], data)) | |
|
178 | self.stop_data = data | |
|
179 | self.state = 'after' | |
|
180 | for i in range(len(self.stop_callbacks)): | |
|
181 | d = self.stop_callbacks.pop() | |
|
182 | d(data) | |
|
183 | return data | |
|
184 | ||
|
185 | def signal(self, sig): | |
|
186 | """Signal the process. | |
|
187 | ||
|
188 | Return a semi-meaningless deferred after signaling the process. | |
|
189 | ||
|
190 | Parameters | |
|
191 | ---------- | |
|
192 | sig : str or int | |
|
193 | 'KILL', 'INT', etc., or any signal number | |
|
194 | """ | |
|
195 | raise NotImplementedError('signal must be implemented in a subclass') | |
|
196 | ||
|
197 | ||
|
198 | #----------------------------------------------------------------------------- | |
|
199 | # Local process launchers | |
|
200 | #----------------------------------------------------------------------------- | |
|
201 | ||
|
202 | ||
|
203 | class LocalProcessLauncher(BaseLauncher): | |
|
204 | """Start and stop an external process in an asynchronous manner. | |
|
205 | ||
|
206 | This will launch the external process with a working directory of | |
|
207 | ``self.work_dir``. | |
|
208 | """ | |
|
209 | ||
|
210 | # This is used to to construct self.args, which is passed to | |
|
211 | # spawnProcess. | |
|
212 | cmd_and_args = List([]) | |
|
213 | poll_frequency = Int(100) # in ms | |
|
214 | ||
|
215 | def __init__(self, work_dir=u'.', config=None): | |
|
216 | super(LocalProcessLauncher, self).__init__( | |
|
217 | work_dir=work_dir, config=config | |
|
218 | ) | |
|
219 | self.process = None | |
|
220 | self.start_deferred = None | |
|
221 | self.poller = None | |
|
222 | ||
|
223 | def find_args(self): | |
|
224 | return self.cmd_and_args | |
|
225 | ||
|
226 | def start(self): | |
|
227 | if self.state == 'before': | |
|
228 | self.process = Popen(self.args, | |
|
229 | stdout=PIPE,stderr=PIPE,stdin=PIPE, | |
|
230 | env=os.environ, | |
|
231 | cwd=self.work_dir | |
|
232 | ) | |
|
233 | ||
|
234 | self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) | |
|
235 | self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) | |
|
236 | self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) | |
|
237 | self.poller.start() | |
|
238 | self.notify_start(self.process.pid) | |
|
239 | else: | |
|
240 | s = 'The process was already started and has state: %r' % self.state | |
|
241 | raise ProcessStateError(s) | |
|
242 | ||
|
243 | def stop(self): | |
|
244 | return self.interrupt_then_kill() | |
|
245 | ||
|
246 | def signal(self, sig): | |
|
247 | if self.state == 'running': | |
|
248 | self.process.send_signal(sig) | |
|
249 | ||
|
250 | def interrupt_then_kill(self, delay=2.0): | |
|
251 | """Send INT, wait a delay and then send KILL.""" | |
|
252 | self.signal(SIGINT) | |
|
253 | self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop) | |
|
254 | self.killer.start() | |
|
255 | ||
|
256 | # callbacks, etc: | |
|
257 | ||
|
258 | def handle_stdout(self, fd, events): | |
|
259 | line = self.process.stdout.readline() | |
|
260 | # a stopped process will be readable but return empty strings | |
|
261 | if line: | |
|
262 | logging.info(line[:-1]) | |
|
263 | else: | |
|
264 | self.poll() | |
|
265 | ||
|
266 | def handle_stderr(self, fd, events): | |
|
267 | line = self.process.stderr.readline() | |
|
268 | # a stopped process will be readable but return empty strings | |
|
269 | if line: | |
|
270 | logging.error(line[:-1]) | |
|
271 | else: | |
|
272 | self.poll() | |
|
273 | ||
|
274 | def poll(self): | |
|
275 | status = self.process.poll() | |
|
276 | if status is not None: | |
|
277 | self.poller.stop() | |
|
278 | self.loop.remove_handler(self.process.stdout.fileno()) | |
|
279 | self.loop.remove_handler(self.process.stderr.fileno()) | |
|
280 | self.notify_stop(dict(exit_code=status, pid=self.process.pid)) | |
|
281 | return status | |
|
282 | ||
|
283 | class LocalControllerLauncher(LocalProcessLauncher): | |
|
284 | """Launch a controller as a regular external process.""" | |
|
285 | ||
|
286 | controller_cmd = List(ipcontroller_cmd_argv, config=True) | |
|
287 | # Command line arguments to ipcontroller. | |
|
288 | controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) | |
|
289 | ||
|
290 | def find_args(self): | |
|
291 | return self.controller_cmd + self.controller_args | |
|
292 | ||
|
293 | def start(self, cluster_dir): | |
|
294 | """Start the controller by cluster_dir.""" | |
|
295 | self.controller_args.extend(['--cluster-dir', cluster_dir]) | |
|
296 | self.cluster_dir = unicode(cluster_dir) | |
|
297 | logging.info("Starting LocalControllerLauncher: %r" % self.args) | |
|
298 | return super(LocalControllerLauncher, self).start() | |
|
299 | ||
|
300 | ||
|
301 | class LocalEngineLauncher(LocalProcessLauncher): | |
|
302 | """Launch a single engine as a regular externall process.""" | |
|
303 | ||
|
304 | engine_cmd = List(ipengine_cmd_argv, config=True) | |
|
305 | # Command line arguments for ipengine. | |
|
306 | engine_args = List( | |
|
307 | ['--log-to-file','--log-level', str(logging.ERROR)], config=True | |
|
308 | ) | |
|
309 | ||
|
310 | def find_args(self): | |
|
311 | return self.engine_cmd + self.engine_args | |
|
312 | ||
|
313 | def start(self, cluster_dir): | |
|
314 | """Start the engine by cluster_dir.""" | |
|
315 | self.engine_args.extend(['--cluster-dir', cluster_dir]) | |
|
316 | self.cluster_dir = unicode(cluster_dir) | |
|
317 | return super(LocalEngineLauncher, self).start() | |
|
318 | ||
|
319 | ||
|
320 | class LocalEngineSetLauncher(BaseLauncher): | |
|
321 | """Launch a set of engines as regular external processes.""" | |
|
322 | ||
|
323 | # Command line arguments for ipengine. | |
|
324 | engine_args = List( | |
|
325 | ['--log-to-file','--log-level', str(logging.ERROR)], config=True | |
|
326 | ) | |
|
327 | # launcher class | |
|
328 | launcher_class = LocalEngineLauncher | |
|
329 | ||
|
330 | def __init__(self, work_dir=u'.', config=None): | |
|
331 | super(LocalEngineSetLauncher, self).__init__( | |
|
332 | work_dir=work_dir, config=config | |
|
333 | ) | |
|
334 | self.launchers = {} | |
|
335 | self.stop_data = {} | |
|
336 | ||
|
337 | def start(self, n, cluster_dir): | |
|
338 | """Start n engines by profile or cluster_dir.""" | |
|
339 | self.cluster_dir = unicode(cluster_dir) | |
|
340 | dlist = [] | |
|
341 | for i in range(n): | |
|
342 | el = self.launcher_class(work_dir=self.work_dir, config=self.config) | |
|
343 | # Copy the engine args over to each engine launcher. | |
|
344 | import copy | |
|
345 | el.engine_args = copy.deepcopy(self.engine_args) | |
|
346 | el.on_stop(self._notice_engine_stopped) | |
|
347 | d = el.start(cluster_dir) | |
|
348 | if i==0: | |
|
349 | logging.info("Starting LocalEngineSetLauncher: %r" % el.args) | |
|
350 | self.launchers[i] = el | |
|
351 | dlist.append(d) | |
|
352 | self.notify_start(dlist) | |
|
353 | # The consumeErrors here could be dangerous | |
|
354 | # dfinal = gatherBoth(dlist, consumeErrors=True) | |
|
355 | # dfinal.addCallback(self.notify_start) | |
|
356 | return dlist | |
|
357 | ||
|
358 | def find_args(self): | |
|
359 | return ['engine set'] | |
|
360 | ||
|
361 | def signal(self, sig): | |
|
362 | dlist = [] | |
|
363 | for el in self.launchers.itervalues(): | |
|
364 | d = el.signal(sig) | |
|
365 | dlist.append(d) | |
|
366 | # dfinal = gatherBoth(dlist, consumeErrors=True) | |
|
367 | return dlist | |
|
368 | ||
|
369 | def interrupt_then_kill(self, delay=1.0): | |
|
370 | dlist = [] | |
|
371 | for el in self.launchers.itervalues(): | |
|
372 | d = el.interrupt_then_kill(delay) | |
|
373 | dlist.append(d) | |
|
374 | # dfinal = gatherBoth(dlist, consumeErrors=True) | |
|
375 | return dlist | |
|
376 | ||
|
377 | def stop(self): | |
|
378 | return self.interrupt_then_kill() | |
|
379 | ||
|
380 | def _notice_engine_stopped(self, data): | |
|
381 | print "notice", data | |
|
382 | pid = data['pid'] | |
|
383 | for idx,el in self.launchers.iteritems(): | |
|
384 | if el.process.pid == pid: | |
|
385 | break | |
|
386 | self.launchers.pop(idx) | |
|
387 | self.stop_data[idx] = data | |
|
388 | if not self.launchers: | |
|
389 | self.notify_stop(self.stop_data) | |
|
390 | ||
|
391 | ||
|
392 | #----------------------------------------------------------------------------- | |
|
393 | # MPIExec launchers | |
|
394 | #----------------------------------------------------------------------------- | |
|
395 | ||
|
396 | ||
|
397 | class MPIExecLauncher(LocalProcessLauncher): | |
|
398 | """Launch an external process using mpiexec.""" | |
|
399 | ||
|
400 | # The mpiexec command to use in starting the process. | |
|
401 | mpi_cmd = List(['mpiexec'], config=True) | |
|
402 | # The command line arguments to pass to mpiexec. | |
|
403 | mpi_args = List([], config=True) | |
|
404 | # The program to start using mpiexec. | |
|
405 | program = List(['date'], config=True) | |
|
406 | # The command line argument to the program. | |
|
407 | program_args = List([], config=True) | |
|
408 | # The number of instances of the program to start. | |
|
409 | n = Int(1, config=True) | |
|
410 | ||
|
411 | def find_args(self): | |
|
412 | """Build self.args using all the fields.""" | |
|
413 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ | |
|
414 | self.program + self.program_args | |
|
415 | ||
|
416 | def start(self, n): | |
|
417 | """Start n instances of the program using mpiexec.""" | |
|
418 | self.n = n | |
|
419 | return super(MPIExecLauncher, self).start() | |
|
420 | ||
|
421 | ||
|
422 | class MPIExecControllerLauncher(MPIExecLauncher): | |
|
423 | """Launch a controller using mpiexec.""" | |
|
424 | ||
|
425 | controller_cmd = List(ipcontroller_cmd_argv, config=True) | |
|
426 | # Command line arguments to ipcontroller. | |
|
427 | controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) | |
|
428 | n = Int(1, config=False) | |
|
429 | ||
|
430 | def start(self, cluster_dir): | |
|
431 | """Start the controller by cluster_dir.""" | |
|
432 | self.controller_args.extend(['--cluster-dir', cluster_dir]) | |
|
433 | self.cluster_dir = unicode(cluster_dir) | |
|
434 | logging.info("Starting MPIExecControllerLauncher: %r" % self.args) | |
|
435 | return super(MPIExecControllerLauncher, self).start(1) | |
|
436 | ||
|
437 | def find_args(self): | |
|
438 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ | |
|
439 | self.controller_cmd + self.controller_args | |
|
440 | ||
|
441 | ||
|
442 | class MPIExecEngineSetLauncher(MPIExecLauncher): | |
|
443 | ||
|
444 | engine_cmd = List(ipengine_cmd_argv, config=True) | |
|
445 | # Command line arguments for ipengine. | |
|
446 | engine_args = List( | |
|
447 | ['--log-to-file','--log-level', str(logging.ERROR)], config=True | |
|
448 | ) | |
|
449 | n = Int(1, config=True) | |
|
450 | ||
|
451 | def start(self, n, cluster_dir): | |
|
452 | """Start n engines by profile or cluster_dir.""" | |
|
453 | self.engine_args.extend(['--cluster-dir', cluster_dir]) | |
|
454 | self.cluster_dir = unicode(cluster_dir) | |
|
455 | self.n = n | |
|
456 | logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args) | |
|
457 | return super(MPIExecEngineSetLauncher, self).start(n) | |
|
458 | ||
|
459 | def find_args(self): | |
|
460 | return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \ | |
|
461 | self.engine_cmd + self.engine_args | |
|
462 | ||
|
463 | ||
|
464 | #----------------------------------------------------------------------------- | |
|
465 | # SSH launchers | |
|
466 | #----------------------------------------------------------------------------- | |
|
467 | ||
|
468 | # TODO: Get SSH Launcher working again. | |
|
469 | ||
|
470 | class SSHLauncher(LocalProcessLauncher): | |
|
471 | """A minimal launcher for ssh. | |
|
472 | ||
|
473 | To be useful this will probably have to be extended to use the ``sshx`` | |
|
474 | idea for environment variables. There could be other things this needs | |
|
475 | as well. | |
|
476 | """ | |
|
477 | ||
|
478 | ssh_cmd = List(['ssh'], config=True) | |
|
479 | ssh_args = List([], config=True) | |
|
480 | program = List(['date'], config=True) | |
|
481 | program_args = List([], config=True) | |
|
482 | hostname = Str('', config=True) | |
|
483 | user = Str(os.environ.get('USER','username'), config=True) | |
|
484 | location = Str('') | |
|
485 | ||
|
486 | def _hostname_changed(self, name, old, new): | |
|
487 | self.location = '%s@%s' % (self.user, new) | |
|
488 | ||
|
489 | def _user_changed(self, name, old, new): | |
|
490 | self.location = '%s@%s' % (new, self.hostname) | |
|
491 | ||
|
492 | def find_args(self): | |
|
493 | return self.ssh_cmd + self.ssh_args + [self.location] + \ | |
|
494 | self.program + self.program_args | |
|
495 | ||
|
496 | def start(self, cluster_dir, hostname=None, user=None): | |
|
497 | if hostname is not None: | |
|
498 | self.hostname = hostname | |
|
499 | if user is not None: | |
|
500 | self.user = user | |
|
501 | return super(SSHLauncher, self).start() | |
|
502 | ||
|
503 | ||
|
504 | class SSHControllerLauncher(SSHLauncher): | |
|
505 | ||
|
506 | program = List(ipcontroller_cmd_argv, config=True) | |
|
507 | # Command line arguments to ipcontroller. | |
|
508 | program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True) | |
|
509 | ||
|
510 | ||
|
511 | class SSHEngineLauncher(SSHLauncher): | |
|
512 | program = List(ipengine_cmd_argv, config=True) | |
|
513 | # Command line arguments for ipengine. | |
|
514 | program_args = List( | |
|
515 | ['--log-to-file','--log-level', str(logging.ERROR)], config=True | |
|
516 | ) | |
|
517 | ||
|
518 | class SSHEngineSetLauncher(LocalEngineSetLauncher): | |
|
519 | launcher_class = SSHEngineLauncher | |
|
520 | ||
|
521 | ||
|
522 | #----------------------------------------------------------------------------- | |
|
523 | # Windows HPC Server 2008 scheduler launchers | |
|
524 | #----------------------------------------------------------------------------- | |
|
525 | ||
|
526 | ||
|
527 | # # This is only used on Windows. | |
|
528 | # def find_job_cmd(): | |
|
529 | # if os.name=='nt': | |
|
530 | # try: | |
|
531 | # return find_cmd('job') | |
|
532 | # except FindCmdError: | |
|
533 | # return 'job' | |
|
534 | # else: | |
|
535 | # return 'job' | |
|
536 | # | |
|
537 | # | |
|
538 | # class WindowsHPCLauncher(BaseLauncher): | |
|
539 | # | |
|
540 | # # A regular expression used to get the job id from the output of the | |
|
541 | # # submit_command. | |
|
542 | # job_id_regexp = Str(r'\d+', config=True) | |
|
543 | # # The filename of the instantiated job script. | |
|
544 | # job_file_name = Unicode(u'ipython_job.xml', config=True) | |
|
545 | # # The full path to the instantiated job script. This gets made dynamically | |
|
546 | # # by combining the work_dir with the job_file_name. | |
|
547 | # job_file = Unicode(u'') | |
|
548 | # # The hostname of the scheduler to submit the job to | |
|
549 | # scheduler = Str('', config=True) | |
|
550 | # job_cmd = Str(find_job_cmd(), config=True) | |
|
551 | # | |
|
552 | # def __init__(self, work_dir=u'.', config=None): | |
|
553 | # super(WindowsHPCLauncher, self).__init__( | |
|
554 | # work_dir=work_dir, config=config | |
|
555 | # ) | |
|
556 | # | |
|
557 | # @property | |
|
558 | # def job_file(self): | |
|
559 | # return os.path.join(self.work_dir, self.job_file_name) | |
|
560 | # | |
|
561 | # def write_job_file(self, n): | |
|
562 | # raise NotImplementedError("Implement write_job_file in a subclass.") | |
|
563 | # | |
|
564 | # def find_args(self): | |
|
565 | # return ['job.exe'] | |
|
566 | # | |
|
567 | # def parse_job_id(self, output): | |
|
568 | # """Take the output of the submit command and return the job id.""" | |
|
569 | # m = re.search(self.job_id_regexp, output) | |
|
570 | # if m is not None: | |
|
571 | # job_id = m.group() | |
|
572 | # else: | |
|
573 | # raise LauncherError("Job id couldn't be determined: %s" % output) | |
|
574 | # self.job_id = job_id | |
|
575 | # logging.info('Job started with job id: %r' % job_id) | |
|
576 | # return job_id | |
|
577 | # | |
|
578 | # @inlineCallbacks | |
|
579 | # def start(self, n): | |
|
580 | # """Start n copies of the process using the Win HPC job scheduler.""" | |
|
581 | # self.write_job_file(n) | |
|
582 | # args = [ | |
|
583 | # 'submit', | |
|
584 | # '/jobfile:%s' % self.job_file, | |
|
585 | # '/scheduler:%s' % self.scheduler | |
|
586 | # ] | |
|
587 | # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |
|
588 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this | |
|
589 | # output = yield getProcessOutput(str(self.job_cmd), | |
|
590 | # [str(a) for a in args], | |
|
591 | # env=dict((str(k),str(v)) for k,v in os.environ.items()), | |
|
592 | # path=self.work_dir | |
|
593 | # ) | |
|
594 | # job_id = self.parse_job_id(output) | |
|
595 | # self.notify_start(job_id) | |
|
596 | # defer.returnValue(job_id) | |
|
597 | # | |
|
598 | # @inlineCallbacks | |
|
599 | # def stop(self): | |
|
600 | # args = [ | |
|
601 | # 'cancel', | |
|
602 | # self.job_id, | |
|
603 | # '/scheduler:%s' % self.scheduler | |
|
604 | # ] | |
|
605 | # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |
|
606 | # try: | |
|
607 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this | |
|
608 | # output = yield getProcessOutput(str(self.job_cmd), | |
|
609 | # [str(a) for a in args], | |
|
610 | # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()), | |
|
611 | # path=self.work_dir | |
|
612 | # ) | |
|
613 | # except: | |
|
614 | # output = 'The job already appears to be stoppped: %r' % self.job_id | |
|
615 | # self.notify_stop(output) # Pass the output of the kill cmd | |
|
616 | # defer.returnValue(output) | |
|
617 | # | |
|
618 | # | |
|
619 | # class WindowsHPCControllerLauncher(WindowsHPCLauncher): | |
|
620 | # | |
|
621 | # job_file_name = Unicode(u'ipcontroller_job.xml', config=True) | |
|
622 | # extra_args = List([], config=False) | |
|
623 | # | |
|
624 | # def write_job_file(self, n): | |
|
625 | # job = IPControllerJob(config=self.config) | |
|
626 | # | |
|
627 | # t = IPControllerTask(config=self.config) | |
|
628 | # # The tasks work directory is *not* the actual work directory of | |
|
629 | # # the controller. It is used as the base path for the stdout/stderr | |
|
630 | # # files that the scheduler redirects to. | |
|
631 | # t.work_directory = self.cluster_dir | |
|
632 | # # Add the --cluster-dir and from self.start(). | |
|
633 | # t.controller_args.extend(self.extra_args) | |
|
634 | # job.add_task(t) | |
|
635 | # | |
|
636 | # logging.info("Writing job description file: %s" % self.job_file) | |
|
637 | # job.write(self.job_file) | |
|
638 | # | |
|
639 | # @property | |
|
640 | # def job_file(self): | |
|
641 | # return os.path.join(self.cluster_dir, self.job_file_name) | |
|
642 | # | |
|
643 | # def start(self, cluster_dir): | |
|
644 | # """Start the controller by cluster_dir.""" | |
|
645 | # self.extra_args = ['--cluster-dir', cluster_dir] | |
|
646 | # self.cluster_dir = unicode(cluster_dir) | |
|
647 | # return super(WindowsHPCControllerLauncher, self).start(1) | |
|
648 | # | |
|
649 | # | |
|
650 | # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): | |
|
651 | # | |
|
652 | # job_file_name = Unicode(u'ipengineset_job.xml', config=True) | |
|
653 | # extra_args = List([], config=False) | |
|
654 | # | |
|
655 | # def write_job_file(self, n): | |
|
656 | # job = IPEngineSetJob(config=self.config) | |
|
657 | # | |
|
658 | # for i in range(n): | |
|
659 | # t = IPEngineTask(config=self.config) | |
|
660 | # # The tasks work directory is *not* the actual work directory of | |
|
661 | # # the engine. It is used as the base path for the stdout/stderr | |
|
662 | # # files that the scheduler redirects to. | |
|
663 | # t.work_directory = self.cluster_dir | |
|
664 | # # Add the --cluster-dir and from self.start(). | |
|
665 | # t.engine_args.extend(self.extra_args) | |
|
666 | # job.add_task(t) | |
|
667 | # | |
|
668 | # logging.info("Writing job description file: %s" % self.job_file) | |
|
669 | # job.write(self.job_file) | |
|
670 | # | |
|
671 | # @property | |
|
672 | # def job_file(self): | |
|
673 | # return os.path.join(self.cluster_dir, self.job_file_name) | |
|
674 | # | |
|
675 | # def start(self, n, cluster_dir): | |
|
676 | # """Start the controller by cluster_dir.""" | |
|
677 | # self.extra_args = ['--cluster-dir', cluster_dir] | |
|
678 | # self.cluster_dir = unicode(cluster_dir) | |
|
679 | # return super(WindowsHPCEngineSetLauncher, self).start(n) | |
|
680 | # | |
|
681 | # | |
|
682 | # #----------------------------------------------------------------------------- | |
|
683 | # # Batch (PBS) system launchers | |
|
684 | # #----------------------------------------------------------------------------- | |
|
685 | # | |
|
686 | # # TODO: Get PBS launcher working again. | |
|
687 | # | |
|
688 | # class BatchSystemLauncher(BaseLauncher): | |
|
689 | # """Launch an external process using a batch system. | |
|
690 | # | |
|
691 | # This class is designed to work with UNIX batch systems like PBS, LSF, | |
|
692 | # GridEngine, etc. The overall model is that there are different commands | |
|
693 | # like qsub, qdel, etc. that handle the starting and stopping of the process. | |
|
694 | # | |
|
695 | # This class also has the notion of a batch script. The ``batch_template`` | |
|
696 | # attribute can be set to a string that is a template for the batch script. | |
|
697 | # This template is instantiated using Itpl. Thus the template can use | |
|
698 | # ${n} fot the number of instances. Subclasses can add additional variables | |
|
699 | # to the template dict. | |
|
700 | # """ | |
|
701 | # | |
|
702 | # # Subclasses must fill these in. See PBSEngineSet | |
|
703 | # # The name of the command line program used to submit jobs. | |
|
704 | # submit_command = Str('', config=True) | |
|
705 | # # The name of the command line program used to delete jobs. | |
|
706 | # delete_command = Str('', config=True) | |
|
707 | # # A regular expression used to get the job id from the output of the | |
|
708 | # # submit_command. | |
|
709 | # job_id_regexp = Str('', config=True) | |
|
710 | # # The string that is the batch script template itself. | |
|
711 | # batch_template = Str('', config=True) | |
|
712 | # # The filename of the instantiated batch script. | |
|
713 | # batch_file_name = Unicode(u'batch_script', config=True) | |
|
714 | # # The full path to the instantiated batch script. | |
|
715 | # batch_file = Unicode(u'') | |
|
716 | # | |
|
717 | # def __init__(self, work_dir=u'.', config=None): | |
|
718 | # super(BatchSystemLauncher, self).__init__( | |
|
719 | # work_dir=work_dir, config=config | |
|
720 | # ) | |
|
721 | # self.batch_file = os.path.join(self.work_dir, self.batch_file_name) | |
|
722 | # self.context = {} | |
|
723 | # | |
|
724 | # def parse_job_id(self, output): | |
|
725 | # """Take the output of the submit command and return the job id.""" | |
|
726 | # m = re.match(self.job_id_regexp, output) | |
|
727 | # if m is not None: | |
|
728 | # job_id = m.group() | |
|
729 | # else: | |
|
730 | # raise LauncherError("Job id couldn't be determined: %s" % output) | |
|
731 | # self.job_id = job_id | |
|
732 | # logging.info('Job started with job id: %r' % job_id) | |
|
733 | # return job_id | |
|
734 | # | |
|
735 | # def write_batch_script(self, n): | |
|
736 | # """Instantiate and write the batch script to the work_dir.""" | |
|
737 | # self.context['n'] = n | |
|
738 | # script_as_string = Itpl.itplns(self.batch_template, self.context) | |
|
739 | # logging.info('Writing instantiated batch script: %s' % self.batch_file) | |
|
740 | # f = open(self.batch_file, 'w') | |
|
741 | # f.write(script_as_string) | |
|
742 | # f.close() | |
|
743 | # | |
|
744 | # @inlineCallbacks | |
|
745 | # def start(self, n): | |
|
746 | # """Start n copies of the process using a batch system.""" | |
|
747 | # self.write_batch_script(n) | |
|
748 | # output = yield getProcessOutput(self.submit_command, | |
|
749 | # [self.batch_file], env=os.environ) | |
|
750 | # job_id = self.parse_job_id(output) | |
|
751 | # self.notify_start(job_id) | |
|
752 | # defer.returnValue(job_id) | |
|
753 | # | |
|
754 | # @inlineCallbacks | |
|
755 | # def stop(self): | |
|
756 | # output = yield getProcessOutput(self.delete_command, | |
|
757 | # [self.job_id], env=os.environ | |
|
758 | # ) | |
|
759 | # self.notify_stop(output) # Pass the output of the kill cmd | |
|
760 | # defer.returnValue(output) | |
|
761 | # | |
|
762 | # | |
|
763 | # class PBSLauncher(BatchSystemLauncher): | |
|
764 | # """A BatchSystemLauncher subclass for PBS.""" | |
|
765 | # | |
|
766 | # submit_command = Str('qsub', config=True) | |
|
767 | # delete_command = Str('qdel', config=True) | |
|
768 | # job_id_regexp = Str(r'\d+', config=True) | |
|
769 | # batch_template = Str('', config=True) | |
|
770 | # batch_file_name = Unicode(u'pbs_batch_script', config=True) | |
|
771 | # batch_file = Unicode(u'') | |
|
772 | # | |
|
773 | # | |
|
774 | # class PBSControllerLauncher(PBSLauncher): | |
|
775 | # """Launch a controller using PBS.""" | |
|
776 | # | |
|
777 | # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True) | |
|
778 | # | |
|
779 | # def start(self, cluster_dir): | |
|
780 | # """Start the controller by profile or cluster_dir.""" | |
|
781 | # # Here we save profile and cluster_dir in the context so they | |
|
782 | # # can be used in the batch script template as ${profile} and | |
|
783 | # # ${cluster_dir} | |
|
784 | # self.context['cluster_dir'] = cluster_dir | |
|
785 | # self.cluster_dir = unicode(cluster_dir) | |
|
786 | # logging.info("Starting PBSControllerLauncher: %r" % self.args) | |
|
787 | # return super(PBSControllerLauncher, self).start(1) | |
|
788 | # | |
|
789 | # | |
|
790 | # class PBSEngineSetLauncher(PBSLauncher): | |
|
791 | # | |
|
792 | # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True) | |
|
793 | # | |
|
794 | # def start(self, n, cluster_dir): | |
|
795 | # """Start n engines by profile or cluster_dir.""" | |
|
796 | # self.program_args.extend(['--cluster-dir', cluster_dir]) | |
|
797 | # self.cluster_dir = unicode(cluster_dir) | |
|
798 | # logging.info('Starting PBSEngineSetLauncher: %r' % self.args) | |
|
799 | # return super(PBSEngineSetLauncher, self).start(n) | |
|
800 | ||
|
801 | ||
|
802 | #----------------------------------------------------------------------------- | |
|
803 | # A launcher for ipcluster itself! | |
|
804 | #----------------------------------------------------------------------------- | |
|
805 | ||
|
806 | ||
|
807 | class IPClusterLauncher(LocalProcessLauncher): | |
|
808 | """Launch the ipcluster program in an external process.""" | |
|
809 | ||
|
810 | ipcluster_cmd = List(ipcluster_cmd_argv, config=True) | |
|
811 | # Command line arguments to pass to ipcluster. | |
|
812 | ipcluster_args = List( | |
|
813 | ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True) | |
|
814 | ipcluster_subcommand = Str('start') | |
|
815 | ipcluster_n = Int(2) | |
|
816 | ||
|
817 | def find_args(self): | |
|
818 | return self.ipcluster_cmd + [self.ipcluster_subcommand] + \ | |
|
819 | ['-n', repr(self.ipcluster_n)] + self.ipcluster_args | |
|
820 | ||
|
821 | def start(self): | |
|
822 | logging.info("Starting ipcluster: %r" % self.args) | |
|
823 | return super(IPClusterLauncher, self).start() | |
|
824 |
@@ -90,14 +90,6 b' def defaultblock(f, self, *args, **kwargs):' | |||
|
90 | 90 | # Classes |
|
91 | 91 | #-------------------------------------------------------------------------- |
|
92 | 92 | |
|
93 | class ResultDict(dict): | |
|
94 | """A subclass of dict that raises errors if it has them.""" | |
|
95 | def __getitem__(self, key): | |
|
96 | res = dict.__getitem__(self, key) | |
|
97 | if isinstance(res, error.KernelError): | |
|
98 | raise res | |
|
99 | return res | |
|
100 | ||
|
101 | 93 | class Metadata(dict): |
|
102 | 94 | """Subclass of dict for initializing metadata values.""" |
|
103 | 95 | def __init__(self, *args, **kwargs): |
@@ -35,18 +35,6 b' from IPython.utils.path import (' | |||
|
35 | 35 | from IPython.utils.traitlets import Unicode |
|
36 | 36 | |
|
37 | 37 | #----------------------------------------------------------------------------- |
|
38 | # Warnings control | |
|
39 | #----------------------------------------------------------------------------- | |
|
40 | # Twisted generates annoying warnings with Python 2.6, as will do other code | |
|
41 | # that imports 'sets' as of today | |
|
42 | warnings.filterwarnings('ignore', 'the sets module is deprecated', | |
|
43 | DeprecationWarning ) | |
|
44 | ||
|
45 | # This one also comes from Twisted | |
|
46 | warnings.filterwarnings('ignore', 'the sha module is deprecated', | |
|
47 | DeprecationWarning) | |
|
48 | ||
|
49 | #----------------------------------------------------------------------------- | |
|
50 | 38 | # Module errors |
|
51 | 39 | #----------------------------------------------------------------------------- |
|
52 | 40 |
@@ -1,7 +1,6 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | """The IPython Controller with 0MQ |
|
3 | This is the master object that handles connections from engines and clients, | |
|
4 | and monitors traffic through the various queues. | |
|
3 | This is a collection of one Hub and several Schedulers. | |
|
5 | 4 | """ |
|
6 | 5 | #----------------------------------------------------------------------------- |
|
7 | 6 | # Copyright (C) 2010 The IPython Development Team |
@@ -15,75 +14,25 b' and monitors traffic through the various queues.' | |||
|
15 | 14 | #----------------------------------------------------------------------------- |
|
16 | 15 | from __future__ import print_function |
|
17 | 16 | |
|
18 | import os | |
|
19 | import sys | |
|
20 | import time | |
|
21 | 17 | import logging |
|
22 | 18 | from multiprocessing import Process |
|
23 | 19 | |
|
24 | 20 | import zmq |
|
25 | from zmq.eventloop import ioloop | |
|
26 | from zmq.eventloop.zmqstream import ZMQStream | |
|
27 | # from zmq.devices import ProcessMonitoredQueue | |
|
28 | 21 | |
|
29 | 22 | # internal: |
|
30 | 23 | from IPython.utils.importstring import import_item |
|
31 | 24 | from IPython.utils.traitlets import Int, Str, Instance, List, Bool |
|
32 | from IPython.zmq.entry_point import bind_port | |
|
33 | 25 | |
|
34 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | |
|
35 | connect_logger, parse_url, signal_children, generate_exec_key, | |
|
36 | local_logger) | |
|
26 | from entry_point import signal_children | |
|
37 | 27 | |
|
38 | 28 | |
|
39 | import streamsession as session | |
|
40 | import heartmonitor | |
|
41 | 29 | from scheduler import launch_scheduler |
|
42 | 30 | from hub import Hub, HubFactory |
|
43 | 31 | |
|
44 | from dictdb import DictDB | |
|
45 | try: | |
|
46 | import pymongo | |
|
47 | except ImportError: | |
|
48 | MongoDB=None | |
|
49 | else: | |
|
50 | from mongodb import MongoDB | |
|
51 | ||
|
52 | #------------------------------------------------------------------------- | |
|
53 | # Entry Point | |
|
54 | #------------------------------------------------------------------------- | |
|
55 | ||
|
56 | def make_argument_parser(): | |
|
57 | """Make an argument parser""" | |
|
58 | parser = make_base_argument_parser() | |
|
59 | ||
|
60 | parser.add_argument('--client', type=int, metavar='PORT', default=0, | |
|
61 | help='set the XREP port for clients [default: random]') | |
|
62 | parser.add_argument('--notice', type=int, metavar='PORT', default=0, | |
|
63 | help='set the PUB socket for registration notification [default: random]') | |
|
64 | parser.add_argument('--hb', type=str, metavar='PORTS', | |
|
65 | help='set the 2 ports for heartbeats [default: random]') | |
|
66 | parser.add_argument('--ping', type=int, default=100, | |
|
67 | help='set the heartbeat period in ms [default: 100]') | |
|
68 | parser.add_argument('--monitor', type=int, metavar='PORT', default=0, | |
|
69 | help='set the SUB port for queue monitoring [default: random]') | |
|
70 | parser.add_argument('--mux', type=str, metavar='PORTS', | |
|
71 | help='set the XREP ports for the MUX queue [default: random]') | |
|
72 | parser.add_argument('--task', type=str, metavar='PORTS', | |
|
73 | help='set the XREP/XREQ ports for the task queue [default: random]') | |
|
74 | parser.add_argument('--control', type=str, metavar='PORTS', | |
|
75 | help='set the XREP ports for the control queue [default: random]') | |
|
76 | parser.add_argument('--iopub', type=str, metavar='PORTS', | |
|
77 | help='set the PUB/SUB ports for the iopub relay [default: random]') | |
|
78 | parser.add_argument('--scheduler', type=str, default='lru', | |
|
79 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], | |
|
80 | help='select the task scheduler [default: Python LRU]') | |
|
81 | parser.add_argument('--mongodb', action='store_true', | |
|
82 | help='Use MongoDB task storage [default: in-memory]') | |
|
83 | parser.add_argument('--session', type=str, default=None, | |
|
84 | help='Manually specify the session id.') | |
|
32 | #----------------------------------------------------------------------------- | |
|
33 | # Configurable | |
|
34 | #----------------------------------------------------------------------------- | |
|
85 | 35 | |
|
86 | return parser | |
|
87 | 36 | |
|
88 | 37 | class ControllerFactory(HubFactory): |
|
89 | 38 | """Configurable for setting up a Hub and Schedulers.""" |
@@ -159,187 +108,3 b' class ControllerFactory(HubFactory):' | |||
|
159 | 108 | q.daemon=True |
|
160 | 109 | children.append(q) |
|
161 | 110 | |
|
162 | ||
|
163 | def main(argv=None): | |
|
164 | """DO NOT USE ME ANYMORE""" | |
|
165 | ||
|
166 | parser = make_argument_parser() | |
|
167 | ||
|
168 | args = parser.parse_args(argv) | |
|
169 | parse_url(args) | |
|
170 | ||
|
171 | iface="%s://%s"%(args.transport,args.ip)+':%i' | |
|
172 | ||
|
173 | random_ports = 0 | |
|
174 | if args.hb: | |
|
175 | hb = split_ports(args.hb, 2) | |
|
176 | else: | |
|
177 | hb = select_random_ports(2) | |
|
178 | if args.mux: | |
|
179 | mux = split_ports(args.mux, 2) | |
|
180 | else: | |
|
181 | mux = None | |
|
182 | random_ports += 2 | |
|
183 | if args.iopub: | |
|
184 | iopub = split_ports(args.iopub, 2) | |
|
185 | else: | |
|
186 | iopub = None | |
|
187 | random_ports += 2 | |
|
188 | if args.task: | |
|
189 | task = split_ports(args.task, 2) | |
|
190 | else: | |
|
191 | task = None | |
|
192 | random_ports += 2 | |
|
193 | if args.control: | |
|
194 | control = split_ports(args.control, 2) | |
|
195 | else: | |
|
196 | control = None | |
|
197 | random_ports += 2 | |
|
198 | ||
|
199 | ctx = zmq.Context() | |
|
200 | loop = ioloop.IOLoop.instance() | |
|
201 | ||
|
202 | ||
|
203 | # Registrar socket | |
|
204 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
205 | regport = bind_port(reg, args.ip, args.regport) | |
|
206 | ||
|
207 | ### Engine connections ### | |
|
208 | ||
|
209 | # heartbeat | |
|
210 | hpub = ctx.socket(zmq.PUB) | |
|
211 | bind_port(hpub, args.ip, hb[0]) | |
|
212 | hrep = ctx.socket(zmq.XREP) | |
|
213 | bind_port(hrep, args.ip, hb[1]) | |
|
214 | ||
|
215 | hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping) | |
|
216 | hmon.start() | |
|
217 | ||
|
218 | ### Client connections ### | |
|
219 | # Clientele socket | |
|
220 | c = ZMQStream(ctx.socket(zmq.XREP), loop) | |
|
221 | cport = bind_port(c, args.ip, args.client) | |
|
222 | # Notifier socket | |
|
223 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
|
224 | nport = bind_port(n, args.ip, args.notice) | |
|
225 | ||
|
226 | ### Key File ### | |
|
227 | if args.execkey and not os.path.isfile(args.execkey): | |
|
228 | generate_exec_key(args.execkey) | |
|
229 | ||
|
230 | thesession = session.StreamSession(username=args.ident or "controller", | |
|
231 | keyfile=args.execkey, session=args.session) | |
|
232 | ||
|
233 | ### build and launch the queues ### | |
|
234 | ||
|
235 | # monitor socket | |
|
236 | sub = ctx.socket(zmq.SUB) | |
|
237 | sub.setsockopt(zmq.SUBSCRIBE, "") | |
|
238 | monport = bind_port(sub, args.ip, args.monitor) | |
|
239 | sub = ZMQStream(sub, loop) | |
|
240 | ||
|
241 | ports = select_random_ports(random_ports) | |
|
242 | children = [] | |
|
243 | ||
|
244 | # IOPub relay (in a Process) | |
|
245 | if not iopub: | |
|
246 | iopub = (ports.pop(),ports.pop()) | |
|
247 | q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') | |
|
248 | q.bind_in(iface%iopub[1]) | |
|
249 | q.bind_out(iface%iopub[0]) | |
|
250 | q.setsockopt_in(zmq.SUBSCRIBE, '') | |
|
251 | q.connect_mon(iface%monport) | |
|
252 | q.daemon=True | |
|
253 | q.start() | |
|
254 | children.append(q.launcher) | |
|
255 | ||
|
256 | # Multiplexer Queue (in a Process) | |
|
257 | if not mux: | |
|
258 | mux = (ports.pop(),ports.pop()) | |
|
259 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
|
260 | q.bind_in(iface%mux[0]) | |
|
261 | q.bind_out(iface%mux[1]) | |
|
262 | q.connect_mon(iface%monport) | |
|
263 | q.daemon=True | |
|
264 | q.start() | |
|
265 | children.append(q.launcher) | |
|
266 | ||
|
267 | # Control Queue (in a Process) | |
|
268 | if not control: | |
|
269 | control = (ports.pop(),ports.pop()) | |
|
270 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
|
271 | q.bind_in(iface%control[0]) | |
|
272 | q.bind_out(iface%control[1]) | |
|
273 | q.connect_mon(iface%monport) | |
|
274 | q.daemon=True | |
|
275 | q.start() | |
|
276 | children.append(q.launcher) | |
|
277 | # Task Queue (in a Process) | |
|
278 | if not task: | |
|
279 | task = (ports.pop(),ports.pop()) | |
|
280 | if args.scheduler == 'pure': | |
|
281 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
|
282 | q.bind_in(iface%task[0]) | |
|
283 | q.bind_out(iface%task[1]) | |
|
284 | q.connect_mon(iface%monport) | |
|
285 | q.daemon=True | |
|
286 | q.start() | |
|
287 | children.append(q.launcher) | |
|
288 | else: | |
|
289 | log_addr = iface%args.logport if args.logport else None | |
|
290 | sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, | |
|
291 | log_addr, args.loglevel, args.scheduler) | |
|
292 | print (sargs) | |
|
293 | q = Process(target=launch_scheduler, args=sargs) | |
|
294 | q.daemon=True | |
|
295 | q.start() | |
|
296 | children.append(q) | |
|
297 | ||
|
298 | if args.mongodb: | |
|
299 | from mongodb import MongoDB | |
|
300 | db = MongoDB(thesession.session) | |
|
301 | else: | |
|
302 | db = DictDB() | |
|
303 | time.sleep(.25) | |
|
304 | ||
|
305 | # build connection dicts | |
|
306 | engine_addrs = { | |
|
307 | 'control' : iface%control[1], | |
|
308 | 'mux': iface%mux[1], | |
|
309 | 'heartbeat': (iface%hb[0], iface%hb[1]), | |
|
310 | 'task' : iface%task[1], | |
|
311 | 'iopub' : iface%iopub[1], | |
|
312 | 'monitor' : iface%monport, | |
|
313 | } | |
|
314 | ||
|
315 | client_addrs = { | |
|
316 | 'control' : iface%control[0], | |
|
317 | 'query': iface%cport, | |
|
318 | 'mux': iface%mux[0], | |
|
319 | 'task' : iface%task[0], | |
|
320 | 'iopub' : iface%iopub[0], | |
|
321 | 'notification': iface%nport | |
|
322 | } | |
|
323 | ||
|
324 | # setup logging | |
|
325 | if args.logport: | |
|
326 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) | |
|
327 | else: | |
|
328 | local_logger(args.loglevel) | |
|
329 | ||
|
330 | # register relay of signals to the children | |
|
331 | signal_children(children) | |
|
332 | hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, | |
|
333 | registrar=reg, clientele=c, notifier=n, db=db, | |
|
334 | engine_addrs=engine_addrs, client_addrs=client_addrs) | |
|
335 | ||
|
336 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) | |
|
337 | dc.start() | |
|
338 | try: | |
|
339 | loop.start() | |
|
340 | except KeyboardInterrupt: | |
|
341 | print ("interrupted, exiting...", file=sys.__stderr__) | |
|
342 | ||
|
343 | ||
|
344 | if __name__ == '__main__': | |
|
345 | main() |
@@ -6,7 +6,6 b" connected to the Controller's queue(s)." | |||
|
6 | 6 | from __future__ import print_function |
|
7 | 7 | import sys |
|
8 | 8 | import time |
|
9 | import traceback | |
|
10 | 9 | import uuid |
|
11 | 10 | import logging |
|
12 | 11 | from pprint import pprint |
@@ -21,12 +20,9 b' from IPython.utils.traitlets import Instance, Str, Dict, Int, Type' | |||
|
21 | 20 | |
|
22 | 21 | from factory import RegistrationFactory |
|
23 | 22 | |
|
24 |
from streamsession import Message |
|
|
25 |
from streamkernel import Kernel |
|
|
23 | from streamsession import Message | |
|
24 | from streamkernel import Kernel | |
|
26 | 25 | import heartmonitor |
|
27 | from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, | |
|
28 | local_logger) | |
|
29 | # import taskthread | |
|
30 | 26 | |
|
31 | 27 | def printer(*msg): |
|
32 | 28 | # print (logging.handlers, file=sys.__stdout__) |
@@ -107,16 +103,15 b' class EngineFactory(RegistrationFactory):' | |||
|
107 | 103 | # print (hb_addrs) |
|
108 | 104 | |
|
109 | 105 | # # Redirect input streams and set a display hook. |
|
110 |
|
|
|
111 |
|
|
|
112 |
|
|
|
113 |
|
|
|
114 |
|
|
|
115 |
|
|
|
116 |
|
|
|
117 |
|
|
|
118 | ||
|
119 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
|
106 | if self.out_stream_factory: | |
|
107 | sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') | |
|
108 | sys.stdout.topic = 'engine.%i.stdout'%self.id | |
|
109 | sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') | |
|
110 | sys.stderr.topic = 'engine.%i.stderr'%self.id | |
|
111 | if self.display_hook_factory: | |
|
112 | sys.displayhook = self.display_hook_factory(self.session, iopub_stream) | |
|
113 | sys.displayhook.topic = 'engine.%i.pyout'%self.id | |
|
114 | ||
|
120 | 115 | self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, |
|
121 | 116 | control_stream=control_stream, |
|
122 | 117 | shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, |
@@ -124,6 +119,7 b' class EngineFactory(RegistrationFactory):' | |||
|
124 | 119 | self.kernel.start() |
|
125 | 120 | |
|
126 | 121 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) |
|
122 | # ioloop.DelayedCallback(heart.start, 1000, self.loop).start() | |
|
127 | 123 | heart.start() |
|
128 | 124 | |
|
129 | 125 | |
@@ -143,48 +139,3 b' class EngineFactory(RegistrationFactory):' | |||
|
143 | 139 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) |
|
144 | 140 | dc.start() |
|
145 | 141 | |
|
146 | ||
|
147 | ||
|
148 | def main(argv=None, user_ns=None): | |
|
149 | """DO NOT USE ME ANYMORE""" | |
|
150 | parser = make_base_argument_parser() | |
|
151 | ||
|
152 | args = parser.parse_args(argv) | |
|
153 | ||
|
154 | parse_url(args) | |
|
155 | ||
|
156 | iface="%s://%s"%(args.transport,args.ip)+':%i' | |
|
157 | ||
|
158 | loop = ioloop.IOLoop.instance() | |
|
159 | session = StreamSession(keyfile=args.execkey) | |
|
160 | # print (session.key) | |
|
161 | ctx = zmq.Context() | |
|
162 | ||
|
163 | # setup logging | |
|
164 | ||
|
165 | reg_conn = iface % args.regport | |
|
166 | print (reg_conn, file=sys.__stdout__) | |
|
167 | print ("Starting the engine...", file=sys.__stderr__) | |
|
168 | ||
|
169 | reg = ctx.socket(zmq.PAIR) | |
|
170 | reg.connect(reg_conn) | |
|
171 | reg = zmqstream.ZMQStream(reg, loop) | |
|
172 | ||
|
173 | e = Engine(context=ctx, loop=loop, session=session, registrar=reg, | |
|
174 | ident=args.ident or '', user_ns=user_ns) | |
|
175 | if args.logport: | |
|
176 | print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) | |
|
177 | connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) | |
|
178 | else: | |
|
179 | local_logger(args.loglevel) | |
|
180 | ||
|
181 | dc = ioloop.DelayedCallback(e.start, 0, loop) | |
|
182 | dc.start() | |
|
183 | try: | |
|
184 | loop.start() | |
|
185 | except KeyboardInterrupt: | |
|
186 | print ("interrupted, exiting...", file=sys.__stderr__) | |
|
187 | ||
|
188 | # Execution as a script | |
|
189 | if __name__ == '__main__': | |
|
190 | main() |
@@ -28,15 +28,6 b' from IPython.core.ultratb import FormattedTB' | |||
|
28 | 28 | from IPython.external.argparse import ArgumentParser |
|
29 | 29 | from IPython.zmq.log import EnginePUBHandler |
|
30 | 30 | |
|
31 | def split_ports(s, n): | |
|
32 | """Parser helper for multiport strings""" | |
|
33 | if not s: | |
|
34 | return tuple([0]*n) | |
|
35 | ports = map(int, s.split(',')) | |
|
36 | if len(ports) != n: | |
|
37 | raise ValueError | |
|
38 | return ports | |
|
39 | ||
|
40 | 31 | _random_ports = set() |
|
41 | 32 | |
|
42 | 33 | def select_random_ports(n): |
@@ -57,18 +48,6 b' def select_random_ports(n):' | |||
|
57 | 48 | _random_ports.add(port) |
|
58 | 49 | return ports |
|
59 | 50 | |
|
60 | def parse_url(args): | |
|
61 | """Ensure args.url contains full transport://interface:port""" | |
|
62 | if args.url: | |
|
63 | iface = args.url.split('://',1) | |
|
64 | if len(args) == 2: | |
|
65 | args.transport,iface = iface | |
|
66 | iface = iface.split(':') | |
|
67 | args.ip = iface[0] | |
|
68 | if iface[1]: | |
|
69 | args.regport = iface[1] | |
|
70 | args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) | |
|
71 | ||
|
72 | 51 | def signal_children(children): |
|
73 | 52 | """Relay interupt/term signals to children, for more solid process cleanup.""" |
|
74 | 53 | def terminate_children(sig, frame): |
@@ -92,34 +71,6 b' def generate_exec_key(keyfile):' | |||
|
92 | 71 | os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) |
|
93 | 72 | |
|
94 | 73 | |
|
95 | def make_base_argument_parser(): | |
|
96 | """ Creates an ArgumentParser for the generic arguments supported by all | |
|
97 | ipcluster entry points. | |
|
98 | """ | |
|
99 | ||
|
100 | parser = ArgumentParser() | |
|
101 | parser.add_argument('--ip', type=str, default='127.0.0.1', | |
|
102 | help='set the controller\'s IP address [default: local]') | |
|
103 | parser.add_argument('--transport', type=str, default='tcp', | |
|
104 | help='set the transport to use [default: tcp]') | |
|
105 | parser.add_argument('--regport', type=int, metavar='PORT', default=10101, | |
|
106 | help='set the XREP port for registration [default: 10101]') | |
|
107 | parser.add_argument('--logport', type=int, metavar='PORT', default=0, | |
|
108 | help='set the PUB port for remote logging [default: log to stdout]') | |
|
109 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, | |
|
110 | help='set the log level [default: INFO]') | |
|
111 | parser.add_argument('--ident', type=str, | |
|
112 | help='set the ZMQ identity [default: random]') | |
|
113 | parser.add_argument('--packer', type=str, default='json', | |
|
114 | choices=['json','pickle'], | |
|
115 | help='set the message format method [default: json]') | |
|
116 | parser.add_argument('--url', type=str, | |
|
117 | help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') | |
|
118 | parser.add_argument('--execkey', type=str, | |
|
119 | help="File containing key for authenticating requests.") | |
|
120 | ||
|
121 | return parser | |
|
122 | ||
|
123 | 74 | def integer_loglevel(loglevel): |
|
124 | 75 | try: |
|
125 | 76 | loglevel = int(loglevel) |
@@ -158,26 +158,26 b' class HubFactory(RegistrationFactory):' | |||
|
158 | 158 | subconstructors = List() |
|
159 | 159 | _constructed = Bool(False) |
|
160 | 160 | |
|
161 | def _ip_changed(self, name, old, new): | |
|
162 | self.engine_ip = new | |
|
163 | self.client_ip = new | |
|
164 | self.monitor_ip = new | |
|
165 | self._update_monitor_url() | |
|
166 | ||
|
161 | 167 | def _update_monitor_url(self): |
|
162 | 168 | self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) |
|
163 | 169 | |
|
164 | def _sync_ips(self): | |
|
165 |
self.engine_ |
|
|
166 |
self.client_ |
|
|
167 |
self.monitor_ |
|
|
168 | self._update_monitor_url() | |
|
169 | ||
|
170 | def _sync_transports(self): | |
|
171 | self.engine_transport = self.transport | |
|
172 | self.client_transport = self.transport | |
|
173 | self.monitor_transport = self.transport | |
|
170 | def _transport_changed(self, name, old, new): | |
|
171 | self.engine_transport = new | |
|
172 | self.client_transport = new | |
|
173 | self.monitor_transport = new | |
|
174 | 174 | self._update_monitor_url() |
|
175 | 175 | |
|
176 | 176 | def __init__(self, **kwargs): |
|
177 | 177 | super(HubFactory, self).__init__(**kwargs) |
|
178 | 178 | self._update_monitor_url() |
|
179 | self.on_trait_change(self._sync_ips, 'ip') | |
|
180 | self.on_trait_change(self._sync_transports, 'transport') | |
|
179 | # self.on_trait_change(self._sync_ips, 'ip') | |
|
180 | # self.on_trait_change(self._sync_transports, 'transport') | |
|
181 | 181 | self.subconstructors.append(self.construct_hub) |
|
182 | 182 | |
|
183 | 183 | |
@@ -334,45 +334,11 b' class Hub(HasTraits):' | |||
|
334 | 334 | """ |
|
335 | 335 | |
|
336 | 336 | super(Hub, self).__init__(**kwargs) |
|
337 | self.ids = set() | |
|
338 | self.pending = set() | |
|
339 | # self.keytable={} | |
|
340 | # self.incoming_registrations={} | |
|
341 | # self.engines = {} | |
|
342 | # self.by_ident = {} | |
|
343 | # self.clients = {} | |
|
344 | # self.hearts = {} | |
|
345 | # self.mia = set() | |
|
346 | 337 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
347 | # this is the stuff that will move to DB: | |
|
348 | # self.pending = set() # pending messages, keyed by msg_id | |
|
349 | # self.queues = {} # pending msg_ids keyed by engine_id | |
|
350 | # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |
|
351 | # self.completed = {} # completed msg_ids keyed by engine_id | |
|
352 | # self.all_completed = set() | |
|
353 | # self._idcounter = 0 | |
|
354 | # self.sockets = {} | |
|
355 | # self.loop = loop | |
|
356 | # self.session = session | |
|
357 | # self.registrar = registrar | |
|
358 | # self.clientele = clientele | |
|
359 | # self.queue = queue | |
|
360 | # self.heartmonitor = heartbeat | |
|
361 | # self.notifier = notifier | |
|
362 | # self.db = db | |
|
363 | 338 | |
|
364 | 339 | # validate connection dicts: |
|
365 | # self.client_addrs = client_addrs | |
|
366 | 340 | validate_url_container(self.client_addrs) |
|
367 | ||
|
368 | # assert isinstance(self.client_addrs['queue'], str) | |
|
369 | # assert isinstance(self.client_addrs['control'], str) | |
|
370 | # self.hb_addrs = hb_addrs | |
|
371 | 341 | validate_url_container(self.engine_addrs) |
|
372 | # self.engine_addrs = engine_addrs | |
|
373 | # assert isinstance(self.engine_addrs['queue'], str) | |
|
374 | # assert isinstance(self.engine_addrs['control'], str) | |
|
375 | # assert len(engine_addrs['heartbeat']) == 2 | |
|
376 | 342 | |
|
377 | 343 | # register our callbacks |
|
378 | 344 | self.registrar.on_recv(self.dispatch_register_request) |
@@ -409,7 +375,9 b' class Hub(HasTraits):' | |||
|
409 | 375 | |
|
410 | 376 | @property |
|
411 | 377 | def _next_id(self): |
|
412 |
"""gemerate a new ID |
|
|
378 | """gemerate a new ID. | |
|
379 | ||
|
380 | No longer reuse old ids, just count from 0.""" | |
|
413 | 381 | newid = self._idcounter |
|
414 | 382 | self._idcounter += 1 |
|
415 | 383 | return newid |
@@ -123,6 +123,11 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):' | |||
|
123 | 123 | help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' |
|
124 | 124 | 'connections [default: random]', |
|
125 | 125 | metavar='Hub.hb_ports') |
|
126 | paa('--ping', | |
|
127 | type=int, dest='HubFactory.ping', | |
|
128 | help='The frequency at which the Hub pings the engines for heartbeats ' | |
|
129 | ' (in ms) [default: 100]', | |
|
130 | metavar='Hub.ping') | |
|
126 | 131 | |
|
127 | 132 | # Client config |
|
128 | 133 | paa('--client-ip', |
@@ -204,10 +209,10 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):' | |||
|
204 | 209 | help='Try to reuse existing execution keys.') |
|
205 | 210 | paa('--no-secure', |
|
206 | 211 | action='store_false', dest='Global.secure', |
|
207 | help='Turn off execution keys.') | |
|
212 | help='Turn off execution keys (default).') | |
|
208 | 213 | paa('--secure', |
|
209 | 214 | action='store_true', dest='Global.secure', |
|
210 |
help='Turn on execution keys |
|
|
215 | help='Turn on execution keys.') | |
|
211 | 216 | paa('--execkey', |
|
212 | 217 | type=str, dest='Global.exec_key', |
|
213 | 218 | help='path to a file containing an execution key.', |
@@ -281,6 +286,19 b' class IPControllerApp(ApplicationWithClusterDir):' | |||
|
281 | 286 | self.log.error("Couldn't construct the Controller", exc_info=True) |
|
282 | 287 | self.exit(1) |
|
283 | 288 |
|
|
289 | def save_urls(self): | |
|
290 | """save the registration urls to files.""" | |
|
291 | c = self.master_config | |
|
292 | ||
|
293 | sec_dir = c.Global.security_dir | |
|
294 | cf = self.factory | |
|
295 | ||
|
296 | with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: | |
|
297 | f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) | |
|
298 | ||
|
299 | with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: | |
|
300 | f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) | |
|
301 | ||
|
284 | 302 | |
|
285 | 303 | def import_statements(self): |
|
286 | 304 | statements = self.master_config.Global.import_statements |
@@ -291,19 +309,19 b' class IPControllerApp(ApplicationWithClusterDir):' | |||
|
291 | 309 | except: |
|
292 | 310 | self.log.msg("Error running statement: %s" % s) |
|
293 | 311 | |
|
294 |
|
|
|
295 |
|
|
|
296 |
|
|
|
297 |
|
|
|
298 |
|
|
|
299 |
|
|
|
300 |
|
|
|
301 |
|
|
|
302 |
|
|
|
303 |
|
|
|
312 | def start_logging(self): | |
|
313 | super(IPControllerApp, self).start_logging() | |
|
314 | if self.master_config.Global.log_url: | |
|
315 | context = self.factory.context | |
|
316 | lsock = context.socket(zmq.PUB) | |
|
317 | lsock.connect(self.master_config.Global.log_url) | |
|
318 | handler = PUBHandler(lsock) | |
|
319 | handler.root_topic = 'controller' | |
|
320 | handler.setLevel(self.log_level) | |
|
321 | self.log.addHandler(handler) | |
|
304 | 322 | # |
|
305 | 323 | def start_app(self): |
|
306 |
# Start the |
|
|
324 | # Start the subprocesses: | |
|
307 | 325 | self.factory.start() |
|
308 | 326 | self.write_pid_file(overwrite=True) |
|
309 | 327 | try: |
@@ -218,12 +218,13 b" if 'setuptools' in sys.modules:" | |||
|
218 | 218 | 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance', |
|
219 | 219 | 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance', |
|
220 | 220 | 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance', |
|
221 |
'ipclusterz = IPython.zmq.parallel.ipcluster |
|
|
221 | 'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance', | |
|
222 | 222 | 'iptest = IPython.testing.iptest:main', |
|
223 | 223 | 'irunner = IPython.lib.irunner:main' |
|
224 | 224 | ] |
|
225 | 225 | } |
|
226 | 226 | setup_args['extras_require'] = dict( |
|
227 | zmq = 'pyzmq>=2.0.10', | |
|
227 | 228 | doc='Sphinx>=0.3', |
|
228 | 229 | test='nose>=0.10.1', |
|
229 | 230 | security='pyOpenSSL>=0.6' |
General Comments 0
You need to be logged in to leave comments.
Login now