Show More
@@ -205,9 +205,9 class IPControllerAppConfigLoader(ClusterDirConfigLoader): | |||||
205 | help='Use threads instead of processes for the schedulers', |
|
205 | help='Use threads instead of processes for the schedulers', | |
206 | ) |
|
206 | ) | |
207 | paa('--hwm', |
|
207 | paa('--hwm', | |
208 |
dest=' |
|
208 | dest='TaskScheduler.hwm', type=int, | |
209 |
help='specify the High Water Mark (HWM) |
|
209 | help='specify the High Water Mark (HWM) ' | |
210 |
' |
|
210 | 'in the Python scheduler. This is the maximum number ' | |
211 | 'of allowed outstanding tasks on each engine.', |
|
211 | 'of allowed outstanding tasks on each engine.', | |
212 | ) |
|
212 | ) | |
213 |
|
213 |
@@ -35,8 +35,6 class ControllerFactory(HubFactory): | |||||
35 | """Configurable for setting up a Hub and Schedulers.""" |
|
35 | """Configurable for setting up a Hub and Schedulers.""" | |
36 |
|
36 | |||
37 | usethreads = Bool(False, config=True) |
|
37 | usethreads = Bool(False, config=True) | |
38 | # pure-zmq downstream HWM |
|
|||
39 | hwm = Int(0, config=True) |
|
|||
40 |
|
38 | |||
41 | # internal |
|
39 | # internal | |
42 | children = List() |
|
40 | children = List() | |
@@ -97,7 +95,6 class ControllerFactory(HubFactory): | |||||
97 | if self.scheme == 'pure': |
|
95 | if self.scheme == 'pure': | |
98 | self.log.warn("task::using pure XREQ Task scheduler") |
|
96 | self.log.warn("task::using pure XREQ Task scheduler") | |
99 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
97 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
100 | q.setsockopt_out(zmq.HWM, self.hwm) |
|
|||
101 | q.bind_in(self.client_info['task'][1]) |
|
98 | q.bind_in(self.client_info['task'][1]) | |
102 | q.setsockopt_in(zmq.IDENTITY, 'task') |
|
99 | q.setsockopt_in(zmq.IDENTITY, 'task') | |
103 | q.bind_out(self.engine_info['task']) |
|
100 | q.bind_out(self.engine_info['task']) |
General Comments 0
You need to be logged in to leave comments.
Login now