Show More
@@ -75,6 +75,7 b' base_aliases.update({' | |||||
75 | 'log-to-file' : 'BaseParallelApplication.log_to_file', |
|
75 | 'log-to-file' : 'BaseParallelApplication.log_to_file', | |
76 | 'clean-logs' : 'BaseParallelApplication.clean_logs', |
|
76 | 'clean-logs' : 'BaseParallelApplication.clean_logs', | |
77 | 'log-url' : 'BaseParallelApplication.log_url', |
|
77 | 'log-url' : 'BaseParallelApplication.log_url', | |
|
78 | 'cluster-id' : 'BaseParallelApplication.cluster_id', | |||
78 | }) |
|
79 | }) | |
79 |
|
80 | |||
80 | base_flags = { |
|
81 | base_flags = { | |
@@ -116,6 +117,18 b' class BaseParallelApplication(BaseIPythonApplication):' | |||||
116 | log_url = Unicode('', config=True, |
|
117 | log_url = Unicode('', config=True, | |
117 | help="The ZMQ URL of the iplogger to aggregate logging.") |
|
118 | help="The ZMQ URL of the iplogger to aggregate logging.") | |
118 |
|
119 | |||
|
120 | cluster_id = Unicode('', config=True, | |||
|
121 | help="""String id to add to runtime files, to prevent name collisions when | |||
|
122 | using multiple clusters with a single profile. | |||
|
123 | ||||
|
124 | When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json' | |||
|
125 | """ | |||
|
126 | ) | |||
|
127 | def _cluster_id_changed(self, name, old, new): | |||
|
128 | self.name = self.__class__.name | |||
|
129 | if new: | |||
|
130 | self.name += '-%s'%new | |||
|
131 | ||||
119 | def _config_files_default(self): |
|
132 | def _config_files_default(self): | |
120 | return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py'] |
|
133 | return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py'] | |
121 |
|
134 |
@@ -174,7 +174,18 b' class IPControllerApp(BaseParallelApplication):' | |||||
174 |
|
174 | |||
175 | use_threads = Bool(False, config=True, |
|
175 | use_threads = Bool(False, config=True, | |
176 | help='Use threads instead of processes for the schedulers', |
|
176 | help='Use threads instead of processes for the schedulers', | |
177 |
|
|
177 | ) | |
|
178 | ||||
|
179 | engine_json_file = Unicode('ipcontroller-engine.json', config=True, | |||
|
180 | help="JSON filename where engine connection info will be stored.") | |||
|
181 | client_json_file = Unicode('ipcontroller-client.json', config=True, | |||
|
182 | help="JSON filename where client connection info will be stored.") | |||
|
183 | ||||
|
184 | def _cluster_id_changed(self, name, old, new): | |||
|
185 | super(IPControllerApp, self)._cluster_id_changed(name, old, new) | |||
|
186 | self.engine_json_file = "%s-engine.json"%self.name | |||
|
187 | self.client_json_file = "%s-client.json"%self.name | |||
|
188 | ||||
178 |
|
189 | |||
179 | # internal |
|
190 | # internal | |
180 | children = List() |
|
191 | children = List() | |
@@ -215,7 +226,7 b' class IPControllerApp(BaseParallelApplication):' | |||||
215 | """load config from existing json connector files.""" |
|
226 | """load config from existing json connector files.""" | |
216 | c = self.config |
|
227 | c = self.config | |
217 | # load from engine config |
|
228 | # load from engine config | |
218 |
with open(os.path.join(self.profile_dir.security_dir, |
|
229 | with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f: | |
219 | cfg = json.loads(f.read()) |
|
230 | cfg = json.loads(f.read()) | |
220 | key = c.Session.key = asbytes(cfg['exec_key']) |
|
231 | key = c.Session.key = asbytes(cfg['exec_key']) | |
221 | xport,addr = cfg['url'].split('://') |
|
232 | xport,addr = cfg['url'].split('://') | |
@@ -227,7 +238,7 b' class IPControllerApp(BaseParallelApplication):' | |||||
227 | if not self.engine_ssh_server: |
|
238 | if not self.engine_ssh_server: | |
228 | self.engine_ssh_server = cfg['ssh'] |
|
239 | self.engine_ssh_server = cfg['ssh'] | |
229 | # load client config |
|
240 | # load client config | |
230 |
with open(os.path.join(self.profile_dir.security_dir, |
|
241 | with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f: | |
231 | cfg = json.loads(f.read()) |
|
242 | cfg = json.loads(f.read()) | |
232 | assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" |
|
243 | assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" | |
233 | xport,addr = cfg['url'].split('://') |
|
244 | xport,addr = cfg['url'].split('://') | |
@@ -277,11 +288,11 b' class IPControllerApp(BaseParallelApplication):' | |||||
277 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), |
|
288 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), | |
278 | 'location' : self.location |
|
289 | 'location' : self.location | |
279 | } |
|
290 | } | |
280 |
self.save_connection_dict( |
|
291 | self.save_connection_dict(self.client_json_file, cdict) | |
281 | edict = cdict |
|
292 | edict = cdict | |
282 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) |
|
293 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) | |
283 | edict['ssh'] = self.engine_ssh_server |
|
294 | edict['ssh'] = self.engine_ssh_server | |
284 |
self.save_connection_dict( |
|
295 | self.save_connection_dict(self.engine_json_file, edict) | |
285 |
|
296 | |||
286 | # |
|
297 | # | |
287 | def init_schedulers(self): |
|
298 | def init_schedulers(self): |
@@ -135,8 +135,8 b' aliases.update(base_aliases)' | |||||
135 |
|
135 | |||
136 | class IPEngineApp(BaseParallelApplication): |
|
136 | class IPEngineApp(BaseParallelApplication): | |
137 |
|
137 | |||
138 |
name = |
|
138 | name = 'ipengine' | |
139 |
description = |
|
139 | description = _description | |
140 | examples = _examples |
|
140 | examples = _examples | |
141 | config_file_name = Unicode(default_config_file_name) |
|
141 | config_file_name = Unicode(default_config_file_name) | |
142 | classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI]) |
|
142 | classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI]) | |
@@ -158,7 +158,15 b' class IPEngineApp(BaseParallelApplication):' | |||||
158 | controller and engine are started at the same time and it |
|
158 | controller and engine are started at the same time and it | |
159 | may take a moment for the controller to write the connector files.""") |
|
159 | may take a moment for the controller to write the connector files.""") | |
160 |
|
160 | |||
161 | url_file_name = Unicode(u'ipcontroller-engine.json') |
|
161 | url_file_name = Unicode(u'ipcontroller-engine.json', config=True) | |
|
162 | ||||
|
163 | def _cluster_id_changed(self, name, old, new): | |||
|
164 | if new: | |||
|
165 | base = 'ipcontroller-%s'%new | |||
|
166 | else: | |||
|
167 | base = 'ipcontroller' | |||
|
168 | self.url_file_name = "%s-engine.json"%base | |||
|
169 | ||||
162 | log_url = Unicode('', config=True, |
|
170 | log_url = Unicode('', config=True, | |
163 | help="""The URL for the iploggerapp instance, for forwarding |
|
171 | help="""The URL for the iploggerapp instance, for forwarding | |
164 | logging to a central location.""") |
|
172 | logging to a central location.""") |
General Comments 0
You need to be logged in to leave comments.
Login now