##// END OF EJS Templates
Backport PR #2214: use KernelApp.exec_lines/files in IPEngineApp...
MinRK -
Show More
@@ -1,377 +1,384 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 41 from IPython.zmq.session import (
42 42 Session, session_aliases, session_flags
43 43 )
44 44
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 48 from IPython.parallel.util import disambiguate_url
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
52 52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53 53
54 54
55 55 #-----------------------------------------------------------------------------
56 56 # Module level variables
57 57 #-----------------------------------------------------------------------------
58 58
59 59 #: The default config file name for this application
60 60 default_config_file_name = u'ipengine_config.py'
61 61
62 62 _description = """Start an IPython engine for parallel computing.
63 63
64 64 IPython engines run in parallel and perform computations on behalf of a client
65 65 and controller. A controller needs to be started before the engines. The
66 66 engine can be configured using command line options or using a cluster
67 67 directory. Cluster directories contain config, log and security files and are
68 68 usually located in your ipython directory and named as "profile_name".
69 69 See the `profile` and `profile-dir` options for details.
70 70 """
71 71
72 72 _examples = """
73 73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 75 """
76 76
77 77 #-----------------------------------------------------------------------------
78 78 # MPI configuration
79 79 #-----------------------------------------------------------------------------
80 80
81 81 mpi4py_init = """from mpi4py import MPI as mpi
82 82 mpi.size = mpi.COMM_WORLD.Get_size()
83 83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 84 """
85 85
86 86
87 87 pytrilinos_init = """from PyTrilinos import Epetra
88 88 class SimpleStruct:
89 89 pass
90 90 mpi = SimpleStruct()
91 91 mpi.rank = 0
92 92 mpi.size = 0
93 93 """
94 94
95 95 class MPI(Configurable):
96 96 """Configurable for MPI initialization"""
97 97 use = Unicode('', config=True,
98 98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 99 )
100 100
101 101 def _use_changed(self, name, old, new):
102 102 # load default init script if it's not set
103 103 if not self.init_script:
104 104 self.init_script = self.default_inits.get(new, '')
105 105
106 106 init_script = Unicode('', config=True,
107 107 help="Initialization code for MPI")
108 108
109 109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 110 config=True)
111 111
112 112
113 113 #-----------------------------------------------------------------------------
114 114 # Main application
115 115 #-----------------------------------------------------------------------------
116 116 aliases = dict(
117 117 file = 'IPEngineApp.url_file',
118 118 c = 'IPEngineApp.startup_command',
119 119 s = 'IPEngineApp.startup_script',
120 120
121 121 url = 'EngineFactory.url',
122 122 ssh = 'EngineFactory.sshserver',
123 123 sshkey = 'EngineFactory.sshkey',
124 124 ip = 'EngineFactory.ip',
125 125 transport = 'EngineFactory.transport',
126 126 port = 'EngineFactory.regport',
127 127 location = 'EngineFactory.location',
128 128
129 129 timeout = 'EngineFactory.timeout',
130 130
131 131 mpi = 'MPI.use',
132 132
133 133 )
134 134 aliases.update(base_aliases)
135 135 aliases.update(session_aliases)
136 136 flags = {}
137 137 flags.update(base_flags)
138 138 flags.update(session_flags)
139 139
140 140 class IPEngineApp(BaseParallelApplication):
141 141
142 142 name = 'ipengine'
143 143 description = _description
144 144 examples = _examples
145 145 config_file_name = Unicode(default_config_file_name)
146 146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147 147
148 148 startup_script = Unicode(u'', config=True,
149 149 help='specify a script to be run at startup')
150 150 startup_command = Unicode('', config=True,
151 151 help='specify a command to be run at startup')
152 152
153 153 url_file = Unicode(u'', config=True,
154 154 help="""The full location of the file containing the connection information for
155 155 the controller. If this is not given, the file must be in the
156 156 security directory of the cluster directory. This location is
157 157 resolved using the `profile` or `profile_dir` options.""",
158 158 )
159 159 wait_for_url_file = Float(5, config=True,
160 160 help="""The maximum number of seconds to wait for url_file to exist.
161 161 This is useful for batch-systems and shared-filesystems where the
162 162 controller and engine are started at the same time and it
163 163 may take a moment for the controller to write the connector files.""")
164 164
165 165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166 166
167 167 def _cluster_id_changed(self, name, old, new):
168 168 if new:
169 169 base = 'ipcontroller-%s' % new
170 170 else:
171 171 base = 'ipcontroller'
172 172 self.url_file_name = "%s-engine.json" % base
173 173
174 174 log_url = Unicode('', config=True,
175 175 help="""The URL for the iploggerapp instance, for forwarding
176 176 logging to a central location.""")
177 177
178 178 # an IPKernelApp instance, used to setup listening for shell frontends
179 179 kernel_app = Instance(IPKernelApp)
180 180
181 181 aliases = Dict(aliases)
182 182 flags = Dict(flags)
183 183
184 184 @property
185 185 def kernel(self):
186 186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 187 return self.engine.kernel
188 188
189 189 def find_url_file(self):
190 190 """Set the url file.
191 191
192 192 Here we don't try to actually see if it exists for is valid as that
193 193 is hadled by the connection logic.
194 194 """
195 195 config = self.config
196 196 # Find the actual controller key file
197 197 if not self.url_file:
198 198 self.url_file = os.path.join(
199 199 self.profile_dir.security_dir,
200 200 self.url_file_name
201 201 )
202 202
203 203 def load_connector_file(self):
204 204 """load config from a JSON connector file,
205 205 at a *lower* priority than command-line/config files.
206 206 """
207 207
208 208 self.log.info("Loading url_file %r", self.url_file)
209 209 config = self.config
210 210
211 211 with open(self.url_file) as f:
212 212 d = json.loads(f.read())
213 213
214 214 if 'exec_key' in d:
215 215 config.Session.key = cast_bytes(d['exec_key'])
216 216
217 217 try:
218 218 config.EngineFactory.location
219 219 except AttributeError:
220 220 config.EngineFactory.location = d['location']
221 221
222 222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 223 try:
224 224 config.EngineFactory.url
225 225 except AttributeError:
226 226 config.EngineFactory.url = d['url']
227 227
228 228 try:
229 229 config.EngineFactory.sshserver
230 230 except AttributeError:
231 231 config.EngineFactory.sshserver = d['ssh']
232 232
233 233 def bind_kernel(self, **kwargs):
234 234 """Promote engine to listening kernel, accessible to frontends."""
235 235 if self.kernel_app is not None:
236 236 return
237 237
238 238 self.log.info("Opening ports for direct connections as an IPython kernel")
239 239
240 240 kernel = self.kernel
241 241
242 242 kwargs.setdefault('config', self.config)
243 243 kwargs.setdefault('log', self.log)
244 244 kwargs.setdefault('profile_dir', self.profile_dir)
245 245 kwargs.setdefault('session', self.engine.session)
246 246
247 247 app = self.kernel_app = IPKernelApp(**kwargs)
248 248
249 249 # allow IPKernelApp.instance():
250 250 IPKernelApp._instance = app
251 251
252 252 app.init_connection_file()
253 253 # relevant contents of init_sockets:
254 254
255 255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257 257
258 258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260 260
261 261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264 264
265 265 # start the heartbeat, and log connection info:
266 266
267 267 app.init_heartbeat()
268 268
269 269 app.log_connection_info()
270 270 app.write_connection_file()
271 271
272 272
273 273 def init_engine(self):
274 274 # This is the working dir by now.
275 275 sys.path.insert(0, '')
276 276 config = self.config
277 277 # print config
278 278 self.find_url_file()
279 279
280 280 # was the url manually specified?
281 281 keys = set(self.config.EngineFactory.keys())
282 282 keys = keys.union(set(self.config.RegistrationFactory.keys()))
283 283
284 284 if keys.intersection(set(['ip', 'url', 'port'])):
285 285 # Connection info was specified, don't wait for the file
286 286 url_specified = True
287 287 self.wait_for_url_file = 0
288 288 else:
289 289 url_specified = False
290 290
291 291 if self.wait_for_url_file and not os.path.exists(self.url_file):
292 292 self.log.warn("url_file %r not found", self.url_file)
293 293 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
294 294 tic = time.time()
295 295 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
296 296 # wait for url_file to exist, or until time limit
297 297 time.sleep(0.1)
298 298
299 299 if os.path.exists(self.url_file):
300 300 self.load_connector_file()
301 301 elif not url_specified:
302 302 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
303 303 self.exit(1)
304 304
305 305
306 306 try:
307 exec_lines = config.Kernel.exec_lines
307 exec_lines = config.IPKernelApp.exec_lines
308 308 except AttributeError:
309 config.Kernel.exec_lines = []
310 exec_lines = config.Kernel.exec_lines
309 try:
310 exec_lines = config.InteractiveShellApp.exec_lines
311 except AttributeError:
312 exec_lines = config.IPKernelApp.exec_lines = []
313 try:
314 exec_files = config.IPKernelApp.exec_files
315 except AttributeError:
316 try:
317 exec_files = config.InteractiveShellApp.exec_files
318 except AttributeError:
319 exec_files = config.IPKernelApp.exec_files = []
311 320
312 321 if self.startup_script:
313 enc = sys.getfilesystemencoding() or 'utf8'
314 cmd="execfile(%r)" % self.startup_script.encode(enc)
315 exec_lines.append(cmd)
322 exec_files.append(self.startup_script)
316 323 if self.startup_command:
317 324 exec_lines.append(self.startup_command)
318 325
319 326 # Create the underlying shell class and Engine
320 327 # shell_class = import_item(self.master_config.Global.shell_class)
321 328 # print self.config
322 329 try:
323 330 self.engine = EngineFactory(config=config, log=self.log)
324 331 except:
325 332 self.log.error("Couldn't start the Engine", exc_info=True)
326 333 self.exit(1)
327 334
328 335 def forward_logging(self):
329 336 if self.log_url:
330 337 self.log.info("Forwarding logging to %s", self.log_url)
331 338 context = self.engine.context
332 339 lsock = context.socket(zmq.PUB)
333 340 lsock.connect(self.log_url)
334 341 handler = EnginePUBHandler(self.engine, lsock)
335 342 handler.setLevel(self.log_level)
336 343 self.log.addHandler(handler)
337 344
338 345 def init_mpi(self):
339 346 global mpi
340 347 self.mpi = MPI(config=self.config)
341 348
342 349 mpi_import_statement = self.mpi.init_script
343 350 if mpi_import_statement:
344 351 try:
345 352 self.log.info("Initializing MPI:")
346 353 self.log.info(mpi_import_statement)
347 354 exec mpi_import_statement in globals()
348 355 except:
349 356 mpi = None
350 357 else:
351 358 mpi = None
352 359
353 360 @catch_config_error
354 361 def initialize(self, argv=None):
355 362 super(IPEngineApp, self).initialize(argv)
356 363 self.init_mpi()
357 364 self.init_engine()
358 365 self.forward_logging()
359 366
360 367 def start(self):
361 368 self.engine.start()
362 369 try:
363 370 self.engine.loop.start()
364 371 except KeyboardInterrupt:
365 372 self.log.critical("Engine Interrupted, shutting down...\n")
366 373
367 374
368 375 def launch_new_instance():
369 376 """Create and run the IPython engine"""
370 377 app = IPEngineApp.instance()
371 378 app.initialize()
372 379 app.start()
373 380
374 381
375 382 if __name__ == '__main__':
376 383 launch_new_instance()
377 384
@@ -1,237 +1,242 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 from IPython.zmq.ipkernel import Kernel
37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38 38
39 39 class EngineFactory(RegistrationFactory):
40 40 """IPython engine"""
41 41
42 42 # configurables:
43 43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 44 help="""The OutStream for handling stdout/err.
45 45 Typically 'IPython.zmq.iostream.OutStream'""")
46 46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 47 help="""The class for handling displayhook.
48 48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 49 location=Unicode(config=True,
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 53 timeout=CFloat(2,config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 56 sshserver=Unicode(config=True,
57 57 help="""The SSH server to use for tunneling connections to the Controller.""")
58 58 sshkey=Unicode(config=True,
59 59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 60 paramiko=Bool(sys.platform == 'win32', config=True,
61 61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 62
63 63 # not configurable:
64 64 user_ns=Dict()
65 65 id=Integer(allow_none=True)
66 66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 67 kernel=Instance(Kernel)
68 68
69 69 bident = CBytes()
70 70 ident = Unicode()
71 71 def _ident_changed(self, name, old, new):
72 72 self.bident = cast_bytes(new)
73 73 using_ssh=Bool(False)
74 74
75 75
76 76 def __init__(self, **kwargs):
77 77 super(EngineFactory, self).__init__(**kwargs)
78 78 self.ident = self.session.session
79 79
80 80 def init_connector(self):
81 81 """construct connection function, which handles tunnels."""
82 82 self.using_ssh = bool(self.sshkey or self.sshserver)
83 83
84 84 if self.sshkey and not self.sshserver:
85 85 # We are using ssh directly to the controller, tunneling localhost to localhost
86 86 self.sshserver = self.url.split('://')[1].split(':')[0]
87 87
88 88 if self.using_ssh:
89 89 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
90 90 password=False
91 91 else:
92 92 password = getpass("SSH Password for %s: "%self.sshserver)
93 93 else:
94 94 password = False
95 95
96 96 def connect(s, url):
97 97 url = disambiguate_url(url, self.location)
98 98 if self.using_ssh:
99 99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 100 return tunnel.tunnel_connection(s, url, self.sshserver,
101 101 keyfile=self.sshkey, paramiko=self.paramiko,
102 102 password=password,
103 103 )
104 104 else:
105 105 return s.connect(url)
106 106
107 107 def maybe_tunnel(url):
108 108 """like connect, but don't complete the connection (for use by heartbeat)"""
109 109 url = disambiguate_url(url, self.location)
110 110 if self.using_ssh:
111 111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 112 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 113 keyfile=self.sshkey, paramiko=self.paramiko,
114 114 password=password,
115 115 )
116 116 return url
117 117 return connect, maybe_tunnel
118 118
119 119 def register(self):
120 120 """send the registration_request"""
121 121
122 122 self.log.info("Registering with controller at %s"%self.url)
123 123 ctx = self.context
124 124 connect,maybe_tunnel = self.init_connector()
125 125 reg = ctx.socket(zmq.DEALER)
126 126 reg.setsockopt(zmq.IDENTITY, self.bident)
127 127 connect(reg, self.url)
128 128 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 129
130 130
131 131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 132 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 133 # print (self.session.key)
134 134 self.session.send(self.registrar, "registration_request",content=content)
135 135
136 136 def complete_registration(self, msg, connect, maybe_tunnel):
137 137 # print msg
138 138 self._abort_dc.stop()
139 139 ctx = self.context
140 140 loop = self.loop
141 141 identity = self.bident
142 142 idents,msg = self.session.feed_identities(msg)
143 143 msg = Message(self.session.unserialize(msg))
144 144
145 145 if msg.content.status == 'ok':
146 146 self.id = int(msg.content.id)
147 147
148 148 # launch heartbeat
149 149 hb_addrs = msg.content.heartbeat
150 150
151 151 # possibly forward hb ports with tunnels
152 152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
154 154 heart.start()
155 155
156 156 # create Shell Streams (MUX, Task, etc.):
157 157 queue_addr = msg.content.mux
158 158 shell_addrs = [ str(queue_addr) ]
159 159 task_addr = msg.content.task
160 160 if task_addr:
161 161 shell_addrs.append(str(task_addr))
162 162
163 163 # Uncomment this to go back to two-socket model
164 164 # shell_streams = []
165 165 # for addr in shell_addrs:
166 166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 167 # stream.setsockopt(zmq.IDENTITY, identity)
168 168 # stream.connect(disambiguate_url(addr, self.location))
169 169 # shell_streams.append(stream)
170 170
171 171 # Now use only one shell stream for mux and tasks
172 172 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 173 stream.setsockopt(zmq.IDENTITY, identity)
174 174 shell_streams = [stream]
175 175 for addr in shell_addrs:
176 176 connect(stream, addr)
177 177 # end single stream-socket
178 178
179 179 # control stream:
180 180 control_addr = str(msg.content.control)
181 181 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 182 control_stream.setsockopt(zmq.IDENTITY, identity)
183 183 connect(control_stream, control_addr)
184 184
185 185 # create iopub stream:
186 186 iopub_addr = msg.content.iopub
187 187 iopub_socket = ctx.socket(zmq.PUB)
188 188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 189 connect(iopub_socket, iopub_addr)
190 190
191 191 # disable history:
192 192 self.config.HistoryManager.hist_file = ':memory:'
193 193
194 194 # Redirect input streams and set a display hook.
195 195 if self.out_stream_factory:
196 196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
200 200 if self.display_hook_factory:
201 201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
203 203
204 204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
206 206 loop=loop, user_ns=self.user_ns, log=self.log)
207
207 208 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
208 self.kernel.start()
209 209
210 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
211 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
212 app.init_profile_dir()
213 app.init_code()
210 214
215 self.kernel.start()
211 216 else:
212 217 self.log.fatal("Registration Failed: %s"%msg)
213 218 raise Exception("Registration Failed: %s"%msg)
214 219
215 220 self.log.info("Completed registration with id %i"%self.id)
216 221
217 222
218 223 def abort(self):
219 224 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
220 225 if self.url.startswith('127.'):
221 226 self.log.fatal("""
222 227 If the controller and engines are not on the same machine,
223 228 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
224 229 c.HubFactory.ip='*' # for all interfaces, internal and external
225 230 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
226 231 or tunnel connections via ssh.
227 232 """)
228 233 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
229 234 time.sleep(1)
230 235 sys.exit(255)
231 236
232 237 def start(self):
233 238 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
234 239 dc.start()
235 240 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
236 241 self._abort_dc.start()
237 242
General Comments 0
You need to be logged in to leave comments. Login now