##// END OF EJS Templates
serialize elements of args/kwargs in pack_apply message...
MinRK -
Show More
@@ -1,180 +1,204 b''
1 1 """serialization utilities for apply messages
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import logging
20 20 import os
21 21 import re
22 22 import socket
23 23 import sys
24 24
25 25 try:
26 26 import cPickle
27 27 pickle = cPickle
28 28 except:
29 29 cPickle = None
30 30 import pickle
31 31
32 32
33 33 # IPython imports
34 34 from IPython.utils import py3compat
35 from IPython.utils.data import flatten
35 36 from IPython.utils.pickleutil import (
36 37 can, uncan, can_sequence, uncan_sequence, CannedObject
37 38 )
38 39
39 40 if py3compat.PY3:
40 41 buffer = memoryview
41 42
42 43 #-----------------------------------------------------------------------------
43 44 # Serialization Functions
44 45 #-----------------------------------------------------------------------------
45 46
46 47 # default values for the thresholds:
47 48 MAX_ITEMS = 64
48 49 MAX_BYTES = 1024
49 50
50 51 def _extract_buffers(obj, threshold=MAX_BYTES):
51 52 """extract buffers larger than a certain threshold"""
52 53 buffers = []
53 54 if isinstance(obj, CannedObject) and obj.buffers:
54 55 for i,buf in enumerate(obj.buffers):
55 56 if len(buf) > threshold:
56 57 # buffer larger than threshold, prevent pickling
57 58 obj.buffers[i] = None
58 59 buffers.append(buf)
59 60 elif isinstance(buf, buffer):
60 61 # buffer too small for separate send, coerce to bytes
61 62 # because pickling buffer objects just results in broken pointers
62 63 obj.buffers[i] = bytes(buf)
63 64 return buffers
64 65
65 66 def _restore_buffers(obj, buffers):
66 67 """restore buffers extracted by """
67 68 if isinstance(obj, CannedObject) and obj.buffers:
68 69 for i,buf in enumerate(obj.buffers):
69 70 if buf is None:
70 71 obj.buffers[i] = buffers.pop(0)
71 72
72 73 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
73 74 """Serialize an object into a list of sendable buffers.
74 75
75 76 Parameters
76 77 ----------
77 78
78 79 obj : object
79 80 The object to be serialized
80 81 buffer_threshold : int
81 82 The threshold (in bytes) for pulling out data buffers
82 83 to avoid pickling them.
83 84 item_threshold : int
84 85 The maximum number of items over which canning will iterate.
85 86 Containers (lists, dicts) larger than this will be pickled without
86 87 introspection.
87 88
88 89 Returns
89 90 -------
90 91 [bufs] : list of buffers representing the serialized object.
91 92 """
92 93 buffers = []
93 94 if isinstance(obj, (list, tuple)) and len(obj) < item_threshold:
94 95 cobj = can_sequence(obj)
95 96 for c in cobj:
96 97 buffers.extend(_extract_buffers(c, buffer_threshold))
97 98 elif isinstance(obj, dict) and len(obj) < item_threshold:
98 99 cobj = {}
99 100 for k in sorted(obj.iterkeys()):
100 101 c = can(obj[k])
101 102 buffers.extend(_extract_buffers(c, buffer_threshold))
102 103 cobj[k] = c
103 104 else:
104 105 cobj = can(obj)
105 106 buffers.extend(_extract_buffers(cobj, buffer_threshold))
106 107
107 108 buffers.insert(0, pickle.dumps(cobj,-1))
108 109 return buffers
109 110
110 111 def unserialize_object(buffers, g=None):
111 112 """reconstruct an object serialized by serialize_object from data buffers.
112 113
113 114 Parameters
114 115 ----------
115 116
116 117 bufs : list of buffers/bytes
117 118
118 119 g : globals to be used when uncanning
119 120
120 121 Returns
121 122 -------
122 123
123 124 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
124 125 """
125 126 bufs = list(buffers)
126 canned = pickle.loads(bufs.pop(0))
127 pobj = bufs.pop(0)
128 if not isinstance(pobj, bytes):
129 # a zmq message
130 pobj = bytes(pobj)
131 canned = pickle.loads(pobj)
127 132 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
128 133 for c in canned:
129 134 _restore_buffers(c, bufs)
130 135 newobj = uncan_sequence(canned, g)
131 136 elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
132 137 newobj = {}
133 138 for k in sorted(canned.iterkeys()):
134 139 c = canned[k]
135 140 _restore_buffers(c, bufs)
136 141 newobj[k] = uncan(c, g)
137 142 else:
138 143 _restore_buffers(canned, bufs)
139 144 newobj = uncan(canned, g)
140 145
141 146 return newobj, bufs
142 147
143 148 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
144 149 """pack up a function, args, and kwargs to be sent over the wire
145 150
146 as a series of buffers. Any object whose data is larger than `threshold`
147 will not have their data copied (currently only numpy arrays support zero-copy)
151 Each element of args/kwargs will be canned for special treatment,
152 but inspection will not go any deeper than that.
153
154 Any object whose data is larger than `threshold` will not have their data copied
155 (only numpy arrays and bytes/buffers support zero-copy)
156
157 Message will be a list of bytes/buffers of the format:
158
159 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
160
161 With length at least two + len(args) + len(kwargs)
148 162 """
163
164 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
165
166 kw_keys = sorted(kwargs.keys())
167 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
168
169 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
170
149 171 msg = [pickle.dumps(can(f),-1)]
150 databuffers = [] # for large objects
151 sargs = serialize_object(args, buffer_threshold, item_threshold)
152 msg.append(sargs[0])
153 databuffers.extend(sargs[1:])
154 skwargs = serialize_object(kwargs, buffer_threshold, item_threshold)
155 msg.append(skwargs[0])
156 databuffers.extend(skwargs[1:])
157 msg.extend(databuffers)
172 msg.append(pickle.dumps(info, -1))
173 msg.extend(arg_bufs)
174 msg.extend(kwarg_bufs)
175
158 176 return msg
159 177
160 178 def unpack_apply_message(bufs, g=None, copy=True):
161 179 """unpack f,args,kwargs from buffers packed by pack_apply_message()
162 180 Returns: original f,args,kwargs"""
163 181 bufs = list(bufs) # allow us to pop
164 assert len(bufs) >= 3, "not enough buffers!"
182 assert len(bufs) >= 2, "not enough buffers!"
165 183 if not copy:
166 for i in range(3):
184 for i in range(2):
167 185 bufs[i] = bufs[i].bytes
168 186 f = uncan(pickle.loads(bufs.pop(0)), g)
169 # sargs = bufs.pop(0)
170 # pop kwargs out, so first n-elements are args, serialized
171 skwargs = bufs.pop(1)
172 args, bufs = unserialize_object(bufs, g)
173 # put skwargs back in as the first element
174 bufs.insert(0, skwargs)
175 kwargs, bufs = unserialize_object(bufs, g)
176
177 assert not bufs, "Shouldn't be any data left over"
187 info = pickle.loads(bufs.pop(0))
188 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
189
190 args = []
191 for i in range(info['nargs']):
192 arg, arg_bufs = unserialize_object(arg_bufs, g)
193 args.append(arg)
194 args = tuple(args)
195 assert not arg_bufs, "Shouldn't be any arg bufs left over"
196
197 kwargs = {}
198 for key in info['kw_keys']:
199 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
200 kwargs[key] = kwarg
201 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
178 202
179 203 return f,args,kwargs
180 204
General Comments 0
You need to be logged in to leave comments. Login now