##// END OF EJS Templates
move apply serialization into zmq.serialize
MinRK -
Show More
@@ -0,0 +1,179 b''
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54 The threshold for not double-pickling the content.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have their data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
@@ -47,6 +47,9 b' from IPython.utils import py3compat'
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 from IPython.utils.newserialized import serialize, unserialize
48 from IPython.utils.newserialized import serialize, unserialize
49 from IPython.zmq.log import EnginePUBHandler
49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50
53
51 if py3compat.PY3:
54 if py3compat.PY3:
52 buffer = memoryview
55 buffer = memoryview
@@ -222,140 +225,6 b' def disambiguate_url(url, location=None):'
222
225
223 return "%s://%s:%s"%(proto,ip,port)
226 return "%s://%s:%s"%(proto,ip,port)
224
227
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359
228
360 #--------------------------------------------------------------------------
229 #--------------------------------------------------------------------------
361 # helpers for implementing old MEC API via view.apply
230 # helpers for implementing old MEC API via view.apply
@@ -22,6 +22,9 b' import sys'
22 import time
22 import time
23 import traceback
23 import traceback
24 import logging
24 import logging
25 import uuid
26
27 from datetime import datetime
25 from signal import (
28 from signal import (
26 signal, default_int_handler, SIGINT, SIG_IGN
29 signal, default_int_handler, SIGINT, SIG_IGN
27 )
30 )
@@ -47,6 +50,7 b' from IPython.utils.traitlets import ('
47
50
48 from entry_point import base_launch_kernel
51 from entry_point import base_launch_kernel
49 from kernelapp import KernelApp, kernel_flags, kernel_aliases
52 from kernelapp import KernelApp, kernel_flags, kernel_aliases
53 from serialize import serialize_object, unpack_apply_message
50 from session import Session, Message
54 from session import Session, Message
51 from zmqshell import ZMQInteractiveShell
55 from zmqshell import ZMQInteractiveShell
52
56
@@ -540,7 +544,7 b' class Kernel(Configurable):'
540 exc_content = self._wrap_exception('apply')
544 exc_content = self._wrap_exception('apply')
541 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
545 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
542 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
546 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
543 ident=asbytes('%s.pyerr'%self.prefix))
547 ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
544 reply_content = exc_content
548 reply_content = exc_content
545 result_buf = []
549 result_buf = []
546
550
General Comments 0
You need to be logged in to leave comments. Login now