##// END OF EJS Templates
minor cleanup in ipcontroller/ipengine...
MinRK -
Show More
@@ -210,6 +210,7 b' class IPControllerApp(BaseParallelApplication):'
210 location = '127.0.0.1'
210 location = '127.0.0.1'
211 cdict['location'] = location
211 cdict['location'] = location
212 fname = os.path.join(self.profile_dir.security_dir, fname)
212 fname = os.path.join(self.profile_dir.security_dir, fname)
213 self.log.info("writing connection info to %s", fname)
213 with open(fname, 'w') as f:
214 with open(fname, 'w') as f:
214 f.write(json.dumps(cdict, indent=2))
215 f.write(json.dumps(cdict, indent=2))
215 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
@@ -219,7 +220,9 b' class IPControllerApp(BaseParallelApplication):'
219 c = self.config
220 c = self.config
220 self.log.debug("loading config from JSON")
221 self.log.debug("loading config from JSON")
221 # load from engine config
222 # load from engine config
222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
224 self.log.info("loading connection info from %s", fname)
225 with open(fname) as f:
223 cfg = json.loads(f.read())
226 cfg = json.loads(f.read())
224 key = c.Session.key = asbytes(cfg['exec_key'])
227 key = c.Session.key = asbytes(cfg['exec_key'])
225 xport,addr = cfg['url'].split('://')
228 xport,addr = cfg['url'].split('://')
@@ -231,7 +234,9 b' class IPControllerApp(BaseParallelApplication):'
231 if not self.engine_ssh_server:
234 if not self.engine_ssh_server:
232 self.engine_ssh_server = cfg['ssh']
235 self.engine_ssh_server = cfg['ssh']
233 # load client config
236 # load client config
234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
237 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
238 self.log.info("loading connection info from %s", fname)
239 with open(fname) as f:
235 cfg = json.loads(f.read())
240 cfg = json.loads(f.read())
236 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
241 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 xport,addr = cfg['url'].split('://')
242 xport,addr = cfg['url'].split('://')
@@ -177,22 +177,6 b' class IPEngineApp(BaseParallelApplication):'
177 aliases = Dict(aliases)
177 aliases = Dict(aliases)
178 flags = Dict(flags)
178 flags = Dict(flags)
179
179
180 # def find_key_file(self):
181 # """Set the key file.
182 #
183 # Here we don't try to actually see if it exists for is valid as that
184 # is hadled by the connection logic.
185 # """
186 # config = self.master_config
187 # # Find the actual controller key file
188 # if not config.Global.key_file:
189 # try_this = os.path.join(
190 # config.Global.profile_dir,
191 # config.Global.security_dir,
192 # config.Global.key_file_name
193 # )
194 # config.Global.key_file = try_this
195
196 def find_url_file(self):
180 def find_url_file(self):
197 """Set the url file.
181 """Set the url file.
198
182
@@ -212,7 +196,7 b' class IPEngineApp(BaseParallelApplication):'
212 at a *lower* priority than command-line/config files.
196 at a *lower* priority than command-line/config files.
213 """
197 """
214
198
215 self.log.info("Loading url_file %r"%self.url_file)
199 self.log.info("Loading url_file %r", self.url_file)
216 config = self.config
200 config = self.config
217
201
218 with open(self.url_file) as f:
202 with open(self.url_file) as f:
@@ -256,17 +240,17 b' class IPEngineApp(BaseParallelApplication):'
256 url_specified = False
240 url_specified = False
257
241
258 if self.wait_for_url_file and not os.path.exists(self.url_file):
242 if self.wait_for_url_file and not os.path.exists(self.url_file):
259 self.log.warn("url_file %r not found"%self.url_file)
243 self.log.warn("url_file %r not found", self.url_file)
260 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
244 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
261 tic = time.time()
245 tic = time.time()
262 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
246 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
263 # wait for url_file to exist, for up to 10 seconds
247 # wait for url_file to exist, or until time limit
264 time.sleep(0.1)
248 time.sleep(0.1)
265
249
266 if os.path.exists(self.url_file):
250 if os.path.exists(self.url_file):
267 self.load_connector_file()
251 self.load_connector_file()
268 elif not url_specified:
252 elif not url_specified:
269 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
253 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
270 self.exit(1)
254 self.exit(1)
271
255
272
256
@@ -278,7 +262,7 b' class IPEngineApp(BaseParallelApplication):'
278
262
279 if self.startup_script:
263 if self.startup_script:
280 enc = sys.getfilesystemencoding() or 'utf8'
264 enc = sys.getfilesystemencoding() or 'utf8'
281 cmd="execfile(%r)"%self.startup_script.encode(enc)
265 cmd="execfile(%r)" % self.startup_script.encode(enc)
282 exec_lines.append(cmd)
266 exec_lines.append(cmd)
283 if self.startup_command:
267 if self.startup_command:
284 exec_lines.append(self.startup_command)
268 exec_lines.append(self.startup_command)
@@ -294,7 +278,7 b' class IPEngineApp(BaseParallelApplication):'
294
278
295 def forward_logging(self):
279 def forward_logging(self):
296 if self.log_url:
280 if self.log_url:
297 self.log.info("Forwarding logging to %s"%self.log_url)
281 self.log.info("Forwarding logging to %s", self.log_url)
298 context = self.engine.context
282 context = self.engine.context
299 lsock = context.socket(zmq.PUB)
283 lsock = context.socket(zmq.PUB)
300 lsock.connect(self.log_url)
284 lsock.connect(self.log_url)
General Comments 0
You need to be logged in to leave comments. Login now