Show More
@@ -0,0 +1,179 b'' | |||||
|
1 | """serialization utilities for apply messages | |||
|
2 | ||||
|
3 | Authors: | |||
|
4 | ||||
|
5 | * Min RK | |||
|
6 | """ | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2010-2011 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #----------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #----------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | # Standard library imports | |||
|
19 | import logging | |||
|
20 | import os | |||
|
21 | import re | |||
|
22 | import socket | |||
|
23 | import sys | |||
|
24 | ||||
|
# Prefer the fast C pickle implementation on Python 2; fall back to the
# stdlib pickle module (the only one on Python 3).  Catch only ImportError:
# a bare `except:` would also hide unrelated failures (KeyboardInterrupt,
# SystemExit, bugs inside cPickle's import).
try:
    import cPickle
    pickle = cPickle
except ImportError:
    cPickle = None
    import pickle
|
31 | ||||
|
32 | ||||
|
33 | # IPython imports | |||
|
34 | from IPython.utils import py3compat | |||
|
35 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |||
|
36 | from IPython.utils.newserialized import serialize, unserialize | |||
|
37 | ||||
|
38 | if py3compat.PY3: | |||
|
39 | buffer = memoryview | |||
|
40 | ||||
|
41 | #----------------------------------------------------------------------------- | |||
|
42 | # Serialization Functions | |||
|
43 | #----------------------------------------------------------------------------- | |||
|
44 | ||||
|
def _extract_big_data(s, databuffers, threshold):
    """Externalize the data of serialized wrapper `s` into `databuffers`.

    Buffers/ndarrays are always externalized (they support zero-copy);
    anything else is externalized only when larger than `threshold`,
    to avoid double-pickling large content.
    """
    if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
        databuffers.append(s.getData())
        s.data = None


def serialize_object(obj, threshold=64e-6):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------

    obj : object
        The object to be serialized
    threshold : float
        The threshold for not double-pickling the content.


    Returns
    -------
    ('pmd', [bufs]) :
        where pmd is the pickled metadata wrapper,
        bufs is a list of data buffers
    """
    databuffers = []
    if isinstance(obj, (list, tuple)):
        clist = canSequence(obj)
        # materialize explicitly: on Python 3 `map` returns an iterator,
        # which would otherwise be pickled instead of the serialized items
        slist = list(map(serialize, clist))
        for s in slist:
            _extract_big_data(s, databuffers, threshold)
        return pickle.dumps(slist, -1), databuffers
    elif isinstance(obj, dict):
        sobj = {}
        # sorted(obj) iterates keys on both Python 2 and 3;
        # dict.iterkeys() no longer exists on Python 3.  Sorted order
        # must match the pop order in unpack/unserialize.
        for k in sorted(obj):
            s = serialize(can(obj[k]))
            _extract_big_data(s, databuffers, threshold)
            sobj[k] = s
        return pickle.dumps(sobj, -1), databuffers
    else:
        s = serialize(can(obj))
        _extract_big_data(s, databuffers, threshold)
        return pickle.dumps(s, -1), databuffers
|
87 | ||||
|
88 | ||||
|
def unserialize_object(bufs):
    """Reconstruct an object serialized by serialize_object from data buffers.

    Parameters
    ----------
    bufs : list
        [pickled metadata] + externalized data buffers, as produced by
        serialize_object.

    Returns
    -------
    (obj, remaining_bufs) :
        the reconstructed object and any buffers not consumed.
    """
    bufs = list(bufs)
    sobj = pickle.loads(bufs.pop(0))
    if isinstance(sobj, (list, tuple)):
        for s in sobj:
            if s.data is None:
                # data was externalized during serialization; reattach it
                s.data = bufs.pop(0)
        # wrap in list: on Python 3 `map` returns an iterator, which
        # uncanSequence would not recognize as a sequence
        return uncanSequence(list(map(unserialize, sobj))), bufs
    elif isinstance(sobj, dict):
        newobj = {}
        # iterate keys portably (dict.iterkeys() is Python-2 only);
        # sorted order must match serialize_object's buffer ordering
        for k in sorted(sobj):
            s = sobj[k]
            if s.data is None:
                s.data = bufs.pop(0)
            newobj[k] = uncan(unserialize(s))
        return newobj, bufs
    else:
        if sobj.data is None:
            sobj.data = bufs.pop(0)
        return uncan(unserialize(sobj)), bufs
|
110 | ||||
|
def pack_apply_message(f, args, kwargs, threshold=64e-6):
    """pack up a function, args, and kwargs to be sent over the wire
    as a series of buffers. Any object whose data is larger than `threshold`
    will not have their data copied (currently only numpy arrays support zero-copy)"""
    # serialize the canned function and each argument container,
    # collecting their externalized data buffers as we go
    serialized_args, arg_bufs = serialize_object(args, threshold)
    serialized_kwargs, kwarg_bufs = serialize_object(kwargs, threshold)
    # pickled sections first: function, then args, then kwargs
    msg = [pickle.dumps(can(f), -1), serialized_args, serialized_kwargs]
    # large-object data buffers trail the pickled sections,
    # args buffers before kwargs buffers
    msg.extend(arg_bufs)
    msg.extend(kwarg_bufs)
    return msg
|
125 | ||||
|
def _reattach_data(sa, bufs, copy):
    """Pop the next raw buffer and attach it to serialized wrapper `sa`.

    Mirrors the externalization done by serialize_object.  `bufs` holds
    zmq message frames; with copy=True they are plain bytes, otherwise
    zmq Message objects exposing `.bytes`.
    """
    m = bufs.pop(0)
    if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
        # always use a buffer, until memoryviews get sorted out
        sa.data = buffer(m)
        # disable memoryview support
        # if copy:
        #     sa.data = buffer(m)
        # else:
        #     sa.data = m.buffer
    else:
        sa.data = m if copy else m.bytes


def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()
    Returns: original f,args,kwargs"""
    bufs = list(bufs) # allow us to pop
    assert len(bufs) >= 3, "not enough buffers!"
    if not copy:
        # the first three frames are zmq Messages; pickle needs raw bytes
        for i in range(3):
            bufs[i] = bufs[i].bytes
    cf = pickle.loads(bufs.pop(0))
    sargs = list(pickle.loads(bufs.pop(0)))
    skwargs = dict(pickle.loads(bufs.pop(0)))
    f = uncan(cf, g)
    for sa in sargs:
        if sa.data is None:
            _reattach_data(sa, bufs, copy)

    # wrap in list: on Python 3 `map` returns an iterator, which
    # uncanSequence would not recognize as a sequence
    args = uncanSequence(list(map(unserialize, sargs)), g)
    kwargs = {}
    # iterate keys portably (dict.iterkeys() is Python-2 only); sorted
    # order must match the buffer ordering produced by serialize_object
    for k in sorted(skwargs):
        sa = skwargs[k]
        if sa.data is None:
            _reattach_data(sa, bufs, copy)
        kwargs[k] = uncan(unserialize(sa), g)

    return f,args,kwargs
|
179 |
@@ -47,6 +47,9 b' from IPython.utils import py3compat' | |||||
47 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence |
|
47 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |
48 | from IPython.utils.newserialized import serialize, unserialize |
|
48 | from IPython.utils.newserialized import serialize, unserialize | |
49 | from IPython.zmq.log import EnginePUBHandler |
|
49 | from IPython.zmq.log import EnginePUBHandler | |
|
50 | from IPython.zmq.serialize import ( | |||
|
51 | unserialize_object, serialize_object, pack_apply_message, unpack_apply_message | |||
|
52 | ) | |||
50 |
|
53 | |||
51 | if py3compat.PY3: |
|
54 | if py3compat.PY3: | |
52 | buffer = memoryview |
|
55 | buffer = memoryview | |
@@ -222,140 +225,6 b' def disambiguate_url(url, location=None):' | |||||
222 |
|
225 | |||
223 | return "%s://%s:%s"%(proto,ip,port) |
|
226 | return "%s://%s:%s"%(proto,ip,port) | |
224 |
|
227 | |||
225 | def serialize_object(obj, threshold=64e-6): |
|
|||
226 | """Serialize an object into a list of sendable buffers. |
|
|||
227 |
|
||||
228 | Parameters |
|
|||
229 | ---------- |
|
|||
230 |
|
||||
231 | obj : object |
|
|||
232 | The object to be serialized |
|
|||
233 | threshold : float |
|
|||
234 | The threshold for not double-pickling the content. |
|
|||
235 |
|
||||
236 |
|
||||
237 | Returns |
|
|||
238 | ------- |
|
|||
239 | ('pmd', [bufs]) : |
|
|||
240 | where pmd is the pickled metadata wrapper, |
|
|||
241 | bufs is a list of data buffers |
|
|||
242 | """ |
|
|||
243 | databuffers = [] |
|
|||
244 | if isinstance(obj, (list, tuple)): |
|
|||
245 | clist = canSequence(obj) |
|
|||
246 | slist = map(serialize, clist) |
|
|||
247 | for s in slist: |
|
|||
248 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
249 | databuffers.append(s.getData()) |
|
|||
250 | s.data = None |
|
|||
251 | return pickle.dumps(slist,-1), databuffers |
|
|||
252 | elif isinstance(obj, dict): |
|
|||
253 | sobj = {} |
|
|||
254 | for k in sorted(obj.iterkeys()): |
|
|||
255 | s = serialize(can(obj[k])) |
|
|||
256 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
257 | databuffers.append(s.getData()) |
|
|||
258 | s.data = None |
|
|||
259 | sobj[k] = s |
|
|||
260 | return pickle.dumps(sobj,-1),databuffers |
|
|||
261 | else: |
|
|||
262 | s = serialize(can(obj)) |
|
|||
263 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
264 | databuffers.append(s.getData()) |
|
|||
265 | s.data = None |
|
|||
266 | return pickle.dumps(s,-1),databuffers |
|
|||
267 |
|
||||
268 |
|
||||
269 | def unserialize_object(bufs): |
|
|||
270 | """reconstruct an object serialized by serialize_object from data buffers.""" |
|
|||
271 | bufs = list(bufs) |
|
|||
272 | sobj = pickle.loads(bufs.pop(0)) |
|
|||
273 | if isinstance(sobj, (list, tuple)): |
|
|||
274 | for s in sobj: |
|
|||
275 | if s.data is None: |
|
|||
276 | s.data = bufs.pop(0) |
|
|||
277 | return uncanSequence(map(unserialize, sobj)), bufs |
|
|||
278 | elif isinstance(sobj, dict): |
|
|||
279 | newobj = {} |
|
|||
280 | for k in sorted(sobj.iterkeys()): |
|
|||
281 | s = sobj[k] |
|
|||
282 | if s.data is None: |
|
|||
283 | s.data = bufs.pop(0) |
|
|||
284 | newobj[k] = uncan(unserialize(s)) |
|
|||
285 | return newobj, bufs |
|
|||
286 | else: |
|
|||
287 | if sobj.data is None: |
|
|||
288 | sobj.data = bufs.pop(0) |
|
|||
289 | return uncan(unserialize(sobj)), bufs |
|
|||
290 |
|
||||
291 | def pack_apply_message(f, args, kwargs, threshold=64e-6): |
|
|||
292 | """pack up a function, args, and kwargs to be sent over the wire |
|
|||
293 | as a series of buffers. Any object whose data is larger than `threshold` |
|
|||
294 | will not have their data copied (currently only numpy arrays support zero-copy)""" |
|
|||
295 | msg = [pickle.dumps(can(f),-1)] |
|
|||
296 | databuffers = [] # for large objects |
|
|||
297 | sargs, bufs = serialize_object(args,threshold) |
|
|||
298 | msg.append(sargs) |
|
|||
299 | databuffers.extend(bufs) |
|
|||
300 | skwargs, bufs = serialize_object(kwargs,threshold) |
|
|||
301 | msg.append(skwargs) |
|
|||
302 | databuffers.extend(bufs) |
|
|||
303 | msg.extend(databuffers) |
|
|||
304 | return msg |
|
|||
305 |
|
||||
306 | def unpack_apply_message(bufs, g=None, copy=True): |
|
|||
307 | """unpack f,args,kwargs from buffers packed by pack_apply_message() |
|
|||
308 | Returns: original f,args,kwargs""" |
|
|||
309 | bufs = list(bufs) # allow us to pop |
|
|||
310 | assert len(bufs) >= 3, "not enough buffers!" |
|
|||
311 | if not copy: |
|
|||
312 | for i in range(3): |
|
|||
313 | bufs[i] = bufs[i].bytes |
|
|||
314 | cf = pickle.loads(bufs.pop(0)) |
|
|||
315 | sargs = list(pickle.loads(bufs.pop(0))) |
|
|||
316 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
|||
317 | # print sargs, skwargs |
|
|||
318 | f = uncan(cf, g) |
|
|||
319 | for sa in sargs: |
|
|||
320 | if sa.data is None: |
|
|||
321 | m = bufs.pop(0) |
|
|||
322 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
|||
323 | # always use a buffer, until memoryviews get sorted out |
|
|||
324 | sa.data = buffer(m) |
|
|||
325 | # disable memoryview support |
|
|||
326 | # if copy: |
|
|||
327 | # sa.data = buffer(m) |
|
|||
328 | # else: |
|
|||
329 | # sa.data = m.buffer |
|
|||
330 | else: |
|
|||
331 | if copy: |
|
|||
332 | sa.data = m |
|
|||
333 | else: |
|
|||
334 | sa.data = m.bytes |
|
|||
335 |
|
||||
336 | args = uncanSequence(map(unserialize, sargs), g) |
|
|||
337 | kwargs = {} |
|
|||
338 | for k in sorted(skwargs.iterkeys()): |
|
|||
339 | sa = skwargs[k] |
|
|||
340 | if sa.data is None: |
|
|||
341 | m = bufs.pop(0) |
|
|||
342 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
|||
343 | # always use a buffer, until memoryviews get sorted out |
|
|||
344 | sa.data = buffer(m) |
|
|||
345 | # disable memoryview support |
|
|||
346 | # if copy: |
|
|||
347 | # sa.data = buffer(m) |
|
|||
348 | # else: |
|
|||
349 | # sa.data = m.buffer |
|
|||
350 | else: |
|
|||
351 | if copy: |
|
|||
352 | sa.data = m |
|
|||
353 | else: |
|
|||
354 | sa.data = m.bytes |
|
|||
355 |
|
||||
356 | kwargs[k] = uncan(unserialize(sa), g) |
|
|||
357 |
|
||||
358 | return f,args,kwargs |
|
|||
359 |
|
228 | |||
360 | #-------------------------------------------------------------------------- |
|
229 | #-------------------------------------------------------------------------- | |
361 | # helpers for implementing old MEC API via view.apply |
|
230 | # helpers for implementing old MEC API via view.apply |
@@ -22,6 +22,9 b' import sys' | |||||
22 | import time |
|
22 | import time | |
23 | import traceback |
|
23 | import traceback | |
24 | import logging |
|
24 | import logging | |
|
25 | import uuid | |||
|
26 | ||||
|
27 | from datetime import datetime | |||
25 | from signal import ( |
|
28 | from signal import ( | |
26 | signal, default_int_handler, SIGINT, SIG_IGN |
|
29 | signal, default_int_handler, SIGINT, SIG_IGN | |
27 | ) |
|
30 | ) | |
@@ -47,6 +50,7 b' from IPython.utils.traitlets import (' | |||||
47 |
|
50 | |||
48 | from entry_point import base_launch_kernel |
|
51 | from entry_point import base_launch_kernel | |
49 | from kernelapp import KernelApp, kernel_flags, kernel_aliases |
|
52 | from kernelapp import KernelApp, kernel_flags, kernel_aliases | |
|
53 | from serialize import serialize_object, unpack_apply_message | |||
50 | from session import Session, Message |
|
54 | from session import Session, Message | |
51 | from zmqshell import ZMQInteractiveShell |
|
55 | from zmqshell import ZMQInteractiveShell | |
52 |
|
56 | |||
@@ -540,7 +544,7 b' class Kernel(Configurable):' | |||||
540 | exc_content = self._wrap_exception('apply') |
|
544 | exc_content = self._wrap_exception('apply') | |
541 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
545 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
542 | self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, |
|
546 | self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, | |
543 | ident=asbytes('%s.pyerr'%self.prefix)) |
|
547 | ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix)) | |
544 | reply_content = exc_content |
|
548 | reply_content = exc_content | |
545 | result_buf = [] |
|
549 | result_buf = [] | |
546 |
|
550 |
General Comments 0
You need to be logged in to leave comments.
Login now