##// END OF EJS Templates
fix residual import issues with IPython.parallel reorganization
MinRK -
Show More
@@ -1,592 +1,593 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 import zmq
25 25 from zmq.eventloop import ioloop
26 26
27 27 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 28 from IPython.utils.importstring import import_item
29 from .clusterdir import (
29
30 from IPython.parallel.apps.clusterdir import (
30 31 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 32 ClusterDirError, PIDFileError
32 33 )
33 34
34 35
35 36 #-----------------------------------------------------------------------------
36 37 # Module level variables
37 38 #-----------------------------------------------------------------------------
38 39
39 40
#: Default name of the ipcluster config file inside a cluster directory.
default_config_file_name = u'ipcluster_config.py'


#: Help text shown by 'ipcluster -h'.
_description = """\
Start an IPython cluster for parallel computing.\n\n

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start -n 4'. For more complex usage
you will typically do 'ipcluster create -p mycluster', then edit
configuration files, followed by 'ipcluster start -p mycluster -n 4'.
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is no .pid
# file to be found.
NO_CLUSTER = 12
70 71
71 72
72 73 #-----------------------------------------------------------------------------
73 74 # Command line options
74 75 #-----------------------------------------------------------------------------
75 76
76 77
class IPClusterAppConfigLoader(ClusterDirConfigLoader):
    """Command-line config loader for the ipcluster application.

    Builds the argparse command line: two shared parent parsers (common
    options, and cluster-dir/profile/work-dir options) plus one subparser
    per ipcluster subcommand (list, create, start, stop, engines).
    """

    def _add_arguments(self):
        # Don't call ClusterDirConfigLoader._add_arguments as we don't want
        # its defaults on self.parser. Instead, we will put those on
        # default options on our subparsers.

        # This has all the common options that all subcommands use
        parent_parser1 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_ipython_dir(parent_parser1)
        self._add_log_level(parent_parser1)

        # This has all the common options that other subcommands use
        parent_parser2 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_cluster_profile(parent_parser2)
        self._add_cluster_dir(parent_parser2)
        self._add_work_dir(parent_parser2)
        paa = parent_parser2.add_argument
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')

        # Create the object used to create the subparsers.
        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description=
            """ipcluster has a variety of subcommands. The general way of
            running ipcluster is 'ipcluster <cmd> [options]'. To get help
            on a particular subcommand do 'ipcluster <cmd> -h'."""
            # help="For more help, type 'ipcluster <cmd> -h'",
        )

        # The "list" subcommand parser
        parser_list = subparsers.add_parser(
            'list',
            parents=[parent_parser1],
            argument_default=SUPPRESS,
            help="List all clusters in cwd and ipython_dir.",
            description=
            """List all available clusters, by cluster directory, that can
            be found in the current working directory or in the ipython
            directory. Cluster directories are named using the convention
            'cluster_<profile>'."""
        )

        # The "create" subcommand parser
        parser_create = subparsers.add_parser(
            'create',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Create a new cluster directory.",
            description=
            """Create an ipython cluster directory by its profile name or
            cluster directory path. Cluster directories contain
            configuration, log and security related files and are named
            using the convention 'cluster_<profile>'. By default they are
            located in your ipython directory. Once created, you will
            probably need to edit the configuration files in the cluster
            directory to configure your cluster. Most users will create a
            cluster directory by profile name,
            'ipcluster create -p mycluster', which will put the directory
            in '<ipython_dir>/cluster_mycluster'.
            """
        )
        paa = parser_create.add_argument
        paa('--reset-config',
            dest='Global.reset_config', action='store_true',
            help=
            """Recopy the default config files to the cluster directory.
            You will lose any modifications you have made to these files.""")

        # The "start" subcommand parser
        parser_start = subparsers.add_parser(
            'start',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Start a cluster.",
            description=
            """Start an ipython cluster by its profile name or cluster
            directory. Cluster directories contain configuration, log and
            security related files and are named using the convention
            'cluster_<profile>' and should be created using the 'create'
            subcommand of 'ipcluster'. If your cluster directory is in
            the cwd or the ipython directory, you can simply refer to it
            using its profile name, 'ipcluster start -n 4 -p <profile>`,
            otherwise use the '--cluster-dir' option.
            """
        )

        paa = parser_start.add_argument
        paa('-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n')
        paa('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.')
        paa('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.")
        paa('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file')
        paa('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.")
        paa('--delay',
            type=float, dest='Global.delay',
            help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")

        # The "stop" subcommand parser
        parser_stop = subparsers.add_parser(
            'stop',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Stop a running cluster.",
            description=
            """Stop a running ipython cluster by its profile name or cluster
            directory. Cluster directories are named using the convention
            'cluster_<profile>'. If your cluster directory is in
            the cwd or the ipython directory, you can simply refer to it
            using its profile name, 'ipcluster stop -p <profile>`, otherwise
            use the '--cluster-dir' option.
            """
        )
        paa = parser_stop.add_argument
        paa('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal")

        # the "engines" subcommand parser
        parser_engines = subparsers.add_parser(
            'engines',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Attach some engines to an existing controller or cluster.",
            description=
            """Start one or more engines to connect to an existing Cluster
            by profile name or cluster directory.
            Cluster directories contain configuration, log and
            security related files and are named using the convention
            'cluster_<profile>' and should be created using the 'create'
            subcommand of 'ipcluster'. If your cluster directory is in
            the cwd or the ipython directory, you can simply refer to it
            using its profile name, 'ipcluster engines -n 4 -p <profile>`,
            otherwise use the '--cluster-dir' option.
            """
        )
        paa = parser_engines.add_argument
        paa('-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n')
        paa('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file')
        paa('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.")
244 245
245 246 #-----------------------------------------------------------------------------
246 247 # Main application
247 248 #-----------------------------------------------------------------------------
248 249
249 250
class IPClusterApp(ApplicationWithClusterDir):
    """The main ipcluster application.

    Dispatches on the parsed subcommand (list, create, start, stop,
    engines) and drives the controller/engine launchers over a zmq
    ioloop.
    """

    name = u'ipcluster'
    description = _description
    usage = None
    command_line_loader = IPClusterAppConfigLoader
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    # Only some subcommands auto-create the cluster dir; toggled in
    # find_resources() below.
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Fill in the default Global config values for ipcluster."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.parallel.apps.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.delay = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = signal.SIGINT
        self.default_config.Global.daemonize = False

    def find_resources(self):
        """Locate (or create) the cluster directory, depending on the subcommand."""
        subcommand = self.command_line_config.Global.subcommand
        if subcommand=='list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand=='create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand=='start' or subcommand=='stop':
            self.auto_create_cluster_dir = True
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                # Re-raise with a friendlier, actionable message.
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )
        elif subcommand=='engines':
            self.auto_create_cluster_dir = False
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )

    def list_cluster_dirs(self):
        """Print a start command for every cluster dir found on the search paths."""
        # Find the search paths
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
        if cluster_dir_paths:
            cluster_dir_paths = cluster_dir_paths.split(':')
        else:
            cluster_dir_paths = []
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            # ipython_dir was not given on the command line; use the default.
            ipython_dir = self.default_config.Global.ipython_dir
        paths = [os.getcwd(), ipython_dir] + \
            cluster_dir_paths
        # Deduplicate search paths (order is not significant here).
        paths = list(set(paths))

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        for path in paths:
            files = os.listdir(path)
            for f in files:
                full_path = os.path.join(path, f)
                # Cluster dirs follow the 'cluster_<profile>' convention.
                if os.path.isdir(full_path) and f.startswith('cluster_'):
                    profile = full_path.split('_')[-1]
                    start_cmd = 'ipcluster start -p %s -n 4' % profile
                    print start_cmd + " ==> " + full_path

    def pre_construct(self):
        # IPClusterApp.pre_construct() is where we cd to the working directory.
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                # Daemonizing detaches from the terminal, so force file logging.
                config.Global.log_to_file = True
        except AttributeError:
            pass

    def construct(self):
        """Set up logging, config files and the ioloop for the chosen subcommand."""
        config = self.master_config
        subcmd = config.Global.subcommand
        reset = config.Global.reset_config
        if subcmd == 'list':
            return
        if subcmd == 'create':
            self.log.info('Copying default config files to cluster directory '
                          '[overwrite=%r]' % (reset,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
        if subcmd =='start':
            self.cluster_dir_obj.copy_all_config_files(overwrite=False)
            self.start_logging()
            self.loop = ioloop.IOLoop.instance()
            # reactor.callWhenRunning(self.start_launchers)
            # Defer launcher startup until the loop is running.
            dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
            dc.start()
        if subcmd == 'engines':
            self.start_logging()
            self.loop = ioloop.IOLoop.instance()
            # reactor.callWhenRunning(self.start_launchers)
            # 'engines' attaches to an existing controller, so skip it here.
            engine_only = lambda : self.start_launchers(controller=False)
            dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
            dc.start()

    def start_launchers(self, controller=True):
        """Create and start the engine launcher, and the controller launcher
        too unless *controller* is False."""
        config = self.master_config

        # Create the launchers. In both cases, we set the work_dir of
        # the launcher to the cluster_dir. This is where the launcher's
        # subprocesses will be launched. It is not where the controller
        # and engine will be launched.
        if controller:
            cl_class = import_item(config.Global.controller_launcher)
            self.controller_launcher = cl_class(
                work_dir=self.cluster_dir, config=config,
                logname=self.log.name
            )
            # Setup the observing of stopping. If the controller dies, shut
            # everything down as that will be completely fatal for the engines.
            self.controller_launcher.on_stop(self.stop_launchers)
            # But, we don't monitor the stopping of engines. An engine dying
            # is just fine and in principle a user could start a new engine.
            # Also, if we did monitor engine stopping, it is difficult to
            # know what to do when only some engines die. Currently, the
            # observing of engine stopping is inconsistent. Some launchers
            # might trigger on a single engine stopping, other wait until
            # all stop. TODO: think more about how to handle this.
        else:
            self.controller_launcher = None

        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            work_dir=self.cluster_dir, config=config, logname=self.log.name
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Start the controller and engines
        self._stopping = False # Make sure stop_launchers is not called 2x.
        if controller:
            self.start_controller()
        # Delay engine startup by Global.delay seconds when a controller is
        # being started too (controller is True -> 1); otherwise no delay.
        dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
        dc.start()
        self.startup_message()

    def startup_message(self, r=None):
        """Log that the cluster started; pass *r* through unchanged."""
        self.log.info("IPython cluster: started")
        return r

    def start_controller(self, r=None):
        """Start the controller launcher in the cluster directory."""
        # self.log.info("In start_controller")
        config = self.master_config
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def start_engines(self, r=None):
        """Start Global.n engines in the cluster directory."""
        # self.log.info("In start_engines")
        config = self.master_config

        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_controller(self, r=None):
        """Stop the controller launcher if one exists and is running."""
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_engines(self, r=None):
        """Stop the engine launcher if it is running."""
        # self.log.info("In stop_engines")
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            # d.addErrback(self.log_err)
            return d
        else:
            return None

    def log_err(self, f):
        """Log a failure traceback (Twisted-style failure object)."""
        self.log.error(f.getTraceback())
        return None

    def stop_launchers(self, r=None):
        """Stop engines and controller, then stop the loop a few seconds later."""
        if not self._stopping:
            # _stopping guards against running this shutdown path twice
            # (e.g. SIGINT plus the controller's on_stop callback).
            self._stopping = True
            # if isinstance(r, failure.Failure):
            #     self.log.error('Unexpected error in ipcluster:')
            #     self.log.info(r.getTraceback())
            self.log.error("IPython cluster: stopping")
            # These return deferreds. We are not doing anything with them
            # but we are holding refs to them as a reminder that they
            # do return deferreds.
            d1 = self.stop_engines()
            d2 = self.stop_controller()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()
            # reactor.callLater(4.0, reactor.stop)

    def sigint_handler(self, signum, frame):
        # SIGINT (Ctrl-C) triggers an orderly shutdown of all launchers.
        self.stop_launchers()

    def start_logging(self):
        """Remove stale controller/engine log files, then start our own logging."""
        # Remove old log files of the controller and engine
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd=='create' or subcmd=='list':
            # Both are fully handled during construction; nothing to run.
            return
        elif subcmd=='start':
            self.start_app_start()
        elif subcmd=='stop':
            self.start_app_stop()
        elif subcmd=='engines':
            self.start_app_engines()

    def start_app_start(self):
        """Start the app for the start subcommand."""
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            # No pid file means no running cluster; proceed with startup.
            pass
        else:
            self.log.critical(
                'Cluster is already running with [pid=%s]. '
                'use "ipcluster stop" to stop the cluster.' % pid
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if config.Global.daemonize:
            if os.name=='posix':
                from twisted.scripts._twistd_unix import daemonize
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR (loop interrupted by a signal) is an expected exit path.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        self.remove_pid_file()

    def start_app_engines(self):
        """Start the app for the engines subcommand."""
        config = self.master_config
        # First see if the cluster is already running

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % config.Global.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if config.Global.daemonize:
            if os.name=='posix':
                from twisted.scripts._twistd_unix import daemonize
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        # self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR (loop interrupted by a signal) is an expected exit path.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        # self.remove_pid_file()

    def start_app_stop(self):
        """Start the app for the stop subcommand."""
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Problem reading pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        else:
            if os.name=='posix':
                sig = config.Global.signal
                self.log.info(
                    "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
                )
                os.kill(pid, sig)
            elif os.name=='nt':
                # As of right now, we don't support daemonize on Windows, so
                # stop will not do anything. Minimally, it should clean up the
                # old .pid files.
                self.remove_pid_file()
582 583
583 584
def launch_new_instance():
    """Entry point: build the ipcluster application and run it."""
    IPClusterApp().start()
588 589
589 590
# Allow running this module directly as a script.
if __name__ == '__main__':
    launch_new_instance()
592 593
@@ -1,432 +1,433 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 import zmq
29 29 from zmq.log.handlers import PUBHandler
30 30 from zmq.utils import jsonapi as json
31 31
32 32 from IPython.config.loader import Config
33 33
34 34 from IPython.parallel import factory
35 from .clusterdir import (
35
36 from IPython.parallel.apps.clusterdir import (
36 37 ApplicationWithClusterDir,
37 38 ClusterDirConfigLoader
38 39 )
39 40 from IPython.parallel.util import disambiguate_ip_address, split_url
40 41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 42 from IPython.utils.traitlets import Instance, Unicode
42 43
43 44 from IPython.parallel.controller.controller import ControllerFactory
44 45
45 46
46 47 #-----------------------------------------------------------------------------
47 48 # Module level variables
48 49 #-----------------------------------------------------------------------------
49 50
50 51
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


#: Help text shown by 'ipcontroller -h'.
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""

#-----------------------------------------------------------------------------
# Default interfaces
#-----------------------------------------------------------------------------

# The default client interfaces for FCClientServiceFactory.interfaces
default_client_interfaces = Config()
default_client_interfaces.Default.url_file = 'ipcontroller-client.url'

# Make this a dict we can pass to Config.__init__ for the default
# (deepcopied so the dict shares no state with the Config above)
default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))



# The default engine interfaces for FCEngineServiceFactory.interfaces
default_engine_interfaces = Config()
default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'

# Make this a dict we can pass to Config.__init__ for the default
# (deepcopied so the dict shares no state with the Config above)
default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84 85
85 86
86 87 #-----------------------------------------------------------------------------
87 88 # Service factories
88 89 #-----------------------------------------------------------------------------
89 90
90 91 #
91 92 # class FCClientServiceFactory(FCServiceFactory):
92 93 # """A Foolscap implementation of the client services."""
93 94 #
94 95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 97 # allow_none=False, config=True)
97 98 #
98 99 #
99 100 # class FCEngineServiceFactory(FCServiceFactory):
100 101 # """A Foolscap implementation of the engine services."""
101 102 #
102 103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 105 # allow_none=False, config=True)
105 106 #
106 107
107 108 #-----------------------------------------------------------------------------
108 109 # Command line options
109 110 #-----------------------------------------------------------------------------
110 111
111 112
112 113 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113 114
114 115 def _add_arguments(self):
115 116 super(IPControllerAppConfigLoader, self)._add_arguments()
116 117 paa = self.parser.add_argument
117 118
118 119 ## Hub Config:
119 120 paa('--mongodb',
120 121 dest='HubFactory.db_class', action='store_const',
121 122 const='IPython.parallel.controller.mongodb.MongoDB',
122 123 help='Use MongoDB for task storage [default: in-memory]')
123 124 paa('--sqlite',
124 125 dest='HubFactory.db_class', action='store_const',
125 126 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
126 127 help='Use SQLite3 for DB task storage [default: in-memory]')
127 128 paa('--hb',
128 129 type=int, dest='HubFactory.hb', nargs=2,
129 130 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
130 131 'connections [default: random]',
131 132 metavar='Hub.hb_ports')
132 133 paa('--ping',
133 134 type=int, dest='HubFactory.ping',
134 135 help='The frequency at which the Hub pings the engines for heartbeats '
135 136 ' (in ms) [default: 100]',
136 137 metavar='Hub.ping')
137 138
138 139 # Client config
139 140 paa('--client-ip',
140 141 type=str, dest='HubFactory.client_ip',
141 142 help='The IP address or hostname the Hub will listen on for '
142 143 'client connections. Both engine-ip and client-ip can be set simultaneously '
143 144 'via --ip [default: loopback]',
144 145 metavar='Hub.client_ip')
145 146 paa('--client-transport',
146 147 type=str, dest='HubFactory.client_transport',
147 148 help='The ZeroMQ transport the Hub will use for '
148 149 'client connections. Both engine-transport and client-transport can be set simultaneously '
149 150 'via --transport [default: tcp]',
150 151 metavar='Hub.client_transport')
151 152 paa('--query',
152 153 type=int, dest='HubFactory.query_port',
153 154 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
154 155 metavar='Hub.query_port')
155 156 paa('--notifier',
156 157 type=int, dest='HubFactory.notifier_port',
157 158 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
158 159 metavar='Hub.notifier_port')
159 160
160 161 # Engine config
161 162 paa('--engine-ip',
162 163 type=str, dest='HubFactory.engine_ip',
163 164 help='The IP address or hostname the Hub will listen on for '
164 165 'engine connections. This applies to the Hub and its schedulers'
165 166 'engine-ip and client-ip can be set simultaneously '
166 167 'via --ip [default: loopback]',
167 168 metavar='Hub.engine_ip')
168 169 paa('--engine-transport',
169 170 type=str, dest='HubFactory.engine_transport',
170 171 help='The ZeroMQ transport the Hub will use for '
171 172 'client connections. Both engine-transport and client-transport can be set simultaneously '
172 173 'via --transport [default: tcp]',
173 174 metavar='Hub.engine_transport')
174 175
175 176 # Scheduler config
176 177 paa('--mux',
177 178 type=int, dest='ControllerFactory.mux', nargs=2,
178 179 help='The (2) ports the MUX scheduler will listen on for client,engine '
179 180 'connections, respectively [default: random]',
180 181 metavar='Scheduler.mux_ports')
181 182 paa('--task',
182 183 type=int, dest='ControllerFactory.task', nargs=2,
183 184 help='The (2) ports the Task scheduler will listen on for client,engine '
184 185 'connections, respectively [default: random]',
185 186 metavar='Scheduler.task_ports')
186 187 paa('--control',
187 188 type=int, dest='ControllerFactory.control', nargs=2,
188 189 help='The (2) ports the Control scheduler will listen on for client,engine '
189 190 'connections, respectively [default: random]',
190 191 metavar='Scheduler.control_ports')
191 192 paa('--iopub',
192 193 type=int, dest='ControllerFactory.iopub', nargs=2,
193 194 help='The (2) ports the IOPub scheduler will listen on for client,engine '
194 195 'connections, respectively [default: random]',
195 196 metavar='Scheduler.iopub_ports')
196 197
197 198 paa('--scheme',
198 199 type=str, dest='HubFactory.scheme',
199 200 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
200 201 help='select the task scheduler scheme [default: Python LRU]',
201 202 metavar='Scheduler.scheme')
202 203 paa('--usethreads',
203 204 dest='ControllerFactory.usethreads', action="store_true",
204 205 help='Use threads instead of processes for the schedulers',
205 206 )
206 207 paa('--hwm',
207 208 dest='ControllerFactory.hwm', type=int,
208 209 help='specify the High Water Mark (HWM) for the downstream '
209 210 'socket in the pure ZMQ scheduler. This is the maximum number '
210 211 'of allowed outstanding tasks on each engine.',
211 212 )
212 213
213 214 ## Global config
214 215 paa('--log-to-file',
215 216 action='store_true', dest='Global.log_to_file',
216 217 help='Log to a file in the log directory (default is stdout)')
217 218 paa('--log-url',
218 219 type=str, dest='Global.log_url',
219 220 help='Broadcast logs to an iploggerz process [default: disabled]')
220 221 paa('-r','--reuse-files',
221 222 action='store_true', dest='Global.reuse_files',
222 223 help='Try to reuse existing json connection files.')
223 224 paa('--no-secure',
224 225 action='store_false', dest='Global.secure',
225 226 help='Turn off execution keys (default).')
226 227 paa('--secure',
227 228 action='store_true', dest='Global.secure',
228 229 help='Turn on execution keys.')
229 230 paa('--execkey',
230 231 type=str, dest='Global.exec_key',
231 232 help='path to a file containing an execution key.',
232 233 metavar='keyfile')
233 234 paa('--ssh',
234 235 type=str, dest='Global.sshserver',
235 236 help='ssh url for clients to use when connecting to the Controller '
236 237 'processes. It should be of the form: [user@]server[:port]. The '
237 238 'Controller\'s listening addresses must be accessible from the ssh server',
238 239 metavar='Global.sshserver')
239 240 paa('--location',
240 241 type=str, dest='Global.location',
241 242 help="The external IP or domain name of this machine, used for disambiguating "
242 243 "engine and client connections.",
243 244 metavar='Global.location')
244 245 factory.add_session_arguments(self.parser)
245 246 factory.add_registration_arguments(self.parser)
246 247
247 248
248 249 #-----------------------------------------------------------------------------
249 250 # The main application
250 251 #-----------------------------------------------------------------------------
251 252
252 253
class IPControllerApp(ApplicationWithClusterDir):
    """The IPython controller application.

    Constructs a Hub and its schedulers (via ControllerFactory) inside a
    cluster directory, and reads/writes the JSON connection files
    (ipcontroller-client.json / ipcontroller-engine.json) that clients
    and engines use to locate and authenticate with the controller.
    """

    name = u'ipcontroller'
    description = _description
    command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Set the Global defaults (security, key reuse, ssh, location)."""
        super(IPControllerApp, self).create_default_config()
        # Don't set defaults for Global.secure or Global.reuse_furls
        # as those are set in a component.
        self.default_config.Global.import_statements = []
        self.default_config.Global.clean_logs = True
        self.default_config.Global.secure = True
        self.default_config.Global.reuse_files = False
        self.default_config.Global.exec_key = "exec_key.key"
        self.default_config.Global.sshserver = None
        self.default_config.Global.location = None

    def pre_construct(self):
        super(IPControllerApp, self).pre_construct()
        # NOTE(review): the FURL-era reuse/secure overrides that used to be
        # applied here (FCClientServiceFactory / FCEngineServiceFactory)
        # are obsolete with the zmq-based controller and have been removed.

    def save_connection_dict(self, fname, cdict):
        """Save a connection dict to a JSON file in the security dir.

        If cdict has no 'location', a best-effort external IP for this
        machine is derived from the url and recorded, so that remote
        engines/clients can disambiguate the address.
        """
        c = self.master_config
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                proto, ip, port = split_url(url)
            except AssertionError:
                pass
            else:
                location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
            cdict['location'] = location
        fname = os.path.join(c.Global.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        # the file contains the exec_key, so restrict it to owner read/write
        os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)

    def load_config_from_json(self):
        """Load config from existing json connector files (--reuse-files)."""
        c = self.master_config
        # load from engine config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.SessionFactory.exec_key = cfg['exec_key']
        xport, addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        c.Global.location = cfg['location']

        # load client config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport, addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.client_ip = ip
        c.Global.sshserver = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"

    def construct(self):
        # This is the working dir by now.
        sys.path.insert(0, '')
        c = self.master_config

        self.import_statements()
        reusing = c.Global.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError):
                reusing = False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif c.Global.secure:
            # generate a fresh execution key and store it with owner-only perms
            keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
            key = str(uuid.uuid4())
            with open(keyfile, 'w') as f:
                f.write(key)
            os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)
            c.SessionFactory.exec_key = key
        else:
            c.SessionFactory.exec_key = ''
            key = None

        try:
            self.factory = ControllerFactory(config=c, logname=self.log.name)
            self.start_logging()
            self.factory.construct()
        except Exception:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key': key,
                     'ssh': c.Global.sshserver,
                     'url': "%s://%s:%s" % (f.client_transport, f.client_ip, f.regport),
                     'location': c.Global.location
                     }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # FIX: the engine file previously reused the *client*
            # transport/ip, which disagreed with load_config_from_json
            # (it reads this file back into HubFactory.engine_transport /
            # engine_ip) and with save_urls below.  Also use a copy so the
            # client dict is not mutated by aliasing.
            edict = dict(cdict)
            edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)

    def save_urls(self):
        """Save the registration urls to files."""
        c = self.master_config

        sec_dir = c.Global.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.client_transport, cf.client_ip, cf.regport))

    def import_statements(self):
        """Execute each of Global.import_statements in this namespace."""
        statements = self.master_config.Global.import_statements
        for s in statements:
            try:
                # FIX: logging.Logger has no .msg() method -- the old
                # self.log.msg(...) calls raised AttributeError.
                self.log.info("Executing statement: '%s'" % s)
                # exec() call form is valid in both Python 2 and 3
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def start_logging(self):
        super(IPControllerApp, self).start_logging()
        if self.master_config.Global.log_url:
            # forward controller logs to an iplogger over a zmq PUB socket
            context = self.factory.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def start_app(self):
        # Start the subprocesses:
        self.factory.start()
        self.write_pid_file(overwrite=True)
        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
423 424
424 425
def launch_new_instance():
    """Instantiate the IPython controller application and run it."""
    IPControllerApp().start()


if __name__ == '__main__':
    launch_new_instance()
@@ -1,303 +1,303 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 from .clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 26 ApplicationWithClusterDir,
27 27 ClusterDirConfigLoader
28 28 )
29 29 from IPython.zmq.log import EnginePUBHandler
30 30
31 31 from IPython.parallel import factory
32 32 from IPython.parallel.engine.engine import EngineFactory
33 33 from IPython.parallel.engine.streamkernel import Kernel
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.utils.importstring import import_item
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Module level variables
41 41 #-----------------------------------------------------------------------------
42 42
43 43 #: The default config file name for this application
44 44 default_config_file_name = u'ipengine_config.py'
45 45
46 46
47 47 mpi4py_init = """from mpi4py import MPI as mpi
48 48 mpi.size = mpi.COMM_WORLD.Get_size()
49 49 mpi.rank = mpi.COMM_WORLD.Get_rank()
50 50 """
51 51
52 52
53 53 pytrilinos_init = """from PyTrilinos import Epetra
54 54 class SimpleStruct:
55 55 pass
56 56 mpi = SimpleStruct()
57 57 mpi.rank = 0
58 58 mpi.size = 0
59 59 """
60 60
61 61
62 62 _description = """Start an IPython engine for parallel computing.\n\n
63 63
64 64 IPython engines run in parallel and perform computations on behalf of a client
65 65 and controller. A controller needs to be started before the engines. The
66 66 engine can be configured using command line options or using a cluster
67 67 directory. Cluster directories contain config, log and security files and are
68 68 usually located in your ipython directory and named as "cluster_<profile>".
69 69 See the --profile and --cluster-dir options for details.
70 70 """
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Command line options
74 74 #-----------------------------------------------------------------------------
75 75
76 76
class IPEngineAppConfigLoader(ClusterDirConfigLoader):
    """Command-line loader for ipengine: adds engine-specific arguments."""

    def _add_arguments(self):
        super(IPEngineAppConfigLoader, self)._add_arguments()
        paa = self.parser.add_argument
        # Controller connection file
        # FIX: help text read "information fo controller"
        paa('--file', '-f',
            type=unicode, dest='Global.url_file',
            help='The full location of the file containing the connection information of the '
            'controller. If this is not given, the file must be in the '
            'security directory of the cluster directory. This location is '
            'resolved using the --profile and --app-dir options.',
            metavar='Global.url_file')
        # MPI
        paa('--mpi',
            type=str, dest='MPI.use',
            help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
            metavar='MPI.use')
        # Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
        paa('--log-url',
            dest='Global.log_url',
            help="url of ZMQ logger, as started with iploggerz")
        # startup commands
        paa('-c',
            type=str, dest='Global.extra_exec_lines',
            help='specify a command to be run at startup')
        paa('-s',
            type=unicode, dest='Global.extra_exec_file',
            help='specify a script to be run at startup')

        factory.add_session_arguments(self.parser)
        factory.add_registration_arguments(self.parser)
122 122
123 123
124 124 #-----------------------------------------------------------------------------
125 125 # Main application
126 126 #-----------------------------------------------------------------------------
127 127
128 128
class IPEngineApp(ApplicationWithClusterDir):
    """The IPython engine application.

    Loads the controller's JSON connection file, optionally initializes
    MPI, then constructs and runs an EngineFactory that registers this
    engine with the controller.
    """

    name = u'ipengine'
    description = _description
    command_line_loader = IPEngineAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        super(IPEngineApp, self).create_default_config()

        # The engine should not clean logs as we don't want to remove the
        # active log files of other running engines.
        self.default_config.Global.clean_logs = False
        self.default_config.Global.secure = True

        # Global config attributes
        self.default_config.Global.exec_lines = []
        self.default_config.Global.extra_exec_lines = ''
        self.default_config.Global.extra_exec_file = u''

        # Configuration related to the controller: url_file_name must match
        # the filename (path not included) that the controller wrote.  If
        # url_file is given, it is the actual location of that file;
        # otherwise it is resolved from the profile/cluster dir.
        self.default_config.Global.url_file = u''
        self.default_config.Global.url_file_name = u'ipcontroller-engine.json'

        # MPI related config attributes
        self.default_config.MPI.use = ''
        self.default_config.MPI.mpi4py = mpi4py_init
        self.default_config.MPI.pytrilinos = pytrilinos_init

    def post_load_command_line_config(self):
        pass

    def pre_construct(self):
        super(IPEngineApp, self).pre_construct()
        self.find_url_file()
        config = self.master_config
        # fold the -c / -s startup commands into Global.exec_lines
        if config.Global.extra_exec_lines:
            config.Global.exec_lines.append(config.Global.extra_exec_lines)
        if config.Global.extra_exec_file:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd = "execfile(%r)" % config.Global.extra_exec_file.encode(enc)
            config.Global.exec_lines.append(cmd)

    def find_url_file(self):
        """Set the url file.

        Here we don't try to actually see if it exists or is valid, as
        that is handled by the connection logic.
        """
        config = self.master_config
        # Find the actual controller connection file
        if not config.Global.url_file:
            config.Global.url_file = os.path.join(
                config.Global.cluster_dir,
                config.Global.security_dir,
                config.Global.url_file_name
            )

    def construct(self):
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.master_config
        if os.path.exists(config.Global.url_file):
            with open(config.Global.url_file) as f:
                d = json.loads(f.read())
            # JSON yields unicode; downstream zmq code expects bytes (py2)
            for k, v in d.iteritems():
                if isinstance(v, unicode):
                    d[k] = v.encode()
            if d['exec_key']:
                config.SessionFactory.exec_key = d['exec_key']
            d['url'] = disambiguate_url(d['url'], d['location'])
            config.RegistrationFactory.url = d['url']
            config.EngineFactory.location = d['location']

        config.Kernel.exec_lines = config.Global.exec_lines

        self.start_mpi()

        try:
            self.engine = EngineFactory(config=config, logname=self.log.name)
        except Exception:
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

        self.start_logging()

    def start_logging(self):
        super(IPEngineApp, self).start_logging()
        if self.master_config.Global.log_url:
            # forward engine logs to an iplogger over a zmq PUB socket
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def start_mpi(self):
        """Initialize MPI per config.MPI.use ('mpi4py', 'pytrilinos', or '')."""
        global mpi
        mpikey = self.master_config.MPI.use
        mpi_import_statement = self.master_config.MPI.get(mpikey, None)
        if mpi_import_statement is not None:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                # exec() call form is valid in both Python 2 and 3
                exec(mpi_import_statement, globals())
            except Exception:
                # best-effort: a failed MPI init leaves mpi disabled
                mpi = None
        else:
            mpi = None

    def start_app(self):
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
293 293
294 294
def launch_new_instance():
    """Create and run the IPython engine application."""
    # FIX: docstring previously said "controller"; this launches the engine.
    app = IPEngineApp()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
303 303
@@ -1,132 +1,132 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 from .clusterdir import (
23 from IPython.parallel.apps.clusterdir import (
24 24 ApplicationWithClusterDir,
25 25 ClusterDirConfigLoader
26 26 )
27 from .logwatcher import LogWatcher
27 from IPython.parallel.apps.logwatcher import LogWatcher
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Module level variables
31 31 #-----------------------------------------------------------------------------
32 32
33 33 #: The default config file name for this application
34 34 default_config_file_name = u'iplogger_config.py'
35 35
36 36 _description = """Start an IPython logger for parallel computing.\n\n
37 37
38 38 IPython controllers and engines (and your own processes) can broadcast log messages
39 39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 40 logger can be configured using command line options or using a cluster
41 41 directory. Cluster directories contain config, log and security files and are
42 42 usually located in your ipython directory and named as "cluster_<profile>".
43 43 See the --profile and --cluster-dir options for details.
44 44 """
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Command line options
48 48 #-----------------------------------------------------------------------------
49 49
50 50
class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
    """Command-line loader for iplogger: adds LogWatcher arguments."""

    def _add_arguments(self):
        super(IPLoggerAppConfigLoader, self)._add_arguments()
        paa = self.parser.add_argument
        # LogWatcher config
        paa('--url',
            type=str, dest='LogWatcher.url',
            help='The url the LogWatcher will listen on',
            )
        # subscription topics (the old '# MPI' comment here was a
        # copy-paste leftover from the engine app)
        paa('--topics',
            type=str, dest='LogWatcher.topics', nargs='+',
            help='What topics to subscribe to',
            metavar='topics')
        # Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
70 70
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Main application
74 74 #-----------------------------------------------------------------------------
75 75
76 76
class IPLoggerApp(ApplicationWithClusterDir):
    """A simple IPython logger application.

    Runs a LogWatcher that subscribes to zmq PUB log streams broadcast
    by controllers, engines and other processes.
    """

    name = u'iploggerz'
    description = _description
    command_line_loader = IPLoggerAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        super(IPLoggerApp, self).create_default_config()

        # The logger should not clean logs as we don't want to remove the
        # active log files of other running processes.
        self.default_config.Global.clean_logs = False

        # If given, this is the actual location of the logger's URL file.
        # If not, this is computed using the profile, app_dir and
        # url_file_name.
        self.default_config.Global.url_file_name = u'iplogger.url'
        self.default_config.Global.url_file = u''

    def post_load_command_line_config(self):
        pass

    def pre_construct(self):
        super(IPLoggerApp, self).pre_construct()

    def construct(self):
        # This is the working dir by now.
        sys.path.insert(0, '')

        self.start_logging()

        try:
            self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
        except Exception:
            # narrowed from bare except: don't swallow SystemExit et al.
            self.log.error("Couldn't start the LogWatcher", exc_info=True)
            self.exit(1)

    def start_app(self):
        try:
            self.watcher.start()
            self.watcher.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Logging Interrupted, shutting down...\n")
122 122
123 123
def launch_new_instance():
    """Instantiate the IPython LogWatcher application and run it."""
    IPLoggerApp().start()


if __name__ == '__main__':
    launch_new_instance()
132 132
General Comments 0
You need to be logged in to leave comments. Login now