Show More
@@ -24,6 +24,7 b' Authors:' | |||||
24 | import json |
|
24 | import json | |
25 | import os |
|
25 | import os | |
26 | import sys |
|
26 | import sys | |
|
27 | import time | |||
27 |
|
28 | |||
28 | import zmq |
|
29 | import zmq | |
29 | from zmq.eventloop import ioloop |
|
30 | from zmq.eventloop import ioloop | |
@@ -43,7 +44,7 b' from IPython.parallel.engine.streamkernel import Kernel' | |||||
43 | from IPython.parallel.util import disambiguate_url |
|
44 | from IPython.parallel.util import disambiguate_url | |
44 |
|
45 | |||
45 | from IPython.utils.importstring import import_item |
|
46 | from IPython.utils.importstring import import_item | |
46 | from IPython.utils.traitlets import Bool, Unicode, Dict, List |
|
47 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float | |
47 |
|
48 | |||
48 |
|
49 | |||
49 | #----------------------------------------------------------------------------- |
|
50 | #----------------------------------------------------------------------------- | |
@@ -143,6 +144,11 b' class IPEngineApp(BaseParallelApplication):' | |||||
143 | security directory of the cluster directory. This location is |
|
144 | security directory of the cluster directory. This location is | |
144 | resolved using the `profile` or `profile_dir` options.""", |
|
145 | resolved using the `profile` or `profile_dir` options.""", | |
145 | ) |
|
146 | ) | |
|
147 | wait_for_url_file = Float(5, config=True, | |||
|
148 | help="""The maximum number of seconds to wait for url_file to exist. | |||
|
149 | This is useful for batch-systems and shared-filesystems where the | |||
|
150 | controller and engine are started at the same time and it | |||
|
151 | may take a moment for the controller to write the connector files.""") | |||
146 |
|
152 | |||
147 | url_file_name = Unicode(u'ipcontroller-engine.json') |
|
153 | url_file_name = Unicode(u'ipcontroller-engine.json') | |
148 | log_url = Unicode('', config=True, |
|
154 | log_url = Unicode('', config=True, | |
@@ -168,7 +174,7 b' class IPEngineApp(BaseParallelApplication):' | |||||
168 | # config.Global.key_file = try_this |
|
174 | # config.Global.key_file = try_this | |
169 |
|
175 | |||
170 | def find_url_file(self): |
|
176 | def find_url_file(self): | |
171 |
"""Set the |
|
177 | """Set the url file. | |
172 |
|
178 | |||
173 | Here we don't try to actually see if it exists for is valid as that |
|
179 | Here we don't try to actually see if it exists for is valid as that | |
174 | is hadled by the connection logic. |
|
180 | is hadled by the connection logic. | |
@@ -186,10 +192,28 b' class IPEngineApp(BaseParallelApplication):' | |||||
186 | config = self.config |
|
192 | config = self.config | |
187 | # print config |
|
193 | # print config | |
188 | self.find_url_file() |
|
194 | self.find_url_file() | |
189 |
|
195 | |||
190 | # if os.path.exists(config.Global.key_file) and config.Global.secure: |
|
196 | # was the url manually specified? | |
191 | # config.SessionFactory.exec_key = config.Global.key_file |
|
197 | keys = set(self.config.EngineFactory.keys()) | |
|
198 | keys = keys.union(set(self.config.RegistrationFactory.keys())) | |||
|
199 | ||||
|
200 | if keys.intersection(set(['ip', 'url', 'port'])): | |||
|
201 | # Connection info was specified, don't wait for the file | |||
|
202 | url_specified = True | |||
|
203 | self.wait_for_url_file = 0 | |||
|
204 | else: | |||
|
205 | url_specified = False | |||
|
206 | ||||
|
207 | if self.wait_for_url_file and not os.path.exists(self.url_file): | |||
|
208 | self.log.warn("url_file %r not found"%self.url_file) | |||
|
209 | self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file) | |||
|
210 | tic = time.time() | |||
|
211 | while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file): | |||
|
212 | # wait for url_file to exist, for up to 10 seconds | |||
|
213 | time.sleep(0.1) | |||
|
214 | ||||
192 | if os.path.exists(self.url_file): |
|
215 | if os.path.exists(self.url_file): | |
|
216 | self.log.info("Loading url_file %r"%self.url_file) | |||
193 | with open(self.url_file) as f: |
|
217 | with open(self.url_file) as f: | |
194 | d = json.loads(f.read()) |
|
218 | d = json.loads(f.read()) | |
195 | for k,v in d.iteritems(): |
|
219 | for k,v in d.iteritems(): | |
@@ -200,6 +224,10 b' class IPEngineApp(BaseParallelApplication):' | |||||
200 | d['url'] = disambiguate_url(d['url'], d['location']) |
|
224 | d['url'] = disambiguate_url(d['url'], d['location']) | |
201 | config.EngineFactory.url = d['url'] |
|
225 | config.EngineFactory.url = d['url'] | |
202 | config.EngineFactory.location = d['location'] |
|
226 | config.EngineFactory.location = d['location'] | |
|
227 | elif not url_specified: | |||
|
228 | self.log.critical("Fatal: url file never arrived: %s"%self.url_file) | |||
|
229 | self.exit(1) | |||
|
230 | ||||
203 |
|
231 | |||
204 | try: |
|
232 | try: | |
205 | exec_lines = config.Kernel.exec_lines |
|
233 | exec_lines = config.Kernel.exec_lines |
@@ -72,7 +72,7 b' class EngineFactory(RegistrationFactory):' | |||||
72 | def register(self): |
|
72 | def register(self): | |
73 | """send the registration_request""" |
|
73 | """send the registration_request""" | |
74 |
|
74 | |||
75 |
self.log.info(" |
|
75 | self.log.info("Registering with controller at %s"%self.url) | |
76 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) |
|
76 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
77 | self.registrar.on_recv(self.complete_registration) |
|
77 | self.registrar.on_recv(self.complete_registration) | |
78 | # print (self.session.key) |
|
78 | # print (self.session.key) |
General Comments 0
You need to be logged in to leave comments.
Login now