##// END OF EJS Templates
allow engines to wait for url_files to arrive...
MinRK -
Show More
@@ -24,6 +24,7 b' Authors:'
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27
28
28 import zmq
29 import zmq
29 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
@@ -43,7 +44,7 b' from IPython.parallel.engine.streamkernel import Kernel'
43 from IPython.parallel.util import disambiguate_url
44 from IPython.parallel.util import disambiguate_url
44
45
45 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Bool, Unicode, Dict, List
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47
48
48
49
49 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
@@ -143,6 +144,11 b' class IPEngineApp(BaseParallelApplication):'
143 security directory of the cluster directory. This location is
144 security directory of the cluster directory. This location is
144 resolved using the `profile` or `profile_dir` options.""",
145 resolved using the `profile` or `profile_dir` options.""",
145 )
146 )
147 wait_for_url_file = Float(5, config=True,
148 help="""The maximum number of seconds to wait for url_file to exist.
149 This is useful for batch-systems and shared-filesystems where the
150 controller and engine are started at the same time and it
151 may take a moment for the controller to write the connector files.""")
146
152
147 url_file_name = Unicode(u'ipcontroller-engine.json')
153 url_file_name = Unicode(u'ipcontroller-engine.json')
148 log_url = Unicode('', config=True,
154 log_url = Unicode('', config=True,
@@ -168,7 +174,7 b' class IPEngineApp(BaseParallelApplication):'
168 # config.Global.key_file = try_this
174 # config.Global.key_file = try_this
169
175
170 def find_url_file(self):
176 def find_url_file(self):
171 """Set the key file.
177 """Set the url file.
172
178
173 Here we don't try to actually see if it exists for is valid as that
179 Here we don't try to actually see if it exists for is valid as that
174 is hadled by the connection logic.
180 is hadled by the connection logic.
@@ -186,10 +192,28 b' class IPEngineApp(BaseParallelApplication):'
186 config = self.config
192 config = self.config
187 # print config
193 # print config
188 self.find_url_file()
194 self.find_url_file()
189
195
190 # if os.path.exists(config.Global.key_file) and config.Global.secure:
196 # was the url manually specified?
191 # config.SessionFactory.exec_key = config.Global.key_file
197 keys = set(self.config.EngineFactory.keys())
198 keys = keys.union(set(self.config.RegistrationFactory.keys()))
199
200 if keys.intersection(set(['ip', 'url', 'port'])):
201 # Connection info was specified, don't wait for the file
202 url_specified = True
203 self.wait_for_url_file = 0
204 else:
205 url_specified = False
206
207 if self.wait_for_url_file and not os.path.exists(self.url_file):
208 self.log.warn("url_file %r not found"%self.url_file)
209 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
210 tic = time.time()
211 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
212 # wait for url_file to exist, for up to 10 seconds
213 time.sleep(0.1)
214
192 if os.path.exists(self.url_file):
215 if os.path.exists(self.url_file):
216 self.log.info("Loading url_file %r"%self.url_file)
193 with open(self.url_file) as f:
217 with open(self.url_file) as f:
194 d = json.loads(f.read())
218 d = json.loads(f.read())
195 for k,v in d.iteritems():
219 for k,v in d.iteritems():
@@ -200,6 +224,10 b' class IPEngineApp(BaseParallelApplication):'
200 d['url'] = disambiguate_url(d['url'], d['location'])
224 d['url'] = disambiguate_url(d['url'], d['location'])
201 config.EngineFactory.url = d['url']
225 config.EngineFactory.url = d['url']
202 config.EngineFactory.location = d['location']
226 config.EngineFactory.location = d['location']
227 elif not url_specified:
228 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
229 self.exit(1)
230
203
231
204 try:
232 try:
205 exec_lines = config.Kernel.exec_lines
233 exec_lines = config.Kernel.exec_lines
@@ -72,7 +72,7 b' class EngineFactory(RegistrationFactory):'
72 def register(self):
72 def register(self):
73 """send the registration_request"""
73 """send the registration_request"""
74
74
75 self.log.info("registering")
75 self.log.info("Registering with controller at %s"%self.url)
76 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
76 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
77 self.registrar.on_recv(self.complete_registration)
77 self.registrar.on_recv(self.complete_registration)
78 # print (self.session.key)
78 # print (self.session.key)
General Comments 0
You need to be logged in to leave comments. Login now