Merge pull request #4552 from minrk/use_dill...
Min RK - r13728:818f778e merge
@@ -0,0 +1,7 b''
1 Using dill to expand serialization support
2 ------------------------------------------
3
4 Adds :func:`~IPython.utils.pickleutil.use_dill`, which allows
5 dill to extend serialization support in :mod:`IPython.parallel` (closures, etc.).
6 Also adds the :meth:`DirectView.use_dill` convenience method for enabling dill
7 locally and on all engines with a single call.
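
A minimal sketch of how these two hooks fit together, assuming a running IPython.parallel cluster (e.g. started with `ipcluster start`) and `dill` installed; the function and values mirror the example notebook below:

```python
from IPython import parallel
from IPython.utils import pickleutil

# enable dill-based pickling locally, so closures can be *sent* to engines
pickleutil.use_dill()

rc = parallel.Client()

# convenience method: enable dill locally and on every engine in one call,
# so closure results can also come *back* from the engines
rc[:].use_dill()

def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure

view = rc.load_balanced_view()
print(view.apply_sync(make_closure(5), 3))  # prints 15
```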
@@ -0,0 +1,463 b''
1 {
2 "metadata": {
3 "gist_id": "5241793",
4 "name": ""
5 },
6 "nbformat": 3,
7 "nbformat_minor": 0,
8 "worksheets": [
9 {
10 "cells": [
11 {
12 "cell_type": "heading",
13 "level": 1,
14 "metadata": {},
15 "source": [
16 "Using dill to pickle anything"
17 ]
18 },
19 {
20 "cell_type": "markdown",
21 "metadata": {},
22 "source": [
23 "IPython.parallel doesn't do much in the way of serialization.\n",
24 "It has custom zero-copy handling of numpy arrays,\n",
25 "but other than that, it doesn't do anything other than the bare minimum to make basic interactively defined functions and classes sendable.\n",
26 "\n",
27 "There are a few projects that extend pickle to make just about anything sendable, and one of these is [dill](http://www.cacr.caltech.edu/~mmckerns/dill).\n",
28 "\n",
29 "To install dill:\n",
30 " \n",
31 " pip install dill"
32 ]
33 },
34 {
35 "cell_type": "markdown",
36 "metadata": {},
37 "source": [
38 "First, as always, we create a task function, this time with a closure"
39 ]
40 },
41 {
42 "cell_type": "code",
43 "collapsed": false,
44 "input": [
45 "def make_closure(a):\n",
46 " \"\"\"make a function with a closure, and return it\"\"\"\n",
47 " def has_closure(b):\n",
48 " return a * b\n",
49 " return has_closure"
50 ],
51 "language": "python",
52 "metadata": {},
53 "outputs": [],
54 "prompt_number": 1
55 },
56 {
57 "cell_type": "code",
58 "collapsed": false,
59 "input": [
60 "closed = make_closure(5)"
61 ],
62 "language": "python",
63 "metadata": {},
64 "outputs": [],
65 "prompt_number": 2
66 },
67 {
68 "cell_type": "code",
69 "collapsed": false,
70 "input": [
71 "closed(2)"
72 ],
73 "language": "python",
74 "metadata": {},
75 "outputs": [
76 {
77 "metadata": {},
78 "output_type": "pyout",
79 "prompt_number": 3,
80 "text": [
81 "10"
82 ]
83 }
84 ],
85 "prompt_number": 3
86 },
87 {
88 "cell_type": "code",
89 "collapsed": false,
90 "input": [
91 "import pickle"
92 ],
93 "language": "python",
94 "metadata": {},
95 "outputs": [],
96 "prompt_number": 4
97 },
98 {
99 "cell_type": "markdown",
100 "metadata": {},
101 "source": [
102 "Without help, pickle can't deal with closures"
103 ]
104 },
105 {
106 "cell_type": "code",
107 "collapsed": false,
108 "input": [
109 "pickle.dumps(closed)"
110 ],
111 "language": "python",
112 "metadata": {},
113 "outputs": [
114 {
115 "ename": "PicklingError",
116 "evalue": "Can't pickle <function has_closure at 0x10d2552a8>: it's not found as __main__.has_closure",
117 "output_type": "pyerr",
118 "traceback": [
119 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mPicklingError\u001b[0m Traceback (most recent call last)",
120 "\u001b[1;32m<ipython-input-5-0f1f376cfea0>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mclosed\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
121 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36mdumps\u001b[1;34m(obj, protocol)\u001b[0m\n\u001b[0;32m 1372\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mprotocol\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mNone\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1373\u001b[0m \u001b[0mfile\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mStringIO\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1374\u001b[1;33m \u001b[0mPickler\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mfile\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mprotocol\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdump\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 1375\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mfile\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgetvalue\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1376\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
122 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36mdump\u001b[1;34m(self, obj)\u001b[0m\n\u001b[0;32m 222\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mproto\u001b[0m \u001b[1;33m>=\u001b[0m \u001b[1;36m2\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 223\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwrite\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mPROTO\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mchr\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mproto\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 224\u001b[1;33m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msave\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 225\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwrite\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mSTOP\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 226\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
123 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36msave\u001b[1;34m(self, obj)\u001b[0m\n\u001b[0;32m 284\u001b[0m \u001b[0mf\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdispatch\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mget\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mt\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 285\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 286\u001b[1;33m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;31m# Call unbound method with explicit self\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 287\u001b[0m \u001b[1;32mreturn\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 288\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
124 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36msave_global\u001b[1;34m(self, obj, name, pack)\u001b[0m\n\u001b[0;32m 746\u001b[0m raise PicklingError(\n\u001b[0;32m 747\u001b[0m \u001b[1;34m\"Can't pickle %r: it's not found as %s.%s\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 748\u001b[1;33m (obj, module, name))\n\u001b[0m\u001b[0;32m 749\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 750\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mklass\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mobj\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
125 "\u001b[1;31mPicklingError\u001b[0m: Can't pickle <function has_closure at 0x10d2552a8>: it's not found as __main__.has_closure"
126 ]
127 }
128 ],
129 "prompt_number": 5
130 },
131 {
132 "cell_type": "markdown",
133 "metadata": {},
134 "source": [
135 "But after we import dill, magic happens"
136 ]
137 },
138 {
139 "cell_type": "code",
140 "collapsed": false,
141 "input": [
142 "import dill"
143 ],
144 "language": "python",
145 "metadata": {},
146 "outputs": [],
147 "prompt_number": 6
148 },
149 {
150 "cell_type": "code",
151 "collapsed": false,
152 "input": [
153 "pickle.dumps(closed)[:64] + '...'"
154 ],
155 "language": "python",
156 "metadata": {},
157 "outputs": [
158 {
159 "metadata": {},
160 "output_type": "pyout",
161 "prompt_number": 7,
162 "text": [
163 "\"cdill.dill\\n_load_type\\np0\\n(S'FunctionType'\\np1\\ntp2\\nRp3\\n(cdill.dill...\""
164 ]
165 }
166 ],
167 "prompt_number": 7
168 },
169 {
170 "cell_type": "markdown",
171 "metadata": {},
172 "source": [
173 "So from now on, pretty much everything is pickleable."
174 ]
175 },
176 {
177 "cell_type": "heading",
178 "level": 2,
179 "metadata": {},
180 "source": [
181 "Now use this in IPython.parallel"
182 ]
183 },
184 {
185 "cell_type": "markdown",
186 "metadata": {},
187 "source": [
188 "As usual, we start by creating our Client and View"
189 ]
190 },
191 {
192 "cell_type": "code",
193 "collapsed": false,
194 "input": [
195 "from IPython import parallel\n",
196 "rc = parallel.Client()\n",
197 "view = rc.load_balanced_view()"
198 ],
199 "language": "python",
200 "metadata": {},
201 "outputs": [],
202 "prompt_number": 8
203 },
204 {
205 "cell_type": "markdown",
206 "metadata": {},
207 "source": [
208 "Now let's try sending our function with a closure:"
209 ]
210 },
211 {
212 "cell_type": "code",
213 "collapsed": false,
214 "input": [
215 "view.apply_sync(closed, 3)"
216 ],
217 "language": "python",
218 "metadata": {},
219 "outputs": [
220 {
221 "ename": "ValueError",
222 "evalue": "Sorry, cannot pickle code objects with closures",
223 "output_type": "pyerr",
224 "traceback": [
225 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mValueError\u001b[0m Traceback (most recent call last)",
226 "\u001b[1;32m<ipython-input-9-23a646829fdc>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mview\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mapply_sync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mclosed\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m3\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
227 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mapply_sync\u001b[1;34m(self, f, *args, **kwargs)\u001b[0m\n",
228 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mspin_after\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 73\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mspin_after\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 74\u001b[0m \u001b[1;34m\"\"\"call spin after the method.\"\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 75\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 76\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mspin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 77\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mret\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
229 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mapply_sync\u001b[1;34m(self, f, *args, **kwargs)\u001b[0m\n\u001b[0;32m 248\u001b[0m \u001b[0mreturns\u001b[0m\u001b[1;33m:\u001b[0m \u001b[0mactual\u001b[0m \u001b[0mresult\u001b[0m \u001b[0mof\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 249\u001b[0m \"\"\"\n\u001b[1;32m--> 250\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_really_apply\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mblock\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 251\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 252\u001b[0m \u001b[1;31m#----------------------------------------------------------------\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
230 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n",
231 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36msync_results\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 64\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_in_sync_results\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mTrue\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 65\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 66\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 67\u001b[0m \u001b[1;32mfinally\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 68\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_in_sync_results\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mFalse\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
232 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n",
233 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36msave_ids\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 49\u001b[0m \u001b[0mn_previous\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclient\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mhistory\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 50\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 51\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 52\u001b[0m \u001b[1;32mfinally\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 53\u001b[0m \u001b[0mnmsgs\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclient\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mhistory\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m-\u001b[0m \u001b[0mn_previous\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
234 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n\u001b[0;32m 1053\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1054\u001b[0m msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,\n\u001b[1;32m-> 1055\u001b[1;33m metadata=metadata)\n\u001b[0m\u001b[0;32m 1056\u001b[0m \u001b[0mtracker\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mNone\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mtrack\u001b[0m \u001b[1;32mis\u001b[0m \u001b[0mFalse\u001b[0m \u001b[1;32melse\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m'tracker'\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1057\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
235 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/client.pyc\u001b[0m in \u001b[0;36msend_apply_request\u001b[1;34m(self, socket, f, args, kwargs, metadata, track, ident)\u001b[0m\n\u001b[0;32m 1252\u001b[0m bufs = serialize.pack_apply_message(f, args, kwargs,\n\u001b[0;32m 1253\u001b[0m \u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1254\u001b[1;33m \u001b[0mitem_threshold\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mitem_threshold\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 1255\u001b[0m )\n\u001b[0;32m 1256\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
236 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc\u001b[0m in \u001b[0;36mpack_apply_message\u001b[1;34m(f, args, kwargs, buffer_threshold, item_threshold)\u001b[0m\n\u001b[0;32m 163\u001b[0m \u001b[0minfo\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mdict\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mnargs\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mnarg_bufs\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg_bufs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mkw_keys\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mkw_keys\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 164\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 165\u001b[1;33m \u001b[0mmsg\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;33m[\u001b[0m\u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcan\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 166\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mappend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0minfo\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 167\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mextend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg_bufs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
237 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc\u001b[0m in \u001b[0;36mreduce_code\u001b[1;34m(co)\u001b[0m\n\u001b[0;32m 36\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mreduce_code\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mco\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 37\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_freevars\u001b[0m \u001b[1;32mor\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_cellvars\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 38\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Sorry, cannot pickle code objects with closures\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 39\u001b[0m args = [co.co_argcount, co.co_nlocals, co.co_stacksize,\n\u001b[0;32m 40\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_flags\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_code\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_consts\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_names\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
238 "\u001b[1;31mValueError\u001b[0m: Sorry, cannot pickle code objects with closures"
239 ]
240 }
241 ],
242 "prompt_number": 9
243 },
244 {
245 "cell_type": "markdown",
246 "metadata": {},
247 "source": [
248 "Oops, no dice. For IPython to work with dill,\n",
249 "there are one or two more steps. IPython will do these for you if you call `pickleutil.use_dill`:"
250 ]
251 },
252 {
253 "cell_type": "code",
254 "collapsed": false,
255 "input": [
256 "from IPython.utils import pickleutil\n",
257 "pickleutil.use_dill()"
258 ],
259 "language": "python",
260 "metadata": {},
261 "outputs": [],
262 "prompt_number": 10
263 },
264 {
265 "cell_type": "markdown",
266 "metadata": {},
267 "source": [
268 "Now let's try again"
269 ]
270 },
271 {
272 "cell_type": "code",
273 "collapsed": false,
274 "input": [
275 "view.apply_sync(closed, 3)"
276 ],
277 "language": "python",
278 "metadata": {},
279 "outputs": [
280 {
281 "metadata": {},
282 "output_type": "pyout",
283 "prompt_number": 11,
284 "text": [
285 "15"
286 ]
287 }
288 ],
289 "prompt_number": 11
290 },
291 {
292 "cell_type": "markdown",
293 "metadata": {},
294 "source": [
295 "Yay! Now we can use dill to allow IPython.parallel to send anything.\n",
296 "\n",
297 "And that's it! We can send closures and other previously non-pickleables to our engines.\n",
298 "\n",
299 "But wait, there's more!"
300 ]
301 },
302 {
303 "cell_type": "code",
304 "collapsed": false,
305 "input": [
306 "view.apply_sync(make_closure, 2)"
307 ],
308 "language": "python",
309 "metadata": {},
310 "outputs": [
311 {
312 "ename": "RemoteError",
313 "evalue": "ValueError(Sorry, cannot pickle code objects with closures)",
314 "output_type": "pyerr",
315 "traceback": [
316 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
317 "\u001b[1;31mValueError\u001b[0m Traceback (most recent call last)\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc\u001b[0m in \u001b[0;36mserialize_object\u001b[1;34m(obj, buffer_threshold, item_threshold)\u001b[0m",
318 "\u001b[0;32m 100\u001b[0m \u001b[0mbuffers\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mextend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_extract_buffers\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcobj\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
319 "\u001b[0;32m 101\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m",
320 "\u001b[1;32m--> 102\u001b[1;33m \u001b[0mbuffers\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0minsert\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcobj\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
321 "\u001b[0m\u001b[0;32m 103\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mbuffers\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
322 "\u001b[0;32m 104\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m",
323 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc\u001b[0m in \u001b[0;36mreduce_code\u001b[1;34m(co)\u001b[0m",
324 "\u001b[0;32m 36\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mreduce_code\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mco\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
325 "\u001b[0;32m 37\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_freevars\u001b[0m \u001b[1;32mor\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_cellvars\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
326 "\u001b[1;32m---> 38\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Sorry, cannot pickle code objects with closures\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
327 "\u001b[0m\u001b[0;32m 39\u001b[0m args = [co.co_argcount, co.co_nlocals, co.co_stacksize,",
328 "\u001b[0;32m 40\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_flags\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_code\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_consts\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_names\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
329 "\u001b[1;31mValueError\u001b[0m: Sorry, cannot pickle code objects with closures"
330 ]
331 }
332 ],
333 "prompt_number": 12
334 },
335 {
336 "cell_type": "markdown",
337 "metadata": {},
338 "source": [
339 "If we want dill support for objects coming *from* the engines,\n",
340 "then we need to call `use_dill()` there as well.\n",
341 "\n",
342 "`DirectView` objects have a method to call `use_dill` locally and on every engine:"
343 ]
344 },
345 {
346 "cell_type": "code",
347 "collapsed": false,
348 "input": [
349 "rc[:].use_dill()"
350 ],
351 "language": "python",
352 "metadata": {},
353 "outputs": [
354 {
355 "metadata": {},
356 "output_type": "pyout",
357 "prompt_number": 13,
358 "text": [
359 "<AsyncResult: use_dill>"
360 ]
361 }
362 ],
363 "prompt_number": 13
364 },
365 {
366 "cell_type": "markdown",
367 "metadata": {},
368 "source": [
369 "This is equivalent to\n",
370 "\n",
371 "```python\n",
372 "from IPython.utils.pickleutil import use_dill\n",
373 "use_dill()\n",
374 "rc[:].apply(use_dill)\n",
375 "```"
376 ]
377 },
378 {
379 "cell_type": "markdown",
380 "metadata": {},
381 "source": [
382 "Let's give it a try now:"
383 ]
384 },
385 {
386 "cell_type": "code",
387 "collapsed": false,
388 "input": [
389 "remote_closure = view.apply_sync(make_closure, 4)\n",
390 "remote_closure(5)"
391 ],
392 "language": "python",
393 "metadata": {},
394 "outputs": [
395 {
396 "metadata": {},
397 "output_type": "pyout",
398 "prompt_number": 14,
399 "text": [
400 "20"
401 ]
402 }
403 ],
404 "prompt_number": 14
405 },
406 {
407 "cell_type": "markdown",
408 "metadata": {},
409 "source": [
410 "At this point, we can send/recv all kinds of stuff"
411 ]
412 },
413 {
414 "cell_type": "code",
415 "collapsed": false,
416 "input": [
417 "def outer(a):\n",
418 " def inner(b):\n",
419 " def inner_again(c):\n",
420 " return c * b * a\n",
421 " return inner_again\n",
422 " return inner"
423 ],
424 "language": "python",
425 "metadata": {},
426 "outputs": [],
427 "prompt_number": 15
428 },
429 {
430 "cell_type": "markdown",
431 "metadata": {},
432 "source": [
433 "So outer returns a function with a closure, which returns a function with a closure.\n",
434 "\n",
435 "Now, we can resolve the first closure on the engine, the second here, and the third on a different engine,\n",
436 "after passing through a lambda we define here and call there, just for good measure."
437 ]
438 },
439 {
440 "cell_type": "code",
441 "collapsed": false,
442 "input": [
443 "view.apply_sync(lambda f: f(3),view.apply_sync(outer, 1)(2))"
444 ],
445 "language": "python",
446 "metadata": {},
447 "outputs": [
448 {
449 "metadata": {},
450 "output_type": "pyout",
451 "prompt_number": 16,
452 "text": [
453 "6"
454 ]
455 }
456 ],
457 "prompt_number": 16
458 }
459 ],
460 "metadata": {}
461 }
462 ]
463 } No newline at end of file
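
As a purely local sanity check (no engines or dill required), the same triple closure from the notebook resolves to the identical value, which makes it easy to verify which closure ended up bound where:

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

# outer(1) binds a=1, inner(2) binds b=2, inner_again(3) computes 3 * 2 * 1
assert outer(1)(2)(3) == 6  # matches the distributed result above
```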
@@ -1,1114 +1,1125 b''
1 """Views of remote engines.
1 """Views of remote engines.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 from __future__ import print_function
7 from __future__ import print_function
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import imp
19 import imp
20 import sys
20 import sys
21 import warnings
21 import warnings
22 from contextlib import contextmanager
22 from contextlib import contextmanager
23 from types import ModuleType
23 from types import ModuleType
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.testing.skipdoctest import skip_doctest
27 from IPython.testing.skipdoctest import skip_doctest
28 from IPython.utils import pickleutil
28 from IPython.utils.traitlets import (
29 from IPython.utils.traitlets import (
29 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
30 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
30 )
31 )
31 from IPython.external.decorator import decorator
32 from IPython.external.decorator import decorator
32
33
33 from IPython.parallel import util
34 from IPython.parallel import util
34 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.utils.py3compat import string_types, iteritems, PY3
36 from IPython.utils.py3compat import string_types, iteritems, PY3
36
37
37 from . import map as Map
38 from . import map as Map
38 from .asyncresult import AsyncResult, AsyncMapResult
39 from .asyncresult import AsyncResult, AsyncMapResult
39 from .remotefunction import ParallelFunction, parallel, remote, getname
40 from .remotefunction import ParallelFunction, parallel, remote, getname
40
41
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42 # Decorators
43 # Decorators
43 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
44
45
45 @decorator
46 @decorator
46 def save_ids(f, self, *args, **kwargs):
47 def save_ids(f, self, *args, **kwargs):
47 """Keep our history and outstanding attributes up to date after a method call."""
48 """Keep our history and outstanding attributes up to date after a method call."""
48 n_previous = len(self.client.history)
49 n_previous = len(self.client.history)
49 try:
50 try:
50 ret = f(self, *args, **kwargs)
51 ret = f(self, *args, **kwargs)
51 finally:
52 finally:
52 nmsgs = len(self.client.history) - n_previous
53 nmsgs = len(self.client.history) - n_previous
53 msg_ids = self.client.history[-nmsgs:]
54 msg_ids = self.client.history[-nmsgs:]
54 self.history.extend(msg_ids)
55 self.history.extend(msg_ids)
55 self.outstanding.update(msg_ids)
56 self.outstanding.update(msg_ids)
56 return ret
57 return ret
57
58
58 @decorator
59 @decorator
59 def sync_results(f, self, *args, **kwargs):
60 def sync_results(f, self, *args, **kwargs):
60 """sync relevant results from self.client to our results attribute."""
61 """sync relevant results from self.client to our results attribute."""
61 if self._in_sync_results:
62 if self._in_sync_results:
62 return f(self, *args, **kwargs)
63 return f(self, *args, **kwargs)
63 self._in_sync_results = True
64 self._in_sync_results = True
64 try:
65 try:
65 ret = f(self, *args, **kwargs)
66 ret = f(self, *args, **kwargs)
66 finally:
67 finally:
67 self._in_sync_results = False
68 self._in_sync_results = False
68 self._sync_results()
69 self._sync_results()
69 return ret
70 return ret
70
71
71 @decorator
72 @decorator
72 def spin_after(f, self, *args, **kwargs):
73 def spin_after(f, self, *args, **kwargs):
73 """call spin after the method."""
74 """call spin after the method."""
74 ret = f(self, *args, **kwargs)
75 ret = f(self, *args, **kwargs)
75 self.spin()
76 self.spin()
76 return ret
77 return ret
77
78
78 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
79 # Classes
80 # Classes
80 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
81
82
82 @skip_doctest
83 @skip_doctest
83 class View(HasTraits):
84 class View(HasTraits):
84 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
85 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
85
86
86 Don't use this class, use subclasses.
87 Don't use this class, use subclasses.
87
88
88 Methods
89 Methods
89 -------
90 -------
90
91
91 spin
92 spin
92 flushes incoming results and registration state changes
93 flushes incoming results and registration state changes
93 control methods spin, and requesting `ids` also ensures up to date
94 control methods spin, and requesting `ids` also ensures up to date
94
95
95 wait
96 wait
96 wait on one or more msg_ids
97 wait on one or more msg_ids
97
98
98 execution methods
99 execution methods
99 apply
100 apply
100 legacy: execute, run
101 legacy: execute, run
101
102
102 data movement
103 data movement
103 push, pull, scatter, gather
104 push, pull, scatter, gather
104
105
105 query methods
106 query methods
106 get_result, queue_status, purge_results, result_status
107 get_result, queue_status, purge_results, result_status
107
108
108 control methods
109 control methods
109 abort, shutdown
110 abort, shutdown
110
111
111 """
112 """
112 # flags
113 # flags
113 block=Bool(False)
114 block=Bool(False)
114 track=Bool(True)
115 track=Bool(True)
115 targets = Any()
116 targets = Any()
116
117
117 history=List()
118 history=List()
118 outstanding = Set()
119 outstanding = Set()
119 results = Dict()
120 results = Dict()
120 client = Instance('IPython.parallel.Client')
121 client = Instance('IPython.parallel.Client')
121
122
122 _socket = Instance('zmq.Socket')
123 _socket = Instance('zmq.Socket')
123 _flag_names = List(['targets', 'block', 'track'])
124 _flag_names = List(['targets', 'block', 'track'])
124 _in_sync_results = Bool(False)
125 _in_sync_results = Bool(False)
125 _targets = Any()
126 _targets = Any()
126 _idents = Any()
127 _idents = Any()
127
128
128 def __init__(self, client=None, socket=None, **flags):
129 def __init__(self, client=None, socket=None, **flags):
129 super(View, self).__init__(client=client, _socket=socket)
130 super(View, self).__init__(client=client, _socket=socket)
130 self.results = client.results
131 self.results = client.results
131 self.block = client.block
132 self.block = client.block
132
133
133 self.set_flags(**flags)
134 self.set_flags(**flags)
134
135
135 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
136 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
136
137
137 def __repr__(self):
138 def __repr__(self):
138 strtargets = str(self.targets)
139 strtargets = str(self.targets)
139 if len(strtargets) > 16:
140 if len(strtargets) > 16:
140 strtargets = strtargets[:12]+'...]'
141 strtargets = strtargets[:12]+'...]'
141 return "<%s %s>"%(self.__class__.__name__, strtargets)
142 return "<%s %s>"%(self.__class__.__name__, strtargets)
142
143
143 def __len__(self):
144 def __len__(self):
144 if isinstance(self.targets, list):
145 if isinstance(self.targets, list):
145 return len(self.targets)
146 return len(self.targets)
146 elif isinstance(self.targets, int):
147 elif isinstance(self.targets, int):
147 return 1
148 return 1
148 else:
149 else:
149 return len(self.client)
150 return len(self.client)
150
151
151 def set_flags(self, **kwargs):
152 def set_flags(self, **kwargs):
152 """set my attribute flags by keyword.
153 """set my attribute flags by keyword.
153
154
154 Views determine behavior with a few attributes (`block`, `track`, etc.).
155 Views determine behavior with a few attributes (`block`, `track`, etc.).
155 These attributes can be set all at once by name with this method.
156 These attributes can be set all at once by name with this method.
156
157
157 Parameters
158 Parameters
158 ----------
159 ----------
159
160
160 block : bool
161 block : bool
161 whether to wait for results
162 whether to wait for results
162 track : bool
163 track : bool
163 whether to create a MessageTracker to allow the user to
164 whether to create a MessageTracker to allow the user to
164 safely edit arrays and buffers after non-copying
165 safely edit arrays and buffers after non-copying
165 sends.
166 sends.
166 """
167 """
167 for name, value in iteritems(kwargs):
168 for name, value in iteritems(kwargs):
168 if name not in self._flag_names:
169 if name not in self._flag_names:
169 raise KeyError("Invalid name: %r"%name)
170 raise KeyError("Invalid name: %r"%name)
170 else:
171 else:
171 setattr(self, name, value)
172 setattr(self, name, value)
172
173
173 @contextmanager
174 @contextmanager
174 def temp_flags(self, **kwargs):
175 def temp_flags(self, **kwargs):
175 """temporarily set flags, for use in `with` statements.
176 """temporarily set flags, for use in `with` statements.
176
177
177 See set_flags for permanent setting of flags
178 See set_flags for permanent setting of flags
178
179
179 Examples
180 Examples
180 --------
181 --------
181
182
182 >>> view.track=False
183 >>> view.track=False
183 ...
184 ...
184 >>> with view.temp_flags(track=True):
185 >>> with view.temp_flags(track=True):
185 ... ar = view.apply(dostuff, my_big_array)
186 ... ar = view.apply(dostuff, my_big_array)
186 ... ar.tracker.wait() # wait for send to finish
187 ... ar.tracker.wait() # wait for send to finish
187 >>> view.track
188 >>> view.track
188 False
189 False
189
190
190 """
191 """
191 # preflight: save flags, and set temporaries
192 # preflight: save flags, and set temporaries
192 saved_flags = {}
193 saved_flags = {}
193 for f in self._flag_names:
194 for f in self._flag_names:
194 saved_flags[f] = getattr(self, f)
195 saved_flags[f] = getattr(self, f)
195 self.set_flags(**kwargs)
196 self.set_flags(**kwargs)
196 # yield to the with-statement block
197 # yield to the with-statement block
197 try:
198 try:
198 yield
199 yield
199 finally:
200 finally:
200 # postflight: restore saved flags
201 # postflight: restore saved flags
201 self.set_flags(**saved_flags)
202 self.set_flags(**saved_flags)
202
203
203
204
204 #----------------------------------------------------------------
205 #----------------------------------------------------------------
205 # apply
206 # apply
206 #----------------------------------------------------------------
207 #----------------------------------------------------------------
207
208
208 def _sync_results(self):
209 def _sync_results(self):
209 """to be called by @sync_results decorator
210 """to be called by @sync_results decorator
210
211
211 after submitting any tasks.
212 after submitting any tasks.
212 """
213 """
213 delta = self.outstanding.difference(self.client.outstanding)
214 delta = self.outstanding.difference(self.client.outstanding)
214 completed = self.outstanding.intersection(delta)
215 completed = self.outstanding.intersection(delta)
215 self.outstanding = self.outstanding.difference(completed)
216 self.outstanding = self.outstanding.difference(completed)
216
217
217 @sync_results
218 @sync_results
218 @save_ids
219 @save_ids
219 def _really_apply(self, f, args, kwargs, block=None, **options):
220 def _really_apply(self, f, args, kwargs, block=None, **options):
220 """wrapper for client.send_apply_request"""
221 """wrapper for client.send_apply_request"""
221 raise NotImplementedError("Implement in subclasses")
222 raise NotImplementedError("Implement in subclasses")
222
223
223 def apply(self, f, *args, **kwargs):
224 def apply(self, f, *args, **kwargs):
224 """calls ``f(*args, **kwargs)`` on remote engines, returning the result.
225 """calls ``f(*args, **kwargs)`` on remote engines, returning the result.
225
226
226 This method sets all apply flags via this View's attributes.
227 This method sets all apply flags via this View's attributes.
227
228
228 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
229 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
229 instance if ``self.block`` is False, otherwise the return value of
230 instance if ``self.block`` is False, otherwise the return value of
230 ``f(*args, **kwargs)``.
231 ``f(*args, **kwargs)``.
231 """
232 """
232 return self._really_apply(f, args, kwargs)
233 return self._really_apply(f, args, kwargs)
233
234
234 def apply_async(self, f, *args, **kwargs):
235 def apply_async(self, f, *args, **kwargs):
235 """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.
236 """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.
236
237
237 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
238 Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
238 """
239 """
239 return self._really_apply(f, args, kwargs, block=False)
240 return self._really_apply(f, args, kwargs, block=False)
240
241
241 @spin_after
242 @spin_after
242 def apply_sync(self, f, *args, **kwargs):
243 def apply_sync(self, f, *args, **kwargs):
243 """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
244 """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
244 returning the result.
245 returning the result.
245 """
246 """
246 return self._really_apply(f, args, kwargs, block=True)
247 return self._really_apply(f, args, kwargs, block=True)
247
248
248 #----------------------------------------------------------------
249 #----------------------------------------------------------------
249 # wrappers for client and control methods
250 # wrappers for client and control methods
250 #----------------------------------------------------------------
251 #----------------------------------------------------------------
251 @sync_results
252 @sync_results
252 def spin(self):
253 def spin(self):
253 """spin the client, and sync"""
254 """spin the client, and sync"""
254 self.client.spin()
255 self.client.spin()
255
256
256 @sync_results
257 @sync_results
257 def wait(self, jobs=None, timeout=-1):
258 def wait(self, jobs=None, timeout=-1):
258 """waits on one or more `jobs`, for up to `timeout` seconds.
259 """waits on one or more `jobs`, for up to `timeout` seconds.
259
260
260 Parameters
261 Parameters
261 ----------
262 ----------
262
263
263 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
264 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
264 ints are indices to self.history
265 ints are indices to self.history
265 strs are msg_ids
266 strs are msg_ids
266 default: wait on all outstanding messages
267 default: wait on all outstanding messages
267 timeout : float
268 timeout : float
268 a time in seconds, after which to give up.
269 a time in seconds, after which to give up.
269 default is -1, which means no timeout
270 default is -1, which means no timeout
270
271
271 Returns
272 Returns
272 -------
273 -------
273
274
274 True : when all msg_ids are done
275 True : when all msg_ids are done
275 False : timeout reached, some msg_ids still outstanding
276 False : timeout reached, some msg_ids still outstanding
276 """
277 """
277 if jobs is None:
278 if jobs is None:
278 jobs = self.history
279 jobs = self.history
279 return self.client.wait(jobs, timeout)
280 return self.client.wait(jobs, timeout)
280
281
281 def abort(self, jobs=None, targets=None, block=None):
282 def abort(self, jobs=None, targets=None, block=None):
282 """Abort jobs on my engines.
283 """Abort jobs on my engines.
283
284
284 Parameters
285 Parameters
285 ----------
286 ----------
286
287
287 jobs : None, str, list of strs, optional
288 jobs : None, str, list of strs, optional
288 if None: abort all jobs.
289 if None: abort all jobs.
289 else: abort specific msg_id(s).
290 else: abort specific msg_id(s).
290 """
291 """
291 block = block if block is not None else self.block
292 block = block if block is not None else self.block
292 targets = targets if targets is not None else self.targets
293 targets = targets if targets is not None else self.targets
293 jobs = jobs if jobs is not None else list(self.outstanding)
294 jobs = jobs if jobs is not None else list(self.outstanding)
294
295
295 return self.client.abort(jobs=jobs, targets=targets, block=block)
296 return self.client.abort(jobs=jobs, targets=targets, block=block)
296
297
297 def queue_status(self, targets=None, verbose=False):
298 def queue_status(self, targets=None, verbose=False):
298 """Fetch the Queue status of my engines"""
299 """Fetch the Queue status of my engines"""
299 targets = targets if targets is not None else self.targets
300 targets = targets if targets is not None else self.targets
300 return self.client.queue_status(targets=targets, verbose=verbose)
301 return self.client.queue_status(targets=targets, verbose=verbose)
301
302
302 def purge_results(self, jobs=[], targets=[]):
303 def purge_results(self, jobs=[], targets=[]):
303 """Instruct the controller to forget specific results."""
304 """Instruct the controller to forget specific results."""
304 if targets is None or targets == 'all':
305 if targets is None or targets == 'all':
305 targets = self.targets
306 targets = self.targets
306 return self.client.purge_results(jobs=jobs, targets=targets)
307 return self.client.purge_results(jobs=jobs, targets=targets)
307
308
308 def shutdown(self, targets=None, restart=False, hub=False, block=None):
309 def shutdown(self, targets=None, restart=False, hub=False, block=None):
309 """Terminates one or more engine processes, optionally including the hub.
310 """Terminates one or more engine processes, optionally including the hub.
310 """
311 """
311 block = self.block if block is None else block
312 block = self.block if block is None else block
312 if targets is None or targets == 'all':
313 if targets is None or targets == 'all':
313 targets = self.targets
314 targets = self.targets
314 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
315 return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)
315
316
316 @spin_after
317 @spin_after
317 def get_result(self, indices_or_msg_ids=None):
318 def get_result(self, indices_or_msg_ids=None):
318 """return one or more results, specified by history index or msg_id.
319 """return one or more results, specified by history index or msg_id.
319
320
320 See :meth:`IPython.parallel.client.client.Client.get_result` for details.
321 See :meth:`IPython.parallel.client.client.Client.get_result` for details.
321 """
322 """
322
323
323 if indices_or_msg_ids is None:
324 if indices_or_msg_ids is None:
324 indices_or_msg_ids = -1
325 indices_or_msg_ids = -1
325 if isinstance(indices_or_msg_ids, int):
326 if isinstance(indices_or_msg_ids, int):
326 indices_or_msg_ids = self.history[indices_or_msg_ids]
327 indices_or_msg_ids = self.history[indices_or_msg_ids]
327 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
328 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
328 indices_or_msg_ids = list(indices_or_msg_ids)
329 indices_or_msg_ids = list(indices_or_msg_ids)
329 for i,index in enumerate(indices_or_msg_ids):
330 for i,index in enumerate(indices_or_msg_ids):
330 if isinstance(index, int):
331 if isinstance(index, int):
331 indices_or_msg_ids[i] = self.history[index]
332 indices_or_msg_ids[i] = self.history[index]
332 return self.client.get_result(indices_or_msg_ids)
333 return self.client.get_result(indices_or_msg_ids)
333
334
334 #-------------------------------------------------------------------
335 #-------------------------------------------------------------------
335 # Map
336 # Map
336 #-------------------------------------------------------------------
337 #-------------------------------------------------------------------
337
338
338 @sync_results
339 @sync_results
339 def map(self, f, *sequences, **kwargs):
340 def map(self, f, *sequences, **kwargs):
340 """override in subclasses"""
341 """override in subclasses"""
341 raise NotImplementedError
342 raise NotImplementedError
342
343
343 def map_async(self, f, *sequences, **kwargs):
344 def map_async(self, f, *sequences, **kwargs):
344 """Parallel version of builtin :func:`python:map`, using this view's engines.
345 """Parallel version of builtin :func:`python:map`, using this view's engines.
345
346
346 This is equivalent to ``map(...block=False)``.
347 This is equivalent to ``map(...block=False)``.
347
348
348 See `self.map` for details.
349 See `self.map` for details.
349 """
350 """
350 if 'block' in kwargs:
351 if 'block' in kwargs:
351 raise TypeError("map_async doesn't take a `block` keyword argument.")
352 raise TypeError("map_async doesn't take a `block` keyword argument.")
352 kwargs['block'] = False
353 kwargs['block'] = False
353 return self.map(f,*sequences,**kwargs)
354 return self.map(f,*sequences,**kwargs)
354
355
355 def map_sync(self, f, *sequences, **kwargs):
356 def map_sync(self, f, *sequences, **kwargs):
356 """Parallel version of builtin :func:`python:map`, using this view's engines.
357 """Parallel version of builtin :func:`python:map`, using this view's engines.
357
358
358 This is equivalent to ``map(...block=True)``.
359 This is equivalent to ``map(...block=True)``.
359
360
360 See `self.map` for details.
361 See `self.map` for details.
361 """
362 """
362 if 'block' in kwargs:
363 if 'block' in kwargs:
363 raise TypeError("map_sync doesn't take a `block` keyword argument.")
364 raise TypeError("map_sync doesn't take a `block` keyword argument.")
364 kwargs['block'] = True
365 kwargs['block'] = True
365 return self.map(f,*sequences,**kwargs)
366 return self.map(f,*sequences,**kwargs)
366
367
367 def imap(self, f, *sequences, **kwargs):
368 def imap(self, f, *sequences, **kwargs):
368 """Parallel version of :func:`itertools.imap`.
369 """Parallel version of :func:`itertools.imap`.
369
370
370 See `self.map` for details.
371 See `self.map` for details.
371
372
372 """
373 """
373
374
374 return iter(self.map_async(f,*sequences, **kwargs))
375 return iter(self.map_async(f,*sequences, **kwargs))
375
376
376 #-------------------------------------------------------------------
377 #-------------------------------------------------------------------
377 # Decorators
378 # Decorators
378 #-------------------------------------------------------------------
379 #-------------------------------------------------------------------
379
380
380 def remote(self, block=None, **flags):
381 def remote(self, block=None, **flags):
381 """Decorator for making a RemoteFunction"""
382 """Decorator for making a RemoteFunction"""
382 block = self.block if block is None else block
383 block = self.block if block is None else block
383 return remote(self, block=block, **flags)
384 return remote(self, block=block, **flags)
384
385
385 def parallel(self, dist='b', block=None, **flags):
386 def parallel(self, dist='b', block=None, **flags):
386 """Decorator for making a ParallelFunction"""
387 """Decorator for making a ParallelFunction"""
387 block = self.block if block is None else block
388 block = self.block if block is None else block
388 return parallel(self, dist=dist, block=block, **flags)
389 return parallel(self, dist=dist, block=block, **flags)
389
390
390 @skip_doctest
391 @skip_doctest
391 class DirectView(View):
392 class DirectView(View):
392 """Direct Multiplexer View of one or more engines.
393 """Direct Multiplexer View of one or more engines.
393
394
394 These are created via indexed access to a client:
395 These are created via indexed access to a client:
395
396
396 >>> dv_1 = client[1]
397 >>> dv_1 = client[1]
397 >>> dv_all = client[:]
398 >>> dv_all = client[:]
398 >>> dv_even = client[::2]
399 >>> dv_even = client[::2]
399 >>> dv_some = client[1:3]
400 >>> dv_some = client[1:3]
400
401
401 This object provides dictionary access to engine namespaces:
402 This object provides dictionary access to engine namespaces:
402
403
403 # push a=5:
404 # push a=5:
404 >>> dv['a'] = 5
405 >>> dv['a'] = 5
405 # pull 'foo':
406 # pull 'foo':
406 >>> dv['foo']
407 >>> dv['foo']
407
408
408 """
409 """
409
410
410 def __init__(self, client=None, socket=None, targets=None):
411 def __init__(self, client=None, socket=None, targets=None):
411 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
412 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
412
413
413 @property
414 @property
414 def importer(self):
415 def importer(self):
415 """sync_imports(local=True) as a property.
416 """sync_imports(local=True) as a property.
416
417
417 See sync_imports for details.
418 See sync_imports for details.
418
419
419 """
420 """
420 return self.sync_imports(True)
421 return self.sync_imports(True)
421
422
422 @contextmanager
423 @contextmanager
423 def sync_imports(self, local=True, quiet=False):
424 def sync_imports(self, local=True, quiet=False):
424 """Context Manager for performing simultaneous local and remote imports.
425 """Context Manager for performing simultaneous local and remote imports.
425
426
426 'import x as y' will *not* work. The 'as y' part will simply be ignored.
427 'import x as y' will *not* work. The 'as y' part will simply be ignored.
427
428
428 If `local=True`, then the package will also be imported locally.
429 If `local=True`, then the package will also be imported locally.
429
430
430 If `quiet=True`, no output will be produced when attempting remote
431 If `quiet=True`, no output will be produced when attempting remote
431 imports.
432 imports.
432
433
433 Note that remote-only (`local=False`) imports have not been implemented.
434 Note that remote-only (`local=False`) imports have not been implemented.
434
435
435 >>> with view.sync_imports():
436 >>> with view.sync_imports():
436 ... from numpy import recarray
437 ... from numpy import recarray
437 importing recarray from numpy on engine(s)
438 importing recarray from numpy on engine(s)
438
439
439 """
440 """
440 from IPython.utils.py3compat import builtin_mod
441 from IPython.utils.py3compat import builtin_mod
441 local_import = builtin_mod.__import__
442 local_import = builtin_mod.__import__
442 modules = set()
443 modules = set()
443 results = []
444 results = []
444 @util.interactive
445 @util.interactive
445 def remote_import(name, fromlist, level):
446 def remote_import(name, fromlist, level):
446 """the function to be passed to apply, that actually performs the import
447 """the function to be passed to apply, that actually performs the import
447 on the engine, and loads up the user namespace.
448 on the engine, and loads up the user namespace.
448 """
449 """
449 import sys
450 import sys
450 user_ns = globals()
451 user_ns = globals()
451 mod = __import__(name, fromlist=fromlist, level=level)
452 mod = __import__(name, fromlist=fromlist, level=level)
452 if fromlist:
453 if fromlist:
453 for key in fromlist:
454 for key in fromlist:
454 user_ns[key] = getattr(mod, key)
455 user_ns[key] = getattr(mod, key)
455 else:
456 else:
456 user_ns[name] = sys.modules[name]
457 user_ns[name] = sys.modules[name]
457
458
458 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
459 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
459 """the drop-in replacement for __import__, that optionally imports
460 """the drop-in replacement for __import__, that optionally imports
460 locally as well.
461 locally as well.
461 """
462 """
462 # don't override nested imports
463 # don't override nested imports
463 save_import = builtin_mod.__import__
464 save_import = builtin_mod.__import__
464 builtin_mod.__import__ = local_import
465 builtin_mod.__import__ = local_import
465
466
466 if imp.lock_held():
467 if imp.lock_held():
467 # this is a side-effect import, don't do it remotely, or even
468 # this is a side-effect import, don't do it remotely, or even
468 # ignore the local effects
469 # ignore the local effects
469 return local_import(name, globals, locals, fromlist, level)
470 return local_import(name, globals, locals, fromlist, level)
470
471
471 imp.acquire_lock()
472 imp.acquire_lock()
472 if local:
473 if local:
473 mod = local_import(name, globals, locals, fromlist, level)
474 mod = local_import(name, globals, locals, fromlist, level)
474 else:
475 else:
475 raise NotImplementedError("remote-only imports not yet implemented")
476 raise NotImplementedError("remote-only imports not yet implemented")
476 imp.release_lock()
477 imp.release_lock()
477
478
478 key = name+':'+','.join(fromlist or [])
479 key = name+':'+','.join(fromlist or [])
479 if level <= 0 and key not in modules:
480 if level <= 0 and key not in modules:
480 modules.add(key)
481 modules.add(key)
481 if not quiet:
482 if not quiet:
482 if fromlist:
483 if fromlist:
483 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
484 print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
484 else:
485 else:
485 print("importing %s on engine(s)"%name)
486 print("importing %s on engine(s)"%name)
486 results.append(self.apply_async(remote_import, name, fromlist, level))
487 results.append(self.apply_async(remote_import, name, fromlist, level))
487 # restore override
488 # restore override
488 builtin_mod.__import__ = save_import
489 builtin_mod.__import__ = save_import
489
490
490 return mod
491 return mod
491
492
492 # override __import__
493 # override __import__
493 builtin_mod.__import__ = view_import
494 builtin_mod.__import__ = view_import
494 try:
495 try:
495 # enter the block
496 # enter the block
496 yield
497 yield
497 except ImportError:
498 except ImportError:
498 if local:
499 if local:
499 raise
500 raise
500 else:
501 else:
501 # ignore import errors if not doing local imports
502 # ignore import errors if not doing local imports
502 pass
503 pass
503 finally:
504 finally:
504 # always restore __import__
505 # always restore __import__
505 builtin_mod.__import__ = local_import
506 builtin_mod.__import__ = local_import
506
507
507 for r in results:
508 for r in results:
508 # raise possible remote ImportErrors here
509 # raise possible remote ImportErrors here
509 r.get()
510 r.get()
511
512 def use_dill(self):
513 """Expand serialization support with dill
514
515 adds support for closures, etc.
516
517 This calls IPython.utils.pickleutil.use_dill() here and on each engine.
518 """
519 pickleutil.use_dill()
520 return self.apply(pickleutil.use_dill)
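A minimal usage sketch, assuming dill is installed, an ipcluster is running, and `rc` is an IPython.parallel Client; `use_dill` enables dill locally and on every engine, after which a closure can be shipped with apply:

    from IPython.parallel import Client

    rc = Client()
    dview = rc[:]
    dview.use_dill()              # enable dill here and on all engines

    def make_scaler(a):
        def scale(b):             # closure over `a`; plain pickle rejects this
            return a * b
        return scale

    print(dview.apply_sync(make_scaler(5), 2))   # e.g. [10, 10, 10, 10] with four engines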
510
521
511
522
512 @sync_results
523 @sync_results
513 @save_ids
524 @save_ids
514 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
525 def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
515 """calls f(*args, **kwargs) on remote engines, returning the result.
526 """calls f(*args, **kwargs) on remote engines, returning the result.
516
527
517 This method sets all of `apply`'s flags via this View's attributes.
528 This method sets all of `apply`'s flags via this View's attributes.
518
529
519 Parameters
530 Parameters
520 ----------
531 ----------
521
532
522 f : callable
533 f : callable
523
534
524 args : list [default: empty]
535 args : list [default: empty]
525
536
526 kwargs : dict [default: empty]
537 kwargs : dict [default: empty]
527
538
528 targets : target list [default: self.targets]
539 targets : target list [default: self.targets]
529 where to run
540 where to run
530 block : bool [default: self.block]
541 block : bool [default: self.block]
531 whether to block
542 whether to block
532 track : bool [default: self.track]
543 track : bool [default: self.track]
533 whether to ask zmq to track the message, for safe non-copying sends
544 whether to ask zmq to track the message, for safe non-copying sends
534
545
535 Returns
546 Returns
536 -------
547 -------
537
548
538 if self.block is False:
549 if self.block is False:
539 returns AsyncResult
550 returns AsyncResult
540 else:
551 else:
541 returns actual result of f(*args, **kwargs) on the engine(s)
552 returns actual result of f(*args, **kwargs) on the engine(s)
542 This will be a list if self.targets is also a list (even length 1), or
553 This will be a list if self.targets is also a list (even length 1), or
543 the single result if self.targets is an integer engine id
554 the single result if self.targets is an integer engine id
544 """
555 """
545 args = [] if args is None else args
556 args = [] if args is None else args
546 kwargs = {} if kwargs is None else kwargs
557 kwargs = {} if kwargs is None else kwargs
547 block = self.block if block is None else block
558 block = self.block if block is None else block
548 track = self.track if track is None else track
559 track = self.track if track is None else track
549 targets = self.targets if targets is None else targets
560 targets = self.targets if targets is None else targets
550
561
551 _idents, _targets = self.client._build_targets(targets)
562 _idents, _targets = self.client._build_targets(targets)
552 msg_ids = []
563 msg_ids = []
553 trackers = []
564 trackers = []
554 for ident in _idents:
565 for ident in _idents:
555 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
566 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
556 ident=ident)
567 ident=ident)
557 if track:
568 if track:
558 trackers.append(msg['tracker'])
569 trackers.append(msg['tracker'])
559 msg_ids.append(msg['header']['msg_id'])
570 msg_ids.append(msg['header']['msg_id'])
560 if isinstance(targets, int):
571 if isinstance(targets, int):
561 msg_ids = msg_ids[0]
572 msg_ids = msg_ids[0]
562 tracker = None if track is False else zmq.MessageTracker(*trackers)
573 tracker = None if track is False else zmq.MessageTracker(*trackers)
563 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
574 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
564 if block:
575 if block:
565 try:
576 try:
566 return ar.get()
577 return ar.get()
567 except KeyboardInterrupt:
578 except KeyboardInterrupt:
568 pass
579 pass
569 return ar
580 return ar
570
581
571
582
572 @sync_results
583 @sync_results
573 def map(self, f, *sequences, **kwargs):
584 def map(self, f, *sequences, **kwargs):
574 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
585 """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult
575
586
576 Parallel version of builtin `map`, using this View's `targets`.
587 Parallel version of builtin `map`, using this View's `targets`.
577
588
578 There will be one task per target, so work will be chunked
589 There will be one task per target, so work will be chunked
579 if the sequences are longer than `targets`.
590 if the sequences are longer than `targets`.
580
591
581 Results can be iterated as they are ready, but will become available in chunks.
592 Results can be iterated as they are ready, but will become available in chunks.
582
593
583 Parameters
594 Parameters
584 ----------
595 ----------
585
596
586 f : callable
597 f : callable
587 function to be mapped
598 function to be mapped
588 *sequences: one or more sequences of matching length
599 *sequences: one or more sequences of matching length
589 the sequences to be distributed and passed to `f`
600 the sequences to be distributed and passed to `f`
590 block : bool
601 block : bool
591 whether to wait for the result or not [default self.block]
602 whether to wait for the result or not [default self.block]
592
603
593 Returns
604 Returns
594 -------
605 -------
595
606
596
607
597 If block=False
608 If block=False
598 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
609 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
599 An object like AsyncResult, but which reassembles the sequence of results
610 An object like AsyncResult, but which reassembles the sequence of results
600 into a single list. AsyncMapResults can be iterated through before all
611 into a single list. AsyncMapResults can be iterated through before all
601 results are complete.
612 results are complete.
602 else
613 else
603 A list, the result of ``map(f,*sequences)``
614 A list, the result of ``map(f,*sequences)``
604 """
615 """
605
616
606 block = kwargs.pop('block', self.block)
617 block = kwargs.pop('block', self.block)
607 for k in kwargs.keys():
618 for k in kwargs.keys():
608 if k not in ['block', 'track']:
619 if k not in ['block', 'track']:
609 raise TypeError("invalid keyword arg, %r"%k)
620 raise TypeError("invalid keyword arg, %r"%k)
610
621
611 assert len(sequences) > 0, "must have some sequences to map onto!"
622 assert len(sequences) > 0, "must have some sequences to map onto!"
612 pf = ParallelFunction(self, f, block=block, **kwargs)
623 pf = ParallelFunction(self, f, block=block, **kwargs)
613 return pf.map(*sequences)
624 return pf.map(*sequences)
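For example (assuming `dview` is a DirectView over all engines, as above), a blocking map chunks the sequence into one task per engine and returns the reassembled list:

    def square(x):
        return x * x

    result = dview.map(square, range(8), block=True)
    # -> [0, 1, 4, 9, 16, 25, 36, 49], computed in one chunk per engine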
614
625
615 @sync_results
626 @sync_results
616 @save_ids
627 @save_ids
617 def execute(self, code, silent=True, targets=None, block=None):
628 def execute(self, code, silent=True, targets=None, block=None):
618 """Executes `code` on `targets` in blocking or nonblocking manner.
629 """Executes `code` on `targets` in blocking or nonblocking manner.
619
630
620 ``execute`` is always `bound` (affects engine namespace)
631 ``execute`` is always `bound` (affects engine namespace)
621
632
622 Parameters
633 Parameters
623 ----------
634 ----------
624
635
625 code : str
636 code : str
626 the code string to be executed
637 the code string to be executed
627 block : bool
638 block : bool
628 whether or not to wait until done to return
639 whether or not to wait until done to return
629 default: self.block
640 default: self.block
630 """
641 """
631 block = self.block if block is None else block
642 block = self.block if block is None else block
632 targets = self.targets if targets is None else targets
643 targets = self.targets if targets is None else targets
633
644
634 _idents, _targets = self.client._build_targets(targets)
645 _idents, _targets = self.client._build_targets(targets)
635 msg_ids = []
646 msg_ids = []
636 trackers = []
647 trackers = []
637 for ident in _idents:
648 for ident in _idents:
638 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
649 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
639 msg_ids.append(msg['header']['msg_id'])
650 msg_ids.append(msg['header']['msg_id'])
640 if isinstance(targets, int):
651 if isinstance(targets, int):
641 msg_ids = msg_ids[0]
652 msg_ids = msg_ids[0]
642 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
653 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
643 if block:
654 if block:
644 try:
655 try:
645 ar.get()
656 ar.get()
646 except KeyboardInterrupt:
657 except KeyboardInterrupt:
647 pass
658 pass
648 return ar
659 return ar
649
660
650 def run(self, filename, targets=None, block=None):
661 def run(self, filename, targets=None, block=None):
651 """Execute contents of `filename` on my engine(s).
662 """Execute contents of `filename` on my engine(s).
652
663
653 This simply reads the contents of the file and calls `execute`.
664 This simply reads the contents of the file and calls `execute`.
654
665
655 Parameters
666 Parameters
656 ----------
667 ----------
657
668
658 filename : str
669 filename : str
659 The path to the file
670 The path to the file
660 targets : int/str/list of ints/strs
671 targets : int/str/list of ints/strs
661 the engines on which to execute
672 the engines on which to execute
662 default : all
673 default : all
663 block : bool
674 block : bool
664 whether or not to wait until done
675 whether or not to wait until done
665 default: self.block
676 default: self.block
666
677
667 """
678 """
668 with open(filename, 'r') as f:
679 with open(filename, 'r') as f:
669 # add newline in case of trailing indented whitespace
680 # add newline in case of trailing indented whitespace
670 # which will cause SyntaxError
681 # which will cause SyntaxError
671 code = f.read()+'\n'
682 code = f.read()+'\n'
672 return self.execute(code, block=block, targets=targets)
683 return self.execute(code, block=block, targets=targets)
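A short sketch of the non-blocking execute path (again assuming `dview` from an existing Client); the returned AsyncResult can be waited on explicitly:

    ar = dview.execute("import os; pid = os.getpid()", block=False)
    ar.get()        # block until all engines finish; re-raises remote errors locally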
673
684
674 def update(self, ns):
685 def update(self, ns):
675 """update remote namespace with dict `ns`
686 """update remote namespace with dict `ns`
676
687
677 See `push` for details.
688 See `push` for details.
678 """
689 """
679 return self.push(ns, block=self.block, track=self.track)
690 return self.push(ns, block=self.block, track=self.track)
680
691
681 def push(self, ns, targets=None, block=None, track=None):
692 def push(self, ns, targets=None, block=None, track=None):
682 """update remote namespace with dict `ns`
693 """update remote namespace with dict `ns`
683
694
684 Parameters
695 Parameters
685 ----------
696 ----------
686
697
687 ns : dict
698 ns : dict
688 dict of keys with which to update engine namespace(s)
699 dict of keys with which to update engine namespace(s)
689 block : bool [default : self.block]
700 block : bool [default : self.block]
690 whether to wait to be notified of engine receipt
701 whether to wait to be notified of engine receipt
691
702
692 """
703 """
693
704
694 block = block if block is not None else self.block
705 block = block if block is not None else self.block
695 track = track if track is not None else self.track
706 track = track if track is not None else self.track
696 targets = targets if targets is not None else self.targets
707 targets = targets if targets is not None else self.targets
697 # applier = self.apply_sync if block else self.apply_async
708 # applier = self.apply_sync if block else self.apply_async
698 if not isinstance(ns, dict):
709 if not isinstance(ns, dict):
699 raise TypeError("Must be a dict, not %s"%type(ns))
710 raise TypeError("Must be a dict, not %s"%type(ns))
700 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
711 return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)
701
712
702 def get(self, key_s):
713 def get(self, key_s):
703 """get object(s) by `key_s` from remote namespace
714 """get object(s) by `key_s` from remote namespace
704
715
705 see `pull` for details.
716 see `pull` for details.
706 """
717 """
707 # block = block if block is not None else self.block
718 # block = block if block is not None else self.block
708 return self.pull(key_s, block=True)
719 return self.pull(key_s, block=True)
709
720
710 def pull(self, names, targets=None, block=None):
721 def pull(self, names, targets=None, block=None):
711 """get object(s) by `name` from remote namespace
722 """get object(s) by `name` from remote namespace
712
723
713 will return one object if it is a key.
724 will return one object if it is a key.
714 can also take a list of keys, in which case it will return a list of objects.
725 can also take a list of keys, in which case it will return a list of objects.
715 """
726 """
716 block = block if block is not None else self.block
727 block = block if block is not None else self.block
717 targets = targets if targets is not None else self.targets
728 targets = targets if targets is not None else self.targets
718 applier = self.apply_sync if block else self.apply_async
729 applier = self.apply_sync if block else self.apply_async
719 if isinstance(names, string_types):
730 if isinstance(names, string_types):
720 pass
731 pass
721 elif isinstance(names, (list,tuple,set)):
732 elif isinstance(names, (list,tuple,set)):
722 for key in names:
733 for key in names:
723 if not isinstance(key, string_types):
734 if not isinstance(key, string_types):
724 raise TypeError("keys must be str, not type %r"%type(key))
735 raise TypeError("keys must be str, not type %r"%type(key))
725 else:
736 else:
726 raise TypeError("names must be strs, not %r"%names)
737 raise TypeError("names must be strs, not %r"%names)
727 return self._really_apply(util._pull, (names,), block=block, targets=targets)
738 return self._really_apply(util._pull, (names,), block=block, targets=targets)
728
739
729 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
740 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
730 """
741 """
731 Partition a Python sequence and send the partitions to a set of engines.
742 Partition a Python sequence and send the partitions to a set of engines.
732 """
743 """
733 block = block if block is not None else self.block
744 block = block if block is not None else self.block
734 track = track if track is not None else self.track
745 track = track if track is not None else self.track
735 targets = targets if targets is not None else self.targets
746 targets = targets if targets is not None else self.targets
736
747
737 # construct integer ID list:
748 # construct integer ID list:
738 targets = self.client._build_targets(targets)[1]
749 targets = self.client._build_targets(targets)[1]
739
750
740 mapObject = Map.dists[dist]()
751 mapObject = Map.dists[dist]()
741 nparts = len(targets)
752 nparts = len(targets)
742 msg_ids = []
753 msg_ids = []
743 trackers = []
754 trackers = []
744 for index, engineid in enumerate(targets):
755 for index, engineid in enumerate(targets):
745 partition = mapObject.getPartition(seq, index, nparts)
756 partition = mapObject.getPartition(seq, index, nparts)
746 if flatten and len(partition) == 1:
757 if flatten and len(partition) == 1:
747 ns = {key: partition[0]}
758 ns = {key: partition[0]}
748 else:
759 else:
749 ns = {key: partition}
760 ns = {key: partition}
750 r = self.push(ns, block=False, track=track, targets=engineid)
761 r = self.push(ns, block=False, track=track, targets=engineid)
751 msg_ids.extend(r.msg_ids)
762 msg_ids.extend(r.msg_ids)
752 if track:
763 if track:
753 trackers.append(r._tracker)
764 trackers.append(r._tracker)
754
765
755 if track:
766 if track:
756 tracker = zmq.MessageTracker(*trackers)
767 tracker = zmq.MessageTracker(*trackers)
757 else:
768 else:
758 tracker = None
769 tracker = None
759
770
760 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
771 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
761 if block:
772 if block:
762 r.wait()
773 r.wait()
763 else:
774 else:
764 return r
775 return r
765
776
766 @sync_results
777 @sync_results
767 @save_ids
778 @save_ids
768 def gather(self, key, dist='b', targets=None, block=None):
779 def gather(self, key, dist='b', targets=None, block=None):
769 """
780 """
770 Gather a partitioned sequence on a set of engines as a single local seq.
781 Gather a partitioned sequence on a set of engines as a single local seq.
771 """
782 """
772 block = block if block is not None else self.block
783 block = block if block is not None else self.block
773 targets = targets if targets is not None else self.targets
784 targets = targets if targets is not None else self.targets
774 mapObject = Map.dists[dist]()
785 mapObject = Map.dists[dist]()
775 msg_ids = []
786 msg_ids = []
776
787
777 # construct integer ID list:
788 # construct integer ID list:
778 targets = self.client._build_targets(targets)[1]
789 targets = self.client._build_targets(targets)[1]
779
790
780 for index, engineid in enumerate(targets):
791 for index, engineid in enumerate(targets):
781 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
792 msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)
782
793
783 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
794 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
784
795
785 if block:
796 if block:
786 try:
797 try:
787 return r.get()
798 return r.get()
788 except KeyboardInterrupt:
799 except KeyboardInterrupt:
789 pass
800 pass
790 return r
801 return r
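A scatter/gather round trip might look like this (sketch; `dview` as above):

    dview.scatter('x', list(range(16)), block=True)   # partition the list across engines
    parts = dview.gather('x', block=True)             # reassemble -> [0, 1, ..., 15]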
791
802
792 def __getitem__(self, key):
803 def __getitem__(self, key):
793 return self.get(key)
804 return self.get(key)
794
805
795 def __setitem__(self, key, value):
806 def __setitem__(self, key, value):
796 self.update({key:value})
807 self.update({key:value})
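The dictionary-style interface is a thin wrapper over push/pull, e.g.:

    dview['a'] = 5      # __setitem__ -> update -> push to every engine
    dview['a']          # __getitem__ -> get -> pull(block=True); one value per engine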
797
808
798 def clear(self, targets=None, block=None):
809 def clear(self, targets=None, block=None):
799 """Clear the remote namespaces on my engines."""
810 """Clear the remote namespaces on my engines."""
800 block = block if block is not None else self.block
811 block = block if block is not None else self.block
801 targets = targets if targets is not None else self.targets
812 targets = targets if targets is not None else self.targets
802 return self.client.clear(targets=targets, block=block)
813 return self.client.clear(targets=targets, block=block)
803
814
804 #----------------------------------------
815 #----------------------------------------
805 # activate for %px, %autopx, etc. magics
816 # activate for %px, %autopx, etc. magics
806 #----------------------------------------
817 #----------------------------------------
807
818
808 def activate(self, suffix=''):
819 def activate(self, suffix=''):
809 """Activate IPython magics associated with this View
820 """Activate IPython magics associated with this View
810
821
811 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
822 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
812
823
813 Parameters
824 Parameters
814 ----------
825 ----------
815
826
816 suffix: str [default: '']
827 suffix: str [default: '']
817 The suffix, if any, for the magics. This allows you to have
828 The suffix, if any, for the magics. This allows you to have
818 multiple views associated with parallel magics at the same time.
829 multiple views associated with parallel magics at the same time.
819
830
820 e.g. ``rc[::2].activate(suffix='_even')`` will give you
831 e.g. ``rc[::2].activate(suffix='_even')`` will give you
821 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
832 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
822 on the even engines.
833 on the even engines.
823 """
834 """
824
835
825 from IPython.parallel.client.magics import ParallelMagics
836 from IPython.parallel.client.magics import ParallelMagics
826
837
827 try:
838 try:
828 # This is injected into __builtins__.
839 # This is injected into __builtins__.
829 ip = get_ipython()
840 ip = get_ipython()
830 except NameError:
841 except NameError:
831 print("The IPython parallel magics (%px, etc.) only work within IPython.")
842 print("The IPython parallel magics (%px, etc.) only work within IPython.")
832 return
843 return
833
844
834 M = ParallelMagics(ip, self, suffix)
845 M = ParallelMagics(ip, self, suffix)
835 ip.magics_manager.register(M)
846 ip.magics_manager.register(M)
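A sketch of activating the parallel magics for this view (the %px lines are IPython-session syntax, shown as comments):

    dview.activate()    # registers %px, %autopx, %pxresult, %%px, %pxconfig
    # then, at the IPython prompt:
    #   %px import numpy
    #   %px a = numpy.random.rand(3)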
836
847
837
848
838 @skip_doctest
849 @skip_doctest
839 class LoadBalancedView(View):
850 class LoadBalancedView(View):
840 """An load-balancing View that only executes via the Task scheduler.
851 """An load-balancing View that only executes via the Task scheduler.
841
852
842 Load-balanced views can be created with the client's `view` method:
853 Load-balanced views can be created with the client's `view` method:
843
854
844 >>> v = client.load_balanced_view()
855 >>> v = client.load_balanced_view()
845
856
846 or targets can be specified, to restrict the potential destinations:
857 or targets can be specified, to restrict the potential destinations:
847
858
848 >>> v = client.load_balanced_view([1,3])
859 >>> v = client.load_balanced_view([1,3])
849
860
850 which would restrict loadbalancing to between engines 1 and 3.
861 which would restrict loadbalancing to between engines 1 and 3.
851
862
852 """
863 """
853
864
854 follow=Any()
865 follow=Any()
855 after=Any()
866 after=Any()
856 timeout=CFloat()
867 timeout=CFloat()
857 retries = Integer(0)
868 retries = Integer(0)
858
869
859 _task_scheme = Any()
870 _task_scheme = Any()
860 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
871 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
861
872
862 def __init__(self, client=None, socket=None, **flags):
873 def __init__(self, client=None, socket=None, **flags):
863 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
874 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
864 self._task_scheme=client._task_scheme
875 self._task_scheme=client._task_scheme
865
876
866 def _validate_dependency(self, dep):
877 def _validate_dependency(self, dep):
867 """validate a dependency.
878 """validate a dependency.
868
879
869 For use in `set_flags`.
880 For use in `set_flags`.
870 """
881 """
871 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
882 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
872 return True
883 return True
873 elif isinstance(dep, (list,set, tuple)):
884 elif isinstance(dep, (list,set, tuple)):
874 for d in dep:
885 for d in dep:
875 if not isinstance(d, string_types + (AsyncResult,)):
886 if not isinstance(d, string_types + (AsyncResult,)):
876 return False
887 return False
877 elif isinstance(dep, dict):
888 elif isinstance(dep, dict):
878 if set(dep.keys()) != set(Dependency().as_dict().keys()):
889 if set(dep.keys()) != set(Dependency().as_dict().keys()):
879 return False
890 return False
880 if not isinstance(dep['msg_ids'], list):
891 if not isinstance(dep['msg_ids'], list):
881 return False
892 return False
882 for d in dep['msg_ids']:
893 for d in dep['msg_ids']:
883 if not isinstance(d, string_types):
894 if not isinstance(d, string_types):
884 return False
895 return False
885 else:
896 else:
886 return False
897 return False
887
898
888 return True
899 return True
889
900
890 def _render_dependency(self, dep):
901 def _render_dependency(self, dep):
891 """helper for building jsonable dependencies from various input forms."""
902 """helper for building jsonable dependencies from various input forms."""
892 if isinstance(dep, Dependency):
903 if isinstance(dep, Dependency):
893 return dep.as_dict()
904 return dep.as_dict()
894 elif isinstance(dep, AsyncResult):
905 elif isinstance(dep, AsyncResult):
895 return dep.msg_ids
906 return dep.msg_ids
896 elif dep is None:
907 elif dep is None:
897 return []
908 return []
898 else:
909 else:
899 # pass to Dependency constructor
910 # pass to Dependency constructor
900 return list(Dependency(dep))
911 return list(Dependency(dep))
901
912
902 def set_flags(self, **kwargs):
913 def set_flags(self, **kwargs):
903 """set my attribute flags by keyword.
914 """set my attribute flags by keyword.
904
915
905 A View is a wrapper for the Client's apply method, but with attributes
916 A View is a wrapper for the Client's apply method, but with attributes
906 that specify keyword arguments; those attributes can be set by keyword
917 that specify keyword arguments; those attributes can be set by keyword
907 argument with this method.
918 argument with this method.
908
919
909 Parameters
920 Parameters
910 ----------
921 ----------
911
922
912 block : bool
923 block : bool
913 whether to wait for results
924 whether to wait for results
914 track : bool
925 track : bool
915 whether to create a MessageTracker to allow the user to
926 whether to create a MessageTracker to allow the user to
916 safely edit after arrays and buffers during non-copying
927 safely edit after arrays and buffers during non-copying
917 sends.
928 sends.
918
929
919 after : Dependency or collection of msg_ids
930 after : Dependency or collection of msg_ids
920 Only for load-balanced execution (targets=None)
931 Only for load-balanced execution (targets=None)
921 Specify a list of msg_ids as a time-based dependency.
932 Specify a list of msg_ids as a time-based dependency.
922 This job will only be run *after* the dependencies
933 This job will only be run *after* the dependencies
923 have been met.
934 have been met.
924
935
925 follow : Dependency or collection of msg_ids
936 follow : Dependency or collection of msg_ids
926 Only for load-balanced execution (targets=None)
937 Only for load-balanced execution (targets=None)
927 Specify a list of msg_ids as a location-based dependency.
938 Specify a list of msg_ids as a location-based dependency.
928 This job will only be run on an engine where this dependency
939 This job will only be run on an engine where this dependency
929 is met.
940 is met.
930
941
931 timeout : float/int or None
942 timeout : float/int or None
932 Only for load-balanced execution (targets=None)
943 Only for load-balanced execution (targets=None)
933 Specify an amount of time (in seconds) for the scheduler to
944 Specify an amount of time (in seconds) for the scheduler to
934 wait for dependencies to be met before failing with a
945 wait for dependencies to be met before failing with a
935 DependencyTimeout.
946 DependencyTimeout.
936
947
937 retries : int
948 retries : int
938 Number of times a task will be retried on failure.
949 Number of times a task will be retried on failure.
939 """
950 """
940
951
941 super(LoadBalancedView, self).set_flags(**kwargs)
952 super(LoadBalancedView, self).set_flags(**kwargs)
942 for name in ('follow', 'after'):
953 for name in ('follow', 'after'):
943 if name in kwargs:
954 if name in kwargs:
944 value = kwargs[name]
955 value = kwargs[name]
945 if self._validate_dependency(value):
956 if self._validate_dependency(value):
946 setattr(self, name, value)
957 setattr(self, name, value)
947 else:
958 else:
948 raise ValueError("Invalid dependency: %r"%value)
959 raise ValueError("Invalid dependency: %r"%value)
949 if 'timeout' in kwargs:
960 if 'timeout' in kwargs:
950 t = kwargs['timeout']
961 t = kwargs['timeout']
951 if not isinstance(t, (int, float, type(None))):
962 if not isinstance(t, (int, float, type(None))):
952 if (not PY3) and (not isinstance(t, long)):
963 if (not PY3) and (not isinstance(t, long)):
953 raise TypeError("Invalid type for timeout: %r"%type(t))
964 raise TypeError("Invalid type for timeout: %r"%type(t))
954 if t is not None:
965 if t is not None:
955 if t < 0:
966 if t < 0:
956 raise ValueError("Invalid timeout: %s"%t)
967 raise ValueError("Invalid timeout: %s"%t)
957 self.timeout = t
968 self.timeout = t
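For illustration, setting scheduler flags on a load-balanced view (assuming `rc` is an existing Client; `prepare` is a stand-in task):

    def prepare():
        return "setup done"

    lview = rc.load_balanced_view()
    ar = lview.apply_async(prepare)
    lview.set_flags(after=ar, timeout=30, retries=2)
    # later submissions run only after `ar` completes, fail with DependencyTimeout
    # if dependencies aren't met within 30s, and are retried up to twice on error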
958
969
959 @sync_results
970 @sync_results
960 @save_ids
971 @save_ids
961 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
972 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
962 after=None, follow=None, timeout=None,
973 after=None, follow=None, timeout=None,
963 targets=None, retries=None):
974 targets=None, retries=None):
964 """calls f(*args, **kwargs) on a remote engine, returning the result.
975 """calls f(*args, **kwargs) on a remote engine, returning the result.
965
976
966 This method temporarily sets all of `apply`'s flags for a single call.
977 This method temporarily sets all of `apply`'s flags for a single call.
967
978
968 Parameters
979 Parameters
969 ----------
980 ----------
970
981
971 f : callable
982 f : callable
972
983
973 args : list [default: empty]
984 args : list [default: empty]
974
985
975 kwargs : dict [default: empty]
986 kwargs : dict [default: empty]
976
987
977 block : bool [default: self.block]
988 block : bool [default: self.block]
978 whether to block
989 whether to block
979 track : bool [default: self.track]
990 track : bool [default: self.track]
980 whether to ask zmq to track the message, for safe non-copying sends
991 whether to ask zmq to track the message, for safe non-copying sends
981
992
982 !!!!!! TODO: THE REST HERE !!!!
993 !!!!!! TODO: THE REST HERE !!!!
983
994
984 Returns
995 Returns
985 -------
996 -------
986
997
987 if self.block is False:
998 if self.block is False:
988 returns AsyncResult
999 returns AsyncResult
989 else:
1000 else:
990 returns actual result of f(*args, **kwargs) on the engine(s)
1001 returns actual result of f(*args, **kwargs) on the engine(s)
991 This will be a list if self.targets is also a list (even length 1), or
1002 This will be a list if self.targets is also a list (even length 1), or
992 the single result if self.targets is an integer engine id
1003 the single result if self.targets is an integer engine id
993 """
1004 """
994
1005
995 # validate whether we can run
1006 # validate whether we can run
996 if self._socket.closed:
1007 if self._socket.closed:
997 msg = "Task farming is disabled"
1008 msg = "Task farming is disabled"
998 if self._task_scheme == 'pure':
1009 if self._task_scheme == 'pure':
999 msg += " because the pure ZMQ scheduler cannot handle"
1010 msg += " because the pure ZMQ scheduler cannot handle"
1000 msg += " disappearing engines."
1011 msg += " disappearing engines."
1001 raise RuntimeError(msg)
1012 raise RuntimeError(msg)
1002
1013
1003 if self._task_scheme == 'pure':
1014 if self._task_scheme == 'pure':
1004 # pure zmq scheme doesn't support extra features
1015 # pure zmq scheme doesn't support extra features
1005 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1016 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1006 "follow, after, retries, targets, timeout"
1017 "follow, after, retries, targets, timeout"
1007 if (follow or after or retries or targets or timeout):
1018 if (follow or after or retries or targets or timeout):
1008 # hard fail on Scheduler flags
1019 # hard fail on Scheduler flags
1009 raise RuntimeError(msg)
1020 raise RuntimeError(msg)
1010 if isinstance(f, dependent):
1021 if isinstance(f, dependent):
1011 # soft warn on functional dependencies
1022 # soft warn on functional dependencies
1012 warnings.warn(msg, RuntimeWarning)
1023 warnings.warn(msg, RuntimeWarning)
1013
1024
1014 # build args
1025 # build args
1015 args = [] if args is None else args
1026 args = [] if args is None else args
1016 kwargs = {} if kwargs is None else kwargs
1027 kwargs = {} if kwargs is None else kwargs
1017 block = self.block if block is None else block
1028 block = self.block if block is None else block
1018 track = self.track if track is None else track
1029 track = self.track if track is None else track
1019 after = self.after if after is None else after
1030 after = self.after if after is None else after
1020 retries = self.retries if retries is None else retries
1031 retries = self.retries if retries is None else retries
1021 follow = self.follow if follow is None else follow
1032 follow = self.follow if follow is None else follow
1022 timeout = self.timeout if timeout is None else timeout
1033 timeout = self.timeout if timeout is None else timeout
1023 targets = self.targets if targets is None else targets
1034 targets = self.targets if targets is None else targets
1024
1035
1025 if not isinstance(retries, int):
1036 if not isinstance(retries, int):
1026 raise TypeError('retries must be int, not %r'%type(retries))
1037 raise TypeError('retries must be int, not %r'%type(retries))
1027
1038
1028 if targets is None:
1039 if targets is None:
1029 idents = []
1040 idents = []
1030 else:
1041 else:
1031 idents = self.client._build_targets(targets)[0]
1042 idents = self.client._build_targets(targets)[0]
1032 # ensure *not* bytes
1043 # ensure *not* bytes
1033 idents = [ ident.decode() for ident in idents ]
1044 idents = [ ident.decode() for ident in idents ]
1034
1045
1035 after = self._render_dependency(after)
1046 after = self._render_dependency(after)
1036 follow = self._render_dependency(follow)
1047 follow = self._render_dependency(follow)
1037 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1048 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1038
1049
1039 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1050 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1040 metadata=metadata)
1051 metadata=metadata)
1041 tracker = None if track is False else msg['tracker']
1052 tracker = None if track is False else msg['tracker']
1042
1053
1043 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1054 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1044
1055
1045 if block:
1056 if block:
1046 try:
1057 try:
1047 return ar.get()
1058 return ar.get()
1048 except KeyboardInterrupt:
1059 except KeyboardInterrupt:
1049 pass
1060 pass
1050 return ar
1061 return ar
1051
1062
1052 @sync_results
1063 @sync_results
1053 @save_ids
1064 @save_ids
1054 def map(self, f, *sequences, **kwargs):
1065 def map(self, f, *sequences, **kwargs):
1055 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1056
1067
1057 Parallel version of builtin `map`, load-balanced by this View.
1068 Parallel version of builtin `map`, load-balanced by this View.
1058
1069
1059 `block`, `chunksize`, and `ordered` can be specified by keyword only.
1070 `block`, `chunksize`, and `ordered` can be specified by keyword only.
1060
1071
1061 Each `chunksize` elements will be a separate task, and will be
1072 Each `chunksize` elements will be a separate task, and will be
1062 load-balanced. This lets individual elements be available for iteration
1073 load-balanced. This lets individual elements be available for iteration
1063 as soon as they arrive.
1074 as soon as they arrive.
1064
1075
1065 Parameters
1076 Parameters
1066 ----------
1077 ----------
1067
1078
1068 f : callable
1079 f : callable
1069 function to be mapped
1080 function to be mapped
1070 *sequences: one or more sequences of matching length
1081 *sequences: one or more sequences of matching length
1071 the sequences to be distributed and passed to `f`
1082 the sequences to be distributed and passed to `f`
1072 block : bool [default self.block]
1083 block : bool [default self.block]
1073 whether to wait for the result or not
1084 whether to wait for the result or not
1074 track : bool
1085 track : bool
1075 whether to create a MessageTracker to allow the user to
1086 whether to create a MessageTracker to allow the user to
1076 safely edit after arrays and buffers during non-copying
1087 safely edit after arrays and buffers during non-copying
1077 sends.
1088 sends.
1078 chunksize : int [default 1]
1089 chunksize : int [default 1]
1079 how many elements should be in each task.
1090 how many elements should be in each task.
1080 ordered : bool [default True]
1091 ordered : bool [default True]
1081 Whether the results should be gathered as they arrive, or enforce
1092 Whether the results should be gathered as they arrive, or enforce
1082 the order of submission.
1093 the order of submission.
1083
1094
1084 Only applies when iterating through AsyncMapResult as results arrive.
1095 Only applies when iterating through AsyncMapResult as results arrive.
1085 Has no effect when block=True.
1096 Has no effect when block=True.
1086
1097
1087 Returns
1098 Returns
1088 -------
1099 -------
1089
1100
1090 if block=False
1101 if block=False
1091 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
1102 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
1092 An object like AsyncResult, but which reassembles the sequence of results
1103 An object like AsyncResult, but which reassembles the sequence of results
1093 into a single list. AsyncMapResults can be iterated through before all
1104 into a single list. AsyncMapResults can be iterated through before all
1094 results are complete.
1105 results are complete.
1095 else
1106 else
1096 A list, the result of ``map(f,*sequences)``
1107 A list, the result of ``map(f,*sequences)``
1097 """
1108 """
1098
1109
1099 # default
1110 # default
1100 block = kwargs.get('block', self.block)
1111 block = kwargs.get('block', self.block)
1101 chunksize = kwargs.get('chunksize', 1)
1112 chunksize = kwargs.get('chunksize', 1)
1102 ordered = kwargs.get('ordered', True)
1113 ordered = kwargs.get('ordered', True)
1103
1114
1104 keyset = set(kwargs.keys())
1115 keyset = set(kwargs.keys())
1105 extra_keys = keyset.difference(set(['block', 'chunksize', 'ordered']))
1116 extra_keys = keyset.difference(set(['block', 'chunksize', 'ordered']))
1106 if extra_keys:
1117 if extra_keys:
1107 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1108
1119
1109 assert len(sequences) > 0, "must have some sequences to map onto!"
1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1110
1121
1111 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1112 return pf.map(*sequences)
1123 return pf.map(*sequences)
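A load-balanced map sketch with explicit chunking and unordered iteration (`lview` as above):

    import time

    def slow(x):
        time.sleep(x % 3)
        return x * x

    amr = lview.map(slow, range(30), chunksize=5, ordered=False, block=False)
    for r in amr:       # chunks are yielded as they finish, not in submission order
        print(r)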
1113
1124
1114 __all__ = ['LoadBalancedView', 'DirectView']
1125 __all__ = ['LoadBalancedView', 'DirectView']
@@ -1,352 +1,382 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Pickle related utilities. Perhaps this should be called 'can'."""
3 """Pickle related utilities. Perhaps this should be called 'can'."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import sys
20 import sys
21 from types import FunctionType
21 from types import FunctionType
22
22
23 try:
23 try:
24 import cPickle as pickle
24 import cPickle as pickle
25 except ImportError:
25 except ImportError:
26 import pickle
26 import pickle
27
27
28 from . import codeutil # This registers a hook when it's imported
28 from . import codeutil # This registers a hook when it's imported
29 from . import py3compat
29 from . import py3compat
30 from .importstring import import_item
30 from .importstring import import_item
31 from .py3compat import string_types, iteritems
31 from .py3compat import string_types, iteritems
32
32
33 from IPython.config import Application
33 from IPython.config import Application
34
34
35 if py3compat.PY3:
35 if py3compat.PY3:
36 buffer = memoryview
36 buffer = memoryview
37 class_type = type
37 class_type = type
38 else:
38 else:
39 from types import ClassType
39 from types import ClassType
40 class_type = (type, ClassType)
40 class_type = (type, ClassType)
41
41
42 #-------------------------------------------------------------------------------
42 #-------------------------------------------------------------------------------
43 # Functions
44 #-------------------------------------------------------------------------------
45
46
47 def use_dill():
48 """use dill to expand serialization support
49
50 adds support for object methods and closures to serialization.
51 """
52 # import dill causes most of the magic
53 import dill
54
55 # dill doesn't work with cPickle,
56 # tell the two relevant modules to use plain pickle
57
58 global pickle
59 pickle = dill
60
61 try:
62 from IPython.kernel.zmq import serialize
63 except ImportError:
64 pass
65 else:
66 serialize.pickle = dill
67
68 # disable special function handling, let dill take care of it
69 can_map.pop(FunctionType, None)
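A rough sketch of the effect, assuming dill is installed; after the call, the module-level `pickle` name is dill, so closures round-trip:

    from IPython.utils import pickleutil

    pickleutil.use_dill()

    def make_adder(a):
        def adder(b):
            return a + b
        return adder

    blob = pickleutil.pickle.dumps(make_adder(1))   # dill handles the closure
    pickleutil.pickle.loads(blob)(41)               # -> 42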
70
71
72 #-------------------------------------------------------------------------------
43 # Classes
73 # Classes
44 #-------------------------------------------------------------------------------
74 #-------------------------------------------------------------------------------
45
75
46
76
47 class CannedObject(object):
77 class CannedObject(object):
48 def __init__(self, obj, keys=[], hook=None):
78 def __init__(self, obj, keys=[], hook=None):
49 """can an object for safe pickling
79 """can an object for safe pickling
50
80
51 Parameters
81 Parameters
52 ==========
82 ==========
53
83
54 obj:
84 obj:
55 The object to be canned
85 The object to be canned
56 keys: list (optional)
86 keys: list (optional)
57 list of attribute names that will be explicitly canned / uncanned
87 list of attribute names that will be explicitly canned / uncanned
58 hook: callable (optional)
88 hook: callable (optional)
59 An optional extra callable,
89 An optional extra callable,
60 which can do additional processing of the uncanned object.
90 which can do additional processing of the uncanned object.
61
91
62 large data may be offloaded into the buffers list,
92 large data may be offloaded into the buffers list,
63 used for zero-copy transfers.
93 used for zero-copy transfers.
64 """
94 """
65 self.keys = keys
95 self.keys = keys
66 self.obj = copy.copy(obj)
96 self.obj = copy.copy(obj)
67 self.hook = can(hook)
97 self.hook = can(hook)
68 for key in keys:
98 for key in keys:
69 setattr(self.obj, key, can(getattr(obj, key)))
99 setattr(self.obj, key, can(getattr(obj, key)))
70
100
71 self.buffers = []
101 self.buffers = []
72
102
73 def get_object(self, g=None):
103 def get_object(self, g=None):
74 if g is None:
104 if g is None:
75 g = {}
105 g = {}
76 obj = self.obj
106 obj = self.obj
77 for key in self.keys:
107 for key in self.keys:
78 setattr(obj, key, uncan(getattr(obj, key), g))
108 setattr(obj, key, uncan(getattr(obj, key), g))
79
109
80 if self.hook:
110 if self.hook:
81 self.hook = uncan(self.hook, g)
111 self.hook = uncan(self.hook, g)
82 self.hook(obj, g)
112 self.hook(obj, g)
83 return self.obj
113 return self.obj
84
114
85
115
86 class Reference(CannedObject):
116 class Reference(CannedObject):
87 """object for wrapping a remote reference by name."""
117 """object for wrapping a remote reference by name."""
88 def __init__(self, name):
118 def __init__(self, name):
89 if not isinstance(name, string_types):
119 if not isinstance(name, string_types):
90 raise TypeError("illegal name: %r"%name)
120 raise TypeError("illegal name: %r"%name)
91 self.name = name
121 self.name = name
92 self.buffers = []
122 self.buffers = []
93
123
94 def __repr__(self):
124 def __repr__(self):
95 return "<Reference: %r>"%self.name
125 return "<Reference: %r>"%self.name
96
126
97 def get_object(self, g=None):
127 def get_object(self, g=None):
98 if g is None:
128 if g is None:
99 g = {}
129 g = {}
100
130
101 return eval(self.name, g)
131 return eval(self.name, g)
102
132
103
133
104 class CannedFunction(CannedObject):
134 class CannedFunction(CannedObject):
105
135
106 def __init__(self, f):
136 def __init__(self, f):
107 self._check_type(f)
137 self._check_type(f)
108 self.code = f.__code__
138 self.code = f.__code__
109 if f.__defaults__:
139 if f.__defaults__:
110 self.defaults = [ can(fd) for fd in f.__defaults__ ]
140 self.defaults = [ can(fd) for fd in f.__defaults__ ]
111 else:
141 else:
112 self.defaults = None
142 self.defaults = None
113 self.module = f.__module__ or '__main__'
143 self.module = f.__module__ or '__main__'
114 self.__name__ = f.__name__
144 self.__name__ = f.__name__
115 self.buffers = []
145 self.buffers = []
116
146
117 def _check_type(self, obj):
147 def _check_type(self, obj):
118 assert isinstance(obj, FunctionType), "Not a function type"
148 assert isinstance(obj, FunctionType), "Not a function type"
119
149
120 def get_object(self, g=None):
150 def get_object(self, g=None):
121 # try to load function back into its module:
151 # try to load function back into its module:
122 if not self.module.startswith('__'):
152 if not self.module.startswith('__'):
123 __import__(self.module)
153 __import__(self.module)
124 g = sys.modules[self.module].__dict__
154 g = sys.modules[self.module].__dict__
125
155
126 if g is None:
156 if g is None:
127 g = {}
157 g = {}
128 if self.defaults:
158 if self.defaults:
129 defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
159 defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
130 else:
160 else:
131 defaults = None
161 defaults = None
132 newFunc = FunctionType(self.code, g, self.__name__, defaults)
162 newFunc = FunctionType(self.code, g, self.__name__, defaults)
133 return newFunc
163 return newFunc
134
164
135 class CannedClass(CannedObject):
165 class CannedClass(CannedObject):
136
166
137 def __init__(self, cls):
167 def __init__(self, cls):
138 self._check_type(cls)
168 self._check_type(cls)
139 self.name = cls.__name__
169 self.name = cls.__name__
140 self.old_style = not isinstance(cls, type)
170 self.old_style = not isinstance(cls, type)
141 self._canned_dict = {}
171 self._canned_dict = {}
142 for k,v in cls.__dict__.items():
172 for k,v in cls.__dict__.items():
143 if k not in ('__weakref__', '__dict__'):
173 if k not in ('__weakref__', '__dict__'):
144 self._canned_dict[k] = can(v)
174 self._canned_dict[k] = can(v)
145 if self.old_style:
175 if self.old_style:
146 mro = []
176 mro = []
147 else:
177 else:
148 mro = cls.mro()
178 mro = cls.mro()
149
179
150 self.parents = [ can(c) for c in mro[1:] ]
180 self.parents = [ can(c) for c in mro[1:] ]
151 self.buffers = []
181 self.buffers = []
152
182
153 def _check_type(self, obj):
183 def _check_type(self, obj):
154 assert isinstance(obj, class_type), "Not a class type"
184 assert isinstance(obj, class_type), "Not a class type"
155
185
156 def get_object(self, g=None):
186 def get_object(self, g=None):
157 parents = tuple(uncan(p, g) for p in self.parents)
187 parents = tuple(uncan(p, g) for p in self.parents)
158 return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
188 return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
159
189
160 class CannedArray(CannedObject):
190 class CannedArray(CannedObject):
161 def __init__(self, obj):
191 def __init__(self, obj):
162 from numpy import ascontiguousarray
192 from numpy import ascontiguousarray
163 self.shape = obj.shape
193 self.shape = obj.shape
164 self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
194 self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
165 if sum(obj.shape) == 0:
195 if sum(obj.shape) == 0:
166 # just pickle it
196 # just pickle it
167 self.buffers = [pickle.dumps(obj, -1)]
197 self.buffers = [pickle.dumps(obj, -1)]
168 else:
198 else:
169 # ensure contiguous
199 # ensure contiguous
170 obj = ascontiguousarray(obj, dtype=None)
200 obj = ascontiguousarray(obj, dtype=None)
171 self.buffers = [buffer(obj)]
201 self.buffers = [buffer(obj)]
172
202
173 def get_object(self, g=None):
203 def get_object(self, g=None):
174 from numpy import frombuffer
204 from numpy import frombuffer
175 data = self.buffers[0]
205 data = self.buffers[0]
176 if sum(self.shape) == 0:
206 if sum(self.shape) == 0:
177 # no shape, we just pickled it
207 # no shape, we just pickled it
178 return pickle.loads(data)
208 return pickle.loads(data)
179 else:
209 else:
180 return frombuffer(data, dtype=self.dtype).reshape(self.shape)
210 return frombuffer(data, dtype=self.dtype).reshape(self.shape)
181
211
182
212
183 class CannedBytes(CannedObject):
213 class CannedBytes(CannedObject):
184 wrap = bytes
214 wrap = bytes
185 def __init__(self, obj):
215 def __init__(self, obj):
186 self.buffers = [obj]
216 self.buffers = [obj]
187
217
188 def get_object(self, g=None):
218 def get_object(self, g=None):
189 data = self.buffers[0]
219 data = self.buffers[0]
190 return self.wrap(data)
220 return self.wrap(data)
191
221
192 class CannedBuffer(CannedBytes):
222 class CannedBuffer(CannedBytes):
193 wrap = buffer
223 wrap = buffer
194
224
195 #-------------------------------------------------------------------------------
225 #-------------------------------------------------------------------------------
196 # Functions
226 # Functions
197 #-------------------------------------------------------------------------------
227 #-------------------------------------------------------------------------------
198
228
199 def _logger():
229 def _logger():
200 """get the logger for the current Application
230 """get the logger for the current Application
201
231
202 the root logger will be used if no Application is running
232 the root logger will be used if no Application is running
203 """
233 """
204 if Application.initialized():
234 if Application.initialized():
205 logger = Application.instance().log
235 logger = Application.instance().log
206 else:
236 else:
207 logger = logging.getLogger()
237 logger = logging.getLogger()
208 if not logger.handlers:
238 if not logger.handlers:
209 logging.basicConfig()
239 logging.basicConfig()
210
240
211 return logger
241 return logger
212
242
213 def _import_mapping(mapping, original=None):
243 def _import_mapping(mapping, original=None):
214 """import any string-keys in a type mapping
244 """import any string-keys in a type mapping
215
245
216 """
246 """
217 log = _logger()
247 log = _logger()
218 log.debug("Importing canning map")
248 log.debug("Importing canning map")
219 for key,value in list(mapping.items()):
249 for key,value in list(mapping.items()):
220 if isinstance(key, string_types):
250 if isinstance(key, string_types):
221 try:
251 try:
222 cls = import_item(key)
252 cls = import_item(key)
223 except Exception:
253 except Exception:
224 if original and key not in original:
254 if original and key not in original:
225 # only message on user-added classes
255 # only message on user-added classes
226 log.error("canning class not importable: %r", key, exc_info=True)
256 log.error("canning class not importable: %r", key, exc_info=True)
227 mapping.pop(key)
257 mapping.pop(key)
228 else:
258 else:
229 mapping[cls] = mapping.pop(key)
259 mapping[cls] = mapping.pop(key)
230
260
231 def istype(obj, check):
261 def istype(obj, check):
232 """like isinstance(obj, check), but strict
262 """like isinstance(obj, check), but strict
233
263
234 This won't catch subclasses.
264 This won't catch subclasses.
235 """
265 """
236 if isinstance(check, tuple):
266 if isinstance(check, tuple):
237 for cls in check:
267 for cls in check:
238 if type(obj) is cls:
268 if type(obj) is cls:
239 return True
269 return True
240 return False
270 return False
241 else:
271 else:
242 return type(obj) is check
272 return type(obj) is check
243
273
244 def can(obj):
274 def can(obj):
245 """prepare an object for pickling"""
275 """prepare an object for pickling"""
246
276
247 import_needed = False
277 import_needed = False
248
278
249 for cls,canner in iteritems(can_map):
279 for cls,canner in iteritems(can_map):
250 if isinstance(cls, string_types):
280 if isinstance(cls, string_types):
251 import_needed = True
281 import_needed = True
252 break
282 break
253 elif istype(obj, cls):
283 elif istype(obj, cls):
254 return canner(obj)
284 return canner(obj)
255
285
256 if import_needed:
286 if import_needed:
257 # perform can_map imports, then try again
287 # perform can_map imports, then try again
258 # this will usually only happen once
288 # this will usually only happen once
259 _import_mapping(can_map, _original_can_map)
289 _import_mapping(can_map, _original_can_map)
260 return can(obj)
290 return can(obj)
261
291
262 return obj
292 return obj
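A small round-trip sketch (it assumes the default uncan_map, defined further down in this file, routes canned objects back through get_object):

    from IPython.utils.pickleutil import can, uncan

    def f(x):
        return x + 1

    cf = can(f)         # FunctionType is in can_map -> CannedFunction
    g = uncan(cf, {})   # rebuild the function from its code object
    g(41)               # -> 42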
263
293
264 def can_class(obj):
294 def can_class(obj):
265 if isinstance(obj, class_type) and obj.__module__ == '__main__':
295 if isinstance(obj, class_type) and obj.__module__ == '__main__':
266 return CannedClass(obj)
296 return CannedClass(obj)
267 else:
297 else:
268 return obj
298 return obj
269
299
270 def can_dict(obj):
300 def can_dict(obj):
271 """can the *values* of a dict"""
301 """can the *values* of a dict"""
272 if istype(obj, dict):
302 if istype(obj, dict):
273 newobj = {}
303 newobj = {}
274 for k, v in iteritems(obj):
304 for k, v in iteritems(obj):
275 newobj[k] = can(v)
305 newobj[k] = can(v)
276 return newobj
306 return newobj
277 else:
307 else:
278 return obj
308 return obj
279
309
280 sequence_types = (list, tuple, set)
310 sequence_types = (list, tuple, set)
281
311
282 def can_sequence(obj):
312 def can_sequence(obj):
283 """can the elements of a sequence"""
313 """can the elements of a sequence"""
284 if istype(obj, sequence_types):
314 if istype(obj, sequence_types):
285 t = type(obj)
315 t = type(obj)
286 return t([can(i) for i in obj])
316 return t([can(i) for i in obj])
287 else:
317 else:
288 return obj
318 return obj
289
319
290 def uncan(obj, g=None):
320 def uncan(obj, g=None):
291 """invert canning"""
321 """invert canning"""
292
322
293 import_needed = False
323 import_needed = False
294 for cls,uncanner in iteritems(uncan_map):
324 for cls,uncanner in iteritems(uncan_map):
295 if isinstance(cls, string_types):
325 if isinstance(cls, string_types):
296 import_needed = True
326 import_needed = True
297 break
327 break
298 elif isinstance(obj, cls):
328 elif isinstance(obj, cls):
299 return uncanner(obj, g)
329 return uncanner(obj, g)
300
330
301 if import_needed:
331 if import_needed:
302 # perform uncan_map imports, then try again
332 # perform uncan_map imports, then try again
303 # this will usually only happen once
333 # this will usually only happen once
304 _import_mapping(uncan_map, _original_uncan_map)
334 _import_mapping(uncan_map, _original_uncan_map)
305 return uncan(obj, g)
335 return uncan(obj, g)
306
336
307 return obj
337 return obj
308
338
309 def uncan_dict(obj, g=None):
339 def uncan_dict(obj, g=None):
310 if istype(obj, dict):
340 if istype(obj, dict):
311 newobj = {}
341 newobj = {}
312 for k, v in iteritems(obj):
342 for k, v in iteritems(obj):
313 newobj[k] = uncan(v,g)
343 newobj[k] = uncan(v,g)
314 return newobj
344 return newobj
315 else:
345 else:
316 return obj
346 return obj
317
347
318 def uncan_sequence(obj, g=None):
348 def uncan_sequence(obj, g=None):
319 if istype(obj, sequence_types):
349 if istype(obj, sequence_types):
320 t = type(obj)
350 t = type(obj)
321 return t([uncan(i,g) for i in obj])
351 return t([uncan(i,g) for i in obj])
322 else:
352 else:
323 return obj
353 return obj
324
354
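# Hedged usage sketch (not part of pickleutil.py): can() wraps objects that
# plain pickle struggles with -- e.g. interactively defined functions -- in
# picklable Canned* stand-ins, and uncan() reverses the wrapping. The function
# below is an assumed example defined in __main__:
#
#     f = lambda x: x + 1
#     canned = can(f)      # -> CannedFunction (the FunctionType entry in can_map)
#     g = uncan(canned)    # -> a new function equivalent to f
#     g(1)                 # 2
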
def _uncan_dependent_hook(dep, g=None):
    dep.check_dependency()

def can_dependent(obj):
    return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)

#-------------------------------------------------------------------------------
# API dictionaries
#-------------------------------------------------------------------------------

# These dicts can be extended for custom serialization of new objects

can_map = {
    'IPython.parallel.dependent' : can_dependent,
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
    class_type : can_class,
}

uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}

# for use in _import_mapping:
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
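As the comment above notes, can_map and uncan_map are the extension points for
custom serialization. The sketch below is illustrative only (Point and
CannedPoint are made-up names, not part of IPython), but it shows the expected
shapes: a can_map entry maps a type to a callable that returns a picklable
stand-in, and an uncan_map entry maps the stand-in's type to a callable taking
(obj, g) that rebuilds the original object.

    from IPython.utils import pickleutil

    class Point(object):
        """toy user-defined class (hypothetical)"""
        def __init__(self, x, y):
            self.x, self.y = x, y

    class CannedPoint(object):
        """picklable stand-in for Point (hypothetical)"""
        def __init__(self, p):
            self.coords = (p.x, p.y)
        def get_object(self, g=None):
            return Point(*self.coords)

    # register the pair: canning on the way out, uncanning on the way back
    pickleutil.can_map[Point] = CannedPoint
    pickleutil.uncan_map[CannedPoint] = lambda obj, g: obj.get_object(g)

    p = pickleutil.uncan(pickleutil.can(Point(1, 2)))  # round-trips to a Point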