Show More
This diff has been collapsed as it changes many lines, (502 lines changed) Show them Hide them | |||||
@@ -0,0 +1,502 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | # encoding: utf-8 | |||
|
3 | """ | |||
|
4 | The ipcluster application. | |||
|
5 | """ | |||
|
6 | ||||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #----------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #----------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | import logging | |||
|
19 | import os | |||
|
20 | import signal | |||
|
21 | import logging | |||
|
22 | ||||
|
23 | from zmq.eventloop import ioloop | |||
|
24 | ||||
|
25 | from IPython.external.argparse import ArgumentParser, SUPPRESS | |||
|
26 | from IPython.utils.importstring import import_item | |||
|
27 | from IPython.zmq.parallel.clusterdir import ( | |||
|
28 | ApplicationWithClusterDir, ClusterDirConfigLoader, | |||
|
29 | ClusterDirError, PIDFileError | |||
|
30 | ) | |||
|
31 | ||||
|
32 | ||||
|
33 | #----------------------------------------------------------------------------- | |||
|
34 | # Module level variables | |||
|
35 | #----------------------------------------------------------------------------- | |||
|
36 | ||||
|
37 | ||||
|
38 | default_config_file_name = u'ipcluster_config.py' | |||
|
39 | ||||
|
40 | ||||
|
41 | _description = """\ | |||
|
42 | Start an IPython cluster for parallel computing.\n\n | |||
|
43 | ||||
|
44 | An IPython cluster consists of 1 controller and 1 or more engines. | |||
|
45 | This command automates the startup of these processes using a wide | |||
|
46 | range of startup methods (SSH, local processes, PBS, mpiexec, | |||
|
47 | Windows HPC Server 2008). To start a cluster with 4 engines on your | |||
|
48 | local host simply do 'ipcluster start -n 4'. For more complex usage | |||
|
49 | you will typically do 'ipcluster create -p mycluster', then edit | |||
|
50 | configuration files, followed by 'ipcluster start -p mycluster -n 4'. | |||
|
51 | """ | |||
|
52 | ||||
|
53 | ||||
|
54 | # Exit codes for ipcluster | |||
|
55 | ||||
|
56 | # This will be the exit code if the ipcluster appears to be running because | |||
|
57 | # a .pid file exists | |||
|
58 | ALREADY_STARTED = 10 | |||
|
59 | ||||
|
60 | ||||
|
61 | # This will be the exit code if ipcluster stop is run, but there is no .pid | |||
|
62 | # file to be found. | |||
|
63 | ALREADY_STOPPED = 11 | |||
|
64 | ||||
|
65 | ||||
|
66 | #----------------------------------------------------------------------------- | |||
|
67 | # Command line options | |||
|
68 | #----------------------------------------------------------------------------- | |||
|
69 | ||||
|
70 | ||||
|
class IPClusterAppConfigLoader(ClusterDirConfigLoader):
    """Command-line loader that defines the ``ipcluster`` subcommands.

    The parser gets one subparser per subcommand (``list``, ``create``,
    ``start`` and ``stop``); the chosen subcommand is stored under the
    ``Global.subcommand`` destination.
    """

    def _add_arguments(self):
        """Build the subparsers and their options on ``self.parser``."""
        # Don't call ClusterDirConfigLoader._add_arguments as we don't want
        # its defaults on self.parser. Instead, we put those default options
        # on our subparsers.

        # Options shared by every subcommand.
        parent_parser1 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_ipython_dir(parent_parser1)
        self._add_log_level(parent_parser1)

        # Options shared by the subcommands that act on one cluster directory
        # (create, start and stop -- but not list).
        parent_parser2 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_cluster_profile(parent_parser2)
        self._add_cluster_dir(parent_parser2)
        self._add_work_dir(parent_parser2)
        paa = parent_parser2.add_argument
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')

        # Create the object used to create the subparsers.
        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description=
            """ipcluster has a variety of subcommands. The general way of
            running ipcluster is 'ipcluster <cmd> [options]'. To get help
            on a particular subcommand do 'ipcluster <cmd> -h'."""
        )

        # The "list" subcommand parser
        parser_list = subparsers.add_parser(
            'list',
            parents=[parent_parser1],
            argument_default=SUPPRESS,
            help="List all clusters in cwd and ipython_dir.",
            description=
            """List all available clusters, by cluster directory, that can
            be found in the current working directory or in the ipython
            directory. Cluster directories are named using the convention
            'cluster_<profile>'."""
        )

        # The "create" subcommand parser
        parser_create = subparsers.add_parser(
            'create',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Create a new cluster directory.",
            description=
            """Create an ipython cluster directory by its profile name or
            cluster directory path. Cluster directories contain
            configuration, log and security related files and are named
            using the convention 'cluster_<profile>'. By default they are
            located in your ipython directory. Once created, you will
            probably need to edit the configuration files in the cluster
            directory to configure your cluster. Most users will create a
            cluster directory by profile name,
            'ipcluster create -p mycluster', which will put the directory
            in '<ipython_dir>/cluster_mycluster'.
            """
        )
        paa = parser_create.add_argument
        paa('--reset-config',
            dest='Global.reset_config', action='store_true',
            help=
            """Recopy the default config files to the cluster directory.
            You will lose any modifications you have made to these files.""")

        # The "start" subcommand parser
        parser_start = subparsers.add_parser(
            'start',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Start a cluster.",
            description=
            """Start an ipython cluster by its profile name or cluster
            directory. Cluster directories contain configuration, log and
            security related files and are named using the convention
            'cluster_<profile>' and should be created using the 'create'
            subcommand of 'ipcluster'. If your cluster directory is in
            the cwd or the ipython directory, you can simply refer to it
            using its profile name, 'ipcluster start -n 4 -p <profile>',
            otherwise use the '--cluster-dir' option.
            """
        )
        paa = parser_start.add_argument
        paa('-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n')
        # --clean-logs/--no-clean-logs and --daemon/--no-daemon are on/off
        # pairs that write to the same destination.
        paa('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.')
        paa('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.")
        paa('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file')
        paa('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.")

        # The "stop" subcommand parser
        parser_stop = subparsers.add_parser(
            'stop',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Stop a running cluster.",
            description=
            """Stop a running ipython cluster by its profile name or cluster
            directory. Cluster directories are named using the convention
            'cluster_<profile>'. If your cluster directory is in
            the cwd or the ipython directory, you can simply refer to it
            using its profile name, 'ipcluster stop -p <profile>', otherwise
            use the '--cluster-dir' option.
            """
        )
        paa = parser_stop.add_argument
        paa('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal")
|
204 | ||||
|
205 | ||||
|
206 | #----------------------------------------------------------------------------- | |||
|
207 | # Main application | |||
|
208 | #----------------------------------------------------------------------------- | |||
|
209 | ||||
|
210 | ||||
|
class IPClusterApp(ApplicationWithClusterDir):
    """The ``ipcluster`` application: list, create, start and stop clusters.

    The subcommand given on the command line (``Global.subcommand``) selects
    what each phase does; see :meth:`find_resources`, :meth:`construct` and
    :meth:`start_app`.
    """

    name = u'ipclusterz'
    description = _description
    usage = None
    command_line_loader = IPClusterAppConfigLoader
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Add the ipcluster-specific defaults to the inherited ones."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.zmq.parallel.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = 2
        self.default_config.Global.daemonize = False

    def find_resources(self):
        """Locate (or create) the cluster directory for the subcommand.

        ``list`` prints the known cluster directories and exits. The other
        subcommands enable ``auto_create_cluster_dir`` and defer to the
        superclass. For ``start``/``stop`` a :class:`ClusterDirError` from
        the superclass is re-raised with a more helpful message.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand=='list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand=='create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand=='start' or subcommand=='stop':
            self.auto_create_cluster_dir = True
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )

    def list_cluster_dirs(self):
        """Print a start command for every ``cluster_<profile>`` directory.

        The search covers the current working directory, the ipython
        directory and every entry of the ``IPCLUSTER_DIR_PATH`` environment
        variable. Search paths that are not existing directories are skipped.
        """
        # Find the search paths. Split on os.pathsep so that Windows paths
        # containing drive letters ("C:\\...") are not cut in half.
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
        if cluster_dir_paths:
            cluster_dir_paths = cluster_dir_paths.split(os.pathsep)
        else:
            cluster_dir_paths = []
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            ipython_dir = self.default_config.Global.ipython_dir

        # Drop duplicate search paths while keeping the search order stable.
        paths = []
        for candidate in [os.getcwd(), ipython_dir] + cluster_dir_paths:
            if candidate not in paths:
                paths.append(candidate)

        prefix = 'cluster_'
        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        for path in paths:
            if not os.path.isdir(path):
                # A stale or mistyped search path must not abort the listing.
                continue
            for f in os.listdir(path):
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # Everything after the prefix is the profile name, so
                    # profiles that contain underscores are kept intact.
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start -p %s -n 4' % profile
                    print(start_cmd + " ==> " + full_path)

    def pre_construct(self):
        """Run the inherited pre-construct step, then force file logging
        when the application is going to be daemonized."""
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                config.Global.log_to_file = True
        except AttributeError:
            # No daemonize setting in the config: nothing to adjust.
            pass

    def construct(self):
        """Prepare the cluster directory and, for ``start``, the event loop.

        ``create`` copies the default config files (overwriting them when
        ``--reset-config`` was given). ``start`` copies missing config files,
        sets up logging and schedules :meth:`start_launchers` to run as soon
        as the loop starts.
        """
        config = self.master_config
        subcmd = config.Global.subcommand
        reset = config.Global.reset_config
        if subcmd == 'list':
            return
        if subcmd == 'create':
            self.log.info('Copying default config files to cluster directory '
            '[overwrite=%r]' % (reset,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
        if subcmd =='start':
            self.cluster_dir_obj.copy_all_config_files(overwrite=False)
            self.start_logging()
            self.loop = ioloop.IOLoop.instance()
            # A zero-delay callback: run start_launchers once the loop runs.
            dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
            dc.start()

    def start_launchers(self):
        """Create the controller and engine launchers and start them.

        Returns whatever :meth:`start_controller` returns.
        """
        config = self.master_config

        # Create the launchers. In both cases, we set the work_dir of
        # the launcher to the cluster_dir. This is where the launcher's
        # subprocesses will be launched. It is not where the controller
        # and engine will be launched.
        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            work_dir=self.cluster_dir, config=config
        )
        cl_class = import_item(config.Global.controller_launcher)
        self.controller_launcher = cl_class(
            work_dir=self.cluster_dir, config=config
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Setup the observing of stopping. If the controller dies, shut
        # everything down as that will be completely fatal for the engines.
        self.controller_launcher.on_stop(self.stop_launchers)
        # But, we don't monitor the stopping of engines. An engine dying
        # is just fine and in principle a user could start a new engine.
        # Also, if we did monitor engine stopping, it is difficult to
        # know what to do when only some engines die. Currently, the
        # observing of engine stopping is inconsistent. Some launchers
        # might trigger on a single engine stopping, other wait until
        # all stop. TODO: think more about how to handle this.

        # Start the controller and engines
        self._stopping = False  # Make sure stop_launchers is not called 2x.
        d = self.start_controller()
        self.start_engines()
        self.startup_message()
        return d

    def startup_message(self, r=None):
        """Log that the cluster started; ``r`` is passed through unchanged."""
        logging.info("IPython cluster: started")
        return r

    def start_controller(self, r=None):
        """Start the controller launcher in the configured cluster dir."""
        config = self.master_config
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def start_engines(self, r=None):
        """Start ``Global.n`` engines in the configured cluster dir."""
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_controller(self, r=None):
        """Stop the controller launcher if it is running."""
        if self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_engines(self, r=None):
        """Stop the engine launcher if it is running, else return None."""
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def log_err(self, f):
        """Log the traceback of failure object ``f`` and return None."""
        logging.error(f.getTraceback())
        return None

    def stop_launchers(self, r=None):
        """Stop engines and controller once, then stop the loop after 4 s."""
        if not self._stopping:
            self._stopping = True
            logging.error("IPython cluster: stopping")
            self.stop_engines()
            self.stop_controller()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler installed by :meth:`start_launchers`."""
        self.stop_launchers()

    def start_logging(self):
        """Optionally delete old engine/controller logs, then start logging.

        When ``Global.clean_logs`` is set, files in ``Global.log_dir`` whose
        names start with ``ipengine-`` or ``ipcontroller-`` and end in
        ``.log``, ``.out`` or ``.err`` are removed first.
        """
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            prefixes = ('ipengine' + '-', 'ipcontroller' + '-')
            suffixes = ('.log', '.out', '.err')
            for f in os.listdir(log_dir):
                if f.startswith(prefixes) and f.endswith(suffixes):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd=='create' or subcmd=='list':
            return
        elif subcmd=='start':
            self.start_app_start()
        elif subcmd=='stop':
            self.start_app_stop()

    def start_app_start(self):
        """Start the app for the start subcommand.

        Exits with ``ALREADY_STARTED`` when a pid file can be read.
        Otherwise optionally daemonizes (POSIX only), writes the pid file and
        runs the event loop. The pid file is removed when the loop ends,
        whatever the reason.
        """
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            self.log.critical(
                'Cluster is already running with [pid=%s]. '
                'use "ipcluster stop" to stop the cluster.' % pid
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if config.Global.daemonize:
            if os.name=='posix':
                from twisted.scripts._twistd_unix import daemonize
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            logging.info("stopping...")
        except Exception:
            # Still shut down best-effort, but keep the traceback in the log
            # instead of silently discarding the error.
            logging.exception("stopping...")
        finally:
            self.remove_pid_file()

    def start_app_stop(self):
        """Start the app for the stop subcommand.

        Exits with ``ALREADY_STOPPED`` when the pid file cannot be read. On
        POSIX the recorded pid is sent ``Global.signal``; if that fails the
        cluster is assumed dead and the stale pid file is removed. On Windows
        only the pid file is removed.
        """
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Problem reading pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        else:
            if os.name=='posix':
                sig = config.Global.signal
                self.log.info(
                    "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
                )
                try:
                    os.kill(pid, sig)
                except OSError:
                    # The pid file is stale (or the process cannot be
                    # signalled); clean up instead of crashing.
                    self.log.error(
                        "Stopping cluster failed, assuming already dead.",
                        exc_info=True
                    )
                    self.remove_pid_file()
            elif os.name=='nt':
                # As of right now, we don't support daemonize on Windows, so
                # stop will not do anything. Minimally, it should clean up the
                # old .pid files.
                self.remove_pid_file()
|
492 | ||||
|
493 | ||||
|
def launch_new_instance():
    """Entry point: build an :class:`IPClusterApp` and start it."""
    IPClusterApp().start()
|
498 | ||||
|
499 | ||||
|
500 | if __name__ == '__main__': | |||
|
501 | launch_new_instance() | |||
|
502 |
This diff has been collapsed as it changes many lines, (824 lines changed) Show them Hide them | |||||
@@ -0,0 +1,824 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | # encoding: utf-8 | |||
|
3 | """ | |||
|
4 | Facilities for launching IPython processes asynchronously. | |||
|
5 | """ | |||
|
6 | ||||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #----------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #----------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | import os | |||
|
19 | import re | |||
|
20 | import sys | |||
|
21 | import logging | |||
|
22 | ||||
|
from signal import SIGINT, SIGTERM
try:
    from signal import SIGKILL
except ImportError:
    # Platforms without SIGKILL fall back to SIGTERM. SIGTERM must be
    # imported above, otherwise this branch itself raises NameError.
    SIGKILL = SIGTERM
|
28 | ||||
|
29 | from subprocess import Popen, PIPE | |||
|
30 | ||||
|
31 | from zmq.eventloop import ioloop | |||
|
32 | ||||
|
33 | from IPython.config.configurable import Configurable | |||
|
34 | from IPython.utils.traitlets import Str, Int, List, Unicode, Instance | |||
|
35 | from IPython.utils.path import get_ipython_module_path | |||
|
36 | from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError | |||
|
37 | ||||
|
38 | # from IPython.kernel.winhpcjob import ( | |||
|
39 | # IPControllerTask, IPEngineTask, | |||
|
40 | # IPControllerJob, IPEngineSetJob | |||
|
41 | # ) | |||
|
42 | ||||
|
43 | ||||
|
44 | #----------------------------------------------------------------------------- | |||
|
45 | # Paths to the kernel apps | |||
|
46 | #----------------------------------------------------------------------------- | |||
|
47 | ||||
|
48 | ||||
|
49 | ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( | |||
|
50 | 'IPython.zmq.parallel.ipclusterapp' | |||
|
51 | )) | |||
|
52 | ||||
|
53 | ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( | |||
|
54 | 'IPython.zmq.parallel.ipengineapp' | |||
|
55 | )) | |||
|
56 | ||||
|
57 | ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( | |||
|
58 | 'IPython.zmq.parallel.ipcontrollerapp' | |||
|
59 | )) | |||
|
60 | ||||
|
61 | #----------------------------------------------------------------------------- | |||
|
62 | # Base launchers and errors | |||
|
63 | #----------------------------------------------------------------------------- | |||
|
64 | ||||
|
65 | ||||
|
class LauncherError(Exception):
    """Base class for the launcher errors defined in this module."""
    pass
|
68 | ||||
|
69 | ||||
|
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for a launcher's current state,
    e.g. starting a process that was already started."""
    pass
|
72 | ||||
|
73 | ||||
|
class UnknownStatus(LauncherError):
    """Launcher error for an unknown status.

    NOTE(review): not raised in the visible part of this module; presumably
    used by launchers that query an external job status -- confirm.
    """
    pass
|
76 | ||||
|
77 | ||||
|
class BaseLauncher(Configurable):
    """An abstraction for starting, stopping and signaling a process.

    A launcher moves through three states, kept in ``self.state``:
    ``'before'`` (not started), ``'running'`` and ``'after'`` (stopped).
    Subclasses implement :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`, and report transitions through :meth:`notify_start` and
    :meth:`notify_stop`.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the cluster_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the --work-dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        # Default value for the ``loop`` trait.
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
        self.start_data = None   # data passed to notify_start
        self.stop_data = None    # data passed to notify_stop
        self.stop_callbacks = []  # callables registered through on_stop
        self.state = 'before'    # can be before, running, after

    @property
    def args(self):
        """The command and its arguments, as returned by :meth:`find_args`.

        The first element is used as the process name in log messages.
        """
        return self.find_args()

    def find_args(self):
        """Return the list of cmd and args; called by the ``.args`` property.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into one space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True exactly when the launcher state is ``'running'``."""
        return self.state == 'running'

    def start(self):
        """Start the process. Must be overridden by subclasses."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Ask the process to stop. Must be overridden by subclasses.

        To observe the process actually stopping, register a callback with
        :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with the stop data.

        If the process has already stopped, ``f`` is called immediately with
        the recorded stop data and its result is returned. Otherwise ``f`` is
        queued for :meth:`notify_stop` and None is returned.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Record that the process started.

        Logs the startup, stores ``data`` as ``start_data`` and sets the
        state to ``'running'``. Returns ``data`` unchanged so it can be used
        as a pass-through callback.
        """
        logging.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Record that the process stopped and run the stop callbacks.

        Logs the stop, stores ``data`` as ``stop_data``, sets the state to
        ``'after'`` and calls every callback registered with :meth:`on_stop`,
        most recently registered first. Returns ``data`` unchanged.
        """
        logging.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        remaining = len(self.stop_callbacks)
        while remaining:
            callback = self.stop_callbacks.pop()
            callback(data)
            remaining -= 1
        return data

    def signal(self, sig):
        """Send signal ``sig`` to the process. Must be overridden.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
|
196 | ||||
|
197 | ||||
|
198 | #----------------------------------------------------------------------------- | |||
|
199 | # Local process launchers | |||
|
200 | #----------------------------------------------------------------------------- | |||
|
201 | ||||
|
202 | ||||
|
203 | class LocalProcessLauncher(BaseLauncher): | |||
|
204 | """Start and stop an external process in an asynchronous manner. | |||
|
205 | ||||
|
206 | This will launch the external process with a working directory of | |||
|
207 | ``self.work_dir``. | |||
|
208 | """ | |||
|
209 | ||||
|
210 | # This is used to construct self.args, which is passed to | |||
|
211 | # subprocess.Popen. | |||
|
212 | cmd_and_args = List([]) | |||
|
213 | poll_frequency = Int(100) # in ms | |||
|
214 | ||||
|
def __init__(self, work_dir=u'.', config=None):
    """Initialise the base launcher; no child process exists yet."""
    super(LocalProcessLauncher, self).__init__(
        work_dir=work_dir, config=config
    )
    # All three are filled in later; ``process`` and ``poller`` by start().
    self.poller = None
    self.start_deferred = None
    self.process = None
|
222 | ||||
|
223 | def find_args(self): | |||
|
224 | return self.cmd_and_args | |||
|
225 | ||||
|
226 | def start(self): | |||
|
227 | if self.state == 'before': | |||
|
228 | self.process = Popen(self.args, | |||
|
229 | stdout=PIPE,stderr=PIPE,stdin=PIPE, | |||
|
230 | env=os.environ, | |||
|
231 | cwd=self.work_dir | |||
|
232 | ) | |||
|
233 | ||||
|
234 | self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) | |||
|
235 | self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) | |||
|
236 | self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) | |||
|
237 | self.poller.start() | |||
|
238 | self.notify_start(self.process.pid) | |||
|
239 | else: | |||
|
240 | s = 'The process was already started and has state: %r' % self.state | |||
|
241 | raise ProcessStateError(s) | |||
|
242 | ||||
|
243 | def stop(self): | |||
|
244 | return self.interrupt_then_kill() | |||
|
245 | ||||
|
246 | def signal(self, sig): | |||
|
247 | if self.state == 'running': | |||
|
248 | self.process.send_signal(sig) | |||
|
249 | ||||
|
250 | def interrupt_then_kill(self, delay=2.0): | |||
|
251 | """Send INT, wait a delay and then send KILL.""" | |||
|
252 | self.signal(SIGINT) | |||
|
253 | self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop) | |||
|
254 | self.killer.start() | |||
|
255 | ||||
|
256 | # callbacks, etc: | |||
|
257 | ||||
|
258 | def handle_stdout(self, fd, events): | |||
|
259 | line = self.process.stdout.readline() | |||
|
260 | # a stopped process will be readable but return empty strings | |||
|
261 | if line: | |||
|
262 | logging.info(line[:-1]) | |||
|
263 | else: | |||
|
264 | self.poll() | |||
|
265 | ||||
|
266 | def handle_stderr(self, fd, events): | |||
|
267 | line = self.process.stderr.readline() | |||
|
268 | # a stopped process will be readable but return empty strings | |||
|
269 | if line: | |||
|
270 | logging.error(line[:-1]) | |||
|
271 | else: | |||
|
272 | self.poll() | |||
|
273 | ||||
|
274 | def poll(self): | |||
|
275 | status = self.process.poll() | |||
|
276 | if status is not None: | |||
|
277 | self.poller.stop() | |||
|
278 | self.loop.remove_handler(self.process.stdout.fileno()) | |||
|
279 | self.loop.remove_handler(self.process.stderr.fileno()) | |||
|
280 | self.notify_stop(dict(exit_code=status, pid=self.process.pid)) | |||
|
281 | return status | |||
|
282 | ||||
|
class LocalControllerLauncher(LocalProcessLauncher):
    """Run the controller as an ordinary external process."""

    # Command (argv prefix) used to invoke ipcontroller.
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level', str(logging.ERROR)],
        config=True,
    )

    def find_args(self):
        """Return the controller command followed by its arguments."""
        argv = self.controller_cmd + self.controller_args
        return argv

    def start(self, cluster_dir):
        """Add ``--cluster-dir`` to the arguments and start the controller."""
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.controller_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        logging.info("Starting LocalControllerLauncher: %r" % self.args)
        parent = super(LocalControllerLauncher, self)
        return parent.start()
|
299 | ||||
|
300 | ||||
|
class LocalEngineLauncher(LocalProcessLauncher):
    """Run a single engine as an ordinary external process."""

    # Command (argv prefix) used to invoke ipengine.
    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level', str(logging.ERROR)],
        config=True,
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        argv = self.engine_cmd + self.engine_args
        return argv

    def start(self, cluster_dir):
        """Add ``--cluster-dir`` to the arguments and start the engine."""
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.engine_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        parent = super(LocalEngineLauncher, self)
        return parent.start()
|
318 | ||||
|
319 | ||||
|
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine.  The set reports
    itself as stopped once every one of those launchers has stopped.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.ERROR)], config=True
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    def __init__(self, work_dir=u'.', config=None):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config
        )
        # Running launchers, keyed by engine index.
        self.launchers = {}
        # Stop data of the launchers that have exited, keyed by engine index.
        self.stop_data = {}

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir.

        Returns
        -------
        list
            The return value of each engine launcher's ``start`` call.
        """
        import copy

        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config)
            # Copy the engine args over to each engine launcher, so that the
            # per-launcher extension of the list does not leak between them.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i==0:
                logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Return a placeholder argument list describing the engine set."""
        return ['engine set']

    def signal(self, sig):
        """Send `sig` to every engine launcher that is still registered."""
        # Iterate over a snapshot so a launcher stopping cannot disturb it.
        return [el.signal(sig) for el in list(self.launchers.values())]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill after `delay`, every registered engine."""
        return [
            el.interrupt_then_kill(delay)
            for el in list(self.launchers.values())
        ]

    def stop(self):
        """Stop all engines by interrupting and then killing them."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record that one engine stopped; notify when all have stopped.

        `data` is the stop data of the engine launcher and must contain the
        'pid' of the process that exited.
        """
        logging.debug("notice %r" % (data,))
        pid = data['pid']
        for idx, el in list(self.launchers.items()):
            if el.process.pid == pid:
                break
        else:
            # No launcher owns this pid: do not discard an unrelated one.
            logging.warning("No engine launcher found for pid %r" % (pid,))
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
|
390 | ||||
|
391 | ||||
|
392 | #----------------------------------------------------------------------------- | |||
|
393 | # MPIExec launchers | |||
|
394 | #----------------------------------------------------------------------------- | |||
|
395 | ||||
|
396 | ||||
|
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        # The argument list is handed to Popen, which only accepts strings,
        # so the instance count has to be converted explicitly.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
|
420 | ||||
|
421 | ||||
|
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
    # Exactly one controller is started, so n is not configurable here.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec command line that runs the controller."""
        # The argument list is handed to Popen, which only accepts strings,
        # so the instance count has to be converted explicitly.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
|
440 | ||||
|
441 | ||||
|
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.ERROR)], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        # Set n before logging so that the logged command line is accurate.
        self.n = n
        logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpiexec command line that runs the engines."""
        # The argument list is handed to Popen, which only accepts strings,
        # so the instance count has to be converted explicitly.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.engine_cmd + self.engine_args
|
462 | ||||
|
463 | ||||
|
464 | #----------------------------------------------------------------------------- | |||
|
465 | # SSH launchers | |||
|
466 | #----------------------------------------------------------------------------- | |||
|
467 | ||||
|
468 | # TODO: Get SSH Launcher working again. | |||
|
469 | ||||
|
class SSHLauncher(LocalProcessLauncher):
    """A bare-bones launcher that runs a program through ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables, and possibly other things as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str(os.environ.get('USER','username'), config=True)
    # "user@hostname" target, refreshed by the two change handlers below.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        # Rebuild the target from the current user and the new hostname.
        target = (self.user, new)
        self.location = '%s@%s' % target

    def _user_changed(self, name, old, new):
        # Rebuild the target from the new user and the current hostname.
        target = (new, self.hostname)
        self.location = '%s@%s' % target

    def find_args(self):
        """Return the ssh command, its options, the target and the program."""
        remote_command = self.program + self.program_args
        return self.ssh_cmd + self.ssh_args + [self.location] + remote_command

    def start(self, cluster_dir, hostname=None, user=None):
        """Optionally override hostname/user, then start the ssh process.

        NOTE(review): ``cluster_dir`` is accepted but not used here.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        parent = super(SSHLauncher, self)
        return parent.start()
|
502 | ||||
|
503 | ||||
|
class SSHControllerLauncher(SSHLauncher):
    """SSHLauncher whose program is the ipcontroller command."""

    program = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    program_args = List(
        ['--log-to-file', '--log-level', str(logging.ERROR)],
        config=True,
    )
|
509 | ||||
|
510 | ||||
|
class SSHEngineLauncher(SSHLauncher):
    """SSHLauncher whose program is the ipengine command."""

    program = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file', '--log-level', str(logging.ERROR)],
        config=True,
    )
|
517 | ||||
|
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Engine set launcher that creates an SSHEngineLauncher per engine."""

    # launcher class
    launcher_class = SSHEngineLauncher
|
520 | ||||
|
521 | ||||
|
522 | #----------------------------------------------------------------------------- | |||
|
523 | # Windows HPC Server 2008 scheduler launchers | |||
|
524 | #----------------------------------------------------------------------------- | |||
|
525 | ||||
|
526 | ||||
|
527 | # # This is only used on Windows. | |||
|
528 | # def find_job_cmd(): | |||
|
529 | # if os.name=='nt': | |||
|
530 | # try: | |||
|
531 | # return find_cmd('job') | |||
|
532 | # except FindCmdError: | |||
|
533 | # return 'job' | |||
|
534 | # else: | |||
|
535 | # return 'job' | |||
|
536 | # | |||
|
537 | # | |||
|
538 | # class WindowsHPCLauncher(BaseLauncher): | |||
|
539 | # | |||
|
540 | # # A regular expression used to get the job id from the output of the | |||
|
541 | # # submit_command. | |||
|
542 | # job_id_regexp = Str(r'\d+', config=True) | |||
|
543 | # # The filename of the instantiated job script. | |||
|
544 | # job_file_name = Unicode(u'ipython_job.xml', config=True) | |||
|
545 | # # The full path to the instantiated job script. This gets made dynamically | |||
|
546 | # # by combining the work_dir with the job_file_name. | |||
|
547 | # job_file = Unicode(u'') | |||
|
548 | # # The hostname of the scheduler to submit the job to | |||
|
549 | # scheduler = Str('', config=True) | |||
|
550 | # job_cmd = Str(find_job_cmd(), config=True) | |||
|
551 | # | |||
|
552 | # def __init__(self, work_dir=u'.', config=None): | |||
|
553 | # super(WindowsHPCLauncher, self).__init__( | |||
|
554 | # work_dir=work_dir, config=config | |||
|
555 | # ) | |||
|
556 | # | |||
|
557 | # @property | |||
|
558 | # def job_file(self): | |||
|
559 | # return os.path.join(self.work_dir, self.job_file_name) | |||
|
560 | # | |||
|
561 | # def write_job_file(self, n): | |||
|
562 | # raise NotImplementedError("Implement write_job_file in a subclass.") | |||
|
563 | # | |||
|
564 | # def find_args(self): | |||
|
565 | # return ['job.exe'] | |||
|
566 | # | |||
|
567 | # def parse_job_id(self, output): | |||
|
568 | # """Take the output of the submit command and return the job id.""" | |||
|
569 | # m = re.search(self.job_id_regexp, output) | |||
|
570 | # if m is not None: | |||
|
571 | # job_id = m.group() | |||
|
572 | # else: | |||
|
573 | # raise LauncherError("Job id couldn't be determined: %s" % output) | |||
|
574 | # self.job_id = job_id | |||
|
575 | # logging.info('Job started with job id: %r' % job_id) | |||
|
576 | # return job_id | |||
|
577 | # | |||
|
578 | # @inlineCallbacks | |||
|
579 | # def start(self, n): | |||
|
580 | # """Start n copies of the process using the Win HPC job scheduler.""" | |||
|
581 | # self.write_job_file(n) | |||
|
582 | # args = [ | |||
|
583 | # 'submit', | |||
|
584 | # '/jobfile:%s' % self.job_file, | |||
|
585 | # '/scheduler:%s' % self.scheduler | |||
|
586 | # ] | |||
|
587 | # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |||
|
588 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this | |||
|
589 | # output = yield getProcessOutput(str(self.job_cmd), | |||
|
590 | # [str(a) for a in args], | |||
|
591 | # env=dict((str(k),str(v)) for k,v in os.environ.items()), | |||
|
592 | # path=self.work_dir | |||
|
593 | # ) | |||
|
594 | # job_id = self.parse_job_id(output) | |||
|
595 | # self.notify_start(job_id) | |||
|
596 | # defer.returnValue(job_id) | |||
|
597 | # | |||
|
598 | # @inlineCallbacks | |||
|
599 | # def stop(self): | |||
|
600 | # args = [ | |||
|
601 | # 'cancel', | |||
|
602 | # self.job_id, | |||
|
603 | # '/scheduler:%s' % self.scheduler | |||
|
604 | # ] | |||
|
605 | # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) | |||
|
606 | # try: | |||
|
607 | # # Twisted will raise DeprecationWarnings if we try to pass unicode to this | |||
|
608 | # output = yield getProcessOutput(str(self.job_cmd), | |||
|
609 | # [str(a) for a in args], | |||
|
610 | # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()), | |||
|
611 | # path=self.work_dir | |||
|
612 | # ) | |||
|
613 | # except: | |||
|
614 | # output = 'The job already appears to be stoppped: %r' % self.job_id | |||
|
615 | # self.notify_stop(output) # Pass the output of the kill cmd | |||
|
616 | # defer.returnValue(output) | |||
|
617 | # | |||
|
618 | # | |||
|
619 | # class WindowsHPCControllerLauncher(WindowsHPCLauncher): | |||
|
620 | # | |||
|
621 | # job_file_name = Unicode(u'ipcontroller_job.xml', config=True) | |||
|
622 | # extra_args = List([], config=False) | |||
|
623 | # | |||
|
624 | # def write_job_file(self, n): | |||
|
625 | # job = IPControllerJob(config=self.config) | |||
|
626 | # | |||
|
627 | # t = IPControllerTask(config=self.config) | |||
|
628 | # # The tasks work directory is *not* the actual work directory of | |||
|
629 | # # the controller. It is used as the base path for the stdout/stderr | |||
|
630 | # # files that the scheduler redirects to. | |||
|
631 | # t.work_directory = self.cluster_dir | |||
|
632 | # # Add the --cluster-dir and from self.start(). | |||
|
633 | # t.controller_args.extend(self.extra_args) | |||
|
634 | # job.add_task(t) | |||
|
635 | # | |||
|
636 | # logging.info("Writing job description file: %s" % self.job_file) | |||
|
637 | # job.write(self.job_file) | |||
|
638 | # | |||
|
639 | # @property | |||
|
640 | # def job_file(self): | |||
|
641 | # return os.path.join(self.cluster_dir, self.job_file_name) | |||
|
642 | # | |||
|
643 | # def start(self, cluster_dir): | |||
|
644 | # """Start the controller by cluster_dir.""" | |||
|
645 | # self.extra_args = ['--cluster-dir', cluster_dir] | |||
|
646 | # self.cluster_dir = unicode(cluster_dir) | |||
|
647 | # return super(WindowsHPCControllerLauncher, self).start(1) | |||
|
648 | # | |||
|
649 | # | |||
|
650 | # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): | |||
|
651 | # | |||
|
652 | # job_file_name = Unicode(u'ipengineset_job.xml', config=True) | |||
|
653 | # extra_args = List([], config=False) | |||
|
654 | # | |||
|
655 | # def write_job_file(self, n): | |||
|
656 | # job = IPEngineSetJob(config=self.config) | |||
|
657 | # | |||
|
658 | # for i in range(n): | |||
|
659 | # t = IPEngineTask(config=self.config) | |||
|
660 | # # The tasks work directory is *not* the actual work directory of | |||
|
661 | # # the engine. It is used as the base path for the stdout/stderr | |||
|
662 | # # files that the scheduler redirects to. | |||
|
663 | # t.work_directory = self.cluster_dir | |||
|
664 | # # Add the --cluster-dir and from self.start(). | |||
|
665 | # t.engine_args.extend(self.extra_args) | |||
|
666 | # job.add_task(t) | |||
|
667 | # | |||
|
668 | # logging.info("Writing job description file: %s" % self.job_file) | |||
|
669 | # job.write(self.job_file) | |||
|
670 | # | |||
|
671 | # @property | |||
|
672 | # def job_file(self): | |||
|
673 | # return os.path.join(self.cluster_dir, self.job_file_name) | |||
|
674 | # | |||
|
675 | # def start(self, n, cluster_dir): | |||
|
676 | # """Start the controller by cluster_dir.""" | |||
|
677 | # self.extra_args = ['--cluster-dir', cluster_dir] | |||
|
678 | # self.cluster_dir = unicode(cluster_dir) | |||
|
679 | # return super(WindowsHPCEngineSetLauncher, self).start(n) | |||
|
680 | # | |||
|
681 | # | |||
|
682 | # #----------------------------------------------------------------------------- | |||
|
683 | # # Batch (PBS) system launchers | |||
|
684 | # #----------------------------------------------------------------------------- | |||
|
685 | # | |||
|
686 | # # TODO: Get PBS launcher working again. | |||
|
687 | # | |||
|
688 | # class BatchSystemLauncher(BaseLauncher): | |||
|
689 | # """Launch an external process using a batch system. | |||
|
690 | # | |||
|
691 | # This class is designed to work with UNIX batch systems like PBS, LSF, | |||
|
692 | # GridEngine, etc. The overall model is that there are different commands | |||
|
693 | # like qsub, qdel, etc. that handle the starting and stopping of the process. | |||
|
694 | # | |||
|
695 | # This class also has the notion of a batch script. The ``batch_template`` | |||
|
696 | # attribute can be set to a string that is a template for the batch script. | |||
|
697 | # This template is instantiated using Itpl. Thus the template can use | |||
|
698 | # ${n} for the number of instances. Subclasses can add additional variables | |||
|
699 | # to the template dict. | |||
|
700 | # """ | |||
|
701 | # | |||
|
702 | # # Subclasses must fill these in. See PBSEngineSet | |||
|
703 | # # The name of the command line program used to submit jobs. | |||
|
704 | # submit_command = Str('', config=True) | |||
|
705 | # # The name of the command line program used to delete jobs. | |||
|
706 | # delete_command = Str('', config=True) | |||
|
707 | # # A regular expression used to get the job id from the output of the | |||
|
708 | # # submit_command. | |||
|
709 | # job_id_regexp = Str('', config=True) | |||
|
710 | # # The string that is the batch script template itself. | |||
|
711 | # batch_template = Str('', config=True) | |||
|
712 | # # The filename of the instantiated batch script. | |||
|
713 | # batch_file_name = Unicode(u'batch_script', config=True) | |||
|
714 | # # The full path to the instantiated batch script. | |||
|
715 | # batch_file = Unicode(u'') | |||
|
716 | # | |||
|
717 | # def __init__(self, work_dir=u'.', config=None): | |||
|
718 | # super(BatchSystemLauncher, self).__init__( | |||
|
719 | # work_dir=work_dir, config=config | |||
|
720 | # ) | |||
|
721 | # self.batch_file = os.path.join(self.work_dir, self.batch_file_name) | |||
|
722 | # self.context = {} | |||
|
723 | # | |||
|
724 | # def parse_job_id(self, output): | |||
|
725 | # """Take the output of the submit command and return the job id.""" | |||
|
726 | # m = re.match(self.job_id_regexp, output) | |||
|
727 | # if m is not None: | |||
|
728 | # job_id = m.group() | |||
|
729 | # else: | |||
|
730 | # raise LauncherError("Job id couldn't be determined: %s" % output) | |||
|
731 | # self.job_id = job_id | |||
|
732 | # logging.info('Job started with job id: %r' % job_id) | |||
|
733 | # return job_id | |||
|
734 | # | |||
|
735 | # def write_batch_script(self, n): | |||
|
736 | # """Instantiate and write the batch script to the work_dir.""" | |||
|
737 | # self.context['n'] = n | |||
|
738 | # script_as_string = Itpl.itplns(self.batch_template, self.context) | |||
|
739 | # logging.info('Writing instantiated batch script: %s' % self.batch_file) | |||
|
740 | # f = open(self.batch_file, 'w') | |||
|
741 | # f.write(script_as_string) | |||
|
742 | # f.close() | |||
|
743 | # | |||
|
744 | # @inlineCallbacks | |||
|
745 | # def start(self, n): | |||
|
746 | # """Start n copies of the process using a batch system.""" | |||
|
747 | # self.write_batch_script(n) | |||
|
748 | # output = yield getProcessOutput(self.submit_command, | |||
|
749 | # [self.batch_file], env=os.environ) | |||
|
750 | # job_id = self.parse_job_id(output) | |||
|
751 | # self.notify_start(job_id) | |||
|
752 | # defer.returnValue(job_id) | |||
|
753 | # | |||
|
754 | # @inlineCallbacks | |||
|
755 | # def stop(self): | |||
|
756 | # output = yield getProcessOutput(self.delete_command, | |||
|
757 | # [self.job_id], env=os.environ | |||
|
758 | # ) | |||
|
759 | # self.notify_stop(output) # Pass the output of the kill cmd | |||
|
760 | # defer.returnValue(output) | |||
|
761 | # | |||
|
762 | # | |||
|
763 | # class PBSLauncher(BatchSystemLauncher): | |||
|
764 | # """A BatchSystemLauncher subclass for PBS.""" | |||
|
765 | # | |||
|
766 | # submit_command = Str('qsub', config=True) | |||
|
767 | # delete_command = Str('qdel', config=True) | |||
|
768 | # job_id_regexp = Str(r'\d+', config=True) | |||
|
769 | # batch_template = Str('', config=True) | |||
|
770 | # batch_file_name = Unicode(u'pbs_batch_script', config=True) | |||
|
771 | # batch_file = Unicode(u'') | |||
|
772 | # | |||
|
773 | # | |||
|
774 | # class PBSControllerLauncher(PBSLauncher): | |||
|
775 | # """Launch a controller using PBS.""" | |||
|
776 | # | |||
|
777 | # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True) | |||
|
778 | # | |||
|
779 | # def start(self, cluster_dir): | |||
|
780 | # """Start the controller by profile or cluster_dir.""" | |||
|
781 | # # Here we save profile and cluster_dir in the context so they | |||
|
782 | # # can be used in the batch script template as ${profile} and | |||
|
783 | # # ${cluster_dir} | |||
|
784 | # self.context['cluster_dir'] = cluster_dir | |||
|
785 | # self.cluster_dir = unicode(cluster_dir) | |||
|
786 | # logging.info("Starting PBSControllerLauncher: %r" % self.args) | |||
|
787 | # return super(PBSControllerLauncher, self).start(1) | |||
|
788 | # | |||
|
789 | # | |||
|
790 | # class PBSEngineSetLauncher(PBSLauncher): | |||
|
791 | # | |||
|
792 | # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True) | |||
|
793 | # | |||
|
794 | # def start(self, n, cluster_dir): | |||
|
795 | # """Start n engines by profile or cluster_dir.""" | |||
|
796 | # self.program_args.extend(['--cluster-dir', cluster_dir]) | |||
|
797 | # self.cluster_dir = unicode(cluster_dir) | |||
|
798 | # logging.info('Starting PBSEngineSetLauncher: %r' % self.args) | |||
|
799 | # return super(PBSEngineSetLauncher, self).start(n) | |||
|
800 | ||||
|
801 | ||||
|
802 | #----------------------------------------------------------------------------- | |||
|
803 | # A launcher for ipcluster itself! | |||
|
804 | #----------------------------------------------------------------------------- | |||
|
805 | ||||
|
806 | ||||
|
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program itself in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)],
        config=True,
    )
    # Subcommand placed right after the ipcluster command.
    ipcluster_subcommand = Str('start')
    # Value passed to ipcluster's ``-n`` option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the command, the subcommand, ``-n <count>`` and the args."""
        subcommand = [self.ipcluster_subcommand]
        count_option = ['-n', repr(self.ipcluster_n)]
        return self.ipcluster_cmd + subcommand + count_option + self.ipcluster_args

    def start(self):
        """Log the command line and start ipcluster."""
        logging.info("Starting ipcluster: %r" % self.args)
        parent = super(IPClusterLauncher, self)
        return parent.start()
|
824 |
@@ -90,14 +90,6 b' def defaultblock(f, self, *args, **kwargs):' | |||||
90 | # Classes |
|
90 | # Classes | |
91 | #-------------------------------------------------------------------------- |
|
91 | #-------------------------------------------------------------------------- | |
92 |
|
92 | |||
93 | class ResultDict(dict): |
|
|||
94 | """A subclass of dict that raises errors if it has them.""" |
|
|||
95 | def __getitem__(self, key): |
|
|||
96 | res = dict.__getitem__(self, key) |
|
|||
97 | if isinstance(res, error.KernelError): |
|
|||
98 | raise res |
|
|||
99 | return res |
|
|||
100 |
|
||||
101 | class Metadata(dict): |
|
93 | class Metadata(dict): | |
102 | """Subclass of dict for initializing metadata values.""" |
|
94 | """Subclass of dict for initializing metadata values.""" | |
103 | def __init__(self, *args, **kwargs): |
|
95 | def __init__(self, *args, **kwargs): |
@@ -35,18 +35,6 b' from IPython.utils.path import (' | |||||
35 | from IPython.utils.traitlets import Unicode |
|
35 | from IPython.utils.traitlets import Unicode | |
36 |
|
36 | |||
37 | #----------------------------------------------------------------------------- |
|
37 | #----------------------------------------------------------------------------- | |
38 | # Warnings control |
|
|||
39 | #----------------------------------------------------------------------------- |
|
|||
40 | # Twisted generates annoying warnings with Python 2.6, as will do other code |
|
|||
41 | # that imports 'sets' as of today |
|
|||
42 | warnings.filterwarnings('ignore', 'the sets module is deprecated', |
|
|||
43 | DeprecationWarning ) |
|
|||
44 |
|
||||
45 | # This one also comes from Twisted |
|
|||
46 | warnings.filterwarnings('ignore', 'the sha module is deprecated', |
|
|||
47 | DeprecationWarning) |
|
|||
48 |
|
||||
49 | #----------------------------------------------------------------------------- |
|
|||
50 | # Module errors |
|
38 | # Module errors | |
51 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
52 |
|
40 |
@@ -1,7 +1,6 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """The IPython Controller with 0MQ |
|
2 | """The IPython Controller with 0MQ | |
3 | This is the master object that handles connections from engines and clients, |
|
3 | This is a collection of one Hub and several Schedulers. | |
4 | and monitors traffic through the various queues. |
|
|||
5 | """ |
|
4 | """ | |
6 | #----------------------------------------------------------------------------- |
|
5 | #----------------------------------------------------------------------------- | |
7 | # Copyright (C) 2010 The IPython Development Team |
|
6 | # Copyright (C) 2010 The IPython Development Team | |
@@ -15,75 +14,25 b' and monitors traffic through the various queues.' | |||||
15 | #----------------------------------------------------------------------------- |
|
14 | #----------------------------------------------------------------------------- | |
16 | from __future__ import print_function |
|
15 | from __future__ import print_function | |
17 |
|
16 | |||
18 | import os |
|
|||
19 | import sys |
|
|||
20 | import time |
|
|||
21 | import logging |
|
17 | import logging | |
22 | from multiprocessing import Process |
|
18 | from multiprocessing import Process | |
23 |
|
19 | |||
24 | import zmq |
|
20 | import zmq | |
25 | from zmq.eventloop import ioloop |
|
|||
26 | from zmq.eventloop.zmqstream import ZMQStream |
|
|||
27 | # from zmq.devices import ProcessMonitoredQueue |
|
|||
28 |
|
21 | |||
29 | # internal: |
|
22 | # internal: | |
30 | from IPython.utils.importstring import import_item |
|
23 | from IPython.utils.importstring import import_item | |
31 | from IPython.utils.traitlets import Int, Str, Instance, List, Bool |
|
24 | from IPython.utils.traitlets import Int, Str, Instance, List, Bool | |
32 | from IPython.zmq.entry_point import bind_port |
|
|||
33 |
|
25 | |||
34 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, |
|
26 | from entry_point import signal_children | |
35 | connect_logger, parse_url, signal_children, generate_exec_key, |
|
|||
36 | local_logger) |
|
|||
37 |
|
27 | |||
38 |
|
28 | |||
39 | import streamsession as session |
|
|||
40 | import heartmonitor |
|
|||
41 | from scheduler import launch_scheduler |
|
29 | from scheduler import launch_scheduler | |
42 | from hub import Hub, HubFactory |
|
30 | from hub import Hub, HubFactory | |
43 |
|
31 | |||
44 | from dictdb import DictDB |
|
32 | #----------------------------------------------------------------------------- | |
45 | try: |
|
33 | # Configurable | |
46 | import pymongo |
|
34 | #----------------------------------------------------------------------------- | |
47 | except ImportError: |
|
|||
48 | MongoDB=None |
|
|||
49 | else: |
|
|||
50 | from mongodb import MongoDB |
|
|||
51 |
|
||||
52 | #------------------------------------------------------------------------- |
|
|||
53 | # Entry Point |
|
|||
54 | #------------------------------------------------------------------------- |
|
|||
55 |
|
||||
56 | def make_argument_parser(): |
|
|||
57 | """Make an argument parser""" |
|
|||
58 | parser = make_base_argument_parser() |
|
|||
59 |
|
||||
60 | parser.add_argument('--client', type=int, metavar='PORT', default=0, |
|
|||
61 | help='set the XREP port for clients [default: random]') |
|
|||
62 | parser.add_argument('--notice', type=int, metavar='PORT', default=0, |
|
|||
63 | help='set the PUB socket for registration notification [default: random]') |
|
|||
64 | parser.add_argument('--hb', type=str, metavar='PORTS', |
|
|||
65 | help='set the 2 ports for heartbeats [default: random]') |
|
|||
66 | parser.add_argument('--ping', type=int, default=100, |
|
|||
67 | help='set the heartbeat period in ms [default: 100]') |
|
|||
68 | parser.add_argument('--monitor', type=int, metavar='PORT', default=0, |
|
|||
69 | help='set the SUB port for queue monitoring [default: random]') |
|
|||
70 | parser.add_argument('--mux', type=str, metavar='PORTS', |
|
|||
71 | help='set the XREP ports for the MUX queue [default: random]') |
|
|||
72 | parser.add_argument('--task', type=str, metavar='PORTS', |
|
|||
73 | help='set the XREP/XREQ ports for the task queue [default: random]') |
|
|||
74 | parser.add_argument('--control', type=str, metavar='PORTS', |
|
|||
75 | help='set the XREP ports for the control queue [default: random]') |
|
|||
76 | parser.add_argument('--iopub', type=str, metavar='PORTS', |
|
|||
77 | help='set the PUB/SUB ports for the iopub relay [default: random]') |
|
|||
78 | parser.add_argument('--scheduler', type=str, default='lru', |
|
|||
79 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
|||
80 | help='select the task scheduler [default: Python LRU]') |
|
|||
81 | parser.add_argument('--mongodb', action='store_true', |
|
|||
82 | help='Use MongoDB task storage [default: in-memory]') |
|
|||
83 | parser.add_argument('--session', type=str, default=None, |
|
|||
84 | help='Manually specify the session id.') |
|
|||
85 |
|
35 | |||
86 | return parser |
|
|||
87 |
|
36 | |||
88 | class ControllerFactory(HubFactory): |
|
37 | class ControllerFactory(HubFactory): | |
89 | """Configurable for setting up a Hub and Schedulers.""" |
|
38 | """Configurable for setting up a Hub and Schedulers.""" | |
@@ -159,187 +108,3 b' class ControllerFactory(HubFactory):' | |||||
159 | q.daemon=True |
|
108 | q.daemon=True | |
160 | children.append(q) |
|
109 | children.append(q) | |
161 |
|
110 | |||
162 |
|
||||
163 | def main(argv=None): |
|
|||
164 | """DO NOT USE ME ANYMORE""" |
|
|||
165 |
|
||||
166 | parser = make_argument_parser() |
|
|||
167 |
|
||||
168 | args = parser.parse_args(argv) |
|
|||
169 | parse_url(args) |
|
|||
170 |
|
||||
171 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
|||
172 |
|
||||
173 | random_ports = 0 |
|
|||
174 | if args.hb: |
|
|||
175 | hb = split_ports(args.hb, 2) |
|
|||
176 | else: |
|
|||
177 | hb = select_random_ports(2) |
|
|||
178 | if args.mux: |
|
|||
179 | mux = split_ports(args.mux, 2) |
|
|||
180 | else: |
|
|||
181 | mux = None |
|
|||
182 | random_ports += 2 |
|
|||
183 | if args.iopub: |
|
|||
184 | iopub = split_ports(args.iopub, 2) |
|
|||
185 | else: |
|
|||
186 | iopub = None |
|
|||
187 | random_ports += 2 |
|
|||
188 | if args.task: |
|
|||
189 | task = split_ports(args.task, 2) |
|
|||
190 | else: |
|
|||
191 | task = None |
|
|||
192 | random_ports += 2 |
|
|||
193 | if args.control: |
|
|||
194 | control = split_ports(args.control, 2) |
|
|||
195 | else: |
|
|||
196 | control = None |
|
|||
197 | random_ports += 2 |
|
|||
198 |
|
||||
199 | ctx = zmq.Context() |
|
|||
200 | loop = ioloop.IOLoop.instance() |
|
|||
201 |
|
||||
202 |
|
||||
203 | # Registrar socket |
|
|||
204 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
|||
205 | regport = bind_port(reg, args.ip, args.regport) |
|
|||
206 |
|
||||
207 | ### Engine connections ### |
|
|||
208 |
|
||||
209 | # heartbeat |
|
|||
210 | hpub = ctx.socket(zmq.PUB) |
|
|||
211 | bind_port(hpub, args.ip, hb[0]) |
|
|||
212 | hrep = ctx.socket(zmq.XREP) |
|
|||
213 | bind_port(hrep, args.ip, hb[1]) |
|
|||
214 |
|
||||
215 | hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping) |
|
|||
216 | hmon.start() |
|
|||
217 |
|
||||
218 | ### Client connections ### |
|
|||
219 | # Clientele socket |
|
|||
220 | c = ZMQStream(ctx.socket(zmq.XREP), loop) |
|
|||
221 | cport = bind_port(c, args.ip, args.client) |
|
|||
222 | # Notifier socket |
|
|||
223 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
|||
224 | nport = bind_port(n, args.ip, args.notice) |
|
|||
225 |
|
||||
226 | ### Key File ### |
|
|||
227 | if args.execkey and not os.path.isfile(args.execkey): |
|
|||
228 | generate_exec_key(args.execkey) |
|
|||
229 |
|
||||
230 | thesession = session.StreamSession(username=args.ident or "controller", |
|
|||
231 | keyfile=args.execkey, session=args.session) |
|
|||
232 |
|
||||
233 | ### build and launch the queues ### |
|
|||
234 |
|
||||
235 | # monitor socket |
|
|||
236 | sub = ctx.socket(zmq.SUB) |
|
|||
237 | sub.setsockopt(zmq.SUBSCRIBE, "") |
|
|||
238 | monport = bind_port(sub, args.ip, args.monitor) |
|
|||
239 | sub = ZMQStream(sub, loop) |
|
|||
240 |
|
||||
241 | ports = select_random_ports(random_ports) |
|
|||
242 | children = [] |
|
|||
243 |
|
||||
244 | # IOPub relay (in a Process) |
|
|||
245 | if not iopub: |
|
|||
246 | iopub = (ports.pop(),ports.pop()) |
|
|||
247 | q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') |
|
|||
248 | q.bind_in(iface%iopub[1]) |
|
|||
249 | q.bind_out(iface%iopub[0]) |
|
|||
250 | q.setsockopt_in(zmq.SUBSCRIBE, '') |
|
|||
251 | q.connect_mon(iface%monport) |
|
|||
252 | q.daemon=True |
|
|||
253 | q.start() |
|
|||
254 | children.append(q.launcher) |
|
|||
255 |
|
||||
256 | # Multiplexer Queue (in a Process) |
|
|||
257 | if not mux: |
|
|||
258 | mux = (ports.pop(),ports.pop()) |
|
|||
259 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
|||
260 | q.bind_in(iface%mux[0]) |
|
|||
261 | q.bind_out(iface%mux[1]) |
|
|||
262 | q.connect_mon(iface%monport) |
|
|||
263 | q.daemon=True |
|
|||
264 | q.start() |
|
|||
265 | children.append(q.launcher) |
|
|||
266 |
|
||||
267 | # Control Queue (in a Process) |
|
|||
268 | if not control: |
|
|||
269 | control = (ports.pop(),ports.pop()) |
|
|||
270 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
|||
271 | q.bind_in(iface%control[0]) |
|
|||
272 | q.bind_out(iface%control[1]) |
|
|||
273 | q.connect_mon(iface%monport) |
|
|||
274 | q.daemon=True |
|
|||
275 | q.start() |
|
|||
276 | children.append(q.launcher) |
|
|||
277 | # Task Queue (in a Process) |
|
|||
278 | if not task: |
|
|||
279 | task = (ports.pop(),ports.pop()) |
|
|||
280 | if args.scheduler == 'pure': |
|
|||
281 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
|||
282 | q.bind_in(iface%task[0]) |
|
|||
283 | q.bind_out(iface%task[1]) |
|
|||
284 | q.connect_mon(iface%monport) |
|
|||
285 | q.daemon=True |
|
|||
286 | q.start() |
|
|||
287 | children.append(q.launcher) |
|
|||
288 | else: |
|
|||
289 | log_addr = iface%args.logport if args.logport else None |
|
|||
290 | sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, |
|
|||
291 | log_addr, args.loglevel, args.scheduler) |
|
|||
292 | print (sargs) |
|
|||
293 | q = Process(target=launch_scheduler, args=sargs) |
|
|||
294 | q.daemon=True |
|
|||
295 | q.start() |
|
|||
296 | children.append(q) |
|
|||
297 |
|
||||
298 | if args.mongodb: |
|
|||
299 | from mongodb import MongoDB |
|
|||
300 | db = MongoDB(thesession.session) |
|
|||
301 | else: |
|
|||
302 | db = DictDB() |
|
|||
303 | time.sleep(.25) |
|
|||
304 |
|
||||
305 | # build connection dicts |
|
|||
306 | engine_addrs = { |
|
|||
307 | 'control' : iface%control[1], |
|
|||
308 | 'mux': iface%mux[1], |
|
|||
309 | 'heartbeat': (iface%hb[0], iface%hb[1]), |
|
|||
310 | 'task' : iface%task[1], |
|
|||
311 | 'iopub' : iface%iopub[1], |
|
|||
312 | 'monitor' : iface%monport, |
|
|||
313 | } |
|
|||
314 |
|
||||
315 | client_addrs = { |
|
|||
316 | 'control' : iface%control[0], |
|
|||
317 | 'query': iface%cport, |
|
|||
318 | 'mux': iface%mux[0], |
|
|||
319 | 'task' : iface%task[0], |
|
|||
320 | 'iopub' : iface%iopub[0], |
|
|||
321 | 'notification': iface%nport |
|
|||
322 | } |
|
|||
323 |
|
||||
324 | # setup logging |
|
|||
325 | if args.logport: |
|
|||
326 | connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) |
|
|||
327 | else: |
|
|||
328 | local_logger(args.loglevel) |
|
|||
329 |
|
||||
330 | # register relay of signals to the children |
|
|||
331 | signal_children(children) |
|
|||
332 | hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, |
|
|||
333 | registrar=reg, clientele=c, notifier=n, db=db, |
|
|||
334 | engine_addrs=engine_addrs, client_addrs=client_addrs) |
|
|||
335 |
|
||||
336 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
|
|||
337 | dc.start() |
|
|||
338 | try: |
|
|||
339 | loop.start() |
|
|||
340 | except KeyboardInterrupt: |
|
|||
341 | print ("interrupted, exiting...", file=sys.__stderr__) |
|
|||
342 |
|
||||
343 |
|
||||
344 | if __name__ == '__main__': |
|
|||
345 | main() |
|
@@ -6,7 +6,6 b" connected to the Controller's queue(s)." | |||||
6 | from __future__ import print_function |
|
6 | from __future__ import print_function | |
7 | import sys |
|
7 | import sys | |
8 | import time |
|
8 | import time | |
9 | import traceback |
|
|||
10 | import uuid |
|
9 | import uuid | |
11 | import logging |
|
10 | import logging | |
12 | from pprint import pprint |
|
11 | from pprint import pprint | |
@@ -21,12 +20,9 b' from IPython.utils.traitlets import Instance, Str, Dict, Int, Type' | |||||
21 |
|
20 | |||
22 | from factory import RegistrationFactory |
|
21 | from factory import RegistrationFactory | |
23 |
|
22 | |||
24 |
from streamsession import Message |
|
23 | from streamsession import Message | |
25 |
from streamkernel import Kernel |
|
24 | from streamkernel import Kernel | |
26 | import heartmonitor |
|
25 | import heartmonitor | |
27 | from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, |
|
|||
28 | local_logger) |
|
|||
29 | # import taskthread |
|
|||
30 |
|
26 | |||
31 | def printer(*msg): |
|
27 | def printer(*msg): | |
32 | # print (logging.handlers, file=sys.__stdout__) |
|
28 | # print (logging.handlers, file=sys.__stdout__) | |
@@ -107,16 +103,15 b' class EngineFactory(RegistrationFactory):' | |||||
107 | # print (hb_addrs) |
|
103 | # print (hb_addrs) | |
108 |
|
104 | |||
109 | # # Redirect input streams and set a display hook. |
|
105 | # # Redirect input streams and set a display hook. | |
110 |
|
|
106 | if self.out_stream_factory: | |
111 |
|
|
107 | sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') | |
112 |
|
|
108 | sys.stdout.topic = 'engine.%i.stdout'%self.id | |
113 |
|
|
109 | sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') | |
114 |
|
|
110 | sys.stderr.topic = 'engine.%i.stderr'%self.id | |
115 |
|
|
111 | if self.display_hook_factory: | |
116 |
|
|
112 | sys.displayhook = self.display_hook_factory(self.session, iopub_stream) | |
117 |
|
|
113 | sys.displayhook.topic = 'engine.%i.pyout'%self.id | |
118 |
|
114 | |||
119 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
|||
120 | self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, |
|
115 | self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, | |
121 | control_stream=control_stream, |
|
116 | control_stream=control_stream, | |
122 | shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, |
|
117 | shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, | |
@@ -124,6 +119,7 b' class EngineFactory(RegistrationFactory):' | |||||
124 | self.kernel.start() |
|
119 | self.kernel.start() | |
125 |
|
120 | |||
126 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) |
|
121 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | |
|
122 | # ioloop.DelayedCallback(heart.start, 1000, self.loop).start() | |||
127 | heart.start() |
|
123 | heart.start() | |
128 |
|
124 | |||
129 |
|
125 | |||
@@ -143,48 +139,3 b' class EngineFactory(RegistrationFactory):' | |||||
143 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) |
|
139 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) | |
144 | dc.start() |
|
140 | dc.start() | |
145 |
|
141 | |||
146 |
|
||||
147 |
|
||||
148 | def main(argv=None, user_ns=None): |
|
|||
149 | """DO NOT USE ME ANYMORE""" |
|
|||
150 | parser = make_base_argument_parser() |
|
|||
151 |
|
||||
152 | args = parser.parse_args(argv) |
|
|||
153 |
|
||||
154 | parse_url(args) |
|
|||
155 |
|
||||
156 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
|||
157 |
|
||||
158 | loop = ioloop.IOLoop.instance() |
|
|||
159 | session = StreamSession(keyfile=args.execkey) |
|
|||
160 | # print (session.key) |
|
|||
161 | ctx = zmq.Context() |
|
|||
162 |
|
||||
163 | # setup logging |
|
|||
164 |
|
||||
165 | reg_conn = iface % args.regport |
|
|||
166 | print (reg_conn, file=sys.__stdout__) |
|
|||
167 | print ("Starting the engine...", file=sys.__stderr__) |
|
|||
168 |
|
||||
169 | reg = ctx.socket(zmq.PAIR) |
|
|||
170 | reg.connect(reg_conn) |
|
|||
171 | reg = zmqstream.ZMQStream(reg, loop) |
|
|||
172 |
|
||||
173 | e = Engine(context=ctx, loop=loop, session=session, registrar=reg, |
|
|||
174 | ident=args.ident or '', user_ns=user_ns) |
|
|||
175 | if args.logport: |
|
|||
176 | print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) |
|
|||
177 | connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) |
|
|||
178 | else: |
|
|||
179 | local_logger(args.loglevel) |
|
|||
180 |
|
||||
181 | dc = ioloop.DelayedCallback(e.start, 0, loop) |
|
|||
182 | dc.start() |
|
|||
183 | try: |
|
|||
184 | loop.start() |
|
|||
185 | except KeyboardInterrupt: |
|
|||
186 | print ("interrupted, exiting...", file=sys.__stderr__) |
|
|||
187 |
|
||||
188 | # Execution as a script |
|
|||
189 | if __name__ == '__main__': |
|
|||
190 | main() |
|
@@ -28,15 +28,6 b' from IPython.core.ultratb import FormattedTB' | |||||
28 | from IPython.external.argparse import ArgumentParser |
|
28 | from IPython.external.argparse import ArgumentParser | |
29 | from IPython.zmq.log import EnginePUBHandler |
|
29 | from IPython.zmq.log import EnginePUBHandler | |
30 |
|
30 | |||
31 | def split_ports(s, n): |
|
|||
32 | """Parser helper for multiport strings""" |
|
|||
33 | if not s: |
|
|||
34 | return tuple([0]*n) |
|
|||
35 | ports = map(int, s.split(',')) |
|
|||
36 | if len(ports) != n: |
|
|||
37 | raise ValueError |
|
|||
38 | return ports |
|
|||
39 |
|
||||
40 | _random_ports = set() |
|
31 | _random_ports = set() | |
41 |
|
32 | |||
42 | def select_random_ports(n): |
|
33 | def select_random_ports(n): | |
@@ -57,18 +48,6 b' def select_random_ports(n):' | |||||
57 | _random_ports.add(port) |
|
48 | _random_ports.add(port) | |
58 | return ports |
|
49 | return ports | |
59 |
|
50 | |||
60 | def parse_url(args): |
|
|||
61 | """Ensure args.url contains full transport://interface:port""" |
|
|||
62 | if args.url: |
|
|||
63 | iface = args.url.split('://',1) |
|
|||
64 | if len(args) == 2: |
|
|||
65 | args.transport,iface = iface |
|
|||
66 | iface = iface.split(':') |
|
|||
67 | args.ip = iface[0] |
|
|||
68 | if iface[1]: |
|
|||
69 | args.regport = iface[1] |
|
|||
70 | args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport) |
|
|||
71 |
|
||||
72 | def signal_children(children): |
|
51 | def signal_children(children): | |
73 | """Relay interupt/term signals to children, for more solid process cleanup.""" |
|
52 | """Relay interupt/term signals to children, for more solid process cleanup.""" | |
74 | def terminate_children(sig, frame): |
|
53 | def terminate_children(sig, frame): | |
@@ -92,34 +71,6 b' def generate_exec_key(keyfile):' | |||||
92 | os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) |
|
71 | os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) | |
93 |
|
72 | |||
94 |
|
73 | |||
95 | def make_base_argument_parser(): |
|
|||
96 | """ Creates an ArgumentParser for the generic arguments supported by all |
|
|||
97 | ipcluster entry points. |
|
|||
98 | """ |
|
|||
99 |
|
||||
100 | parser = ArgumentParser() |
|
|||
101 | parser.add_argument('--ip', type=str, default='127.0.0.1', |
|
|||
102 | help='set the controller\'s IP address [default: local]') |
|
|||
103 | parser.add_argument('--transport', type=str, default='tcp', |
|
|||
104 | help='set the transport to use [default: tcp]') |
|
|||
105 | parser.add_argument('--regport', type=int, metavar='PORT', default=10101, |
|
|||
106 | help='set the XREP port for registration [default: 10101]') |
|
|||
107 | parser.add_argument('--logport', type=int, metavar='PORT', default=0, |
|
|||
108 | help='set the PUB port for remote logging [default: log to stdout]') |
|
|||
109 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, |
|
|||
110 | help='set the log level [default: INFO]') |
|
|||
111 | parser.add_argument('--ident', type=str, |
|
|||
112 | help='set the ZMQ identity [default: random]') |
|
|||
113 | parser.add_argument('--packer', type=str, default='json', |
|
|||
114 | choices=['json','pickle'], |
|
|||
115 | help='set the message format method [default: json]') |
|
|||
116 | parser.add_argument('--url', type=str, |
|
|||
117 | help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101') |
|
|||
118 | parser.add_argument('--execkey', type=str, |
|
|||
119 | help="File containing key for authenticating requests.") |
|
|||
120 |
|
||||
121 | return parser |
|
|||
122 |
|
||||
123 | def integer_loglevel(loglevel): |
|
74 | def integer_loglevel(loglevel): | |
124 | try: |
|
75 | try: | |
125 | loglevel = int(loglevel) |
|
76 | loglevel = int(loglevel) |
@@ -158,26 +158,26 b' class HubFactory(RegistrationFactory):' | |||||
158 | subconstructors = List() |
|
158 | subconstructors = List() | |
159 | _constructed = Bool(False) |
|
159 | _constructed = Bool(False) | |
160 |
|
160 | |||
|
161 | def _ip_changed(self, name, old, new): | |||
|
162 | self.engine_ip = new | |||
|
163 | self.client_ip = new | |||
|
164 | self.monitor_ip = new | |||
|
165 | self._update_monitor_url() | |||
|
166 | ||||
161 | def _update_monitor_url(self): |
|
167 | def _update_monitor_url(self): | |
162 | self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) |
|
168 | self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) | |
163 |
|
169 | |||
164 | def _sync_ips(self): |
|
170 | def _transport_changed(self, name, old, new): | |
165 |
self.engine_ |
|
171 | self.engine_transport = new | |
166 |
self.client_ |
|
172 | self.client_transport = new | |
167 |
self.monitor_ |
|
173 | self.monitor_transport = new | |
168 | self._update_monitor_url() |
|
|||
169 |
|
||||
170 | def _sync_transports(self): |
|
|||
171 | self.engine_transport = self.transport |
|
|||
172 | self.client_transport = self.transport |
|
|||
173 | self.monitor_transport = self.transport |
|
|||
174 | self._update_monitor_url() |
|
174 | self._update_monitor_url() | |
175 |
|
175 | |||
176 | def __init__(self, **kwargs): |
|
176 | def __init__(self, **kwargs): | |
177 | super(HubFactory, self).__init__(**kwargs) |
|
177 | super(HubFactory, self).__init__(**kwargs) | |
178 | self._update_monitor_url() |
|
178 | self._update_monitor_url() | |
179 | self.on_trait_change(self._sync_ips, 'ip') |
|
179 | # self.on_trait_change(self._sync_ips, 'ip') | |
180 | self.on_trait_change(self._sync_transports, 'transport') |
|
180 | # self.on_trait_change(self._sync_transports, 'transport') | |
181 | self.subconstructors.append(self.construct_hub) |
|
181 | self.subconstructors.append(self.construct_hub) | |
182 |
|
182 | |||
183 |
|
183 | |||
@@ -334,45 +334,11 b' class Hub(HasTraits):' | |||||
334 | """ |
|
334 | """ | |
335 |
|
335 | |||
336 | super(Hub, self).__init__(**kwargs) |
|
336 | super(Hub, self).__init__(**kwargs) | |
337 | self.ids = set() |
|
|||
338 | self.pending = set() |
|
|||
339 | # self.keytable={} |
|
|||
340 | # self.incoming_registrations={} |
|
|||
341 | # self.engines = {} |
|
|||
342 | # self.by_ident = {} |
|
|||
343 | # self.clients = {} |
|
|||
344 | # self.hearts = {} |
|
|||
345 | # self.mia = set() |
|
|||
346 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
337 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) | |
347 | # this is the stuff that will move to DB: |
|
|||
348 | # self.pending = set() # pending messages, keyed by msg_id |
|
|||
349 | # self.queues = {} # pending msg_ids keyed by engine_id |
|
|||
350 | # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id |
|
|||
351 | # self.completed = {} # completed msg_ids keyed by engine_id |
|
|||
352 | # self.all_completed = set() |
|
|||
353 | # self._idcounter = 0 |
|
|||
354 | # self.sockets = {} |
|
|||
355 | # self.loop = loop |
|
|||
356 | # self.session = session |
|
|||
357 | # self.registrar = registrar |
|
|||
358 | # self.clientele = clientele |
|
|||
359 | # self.queue = queue |
|
|||
360 | # self.heartmonitor = heartbeat |
|
|||
361 | # self.notifier = notifier |
|
|||
362 | # self.db = db |
|
|||
363 |
|
338 | |||
364 | # validate connection dicts: |
|
339 | # validate connection dicts: | |
365 | # self.client_addrs = client_addrs |
|
|||
366 | validate_url_container(self.client_addrs) |
|
340 | validate_url_container(self.client_addrs) | |
367 |
|
||||
368 | # assert isinstance(self.client_addrs['queue'], str) |
|
|||
369 | # assert isinstance(self.client_addrs['control'], str) |
|
|||
370 | # self.hb_addrs = hb_addrs |
|
|||
371 | validate_url_container(self.engine_addrs) |
|
341 | validate_url_container(self.engine_addrs) | |
372 | # self.engine_addrs = engine_addrs |
|
|||
373 | # assert isinstance(self.engine_addrs['queue'], str) |
|
|||
374 | # assert isinstance(self.engine_addrs['control'], str) |
|
|||
375 | # assert len(engine_addrs['heartbeat']) == 2 |
|
|||
376 |
|
342 | |||
377 | # register our callbacks |
|
343 | # register our callbacks | |
378 | self.registrar.on_recv(self.dispatch_register_request) |
|
344 | self.registrar.on_recv(self.dispatch_register_request) | |
@@ -409,7 +375,9 b' class Hub(HasTraits):' | |||||
409 |
|
375 | |||
410 | @property |
|
376 | @property | |
411 | def _next_id(self): |
|
377 | def _next_id(self): | |
412 |
"""gemerate a new ID |
|
378 | """gemerate a new ID. | |
|
379 | ||||
|
380 | No longer reuse old ids, just count from 0.""" | |||
413 | newid = self._idcounter |
|
381 | newid = self._idcounter | |
414 | self._idcounter += 1 |
|
382 | self._idcounter += 1 | |
415 | return newid |
|
383 | return newid |
@@ -123,6 +123,11 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):' | |||||
123 | help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' |
|
123 | help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' | |
124 | 'connections [default: random]', |
|
124 | 'connections [default: random]', | |
125 | metavar='Hub.hb_ports') |
|
125 | metavar='Hub.hb_ports') | |
|
126 | paa('--ping', | |||
|
127 | type=int, dest='HubFactory.ping', | |||
|
128 | help='The frequency at which the Hub pings the engines for heartbeats ' | |||
|
129 | ' (in ms) [default: 100]', | |||
|
130 | metavar='Hub.ping') | |||
126 |
|
131 | |||
127 | # Client config |
|
132 | # Client config | |
128 | paa('--client-ip', |
|
133 | paa('--client-ip', | |
@@ -204,10 +209,10 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):' | |||||
204 | help='Try to reuse existing execution keys.') |
|
209 | help='Try to reuse existing execution keys.') | |
205 | paa('--no-secure', |
|
210 | paa('--no-secure', | |
206 | action='store_false', dest='Global.secure', |
|
211 | action='store_false', dest='Global.secure', | |
207 | help='Turn off execution keys.') |
|
212 | help='Turn off execution keys (default).') | |
208 | paa('--secure', |
|
213 | paa('--secure', | |
209 | action='store_true', dest='Global.secure', |
|
214 | action='store_true', dest='Global.secure', | |
210 |
help='Turn on execution keys |
|
215 | help='Turn on execution keys.') | |
211 | paa('--execkey', |
|
216 | paa('--execkey', | |
212 | type=str, dest='Global.exec_key', |
|
217 | type=str, dest='Global.exec_key', | |
213 | help='path to a file containing an execution key.', |
|
218 | help='path to a file containing an execution key.', | |
@@ -281,6 +286,19 b' class IPControllerApp(ApplicationWithClusterDir):' | |||||
281 | self.log.error("Couldn't construct the Controller", exc_info=True) |
|
286 | self.log.error("Couldn't construct the Controller", exc_info=True) | |
282 | self.exit(1) |
|
287 | self.exit(1) | |
283 |
|
|
288 | ||
|
289 | def save_urls(self): | |||
|
290 | """save the registration urls to files.""" | |||
|
291 | c = self.master_config | |||
|
292 | ||||
|
293 | sec_dir = c.Global.security_dir | |||
|
294 | cf = self.factory | |||
|
295 | ||||
|
296 | with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: | |||
|
297 | f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) | |||
|
298 | ||||
|
299 | with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: | |||
|
300 | f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) | |||
|
301 | ||||
284 |
|
302 | |||
285 | def import_statements(self): |
|
303 | def import_statements(self): | |
286 | statements = self.master_config.Global.import_statements |
|
304 | statements = self.master_config.Global.import_statements | |
@@ -291,19 +309,19 b' class IPControllerApp(ApplicationWithClusterDir):' | |||||
291 | except: |
|
309 | except: | |
292 | self.log.msg("Error running statement: %s" % s) |
|
310 | self.log.msg("Error running statement: %s" % s) | |
293 |
|
311 | |||
294 |
|
|
312 | def start_logging(self): | |
295 |
|
|
313 | super(IPControllerApp, self).start_logging() | |
296 |
|
|
314 | if self.master_config.Global.log_url: | |
297 |
|
|
315 | context = self.factory.context | |
298 |
|
|
316 | lsock = context.socket(zmq.PUB) | |
299 |
|
|
317 | lsock.connect(self.master_config.Global.log_url) | |
300 |
|
|
318 | handler = PUBHandler(lsock) | |
301 |
|
|
319 | handler.root_topic = 'controller' | |
302 |
|
|
320 | handler.setLevel(self.log_level) | |
303 |
|
|
321 | self.log.addHandler(handler) | |
304 | # |
|
322 | # | |
305 | def start_app(self): |
|
323 | def start_app(self): | |
306 |
# Start the |
|
324 | # Start the subprocesses: | |
307 | self.factory.start() |
|
325 | self.factory.start() | |
308 | self.write_pid_file(overwrite=True) |
|
326 | self.write_pid_file(overwrite=True) | |
309 | try: |
|
327 | try: |
@@ -218,12 +218,13 b" if 'setuptools' in sys.modules:" | |||||
218 | 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance', |
|
218 | 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance', | |
219 | 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance', |
|
219 | 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance', | |
220 | 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance', |
|
220 | 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance', | |
221 |
'ipclusterz = IPython.zmq.parallel.ipcluster |
|
221 | 'ipclusterz = IPython.zmq.parallel.ipclusterapp:launch_new_instance', | |
222 | 'iptest = IPython.testing.iptest:main', |
|
222 | 'iptest = IPython.testing.iptest:main', | |
223 | 'irunner = IPython.lib.irunner:main' |
|
223 | 'irunner = IPython.lib.irunner:main' | |
224 | ] |
|
224 | ] | |
225 | } |
|
225 | } | |
226 | setup_args['extras_require'] = dict( |
|
226 | setup_args['extras_require'] = dict( | |
|
227 | zmq = 'pyzmq>=2.0.10', | |||
227 | doc='Sphinx>=0.3', |
|
228 | doc='Sphinx>=0.3', | |
228 | test='nose>=0.10.1', |
|
229 | test='nose>=0.10.1', | |
229 | security='pyOpenSSL>=0.6' |
|
230 | security='pyOpenSSL>=0.6' |
General Comments 0
You need to be logged in to leave comments.
Login now