##// END OF EJS Templates
remove unnecessary Config objects from flags.
MinRK -
Show More
@@ -1,257 +1,257 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import logging
21 import logging
22 import re
22 import re
23 import sys
23 import sys
24
24
25 from subprocess import Popen, PIPE
25 from subprocess import Popen, PIPE
26
26
27 from IPython.config.loader import Config
28 from IPython.core import release
27 from IPython.core import release
29 from IPython.core.crashhandler import CrashHandler
28 from IPython.core.crashhandler import CrashHandler
30 from IPython.core.newapplication import (
29 from IPython.core.newapplication import (
31 BaseIPythonApplication,
30 BaseIPythonApplication,
32 base_aliases as base_ip_aliases,
31 base_aliases as base_ip_aliases,
33 base_flags as base_ip_flags
32 base_flags as base_ip_flags
34 )
33 )
35 from IPython.utils.path import expand_path
34 from IPython.utils.path import expand_path
36
35
37 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
36 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
38
37
39 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
40 # Module errors
39 # Module errors
41 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
42
41
43 class PIDFileError(Exception):
42 class PIDFileError(Exception):
44 pass
43 pass
45
44
46
45
47 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
48 # Crash handler for this application
47 # Crash handler for this application
49 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
50
49
51
50
52 _message_template = """\
51 _message_template = """\
53 Oops, $self.app_name crashed. We do our best to make it stable, but...
52 Oops, $self.app_name crashed. We do our best to make it stable, but...
54
53
55 A crash report was automatically generated with the following information:
54 A crash report was automatically generated with the following information:
56 - A verbatim copy of the crash traceback.
55 - A verbatim copy of the crash traceback.
57 - Data on your current $self.app_name configuration.
56 - Data on your current $self.app_name configuration.
58
57
59 It was left in the file named:
58 It was left in the file named:
60 \t'$self.crash_report_fname'
59 \t'$self.crash_report_fname'
61 If you can email this file to the developers, the information in it will help
60 If you can email this file to the developers, the information in it will help
62 them in understanding and correcting the problem.
61 them in understanding and correcting the problem.
63
62
64 You can mail it to: $self.contact_name at $self.contact_email
63 You can mail it to: $self.contact_name at $self.contact_email
65 with the subject '$self.app_name Crash Report'.
64 with the subject '$self.app_name Crash Report'.
66
65
67 If you want to do it now, the following command will work (under Unix):
66 If you want to do it now, the following command will work (under Unix):
68 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
67 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
69
68
70 To ensure accurate tracking of this issue, please file a report about it at:
69 To ensure accurate tracking of this issue, please file a report about it at:
71 $self.bug_tracker
70 $self.bug_tracker
72 """
71 """
73
72
74 class ParallelCrashHandler(CrashHandler):
73 class ParallelCrashHandler(CrashHandler):
75 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
74 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
76
75
77 message_template = _message_template
76 message_template = _message_template
78
77
79 def __init__(self, app):
78 def __init__(self, app):
80 contact_name = release.authors['Min'][0]
79 contact_name = release.authors['Min'][0]
81 contact_email = release.authors['Min'][1]
80 contact_email = release.authors['Min'][1]
82 bug_tracker = 'http://github.com/ipython/ipython/issues'
81 bug_tracker = 'http://github.com/ipython/ipython/issues'
83 super(ParallelCrashHandler,self).__init__(
82 super(ParallelCrashHandler,self).__init__(
84 app, contact_name, contact_email, bug_tracker
83 app, contact_name, contact_email, bug_tracker
85 )
84 )
86
85
87
86
88 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
89 # Main application
88 # Main application
90 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
91 base_aliases = {}
90 base_aliases = {}
92 base_aliases.update(base_ip_aliases)
91 base_aliases.update(base_ip_aliases)
93 base_aliases.update({
92 base_aliases.update({
94 'profile_dir' : 'ProfileDir.location',
93 'profile_dir' : 'ProfileDir.location',
95 'log_level' : 'BaseParallelApplication.log_level',
94 'log_level' : 'BaseParallelApplication.log_level',
96 'work_dir' : 'BaseParallelApplication.work_dir',
95 'work_dir' : 'BaseParallelApplication.work_dir',
97 'log_to_file' : 'BaseParallelApplication.log_to_file',
96 'log_to_file' : 'BaseParallelApplication.log_to_file',
98 'clean_logs' : 'BaseParallelApplication.clean_logs',
97 'clean_logs' : 'BaseParallelApplication.clean_logs',
99 'log_url' : 'BaseParallelApplication.log_url',
98 'log_url' : 'BaseParallelApplication.log_url',
100 })
99 })
101
100
102 base_flags = {
101 base_flags = {
103 'log-to-file' : ({'BaseParallelApplication' : Config({
102 'log-to-file' : (
104 'log_to_file' : True}),
103 {'BaseParallelApplication' : {'log_to_file' : True}},
105 }, "send log output to a file")
104 "send log output to a file"
105 )
106 }
106 }
107 base_flags.update(base_ip_flags)
107 base_flags.update(base_ip_flags)
108
108
109 class BaseParallelApplication(BaseIPythonApplication):
109 class BaseParallelApplication(BaseIPythonApplication):
110 """The base Application for IPython.parallel apps
110 """The base Application for IPython.parallel apps
111
111
112 Principal extensions to BaseIPythonApplication:
112 Principal extensions to BaseIPythonApplication:
113
113
114 * work_dir
114 * work_dir
115 * remote logging via pyzmq
115 * remote logging via pyzmq
116 * IOLoop instance
116 * IOLoop instance
117 """
117 """
118
118
119 crash_handler_class = ParallelCrashHandler
119 crash_handler_class = ParallelCrashHandler
120
120
121 def _log_level_default(self):
121 def _log_level_default(self):
122 # temporarily override default_log_level to INFO
122 # temporarily override default_log_level to INFO
123 return logging.INFO
123 return logging.INFO
124
124
125 work_dir = Unicode(os.getcwdu(), config=True,
125 work_dir = Unicode(os.getcwdu(), config=True,
126 help='Set the working dir for the process.'
126 help='Set the working dir for the process.'
127 )
127 )
128 def _work_dir_changed(self, name, old, new):
128 def _work_dir_changed(self, name, old, new):
129 self.work_dir = unicode(expand_path(new))
129 self.work_dir = unicode(expand_path(new))
130
130
131 log_to_file = Bool(config=True,
131 log_to_file = Bool(config=True,
132 help="whether to log to a file")
132 help="whether to log to a file")
133
133
134 clean_logs = Bool(False, config=True,
134 clean_logs = Bool(False, config=True,
135 help="whether to cleanup old logfiles before starting")
135 help="whether to cleanup old logfiles before starting")
136
136
137 log_url = Unicode('', config=True,
137 log_url = Unicode('', config=True,
138 help="The ZMQ URL of the iplogger to aggregate logging.")
138 help="The ZMQ URL of the iplogger to aggregate logging.")
139
139
140 def _config_files_default(self):
140 def _config_files_default(self):
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
142
142
143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
144 def _loop_default(self):
144 def _loop_default(self):
145 from zmq.eventloop.ioloop import IOLoop
145 from zmq.eventloop.ioloop import IOLoop
146 return IOLoop.instance()
146 return IOLoop.instance()
147
147
148 aliases = Dict(base_aliases)
148 aliases = Dict(base_aliases)
149 flags = Dict(base_flags)
149 flags = Dict(base_flags)
150
150
151 def initialize(self, argv=None):
151 def initialize(self, argv=None):
152 """initialize the app"""
152 """initialize the app"""
153 super(BaseParallelApplication, self).initialize(argv)
153 super(BaseParallelApplication, self).initialize(argv)
154 self.to_work_dir()
154 self.to_work_dir()
155 self.reinit_logging()
155 self.reinit_logging()
156
156
157 def to_work_dir(self):
157 def to_work_dir(self):
158 wd = self.work_dir
158 wd = self.work_dir
159 if unicode(wd) != os.getcwdu():
159 if unicode(wd) != os.getcwdu():
160 os.chdir(wd)
160 os.chdir(wd)
161 self.log.info("Changing to working dir: %s" % wd)
161 self.log.info("Changing to working dir: %s" % wd)
162 # This is the working dir by now.
162 # This is the working dir by now.
163 sys.path.insert(0, '')
163 sys.path.insert(0, '')
164
164
165 def reinit_logging(self):
165 def reinit_logging(self):
166 # Remove old log files
166 # Remove old log files
167 log_dir = self.profile_dir.log_dir
167 log_dir = self.profile_dir.log_dir
168 if self.clean_logs:
168 if self.clean_logs:
169 for f in os.listdir(log_dir):
169 for f in os.listdir(log_dir):
170 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
170 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
171 os.remove(os.path.join(log_dir, f))
171 os.remove(os.path.join(log_dir, f))
172 if self.log_to_file:
172 if self.log_to_file:
173 # Start logging to the new log file
173 # Start logging to the new log file
174 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
174 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
175 logfile = os.path.join(log_dir, log_filename)
175 logfile = os.path.join(log_dir, log_filename)
176 open_log_file = open(logfile, 'w')
176 open_log_file = open(logfile, 'w')
177 else:
177 else:
178 open_log_file = None
178 open_log_file = None
179 if open_log_file is not None:
179 if open_log_file is not None:
180 self.log.removeHandler(self._log_handler)
180 self.log.removeHandler(self._log_handler)
181 self._log_handler = logging.StreamHandler(open_log_file)
181 self._log_handler = logging.StreamHandler(open_log_file)
182 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
182 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
183 self._log_handler.setFormatter(self._log_formatter)
183 self._log_handler.setFormatter(self._log_formatter)
184 self.log.addHandler(self._log_handler)
184 self.log.addHandler(self._log_handler)
185
185
186 def write_pid_file(self, overwrite=False):
186 def write_pid_file(self, overwrite=False):
187 """Create a .pid file in the pid_dir with my pid.
187 """Create a .pid file in the pid_dir with my pid.
188
188
189 This must be called after pre_construct, which sets `self.pid_dir`.
189 This must be called after pre_construct, which sets `self.pid_dir`.
190 This raises :exc:`PIDFileError` if the pid file exists already.
190 This raises :exc:`PIDFileError` if the pid file exists already.
191 """
191 """
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
193 if os.path.isfile(pid_file):
193 if os.path.isfile(pid_file):
194 pid = self.get_pid_from_file()
194 pid = self.get_pid_from_file()
195 if not overwrite:
195 if not overwrite:
196 raise PIDFileError(
196 raise PIDFileError(
197 'The pid file [%s] already exists. \nThis could mean that this '
197 'The pid file [%s] already exists. \nThis could mean that this '
198 'server is already running with [pid=%s].' % (pid_file, pid)
198 'server is already running with [pid=%s].' % (pid_file, pid)
199 )
199 )
200 with open(pid_file, 'w') as f:
200 with open(pid_file, 'w') as f:
201 self.log.info("Creating pid file: %s" % pid_file)
201 self.log.info("Creating pid file: %s" % pid_file)
202 f.write(repr(os.getpid())+'\n')
202 f.write(repr(os.getpid())+'\n')
203
203
204 def remove_pid_file(self):
204 def remove_pid_file(self):
205 """Remove the pid file.
205 """Remove the pid file.
206
206
207 This should be called at shutdown by registering a callback with
207 This should be called at shutdown by registering a callback with
208 :func:`reactor.addSystemEventTrigger`. This needs to return
208 :func:`reactor.addSystemEventTrigger`. This needs to return
209 ``None``.
209 ``None``.
210 """
210 """
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
212 if os.path.isfile(pid_file):
212 if os.path.isfile(pid_file):
213 try:
213 try:
214 self.log.info("Removing pid file: %s" % pid_file)
214 self.log.info("Removing pid file: %s" % pid_file)
215 os.remove(pid_file)
215 os.remove(pid_file)
216 except:
216 except:
217 self.log.warn("Error removing the pid file: %s" % pid_file)
217 self.log.warn("Error removing the pid file: %s" % pid_file)
218
218
219 def get_pid_from_file(self):
219 def get_pid_from_file(self):
220 """Get the pid from the pid file.
220 """Get the pid from the pid file.
221
221
222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
223 """
223 """
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
225 if os.path.isfile(pid_file):
225 if os.path.isfile(pid_file):
226 with open(pid_file, 'r') as f:
226 with open(pid_file, 'r') as f:
227 pid = int(f.read().strip())
227 pid = int(f.read().strip())
228 return pid
228 return pid
229 else:
229 else:
230 raise PIDFileError('pid file not found: %s' % pid_file)
230 raise PIDFileError('pid file not found: %s' % pid_file)
231
231
232 def check_pid(self, pid):
232 def check_pid(self, pid):
233 if os.name == 'nt':
233 if os.name == 'nt':
234 try:
234 try:
235 import ctypes
235 import ctypes
236 # returns 0 if no such process (of ours) exists
236 # returns 0 if no such process (of ours) exists
237 # positive int otherwise
237 # positive int otherwise
238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
239 except Exception:
239 except Exception:
240 self.log.warn(
240 self.log.warn(
241 "Could not determine whether pid %i is running via `OpenProcess`. "
241 "Could not determine whether pid %i is running via `OpenProcess`. "
242 " Making the likely assumption that it is."%pid
242 " Making the likely assumption that it is."%pid
243 )
243 )
244 return True
244 return True
245 return bool(p)
245 return bool(p)
246 else:
246 else:
247 try:
247 try:
248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
249 output,_ = p.communicate()
249 output,_ = p.communicate()
250 except OSError:
250 except OSError:
251 self.log.warn(
251 self.log.warn(
252 "Could not determine whether pid %i is running via `ps x`. "
252 "Could not determine whether pid %i is running via `ps x`. "
253 " Making the likely assumption that it is."%pid
253 " Making the likely assumption that it is."%pid
254 )
254 )
255 return True
255 return True
256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
257 return pid in pids
257 return pid in pids
@@ -1,399 +1,398 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import socket
21 import socket
22 import stat
22 import stat
23 import sys
23 import sys
24 import uuid
24 import uuid
25
25
26 from multiprocessing import Process
26 from multiprocessing import Process
27
27
28 import zmq
28 import zmq
29 from zmq.devices import ProcessMonitoredQueue
29 from zmq.devices import ProcessMonitoredQueue
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.config.loader import Config
34 from IPython.core.newapplication import ProfileDir
33 from IPython.core.newapplication import ProfileDir
35
34
36 from IPython.parallel.apps.baseapp import (
35 from IPython.parallel.apps.baseapp import (
37 BaseParallelApplication,
36 BaseParallelApplication,
38 base_flags
37 base_flags
39 )
38 )
40 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
40 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
42
41
43 # from IPython.parallel.controller.controller import ControllerFactory
42 # from IPython.parallel.controller.controller import ControllerFactory
44 from IPython.parallel.streamsession import StreamSession
43 from IPython.parallel.streamsession import StreamSession
45 from IPython.parallel.controller.heartmonitor import HeartMonitor
44 from IPython.parallel.controller.heartmonitor import HeartMonitor
46 from IPython.parallel.controller.hub import HubFactory
45 from IPython.parallel.controller.hub import HubFactory
47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
46 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
48 from IPython.parallel.controller.sqlitedb import SQLiteDB
47 from IPython.parallel.controller.sqlitedb import SQLiteDB
49
48
50 from IPython.parallel.util import signal_children, split_url
49 from IPython.parallel.util import signal_children, split_url
51
50
52 # conditional import of MongoDB backend class
51 # conditional import of MongoDB backend class
53
52
54 try:
53 try:
55 from IPython.parallel.controller.mongodb import MongoDB
54 from IPython.parallel.controller.mongodb import MongoDB
56 except ImportError:
55 except ImportError:
57 maybe_mongo = []
56 maybe_mongo = []
58 else:
57 else:
59 maybe_mongo = [MongoDB]
58 maybe_mongo = [MongoDB]
60
59
61
60
62 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
63 # Module level variables
62 # Module level variables
64 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
65
64
66
65
67 #: The default config file name for this application
66 #: The default config file name for this application
68 default_config_file_name = u'ipcontroller_config.py'
67 default_config_file_name = u'ipcontroller_config.py'
69
68
70
69
71 _description = """Start the IPython controller for parallel computing.
70 _description = """Start the IPython controller for parallel computing.
72
71
73 The IPython controller provides a gateway between the IPython engines and
72 The IPython controller provides a gateway between the IPython engines and
74 clients. The controller needs to be started before the engines and can be
73 clients. The controller needs to be started before the engines and can be
75 configured using command line options or using a cluster directory. Cluster
74 configured using command line options or using a cluster directory. Cluster
76 directories contain config, log and security files and are usually located in
75 directories contain config, log and security files and are usually located in
77 your ipython directory and named as "cluster_<profile>". See the `profile`
76 your ipython directory and named as "cluster_<profile>". See the `profile`
78 and `profile_dir` options for details.
77 and `profile_dir` options for details.
79 """
78 """
80
79
81
80
82
81
83
82
84 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
85 # The main application
84 # The main application
86 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
87 flags = {}
86 flags = {}
88 flags.update(base_flags)
87 flags.update(base_flags)
89 flags.update({
88 flags.update({
90 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
89 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
91 'Use threads instead of processes for the schedulers'),
90 'Use threads instead of processes for the schedulers'),
92 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
91 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
93 'use the SQLiteDB backend'),
92 'use the SQLiteDB backend'),
94 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
93 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
95 'use the MongoDB backend'),
94 'use the MongoDB backend'),
96 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
95 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
97 'use the in-memory DictDB backend'),
96 'use the in-memory DictDB backend'),
98 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
97 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
99 'reuse existing json connection files')
98 'reuse existing json connection files')
100 })
99 })
101
100
102 flags.update()
101 flags.update()
103
102
104 class IPControllerApp(BaseParallelApplication):
103 class IPControllerApp(BaseParallelApplication):
105
104
106 name = u'ipcontroller'
105 name = u'ipcontroller'
107 description = _description
106 description = _description
108 config_file_name = Unicode(default_config_file_name)
107 config_file_name = Unicode(default_config_file_name)
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
108 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
110
109
111 # change default to True
110 # change default to True
112 auto_create = Bool(True, config=True,
111 auto_create = Bool(True, config=True,
113 help="""Whether to create profile dir if it doesn't exist""")
112 help="""Whether to create profile dir if it doesn't exist""")
114
113
115 reuse_files = Bool(False, config=True,
114 reuse_files = Bool(False, config=True,
116 help='Whether to reuse existing json connection files [default: False]'
115 help='Whether to reuse existing json connection files [default: False]'
117 )
116 )
118 secure = Bool(True, config=True,
117 secure = Bool(True, config=True,
119 help='Whether to use exec_keys for extra authentication [default: True]'
118 help='Whether to use exec_keys for extra authentication [default: True]'
120 )
119 )
121 ssh_server = Unicode(u'', config=True,
120 ssh_server = Unicode(u'', config=True,
122 help="""ssh url for clients to use when connecting to the Controller
121 help="""ssh url for clients to use when connecting to the Controller
123 processes. It should be of the form: [user@]server[:port]. The
122 processes. It should be of the form: [user@]server[:port]. The
124 Controller\'s listening addresses must be accessible from the ssh server""",
123 Controller\'s listening addresses must be accessible from the ssh server""",
125 )
124 )
126 location = Unicode(u'', config=True,
125 location = Unicode(u'', config=True,
127 help="""The external IP or domain name of the Controller, used for disambiguating
126 help="""The external IP or domain name of the Controller, used for disambiguating
128 engine and client connections.""",
127 engine and client connections.""",
129 )
128 )
130 import_statements = List([], config=True,
129 import_statements = List([], config=True,
131 help="import statements to be run at startup. Necessary in some environments"
130 help="import statements to be run at startup. Necessary in some environments"
132 )
131 )
133
132
134 use_threads = Bool(False, config=True,
133 use_threads = Bool(False, config=True,
135 help='Use threads instead of processes for the schedulers',
134 help='Use threads instead of processes for the schedulers',
136 )
135 )
137
136
138 # internal
137 # internal
139 children = List()
138 children = List()
140 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
139 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
141
140
142 def _use_threads_changed(self, name, old, new):
141 def _use_threads_changed(self, name, old, new):
143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
142 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
144
143
145 aliases = Dict(dict(
144 aliases = Dict(dict(
146 log_level = 'IPControllerApp.log_level',
145 log_level = 'IPControllerApp.log_level',
147 log_url = 'IPControllerApp.log_url',
146 log_url = 'IPControllerApp.log_url',
148 reuse_files = 'IPControllerApp.reuse_files',
147 reuse_files = 'IPControllerApp.reuse_files',
149 secure = 'IPControllerApp.secure',
148 secure = 'IPControllerApp.secure',
150 ssh = 'IPControllerApp.ssh_server',
149 ssh = 'IPControllerApp.ssh_server',
151 use_threads = 'IPControllerApp.use_threads',
150 use_threads = 'IPControllerApp.use_threads',
152 import_statements = 'IPControllerApp.import_statements',
151 import_statements = 'IPControllerApp.import_statements',
153 location = 'IPControllerApp.location',
152 location = 'IPControllerApp.location',
154
153
155 ident = 'StreamSession.session',
154 ident = 'StreamSession.session',
156 user = 'StreamSession.username',
155 user = 'StreamSession.username',
157 exec_key = 'StreamSession.keyfile',
156 exec_key = 'StreamSession.keyfile',
158
157
159 url = 'HubFactory.url',
158 url = 'HubFactory.url',
160 ip = 'HubFactory.ip',
159 ip = 'HubFactory.ip',
161 transport = 'HubFactory.transport',
160 transport = 'HubFactory.transport',
162 port = 'HubFactory.regport',
161 port = 'HubFactory.regport',
163
162
164 ping = 'HeartMonitor.period',
163 ping = 'HeartMonitor.period',
165
164
166 scheme = 'TaskScheduler.scheme_name',
165 scheme = 'TaskScheduler.scheme_name',
167 hwm = 'TaskScheduler.hwm',
166 hwm = 'TaskScheduler.hwm',
168
167
169
168
170 profile = "BaseIPythonApplication.profile",
169 profile = "BaseIPythonApplication.profile",
171 profile_dir = 'ProfileDir.location',
170 profile_dir = 'ProfileDir.location',
172
171
173 ))
172 ))
174 flags = Dict(flags)
173 flags = Dict(flags)
175
174
176
175
177 def save_connection_dict(self, fname, cdict):
176 def save_connection_dict(self, fname, cdict):
178 """save a connection dict to json file."""
177 """save a connection dict to json file."""
179 c = self.config
178 c = self.config
180 url = cdict['url']
179 url = cdict['url']
181 location = cdict['location']
180 location = cdict['location']
182 if not location:
181 if not location:
183 try:
182 try:
184 proto,ip,port = split_url(url)
183 proto,ip,port = split_url(url)
185 except AssertionError:
184 except AssertionError:
186 pass
185 pass
187 else:
186 else:
188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
187 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
189 cdict['location'] = location
188 cdict['location'] = location
190 fname = os.path.join(self.profile_dir.security_dir, fname)
189 fname = os.path.join(self.profile_dir.security_dir, fname)
191 with open(fname, 'w') as f:
190 with open(fname, 'w') as f:
192 f.write(json.dumps(cdict, indent=2))
191 f.write(json.dumps(cdict, indent=2))
193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
192 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
194
193
def load_config_from_json(self):
    """Load connection config from the existing JSON connector files.

    Reads ``ipcontroller-engine.json`` and ``ipcontroller-client.json``
    from the profile's security dir and copies their contents into
    ``self.config``, asserting that the two files agree on the exec key
    and registration port.
    """
    config = self.config
    sec_dir = self.profile_dir.security_dir

    # engine connector file
    with open(os.path.join(sec_dir, 'ipcontroller-engine.json')) as f:
        engine_cfg = json.load(f)
    key = config.StreamSession.key = engine_cfg['exec_key']
    transport, address = engine_cfg['url'].split('://')
    host, port = address.split(':')
    config.HubFactory.engine_transport = transport
    config.HubFactory.engine_ip = host
    config.HubFactory.regport = int(port)
    self.location = engine_cfg['location']

    # client connector file
    with open(os.path.join(sec_dir, 'ipcontroller-client.json')) as f:
        client_cfg = json.load(f)
    assert key == client_cfg['exec_key'], "exec_key mismatch between engine and client keys"
    transport, address = client_cfg['url'].split('://')
    host, port = address.split(':')
    config.HubFactory.client_transport = transport
    config.HubFactory.client_ip = host
    self.ssh_server = client_cfg['ssh']
    assert int(port) == config.HubFactory.regport, "regport mismatch"
219
218
def init_hub(self):
    """Construct the Hub, reusing or generating connection info as needed.

    If ``self.reuse_files`` is set, try to load connection info from the
    existing JSON connector files; on failure fall back to generating
    fresh config (including a new random exec key when ``self.secure``).
    When new config is generated, it is written back out to the client
    and engine JSON connector files.
    """
    c = self.config

    self.do_import_statements()
    reusing = self.reuse_files
    if reusing:
        try:
            self.load_config_from_json()
        except (AssertionError, IOError):
            # connector files missing or inconsistent; fall back to fresh config
            reusing = False
    # check again, because reusing may have failed:
    if reusing:
        pass
    elif self.secure:
        # fresh random exec key, shared by client and engine connections
        key = str(uuid.uuid4())
        c.StreamSession.key = key
    else:
        key = c.StreamSession.key = ''

    try:
        self.factory = HubFactory(config=c, log=self.log)
        self.factory.init_hub()
    except Exception:
        # was a bare except; narrowed so SystemExit/KeyboardInterrupt propagate
        self.log.error("Couldn't construct the Controller", exc_info=True)
        self.exit(1)

    if not reusing:
        # save to new json config files
        f = self.factory
        cdict = {'exec_key' : key,
                 'ssh' : self.ssh_server,
                 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                 'location' : self.location
                 }
        self.save_connection_dict('ipcontroller-client.json', cdict)
        # copy, so mutating the engine dict can't alter the saved client dict
        # (the original aliased ``edict = cdict``)
        edict = dict(cdict)
        # NOTE(review): engine url reuses f.client_transport/f.client_ip rather
        # than f.engine_transport/f.engine_ip -- looks suspicious, confirm intended.
        edict['url'] = "%s://%s:%s" % (f.client_transport, f.client_ip, f.regport)
        self.save_connection_dict('ipcontroller-engine.json', edict)
263
262
264 #
263 #
def init_schedulers(self):
    """Create the MonitoredQueue relays and the task scheduler.

    Each relay/scheduler is appended to ``self.children`` and started
    later by ``start()``.  The relay class is resolved from
    ``self.mq_class`` (thread- or process-based variant).
    """
    children = self.children
    # resolve the configured MonitoredQueue class by dotted name
    mq = import_item(str(self.mq_class))

    hub = self.factory
    # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
    # IOPub relay (in a Process): bridges the iopub endpoints between
    # the client-facing and engine-facing sides of the hub
    q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
    q.bind_in(hub.client_info['iopub'])
    q.bind_out(hub.engine_info['iopub'])
    # subscribe to all traffic on the SUB side
    q.setsockopt_out(zmq.SUBSCRIBE, '')
    q.connect_mon(hub.monitor_url)
    q.daemon=True
    children.append(q)

    # Multiplexer Queue (in a Process)
    q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in(hub.client_info['mux'])
    # fixed identity so peers can address this queue by name
    q.setsockopt_in(zmq.IDENTITY, 'mux')
    q.bind_out(hub.engine_info['mux'])
    q.connect_mon(hub.monitor_url)
    q.daemon=True
    children.append(q)

    # Control Queue (in a Process)
    q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
    q.bind_in(hub.client_info['control'])
    q.setsockopt_in(zmq.IDENTITY, 'control')
    q.bind_out(hub.engine_info['control'])
    q.connect_mon(hub.monitor_url)
    q.daemon=True
    children.append(q)
    # scheme may not be set in the config; fall back to the trait default
    try:
        scheme = self.config.TaskScheduler.scheme_name
    except AttributeError:
        scheme = TaskScheduler.scheme_name.get_default_value()
    # Task Queue (in a Process)
    if scheme == 'pure':
        # plain zmq XREQ round-robin, no Python-level scheduling
        self.log.warn("task::using pure XREQ Task scheduler")
        q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
        # q.setsockopt_out(zmq.HWM, hub.hwm)
        q.bind_in(hub.client_info['task'][1])
        q.setsockopt_in(zmq.IDENTITY, 'task')
        q.bind_out(hub.engine_info['task'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)
    elif scheme == 'none':
        self.log.warn("task::using no Task scheduler")

    else:
        # Python TaskScheduler launched in a separate Process
        self.log.info("task::using Python %s Task scheduler"%scheme)
        sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                hub.monitor_url, hub.client_info['notification'])
        # config is passed as a plain dict across the process boundary
        kwargs = dict(logname='scheduler', loglevel=self.log_level,
                        log_url = self.log_url, config=dict(self.config))
        q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
        q.daemon=True
        children.append(q)
324
323
325
324
def save_urls(self):
    """Save the engine and client registration urls to files.

    Writes ``ipcontroller-engine.url`` and ``ipcontroller-client.url``
    into the profile's security directory, each containing a single
    ``transport://ip:port`` line taken from the hub factory.
    """
    # (removed an unused ``c = self.config`` local from the original)
    sec_dir = self.profile_dir.security_dir
    cf = self.factory

    with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
        f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

    with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
        f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
338
337
339
338
def do_import_statements(self):
    """Execute any configured import statements in this process.

    Each entry in ``self.import_statements`` is exec'd in this module's
    namespace; failures are logged but do not abort startup.
    """
    for statement in self.import_statements:
        try:
            # BUG FIX: the original called self.log.msg(), which stdlib
            # loggers don't have -- the method raised AttributeError both
            # here and again inside the handler.  Use info()/error().
            self.log.info("Executing statement: '%s'" % statement)
            # call form of exec works on both Python 2 and 3
            exec(statement, globals(), locals())
        except Exception:
            self.log.error("Error running statement: %s" % statement)
348
347
def forward_logging(self):
    """If a log url is configured, forward this app's logging over a PUB socket."""
    if not self.log_url:
        return
    self.log.info("Forwarding logging to %s"%self.log_url)
    ctx = zmq.Context.instance()
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.connect(self.log_url)
    pub_handler = PUBHandler(pub_socket)
    # swap the current handler for the forwarding one
    self.log.removeHandler(self._log_handler)
    pub_handler.root_topic = 'controller'
    pub_handler.setLevel(self.log_level)
    self.log.addHandler(pub_handler)
    self._log_handler = pub_handler
361 # #
360 # #
362
361
def initialize(self, argv=None):
    """Run base-class initialization, then build the controller pieces.

    Order matters: logging forwarding is set up first, then the hub,
    then the schedulers (which depend on ``self.factory``).
    """
    super(IPControllerApp, self).initialize(argv)
    self.forward_logging()
    self.init_hub()
    self.init_schedulers()
368
367
def start(self):
    """Start the hub factory, all child queues/schedulers, and the loop."""
    # Start the subprocesses:
    self.factory.start()
    child_procs = []
    for child in self.children:
        child.start()
        # collect real process handles (skip in-thread queues)
        if isinstance(child, ProcessMonitoredQueue):
            child_procs.append(child.launcher)
        elif isinstance(child, Process):
            child_procs.append(child)
    if child_procs:
        # presumably wires signal handling for the child processes --
        # see signal_children for the exact semantics
        signal_children(child_procs)

    self.write_pid_file(overwrite=True)

    try:
        # blocks until the event loop stops
        self.factory.loop.start()
    except KeyboardInterrupt:
        self.log.critical("Interrupted, Exiting...\n")
388
387
389
388
390
389
def launch_new_instance():
    """Instantiate, configure, and run the IPython controller app."""
    controller_app = IPControllerApp()
    controller_app.initialize()
    controller_app.start()
396
395
397
396
# Allow running this module directly as a script.
if __name__ == '__main__':
    launch_new_instance()
General Comments 0
You need to be logged in to leave comments. Login now