##// END OF EJS Templates
add '-s' for startup script in ipengine...
MinRK -
Show More
@@ -1,295 +1,303
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from .clusterdir import (
25 from .clusterdir import (
26 ApplicationWithClusterDir,
26 ApplicationWithClusterDir,
27 ClusterDirConfigLoader
27 ClusterDirConfigLoader
28 )
28 )
29 from IPython.zmq.log import EnginePUBHandler
29 from IPython.zmq.log import EnginePUBHandler
30
30
31 from IPython.parallel import factory
31 from IPython.parallel import factory
32 from IPython.parallel.engine.engine import EngineFactory
32 from IPython.parallel.engine.engine import EngineFactory
33 from IPython.parallel.engine.streamkernel import Kernel
33 from IPython.parallel.engine.streamkernel import Kernel
34 from IPython.parallel.util import disambiguate_url
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.utils.importstring import import_item
36 from IPython.utils.importstring import import_item
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Module level variables
40 # Module level variables
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'


#: Startup code injected when ``--mpi=mpi4py`` is selected: imports mpi4py
#: and attaches ``size``/``rank`` attributes to the ``mpi`` module object.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""


#: Startup code injected when ``--mpi=pytrilinos`` is selected: imports
#: PyTrilinos' Epetra and provides a minimal stand-in ``mpi`` object with
#: ``rank``/``size`` attributes so engine code can treat both MPI flavors
#: uniformly.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
    pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""


#: Help text shown by the ipengine command-line application.
_description = """Start an IPython engine for parallel computing.\n\n

IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "cluster_<profile>".
See the --profile and --cluster-dir options for details.
"""
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Command line options
73 # Command line options
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
class IPEngineAppConfigLoader(ClusterDirConfigLoader):
    """Command-line configuration loader for ipengine.

    Extends the cluster-dir loader with engine-specific options: the
    controller connection (url) file, MPI initialization mode, logging
    destinations, and startup command (``-c``) / script (``-s``) hooks.
    """

    def _add_arguments(self):
        """Register ipengine's command-line arguments on the parser."""
        super(IPEngineAppConfigLoader, self)._add_arguments()
        paa = self.parser.add_argument
        # Controller config
        paa('--file', '-f',
            type=unicode, dest='Global.url_file',
            help='The full location of the file containing the connection information for '
            'the controller. If this is not given, the file must be in the '
            'security directory of the cluster directory. This location is '
            'resolved using the --profile and --app-dir options.',
            metavar='Global.url_file')
        # MPI
        paa('--mpi',
            type=str, dest='MPI.use',
            help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
            metavar='MPI.use')
        # Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
        paa('--log-url',
            dest='Global.log_url',
            help="url of ZMQ logger, as started with iploggerz")
        # Disabled exec-key options kept for reference while key handling
        # is reworked; see the commented key-file logic in IPEngineApp.
        # paa('--execkey',
        #     type=str, dest='Global.exec_key',
        #     help='path to a file containing an execution key.',
        #     metavar='keyfile')
        # paa('--no-secure',
        #     action='store_false', dest='Global.secure',
        #     help='Turn off execution keys.')
        # paa('--secure',
        #     action='store_true', dest='Global.secure',
        #     help='Turn on execution keys (default).')
        # init command
        paa('-c',
            type=str, dest='Global.extra_exec_lines',
            help='specify a command to be run at startup')
        paa('-s',
            type=unicode, dest='Global.extra_exec_file',
            help='specify a script to be run at startup')

        factory.add_session_arguments(self.parser)
        factory.add_registration_arguments(self.parser)
119
122
120
123
121 #-----------------------------------------------------------------------------
124 #-----------------------------------------------------------------------------
122 # Main application
125 # Main application
123 #-----------------------------------------------------------------------------
126 #-----------------------------------------------------------------------------
124
127
125
128
class IPEngineApp(ApplicationWithClusterDir):
    """Application for starting a single IPython engine.

    Resolves the controller's connection (url) file from the cluster
    directory (or an explicit ``--file`` path), optionally initializes MPI,
    then constructs an EngineFactory and runs its event loop.
    """

    name = u'ipengine'
    description = _description
    command_line_loader = IPEngineAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Populate ``self.default_config`` with engine defaults."""
        super(IPEngineApp, self).create_default_config()

        # The engine should not clean logs as we don't want to remove the
        # active log files of other running engines.
        self.default_config.Global.clean_logs = False
        self.default_config.Global.secure = True

        # Global config attributes
        self.default_config.Global.exec_lines = []
        self.default_config.Global.extra_exec_lines = ''
        self.default_config.Global.extra_exec_file = u''

        # Configuration related to the controller
        # This must match the filename (path not included) that the controller
        # used for the FURL file.
        self.default_config.Global.url_file = u''
        self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
        # If given, this is the actual location of the controller's FURL file.
        # If not, this is computed using the profile, app_dir and furl_file_name
        # self.default_config.Global.key_file_name = u'exec_key.key'
        # self.default_config.Global.key_file = u''

        # MPI related config attributes
        self.default_config.MPI.use = ''
        self.default_config.MPI.mpi4py = mpi4py_init
        self.default_config.MPI.pytrilinos = pytrilinos_init

    def post_load_command_line_config(self):
        # No engine-specific post-processing of command-line config is needed.
        pass

    def pre_construct(self):
        """Resolve the url file and fold -c/-s options into exec_lines."""
        super(IPEngineApp, self).pre_construct()
        # self.find_cont_url_file()
        self.find_url_file()
        if self.master_config.Global.extra_exec_lines:
            self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
        if self.master_config.Global.extra_exec_file:
            # Run the -s startup script via execfile; encode the unicode path
            # with the filesystem encoding so %r interpolates a byte string.
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
            self.master_config.Global.exec_lines.append(cmd)

    # def find_key_file(self):
    #     """Set the key file.
    #
    #     Here we don't try to actually see if it exists for is valid as that
    #     is hadled by the connection logic.
    #     """
    #     config = self.master_config
    #     # Find the actual controller key file
    #     if not config.Global.key_file:
    #         try_this = os.path.join(
    #             config.Global.cluster_dir,
    #             config.Global.security_dir,
    #             config.Global.key_file_name
    #         )
    #         config.Global.key_file = try_this

    def find_url_file(self):
        """Set the url file.

        Here we don't try to actually see if it exists or is valid as that
        is handled by the connection logic.
        """
        config = self.master_config
        # Default to <cluster_dir>/<security_dir>/<url_file_name> when no
        # explicit --file path was given.
        if not config.Global.url_file:
            try_this = os.path.join(
                config.Global.cluster_dir,
                config.Global.security_dir,
                config.Global.url_file_name
            )
            config.Global.url_file = try_this

    def construct(self):
        """Read the controller's connection file and build the EngineFactory."""
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.master_config
        # if os.path.exists(config.Global.key_file) and config.Global.secure:
        #     config.SessionFactory.exec_key = config.Global.key_file
        if os.path.exists(config.Global.url_file):
            with open(config.Global.url_file) as f:
                d = json.loads(f.read())
                # JSON gives unicode values; downstream zmq code expects bytes.
                for k,v in d.iteritems():
                    if isinstance(v, unicode):
                        d[k] = v.encode()
            if d['exec_key']:
                config.SessionFactory.exec_key = d['exec_key']
            # Rewrite the registration url relative to this machine's location.
            d['url'] = disambiguate_url(d['url'], d['location'])
            config.RegistrationFactory.url=d['url']
            config.EngineFactory.location = d['location']



        config.Kernel.exec_lines = config.Global.exec_lines

        self.start_mpi()

        # Create the underlying shell class and EngineService
        # shell_class = import_item(self.master_config.Global.shell_class)
        try:
            self.engine = EngineFactory(config=config, logname=self.log.name)
        except:
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

        self.start_logging()

        # Create the service hierarchy
        # self.main_service = service.MultiService()
        # self.engine_service.setServiceParent(self.main_service)
        # self.tub_service = Tub()
        # self.tub_service.setServiceParent(self.main_service)
        # # This needs to be called before the connection is initiated
        # self.main_service.startService()

        # This initiates the connection to the controller and calls
        # register_engine to tell the controller we are ready to do work
        # self.engine_connector = EngineConnector(self.tub_service)

        # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)

        # reactor.callWhenRunning(self.call_connect)


    def start_logging(self):
        """Start logging; also forward log records over ZMQ if --log-url given."""
        super(IPEngineApp, self).start_logging()
        if self.master_config.Global.log_url:
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def start_mpi(self):
        """Execute the configured MPI init snippet, binding global ``mpi``.

        ``mpi`` is set to None when MPI is disabled or initialization fails.
        """
        global mpi
        mpikey = self.master_config.MPI.use
        mpi_import_statement = self.master_config.MPI.get(mpikey, None)
        if mpi_import_statement is not None:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                # Runs mpi4py_init / pytrilinos_init (or a user snippet) in
                # module globals so it defines the global ``mpi``.
                exec mpi_import_statement in globals()
            except:
                mpi = None
        else:
            mpi = None


    def start_app(self):
        """Start the engine and run its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
285
293
286
294
def launch_new_instance():
    """Create and run the IPython engine application."""
    app = IPEngineApp()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
295
303
@@ -1,1279 +1,1281
1 """A semi-synchronous Client for the ZMQ cluster"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 Dict, List, Bool, Str, Set)
28 Dict, List, Bool, Str, Set)
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 from IPython.parallel import error
32 from IPython.parallel import error
33 from IPython.parallel import streamsession as ss
33 from IPython.parallel import streamsession as ss
34 from IPython.parallel import util
34 from IPython.parallel import util
35
35
36 from .asyncresult import AsyncResult, AsyncHubResult
36 from .asyncresult import AsyncResult, AsyncHubResult
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
38 from .view import DirectView, LoadBalancedView
38 from .view import DirectView, LoadBalancedView
39
39
40 #--------------------------------------------------------------------------
40 #--------------------------------------------------------------------------
41 # Decorators for Client methods
41 # Decorators for Client methods
42 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
43
43
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # Flush incoming results/registration changes so the wrapped
    # method observes up-to-date client state.
    self.spin()
    result = f(self, *args, **kwargs)
    return result
49
49
@decorator
def default_block(f, self, *args, **kwargs):
    """Default to self.block; preserve self.block."""
    requested = kwargs.get('block', None)
    if requested is None:
        requested = self.block
    # Swap in the effective blocking mode, restoring the previous
    # value no matter how the wrapped call exits.
    previous, self.block = self.block, requested
    try:
        return f(self, *args, **kwargs)
    finally:
        self.block = previous
62
62
63
63
64 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
65 # Classes
65 # Classes
66 #--------------------------------------------------------------------------
66 #--------------------------------------------------------------------------
67
67
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        # NOTE(review): dict.update bypasses __setitem__, so keys supplied
        # here are not validated against the fixed key set.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # O(1) membership test on the dict itself (was `in self.iterkeys()`,
        # which is Python-2-only and builds an iterator per call).
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
118
118
119
119
120 class Client(HasTraits):
120 class Client(HasTraits):
121 """A semi-synchronous client to the IPython ZMQ cluster
121 """A semi-synchronous client to the IPython ZMQ cluster
122
122
123 Parameters
123 Parameters
124 ----------
124 ----------
125
125
126 url_or_file : bytes; zmq url or path to ipcontroller-client.json
126 url_or_file : bytes; zmq url or path to ipcontroller-client.json
127 Connection information for the Hub's registration. If a json connector
127 Connection information for the Hub's registration. If a json connector
128 file is given, then likely no further configuration is necessary.
128 file is given, then likely no further configuration is necessary.
129 [Default: use profile]
129 [Default: use profile]
130 profile : bytes
130 profile : bytes
131 The name of the Cluster profile to be used to find connector information.
131 The name of the Cluster profile to be used to find connector information.
132 [Default: 'default']
132 [Default: 'default']
133 context : zmq.Context
133 context : zmq.Context
134 Pass an existing zmq.Context instance, otherwise the client will create its own.
134 Pass an existing zmq.Context instance, otherwise the client will create its own.
135 username : bytes
135 username : bytes
136 set username to be passed to the Session object
136 set username to be passed to the Session object
137 debug : bool
137 debug : bool
138 flag for lots of message printing for debug purposes
138 flag for lots of message printing for debug purposes
139
139
140 #-------------- ssh related args ----------------
140 #-------------- ssh related args ----------------
141 # These are args for configuring the ssh tunnel to be used
141 # These are args for configuring the ssh tunnel to be used
142 # credentials are used to forward connections over ssh to the Controller
142 # credentials are used to forward connections over ssh to the Controller
143 # Note that the ip given in `addr` needs to be relative to sshserver
143 # Note that the ip given in `addr` needs to be relative to sshserver
144 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
144 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
145 # and set sshserver as the same machine the Controller is on. However,
145 # and set sshserver as the same machine the Controller is on. However,
146 # the only requirement is that sshserver is able to see the Controller
146 # the only requirement is that sshserver is able to see the Controller
147 # (i.e. is within the same trusted network).
147 # (i.e. is within the same trusted network).
148
148
149 sshserver : str
149 sshserver : str
150 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
150 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
151 If keyfile or password is specified, and this is not, it will default to
151 If keyfile or password is specified, and this is not, it will default to
152 the ip given in addr.
152 the ip given in addr.
153 sshkey : str; path to public ssh key file
153 sshkey : str; path to public ssh key file
154 This specifies a key to be used in ssh login, default None.
154 This specifies a key to be used in ssh login, default None.
155 Regular default ssh keys will be used without specifying this argument.
155 Regular default ssh keys will be used without specifying this argument.
156 password : str
156 password : str
157 Your ssh password to sshserver. Note that if this is left None,
157 Your ssh password to sshserver. Note that if this is left None,
158 you will be prompted for it if passwordless key based login is unavailable.
158 you will be prompted for it if passwordless key based login is unavailable.
159 paramiko : bool
159 paramiko : bool
160 flag for whether to use paramiko instead of shell ssh for tunneling.
160 flag for whether to use paramiko instead of shell ssh for tunneling.
161 [default: True on win32, False else]
161 [default: True on win32, False else]
162
162
163 ------- exec authentication args -------
163 ------- exec authentication args -------
164 If even localhost is untrusted, you can have some protection against
164 If even localhost is untrusted, you can have some protection against
165 unauthorized execution by using a key. Messages are still sent
165 unauthorized execution by using a key. Messages are still sent
166 as cleartext, so if someone can snoop your loopback traffic this will
166 as cleartext, so if someone can snoop your loopback traffic this will
167 not help against malicious attacks.
167 not help against malicious attacks.
168
168
169 exec_key : str
169 exec_key : str
170 an authentication key or file containing a key
170 an authentication key or file containing a key
171 default: None
171 default: None
172
172
173
173
174 Attributes
174 Attributes
175 ----------
175 ----------
176
176
177 ids : list of int engine IDs
177 ids : list of int engine IDs
178 requesting the ids attribute always synchronizes
178 requesting the ids attribute always synchronizes
179 the registration state. To request ids without synchronization,
179 the registration state. To request ids without synchronization,
180 use semi-private _ids attributes.
180 use semi-private _ids attributes.
181
181
182 history : list of msg_ids
182 history : list of msg_ids
183 a list of msg_ids, keeping track of all the execution
183 a list of msg_ids, keeping track of all the execution
184 messages you have submitted in order.
184 messages you have submitted in order.
185
185
186 outstanding : set of msg_ids
186 outstanding : set of msg_ids
187 a set of msg_ids that have been submitted, but whose
187 a set of msg_ids that have been submitted, but whose
188 results have not yet been received.
188 results have not yet been received.
189
189
190 results : dict
190 results : dict
191 a dict of all our results, keyed by msg_id
191 a dict of all our results, keyed by msg_id
192
192
193 block : bool
193 block : bool
194 determines default behavior when block not specified
194 determines default behavior when block not specified
195 in execution methods
195 in execution methods
196
196
197 Methods
197 Methods
198 -------
198 -------
199
199
200 spin
200 spin
201 flushes incoming results and registration state changes
201 flushes incoming results and registration state changes
202 control methods spin, and requesting `ids` also ensures up to date
202 control methods spin, and requesting `ids` also ensures up to date
203
203
204 wait
204 wait
205 wait on one or more msg_ids
205 wait on one or more msg_ids
206
206
207 execution methods
207 execution methods
208 apply
208 apply
209 legacy: execute, run
209 legacy: execute, run
210
210
211 data movement
211 data movement
212 push, pull, scatter, gather
212 push, pull, scatter, gather
213
213
214 query methods
214 query methods
215 queue_status, get_result, purge, result_status
215 queue_status, get_result, purge, result_status
216
216
217 control methods
217 control methods
218 abort, shutdown
218 abort, shutdown
219
219
220 """
220 """
221
221
222
222
    # --- public state --------------------------------------------------
    block = Bool(False)        # default blocking behavior when `block` not given to execution methods
    outstanding = Set()        # msg_ids submitted but whose results have not yet arrived
    results = Instance('collections.defaultdict', (dict,))      # msg_id -> result
    metadata = Instance('collections.defaultdict', (Metadata,)) # msg_id -> execution metadata
    history = List()           # every msg_id submitted, in order
    debug = Bool(False)        # when True, pprint() each message sent/received
    profile=CUnicode('default')

    # --- private state -------------------------------------------------
    _outstanding_dict = Instance('collections.defaultdict', (set,))  # engine uuid -> its outstanding msg_ids
    _ids = List()              # integer ids of currently registered engines
    _connected=Bool(False)     # set True by _connect(); guards against reconnecting
    _ssh=Bool(False)           # True when traffic is tunneled over ssh
    _context = Instance('zmq.Context')
    _config = Dict()           # connection info (url, location, ssh, exec_key, ...)
    _engines=Instance(util.ReverseDict, (), {})  # engine id <-> uuid mapping (reversible)
    # _hub_socket=Instance('zmq.Socket')
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Str()         # task scheduler scheme reported by the hub (e.g. 'pure')
    _closed = False
    _ignored_control_replies=Int(0)  # control replies still expected but to be discarded
    _ignored_hub_replies=Int(0)      # hub replies still expected but to be discarded
249
249
250 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
250 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
251 context=None, username=None, debug=False, exec_key=None,
251 context=None, username=None, debug=False, exec_key=None,
252 sshserver=None, sshkey=None, password=None, paramiko=None,
252 sshserver=None, sshkey=None, password=None, paramiko=None,
253 timeout=10
253 timeout=10
254 ):
254 ):
255 super(Client, self).__init__(debug=debug, profile=profile)
255 super(Client, self).__init__(debug=debug, profile=profile)
256 if context is None:
256 if context is None:
257 context = zmq.Context.instance()
257 context = zmq.Context.instance()
258 self._context = context
258 self._context = context
259
259
260
260
261 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
261 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
262 if self._cd is not None:
262 if self._cd is not None:
263 if url_or_file is None:
263 if url_or_file is None:
264 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
264 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
265 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
265 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
266 " Please specify at least one of url_or_file or profile."
266 " Please specify at least one of url_or_file or profile."
267
267
268 try:
268 try:
269 util.validate_url(url_or_file)
269 util.validate_url(url_or_file)
270 except AssertionError:
270 except AssertionError:
271 if not os.path.exists(url_or_file):
271 if not os.path.exists(url_or_file):
272 if self._cd:
272 if self._cd:
273 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
273 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
274 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
274 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
275 with open(url_or_file) as f:
275 with open(url_or_file) as f:
276 cfg = json.loads(f.read())
276 cfg = json.loads(f.read())
277 else:
277 else:
278 cfg = {'url':url_or_file}
278 cfg = {'url':url_or_file}
279
279
280 # sync defaults from args, json:
280 # sync defaults from args, json:
281 if sshserver:
281 if sshserver:
282 cfg['ssh'] = sshserver
282 cfg['ssh'] = sshserver
283 if exec_key:
283 if exec_key:
284 cfg['exec_key'] = exec_key
284 cfg['exec_key'] = exec_key
285 exec_key = cfg['exec_key']
285 exec_key = cfg['exec_key']
286 sshserver=cfg['ssh']
286 sshserver=cfg['ssh']
287 url = cfg['url']
287 url = cfg['url']
288 location = cfg.setdefault('location', None)
288 location = cfg.setdefault('location', None)
289 cfg['url'] = util.disambiguate_url(cfg['url'], location)
289 cfg['url'] = util.disambiguate_url(cfg['url'], location)
290 url = cfg['url']
290 url = cfg['url']
291
291
292 self._config = cfg
292 self._config = cfg
293
293
294 self._ssh = bool(sshserver or sshkey or password)
294 self._ssh = bool(sshserver or sshkey or password)
295 if self._ssh and sshserver is None:
295 if self._ssh and sshserver is None:
296 # default to ssh via localhost
296 # default to ssh via localhost
297 sshserver = url.split('://')[1].split(':')[0]
297 sshserver = url.split('://')[1].split(':')[0]
298 if self._ssh and password is None:
298 if self._ssh and password is None:
299 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
299 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
300 password=False
300 password=False
301 else:
301 else:
302 password = getpass("SSH Password for %s: "%sshserver)
302 password = getpass("SSH Password for %s: "%sshserver)
303 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
303 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
304 if exec_key is not None and os.path.isfile(exec_key):
304 if exec_key is not None and os.path.isfile(exec_key):
305 arg = 'keyfile'
305 arg = 'keyfile'
306 else:
306 else:
307 arg = 'key'
307 arg = 'key'
308 key_arg = {arg:exec_key}
308 key_arg = {arg:exec_key}
309 if username is None:
309 if username is None:
310 self.session = ss.StreamSession(**key_arg)
310 self.session = ss.StreamSession(**key_arg)
311 else:
311 else:
312 self.session = ss.StreamSession(username, **key_arg)
312 self.session = ss.StreamSession(username, **key_arg)
313 self._query_socket = self._context.socket(zmq.XREQ)
313 self._query_socket = self._context.socket(zmq.XREQ)
314 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
314 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
315 if self._ssh:
315 if self._ssh:
316 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
316 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
317 else:
317 else:
318 self._query_socket.connect(url)
318 self._query_socket.connect(url)
319
319
320 self.session.debug = self.debug
320 self.session.debug = self.debug
321
321
322 self._notification_handlers = {'registration_notification' : self._register_engine,
322 self._notification_handlers = {'registration_notification' : self._register_engine,
323 'unregistration_notification' : self._unregister_engine,
323 'unregistration_notification' : self._unregister_engine,
324 'shutdown_notification' : lambda msg: self.close(),
324 'shutdown_notification' : lambda msg: self.close(),
325 }
325 }
326 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
326 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
327 'apply_reply' : self._handle_apply_reply}
327 'apply_reply' : self._handle_apply_reply}
328 self._connect(sshserver, ssh_kwargs, timeout)
328 self._connect(sshserver, ssh_kwargs, timeout)
329
329
330 def __del__(self):
330 def __del__(self):
331 """cleanup sockets, but _not_ context."""
331 """cleanup sockets, but _not_ context."""
332 self.close()
332 self.close()
333
333
334 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
334 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
335 if ipython_dir is None:
335 if ipython_dir is None:
336 ipython_dir = get_ipython_dir()
336 ipython_dir = get_ipython_dir()
337 if cluster_dir is not None:
337 if cluster_dir is not None:
338 try:
338 try:
339 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
339 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
340 return
340 return
341 except ClusterDirError:
341 except ClusterDirError:
342 pass
342 pass
343 elif profile is not None:
343 elif profile is not None:
344 try:
344 try:
345 self._cd = ClusterDir.find_cluster_dir_by_profile(
345 self._cd = ClusterDir.find_cluster_dir_by_profile(
346 ipython_dir, profile)
346 ipython_dir, profile)
347 return
347 return
348 except ClusterDirError:
348 except ClusterDirError:
349 pass
349 pass
350 self._cd = None
350 self._cd = None
351
351
352 def _update_engines(self, engines):
352 def _update_engines(self, engines):
353 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
353 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
354 for k,v in engines.iteritems():
354 for k,v in engines.iteritems():
355 eid = int(k)
355 eid = int(k)
356 self._engines[eid] = bytes(v) # force not unicode
356 self._engines[eid] = bytes(v) # force not unicode
357 self._ids.append(eid)
357 self._ids.append(eid)
358 self._ids = sorted(self._ids)
358 self._ids = sorted(self._ids)
359 if sorted(self._engines.keys()) != range(len(self._engines)) and \
359 if sorted(self._engines.keys()) != range(len(self._engines)) and \
360 self._task_scheme == 'pure' and self._task_socket:
360 self._task_scheme == 'pure' and self._task_socket:
361 self._stop_scheduling_tasks()
361 self._stop_scheduling_tasks()
362
362
363 def _stop_scheduling_tasks(self):
363 def _stop_scheduling_tasks(self):
364 """Stop scheduling tasks because an engine has been unregistered
364 """Stop scheduling tasks because an engine has been unregistered
365 from a pure ZMQ scheduler.
365 from a pure ZMQ scheduler.
366 """
366 """
367 self._task_socket.close()
367 self._task_socket.close()
368 self._task_socket = None
368 self._task_socket = None
369 msg = "An engine has been unregistered, and we are using pure " +\
369 msg = "An engine has been unregistered, and we are using pure " +\
370 "ZMQ task scheduling. Task farming will be disabled."
370 "ZMQ task scheduling. Task farming will be disabled."
371 if self.outstanding:
371 if self.outstanding:
372 msg += " If you were running tasks when this happened, " +\
372 msg += " If you were running tasks when this happened, " +\
373 "some `outstanding` msg_ids may never resolve."
373 "some `outstanding` msg_ids may never resolve."
374 warnings.warn(msg, RuntimeWarning)
374 warnings.warn(msg, RuntimeWarning)
375
375
376 def _build_targets(self, targets):
376 def _build_targets(self, targets):
377 """Turn valid target IDs or 'all' into two lists:
377 """Turn valid target IDs or 'all' into two lists:
378 (int_ids, uuids).
378 (int_ids, uuids).
379 """
379 """
380 if targets is None:
380 if targets is None:
381 targets = self._ids
381 targets = self._ids
382 elif isinstance(targets, str):
382 elif isinstance(targets, str):
383 if targets.lower() == 'all':
383 if targets.lower() == 'all':
384 targets = self._ids
384 targets = self._ids
385 else:
385 else:
386 raise TypeError("%r not valid str target, must be 'all'"%(targets))
386 raise TypeError("%r not valid str target, must be 'all'"%(targets))
387 elif isinstance(targets, int):
387 elif isinstance(targets, int):
388 if targets < 0:
388 if targets < 0:
389 targets = self.ids[targets]
389 targets = self.ids[targets]
390 if targets not in self.ids:
390 if targets not in self.ids:
391 raise IndexError("No such engine: %i"%targets)
391 raise IndexError("No such engine: %i"%targets)
392 targets = [targets]
392 targets = [targets]
393
393
394 if isinstance(targets, slice):
394 if isinstance(targets, slice):
395 indices = range(len(self._ids))[targets]
395 indices = range(len(self._ids))[targets]
396 ids = self.ids
396 ids = self.ids
397 targets = [ ids[i] for i in indices ]
397 targets = [ ids[i] for i in indices ]
398
398
399 if not isinstance(targets, (tuple, list, xrange)):
399 if not isinstance(targets, (tuple, list, xrange)):
400 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
400 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
401
401
402 return [self._engines[t] for t in targets], list(targets)
402 return [self._engines[t] for t in targets], list(targets)
403
403
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__."""

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # resolve ambiguous addresses (e.g. 0.0.0.0) against the hub's
            # reported location, then connect directly or over the ssh tunnel
            url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        # ask the hub for the cluster's connection info, waiting at most
        # `timeout` seconds for the reply
        self.session.send(self._query_socket, 'connection_request')
        r,w,x = zmq.select([self._query_socket],[],[], timeout)
        if not r:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = ss.Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # create and connect one socket per channel the hub advertised;
            # XREQ sockets carry our session id as the ZMQ identity
            if content.mux:
                self._mux_socket = self._context.socket(zmq.XREQ)
                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                # content.task is a (scheme, address) pair
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.XREQ)
                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                # SUB socket subscribed to everything
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            # if content.query:
            #     self._query_socket = self._context.socket(zmq.XREQ)
            #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
            #     connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.XREQ)
                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._iopub_socket, content.iopub)
            # seed our engine table with the currently registered engines
            self._update_engines(dict(content.engines))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
461
461
462 #--------------------------------------------------------------------------
462 #--------------------------------------------------------------------------
463 # handlers and callbacks for incoming messages
463 # handlers and callbacks for incoming messages
464 #--------------------------------------------------------------------------
464 #--------------------------------------------------------------------------
465
465
466 def _unwrap_exception(self, content):
466 def _unwrap_exception(self, content):
467 """unwrap exception, and remap engine_id to int."""
467 """unwrap exception, and remap engine_id to int."""
468 e = error.unwrap_exception(content)
468 e = error.unwrap_exception(content)
469 # print e.traceback
469 # print e.traceback
470 if e.engine_info:
470 if e.engine_info:
471 e_uuid = e.engine_info['engine_uuid']
471 e_uuid = e.engine_info['engine_uuid']
472 eid = self._engines[e_uuid]
472 eid = self._engines[e_uuid]
473 e.engine_info['engine_id'] = eid
473 e.engine_info['engine_id'] = eid
474 return e
474 return e
475
475
476 def _extract_metadata(self, header, parent, content):
476 def _extract_metadata(self, header, parent, content):
477 md = {'msg_id' : parent['msg_id'],
477 md = {'msg_id' : parent['msg_id'],
478 'received' : datetime.now(),
478 'received' : datetime.now(),
479 'engine_uuid' : header.get('engine', None),
479 'engine_uuid' : header.get('engine', None),
480 'follow' : parent.get('follow', []),
480 'follow' : parent.get('follow', []),
481 'after' : parent.get('after', []),
481 'after' : parent.get('after', []),
482 'status' : content['status'],
482 'status' : content['status'],
483 }
483 }
484
484
485 if md['engine_uuid'] is not None:
485 if md['engine_uuid'] is not None:
486 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
486 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
487
487
488 if 'date' in parent:
488 if 'date' in parent:
489 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
489 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
490 if 'started' in header:
490 if 'started' in header:
491 md['started'] = datetime.strptime(header['started'], util.ISO8601)
491 md['started'] = datetime.strptime(header['started'], util.ISO8601)
492 if 'date' in header:
492 if 'date' in header:
493 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
493 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
494 return md
494 return md
495
495
496 def _register_engine(self, msg):
496 def _register_engine(self, msg):
497 """Register a new engine, and update our connection info."""
497 """Register a new engine, and update our connection info."""
498 content = msg['content']
498 content = msg['content']
499 eid = content['id']
499 eid = content['id']
500 d = {eid : content['queue']}
500 d = {eid : content['queue']}
501 self._update_engines(d)
501 self._update_engines(d)
502
502
503 def _unregister_engine(self, msg):
503 def _unregister_engine(self, msg):
504 """Unregister an engine that has died."""
504 """Unregister an engine that has died."""
505 content = msg['content']
505 content = msg['content']
506 eid = int(content['id'])
506 eid = int(content['id'])
507 if eid in self._ids:
507 if eid in self._ids:
508 self._ids.remove(eid)
508 self._ids.remove(eid)
509 uuid = self._engines.pop(eid)
509 uuid = self._engines.pop(eid)
510
510
511 self._handle_stranded_msgs(eid, uuid)
511 self._handle_stranded_msgs(eid, uuid)
512
512
513 if self._task_socket and self._task_scheme == 'pure':
513 if self._task_socket and self._task_scheme == 'pure':
514 self._stop_scheduling_tasks()
514 self._stop_scheduling_tasks()
515
515
516 def _handle_stranded_msgs(self, eid, uuid):
516 def _handle_stranded_msgs(self, eid, uuid):
517 """Handle messages known to be on an engine when the engine unregisters.
517 """Handle messages known to be on an engine when the engine unregisters.
518
518
519 It is possible that this will fire prematurely - that is, an engine will
519 It is possible that this will fire prematurely - that is, an engine will
520 go down after completing a result, and the client will be notified
520 go down after completing a result, and the client will be notified
521 of the unregistration and later receive the successful result.
521 of the unregistration and later receive the successful result.
522 """
522 """
523
523
524 outstanding = self._outstanding_dict[uuid]
524 outstanding = self._outstanding_dict[uuid]
525
525
526 for msg_id in list(outstanding):
526 for msg_id in list(outstanding):
527 if msg_id in self.results:
527 if msg_id in self.results:
528 # we already
528 # we already
529 continue
529 continue
530 try:
530 try:
531 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
531 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
532 except:
532 except:
533 content = error.wrap_exception()
533 content = error.wrap_exception()
534 # build a fake message:
534 # build a fake message:
535 parent = {}
535 parent = {}
536 header = {}
536 header = {}
537 parent['msg_id'] = msg_id
537 parent['msg_id'] = msg_id
538 header['engine'] = uuid
538 header['engine'] = uuid
539 header['date'] = datetime.now().strftime(util.ISO8601)
539 header['date'] = datetime.now().strftime(util.ISO8601)
540 msg = dict(parent_header=parent, header=header, content=content)
540 msg = dict(parent_header=parent, header=header, content=content)
541 self._handle_apply_reply(msg)
541 self._handle_apply_reply(msg)
542
542
543 def _handle_execute_reply(self, msg):
543 def _handle_execute_reply(self, msg):
544 """Save the reply to an execute_request into our results.
544 """Save the reply to an execute_request into our results.
545
545
546 execute messages are never actually used. apply is used instead.
546 execute messages are never actually used. apply is used instead.
547 """
547 """
548
548
549 parent = msg['parent_header']
549 parent = msg['parent_header']
550 msg_id = parent['msg_id']
550 msg_id = parent['msg_id']
551 if msg_id not in self.outstanding:
551 if msg_id not in self.outstanding:
552 if msg_id in self.history:
552 if msg_id in self.history:
553 print ("got stale result: %s"%msg_id)
553 print ("got stale result: %s"%msg_id)
554 else:
554 else:
555 print ("got unknown result: %s"%msg_id)
555 print ("got unknown result: %s"%msg_id)
556 else:
556 else:
557 self.outstanding.remove(msg_id)
557 self.outstanding.remove(msg_id)
558 self.results[msg_id] = self._unwrap_exception(msg['content'])
558 self.results[msg_id] = self._unwrap_exception(msg['content'])
559
559
560 def _handle_apply_reply(self, msg):
560 def _handle_apply_reply(self, msg):
561 """Save the reply to an apply_request into our results."""
561 """Save the reply to an apply_request into our results."""
562 parent = msg['parent_header']
562 parent = msg['parent_header']
563 msg_id = parent['msg_id']
563 msg_id = parent['msg_id']
564 if msg_id not in self.outstanding:
564 if msg_id not in self.outstanding:
565 if msg_id in self.history:
565 if msg_id in self.history:
566 print ("got stale result: %s"%msg_id)
566 print ("got stale result: %s"%msg_id)
567 print self.results[msg_id]
567 print self.results[msg_id]
568 print msg
568 print msg
569 else:
569 else:
570 print ("got unknown result: %s"%msg_id)
570 print ("got unknown result: %s"%msg_id)
571 else:
571 else:
572 self.outstanding.remove(msg_id)
572 self.outstanding.remove(msg_id)
573 content = msg['content']
573 content = msg['content']
574 header = msg['header']
574 header = msg['header']
575
575
576 # construct metadata:
576 # construct metadata:
577 md = self.metadata[msg_id]
577 md = self.metadata[msg_id]
578 md.update(self._extract_metadata(header, parent, content))
578 md.update(self._extract_metadata(header, parent, content))
579 # is this redundant?
579 # is this redundant?
580 self.metadata[msg_id] = md
580 self.metadata[msg_id] = md
581
581
582 e_outstanding = self._outstanding_dict[md['engine_uuid']]
582 e_outstanding = self._outstanding_dict[md['engine_uuid']]
583 if msg_id in e_outstanding:
583 if msg_id in e_outstanding:
584 e_outstanding.remove(msg_id)
584 e_outstanding.remove(msg_id)
585
585
586 # construct result:
586 # construct result:
587 if content['status'] == 'ok':
587 if content['status'] == 'ok':
588 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
588 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
589 elif content['status'] == 'aborted':
589 elif content['status'] == 'aborted':
590 self.results[msg_id] = error.TaskAborted(msg_id)
590 self.results[msg_id] = error.TaskAborted(msg_id)
591 elif content['status'] == 'resubmitted':
591 elif content['status'] == 'resubmitted':
592 # TODO: handle resubmission
592 # TODO: handle resubmission
593 pass
593 pass
594 else:
594 else:
595 self.results[msg_id] = self._unwrap_exception(content)
595 self.results[msg_id] = self._unwrap_exception(content)
596
596
597 def _flush_notifications(self):
597 def _flush_notifications(self):
598 """Flush notifications of engine registrations waiting
598 """Flush notifications of engine registrations waiting
599 in ZMQ queue."""
599 in ZMQ queue."""
600 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
600 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
601 while msg is not None:
601 while msg is not None:
602 if self.debug:
602 if self.debug:
603 pprint(msg)
603 pprint(msg)
604 msg = msg[-1]
604 msg = msg[-1]
605 msg_type = msg['msg_type']
605 msg_type = msg['msg_type']
606 handler = self._notification_handlers.get(msg_type, None)
606 handler = self._notification_handlers.get(msg_type, None)
607 if handler is None:
607 if handler is None:
608 raise Exception("Unhandled message type: %s"%msg.msg_type)
608 raise Exception("Unhandled message type: %s"%msg.msg_type)
609 else:
609 else:
610 handler(msg)
610 handler(msg)
611 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
611 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
612
612
613 def _flush_results(self, sock):
613 def _flush_results(self, sock):
614 """Flush task or queue results waiting in ZMQ queue."""
614 """Flush task or queue results waiting in ZMQ queue."""
615 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
615 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
616 while msg is not None:
616 while msg is not None:
617 if self.debug:
617 if self.debug:
618 pprint(msg)
618 pprint(msg)
619 msg = msg[-1]
619 msg = msg[-1]
620 msg_type = msg['msg_type']
620 msg_type = msg['msg_type']
621 handler = self._queue_handlers.get(msg_type, None)
621 handler = self._queue_handlers.get(msg_type, None)
622 if handler is None:
622 if handler is None:
623 raise Exception("Unhandled message type: %s"%msg.msg_type)
623 raise Exception("Unhandled message type: %s"%msg.msg_type)
624 else:
624 else:
625 handler(msg)
625 handler(msg)
626 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
626 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
627
627
628 def _flush_control(self, sock):
628 def _flush_control(self, sock):
629 """Flush replies from the control channel waiting
629 """Flush replies from the control channel waiting
630 in the ZMQ queue.
630 in the ZMQ queue.
631
631
632 Currently: ignore them."""
632 Currently: ignore them."""
633 if self._ignored_control_replies <= 0:
633 if self._ignored_control_replies <= 0:
634 return
634 return
635 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
635 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
636 while msg is not None:
636 while msg is not None:
637 self._ignored_control_replies -= 1
637 self._ignored_control_replies -= 1
638 if self.debug:
638 if self.debug:
639 pprint(msg)
639 pprint(msg)
640 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
640 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
641
641
642 def _flush_ignored_control(self):
642 def _flush_ignored_control(self):
643 """flush ignored control replies"""
643 """flush ignored control replies"""
644 while self._ignored_control_replies > 0:
644 while self._ignored_control_replies > 0:
645 self.session.recv(self._control_socket)
645 self.session.recv(self._control_socket)
646 self._ignored_control_replies -= 1
646 self._ignored_control_replies -= 1
647
647
648 def _flush_ignored_hub_replies(self):
648 def _flush_ignored_hub_replies(self):
649 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
649 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
650 while msg is not None:
650 while msg is not None:
651 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
651 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
652
652
653 def _flush_iopub(self, sock):
653 def _flush_iopub(self, sock):
654 """Flush replies from the iopub channel waiting
654 """Flush replies from the iopub channel waiting
655 in the ZMQ queue.
655 in the ZMQ queue.
656 """
656 """
657 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
657 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
658 while msg is not None:
658 while msg is not None:
659 if self.debug:
659 if self.debug:
660 pprint(msg)
660 pprint(msg)
661 msg = msg[-1]
661 msg = msg[-1]
662 parent = msg['parent_header']
662 parent = msg['parent_header']
663 msg_id = parent['msg_id']
663 msg_id = parent['msg_id']
664 content = msg['content']
664 content = msg['content']
665 header = msg['header']
665 header = msg['header']
666 msg_type = msg['msg_type']
666 msg_type = msg['msg_type']
667
667
668 # init metadata:
668 # init metadata:
669 md = self.metadata[msg_id]
669 md = self.metadata[msg_id]
670
670
671 if msg_type == 'stream':
671 if msg_type == 'stream':
672 name = content['name']
672 name = content['name']
673 s = md[name] or ''
673 s = md[name] or ''
674 md[name] = s + content['data']
674 md[name] = s + content['data']
675 elif msg_type == 'pyerr':
675 elif msg_type == 'pyerr':
676 md.update({'pyerr' : self._unwrap_exception(content)})
676 md.update({'pyerr' : self._unwrap_exception(content)})
677 elif msg_type == 'pyin':
678 md.update({'pyin' : content['code']})
677 else:
679 else:
678 md.update({msg_type : content['data']})
680 md.update({msg_type : content.get('data', '')})
679
681
680 # reduntant?
682 # reduntant?
681 self.metadata[msg_id] = md
683 self.metadata[msg_id] = md
682
684
683 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
685 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
684
686
685 #--------------------------------------------------------------------------
687 #--------------------------------------------------------------------------
686 # len, getitem
688 # len, getitem
687 #--------------------------------------------------------------------------
689 #--------------------------------------------------------------------------
688
690
689 def __len__(self):
691 def __len__(self):
690 """len(client) returns # of engines."""
692 """len(client) returns # of engines."""
691 return len(self.ids)
693 return len(self.ids)
692
694
693 def __getitem__(self, key):
695 def __getitem__(self, key):
694 """index access returns DirectView multiplexer objects
696 """index access returns DirectView multiplexer objects
695
697
696 Must be int, slice, or list/tuple/xrange of ints"""
698 Must be int, slice, or list/tuple/xrange of ints"""
697 if not isinstance(key, (int, slice, tuple, list, xrange)):
699 if not isinstance(key, (int, slice, tuple, list, xrange)):
698 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
700 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
699 else:
701 else:
700 return self.direct_view(key)
702 return self.direct_view(key)
701
703
702 #--------------------------------------------------------------------------
704 #--------------------------------------------------------------------------
703 # Begin public methods
705 # Begin public methods
704 #--------------------------------------------------------------------------
706 #--------------------------------------------------------------------------
705
707
706 @property
708 @property
707 def ids(self):
709 def ids(self):
708 """Always up-to-date ids property."""
710 """Always up-to-date ids property."""
709 self._flush_notifications()
711 self._flush_notifications()
710 # always copy:
712 # always copy:
711 return list(self._ids)
713 return list(self._ids)
712
714
713 def close(self):
715 def close(self):
714 if self._closed:
716 if self._closed:
715 return
717 return
716 snames = filter(lambda n: n.endswith('socket'), dir(self))
718 snames = filter(lambda n: n.endswith('socket'), dir(self))
717 for socket in map(lambda name: getattr(self, name), snames):
719 for socket in map(lambda name: getattr(self, name), snames):
718 if isinstance(socket, zmq.Socket) and not socket.closed:
720 if isinstance(socket, zmq.Socket) and not socket.closed:
719 socket.close()
721 socket.close()
720 self._closed = True
722 self._closed = True
721
723
722 def spin(self):
724 def spin(self):
723 """Flush any registration notifications and execution results
725 """Flush any registration notifications and execution results
724 waiting in the ZMQ queue.
726 waiting in the ZMQ queue.
725 """
727 """
726 if self._notification_socket:
728 if self._notification_socket:
727 self._flush_notifications()
729 self._flush_notifications()
728 if self._mux_socket:
730 if self._mux_socket:
729 self._flush_results(self._mux_socket)
731 self._flush_results(self._mux_socket)
730 if self._task_socket:
732 if self._task_socket:
731 self._flush_results(self._task_socket)
733 self._flush_results(self._task_socket)
732 if self._control_socket:
734 if self._control_socket:
733 self._flush_control(self._control_socket)
735 self._flush_control(self._control_socket)
734 if self._iopub_socket:
736 if self._iopub_socket:
735 self._flush_iopub(self._iopub_socket)
737 self._flush_iopub(self._iopub_socket)
736 if self._query_socket:
738 if self._query_socket:
737 self._flush_ignored_hub_replies()
739 self._flush_ignored_hub_replies()
738
740
739 def wait(self, jobs=None, timeout=-1):
741 def wait(self, jobs=None, timeout=-1):
740 """waits on one or more `jobs`, for up to `timeout` seconds.
742 """waits on one or more `jobs`, for up to `timeout` seconds.
741
743
742 Parameters
744 Parameters
743 ----------
745 ----------
744
746
745 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
747 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
746 ints are indices to self.history
748 ints are indices to self.history
747 strs are msg_ids
749 strs are msg_ids
748 default: wait on all outstanding messages
750 default: wait on all outstanding messages
749 timeout : float
751 timeout : float
750 a time in seconds, after which to give up.
752 a time in seconds, after which to give up.
751 default is -1, which means no timeout
753 default is -1, which means no timeout
752
754
753 Returns
755 Returns
754 -------
756 -------
755
757
756 True : when all msg_ids are done
758 True : when all msg_ids are done
757 False : timeout reached, some msg_ids still outstanding
759 False : timeout reached, some msg_ids still outstanding
758 """
760 """
759 tic = time.time()
761 tic = time.time()
760 if jobs is None:
762 if jobs is None:
761 theids = self.outstanding
763 theids = self.outstanding
762 else:
764 else:
763 if isinstance(jobs, (int, str, AsyncResult)):
765 if isinstance(jobs, (int, str, AsyncResult)):
764 jobs = [jobs]
766 jobs = [jobs]
765 theids = set()
767 theids = set()
766 for job in jobs:
768 for job in jobs:
767 if isinstance(job, int):
769 if isinstance(job, int):
768 # index access
770 # index access
769 job = self.history[job]
771 job = self.history[job]
770 elif isinstance(job, AsyncResult):
772 elif isinstance(job, AsyncResult):
771 map(theids.add, job.msg_ids)
773 map(theids.add, job.msg_ids)
772 continue
774 continue
773 theids.add(job)
775 theids.add(job)
774 if not theids.intersection(self.outstanding):
776 if not theids.intersection(self.outstanding):
775 return True
777 return True
776 self.spin()
778 self.spin()
777 while theids.intersection(self.outstanding):
779 while theids.intersection(self.outstanding):
778 if timeout >= 0 and ( time.time()-tic ) > timeout:
780 if timeout >= 0 and ( time.time()-tic ) > timeout:
779 break
781 break
780 time.sleep(1e-3)
782 time.sleep(1e-3)
781 self.spin()
783 self.spin()
782 return len(theids.intersection(self.outstanding)) == 0
784 return len(theids.intersection(self.outstanding)) == 0
783
785
784 #--------------------------------------------------------------------------
786 #--------------------------------------------------------------------------
785 # Control methods
787 # Control methods
786 #--------------------------------------------------------------------------
788 #--------------------------------------------------------------------------
787
789
788 @spin_first
790 @spin_first
789 @default_block
791 @default_block
790 def clear(self, targets=None, block=None):
792 def clear(self, targets=None, block=None):
791 """Clear the namespace in target(s)."""
793 """Clear the namespace in target(s)."""
792 targets = self._build_targets(targets)[0]
794 targets = self._build_targets(targets)[0]
793 for t in targets:
795 for t in targets:
794 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
796 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
795 error = False
797 error = False
796 if self.block:
798 if self.block:
797 self._flush_ignored_control()
799 self._flush_ignored_control()
798 for i in range(len(targets)):
800 for i in range(len(targets)):
799 idents,msg = self.session.recv(self._control_socket,0)
801 idents,msg = self.session.recv(self._control_socket,0)
800 if self.debug:
802 if self.debug:
801 pprint(msg)
803 pprint(msg)
802 if msg['content']['status'] != 'ok':
804 if msg['content']['status'] != 'ok':
803 error = self._unwrap_exception(msg['content'])
805 error = self._unwrap_exception(msg['content'])
804 else:
806 else:
805 self._ignored_control_replies += len(targets)
807 self._ignored_control_replies += len(targets)
806 if error:
808 if error:
807 raise error
809 raise error
808
810
809
811
810 @spin_first
812 @spin_first
811 @default_block
813 @default_block
812 def abort(self, jobs=None, targets=None, block=None):
814 def abort(self, jobs=None, targets=None, block=None):
813 """Abort specific jobs from the execution queues of target(s).
815 """Abort specific jobs from the execution queues of target(s).
814
816
815 This is a mechanism to prevent jobs that have already been submitted
817 This is a mechanism to prevent jobs that have already been submitted
816 from executing.
818 from executing.
817
819
818 Parameters
820 Parameters
819 ----------
821 ----------
820
822
821 jobs : msg_id, list of msg_ids, or AsyncResult
823 jobs : msg_id, list of msg_ids, or AsyncResult
822 The jobs to be aborted
824 The jobs to be aborted
823
825
824
826
825 """
827 """
826 targets = self._build_targets(targets)[0]
828 targets = self._build_targets(targets)[0]
827 msg_ids = []
829 msg_ids = []
828 if isinstance(jobs, (basestring,AsyncResult)):
830 if isinstance(jobs, (basestring,AsyncResult)):
829 jobs = [jobs]
831 jobs = [jobs]
830 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
832 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
831 if bad_ids:
833 if bad_ids:
832 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
834 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
833 for j in jobs:
835 for j in jobs:
834 if isinstance(j, AsyncResult):
836 if isinstance(j, AsyncResult):
835 msg_ids.extend(j.msg_ids)
837 msg_ids.extend(j.msg_ids)
836 else:
838 else:
837 msg_ids.append(j)
839 msg_ids.append(j)
838 content = dict(msg_ids=msg_ids)
840 content = dict(msg_ids=msg_ids)
839 for t in targets:
841 for t in targets:
840 self.session.send(self._control_socket, 'abort_request',
842 self.session.send(self._control_socket, 'abort_request',
841 content=content, ident=t)
843 content=content, ident=t)
842 error = False
844 error = False
843 if self.block:
845 if self.block:
844 self._flush_ignored_control()
846 self._flush_ignored_control()
845 for i in range(len(targets)):
847 for i in range(len(targets)):
846 idents,msg = self.session.recv(self._control_socket,0)
848 idents,msg = self.session.recv(self._control_socket,0)
847 if self.debug:
849 if self.debug:
848 pprint(msg)
850 pprint(msg)
849 if msg['content']['status'] != 'ok':
851 if msg['content']['status'] != 'ok':
850 error = self._unwrap_exception(msg['content'])
852 error = self._unwrap_exception(msg['content'])
851 else:
853 else:
852 self._ignored_control_replies += len(targets)
854 self._ignored_control_replies += len(targets)
853 if error:
855 if error:
854 raise error
856 raise error
855
857
856 @spin_first
858 @spin_first
857 @default_block
859 @default_block
858 def shutdown(self, targets=None, restart=False, hub=False, block=None):
860 def shutdown(self, targets=None, restart=False, hub=False, block=None):
859 """Terminates one or more engine processes, optionally including the hub."""
861 """Terminates one or more engine processes, optionally including the hub."""
860 if hub:
862 if hub:
861 targets = 'all'
863 targets = 'all'
862 targets = self._build_targets(targets)[0]
864 targets = self._build_targets(targets)[0]
863 for t in targets:
865 for t in targets:
864 self.session.send(self._control_socket, 'shutdown_request',
866 self.session.send(self._control_socket, 'shutdown_request',
865 content={'restart':restart},ident=t)
867 content={'restart':restart},ident=t)
866 error = False
868 error = False
867 if block or hub:
869 if block or hub:
868 self._flush_ignored_control()
870 self._flush_ignored_control()
869 for i in range(len(targets)):
871 for i in range(len(targets)):
870 idents,msg = self.session.recv(self._control_socket, 0)
872 idents,msg = self.session.recv(self._control_socket, 0)
871 if self.debug:
873 if self.debug:
872 pprint(msg)
874 pprint(msg)
873 if msg['content']['status'] != 'ok':
875 if msg['content']['status'] != 'ok':
874 error = self._unwrap_exception(msg['content'])
876 error = self._unwrap_exception(msg['content'])
875 else:
877 else:
876 self._ignored_control_replies += len(targets)
878 self._ignored_control_replies += len(targets)
877
879
878 if hub:
880 if hub:
879 time.sleep(0.25)
881 time.sleep(0.25)
880 self.session.send(self._query_socket, 'shutdown_request')
882 self.session.send(self._query_socket, 'shutdown_request')
881 idents,msg = self.session.recv(self._query_socket, 0)
883 idents,msg = self.session.recv(self._query_socket, 0)
882 if self.debug:
884 if self.debug:
883 pprint(msg)
885 pprint(msg)
884 if msg['content']['status'] != 'ok':
886 if msg['content']['status'] != 'ok':
885 error = self._unwrap_exception(msg['content'])
887 error = self._unwrap_exception(msg['content'])
886
888
887 if error:
889 if error:
888 raise error
890 raise error
889
891
890 #--------------------------------------------------------------------------
892 #--------------------------------------------------------------------------
891 # Execution methods
893 # Execution methods
892 #--------------------------------------------------------------------------
894 #--------------------------------------------------------------------------
893
895
894 @default_block
896 @default_block
895 def _execute(self, code, targets='all', block=None):
897 def _execute(self, code, targets='all', block=None):
896 """Executes `code` on `targets` in blocking or nonblocking manner.
898 """Executes `code` on `targets` in blocking or nonblocking manner.
897
899
898 ``execute`` is always `bound` (affects engine namespace)
900 ``execute`` is always `bound` (affects engine namespace)
899
901
900 Parameters
902 Parameters
901 ----------
903 ----------
902
904
903 code : str
905 code : str
904 the code string to be executed
906 the code string to be executed
905 targets : int/str/list of ints/strs
907 targets : int/str/list of ints/strs
906 the engines on which to execute
908 the engines on which to execute
907 default : all
909 default : all
908 block : bool
910 block : bool
909 whether or not to wait until done to return
911 whether or not to wait until done to return
910 default: self.block
912 default: self.block
911 """
913 """
912 return self[targets].execute(code, block=block)
914 return self[targets].execute(code, block=block)
913
915
914 def _maybe_raise(self, result):
916 def _maybe_raise(self, result):
915 """wrapper for maybe raising an exception if apply failed."""
917 """wrapper for maybe raising an exception if apply failed."""
916 if isinstance(result, error.RemoteError):
918 if isinstance(result, error.RemoteError):
917 raise result
919 raise result
918
920
919 return result
921 return result
920
922
921 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
923 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
922 ident=None):
924 ident=None):
923 """construct and send an apply message via a socket.
925 """construct and send an apply message via a socket.
924
926
925 This is the principal method with which all engine execution is performed by views.
927 This is the principal method with which all engine execution is performed by views.
926 """
928 """
927
929
928 assert not self._closed, "cannot use me anymore, I'm closed!"
930 assert not self._closed, "cannot use me anymore, I'm closed!"
929 # defaults:
931 # defaults:
930 args = args if args is not None else []
932 args = args if args is not None else []
931 kwargs = kwargs if kwargs is not None else {}
933 kwargs = kwargs if kwargs is not None else {}
932 subheader = subheader if subheader is not None else {}
934 subheader = subheader if subheader is not None else {}
933
935
934 # validate arguments
936 # validate arguments
935 if not callable(f):
937 if not callable(f):
936 raise TypeError("f must be callable, not %s"%type(f))
938 raise TypeError("f must be callable, not %s"%type(f))
937 if not isinstance(args, (tuple, list)):
939 if not isinstance(args, (tuple, list)):
938 raise TypeError("args must be tuple or list, not %s"%type(args))
940 raise TypeError("args must be tuple or list, not %s"%type(args))
939 if not isinstance(kwargs, dict):
941 if not isinstance(kwargs, dict):
940 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
942 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
941 if not isinstance(subheader, dict):
943 if not isinstance(subheader, dict):
942 raise TypeError("subheader must be dict, not %s"%type(subheader))
944 raise TypeError("subheader must be dict, not %s"%type(subheader))
943
945
944 if not self._ids:
946 if not self._ids:
945 # flush notification socket if no engines yet
947 # flush notification socket if no engines yet
946 any_ids = self.ids
948 any_ids = self.ids
947 if not any_ids:
949 if not any_ids:
948 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
950 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
949 # enforce types of f,args,kwargs
951 # enforce types of f,args,kwargs
950
952
951 bufs = util.pack_apply_message(f,args,kwargs)
953 bufs = util.pack_apply_message(f,args,kwargs)
952
954
953 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
955 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
954 subheader=subheader, track=track)
956 subheader=subheader, track=track)
955
957
956 msg_id = msg['msg_id']
958 msg_id = msg['msg_id']
957 self.outstanding.add(msg_id)
959 self.outstanding.add(msg_id)
958 if ident:
960 if ident:
959 # possibly routed to a specific engine
961 # possibly routed to a specific engine
960 if isinstance(ident, list):
962 if isinstance(ident, list):
961 ident = ident[-1]
963 ident = ident[-1]
962 if ident in self._engines.values():
964 if ident in self._engines.values():
963 # save for later, in case of engine death
965 # save for later, in case of engine death
964 self._outstanding_dict[ident].add(msg_id)
966 self._outstanding_dict[ident].add(msg_id)
965 self.history.append(msg_id)
967 self.history.append(msg_id)
966 self.metadata[msg_id]['submitted'] = datetime.now()
968 self.metadata[msg_id]['submitted'] = datetime.now()
967
969
968 return msg
970 return msg
969
971
970 #--------------------------------------------------------------------------
972 #--------------------------------------------------------------------------
971 # construct a View object
973 # construct a View object
972 #--------------------------------------------------------------------------
974 #--------------------------------------------------------------------------
973
975
974 def load_balanced_view(self, targets=None):
976 def load_balanced_view(self, targets=None):
975 """construct a DirectView object.
977 """construct a DirectView object.
976
978
977 If no arguments are specified, create a LoadBalancedView
979 If no arguments are specified, create a LoadBalancedView
978 using all engines.
980 using all engines.
979
981
980 Parameters
982 Parameters
981 ----------
983 ----------
982
984
983 targets: list,slice,int,etc. [default: use all engines]
985 targets: list,slice,int,etc. [default: use all engines]
984 The subset of engines across which to load-balance
986 The subset of engines across which to load-balance
985 """
987 """
986 if targets is not None:
988 if targets is not None:
987 targets = self._build_targets(targets)[1]
989 targets = self._build_targets(targets)[1]
988 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
990 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
989
991
990 def direct_view(self, targets='all'):
992 def direct_view(self, targets='all'):
991 """construct a DirectView object.
993 """construct a DirectView object.
992
994
993 If no targets are specified, create a DirectView
995 If no targets are specified, create a DirectView
994 using all engines.
996 using all engines.
995
997
996 Parameters
998 Parameters
997 ----------
999 ----------
998
1000
999 targets: list,slice,int,etc. [default: use all engines]
1001 targets: list,slice,int,etc. [default: use all engines]
1000 The engines to use for the View
1002 The engines to use for the View
1001 """
1003 """
1002 single = isinstance(targets, int)
1004 single = isinstance(targets, int)
1003 targets = self._build_targets(targets)[1]
1005 targets = self._build_targets(targets)[1]
1004 if single:
1006 if single:
1005 targets = targets[0]
1007 targets = targets[0]
1006 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1008 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1007
1009
1008 #--------------------------------------------------------------------------
1010 #--------------------------------------------------------------------------
1009 # Data movement (TO BE REMOVED)
1011 # Data movement (TO BE REMOVED)
1010 #--------------------------------------------------------------------------
1012 #--------------------------------------------------------------------------
1011
1013
1012 @default_block
1014 @default_block
1013 def _push(self, ns, targets='all', block=None, track=False):
1015 def _push(self, ns, targets='all', block=None, track=False):
1014 """Push the contents of `ns` into the namespace on `target`"""
1016 """Push the contents of `ns` into the namespace on `target`"""
1015 if not isinstance(ns, dict):
1017 if not isinstance(ns, dict):
1016 raise TypeError("Must be a dict, not %s"%type(ns))
1018 raise TypeError("Must be a dict, not %s"%type(ns))
1017 result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1019 result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1018 if not block:
1020 if not block:
1019 return result
1021 return result
1020
1022
1021 @default_block
1023 @default_block
1022 def _pull(self, keys, targets='all', block=None):
1024 def _pull(self, keys, targets='all', block=None):
1023 """Pull objects from `target`'s namespace by `keys`"""
1025 """Pull objects from `target`'s namespace by `keys`"""
1024 if isinstance(keys, basestring):
1026 if isinstance(keys, basestring):
1025 pass
1027 pass
1026 elif isinstance(keys, (list,tuple,set)):
1028 elif isinstance(keys, (list,tuple,set)):
1027 for key in keys:
1029 for key in keys:
1028 if not isinstance(key, basestring):
1030 if not isinstance(key, basestring):
1029 raise TypeError("keys must be str, not type %r"%type(key))
1031 raise TypeError("keys must be str, not type %r"%type(key))
1030 else:
1032 else:
1031 raise TypeError("keys must be strs, not %r"%keys)
1033 raise TypeError("keys must be strs, not %r"%keys)
1032 result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1034 result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1033 return result
1035 return result
1034
1036
1035 #--------------------------------------------------------------------------
1037 #--------------------------------------------------------------------------
1036 # Query methods
1038 # Query methods
1037 #--------------------------------------------------------------------------
1039 #--------------------------------------------------------------------------
1038
1040
1039 @spin_first
1041 @spin_first
1040 @default_block
1042 @default_block
1041 def get_result(self, indices_or_msg_ids=None, block=None):
1043 def get_result(self, indices_or_msg_ids=None, block=None):
1042 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1044 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1043
1045
1044 If the client already has the results, no request to the Hub will be made.
1046 If the client already has the results, no request to the Hub will be made.
1045
1047
1046 This is a convenient way to construct AsyncResult objects, which are wrappers
1048 This is a convenient way to construct AsyncResult objects, which are wrappers
1047 that include metadata about execution, and allow for awaiting results that
1049 that include metadata about execution, and allow for awaiting results that
1048 were not submitted by this Client.
1050 were not submitted by this Client.
1049
1051
1050 It can also be a convenient way to retrieve the metadata associated with
1052 It can also be a convenient way to retrieve the metadata associated with
1051 blocking execution, since it always retrieves
1053 blocking execution, since it always retrieves
1052
1054
1053 Examples
1055 Examples
1054 --------
1056 --------
1055 ::
1057 ::
1056
1058
1057 In [10]: r = client.apply()
1059 In [10]: r = client.apply()
1058
1060
1059 Parameters
1061 Parameters
1060 ----------
1062 ----------
1061
1063
1062 indices_or_msg_ids : integer history index, str msg_id, or list of either
1064 indices_or_msg_ids : integer history index, str msg_id, or list of either
1063 The indices or msg_ids of indices to be retrieved
1065 The indices or msg_ids of indices to be retrieved
1064
1066
1065 block : bool
1067 block : bool
1066 Whether to wait for the result to be done
1068 Whether to wait for the result to be done
1067
1069
1068 Returns
1070 Returns
1069 -------
1071 -------
1070
1072
1071 AsyncResult
1073 AsyncResult
1072 A single AsyncResult object will always be returned.
1074 A single AsyncResult object will always be returned.
1073
1075
1074 AsyncHubResult
1076 AsyncHubResult
1075 A subclass of AsyncResult that retrieves results from the Hub
1077 A subclass of AsyncResult that retrieves results from the Hub
1076
1078
1077 """
1079 """
1078 if indices_or_msg_ids is None:
1080 if indices_or_msg_ids is None:
1079 indices_or_msg_ids = -1
1081 indices_or_msg_ids = -1
1080
1082
1081 if not isinstance(indices_or_msg_ids, (list,tuple)):
1083 if not isinstance(indices_or_msg_ids, (list,tuple)):
1082 indices_or_msg_ids = [indices_or_msg_ids]
1084 indices_or_msg_ids = [indices_or_msg_ids]
1083
1085
1084 theids = []
1086 theids = []
1085 for id in indices_or_msg_ids:
1087 for id in indices_or_msg_ids:
1086 if isinstance(id, int):
1088 if isinstance(id, int):
1087 id = self.history[id]
1089 id = self.history[id]
1088 if not isinstance(id, str):
1090 if not isinstance(id, str):
1089 raise TypeError("indices must be str or int, not %r"%id)
1091 raise TypeError("indices must be str or int, not %r"%id)
1090 theids.append(id)
1092 theids.append(id)
1091
1093
1092 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1094 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1093 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1095 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1094
1096
1095 if remote_ids:
1097 if remote_ids:
1096 ar = AsyncHubResult(self, msg_ids=theids)
1098 ar = AsyncHubResult(self, msg_ids=theids)
1097 else:
1099 else:
1098 ar = AsyncResult(self, msg_ids=theids)
1100 ar = AsyncResult(self, msg_ids=theids)
1099
1101
1100 if block:
1102 if block:
1101 ar.wait()
1103 ar.wait()
1102
1104
1103 return ar
1105 return ar
1104
1106
1105 @spin_first
1107 @spin_first
1106 def result_status(self, msg_ids, status_only=True):
1108 def result_status(self, msg_ids, status_only=True):
1107 """Check on the status of the result(s) of the apply request with `msg_ids`.
1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1108
1110
1109 If status_only is False, then the actual results will be retrieved, else
1111 If status_only is False, then the actual results will be retrieved, else
1110 only the status of the results will be checked.
1112 only the status of the results will be checked.
1111
1113
1112 Parameters
1114 Parameters
1113 ----------
1115 ----------
1114
1116
1115 msg_ids : list of msg_ids
1117 msg_ids : list of msg_ids
1116 if int:
1118 if int:
1117 Passed as index to self.history for convenience.
1119 Passed as index to self.history for convenience.
1118 status_only : bool (default: True)
1120 status_only : bool (default: True)
1119 if False:
1121 if False:
1120 Retrieve the actual results of completed tasks.
1122 Retrieve the actual results of completed tasks.
1121
1123
1122 Returns
1124 Returns
1123 -------
1125 -------
1124
1126
1125 results : dict
1127 results : dict
1126 There will always be the keys 'pending' and 'completed', which will
1128 There will always be the keys 'pending' and 'completed', which will
1127 be lists of msg_ids that are incomplete or complete. If `status_only`
1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1128 is False, then completed results will be keyed by their `msg_id`.
1130 is False, then completed results will be keyed by their `msg_id`.
1129 """
1131 """
1130 if not isinstance(msg_ids, (list,tuple)):
1132 if not isinstance(msg_ids, (list,tuple)):
1131 msg_ids = [msg_ids]
1133 msg_ids = [msg_ids]
1132
1134
1133 theids = []
1135 theids = []
1134 for msg_id in msg_ids:
1136 for msg_id in msg_ids:
1135 if isinstance(msg_id, int):
1137 if isinstance(msg_id, int):
1136 msg_id = self.history[msg_id]
1138 msg_id = self.history[msg_id]
1137 if not isinstance(msg_id, basestring):
1139 if not isinstance(msg_id, basestring):
1138 raise TypeError("msg_ids must be str, not %r"%msg_id)
1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1139 theids.append(msg_id)
1141 theids.append(msg_id)
1140
1142
1141 completed = []
1143 completed = []
1142 local_results = {}
1144 local_results = {}
1143
1145
1144 # comment this block out to temporarily disable local shortcut:
1146 # comment this block out to temporarily disable local shortcut:
1145 for msg_id in theids:
1147 for msg_id in theids:
1146 if msg_id in self.results:
1148 if msg_id in self.results:
1147 completed.append(msg_id)
1149 completed.append(msg_id)
1148 local_results[msg_id] = self.results[msg_id]
1150 local_results[msg_id] = self.results[msg_id]
1149 theids.remove(msg_id)
1151 theids.remove(msg_id)
1150
1152
1151 if theids: # some not locally cached
1153 if theids: # some not locally cached
1152 content = dict(msg_ids=theids, status_only=status_only)
1154 content = dict(msg_ids=theids, status_only=status_only)
1153 msg = self.session.send(self._query_socket, "result_request", content=content)
1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1154 zmq.select([self._query_socket], [], [])
1156 zmq.select([self._query_socket], [], [])
1155 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1156 if self.debug:
1158 if self.debug:
1157 pprint(msg)
1159 pprint(msg)
1158 content = msg['content']
1160 content = msg['content']
1159 if content['status'] != 'ok':
1161 if content['status'] != 'ok':
1160 raise self._unwrap_exception(content)
1162 raise self._unwrap_exception(content)
1161 buffers = msg['buffers']
1163 buffers = msg['buffers']
1162 else:
1164 else:
1163 content = dict(completed=[],pending=[])
1165 content = dict(completed=[],pending=[])
1164
1166
1165 content['completed'].extend(completed)
1167 content['completed'].extend(completed)
1166
1168
1167 if status_only:
1169 if status_only:
1168 return content
1170 return content
1169
1171
1170 failures = []
1172 failures = []
1171 # load cached results into result:
1173 # load cached results into result:
1172 content.update(local_results)
1174 content.update(local_results)
1173 # update cache with results:
1175 # update cache with results:
1174 for msg_id in sorted(theids):
1176 for msg_id in sorted(theids):
1175 if msg_id in content['completed']:
1177 if msg_id in content['completed']:
1176 rec = content[msg_id]
1178 rec = content[msg_id]
1177 parent = rec['header']
1179 parent = rec['header']
1178 header = rec['result_header']
1180 header = rec['result_header']
1179 rcontent = rec['result_content']
1181 rcontent = rec['result_content']
1180 iodict = rec['io']
1182 iodict = rec['io']
1181 if isinstance(rcontent, str):
1183 if isinstance(rcontent, str):
1182 rcontent = self.session.unpack(rcontent)
1184 rcontent = self.session.unpack(rcontent)
1183
1185
1184 md = self.metadata[msg_id]
1186 md = self.metadata[msg_id]
1185 md.update(self._extract_metadata(header, parent, rcontent))
1187 md.update(self._extract_metadata(header, parent, rcontent))
1186 md.update(iodict)
1188 md.update(iodict)
1187
1189
1188 if rcontent['status'] == 'ok':
1190 if rcontent['status'] == 'ok':
1189 res,buffers = util.unserialize_object(buffers)
1191 res,buffers = util.unserialize_object(buffers)
1190 else:
1192 else:
1191 print rcontent
1193 print rcontent
1192 res = self._unwrap_exception(rcontent)
1194 res = self._unwrap_exception(rcontent)
1193 failures.append(res)
1195 failures.append(res)
1194
1196
1195 self.results[msg_id] = res
1197 self.results[msg_id] = res
1196 content[msg_id] = res
1198 content[msg_id] = res
1197
1199
1198 if len(theids) == 1 and failures:
1200 if len(theids) == 1 and failures:
1199 raise failures[0]
1201 raise failures[0]
1200
1202
1201 error.collect_exceptions(failures, "result_status")
1203 error.collect_exceptions(failures, "result_status")
1202 return content
1204 return content
1203
1205
1204 @spin_first
1206 @spin_first
1205 def queue_status(self, targets='all', verbose=False):
1207 def queue_status(self, targets='all', verbose=False):
1206 """Fetch the status of engine queues.
1208 """Fetch the status of engine queues.
1207
1209
1208 Parameters
1210 Parameters
1209 ----------
1211 ----------
1210
1212
1211 targets : int/str/list of ints/strs
1213 targets : int/str/list of ints/strs
1212 the engines whose states are to be queried.
1214 the engines whose states are to be queried.
1213 default : all
1215 default : all
1214 verbose : bool
1216 verbose : bool
1215 Whether to return lengths only, or lists of ids for each element
1217 Whether to return lengths only, or lists of ids for each element
1216 """
1218 """
1217 engine_ids = self._build_targets(targets)[1]
1219 engine_ids = self._build_targets(targets)[1]
1218 content = dict(targets=engine_ids, verbose=verbose)
1220 content = dict(targets=engine_ids, verbose=verbose)
1219 self.session.send(self._query_socket, "queue_request", content=content)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1220 idents,msg = self.session.recv(self._query_socket, 0)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1221 if self.debug:
1223 if self.debug:
1222 pprint(msg)
1224 pprint(msg)
1223 content = msg['content']
1225 content = msg['content']
1224 status = content.pop('status')
1226 status = content.pop('status')
1225 if status != 'ok':
1227 if status != 'ok':
1226 raise self._unwrap_exception(content)
1228 raise self._unwrap_exception(content)
1227 content = util.rekey(content)
1229 content = util.rekey(content)
1228 if isinstance(targets, int):
1230 if isinstance(targets, int):
1229 return content[targets]
1231 return content[targets]
1230 else:
1232 else:
1231 return content
1233 return content
1232
1234
1233 @spin_first
1235 @spin_first
1234 def purge_results(self, jobs=[], targets=[]):
1236 def purge_results(self, jobs=[], targets=[]):
1235 """Tell the Hub to forget results.
1237 """Tell the Hub to forget results.
1236
1238
1237 Individual results can be purged by msg_id, or the entire
1239 Individual results can be purged by msg_id, or the entire
1238 history of specific targets can be purged.
1240 history of specific targets can be purged.
1239
1241
1240 Parameters
1242 Parameters
1241 ----------
1243 ----------
1242
1244
1243 jobs : str or list of str or AsyncResult objects
1245 jobs : str or list of str or AsyncResult objects
1244 the msg_ids whose results should be forgotten.
1246 the msg_ids whose results should be forgotten.
1245 targets : int/str/list of ints/strs
1247 targets : int/str/list of ints/strs
1246 The targets, by uuid or int_id, whose entire history is to be purged.
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1247 Use `targets='all'` to scrub everything from the Hub's memory.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1248
1250
1249 default : None
1251 default : None
1250 """
1252 """
1251 if not targets and not jobs:
1253 if not targets and not jobs:
1252 raise ValueError("Must specify at least one of `targets` and `jobs`")
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1253 if targets:
1255 if targets:
1254 targets = self._build_targets(targets)[1]
1256 targets = self._build_targets(targets)[1]
1255
1257
1256 # construct msg_ids from jobs
1258 # construct msg_ids from jobs
1257 msg_ids = []
1259 msg_ids = []
1258 if isinstance(jobs, (basestring,AsyncResult)):
1260 if isinstance(jobs, (basestring,AsyncResult)):
1259 jobs = [jobs]
1261 jobs = [jobs]
1260 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1261 if bad_ids:
1263 if bad_ids:
1262 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1263 for j in jobs:
1265 for j in jobs:
1264 if isinstance(j, AsyncResult):
1266 if isinstance(j, AsyncResult):
1265 msg_ids.extend(j.msg_ids)
1267 msg_ids.extend(j.msg_ids)
1266 else:
1268 else:
1267 msg_ids.append(j)
1269 msg_ids.append(j)
1268
1270
1269 content = dict(targets=targets, msg_ids=msg_ids)
1271 content = dict(targets=targets, msg_ids=msg_ids)
1270 self.session.send(self._query_socket, "purge_request", content=content)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1271 idents, msg = self.session.recv(self._query_socket, 0)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1272 if self.debug:
1274 if self.debug:
1273 pprint(msg)
1275 pprint(msg)
1274 content = msg['content']
1276 content = msg['content']
1275 if content['status'] != 'ok':
1277 if content['status'] != 'ok':
1276 raise self._unwrap_exception(content)
1278 raise self._unwrap_exception(content)
1277
1279
1278
1280
# names exported by `from <this module> import *`
__all__ = [ 'Client' ]
@@ -1,1089 +1,1091
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
29
29
30 from IPython.parallel import error
30 from IPython.parallel import error
31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
32 from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601
32 from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601
33
33
34 from .heartmonitor import HeartMonitor
34 from .heartmonitor import HeartMonitor
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Code
37 # Code
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 def _passer(*args, **kwargs):
40 def _passer(*args, **kwargs):
41 return
41 return
42
42
43 def _printer(*args, **kwargs):
43 def _printer(*args, **kwargs):
44 print (args)
44 print (args)
45 print (kwargs)
45 print (kwargs)
46
46
def empty_record():
    """Return an empty dict with all record keys."""
    # every field a TaskRecord carries; all start out unset (None) except
    # the captured output streams, which default to empty strings.
    keys = (
        'msg_id',
        'header',
        'content',
        'buffers',
        'submitted',
        'client_uuid',
        'engine_uuid',
        'started',
        'completed',
        'resubmitted',
        'result_header',
        'result_content',
        'result_buffers',
        'queue',
        'pyin',
        'pyout',
        'pyerr',
    )
    record = dict.fromkeys(keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
70
70
def init_record(msg):
    """Initialize a TaskRecord based on a request.

    Starts from `empty_record()` and fills in the fields that are known at
    submission time, so the set of record keys is defined in exactly one
    place (previously the full key list was duplicated here).

    Parameters
    ----------
    msg : dict
        A deserialized message with 'header', 'content' and 'buffers' keys.

    Returns
    -------
    dict
        A TaskRecord dict with the same keys as `empty_record()`.
    """
    header = msg['header']
    record = empty_record()
    record.update({
        'msg_id' : header['msg_id'],
        'header' : header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        # the submission timestamp arrives as an ISO8601 string; store a datetime
        'submitted': datetime.strptime(header['date'], ISO8601),
    })
    return record
95
95
96
96
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    # NOTE(review): the docstring mentions a `uuid` attribute that has no trait
    # here, and omits `control` and `pending` below -- confirm and reconcile.
    id=Int(0)            # integer engine ID assigned by the Hub
    queue=Str()          # zmq identity of the engine's queue (XREQ) socket
    control=Str()        # zmq identity of the engine's control socket
    registration=Str()   # zmq identity used during registration
    heartbeat=Str()      # zmq identity of the heartbeat socket
    pending=Set()        # NOTE(review): presumably msg_ids pending on this engine -- confirm against Hub usage
112
112
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the (mostly port-number) configuration for a Hub, and knows how to
    construct one: `construct()` runs every registered subconstructor, of
    which `construct_hub` is the main one.
    """

    # name of a scheduler scheme
    scheme = Str('leastload', config=True)

    # port-pairs for monitoredqueues: each default picks two random free ports
    hb = Instance(list, config=True)
    def _hb_default(self):
        return select_random_ports(2)

    mux = Instance(list, config=True)
    def _mux_default(self):
        return select_random_ports(2)

    task = Instance(list, config=True)
    def _task_default(self):
        return select_random_ports(2)

    control = Instance(list, config=True)
    def _control_default(self):
        return select_random_ports(2)

    iopub = Instance(list, config=True)
    def _iopub_default(self):
        return select_random_ports(2)

    # single ports:
    mon_port = Instance(int, config=True)
    def _mon_port_default(self):
        return select_random_ports(1)[0]

    notifier_port = Instance(int, config=True)
    def _notifier_port_default(self):
        return select_random_ports(1)[0]

    ping = Int(1000, config=True) # ping frequency (presumably milliseconds -- see HeartMonitor)

    engine_ip = CStr('127.0.0.1', config=True)
    engine_transport = CStr('tcp', config=True)

    client_ip = CStr('127.0.0.1', config=True)
    client_transport = CStr('tcp', config=True)

    monitor_ip = CStr('127.0.0.1', config=True)
    monitor_transport = CStr('tcp', config=True)

    # derived from monitor_transport/ip/mon_port; kept in sync by _update_monitor_url
    monitor_url = CStr('')

    db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
    subconstructors = List()
    _constructed = Bool(False)

    def _ip_changed(self, name, old, new):
        # a change of the generic `ip` trait fans out to all three roles
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # rebuild the derived monitor URL from its three components
        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # a change of the generic `transport` trait fans out to all three roles
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()
        # self.on_trait_change(self._sync_ips, 'ip')
        # self.on_trait_change(self._sync_transports, 'transport')
        # construct_hub runs when construct() iterates subconstructors
        self.subconstructors.append(self.construct_hub)


    def construct(self):
        """Run all registered subconstructors exactly once."""
        assert not self._constructed, "already constructed!"

        for subc in self.subconstructors:
            subc()

        self._constructed = True


    def start(self):
        """Start the constructed Hub's heart monitor."""
        assert self._constructed, "must be constructed by self.construct() first!"
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def construct_hub(self):
        """Construct the Hub and all of its zmq connections.

        Binds the registration, heartbeat, notification and monitor sockets,
        connects the db backend, builds the engine/client connection-info
        dicts, and instantiates the Hub itself.
        """
        # iface templates still contain a %i placeholder for the port
        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.XREP), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
        if self.client_ip != self.engine_ip:
            # engines register on their own interface when it differs
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))

        ### Engine connections ###

        # heartbeat: PUB pings out on hb[0], XREP collects pongs on hb[1]
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.XREP)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
                                period=self.ping, logname=self.log.name)

        ### Client connections ###
        # Notifier socket (broadcasts registration changes to clients)
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket: subscribes to everything, both over the monitor
        # url and in-process
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, "")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
        time.sleep(.25)

        # build connection dicts; by convention index [1] of each port pair
        # is the engine-facing side, [0] the client-facing side
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (self.scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s"%self.engine_info)
        self.log.debug("Hub client addrs: %s"%self.client_info)
        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                logname=self.log.name)
276
276
277
277
278 class Hub(LoggingFactory):
278 class Hub(LoggingFactory):
279 """The IPython Controller Hub with 0MQ connections
279 """The IPython Controller Hub with 0MQ connections
280
280
281 Parameters
281 Parameters
282 ==========
282 ==========
283 loop: zmq IOLoop instance
283 loop: zmq IOLoop instance
284 session: StreamSession object
284 session: StreamSession object
285 <removed> context: zmq context for creating new connections (?)
285 <removed> context: zmq context for creating new connections (?)
286 queue: ZMQStream for monitoring the command queue (SUB)
286 queue: ZMQStream for monitoring the command queue (SUB)
287 query: ZMQStream for engine registration and client queries requests (XREP)
287 query: ZMQStream for engine registration and client queries requests (XREP)
288 heartbeat: HeartMonitor object checking the pulse of the engines
288 heartbeat: HeartMonitor object checking the pulse of the engines
289 notifier: ZMQStream for broadcasting engine registration changes (PUB)
289 notifier: ZMQStream for broadcasting engine registration changes (PUB)
290 db: connection to db for out of memory logging of commands
290 db: connection to db for out of memory logging of commands
291 NotImplemented
291 NotImplemented
292 engine_info: dict of zmq connection information for engines to connect
292 engine_info: dict of zmq connection information for engines to connect
293 to the queues.
293 to the queues.
294 client_info: dict of zmq connection information for clients to connect
294 client_info: dict of zmq connection information for clients to connect
295 to the queues.
295 to the queues.
296 """
296 """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()
    by_ident=Dict()
    engines=Dict()
    clients=Dict()
    hearts=Dict()
    pending=Set()
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids (NOTE(review): original comment said "keyed by engine_id", but this is a Set)
    dead_engines=Set() # NOTE(review): original comment was copy-pasted from `completed`; presumably uuids of engines that died -- confirm
    # mia=None
    incoming_registrations=Dict()
    registration_timeout=Int()
    _idcounter=Int(0) # monotonically increasing source for _next_id

    # objects from constructor:
    loop=Instance(ioloop.IOLoop)
    query=Instance(ZMQStream)       # engine registration + client query socket
    monitor=Instance(ZMQStream)     # SUB stream watching all queue traffic
    heartmonitor=Instance(HeartMonitor)
    notifier=Instance(ZMQStream)    # PUB stream broadcasting registration changes
    db=Instance(object)             # db backend for task records
    client_info=Dict()
    engine_info=Dict()
324
324
325
325
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # at least 5000, or two heartbeat periods (presumably milliseconds -- confirm)
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        for k,v in self.client_info.iteritems():
            if k == 'task':
                # the 'task' entry is a (scheme, url) pair; only the url is validated
                validate_url_container(v[1])
            else:
                validate_url_container(v)
        # validate_url_container(self.client_info)
        validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # dispatch table for traffic observed on the monitor socket,
        # keyed by message-type prefix
        self.monitor_handlers = { 'in' : self.save_queue_request,
                                'out': self.save_queue_result,
                                'intask': self.save_task_request,
                                'outtask': self.save_task_result,
                                'tracktask': self.save_task_destination,
                                'incontrol': _passer,
                                'outcontrol': _passer,
                                'iopub': self.save_iopub_message,
        }

        # dispatch table for client/engine requests on the query socket,
        # keyed by msg_type
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
        }

        self.log.info("hub::created hub")
382
382
    @property
    def _next_id(self):
        """Generate a new engine ID.

        IDs are never reused; this simply counts up from 0, so a
        re-registering engine always gets a fresh id.
        """
        newid = self._idcounter
        self._idcounter += 1
        return newid
        # Previous behavior reused the lowest free id; kept for reference:
        # newid = 0
        # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
        # # print newid, self.ids, self.incoming_registrations
        # while newid in self.ids or newid in incoming:
        #     newid += 1
        # return newid
397
397
398 #-----------------------------------------------------------------------------
398 #-----------------------------------------------------------------------------
399 # message validation
399 # message validation
400 #-----------------------------------------------------------------------------
400 #-----------------------------------------------------------------------------
401
401
402 def _validate_targets(self, targets):
402 def _validate_targets(self, targets):
403 """turn any valid targets argument into a list of integer ids"""
403 """turn any valid targets argument into a list of integer ids"""
404 if targets is None:
404 if targets is None:
405 # default to all
405 # default to all
406 targets = self.ids
406 targets = self.ids
407
407
408 if isinstance(targets, (int,str,unicode)):
408 if isinstance(targets, (int,str,unicode)):
409 # only one target specified
409 # only one target specified
410 targets = [targets]
410 targets = [targets]
411 _targets = []
411 _targets = []
412 for t in targets:
412 for t in targets:
413 # map raw identities to ids
413 # map raw identities to ids
414 if isinstance(t, (str,unicode)):
414 if isinstance(t, (str,unicode)):
415 t = self.by_ident.get(t, t)
415 t = self.by_ident.get(t, t)
416 _targets.append(t)
416 _targets.append(t)
417 targets = _targets
417 targets = _targets
418 bad_targets = [ t for t in targets if t not in self.ids ]
418 bad_targets = [ t for t in targets if t not in self.ids ]
419 if bad_targets:
419 if bad_targets:
420 raise IndexError("No Such Engine: %r"%bad_targets)
420 raise IndexError("No Such Engine: %r"%bad_targets)
421 if not targets:
421 if not targets:
422 raise IndexError("No Engines Registered")
422 raise IndexError("No Engines Registered")
423 return targets
423 return targets
424
424
425 #-----------------------------------------------------------------------------
425 #-----------------------------------------------------------------------------
426 # dispatch methods (1 per stream)
426 # dispatch methods (1 per stream)
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428
428
429 # def dispatch_registration_request(self, msg):
429 # def dispatch_registration_request(self, msg):
430 # """"""
430 # """"""
431 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
431 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
432 # idents,msg = self.session.feed_identities(msg)
432 # idents,msg = self.session.feed_identities(msg)
433 # if not idents:
433 # if not idents:
434 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
434 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
435 # return
435 # return
436 # try:
436 # try:
437 # msg = self.session.unpack_message(msg,content=True)
437 # msg = self.session.unpack_message(msg,content=True)
438 # except:
438 # except:
439 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
439 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
440 # return
440 # return
441 #
441 #
442 # msg_type = msg['msg_type']
442 # msg_type = msg['msg_type']
443 # content = msg['content']
443 # content = msg['content']
444 #
444 #
445 # handler = self.query_handlers.get(msg_type, None)
445 # handler = self.query_handlers.get(msg_type, None)
446 # if handler is None:
446 # if handler is None:
447 # self.log.error("registration::got bad registration message: %s"%msg)
447 # self.log.error("registration::got bad registration message: %s"%msg)
448 # else:
448 # else:
449 # handler(idents, msg)
449 # handler(idents, msg)
450
450
451 def dispatch_monitor_traffic(self, msg):
451 def dispatch_monitor_traffic(self, msg):
452 """all ME and Task queue messages come through here, as well as
452 """all ME and Task queue messages come through here, as well as
453 IOPub traffic."""
453 IOPub traffic."""
454 self.log.debug("monitor traffic: %s"%msg[:2])
454 self.log.debug("monitor traffic: %s"%msg[:2])
455 switch = msg[0]
455 switch = msg[0]
456 idents, msg = self.session.feed_identities(msg[1:])
456 idents, msg = self.session.feed_identities(msg[1:])
457 if not idents:
457 if not idents:
458 self.log.error("Bad Monitor Message: %s"%msg)
458 self.log.error("Bad Monitor Message: %s"%msg)
459 return
459 return
460 handler = self.monitor_handlers.get(switch, None)
460 handler = self.monitor_handlers.get(switch, None)
461 if handler is not None:
461 if handler is not None:
462 handler(idents, msg)
462 handler(idents, msg)
463 else:
463 else:
464 self.log.error("Invalid monitor topic: %s"%switch)
464 self.log.error("Invalid monitor topic: %s"%switch)
465
465
466
466
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients.

        Unpacks the client message and dispatches on ``msg_type`` via
        ``self.query_handlers``; any unpack or lookup failure is wrapped
        and sent back to the client as a ``hub_error`` reply.
        """
        idents, msg = self.session.feed_identities(msg)
        if not idents:
            self.log.error("Bad Query Message: %s"%msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            # reply to the client with the wrapped (serialized) exception
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %s"%msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        # dispatch on message type:
        msg_type = msg['msg_type']
        self.log.info("client::client %s requested %s"%(client_id, msg_type))
        handler = self.query_handlers.get(msg_type, None)
        try:
            # assert inside try so wrap_exception() sees a live exception
            assert handler is not None, "Bad Message Type: %s"%msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        else:
            handler(idents, msg)
498
498
499 def dispatch_db(self, msg):
499 def dispatch_db(self, msg):
500 """"""
500 """"""
501 raise NotImplementedError
501 raise NotImplementedError
502
502
503 #---------------------------------------------------------------------------
503 #---------------------------------------------------------------------------
504 # handler methods (1 per event)
504 # handler methods (1 per event)
505 #---------------------------------------------------------------------------
505 #---------------------------------------------------------------------------
506
506
507 #----------------------- Heartbeat --------------------------------------
507 #----------------------- Heartbeat --------------------------------------
508
508
509 def handle_new_heart(self, heart):
509 def handle_new_heart(self, heart):
510 """handler to attach to heartbeater.
510 """handler to attach to heartbeater.
511 Called when a new heart starts to beat.
511 Called when a new heart starts to beat.
512 Triggers completion of registration."""
512 Triggers completion of registration."""
513 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
513 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
514 if heart not in self.incoming_registrations:
514 if heart not in self.incoming_registrations:
515 self.log.info("heartbeat::ignoring new heart: %r"%heart)
515 self.log.info("heartbeat::ignoring new heart: %r"%heart)
516 else:
516 else:
517 self.finish_registration(heart)
517 self.finish_registration(heart)
518
518
519
519
520 def handle_heart_failure(self, heart):
520 def handle_heart_failure(self, heart):
521 """handler to attach to heartbeater.
521 """handler to attach to heartbeater.
522 called when a previously registered heart fails to respond to beat request.
522 called when a previously registered heart fails to respond to beat request.
523 triggers unregistration"""
523 triggers unregistration"""
524 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
524 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
525 eid = self.hearts.get(heart, None)
525 eid = self.hearts.get(heart, None)
526 queue = self.engines[eid].queue
526 queue = self.engines[eid].queue
527 if eid is None:
527 if eid is None:
528 self.log.info("heartbeat::ignoring heart failure %r"%heart)
528 self.log.info("heartbeat::ignoring heart failure %r"%heart)
529 else:
529 else:
530 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
530 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
531
531
532 #----------------------- MUX Queue Traffic ------------------------------
532 #----------------------- MUX Queue Traffic ------------------------------
533
533
    def save_queue_request(self, idents, msg):
        """Record a client request sent to an engine via the MUX queue.

        ``idents`` must carry at least (engine queue_id, client_id).  The
        message is stored in the db, added to ``self.pending``, and
        appended to the target engine's queue.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %s"%idents)
            return
        queue_id, client_id = idents[:2]
        try:
            # content=False: store the raw content/buffers without unpacking
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered"%queue_id)
            self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
            return

        header = msg['header']
        msg_id = header['msg_id']
        record = init_record(msg)
        record['engine_uuid'] = queue_id
        record['client_uuid'] = client_id
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in existing.iteritems():
                rvalue = record[key]
                if evalue and rvalue and evalue != rvalue:
                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
                elif evalue and not rvalue:
                    # keep db values the fresh record lacks (e.g. early iopub output)
                    record[key] = evalue
            self.db.update_record(msg_id, record)
        except KeyError:
            # no earlier record: create it
            self.db.add_record(msg_id, record)

        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
573
573
    def save_queue_result(self, idents, msg):
        """Record the reply an engine sent back on the MUX queue.

        Moves the message from pending to completed bookkeeping and
        stores the result (header, content, buffers, timings) in the db.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %s"%idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("queue::engine %r sent invalid message to %r: %s"%(
                    queue_id,client_id, msg), exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
            # self.log.debug("queue:: %s"%msg[2:])
            return

        parent = msg['parent_header']
        if not parent:
            # a reply with no parent cannot be matched to a request
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %s"%msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        completed = datetime.strptime(rheader['date'], ISO8601)
        started = rheader.get('started', None)
        if started is not None:
            started = datetime.strptime(started, ISO8601)
        result = {
            'result_header' : rheader,
            'result_content': msg['content'],
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        self.db.update_record(msg_id, result)
622
622
623
623
624 #--------------------- Task Queue Traffic ------------------------------
624 #--------------------- Task Queue Traffic ------------------------------
625
625
    def save_task_request(self, idents, msg):
        """Save the submission of a task.

        Unlike the MUX path, the destination engine is not known yet;
        only the client uuid and queue name are recorded here.
        """
        client_id = idents[0]

        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            self.log.error("task::client %r sent invalid task message: %s"%(
                    client_id, msg), exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = client_id
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in existing.iteritems():
                rvalue = record[key]
                if evalue and rvalue and evalue != rvalue:
                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
                elif evalue and not rvalue:
                    # keep db values the fresh record lacks (e.g. early iopub output)
                    record[key] = evalue
            self.db.update_record(msg_id, record)
        except KeyError:
            # no earlier record: create it
            self.db.add_record(msg_id, record)
655
655
656 def save_task_result(self, idents, msg):
656 def save_task_result(self, idents, msg):
657 """save the result of a completed task."""
657 """save the result of a completed task."""
658 client_id = idents[0]
658 client_id = idents[0]
659 try:
659 try:
660 msg = self.session.unpack_message(msg, content=False)
660 msg = self.session.unpack_message(msg, content=False)
661 except:
661 except:
662 self.log.error("task::invalid task result message send to %r: %s"%(
662 self.log.error("task::invalid task result message send to %r: %s"%(
663 client_id, msg), exc_info=True)
663 client_id, msg), exc_info=True)
664 raise
664 raise
665 return
665 return
666
666
667 parent = msg['parent_header']
667 parent = msg['parent_header']
668 if not parent:
668 if not parent:
669 # print msg
669 # print msg
670 self.log.warn("Task %r had no parent!"%msg)
670 self.log.warn("Task %r had no parent!"%msg)
671 return
671 return
672 msg_id = parent['msg_id']
672 msg_id = parent['msg_id']
673
673
674 header = msg['header']
674 header = msg['header']
675 engine_uuid = header.get('engine', None)
675 engine_uuid = header.get('engine', None)
676 eid = self.by_ident.get(engine_uuid, None)
676 eid = self.by_ident.get(engine_uuid, None)
677
677
678 if msg_id in self.pending:
678 if msg_id in self.pending:
679 self.pending.remove(msg_id)
679 self.pending.remove(msg_id)
680 self.all_completed.add(msg_id)
680 self.all_completed.add(msg_id)
681 if eid is not None:
681 if eid is not None:
682 self.completed[eid].append(msg_id)
682 self.completed[eid].append(msg_id)
683 if msg_id in self.tasks[eid]:
683 if msg_id in self.tasks[eid]:
684 self.tasks[eid].remove(msg_id)
684 self.tasks[eid].remove(msg_id)
685 completed = datetime.strptime(header['date'], ISO8601)
685 completed = datetime.strptime(header['date'], ISO8601)
686 started = header.get('started', None)
686 started = header.get('started', None)
687 if started is not None:
687 if started is not None:
688 started = datetime.strptime(started, ISO8601)
688 started = datetime.strptime(started, ISO8601)
689 result = {
689 result = {
690 'result_header' : header,
690 'result_header' : header,
691 'result_content': msg['content'],
691 'result_content': msg['content'],
692 'started' : started,
692 'started' : started,
693 'completed' : completed,
693 'completed' : completed,
694 'engine_uuid': engine_uuid
694 'engine_uuid': engine_uuid
695 }
695 }
696
696
697 result['result_buffers'] = msg['buffers']
697 result['result_buffers'] = msg['buffers']
698 self.db.update_record(msg_id, result)
698 self.db.update_record(msg_id, result)
699
699
700 else:
700 else:
701 self.log.debug("task::unknown task %s finished"%msg_id)
701 self.log.debug("task::unknown task %s finished"%msg_id)
702
702
703 def save_task_destination(self, idents, msg):
703 def save_task_destination(self, idents, msg):
704 try:
704 try:
705 msg = self.session.unpack_message(msg, content=True)
705 msg = self.session.unpack_message(msg, content=True)
706 except:
706 except:
707 self.log.error("task::invalid task tracking message", exc_info=True)
707 self.log.error("task::invalid task tracking message", exc_info=True)
708 return
708 return
709 content = msg['content']
709 content = msg['content']
710 # print (content)
710 # print (content)
711 msg_id = content['msg_id']
711 msg_id = content['msg_id']
712 engine_uuid = content['engine_id']
712 engine_uuid = content['engine_id']
713 eid = self.by_ident[engine_uuid]
713 eid = self.by_ident[engine_uuid]
714
714
715 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
715 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
716 # if msg_id in self.mia:
716 # if msg_id in self.mia:
717 # self.mia.remove(msg_id)
717 # self.mia.remove(msg_id)
718 # else:
718 # else:
719 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
719 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
720
720
721 self.tasks[eid].append(msg_id)
721 self.tasks[eid].append(msg_id)
722 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
722 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
723 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
723 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
724
724
725 def mia_task_request(self, idents, msg):
725 def mia_task_request(self, idents, msg):
726 raise NotImplementedError
726 raise NotImplementedError
727 client_id = idents[0]
727 client_id = idents[0]
728 # content = dict(mia=self.mia,status='ok')
728 # content = dict(mia=self.mia,status='ok')
729 # self.session.send('mia_reply', content=content, idents=client_id)
729 # self.session.send('mia_reply', content=content, idents=client_id)
730
730
731
731
732 #--------------------- IOPub Traffic ------------------------------
732 #--------------------- IOPub Traffic ------------------------------
733
733
734 def save_iopub_message(self, topics, msg):
734 def save_iopub_message(self, topics, msg):
735 """save an iopub message into the db"""
735 """save an iopub message into the db"""
736 # print (topics)
736 # print (topics)
737 try:
737 try:
738 msg = self.session.unpack_message(msg, content=True)
738 msg = self.session.unpack_message(msg, content=True)
739 except:
739 except:
740 self.log.error("iopub::invalid IOPub message", exc_info=True)
740 self.log.error("iopub::invalid IOPub message", exc_info=True)
741 return
741 return
742
742
743 parent = msg['parent_header']
743 parent = msg['parent_header']
744 if not parent:
744 if not parent:
745 self.log.error("iopub::invalid IOPub message: %s"%msg)
745 self.log.error("iopub::invalid IOPub message: %s"%msg)
746 return
746 return
747 msg_id = parent['msg_id']
747 msg_id = parent['msg_id']
748 msg_type = msg['msg_type']
748 msg_type = msg['msg_type']
749 content = msg['content']
749 content = msg['content']
750
750
751 # ensure msg_id is in db
751 # ensure msg_id is in db
752 try:
752 try:
753 rec = self.db.get_record(msg_id)
753 rec = self.db.get_record(msg_id)
754 except KeyError:
754 except KeyError:
755 rec = empty_record()
755 rec = empty_record()
756 rec['msg_id'] = msg_id
756 rec['msg_id'] = msg_id
757 self.db.add_record(msg_id, rec)
757 self.db.add_record(msg_id, rec)
758 # stream
758 # stream
759 d = {}
759 d = {}
760 if msg_type == 'stream':
760 if msg_type == 'stream':
761 name = content['name']
761 name = content['name']
762 s = rec[name] or ''
762 s = rec[name] or ''
763 d[name] = s + content['data']
763 d[name] = s + content['data']
764
764
765 elif msg_type == 'pyerr':
765 elif msg_type == 'pyerr':
766 d['pyerr'] = content
766 d['pyerr'] = content
767 elif msg_type == 'pyin':
768 d['pyin'] = content['code']
767 else:
769 else:
768 d[msg_type] = content['data']
770 d[msg_type] = content.get('data', '')
769
771
770 self.db.update_record(msg_id, d)
772 self.db.update_record(msg_id, d)
771
773
772
774
773
775
774 #-------------------------------------------------------------------------
776 #-------------------------------------------------------------------------
775 # Registration requests
777 # Registration requests
776 #-------------------------------------------------------------------------
778 #-------------------------------------------------------------------------
777
779
778 def connection_request(self, client_id, msg):
780 def connection_request(self, client_id, msg):
779 """Reply with connection addresses for clients."""
781 """Reply with connection addresses for clients."""
780 self.log.info("client::client %s connected"%client_id)
782 self.log.info("client::client %s connected"%client_id)
781 content = dict(status='ok')
783 content = dict(status='ok')
782 content.update(self.client_info)
784 content.update(self.client_info)
783 jsonable = {}
785 jsonable = {}
784 for k,v in self.keytable.iteritems():
786 for k,v in self.keytable.iteritems():
785 if v not in self.dead_engines:
787 if v not in self.dead_engines:
786 jsonable[str(k)] = v
788 jsonable[str(k)] = v
787 content['engines'] = jsonable
789 content['engines'] = jsonable
788 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
790 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
789
791
    def register_engine(self, reg, msg):
        """Register a new engine.

        Validates that the requested queue_id and heartbeat id are not
        already in use (registered or pending), replies to the engine
        with either the connection info or a wrapped error, and parks
        the registration until its heart is seen beating.
        """
        content = msg['content']
        try:
            queue = content['queue']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        """register a new engine, and create the socket(s) necessary"""
        eid = self._next_id
        # print (eid, queue, reg, heart)

        self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

        content = dict(id=eid,status='ok')
        content.update(self.engine_info)
        # check if requesting available IDs:
        if queue in self.by_ident:
            # raise-and-catch so error.wrap_exception() has a live exception
            try:
                raise KeyError("queue_id %r in use"%queue)
            except:
                content = error.wrap_exception()
                self.log.error("queue_id %r in use"%queue, exc_info=True)
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use"%heart)
            except:
                self.log.error("heart_id %r in use"%heart, exc_info=True)
                content = error.wrap_exception()
        else:
            # also guard against collisions with registrations still pending
            for h, pack in self.incoming_registrations.iteritems():
                if heart == h:
                    try:
                        raise KeyError("heart_id %r in use"%heart)
                    except:
                        self.log.error("heart_id %r in use"%heart, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use"%queue)
                    except:
                        self.log.error("queue_id %r in use"%queue, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating: finish registration immediately
                self.incoming_registrations[heart] = (eid,queue,reg[0],None)
                self.finish_registration(heart)
            else:
                # wait for the heart; purge the registration if it never beats
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
        else:
            self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid
854
856
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave.

        The engine is marked dead immediately; stranded-message cleanup
        is deferred by ``registration_timeout`` in case results from the
        engine are still in flight.
        """
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%s)"%eid)
        # print (eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, queue=uuid)
        self.dead_engines.add(uuid)
        # eager cleanup (disabled; see delayed _handle_stranded_msgs below):
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        if self.notifier:
            # broadcast the departure to subscribed clients
            self.session.send(self.notifier, "unregistration_notification", content=content)
881
883
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    that the result failed and later receive the actual result.

    Parameters
    ----------
    eid : int
        engine id of the dead engine
    uuid : str
        queue uuid of the dead engine (recorded as ``engine_uuid``)
    """
    outstanding = self.queues[eid]

    for msg_id in outstanding:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        # raise/catch so wrap_exception captures a real traceback
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except error.EngineError:
            # narrowed from bare `except:` — only the exception raised above
            content = error.wrap_exception()
        # build a fake header so the record looks like a normal failure:
        header = {}
        header['engine'] = uuid
        header['date'] = datetime.now().strftime(ISO8601)
        rec = dict(result_content=content, result_header=header, result_buffers=[])
        rec['completed'] = header['date']
        rec['engine_uuid'] = uuid
        self.db.update_record(msg_id, rec)
def finish_registration(self, heart):
    """Second half of engine registration, called after our HeartMonitor
    has received a beat from the Engine's Heart."""
    try:
        (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
    except KeyError:
        self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
        return
    self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
    # cancel the stalled-registration purge callback, if one was scheduled
    if purge is not None:
        purge.stop()
    control = queue
    # populate all per-engine bookkeeping tables
    self.ids.add(eid)
    self.keytable[eid] = queue
    self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
            control=control, heartbeat=heart)
    self.by_ident[queue] = eid
    self.queues[eid] = list()
    self.tasks[eid] = list()
    self.completed[eid] = list()
    self.hearts[heart] = eid
    # announce the new engine to any registered notifier
    content = dict(id=eid, queue=self.engines[eid].queue)
    if self.notifier:
        self.session.send(self.notifier, "registration_notification", content=content)
    self.log.info("engine::Engine Connected: %i"%eid)
934 def _purge_stalled_registration(self, heart):
936 def _purge_stalled_registration(self, heart):
935 if heart in self.incoming_registrations:
937 if heart in self.incoming_registrations:
936 eid = self.incoming_registrations.pop(heart)[0]
938 eid = self.incoming_registrations.pop(heart)[0]
937 self.log.info("registration::purging stalled registration: %i"%eid)
939 self.log.info("registration::purging stalled registration: %i"%eid)
938 else:
940 else:
939 pass
941 pass
940
942
941 #-------------------------------------------------------------------------
943 #-------------------------------------------------------------------------
942 # Client Requests
944 # Client Requests
943 #-------------------------------------------------------------------------
945 #-------------------------------------------------------------------------
944
946
def shutdown_request(self, client_id, msg):
    """Acknowledge a shutdown request, broadcast it, then exit after 1s."""
    self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
    # let other connected clients know the hub is going down
    self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
    later = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
    later.start()
953 def _shutdown(self):
955 def _shutdown(self):
954 self.log.info("hub::hub shutting down.")
956 self.log.info("hub::hub shutting down.")
955 time.sleep(0.1)
957 time.sleep(0.1)
956 sys.exit(0)
958 sys.exit(0)
957
959
958
960
def check_load(self, client_id, msg):
    """Reply with the queue+task load of each requested engine.

    Sends a ``load_reply`` whose content maps ``bytes(engine_id)`` to the
    number of pending queue + task jobs, or a ``hub_error`` if target
    validation fails.
    """
    content = msg['content']
    try:
        targets = content['targets']
        targets = self._validate_targets(targets)
    except Exception:
        # narrowed from bare `except:` — bad targets are a client error
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return

    content = dict(status='ok')
    for t in targets:
        content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.query, "load_reply", content=content, ident=client_id)
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    if verbose: return the msg_ids
    else: return len of each type.

    keys: queue (pending MUX jobs)
          tasks (pending Task jobs)
          completed (finished jobs from both queues)
    """
    content = msg['content']
    targets = content['targets']
    try:
        targets = self._validate_targets(targets)
    except Exception:
        # narrowed from bare `except:` — bad targets are a client error
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return
    verbose = content.get('verbose', False)
    content = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            # summarize to counts unless the caller asked for the ids
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
    self.session.send(self.query, "queue_reply", content=content, ident=client_id)
def purge_results(self, client_id, msg):
    """Purge results from memory. This method is more valuable before we move
    to a DB based message storage mechanism.

    ``msg_ids`` may be 'all' (drop every completed record), or a list of
    specific completed ids; ``engine_ids`` drops everything completed on
    the given engines. Replies with ``purge_reply``.
    """
    content = msg['content']
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        self.db.drop_matching_records(dict(completed={'$ne':None}))
    else:
        for msg_id in msg_ids:
            if msg_id in self.all_completed:
                self.db.drop_record(msg_id)
            else:
                # raise/catch so wrap_exception captures a traceback;
                # excepts narrowed from bare `except:` to the raised type
                if msg_id in self.pending:
                    try:
                        raise IndexError("msg pending: %r"%msg_id)
                    except IndexError:
                        reply = error.wrap_exception()
                else:
                    try:
                        raise IndexError("No such msg: %r"%msg_id)
                    except IndexError:
                        reply = error.wrap_exception()
                break
    eids = content.get('engine_ids', [])
    for eid in eids:
        if eid not in self.engines:
            try:
                raise IndexError("No such engine: %i"%eid)
            except IndexError:
                reply = error.wrap_exception()
            break
        msg_ids = self.completed.pop(eid)
        uid = self.engines[eid].queue
        self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))

    self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg, buffers):
    """Resubmit a task — not yet implemented."""
    raise NotImplementedError
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    Replies with ``result_reply`` whose content lists ``pending`` and
    ``completed`` ids; unless ``status_only``, completed results are
    included from the db, with their buffers appended to the reply.
    """
    content = msg['content']
    msg_ids = sorted(set(content['msg_ids']))
    statusonly = content.get('status_only', False)
    pending = []
    completed = []
    content = dict(status='ok')
    content['pending'] = pending
    content['completed'] = completed
    buffers = []
    if not statusonly:
        content['results'] = {}
        records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                rec = records[msg_id]
                io_dict = {}
                for key in 'pyin pyout pyerr stdout stderr'.split():
                    io_dict[key] = rec[key]
                content[msg_id] = { 'result_content': rec['result_content'],
                        'header': rec['header'],
                        'result_header' : rec['result_header'],
                        'io' : io_dict,
                      }
                if rec['result_buffers']:
                    buffers.extend(map(str, rec['result_buffers']))
        else:
            # raise/catch so wrap_exception captures a traceback;
            # narrowed from bare `except:` to the raised KeyError
            try:
                raise KeyError('No such message: '+msg_id)
            except KeyError:
                content = error.wrap_exception()
            break
    self.session.send(self.query, "result_reply", content=content,
            parent=msg, ident=client_id,
            buffers=buffers)
@@ -1,423 +1,430
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Standard library imports.
16 # Standard library imports.
17 from __future__ import print_function
17 from __future__ import print_function
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 from code import CommandCompiler
22 from code import CommandCompiler
23 from datetime import datetime
23 from datetime import datetime
24 from pprint import pprint
24 from pprint import pprint
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
32 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
33
33
34 from IPython.parallel.error import wrap_exception
34 from IPython.parallel.error import wrap_exception
35 from IPython.parallel.factory import SessionFactory
35 from IPython.parallel.factory import SessionFactory
36 from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601
36 from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601
37
37
def printer(*args):
    """Debug helper: pretty-print to the real stdout, bypassing redirection."""
    pprint(args, stream=sys.__stdout__)
class _Passer(zmqstream.ZMQStream):
    """Empty class that implements `send()` that does nothing.

    Subclass ZMQStream for StreamSession typechecking

    """
    def __init__(self, *args, **kwargs):
        # deliberately skip ZMQStream.__init__: no socket, no event loop
        pass

    def send(self, *args, **kwargs):
        # discard everything
        pass
    send_multipart = send
49 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
50 # Main kernel class
57 # Main kernel class
51 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
52
59
53 class Kernel(SessionFactory):
60 class Kernel(SessionFactory):
54
61
55 #---------------------------------------------------------------------------
62 #---------------------------------------------------------------------------
56 # Kernel interface
63 # Kernel interface
57 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
58
65
59 # kwargs:
66 # kwargs:
60 int_id = Int(-1, config=True)
67 int_id = Int(-1, config=True)
61 user_ns = Dict(config=True)
68 user_ns = Dict(config=True)
62 exec_lines = List(config=True)
69 exec_lines = List(config=True)
63
70
64 control_stream = Instance(zmqstream.ZMQStream)
71 control_stream = Instance(zmqstream.ZMQStream)
65 task_stream = Instance(zmqstream.ZMQStream)
72 task_stream = Instance(zmqstream.ZMQStream)
66 iopub_stream = Instance(zmqstream.ZMQStream)
73 iopub_stream = Instance(zmqstream.ZMQStream)
67 client = Instance('IPython.parallel.Client')
74 client = Instance('IPython.parallel.Client')
68
75
69 # internals
76 # internals
70 shell_streams = List()
77 shell_streams = List()
71 compiler = Instance(CommandCompiler, (), {})
78 compiler = Instance(CommandCompiler, (), {})
72 completer = Instance(KernelCompleter)
79 completer = Instance(KernelCompleter)
73
80
74 aborted = Set()
81 aborted = Set()
75 shell_handlers = Dict()
82 shell_handlers = Dict()
76 control_handlers = Dict()
83 control_handlers = Dict()
77
84
78 def _set_prefix(self):
85 def _set_prefix(self):
79 self.prefix = "engine.%s"%self.int_id
86 self.prefix = "engine.%s"%self.int_id
80
87
def _connect_completer(self):
    """Rebuild the tab-completer against the current user namespace."""
    self.completer = KernelCompleter(self.user_ns)
def __init__(self, **kwargs):
    """Set up prefix, completer, trait listeners, handler tables, and
    run any configured startup lines."""
    super(Kernel, self).__init__(**kwargs)
    self._set_prefix()
    self._connect_completer()

    # keep prefix/completer in sync when the traits change
    self.on_trait_change(self._set_prefix, 'id')
    self.on_trait_change(self._connect_completer, 'user_ns')

    # Build dict of handlers for message types
    shell_types = ['execute_request', 'complete_request', 'apply_request',
                   'clear_request']
    for mtype in shell_types:
        self.shell_handlers[mtype] = getattr(self, mtype)

    # control channel handles everything shell does, plus shutdown/abort
    for mtype in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
        self.control_handlers[mtype] = getattr(self, mtype)

    self._initial_exec_lines()
def _wrap_exception(self, method=None):
    """Serialize the current exception, tagged with this engine's identity."""
    e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
    return wrap_exception(e_info)
def _initial_exec_lines(self):
    """Run each configured startup line through the normal execute path.

    Replies go into a throwaway _Passer stream, so nothing is sent out.
    """
    sink = _Passer()
    content = dict(silent=True, user_variable=[], user_expressions=[])
    for line in self.exec_lines:
        self.log.debug("executing initialization: %s"%line)
        content.update({'code':line})
        msg = self.session.msg('execute_request', content)
        self.execute_request(sink, [], msg)
117 #-------------------- control handlers -----------------------------
124 #-------------------- control handlers -----------------------------
def abort_queues(self):
    """Abort every non-empty shell stream."""
    for shell in self.shell_streams:
        if shell:
            self.abort_queue(shell)
def abort_queue(self, stream):
    """Drain `stream`, replying 'aborted' to every queued request."""
    while True:
        try:
            msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
        except zmq.ZMQError as err:
            if err.errno == zmq.EAGAIN:
                # queue is empty - done draining
                break
            return
        if msg is None:
            return
        idents, msg = msg

        self.log.info("Aborting:")
        self.log.info(str(msg))
        msg_type = msg['msg_type']
        reply_type = msg_type.split('_')[0] + '_reply'
        reply_msg = self.session.send(stream, reply_type,
                content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
        self.log.debug(str(reply_msg))
        # We need to wait a bit for requests to come in. This can probably
        # be set shorter for true asynchronous clients.
        time.sleep(0.05)
def abort_request(self, stream, ident, parent):
    """Abort a specific msg by id, or everything when no ids are given."""
    msg_ids = parent['content'].get('msg_ids', None)
    if isinstance(msg_ids, basestring):
        msg_ids = [msg_ids]
    if not msg_ids:
        self.abort_queues()
    else:
        # BUG FIX: the original fell through and iterated msg_ids even
        # when it was None, raising TypeError after aborting the queues.
        for mid in msg_ids:
            self.aborted.add(str(mid))

    content = dict(status='ok')
    reply_msg = self.session.send(stream, 'abort_reply', content=content,
            parent=parent, ident=ident)
    self.log.debug(str(reply_msg))
def shutdown_request(self, stream, ident, parent):
    """kill ourself. This should really be handled in an external process"""
    try:
        self.abort_queues()
    except:
        # best-effort: report the failure but still shut down
        content = self._wrap_exception('shutdown')
    else:
        content = dict(parent['content'])
        content['status'] = 'ok'
    msg = self.session.send(stream, 'shutdown_reply',
            content=content, parent=parent, ident=ident)
    self.log.debug(str(msg))
    # give the reply a second to go out before exiting
    later = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
    later.start()
def dispatch_control(self, msg):
    """Unpack a raw control-channel message and route it to its handler.

    Invalid messages and unknown message types are logged and dropped.
    """
    idents,msg = self.session.feed_identities(msg, copy=False)
    try:
        msg = self.session.unpack_message(msg, content=True, copy=False)
    except Exception:
        # narrowed from bare `except:` — a bad message should not be
        # able to swallow KeyboardInterrupt/SystemExit
        self.log.error("Invalid Message", exc_info=True)
        return

    header = msg['header']
    msg_id = header['msg_id']

    handler = self.control_handlers.get(msg['msg_type'], None)
    if handler is None:
        self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
    else:
        handler(self.control_stream, idents, msg)
202 #-------------------- queue helpers ------------------------------
209 #-------------------- queue helpers ------------------------------
203
210
def check_dependencies(self, dependencies):
    """Return True when the given dependencies are satisfied.

    `dependencies` is either a plain list of msg_ids (implicitly 'all'),
    or a pair ('any'|'all', msg_ids).
    """
    if not dependencies:
        return True
    # split off an explicit any/all mode if one was given
    mode = 'all'
    if len(dependencies) == 2 and dependencies[0] in ('any', 'all'):
        mode, dependencies = dependencies
    results = self.client.get_results(dependencies, status_only=True)
    if results['status'] != 'ok':
        return False
    if mode == 'any':
        if not results['completed']:
            return False
    elif results['pending']:
        return False
    return True
def check_aborted(self, msg_id):
    """Return whether `msg_id` has been marked as aborted."""
    was_aborted = msg_id in self.aborted
    return was_aborted
228 #-------------------- queue handlers -----------------------------
235 #-------------------- queue handlers -----------------------------
229
236
def clear_request(self, stream, idents, parent):
    """Clear our namespace."""
    # wipe user state, acknowledge, then re-run configured startup lines
    self.user_ns = {}
    msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
            content = dict(status='ok'))
    self._initial_exec_lines()
    def execute_request(self, stream, ident, parent):
        """Handle an 'execute_request': run a code string in the user namespace.

        Publishes the input on the iopub stream as 'pyin', compiles and
        executes the code, then replies on `stream` with an 'execute_reply'
        whose subheader records the start timestamp.  On failure, a 'pyerr'
        message is published and all queued requests are aborted.
        """
        self.log.debug('execute request %s'%parent)
        try:
            code = parent[u'content'][u'code']
        except:
            # malformed request: log and drop rather than crash the engine
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        # broadcast the input so monitoring clients see what is being run
        self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
                            ident='%s.pyin'%self.prefix)
        started = datetime.now().strftime(ISO8601)
        try:
            comp_code = self.compiler(code, '<zmq-kernel>')
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                # route displayhook / stdout / stderr output back to this request
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            exec comp_code in self.user_ns, self.user_ns
        except:
            exc_content = self._wrap_exception('execute')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                            ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
        else:
            reply_content = {'status' : 'ok'}

        # subheader carries the start time so the Hub can track durations
        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                            ident=ident, subheader = dict(started=started))
        self.log.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            # a failed execute invalidates everything already queued behind it
            self.abort_queues()
269
276
270 def complete_request(self, stream, ident, parent):
277 def complete_request(self, stream, ident, parent):
271 matches = {'matches' : self.complete(parent),
278 matches = {'matches' : self.complete(parent),
272 'status' : 'ok'}
279 'status' : 'ok'}
273 completion_msg = self.session.send(stream, 'complete_reply',
280 completion_msg = self.session.send(stream, 'complete_reply',
274 matches, parent, ident)
281 matches, parent, ident)
275 # print >> sys.__stdout__, completion_msg
282 # print >> sys.__stdout__, completion_msg
276
283
277 def complete(self, msg):
284 def complete(self, msg):
278 return self.completer.complete(msg.content.line, msg.content.text)
285 return self.completer.complete(msg.content.line, msg.content.text)
279
286
    def apply_request(self, stream, ident, parent):
        """Handle an 'apply_request': call f(*args, **kwargs) in the user ns.

        The callable and its arguments arrive serialized in the message
        buffers.  They are bound under mangled per-request names in the user
        namespace, invoked via a generated assignment statement, and the
        serialized result is sent back in an 'apply_reply'.  The subheader
        records engine id, start time, dependency status and ok/error status
        for scheduler introspection.
        """
        # flush previous reply, so this request won't block it
        stream.flush(zmq.POLLOUT)

        try:
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
            # bound = parent['header'].get('bound', False)
        except:
            # malformed request: log and drop rather than crash the engine
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
        # self.iopub_stream.send(pyin_msg)
        # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
        sub = {'dependencies_met' : True, 'engine' : self.ident,
                'started': datetime.now().strftime(ISO8601)}
        try:
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                # route displayhook / stdout / stderr output back to this request
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
            working = self.user_ns
            # suffix =
            # mangle the msg_id into a valid identifier prefix, so the
            # temporary names cannot collide with user variables
            prefix = "_"+str(msg_id).replace("-","")+"_"

            f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
            # if bound:
            #     bound_ns = Namespace(working)
            #     args = [bound_ns]+list(args)

            fname = getattr(f, '__name__', 'f')

            # NOTE: fname is immediately overwritten with the mangled name;
            # the real function name is only kept implicitly via `f` itself
            fname = prefix+"f"
            argname = prefix+"args"
            kwargname = prefix+"kwargs"
            resultname = prefix+"result"

            ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
            # print ns
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
            try:
                exec code in working,working
                result = working.get(resultname)
            finally:
                # always scrub the temporary names out of the user namespace,
                # even if the call raised
                for key in ns.iterkeys():
                    working.pop(key)
            # if bound:
            #     working.update(bound_ns)

            packed_result,buf = serialize_object(result)
            result_buf = [packed_result]+buf
        except:
            exc_content = self._wrap_exception('apply')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                                ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
            result_buf = []

            # an UnmetDependency error tells the scheduler to retry elsewhere
            if exc_content['ename'] == 'UnmetDependency':
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        # put 'ok'/'error' status in header, for scheduler introspection:
        sub['status'] = reply_content['status']

        reply_msg = self.session.send(stream, u'apply_reply', reply_content,
                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)

        # flush i/o
        # should this be before reply_msg is sent, like in the single-kernel code,
        # or should nothing get in the way of real results?
        sys.stdout.flush()
        sys.stderr.flush()
359
366
360 def dispatch_queue(self, stream, msg):
367 def dispatch_queue(self, stream, msg):
361 self.control_stream.flush()
368 self.control_stream.flush()
362 idents,msg = self.session.feed_identities(msg, copy=False)
369 idents,msg = self.session.feed_identities(msg, copy=False)
363 try:
370 try:
364 msg = self.session.unpack_message(msg, content=True, copy=False)
371 msg = self.session.unpack_message(msg, content=True, copy=False)
365 except:
372 except:
366 self.log.error("Invalid Message", exc_info=True)
373 self.log.error("Invalid Message", exc_info=True)
367 return
374 return
368
375
369
376
370 header = msg['header']
377 header = msg['header']
371 msg_id = header['msg_id']
378 msg_id = header['msg_id']
372 if self.check_aborted(msg_id):
379 if self.check_aborted(msg_id):
373 self.aborted.remove(msg_id)
380 self.aborted.remove(msg_id)
374 # is it safe to assume a msg_id will not be resubmitted?
381 # is it safe to assume a msg_id will not be resubmitted?
375 reply_type = msg['msg_type'].split('_')[0] + '_reply'
382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
376 reply_msg = self.session.send(stream, reply_type,
383 reply_msg = self.session.send(stream, reply_type,
377 content={'status' : 'aborted'}, parent=msg, ident=idents)
384 content={'status' : 'aborted'}, parent=msg, ident=idents)
378 return
385 return
379 handler = self.shell_handlers.get(msg['msg_type'], None)
386 handler = self.shell_handlers.get(msg['msg_type'], None)
380 if handler is None:
387 if handler is None:
381 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
388 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
382 else:
389 else:
383 handler(stream, idents, msg)
390 handler(stream, idents, msg)
384
391
385 def start(self):
392 def start(self):
386 #### stream mode:
393 #### stream mode:
387 if self.control_stream:
394 if self.control_stream:
388 self.control_stream.on_recv(self.dispatch_control, copy=False)
395 self.control_stream.on_recv(self.dispatch_control, copy=False)
389 self.control_stream.on_err(printer)
396 self.control_stream.on_err(printer)
390
397
391 def make_dispatcher(stream):
398 def make_dispatcher(stream):
392 def dispatcher(msg):
399 def dispatcher(msg):
393 return self.dispatch_queue(stream, msg)
400 return self.dispatch_queue(stream, msg)
394 return dispatcher
401 return dispatcher
395
402
396 for s in self.shell_streams:
403 for s in self.shell_streams:
397 s.on_recv(make_dispatcher(s), copy=False)
404 s.on_recv(make_dispatcher(s), copy=False)
398 s.on_err(printer)
405 s.on_err(printer)
399
406
400 if self.iopub_stream:
407 if self.iopub_stream:
401 self.iopub_stream.on_err(printer)
408 self.iopub_stream.on_err(printer)
402
409
403 #### while True mode:
410 #### while True mode:
404 # while True:
411 # while True:
405 # idle = True
412 # idle = True
406 # try:
413 # try:
407 # msg = self.shell_stream.socket.recv_multipart(
414 # msg = self.shell_stream.socket.recv_multipart(
408 # zmq.NOBLOCK, copy=False)
415 # zmq.NOBLOCK, copy=False)
409 # except zmq.ZMQError, e:
416 # except zmq.ZMQError, e:
410 # if e.errno != zmq.EAGAIN:
417 # if e.errno != zmq.EAGAIN:
411 # raise e
418 # raise e
412 # else:
419 # else:
413 # idle=False
420 # idle=False
414 # self.dispatch_queue(self.shell_stream, msg)
421 # self.dispatch_queue(self.shell_stream, msg)
415 #
422 #
416 # if not self.task_stream.empty():
423 # if not self.task_stream.empty():
417 # idle=False
424 # idle=False
418 # msg = self.task_stream.recv_multipart()
425 # msg = self.task_stream.recv_multipart()
419 # self.dispatch_queue(self.task_stream, msg)
426 # self.dispatch_queue(self.task_stream, msg)
420 # if idle:
427 # if idle:
421 # # don't busywait
428 # # don't busywait
422 # time.sleep(1e-3)
429 # time.sleep(1e-3)
423
430
General Comments 0
You need to be logged in to leave comments. Login now