##// END OF EJS Templates
Merge pull request #1372 from minrk/reuse-cleanup...
Min RK -
r6080:c1c517a0 merge
parent child Browse files
Show More
@@ -1,459 +1,493 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import socket
29 29 import stat
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 from signal import signal, SIGINT, SIGABRT, SIGTERM
33 34
34 35 import zmq
35 36 from zmq.devices import ProcessMonitoredQueue
36 37 from zmq.log.handlers import PUBHandler
37 38
38 39 from IPython.core.profiledir import ProfileDir
39 40
40 41 from IPython.parallel.apps.baseapp import (
41 42 BaseParallelApplication,
42 43 base_aliases,
43 44 base_flags,
44 45 catch_config_error,
45 46 )
46 47 from IPython.utils.importstring import import_item
47 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 49
49 50 from IPython.zmq.session import (
50 51 Session, session_aliases, session_flags, default_secure
51 52 )
52 53
53 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 55 from IPython.parallel.controller.hub import HubFactory
55 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 58
58 from IPython.parallel.util import signal_children, split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url
59 60
60 61 # conditional import of MongoDB backend class
61 62
62 63 try:
63 64 from IPython.parallel.controller.mongodb import MongoDB
64 65 except ImportError:
65 66 maybe_mongo = []
66 67 else:
67 68 maybe_mongo = [MongoDB]
68 69
69 70
70 71 #-----------------------------------------------------------------------------
71 72 # Module level variables
72 73 #-----------------------------------------------------------------------------
73 74
74 75
75 76 #: The default config file name for this application
76 77 default_config_file_name = u'ipcontroller_config.py'
77 78
78 79
79 80 _description = """Start the IPython controller for parallel computing.
80 81
81 82 The IPython controller provides a gateway between the IPython engines and
82 83 clients. The controller needs to be started before the engines and can be
83 84 configured using command line options or using a cluster directory. Cluster
84 85 directories contain config, log and security files and are usually located in
85 86 your ipython directory and named as "profile_name". See the `profile`
86 87 and `profile-dir` options for details.
87 88 """
88 89
89 90 _examples = """
90 91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 92 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 93 """
93 94
94 95
95 96 #-----------------------------------------------------------------------------
96 97 # The main application
97 98 #-----------------------------------------------------------------------------
98 99 flags = {}
99 100 flags.update(base_flags)
100 101 flags.update({
101 102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 103 'Use threads instead of processes for the schedulers'),
103 104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 105 'use the SQLiteDB backend'),
105 106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 107 'use the MongoDB backend'),
107 108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 109 'use the in-memory DictDB backend'),
109 110 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
110 111 """use dummy DB backend, which doesn't store any information.
111 112
112 113 This can be used to prevent growth of the memory footprint of the Hub
113 114 in cases where its record-keeping is not required. Requesting results
114 115 of tasks submitted by other clients, db_queries, and task resubmission
115 116 will not be available."""),
116 117 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
117 118 'reuse existing json connection files')
118 119 })
119 120
120 121 flags.update(session_flags)
121 122
122 123 aliases = dict(
123 124 ssh = 'IPControllerApp.ssh_server',
124 125 enginessh = 'IPControllerApp.engine_ssh_server',
125 126 location = 'IPControllerApp.location',
126 127
127 128 url = 'HubFactory.url',
128 129 ip = 'HubFactory.ip',
129 130 transport = 'HubFactory.transport',
130 131 port = 'HubFactory.regport',
131 132
132 133 ping = 'HeartMonitor.period',
133 134
134 135 scheme = 'TaskScheduler.scheme_name',
135 136 hwm = 'TaskScheduler.hwm',
136 137 )
137 138 aliases.update(base_aliases)
138 139 aliases.update(session_aliases)
139 140
140 141
141 142 class IPControllerApp(BaseParallelApplication):
142 143
143 144 name = u'ipcontroller'
144 145 description = _description
145 146 examples = _examples
146 147 config_file_name = Unicode(default_config_file_name)
147 148 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
148 149
149 150 # change default to True
150 151 auto_create = Bool(True, config=True,
151 152 help="""Whether to create profile dir if it doesn't exist.""")
152 153
153 154 reuse_files = Bool(False, config=True,
154 help='Whether to reuse existing json connection files.'
155 help="""Whether to reuse existing json connection files.
156 If False, connection files will be removed on a clean exit.
157 """
155 158 )
156 159 ssh_server = Unicode(u'', config=True,
157 160 help="""ssh url for clients to use when connecting to the Controller
158 161 processes. It should be of the form: [user@]server[:port]. The
159 162 Controller's listening addresses must be accessible from the ssh server""",
160 163 )
161 164 engine_ssh_server = Unicode(u'', config=True,
162 165 help="""ssh url for engines to use when connecting to the Controller
163 166 processes. It should be of the form: [user@]server[:port]. The
164 167 Controller's listening addresses must be accessible from the ssh server""",
165 168 )
166 169 location = Unicode(u'', config=True,
167 170 help="""The external IP or domain name of the Controller, used for disambiguating
168 171 engine and client connections.""",
169 172 )
170 173 import_statements = List([], config=True,
171 174 help="import statements to be run at startup. Necessary in some environments"
172 175 )
173 176
174 177 use_threads = Bool(False, config=True,
175 178 help='Use threads instead of processes for the schedulers',
176 179 )
177 180
178 181 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
179 182 help="JSON filename where engine connection info will be stored.")
180 183 client_json_file = Unicode('ipcontroller-client.json', config=True,
181 184 help="JSON filename where client connection info will be stored.")
182 185
183 186 def _cluster_id_changed(self, name, old, new):
184 187 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
185 188 self.engine_json_file = "%s-engine.json" % self.name
186 189 self.client_json_file = "%s-client.json" % self.name
187 190
188 191
189 192 # internal
190 193 children = List()
191 194 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
192 195
193 196 def _use_threads_changed(self, name, old, new):
194 197 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
195 198
199 write_connection_files = Bool(True,
200 help="""Whether to write connection files to disk.
201 True in all cases other than runs with `reuse_files=True` *after the first*
202 """
203 )
204
196 205 aliases = Dict(aliases)
197 206 flags = Dict(flags)
198 207
199 208
200 209 def save_connection_dict(self, fname, cdict):
201 210 """save a connection dict to json file."""
202 211 c = self.config
203 212 url = cdict['url']
204 213 location = cdict['location']
205 214 if not location:
206 215 try:
207 216 proto,ip,port = split_url(url)
208 217 except AssertionError:
209 218 pass
210 219 else:
211 220 try:
212 221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
213 222 except (socket.gaierror, IndexError):
214 223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
215 224 " You may need to specify '--location=<external_ip_address>' to help"
216 225 " IPython decide when to connect via loopback.")
217 226 location = '127.0.0.1'
218 227 cdict['location'] = location
219 228 fname = os.path.join(self.profile_dir.security_dir, fname)
220 229 self.log.info("writing connection info to %s", fname)
221 230 with open(fname, 'w') as f:
222 231 f.write(json.dumps(cdict, indent=2))
223 232 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
224 233
225 234 def load_config_from_json(self):
226 235 """load config from existing json connector files."""
227 236 c = self.config
228 237 self.log.debug("loading config from JSON")
229 238 # load from engine config
230 239 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
231 240 self.log.info("loading connection info from %s", fname)
232 241 with open(fname) as f:
233 242 cfg = json.loads(f.read())
234 243 key = cfg['exec_key']
235 244 # json gives unicode, Session.key wants bytes
236 245 c.Session.key = key.encode('ascii')
237 246 xport,addr = cfg['url'].split('://')
238 247 c.HubFactory.engine_transport = xport
239 248 ip,ports = addr.split(':')
240 249 c.HubFactory.engine_ip = ip
241 250 c.HubFactory.regport = int(ports)
242 251 self.location = cfg['location']
243 252 if not self.engine_ssh_server:
244 253 self.engine_ssh_server = cfg['ssh']
245 254 # load client config
246 255 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
247 256 self.log.info("loading connection info from %s", fname)
248 257 with open(fname) as f:
249 258 cfg = json.loads(f.read())
250 259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
251 260 xport,addr = cfg['url'].split('://')
252 261 c.HubFactory.client_transport = xport
253 262 ip,ports = addr.split(':')
254 263 c.HubFactory.client_ip = ip
255 264 if not self.ssh_server:
256 265 self.ssh_server = cfg['ssh']
257 266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
258 267
268 def cleanup_connection_files(self):
269 if self.reuse_files:
270 self.log.debug("leaving JSON connection files for reuse")
271 return
272 self.log.debug("cleaning up JSON connection files")
273 for f in (self.client_json_file, self.engine_json_file):
274 f = os.path.join(self.profile_dir.security_dir, f)
275 try:
276 os.remove(f)
277 except Exception as e:
278 self.log.error("Failed to cleanup connection file: %s", e)
279 else:
280 self.log.debug(u"removed %s", f)
281
259 282 def load_secondary_config(self):
260 283 """secondary config, loading from JSON and setting defaults"""
261 284 if self.reuse_files:
262 285 try:
263 286 self.load_config_from_json()
264 287 except (AssertionError,IOError) as e:
265 288 self.log.error("Could not load config from JSON: %s" % e)
266 self.reuse_files=False
289 else:
290 # successfully loaded config from JSON, and reuse=True
291 # no need to wite back the same file
292 self.write_connection_files = False
293
267 294 # switch Session.key default to secure
268 295 default_secure(self.config)
269 296 self.log.debug("Config changed")
270 297 self.log.debug(repr(self.config))
271 298
272 299 def init_hub(self):
273 300 c = self.config
274 301
275 302 self.do_import_statements()
276 303
277 304 try:
278 305 self.factory = HubFactory(config=c, log=self.log)
279 306 # self.start_logging()
280 307 self.factory.init_hub()
281 308 except TraitError:
282 309 raise
283 310 except Exception:
284 311 self.log.error("Couldn't construct the Controller", exc_info=True)
285 312 self.exit(1)
286 313
287 if not self.reuse_files:
314 if self.write_connection_files:
288 315 # save to new json config files
289 316 f = self.factory
290 317 cdict = {'exec_key' : f.session.key.decode('ascii'),
291 318 'ssh' : self.ssh_server,
292 319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
293 320 'location' : self.location
294 321 }
295 322 self.save_connection_dict(self.client_json_file, cdict)
296 323 edict = cdict
297 324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
298 325 edict['ssh'] = self.engine_ssh_server
299 326 self.save_connection_dict(self.engine_json_file, edict)
300 327
301 #
302 328 def init_schedulers(self):
303 329 children = self.children
304 330 mq = import_item(str(self.mq_class))
305 331
306 332 hub = self.factory
307 333 # disambiguate url, in case of *
308 334 monitor_url = disambiguate_url(hub.monitor_url)
309 335 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
310 336 # IOPub relay (in a Process)
311 337 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
312 338 q.bind_in(hub.client_info['iopub'])
313 339 q.bind_out(hub.engine_info['iopub'])
314 340 q.setsockopt_out(zmq.SUBSCRIBE, b'')
315 341 q.connect_mon(monitor_url)
316 342 q.daemon=True
317 343 children.append(q)
318 344
319 345 # Multiplexer Queue (in a Process)
320 346 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
321 347 q.bind_in(hub.client_info['mux'])
322 348 q.setsockopt_in(zmq.IDENTITY, b'mux')
323 349 q.bind_out(hub.engine_info['mux'])
324 350 q.connect_mon(monitor_url)
325 351 q.daemon=True
326 352 children.append(q)
327 353
328 354 # Control Queue (in a Process)
329 355 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
330 356 q.bind_in(hub.client_info['control'])
331 357 q.setsockopt_in(zmq.IDENTITY, b'control')
332 358 q.bind_out(hub.engine_info['control'])
333 359 q.connect_mon(monitor_url)
334 360 q.daemon=True
335 361 children.append(q)
336 362 try:
337 363 scheme = self.config.TaskScheduler.scheme_name
338 364 except AttributeError:
339 365 scheme = TaskScheduler.scheme_name.get_default_value()
340 366 # Task Queue (in a Process)
341 367 if scheme == 'pure':
342 368 self.log.warn("task::using pure XREQ Task scheduler")
343 369 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
344 370 # q.setsockopt_out(zmq.HWM, hub.hwm)
345 371 q.bind_in(hub.client_info['task'][1])
346 372 q.setsockopt_in(zmq.IDENTITY, b'task')
347 373 q.bind_out(hub.engine_info['task'])
348 374 q.connect_mon(monitor_url)
349 375 q.daemon=True
350 376 children.append(q)
351 377 elif scheme == 'none':
352 378 self.log.warn("task::using no Task scheduler")
353 379
354 380 else:
355 381 self.log.info("task::using Python %s Task scheduler"%scheme)
356 382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
357 383 monitor_url, disambiguate_url(hub.client_info['notification']))
358 384 kwargs = dict(logname='scheduler', loglevel=self.log_level,
359 385 log_url = self.log_url, config=dict(self.config))
360 386 if 'Process' in self.mq_class:
361 387 # run the Python scheduler in a Process
362 388 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
363 389 q.daemon=True
364 390 children.append(q)
365 391 else:
366 392 # single-threaded Controller
367 393 kwargs['in_thread'] = True
368 394 launch_scheduler(*sargs, **kwargs)
369 395
396 def terminate_children(self):
397 child_procs = []
398 for child in self.children:
399 if isinstance(child, ProcessMonitoredQueue):
400 child_procs.append(child.launcher)
401 elif isinstance(child, Process):
402 child_procs.append(child)
403 if child_procs:
404 self.log.critical("terminating children...")
405 for child in child_procs:
406 try:
407 child.terminate()
408 except OSError:
409 # already dead
410 pass
370 411
371 def save_urls(self):
372 """save the registration urls to files."""
373 c = self.config
374
375 sec_dir = self.profile_dir.security_dir
376 cf = self.factory
377
378 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
379 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
380
381 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
382 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
412 def handle_signal(self, sig, frame):
413 self.log.critical("Received signal %i, shutting down", sig)
414 self.terminate_children()
415 self.loop.stop()
383 416
417 def init_signal(self):
418 for sig in (SIGINT, SIGABRT, SIGTERM):
419 signal(sig, self.handle_signal)
384 420
385 421 def do_import_statements(self):
386 422 statements = self.import_statements
387 423 for s in statements:
388 424 try:
389 425 self.log.msg("Executing statement: '%s'" % s)
390 426 exec s in globals(), locals()
391 427 except:
392 428 self.log.msg("Error running statement: %s" % s)
393 429
394 430 def forward_logging(self):
395 431 if self.log_url:
396 432 self.log.info("Forwarding logging to %s"%self.log_url)
397 433 context = zmq.Context.instance()
398 434 lsock = context.socket(zmq.PUB)
399 435 lsock.connect(self.log_url)
400 436 handler = PUBHandler(lsock)
401 437 self.log.removeHandler(self._log_handler)
402 438 handler.root_topic = 'controller'
403 439 handler.setLevel(self.log_level)
404 440 self.log.addHandler(handler)
405 441 self._log_handler = handler
406 442
407 443 @catch_config_error
408 444 def initialize(self, argv=None):
409 445 super(IPControllerApp, self).initialize(argv)
410 446 self.forward_logging()
411 447 self.load_secondary_config()
412 448 self.init_hub()
413 449 self.init_schedulers()
414 450
415 451 def start(self):
416 452 # Start the subprocesses:
417 453 self.factory.start()
418 child_procs = []
454 # children must be started before signals are setup,
455 # otherwise signal-handling will fire multiple times
419 456 for child in self.children:
420 457 child.start()
421 if isinstance(child, ProcessMonitoredQueue):
422 child_procs.append(child.launcher)
423 elif isinstance(child, Process):
424 child_procs.append(child)
425 if child_procs:
426 signal_children(child_procs)
458 self.init_signal()
427 459
428 460 self.write_pid_file(overwrite=True)
429 461
430 462 try:
431 463 self.factory.loop.start()
432 464 except KeyboardInterrupt:
433 465 self.log.critical("Interrupted, Exiting...\n")
466 finally:
467 self.cleanup_connection_files()
434 468
435 469
436 470
437 471 def launch_new_instance():
438 472 """Create and run the IPython controller"""
439 473 if sys.platform == 'win32':
440 474 # make sure we don't get called from a multiprocessing subprocess
441 475 # this can result in infinite Controllers being started on Windows
442 476 # which doesn't have a proper fork, so multiprocessing is wonky
443 477
444 478 # this only comes up when IPython has been installed using vanilla
445 479 # setuptools, and *not* distribute.
446 480 import multiprocessing
447 481 p = multiprocessing.current_process()
448 482 # the main process has name 'MainProcess'
449 483 # subprocesses will have names like 'Process-1'
450 484 if p.name != 'MainProcess':
451 485 # we are a subprocess, don't start another Controller!
452 486 return
453 487 app = IPControllerApp.instance()
454 488 app.initialize()
455 489 app.start()
456 490
457 491
458 492 if __name__ == '__main__':
459 493 launch_new_instance()
@@ -1,726 +1,726 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26
27 27 from datetime import datetime, timedelta
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48 48
49 49 from .dependency import Dependency
50 50
51 51 @decorator
52 52 def logged(f,self,*args,**kwargs):
53 53 # print ("#--------------------")
54 54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 55 # print ("#--")
56 56 return f(self,*args, **kwargs)
57 57
58 58 #----------------------------------------------------------------------
59 59 # Chooser functions
60 60 #----------------------------------------------------------------------
61 61
62 62 def plainrandom(loads):
63 63 """Plain random pick."""
64 64 n = len(loads)
65 65 return randint(0,n-1)
66 66
67 67 def lru(loads):
68 68 """Always pick the front of the line.
69 69
70 70 The content of `loads` is ignored.
71 71
72 72 Assumes LRU ordering of loads, with oldest first.
73 73 """
74 74 return 0
75 75
76 76 def twobin(loads):
77 77 """Pick two at random, use the LRU of the two.
78 78
79 79 The content of loads is ignored.
80 80
81 81 Assumes LRU ordering of loads, with oldest first.
82 82 """
83 83 n = len(loads)
84 84 a = randint(0,n-1)
85 85 b = randint(0,n-1)
86 86 return min(a,b)
87 87
88 88 def weighted(loads):
89 89 """Pick two at random using inverse load as weight.
90 90
91 91 Return the less loaded of the two.
92 92 """
93 93 # weight 0 a million times more than 1:
94 94 weights = 1./(1e-6+numpy.array(loads))
95 95 sums = weights.cumsum()
96 96 t = sums[-1]
97 97 x = random()*t
98 98 y = random()*t
99 99 idx = 0
100 100 idy = 0
101 101 while sums[idx] < x:
102 102 idx += 1
103 103 while sums[idy] < y:
104 104 idy += 1
105 105 if weights[idy] > weights[idx]:
106 106 return idy
107 107 else:
108 108 return idx
109 109
110 110 def leastload(loads):
111 111 """Always choose the lowest load.
112 112
113 113 If the lowest load occurs more than once, the first
114 114 occurance will be used. If loads has LRU ordering, this means
115 115 the LRU of those with the lowest load is chosen.
116 116 """
117 117 return loads.index(min(loads))
118 118
119 119 #---------------------------------------------------------------------
120 120 # Classes
121 121 #---------------------------------------------------------------------
122 122 # store empty default dependency:
123 123 MET = Dependency([])
124 124
125 125 class TaskScheduler(SessionFactory):
126 126 """Python TaskScheduler object.
127 127
128 128 This is the simplest object that supports msg_id based
129 129 DAG dependencies. *Only* task msg_ids are checked, not
130 130 msg_ids of jobs submitted via the MUX queue.
131 131
132 132 """
133 133
134 134 hwm = Integer(1, config=True,
135 135 help="""specify the High Water Mark (HWM) for the downstream
136 136 socket in the Task scheduler. This is the maximum number
137 137 of allowed outstanding tasks on each engine.
138 138
139 139 The default (1) means that only one task can be outstanding on each
140 140 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
141 141 engines continue to be assigned tasks while they are working,
142 142 effectively hiding network latency behind computation, but can result
143 143 in an imbalance of work when submitting many heterogenous tasks all at
144 144 once. Any positive value greater than one is a compromise between the
145 145 two.
146 146
147 147 """
148 148 )
149 149 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
150 150 'leastload', config=True, allow_none=False,
151 151 help="""select the task scheduler scheme [default: Python LRU]
152 152 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
153 153 )
154 154 def _scheme_name_changed(self, old, new):
155 155 self.log.debug("Using scheme %r"%new)
156 156 self.scheme = globals()[new]
157 157
158 158 # input arguments:
159 159 scheme = Instance(FunctionType) # function for determining the destination
160 160 def _scheme_default(self):
161 161 return leastload
162 162 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
163 163 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
164 164 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
165 165 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
166 166
167 167 # internals:
168 168 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
169 169 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
170 170 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
171 171 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
172 172 pending = Dict() # dict by engine_uuid of submitted tasks
173 173 completed = Dict() # dict by engine_uuid of completed tasks
174 174 failed = Dict() # dict by engine_uuid of failed tasks
175 175 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
176 176 clients = Dict() # dict by msg_id for who submitted the task
177 177 targets = List() # list of target IDENTs
178 178 loads = List() # list of engine loads
179 179 # full = Set() # set of IDENTs that have HWM outstanding tasks
180 180 all_completed = Set() # set of all completed tasks
181 181 all_failed = Set() # set of all failed tasks
182 182 all_done = Set() # set of all finished tasks=union(completed,failed)
183 183 all_ids = Set() # set of all submitted task IDs
184 184 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
185 185 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
186 186
187 187 ident = CBytes() # ZMQ identity. This should just be self.session.session
188 188 # but ensure Bytes
189 189 def _ident_default(self):
190 190 return self.session.bsession
191 191
192 192 def start(self):
193 193 self.engine_stream.on_recv(self.dispatch_result, copy=False)
194 194 self._notification_handlers = dict(
195 195 registration_notification = self._register_engine,
196 196 unregistration_notification = self._unregister_engine
197 197 )
198 198 self.notifier_stream.on_recv(self.dispatch_notification)
199 199 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
200 200 self.auditor.start()
201 201 self.log.info("Scheduler started [%s]"%self.scheme_name)
202 202
203 203 def resume_receiving(self):
204 204 """Resume accepting jobs."""
205 205 self.client_stream.on_recv(self.dispatch_submission, copy=False)
206 206
207 207 def stop_receiving(self):
208 208 """Stop accepting jobs while there are no engines.
209 209 Leave them in the ZMQ queue."""
210 210 self.client_stream.on_recv(None)
211 211
212 212 #-----------------------------------------------------------------------
213 213 # [Un]Registration Handling
214 214 #-----------------------------------------------------------------------
215 215
216 216 def dispatch_notification(self, msg):
217 217 """dispatch register/unregister events."""
218 218 try:
219 219 idents,msg = self.session.feed_identities(msg)
220 220 except ValueError:
221 221 self.log.warn("task::Invalid Message: %r",msg)
222 222 return
223 223 try:
224 224 msg = self.session.unserialize(msg)
225 225 except ValueError:
226 226 self.log.warn("task::Unauthorized message from: %r"%idents)
227 227 return
228 228
229 229 msg_type = msg['header']['msg_type']
230 230
231 231 handler = self._notification_handlers.get(msg_type, None)
232 232 if handler is None:
233 233 self.log.error("Unhandled message type: %r"%msg_type)
234 234 else:
235 235 try:
236 236 handler(asbytes(msg['content']['queue']))
237 237 except Exception:
238 238 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
239 239
240 240 def _register_engine(self, uid):
241 241 """New engine with ident `uid` became available."""
242 242 # head of the line:
243 243 self.targets.insert(0,uid)
244 244 self.loads.insert(0,0)
245 245
246 246 # initialize sets
247 247 self.completed[uid] = set()
248 248 self.failed[uid] = set()
249 249 self.pending[uid] = {}
250 250 if len(self.targets) == 1:
251 251 self.resume_receiving()
252 252 # rescan the graph:
253 253 self.update_graph(None)
254 254
255 255 def _unregister_engine(self, uid):
256 256 """Existing engine with ident `uid` became unavailable."""
257 257 if len(self.targets) == 1:
258 258 # this was our only engine
259 259 self.stop_receiving()
260 260
261 261 # handle any potentially finished tasks:
262 262 self.engine_stream.flush()
263 263
264 264 # don't pop destinations, because they might be used later
265 265 # map(self.destinations.pop, self.completed.pop(uid))
266 266 # map(self.destinations.pop, self.failed.pop(uid))
267 267
268 268 # prevent this engine from receiving work
269 269 idx = self.targets.index(uid)
270 270 self.targets.pop(idx)
271 271 self.loads.pop(idx)
272 272
273 273 # wait 5 seconds before cleaning up pending jobs, since the results might
274 274 # still be incoming
275 275 if self.pending[uid]:
276 276 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
277 277 dc.start()
278 278 else:
279 279 self.completed.pop(uid)
280 280 self.failed.pop(uid)
281 281
282 282
283 283 def handle_stranded_tasks(self, engine):
284 284 """Deal with jobs resident in an engine that died."""
285 285 lost = self.pending[engine]
286 286 for msg_id in lost.keys():
287 287 if msg_id not in self.pending[engine]:
288 288 # prevent double-handling of messages
289 289 continue
290 290
291 291 raw_msg = lost[msg_id][0]
292 292 idents,msg = self.session.feed_identities(raw_msg, copy=False)
293 293 parent = self.session.unpack(msg[1].bytes)
294 294 idents = [engine, idents[0]]
295 295
296 296 # build fake error reply
297 297 try:
298 298 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
299 299 except:
300 300 content = error.wrap_exception()
301 301 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
302 302 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
303 303 # and dispatch it
304 304 self.dispatch_result(raw_reply)
305 305
306 306 # finally scrub completed/failed lists
307 307 self.completed.pop(engine)
308 308 self.failed.pop(engine)
309 309
310 310
311 311 #-----------------------------------------------------------------------
312 312 # Job Submission
313 313 #-----------------------------------------------------------------------
314 314 def dispatch_submission(self, raw_msg):
315 315 """Dispatch job submission to appropriate handlers."""
316 316 # ensure targets up to date:
317 317 self.notifier_stream.flush()
318 318 try:
319 319 idents, msg = self.session.feed_identities(raw_msg, copy=False)
320 320 msg = self.session.unserialize(msg, content=False, copy=False)
321 321 except Exception:
322 322 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
323 323 return
324 324
325 325
326 326 # send to monitor
327 327 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
328 328
329 329 header = msg['header']
330 330 msg_id = header['msg_id']
331 331 self.all_ids.add(msg_id)
332 332
333 333 # get targets as a set of bytes objects
334 334 # from a list of unicode objects
335 335 targets = header.get('targets', [])
336 336 targets = map(asbytes, targets)
337 337 targets = set(targets)
338 338
339 339 retries = header.get('retries', 0)
340 340 self.retries[msg_id] = retries
341 341
342 342 # time dependencies
343 343 after = header.get('after', None)
344 344 if after:
345 345 after = Dependency(after)
346 346 if after.all:
347 347 if after.success:
348 348 after = Dependency(after.difference(self.all_completed),
349 349 success=after.success,
350 350 failure=after.failure,
351 351 all=after.all,
352 352 )
353 353 if after.failure:
354 354 after = Dependency(after.difference(self.all_failed),
355 355 success=after.success,
356 356 failure=after.failure,
357 357 all=after.all,
358 358 )
359 359 if after.check(self.all_completed, self.all_failed):
360 360 # recast as empty set, if `after` already met,
361 361 # to prevent unnecessary set comparisons
362 362 after = MET
363 363 else:
364 364 after = MET
365 365
366 366 # location dependencies
367 367 follow = Dependency(header.get('follow', []))
368 368
369 369 # turn timeouts into datetime objects:
370 370 timeout = header.get('timeout', None)
371 371 if timeout:
372 372 # cast to float, because jsonlib returns floats as decimal.Decimal,
373 373 # which timedelta does not accept
374 374 timeout = datetime.now() + timedelta(0,float(timeout),0)
375 375
376 376 args = [raw_msg, targets, after, follow, timeout]
377 377
378 378 # validate and reduce dependencies:
379 379 for dep in after,follow:
380 380 if not dep: # empty dependency
381 381 continue
382 382 # check valid:
383 383 if msg_id in dep or dep.difference(self.all_ids):
384 384 self.depending[msg_id] = args
385 385 return self.fail_unreachable(msg_id, error.InvalidDependency)
386 386 # check if unreachable:
387 387 if dep.unreachable(self.all_completed, self.all_failed):
388 388 self.depending[msg_id] = args
389 389 return self.fail_unreachable(msg_id)
390 390
391 391 if after.check(self.all_completed, self.all_failed):
392 392 # time deps already met, try to run
393 393 if not self.maybe_run(msg_id, *args):
394 394 # can't run yet
395 395 if msg_id not in self.all_failed:
396 396 # could have failed as unreachable
397 397 self.save_unmet(msg_id, *args)
398 398 else:
399 399 self.save_unmet(msg_id, *args)
400 400
401 401 def audit_timeouts(self):
402 402 """Audit all waiting tasks for expired timeouts."""
403 403 now = datetime.now()
404 404 for msg_id in self.depending.keys():
405 405 # must recheck, in case one failure cascaded to another:
406 406 if msg_id in self.depending:
407 407 raw,after,targets,follow,timeout = self.depending[msg_id]
408 408 if timeout and timeout < now:
409 409 self.fail_unreachable(msg_id, error.TaskTimeout)
410 410
411 411 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
412 412 """a task has become unreachable, send a reply with an ImpossibleDependency
413 413 error."""
414 414 if msg_id not in self.depending:
415 415 self.log.error("msg %r already failed!", msg_id)
416 416 return
417 417 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
418 418 for mid in follow.union(after):
419 419 if mid in self.graph:
420 420 self.graph[mid].remove(msg_id)
421 421
422 422 # FIXME: unpacking a message I've already unpacked, but didn't save:
423 423 idents,msg = self.session.feed_identities(raw_msg, copy=False)
424 424 header = self.session.unpack(msg[1].bytes)
425 425
426 426 try:
427 427 raise why()
428 428 except:
429 429 content = error.wrap_exception()
430 430
431 431 self.all_done.add(msg_id)
432 432 self.all_failed.add(msg_id)
433 433
434 434 msg = self.session.send(self.client_stream, 'apply_reply', content,
435 435 parent=header, ident=idents)
436 436 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
437 437
438 438 self.update_graph(msg_id, success=False)
439 439
440 440 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
441 441 """check location dependencies, and run if they are met."""
442 442 blacklist = self.blacklist.setdefault(msg_id, set())
443 443 if follow or targets or blacklist or self.hwm:
444 444 # we need a can_run filter
445 445 def can_run(idx):
446 446 # check hwm
447 447 if self.hwm and self.loads[idx] == self.hwm:
448 448 return False
449 449 target = self.targets[idx]
450 450 # check blacklist
451 451 if target in blacklist:
452 452 return False
453 453 # check targets
454 454 if targets and target not in targets:
455 455 return False
456 456 # check follow
457 457 return follow.check(self.completed[target], self.failed[target])
458 458
459 459 indices = filter(can_run, range(len(self.targets)))
460 460
461 461 if not indices:
462 462 # couldn't run
463 463 if follow.all:
464 464 # check follow for impossibility
465 465 dests = set()
466 466 relevant = set()
467 467 if follow.success:
468 468 relevant = self.all_completed
469 469 if follow.failure:
470 470 relevant = relevant.union(self.all_failed)
471 471 for m in follow.intersection(relevant):
472 472 dests.add(self.destinations[m])
473 473 if len(dests) > 1:
474 474 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
475 475 self.fail_unreachable(msg_id)
476 476 return False
477 477 if targets:
478 478 # check blacklist+targets for impossibility
479 479 targets.difference_update(blacklist)
480 480 if not targets or not targets.intersection(self.targets):
481 481 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
482 482 self.fail_unreachable(msg_id)
483 483 return False
484 484 return False
485 485 else:
486 486 indices = None
487 487
488 488 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
489 489 return True
490 490
491 491 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
492 492 """Save a message for later submission when its dependencies are met."""
493 493 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
494 494 # track the ids in follow or after, but not those already finished
495 495 for dep_id in after.union(follow).difference(self.all_done):
496 496 if dep_id not in self.graph:
497 497 self.graph[dep_id] = set()
498 498 self.graph[dep_id].add(msg_id)
499 499
500 500 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
501 501 """Submit a task to any of a subset of our targets."""
502 502 if indices:
503 503 loads = [self.loads[i] for i in indices]
504 504 else:
505 505 loads = self.loads
506 506 idx = self.scheme(loads)
507 507 if indices:
508 508 idx = indices[idx]
509 509 target = self.targets[idx]
510 510 # print (target, map(str, msg[:3]))
511 511 # send job to the engine
512 512 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
513 513 self.engine_stream.send_multipart(raw_msg, copy=False)
514 514 # update load
515 515 self.add_job(idx)
516 516 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
517 517 # notify Hub
518 518 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
519 519 self.session.send(self.mon_stream, 'task_destination', content=content,
520 520 ident=[b'tracktask',self.ident])
521 521
522 522
523 523 #-----------------------------------------------------------------------
524 524 # Result Handling
525 525 #-----------------------------------------------------------------------
526 526 def dispatch_result(self, raw_msg):
527 527 """dispatch method for result replies"""
528 528 try:
529 529 idents,msg = self.session.feed_identities(raw_msg, copy=False)
530 530 msg = self.session.unserialize(msg, content=False, copy=False)
531 531 engine = idents[0]
532 532 try:
533 533 idx = self.targets.index(engine)
534 534 except ValueError:
535 535 pass # skip load-update for dead engines
536 536 else:
537 537 self.finish_job(idx)
538 538 except Exception:
539 539 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
540 540 return
541 541
542 542 header = msg['header']
543 543 parent = msg['parent_header']
544 544 if header.get('dependencies_met', True):
545 545 success = (header['status'] == 'ok')
546 546 msg_id = parent['msg_id']
547 547 retries = self.retries[msg_id]
548 548 if not success and retries > 0:
549 549 # failed
550 550 self.retries[msg_id] = retries - 1
551 551 self.handle_unmet_dependency(idents, parent)
552 552 else:
553 553 del self.retries[msg_id]
554 554 # relay to client and update graph
555 555 self.handle_result(idents, parent, raw_msg, success)
556 556 # send to Hub monitor
557 557 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
558 558 else:
559 559 self.handle_unmet_dependency(idents, parent)
560 560
561 561 def handle_result(self, idents, parent, raw_msg, success=True):
562 562 """handle a real task result, either success or failure"""
563 563 # first, relay result to client
564 564 engine = idents[0]
565 565 client = idents[1]
566 566 # swap_ids for XREP-XREP mirror
567 567 raw_msg[:2] = [client,engine]
568 568 # print (map(str, raw_msg[:4]))
569 569 self.client_stream.send_multipart(raw_msg, copy=False)
570 570 # now, update our data structures
571 571 msg_id = parent['msg_id']
572 572 self.blacklist.pop(msg_id, None)
573 573 self.pending[engine].pop(msg_id)
574 574 if success:
575 575 self.completed[engine].add(msg_id)
576 576 self.all_completed.add(msg_id)
577 577 else:
578 578 self.failed[engine].add(msg_id)
579 579 self.all_failed.add(msg_id)
580 580 self.all_done.add(msg_id)
581 581 self.destinations[msg_id] = engine
582 582
583 583 self.update_graph(msg_id, success)
584 584
585 585 def handle_unmet_dependency(self, idents, parent):
586 586 """handle an unmet dependency"""
587 587 engine = idents[0]
588 588 msg_id = parent['msg_id']
589 589
590 590 if msg_id not in self.blacklist:
591 591 self.blacklist[msg_id] = set()
592 592 self.blacklist[msg_id].add(engine)
593 593
594 594 args = self.pending[engine].pop(msg_id)
595 595 raw,targets,after,follow,timeout = args
596 596
597 597 if self.blacklist[msg_id] == targets:
598 598 self.depending[msg_id] = args
599 599 self.fail_unreachable(msg_id)
600 600 elif not self.maybe_run(msg_id, *args):
601 601 # resubmit failed
602 602 if msg_id not in self.all_failed:
603 603 # put it back in our dependency tree
604 604 self.save_unmet(msg_id, *args)
605 605
606 606 if self.hwm:
607 607 try:
608 608 idx = self.targets.index(engine)
609 609 except ValueError:
610 610 pass # skip load-update for dead engines
611 611 else:
612 612 if self.loads[idx] == self.hwm-1:
613 613 self.update_graph(None)
614 614
615 615
616 616
617 617 def update_graph(self, dep_id=None, success=True):
618 618 """dep_id just finished. Update our dependency
619 619 graph and submit any jobs that just became runable.
620 620
621 621 Called with dep_id=None to update entire graph for hwm, but without finishing
622 622 a task.
623 623 """
624 624 # print ("\n\n***********")
625 625 # pprint (dep_id)
626 626 # pprint (self.graph)
627 627 # pprint (self.depending)
628 628 # pprint (self.all_completed)
629 629 # pprint (self.all_failed)
630 630 # print ("\n\n***********\n\n")
631 631 # update any jobs that depended on the dependency
632 632 jobs = self.graph.pop(dep_id, [])
633 633
634 634 # recheck *all* jobs if
635 635 # a) we have HWM and an engine just become no longer full
636 636 # or b) dep_id was given as None
637 637 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
638 638 jobs = self.depending.keys()
639 639
640 640 for msg_id in jobs:
641 641 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
642 642
643 643 if after.unreachable(self.all_completed, self.all_failed)\
644 644 or follow.unreachable(self.all_completed, self.all_failed):
645 645 self.fail_unreachable(msg_id)
646 646
647 647 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
648 648 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
649 649
650 650 self.depending.pop(msg_id)
651 651 for mid in follow.union(after):
652 652 if mid in self.graph:
653 653 self.graph[mid].remove(msg_id)
654 654
655 655 #----------------------------------------------------------------------
656 656 # methods to be overridden by subclasses
657 657 #----------------------------------------------------------------------
658 658
659 659 def add_job(self, idx):
660 660 """Called after self.targets[idx] just got the job with header.
661 661 Override with subclasses. The default ordering is simple LRU.
662 662 The default loads are the number of outstanding jobs."""
663 663 self.loads[idx] += 1
664 664 for lis in (self.targets, self.loads):
665 665 lis.append(lis.pop(idx))
666 666
667 667
668 668 def finish_job(self, idx):
669 669 """Called after self.targets[idx] just finished a job.
670 670 Override with subclasses."""
671 671 self.loads[idx] -= 1
672 672
673 673
674 674
675 675 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
676 676 logname='root', log_url=None, loglevel=logging.DEBUG,
677 677 identity=b'task', in_thread=False):
678 678
679 679 ZMQStream = zmqstream.ZMQStream
680 680
681 681 if config:
682 682 # unwrap dict back into Config
683 683 config = Config(config)
684 684
685 685 if in_thread:
686 686 # use instance() to get the same Context/Loop as our parent
687 687 ctx = zmq.Context.instance()
688 688 loop = ioloop.IOLoop.instance()
689 689 else:
690 690 # in a process, don't use instance()
691 691 # for safety with multiprocessing
692 692 ctx = zmq.Context()
693 693 loop = ioloop.IOLoop()
694 694 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
695 695 ins.setsockopt(zmq.IDENTITY, identity)
696 696 ins.bind(in_addr)
697 697
698 698 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
699 699 outs.setsockopt(zmq.IDENTITY, identity)
700 700 outs.bind(out_addr)
701 701 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
702 702 mons.connect(mon_addr)
703 703 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
704 704 nots.setsockopt(zmq.SUBSCRIBE, b'')
705 705 nots.connect(not_addr)
706 706
707 707 # setup logging.
708 708 if in_thread:
709 709 log = Application.instance().log
710 710 else:
711 711 if log_url:
712 712 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
713 713 else:
714 714 log = local_logger(logname, loglevel)
715 715
716 716 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
717 717 mon_stream=mons, notifier_stream=nots,
718 718 loop=loop, log=log,
719 719 config=config)
720 720 scheduler.start()
721 721 if not in_thread:
722 722 try:
723 723 loop.start()
724 724 except KeyboardInterrupt:
725 print ("interrupted, exiting...", file=sys.__stderr__)
725 scheduler.log.critical("Interrupted, exiting...")
726 726
General Comments 0
You need to be logged in to leave comments. Login now