##// END OF EJS Templates
prevent infinite Controllers on Windows...
MinRK -
Show More
@@ -1,414 +1,427 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30 import uuid
30 import uuid
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37 from zmq.utils import jsonapi as json
37 from zmq.utils import jsonapi as json
38
38
39 from IPython.config.application import boolean_flag
39 from IPython.config.application import boolean_flag
40 from IPython.core.profiledir import ProfileDir
40 from IPython.core.profiledir import ProfileDir
41
41
42 from IPython.parallel.apps.baseapp import (
42 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
43 BaseParallelApplication,
44 base_flags
44 base_flags
45 )
45 )
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48
48
49 # from IPython.parallel.controller.controller import ControllerFactory
49 # from IPython.parallel.controller.controller import ControllerFactory
50 from IPython.zmq.session import Session
50 from IPython.zmq.session import Session
51 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.hub import HubFactory
52 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 from IPython.parallel.controller.sqlitedb import SQLiteDB
55
55
56 from IPython.parallel.util import signal_children, split_url
56 from IPython.parallel.util import signal_children, split_url
57
57
58 # conditional import of MongoDB backend class
58 # conditional import of MongoDB backend class
59
59
60 try:
60 try:
61 from IPython.parallel.controller.mongodb import MongoDB
61 from IPython.parallel.controller.mongodb import MongoDB
62 except ImportError:
62 except ImportError:
63 maybe_mongo = []
63 maybe_mongo = []
64 else:
64 else:
65 maybe_mongo = [MongoDB]
65 maybe_mongo = [MongoDB]
66
66
67
67
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69 # Module level variables
69 # Module level variables
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71
71
72
72
73 #: The default config file name for this application
73 #: The default config file name for this application
74 default_config_file_name = u'ipcontroller_config.py'
74 default_config_file_name = u'ipcontroller_config.py'
75
75
76
76
77 _description = """Start the IPython controller for parallel computing.
77 _description = """Start the IPython controller for parallel computing.
78
78
79 The IPython controller provides a gateway between the IPython engines and
79 The IPython controller provides a gateway between the IPython engines and
80 clients. The controller needs to be started before the engines and can be
80 clients. The controller needs to be started before the engines and can be
81 configured using command line options or using a cluster directory. Cluster
81 configured using command line options or using a cluster directory. Cluster
82 directories contain config, log and security files and are usually located in
82 directories contain config, log and security files and are usually located in
83 your ipython directory and named as "profile_name". See the `profile`
83 your ipython directory and named as "profile_name". See the `profile`
84 and `profile_dir` options for details.
84 and `profile_dir` options for details.
85 """
85 """
86
86
87
87
88
88
89
89
90 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
91 # The main application
91 # The main application
92 #-----------------------------------------------------------------------------
92 #-----------------------------------------------------------------------------
93 flags = {}
93 flags = {}
94 flags.update(base_flags)
94 flags.update(base_flags)
95 flags.update({
95 flags.update({
96 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
96 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
97 'Use threads instead of processes for the schedulers'),
97 'Use threads instead of processes for the schedulers'),
98 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
99 'use the SQLiteDB backend'),
99 'use the SQLiteDB backend'),
100 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
101 'use the MongoDB backend'),
101 'use the MongoDB backend'),
102 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
103 'use the in-memory DictDB backend'),
103 'use the in-memory DictDB backend'),
104 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
104 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
105 'reuse existing json connection files')
105 'reuse existing json connection files')
106 })
106 })
107
107
108 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
108 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
109 "Use HMAC digests for authentication of messages.",
109 "Use HMAC digests for authentication of messages.",
110 "Don't authenticate messages."
110 "Don't authenticate messages."
111 ))
111 ))
112
112
113 class IPControllerApp(BaseParallelApplication):
113 class IPControllerApp(BaseParallelApplication):
114
114
115 name = u'ipcontroller'
115 name = u'ipcontroller'
116 description = _description
116 description = _description
117 config_file_name = Unicode(default_config_file_name)
117 config_file_name = Unicode(default_config_file_name)
118 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
118 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
119
119
120 # change default to True
120 # change default to True
121 auto_create = Bool(True, config=True,
121 auto_create = Bool(True, config=True,
122 help="""Whether to create profile dir if it doesn't exist.""")
122 help="""Whether to create profile dir if it doesn't exist.""")
123
123
124 reuse_files = Bool(False, config=True,
124 reuse_files = Bool(False, config=True,
125 help='Whether to reuse existing json connection files.'
125 help='Whether to reuse existing json connection files.'
126 )
126 )
127 secure = Bool(True, config=True,
127 secure = Bool(True, config=True,
128 help='Whether to use HMAC digests for extra message authentication.'
128 help='Whether to use HMAC digests for extra message authentication.'
129 )
129 )
130 ssh_server = Unicode(u'', config=True,
130 ssh_server = Unicode(u'', config=True,
131 help="""ssh url for clients to use when connecting to the Controller
131 help="""ssh url for clients to use when connecting to the Controller
132 processes. It should be of the form: [user@]server[:port]. The
132 processes. It should be of the form: [user@]server[:port]. The
133 Controller's listening addresses must be accessible from the ssh server""",
133 Controller's listening addresses must be accessible from the ssh server""",
134 )
134 )
135 location = Unicode(u'', config=True,
135 location = Unicode(u'', config=True,
136 help="""The external IP or domain name of the Controller, used for disambiguating
136 help="""The external IP or domain name of the Controller, used for disambiguating
137 engine and client connections.""",
137 engine and client connections.""",
138 )
138 )
139 import_statements = List([], config=True,
139 import_statements = List([], config=True,
140 help="import statements to be run at startup. Necessary in some environments"
140 help="import statements to be run at startup. Necessary in some environments"
141 )
141 )
142
142
143 use_threads = Bool(False, config=True,
143 use_threads = Bool(False, config=True,
144 help='Use threads instead of processes for the schedulers',
144 help='Use threads instead of processes for the schedulers',
145 )
145 )
146
146
147 # internal
147 # internal
148 children = List()
148 children = List()
149 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
149 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
150
150
151 def _use_threads_changed(self, name, old, new):
151 def _use_threads_changed(self, name, old, new):
152 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
152 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
153
153
154 aliases = Dict(dict(
154 aliases = Dict(dict(
155 log_level = 'IPControllerApp.log_level',
155 log_level = 'IPControllerApp.log_level',
156 log_url = 'IPControllerApp.log_url',
156 log_url = 'IPControllerApp.log_url',
157 reuse_files = 'IPControllerApp.reuse_files',
157 reuse_files = 'IPControllerApp.reuse_files',
158 secure = 'IPControllerApp.secure',
158 secure = 'IPControllerApp.secure',
159 ssh = 'IPControllerApp.ssh_server',
159 ssh = 'IPControllerApp.ssh_server',
160 use_threads = 'IPControllerApp.use_threads',
160 use_threads = 'IPControllerApp.use_threads',
161 import_statements = 'IPControllerApp.import_statements',
161 import_statements = 'IPControllerApp.import_statements',
162 location = 'IPControllerApp.location',
162 location = 'IPControllerApp.location',
163
163
164 ident = 'Session.session',
164 ident = 'Session.session',
165 user = 'Session.username',
165 user = 'Session.username',
166 exec_key = 'Session.keyfile',
166 exec_key = 'Session.keyfile',
167
167
168 url = 'HubFactory.url',
168 url = 'HubFactory.url',
169 ip = 'HubFactory.ip',
169 ip = 'HubFactory.ip',
170 transport = 'HubFactory.transport',
170 transport = 'HubFactory.transport',
171 port = 'HubFactory.regport',
171 port = 'HubFactory.regport',
172
172
173 ping = 'HeartMonitor.period',
173 ping = 'HeartMonitor.period',
174
174
175 scheme = 'TaskScheduler.scheme_name',
175 scheme = 'TaskScheduler.scheme_name',
176 hwm = 'TaskScheduler.hwm',
176 hwm = 'TaskScheduler.hwm',
177
177
178
178
179 profile = "BaseIPythonApplication.profile",
179 profile = "BaseIPythonApplication.profile",
180 profile_dir = 'ProfileDir.location',
180 profile_dir = 'ProfileDir.location',
181
181
182 ))
182 ))
183 flags = Dict(flags)
183 flags = Dict(flags)
184
184
185
185
186 def save_connection_dict(self, fname, cdict):
186 def save_connection_dict(self, fname, cdict):
187 """save a connection dict to json file."""
187 """save a connection dict to json file."""
188 c = self.config
188 c = self.config
189 url = cdict['url']
189 url = cdict['url']
190 location = cdict['location']
190 location = cdict['location']
191 if not location:
191 if not location:
192 try:
192 try:
193 proto,ip,port = split_url(url)
193 proto,ip,port = split_url(url)
194 except AssertionError:
194 except AssertionError:
195 pass
195 pass
196 else:
196 else:
197 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
197 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
198 cdict['location'] = location
198 cdict['location'] = location
199 fname = os.path.join(self.profile_dir.security_dir, fname)
199 fname = os.path.join(self.profile_dir.security_dir, fname)
200 with open(fname, 'w') as f:
200 with open(fname, 'w') as f:
201 f.write(json.dumps(cdict, indent=2))
201 f.write(json.dumps(cdict, indent=2))
202 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
202 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
203
203
204 def load_config_from_json(self):
204 def load_config_from_json(self):
205 """load config from existing json connector files."""
205 """load config from existing json connector files."""
206 c = self.config
206 c = self.config
207 # load from engine config
207 # load from engine config
208 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
208 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
209 cfg = json.loads(f.read())
209 cfg = json.loads(f.read())
210 key = c.Session.key = cfg['exec_key']
210 key = c.Session.key = cfg['exec_key']
211 xport,addr = cfg['url'].split('://')
211 xport,addr = cfg['url'].split('://')
212 c.HubFactory.engine_transport = xport
212 c.HubFactory.engine_transport = xport
213 ip,ports = addr.split(':')
213 ip,ports = addr.split(':')
214 c.HubFactory.engine_ip = ip
214 c.HubFactory.engine_ip = ip
215 c.HubFactory.regport = int(ports)
215 c.HubFactory.regport = int(ports)
216 self.location = cfg['location']
216 self.location = cfg['location']
217
217
218 # load client config
218 # load client config
219 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
219 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
220 cfg = json.loads(f.read())
220 cfg = json.loads(f.read())
221 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
221 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
222 xport,addr = cfg['url'].split('://')
222 xport,addr = cfg['url'].split('://')
223 c.HubFactory.client_transport = xport
223 c.HubFactory.client_transport = xport
224 ip,ports = addr.split(':')
224 ip,ports = addr.split(':')
225 c.HubFactory.client_ip = ip
225 c.HubFactory.client_ip = ip
226 self.ssh_server = cfg['ssh']
226 self.ssh_server = cfg['ssh']
227 assert int(ports) == c.HubFactory.regport, "regport mismatch"
227 assert int(ports) == c.HubFactory.regport, "regport mismatch"
228
228
229 def init_hub(self):
229 def init_hub(self):
230 c = self.config
230 c = self.config
231
231
232 self.do_import_statements()
232 self.do_import_statements()
233 reusing = self.reuse_files
233 reusing = self.reuse_files
234 if reusing:
234 if reusing:
235 try:
235 try:
236 self.load_config_from_json()
236 self.load_config_from_json()
237 except (AssertionError,IOError):
237 except (AssertionError,IOError):
238 reusing=False
238 reusing=False
239 # check again, because reusing may have failed:
239 # check again, because reusing may have failed:
240 if reusing:
240 if reusing:
241 pass
241 pass
242 elif self.secure:
242 elif self.secure:
243 key = str(uuid.uuid4())
243 key = str(uuid.uuid4())
244 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
244 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
245 # with open(keyfile, 'w') as f:
245 # with open(keyfile, 'w') as f:
246 # f.write(key)
246 # f.write(key)
247 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
247 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
248 c.Session.key = key
248 c.Session.key = key
249 else:
249 else:
250 key = c.Session.key = ''
250 key = c.Session.key = ''
251
251
252 try:
252 try:
253 self.factory = HubFactory(config=c, log=self.log)
253 self.factory = HubFactory(config=c, log=self.log)
254 # self.start_logging()
254 # self.start_logging()
255 self.factory.init_hub()
255 self.factory.init_hub()
256 except:
256 except:
257 self.log.error("Couldn't construct the Controller", exc_info=True)
257 self.log.error("Couldn't construct the Controller", exc_info=True)
258 self.exit(1)
258 self.exit(1)
259
259
260 if not reusing:
260 if not reusing:
261 # save to new json config files
261 # save to new json config files
262 f = self.factory
262 f = self.factory
263 cdict = {'exec_key' : key,
263 cdict = {'exec_key' : key,
264 'ssh' : self.ssh_server,
264 'ssh' : self.ssh_server,
265 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
265 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
266 'location' : self.location
266 'location' : self.location
267 }
267 }
268 self.save_connection_dict('ipcontroller-client.json', cdict)
268 self.save_connection_dict('ipcontroller-client.json', cdict)
269 edict = cdict
269 edict = cdict
270 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
270 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
271 self.save_connection_dict('ipcontroller-engine.json', edict)
271 self.save_connection_dict('ipcontroller-engine.json', edict)
272
272
273 #
273 #
274 def init_schedulers(self):
274 def init_schedulers(self):
275 children = self.children
275 children = self.children
276 mq = import_item(str(self.mq_class))
276 mq = import_item(str(self.mq_class))
277
277
278 hub = self.factory
278 hub = self.factory
279 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
279 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
280 # IOPub relay (in a Process)
280 # IOPub relay (in a Process)
281 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
281 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
282 q.bind_in(hub.client_info['iopub'])
282 q.bind_in(hub.client_info['iopub'])
283 q.bind_out(hub.engine_info['iopub'])
283 q.bind_out(hub.engine_info['iopub'])
284 q.setsockopt_out(zmq.SUBSCRIBE, '')
284 q.setsockopt_out(zmq.SUBSCRIBE, '')
285 q.connect_mon(hub.monitor_url)
285 q.connect_mon(hub.monitor_url)
286 q.daemon=True
286 q.daemon=True
287 children.append(q)
287 children.append(q)
288
288
289 # Multiplexer Queue (in a Process)
289 # Multiplexer Queue (in a Process)
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
291 q.bind_in(hub.client_info['mux'])
291 q.bind_in(hub.client_info['mux'])
292 q.setsockopt_in(zmq.IDENTITY, 'mux')
292 q.setsockopt_in(zmq.IDENTITY, 'mux')
293 q.bind_out(hub.engine_info['mux'])
293 q.bind_out(hub.engine_info['mux'])
294 q.connect_mon(hub.monitor_url)
294 q.connect_mon(hub.monitor_url)
295 q.daemon=True
295 q.daemon=True
296 children.append(q)
296 children.append(q)
297
297
298 # Control Queue (in a Process)
298 # Control Queue (in a Process)
299 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
299 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
300 q.bind_in(hub.client_info['control'])
300 q.bind_in(hub.client_info['control'])
301 q.setsockopt_in(zmq.IDENTITY, 'control')
301 q.setsockopt_in(zmq.IDENTITY, 'control')
302 q.bind_out(hub.engine_info['control'])
302 q.bind_out(hub.engine_info['control'])
303 q.connect_mon(hub.monitor_url)
303 q.connect_mon(hub.monitor_url)
304 q.daemon=True
304 q.daemon=True
305 children.append(q)
305 children.append(q)
306 try:
306 try:
307 scheme = self.config.TaskScheduler.scheme_name
307 scheme = self.config.TaskScheduler.scheme_name
308 except AttributeError:
308 except AttributeError:
309 scheme = TaskScheduler.scheme_name.get_default_value()
309 scheme = TaskScheduler.scheme_name.get_default_value()
310 # Task Queue (in a Process)
310 # Task Queue (in a Process)
311 if scheme == 'pure':
311 if scheme == 'pure':
312 self.log.warn("task::using pure XREQ Task scheduler")
312 self.log.warn("task::using pure XREQ Task scheduler")
313 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
313 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
314 # q.setsockopt_out(zmq.HWM, hub.hwm)
314 # q.setsockopt_out(zmq.HWM, hub.hwm)
315 q.bind_in(hub.client_info['task'][1])
315 q.bind_in(hub.client_info['task'][1])
316 q.setsockopt_in(zmq.IDENTITY, 'task')
316 q.setsockopt_in(zmq.IDENTITY, 'task')
317 q.bind_out(hub.engine_info['task'])
317 q.bind_out(hub.engine_info['task'])
318 q.connect_mon(hub.monitor_url)
318 q.connect_mon(hub.monitor_url)
319 q.daemon=True
319 q.daemon=True
320 children.append(q)
320 children.append(q)
321 elif scheme == 'none':
321 elif scheme == 'none':
322 self.log.warn("task::using no Task scheduler")
322 self.log.warn("task::using no Task scheduler")
323
323
324 else:
324 else:
325 self.log.info("task::using Python %s Task scheduler"%scheme)
325 self.log.info("task::using Python %s Task scheduler"%scheme)
326 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
326 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
327 hub.monitor_url, hub.client_info['notification'])
327 hub.monitor_url, hub.client_info['notification'])
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 log_url = self.log_url, config=dict(self.config))
329 log_url = self.log_url, config=dict(self.config))
330 if 'Process' in self.mq_class:
330 if 'Process' in self.mq_class:
331 # run the Python scheduler in a Process
331 # run the Python scheduler in a Process
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
333 q.daemon=True
333 q.daemon=True
334 children.append(q)
334 children.append(q)
335 else:
335 else:
336 # single-threaded Controller
336 # single-threaded Controller
337 kwargs['in_thread'] = True
337 kwargs['in_thread'] = True
338 launch_scheduler(*sargs, **kwargs)
338 launch_scheduler(*sargs, **kwargs)
339
339
340
340
341 def save_urls(self):
341 def save_urls(self):
342 """save the registration urls to files."""
342 """save the registration urls to files."""
343 c = self.config
343 c = self.config
344
344
345 sec_dir = self.profile_dir.security_dir
345 sec_dir = self.profile_dir.security_dir
346 cf = self.factory
346 cf = self.factory
347
347
348 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
348 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
349 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
349 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
350
350
351 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
351 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
352 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
352 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
353
353
354
354
355 def do_import_statements(self):
355 def do_import_statements(self):
356 statements = self.import_statements
356 statements = self.import_statements
357 for s in statements:
357 for s in statements:
358 try:
358 try:
359 self.log.msg("Executing statement: '%s'" % s)
359 self.log.msg("Executing statement: '%s'" % s)
360 exec s in globals(), locals()
360 exec s in globals(), locals()
361 except:
361 except:
362 self.log.msg("Error running statement: %s" % s)
362 self.log.msg("Error running statement: %s" % s)
363
363
364 def forward_logging(self):
364 def forward_logging(self):
365 if self.log_url:
365 if self.log_url:
366 self.log.info("Forwarding logging to %s"%self.log_url)
366 self.log.info("Forwarding logging to %s"%self.log_url)
367 context = zmq.Context.instance()
367 context = zmq.Context.instance()
368 lsock = context.socket(zmq.PUB)
368 lsock = context.socket(zmq.PUB)
369 lsock.connect(self.log_url)
369 lsock.connect(self.log_url)
370 handler = PUBHandler(lsock)
370 handler = PUBHandler(lsock)
371 self.log.removeHandler(self._log_handler)
371 self.log.removeHandler(self._log_handler)
372 handler.root_topic = 'controller'
372 handler.root_topic = 'controller'
373 handler.setLevel(self.log_level)
373 handler.setLevel(self.log_level)
374 self.log.addHandler(handler)
374 self.log.addHandler(handler)
375 self._log_handler = handler
375 self._log_handler = handler
376 # #
376 # #
377
377
378 def initialize(self, argv=None):
378 def initialize(self, argv=None):
379 super(IPControllerApp, self).initialize(argv)
379 super(IPControllerApp, self).initialize(argv)
380 self.forward_logging()
380 self.forward_logging()
381 self.init_hub()
381 self.init_hub()
382 self.init_schedulers()
382 self.init_schedulers()
383
383
384 def start(self):
384 def start(self):
385 # Start the subprocesses:
385 # Start the subprocesses:
386 self.factory.start()
386 self.factory.start()
387 child_procs = []
387 child_procs = []
388 for child in self.children:
388 for child in self.children:
389 child.start()
389 child.start()
390 if isinstance(child, ProcessMonitoredQueue):
390 if isinstance(child, ProcessMonitoredQueue):
391 child_procs.append(child.launcher)
391 child_procs.append(child.launcher)
392 elif isinstance(child, Process):
392 elif isinstance(child, Process):
393 child_procs.append(child)
393 child_procs.append(child)
394 if child_procs:
394 if child_procs:
395 signal_children(child_procs)
395 signal_children(child_procs)
396
396
397 self.write_pid_file(overwrite=True)
397 self.write_pid_file(overwrite=True)
398
398
399 try:
399 try:
400 self.factory.loop.start()
400 self.factory.loop.start()
401 except KeyboardInterrupt:
401 except KeyboardInterrupt:
402 self.log.critical("Interrupted, Exiting...\n")
402 self.log.critical("Interrupted, Exiting...\n")
403
403
404
404
405
405
406 def launch_new_instance():
406 def launch_new_instance():
407 """Create and run the IPython controller"""
407 """Create and run the IPython controller"""
408 if sys.platform == 'win32':
409 # make sure we don't get called from a multiprocessing subprocess
410 # this can result in infinite Controllers being started on Windows
411 # which doesn't have a proper fork, so multiprocessing is wonky
412
413 # this only comes up when IPython has been installed using vanilla
414 # setuptools, and *not* distribute.
415 import inspect
416 for record in inspect.stack():
417 frame = record[0]
418 if frame.f_locals.get('__name__') == '__parents_main__':
419 # we are a subprocess, don't start another Controller!
420 return
408 app = IPControllerApp.instance()
421 app = IPControllerApp.instance()
409 app.initialize()
422 app.initialize()
410 app.start()
423 app.start()
411
424
412
425
413 if __name__ == '__main__':
426 if __name__ == '__main__':
414 launch_new_instance()
427 launch_new_instance()
General Comments 0
You need to be logged in to leave comments. Login now