##// END OF EJS Templates
allow engines to wait for url_files to arrive...
MinRK -
Show More
@@ -24,6 +24,7 Authors:
24 24 import json
25 25 import os
26 26 import sys
27 import time
27 28
28 29 import zmq
29 30 from zmq.eventloop import ioloop
@@ -43,7 +44,7 from IPython.parallel.engine.streamkernel import Kernel
43 44 from IPython.parallel.util import disambiguate_url
44 45
45 46 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Bool, Unicode, Dict, List
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 48
48 49
49 50 #-----------------------------------------------------------------------------
@@ -143,6 +144,11 class IPEngineApp(BaseParallelApplication):
143 144 security directory of the cluster directory. This location is
144 145 resolved using the `profile` or `profile_dir` options.""",
145 146 )
147 wait_for_url_file = Float(5, config=True,
148 help="""The maximum number of seconds to wait for url_file to exist.
149 This is useful for batch-systems and shared-filesystems where the
150 controller and engine are started at the same time and it
151 may take a moment for the controller to write the connector files.""")
146 152
147 153 url_file_name = Unicode(u'ipcontroller-engine.json')
148 154 log_url = Unicode('', config=True,
@@ -168,7 +174,7 class IPEngineApp(BaseParallelApplication):
168 174 # config.Global.key_file = try_this
169 175
170 176 def find_url_file(self):
171 """Set the key file.
177 """Set the url file.
172 178
173 179 Here we don't try to actually see if it exists or is valid as that
174 180 is handled by the connection logic.
@@ -186,10 +192,28 class IPEngineApp(BaseParallelApplication):
186 192 config = self.config
187 193 # print config
188 194 self.find_url_file()
189
190 # if os.path.exists(config.Global.key_file) and config.Global.secure:
191 # config.SessionFactory.exec_key = config.Global.key_file
195
196 # was the url manually specified?
197 keys = set(self.config.EngineFactory.keys())
198 keys = keys.union(set(self.config.RegistrationFactory.keys()))
199
200 if keys.intersection(set(['ip', 'url', 'port'])):
201 # Connection info was specified, don't wait for the file
202 url_specified = True
203 self.wait_for_url_file = 0
204 else:
205 url_specified = False
206
207 if self.wait_for_url_file and not os.path.exists(self.url_file):
208 self.log.warn("url_file %r not found"%self.url_file)
209 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
210 tic = time.time()
211 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
212 # wait for url_file to exist, for up to wait_for_url_file seconds
213 time.sleep(0.1)
214
192 215 if os.path.exists(self.url_file):
216 self.log.info("Loading url_file %r"%self.url_file)
193 217 with open(self.url_file) as f:
194 218 d = json.loads(f.read())
195 219 for k,v in d.iteritems():
@@ -200,6 +224,10 class IPEngineApp(BaseParallelApplication):
200 224 d['url'] = disambiguate_url(d['url'], d['location'])
201 225 config.EngineFactory.url = d['url']
202 226 config.EngineFactory.location = d['location']
227 elif not url_specified:
228 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
229 self.exit(1)
230
203 231
204 232 try:
205 233 exec_lines = config.Kernel.exec_lines
@@ -72,7 +72,7 class EngineFactory(RegistrationFactory):
72 72 def register(self):
73 73 """send the registration_request"""
74 74
75 self.log.info("registering")
75 self.log.info("Registering with controller at %s"%self.url)
76 76 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
77 77 self.registrar.on_recv(self.complete_registration)
78 78 # print (self.session.key)
General Comments 0
You need to be logged in to leave comments. Login now