##// END OF EJS Templates
move apply serialization into zmq.serialize
MinRK -
Show More
@@ -0,0 +1,179 b''
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54 The threshold for not double-pickling the content.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have their data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
@@ -47,6 +47,9 b' from IPython.utils import py3compat'
47 47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 48 from IPython.utils.newserialized import serialize, unserialize
49 49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50 53
51 54 if py3compat.PY3:
52 55 buffer = memoryview
@@ -222,140 +225,6 b' def disambiguate_url(url, location=None):'
222 225
223 226 return "%s://%s:%s"%(proto,ip,port)
224 227
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359 228
360 229 #--------------------------------------------------------------------------
361 230 # helpers for implementing old MEC API via view.apply
@@ -22,6 +22,9 b' import sys'
22 22 import time
23 23 import traceback
24 24 import logging
25 import uuid
26
27 from datetime import datetime
25 28 from signal import (
26 29 signal, default_int_handler, SIGINT, SIG_IGN
27 30 )
@@ -47,6 +50,7 b' from IPython.utils.traitlets import ('
47 50
48 51 from entry_point import base_launch_kernel
49 52 from kernelapp import KernelApp, kernel_flags, kernel_aliases
53 from serialize import serialize_object, unpack_apply_message
50 54 from session import Session, Message
51 55 from zmqshell import ZMQInteractiveShell
52 56
@@ -540,7 +544,7 b' class Kernel(Configurable):'
540 544 exc_content = self._wrap_exception('apply')
541 545 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
542 546 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
543 ident=asbytes('%s.pyerr'%self.prefix))
547 ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
544 548 reply_content = exc_content
545 549 result_buf = []
546 550
General Comments 0
You need to be logged in to leave comments. Login now