##// END OF EJS Templates
Merge pull request #4552 from minrk/use_dill...
Min RK -
r13728:818f778e merge
parent child Browse files
Show More
@@ -0,0 +1,7 b''
1 Using dill to expand serialization support
2 ------------------------------------------
3
4 adds :func:`~IPython.utils.pickleutil.use_dill` for allowing
5 dill to extend serialization support in :mod:`IPython.parallel` (closures, etc.).
6 Also adds :meth:`DirectView.use_dill` convenience method for enabling dill
7 locally and on all engines with one call.
@@ -0,0 +1,463 b''
1 {
2 "metadata": {
3 "gist_id": "5241793",
4 "name": ""
5 },
6 "nbformat": 3,
7 "nbformat_minor": 0,
8 "worksheets": [
9 {
10 "cells": [
11 {
12 "cell_type": "heading",
13 "level": 1,
14 "metadata": {},
15 "source": [
16 "Using dill to pickle anything"
17 ]
18 },
19 {
20 "cell_type": "markdown",
21 "metadata": {},
22 "source": [
23 "IPython.parallel doesn't do much in the way of serialization.\n",
24 "It has custom zero-copy handling of numpy arrays,\n",
25 "but other than that, it doesn't do anything other than the bare minimum to make basic interactively defined functions and classes sendable.\n",
26 "\n",
27 "There are a few projects that extend pickle to make just about anything sendable, and one of these is [dill](https://pypi.org/project/dill/).\n",
28 "\n",
29 "To install dill:\n",
30 " \n",
31 " pip install dill"
32 ]
33 },
34 {
35 "cell_type": "markdown",
36 "metadata": {},
37 "source": [
38 "First, as always, we create a task function, this time with a closure"
39 ]
40 },
41 {
42 "cell_type": "code",
43 "collapsed": false,
44 "input": [
45 "def make_closure(a):\n",
46 " \"\"\"make a function with a closure, and return it\"\"\"\n",
47 " def has_closure(b):\n",
48 " return a * b\n",
49 " return has_closure"
50 ],
51 "language": "python",
52 "metadata": {},
53 "outputs": [],
54 "prompt_number": 1
55 },
56 {
57 "cell_type": "code",
58 "collapsed": false,
59 "input": [
60 "closed = make_closure(5)"
61 ],
62 "language": "python",
63 "metadata": {},
64 "outputs": [],
65 "prompt_number": 2
66 },
67 {
68 "cell_type": "code",
69 "collapsed": false,
70 "input": [
71 "closed(2)"
72 ],
73 "language": "python",
74 "metadata": {},
75 "outputs": [
76 {
77 "metadata": {},
78 "output_type": "pyout",
79 "prompt_number": 3,
80 "text": [
81 "10"
82 ]
83 }
84 ],
85 "prompt_number": 3
86 },
87 {
88 "cell_type": "code",
89 "collapsed": false,
90 "input": [
91 "import pickle"
92 ],
93 "language": "python",
94 "metadata": {},
95 "outputs": [],
96 "prompt_number": 4
97 },
98 {
99 "cell_type": "markdown",
100 "metadata": {},
101 "source": [
102 "Without help, pickle can't deal with closures"
103 ]
104 },
105 {
106 "cell_type": "code",
107 "collapsed": false,
108 "input": [
109 "pickle.dumps(closed)"
110 ],
111 "language": "python",
112 "metadata": {},
113 "outputs": [
114 {
115 "ename": "PicklingError",
116 "evalue": "Can't pickle <function has_closure at 0x10d2552a8>: it's not found as __main__.has_closure",
117 "output_type": "pyerr",
118 "traceback": [
119 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mPicklingError\u001b[0m Traceback (most recent call last)",
120 "\u001b[1;32m<ipython-input-5-0f1f376cfea0>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mclosed\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
121 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36mdumps\u001b[1;34m(obj, protocol)\u001b[0m\n\u001b[0;32m 1372\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mprotocol\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mNone\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1373\u001b[0m \u001b[0mfile\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mStringIO\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1374\u001b[1;33m \u001b[0mPickler\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mfile\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mprotocol\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdump\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 1375\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mfile\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgetvalue\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1376\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
122 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36mdump\u001b[1;34m(self, obj)\u001b[0m\n\u001b[0;32m 222\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mproto\u001b[0m \u001b[1;33m>=\u001b[0m \u001b[1;36m2\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 223\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwrite\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mPROTO\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mchr\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mproto\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 224\u001b[1;33m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msave\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 225\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mwrite\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mSTOP\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 226\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
123 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36msave\u001b[1;34m(self, obj)\u001b[0m\n\u001b[0;32m 284\u001b[0m \u001b[0mf\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdispatch\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mget\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mt\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 285\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 286\u001b[1;33m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mobj\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;31m# Call unbound method with explicit self\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 287\u001b[0m \u001b[1;32mreturn\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 288\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
124 "\u001b[1;32m/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc\u001b[0m in \u001b[0;36msave_global\u001b[1;34m(self, obj, name, pack)\u001b[0m\n\u001b[0;32m 746\u001b[0m raise PicklingError(\n\u001b[0;32m 747\u001b[0m \u001b[1;34m\"Can't pickle %r: it's not found as %s.%s\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 748\u001b[1;33m (obj, module, name))\n\u001b[0m\u001b[0;32m 749\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 750\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mklass\u001b[0m \u001b[1;32mis\u001b[0m \u001b[1;32mnot\u001b[0m \u001b[0mobj\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
125 "\u001b[1;31mPicklingError\u001b[0m: Can't pickle <function has_closure at 0x10d2552a8>: it's not found as __main__.has_closure"
126 ]
127 }
128 ],
129 "prompt_number": 5
130 },
131 {
132 "cell_type": "markdown",
133 "metadata": {},
134 "source": [
135 "But after we import dill, magic happens"
136 ]
137 },
138 {
139 "cell_type": "code",
140 "collapsed": false,
141 "input": [
142 "import dill"
143 ],
144 "language": "python",
145 "metadata": {},
146 "outputs": [],
147 "prompt_number": 6
148 },
149 {
150 "cell_type": "code",
151 "collapsed": false,
152 "input": [
153 "pickle.dumps(closed)[:64] + '...'"
154 ],
155 "language": "python",
156 "metadata": {},
157 "outputs": [
158 {
159 "metadata": {},
160 "output_type": "pyout",
161 "prompt_number": 7,
162 "text": [
163 "\"cdill.dill\\n_load_type\\np0\\n(S'FunctionType'\\np1\\ntp2\\nRp3\\n(cdill.dill...\""
164 ]
165 }
166 ],
167 "prompt_number": 7
168 },
169 {
170 "cell_type": "markdown",
171 "metadata": {},
172 "source": [
173 "So from now on, pretty much everything is pickleable."
174 ]
175 },
176 {
177 "cell_type": "heading",
178 "level": 2,
179 "metadata": {},
180 "source": [
181 "Now use this in IPython.parallel"
182 ]
183 },
184 {
185 "cell_type": "markdown",
186 "metadata": {},
187 "source": [
188 "As usual, we start by creating our Client and View"
189 ]
190 },
191 {
192 "cell_type": "code",
193 "collapsed": false,
194 "input": [
195 "from IPython import parallel\n",
196 "rc = parallel.Client()\n",
197 "view = rc.load_balanced_view()"
198 ],
199 "language": "python",
200 "metadata": {},
201 "outputs": [],
202 "prompt_number": 8
203 },
204 {
205 "cell_type": "markdown",
206 "metadata": {},
207 "source": [
208 "Now let's try sending our function with a closure:"
209 ]
210 },
211 {
212 "cell_type": "code",
213 "collapsed": false,
214 "input": [
215 "view.apply_sync(closed, 3)"
216 ],
217 "language": "python",
218 "metadata": {},
219 "outputs": [
220 {
221 "ename": "ValueError",
222 "evalue": "Sorry, cannot pickle code objects with closures",
223 "output_type": "pyerr",
224 "traceback": [
225 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mValueError\u001b[0m Traceback (most recent call last)",
226 "\u001b[1;32m<ipython-input-9-23a646829fdc>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mview\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mapply_sync\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mclosed\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m3\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
227 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mapply_sync\u001b[1;34m(self, f, *args, **kwargs)\u001b[0m\n",
228 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mspin_after\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 73\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mspin_after\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 74\u001b[0m \u001b[1;34m\"\"\"call spin after the method.\"\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 75\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 76\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mspin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 77\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mret\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
229 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36mapply_sync\u001b[1;34m(self, f, *args, **kwargs)\u001b[0m\n\u001b[0;32m 248\u001b[0m \u001b[0mreturns\u001b[0m\u001b[1;33m:\u001b[0m \u001b[0mactual\u001b[0m \u001b[0mresult\u001b[0m \u001b[0mof\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 249\u001b[0m \"\"\"\n\u001b[1;32m--> 250\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_really_apply\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mblock\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 251\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 252\u001b[0m \u001b[1;31m#----------------------------------------------------------------\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
230 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n",
231 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36msync_results\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 64\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_in_sync_results\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mTrue\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 65\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 66\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 67\u001b[0m \u001b[1;32mfinally\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 68\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_in_sync_results\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mFalse\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
232 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n",
233 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36msave_ids\u001b[1;34m(f, self, *args, **kwargs)\u001b[0m\n\u001b[0;32m 49\u001b[0m \u001b[0mn_previous\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclient\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mhistory\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 50\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 51\u001b[1;33m \u001b[0mret\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 52\u001b[0m \u001b[1;32mfinally\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 53\u001b[0m \u001b[0mnmsgs\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclient\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mhistory\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m-\u001b[0m \u001b[0mn_previous\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
234 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc\u001b[0m in \u001b[0;36m_really_apply\u001b[1;34m(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)\u001b[0m\n\u001b[0;32m 1053\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1054\u001b[0m msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,\n\u001b[1;32m-> 1055\u001b[1;33m metadata=metadata)\n\u001b[0m\u001b[0;32m 1056\u001b[0m \u001b[0mtracker\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mNone\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mtrack\u001b[0m \u001b[1;32mis\u001b[0m \u001b[0mFalse\u001b[0m \u001b[1;32melse\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m'tracker'\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 1057\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
235 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/parallel/client/client.pyc\u001b[0m in \u001b[0;36msend_apply_request\u001b[1;34m(self, socket, f, args, kwargs, metadata, track, ident)\u001b[0m\n\u001b[0;32m 1252\u001b[0m bufs = serialize.pack_apply_message(f, args, kwargs,\n\u001b[0;32m 1253\u001b[0m \u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 1254\u001b[1;33m \u001b[0mitem_threshold\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mitem_threshold\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 1255\u001b[0m )\n\u001b[0;32m 1256\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
236 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc\u001b[0m in \u001b[0;36mpack_apply_message\u001b[1;34m(f, args, kwargs, buffer_threshold, item_threshold)\u001b[0m\n\u001b[0;32m 163\u001b[0m \u001b[0minfo\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mdict\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mnargs\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mnarg_bufs\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg_bufs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mkw_keys\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mkw_keys\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 164\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 165\u001b[1;33m \u001b[0mmsg\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;33m[\u001b[0m\u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcan\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mf\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 166\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mappend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0minfo\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 167\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mextend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg_bufs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
237 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc\u001b[0m in \u001b[0;36mreduce_code\u001b[1;34m(co)\u001b[0m\n\u001b[0;32m 36\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mreduce_code\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mco\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 37\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_freevars\u001b[0m \u001b[1;32mor\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_cellvars\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 38\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Sorry, cannot pickle code objects with closures\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 39\u001b[0m args = [co.co_argcount, co.co_nlocals, co.co_stacksize,\n\u001b[0;32m 40\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_flags\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_code\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_consts\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_names\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
238 "\u001b[1;31mValueError\u001b[0m: Sorry, cannot pickle code objects with closures"
239 ]
240 }
241 ],
242 "prompt_number": 9
243 },
244 {
245 "cell_type": "markdown",
246 "metadata": {},
247 "source": [
248 "Oops, no dice. For IPython to work with dill,\n",
249 "there are one or two more steps. IPython will do these for you if you call `pickleutil.use_dill`:"
250 ]
251 },
252 {
253 "cell_type": "code",
254 "collapsed": false,
255 "input": [
256 "from IPython.utils import pickleutil\n",
257 "pickleutil.use_dill()"
258 ],
259 "language": "python",
260 "metadata": {},
261 "outputs": [],
262 "prompt_number": 10
263 },
264 {
265 "cell_type": "markdown",
266 "metadata": {},
267 "source": [
268 "Now let's try again"
269 ]
270 },
271 {
272 "cell_type": "code",
273 "collapsed": false,
274 "input": [
275 "view.apply_sync(closed, 3)"
276 ],
277 "language": "python",
278 "metadata": {},
279 "outputs": [
280 {
281 "metadata": {},
282 "output_type": "pyout",
283 "prompt_number": 11,
284 "text": [
285 "15"
286 ]
287 }
288 ],
289 "prompt_number": 11
290 },
291 {
292 "cell_type": "markdown",
293 "metadata": {},
294 "source": [
295 "Yay! Now we can use dill to allow IPython.parallel to send anything.\n",
296 "\n",
297 "And that's it! We can send closures and other previously non-pickleables to our engines.\n",
298 "\n",
299 "But wait, there's more!"
300 ]
301 },
302 {
303 "cell_type": "code",
304 "collapsed": false,
305 "input": [
306 "view.apply_sync(make_closure, 2)"
307 ],
308 "language": "python",
309 "metadata": {},
310 "outputs": [
311 {
312 "ename": "RemoteError",
313 "evalue": "ValueError(Sorry, cannot pickle code objects with closures)",
314 "output_type": "pyerr",
315 "traceback": [
316 "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
317 "\u001b[1;31mValueError\u001b[0m Traceback (most recent call last)\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc\u001b[0m in \u001b[0;36mserialize_object\u001b[1;34m(obj, buffer_threshold, item_threshold)\u001b[0m",
318 "\u001b[0;32m 100\u001b[0m \u001b[0mbuffers\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mextend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0m_extract_buffers\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcobj\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mbuffer_threshold\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
319 "\u001b[0;32m 101\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m",
320 "\u001b[1;32m--> 102\u001b[1;33m \u001b[0mbuffers\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0minsert\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mpickle\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdumps\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcobj\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m-\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
321 "\u001b[0m\u001b[0;32m 103\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mbuffers\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
322 "\u001b[0;32m 104\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m",
323 "\u001b[1;32m/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc\u001b[0m in \u001b[0;36mreduce_code\u001b[1;34m(co)\u001b[0m",
324 "\u001b[0;32m 36\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mreduce_code\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mco\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
325 "\u001b[0;32m 37\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_freevars\u001b[0m \u001b[1;32mor\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_cellvars\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
326 "\u001b[1;32m---> 38\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Sorry, cannot pickle code objects with closures\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
327 "\u001b[0m\u001b[0;32m 39\u001b[0m args = [co.co_argcount, co.co_nlocals, co.co_stacksize,",
328 "\u001b[0;32m 40\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_flags\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_code\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_consts\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mco\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mco_names\u001b[0m\u001b[1;33m,\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m",
329 "\u001b[1;31mValueError\u001b[0m: Sorry, cannot pickle code objects with closures"
330 ]
331 }
332 ],
333 "prompt_number": 12
334 },
335 {
336 "cell_type": "markdown",
337 "metadata": {},
338 "source": [
339 "If we want dill support for objects coming *from* the engines,\n",
340 "then we need to call `use_dill()` there as well.\n",
341 "\n",
342 "`DirectView` objects have a method to call `use_dill` locally and on every engine:"
343 ]
344 },
345 {
346 "cell_type": "code",
347 "collapsed": false,
348 "input": [
349 "rc[:].use_dill()"
350 ],
351 "language": "python",
352 "metadata": {},
353 "outputs": [
354 {
355 "metadata": {},
356 "output_type": "pyout",
357 "prompt_number": 13,
358 "text": [
359 "<AsyncResult: use_dill>"
360 ]
361 }
362 ],
363 "prompt_number": 13
364 },
365 {
366 "cell_type": "markdown",
367 "metadata": {},
368 "source": [
369 "This is equivalent to\n",
370 "\n",
371 "```python\n",
372 "from IPython.utils.pickleutil import use_dill\n",
373 "use_dill()\n",
374 "rc[:].apply(use_dill)\n",
375 "```"
376 ]
377 },
378 {
379 "cell_type": "markdown",
380 "metadata": {},
381 "source": [
382 "Let's give it a try now:"
383 ]
384 },
385 {
386 "cell_type": "code",
387 "collapsed": false,
388 "input": [
389 "remote_closure = view.apply_sync(make_closure, 4)\n",
390 "remote_closure(5)"
391 ],
392 "language": "python",
393 "metadata": {},
394 "outputs": [
395 {
396 "metadata": {},
397 "output_type": "pyout",
398 "prompt_number": 14,
399 "text": [
400 "20"
401 ]
402 }
403 ],
404 "prompt_number": 14
405 },
406 {
407 "cell_type": "markdown",
408 "metadata": {},
409 "source": [
410 "At this point, we can send/recv all kinds of stuff"
411 ]
412 },
413 {
414 "cell_type": "code",
415 "collapsed": false,
416 "input": [
417 "def outer(a):\n",
418 " def inner(b):\n",
419 " def inner_again(c):\n",
420 " return c * b * a\n",
421 " return inner_again\n",
422 " return inner"
423 ],
424 "language": "python",
425 "metadata": {},
426 "outputs": [],
427 "prompt_number": 15
428 },
429 {
430 "cell_type": "markdown",
431 "metadata": {},
432 "source": [
433 "So outer returns a function with a closure, which returns a function with a closure.\n",
434 "\n",
435 "Now, we can resolve the first closure on the engine, the second here, and the third on a different engine,\n",
436 "after passing through a lambda we define here and call there, just for good measure."
437 ]
438 },
439 {
440 "cell_type": "code",
441 "collapsed": false,
442 "input": [
443 "view.apply_sync(lambda f: f(3),view.apply_sync(outer, 1)(2))"
444 ],
445 "language": "python",
446 "metadata": {},
447 "outputs": [
448 {
449 "metadata": {},
450 "output_type": "pyout",
451 "prompt_number": 16,
452 "text": [
453 "6"
454 ]
455 }
456 ],
457 "prompt_number": 16
458 }
459 ],
460 "metadata": {}
461 }
462 ]
463 } No newline at end of file
@@ -1,1114 +1,1125 b''
1 1 """Views of remote engines.
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7 from __future__ import print_function
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2010-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import imp
20 20 import sys
21 21 import warnings
22 22 from contextlib import contextmanager
23 23 from types import ModuleType
24 24
25 25 import zmq
26 26
27 27 from IPython.testing.skipdoctest import skip_doctest
28 from IPython.utils import pickleutil
28 29 from IPython.utils.traitlets import (
29 30 HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer
30 31 )
31 32 from IPython.external.decorator import decorator
32 33
33 34 from IPython.parallel import util
34 35 from IPython.parallel.controller.dependency import Dependency, dependent
35 36 from IPython.utils.py3compat import string_types, iteritems, PY3
36 37
37 38 from . import map as Map
38 39 from .asyncresult import AsyncResult, AsyncMapResult
39 40 from .remotefunction import ParallelFunction, parallel, remote, getname
40 41
41 42 #-----------------------------------------------------------------------------
42 43 # Decorators
43 44 #-----------------------------------------------------------------------------
44 45
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Records, on the View, the msg_ids of any messages the wrapped call
    submitted (appended to ``self.client.history`` during the call), both in
    ``self.history`` and ``self.outstanding``.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        nmsgs = len(self.client.history) - n_previous
        # BUGFIX: when nmsgs == 0, `history[-0:]` is the *entire* history,
        # which would wrongly re-add every past msg_id to self.history and
        # mark them all outstanding.  Only slice when new messages exist.
        msg_ids = self.client.history[-nmsgs:] if nmsgs else []
        self.history.extend(msg_ids)
        self.outstanding.update(msg_ids)
    return ret
57 58
@decorator
def sync_results(f, self, *args, **kwargs):
    """sync relevant results from self.client to our results attribute."""
    if self._in_sync_results:
        # Nested call: the outermost invocation owns the sync step,
        # so just run the wrapped method directly.
        return f(self, *args, **kwargs)
    self._in_sync_results = True
    try:
        return_value = f(self, *args, **kwargs)
    finally:
        # Always clear the re-entrancy flag and reconcile our
        # outstanding-set with the client, even if the call raised.
        self._in_sync_results = False
        self._sync_results()
    return return_value
70 71
@decorator
def spin_after(f, self, *args, **kwargs):
    """call spin after the method."""
    # Run the wrapped method first, then flush incoming results
    # before handing its return value back to the caller.
    outcome = f(self, *args, **kwargs)
    self.spin()
    return outcome
77 78
78 79 #-----------------------------------------------------------------------------
79 80 # Classes
80 81 #-----------------------------------------------------------------------------
81 82
@skip_doctest
class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.

    Don't use this class, use subclasses.

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        get_result, queue_status, purge_results, result_status

    control methods
        abort, shutdown

    """
    # flags controlling apply behavior (settable via set_flags/temp_flags)
    block=Bool(False)       # whether calls wait for their results
    track=Bool(True)        # whether to track sends with a zmq MessageTracker
    targets = Any()         # engine id(s) this view addresses

    history=List()          # msg_ids submitted through this view
    outstanding = Set()     # msg_ids not yet known to be complete
    results = Dict()        # shared with self.client.results
    client = Instance('IPython.parallel.Client')

    _socket = Instance('zmq.Socket')
    _flag_names = List(['targets', 'block', 'track'])
    _in_sync_results = Bool(False)  # re-entrancy guard for @sync_results
    _targets = Any()
    _idents = Any()

    def __init__(self, client=None, socket=None, **flags):
        super(View, self).__init__(client=client, _socket=socket)
        # share the client's results dict and default blocking behavior
        self.results = client.results
        self.block = client.block

        self.set_flags(**flags)

        assert not self.__class__ is View, "Don't use base View objects, use subclasses"

    def __repr__(self):
        strtargets = str(self.targets)
        # truncate long target lists for a readable repr
        if len(strtargets) > 16:
            strtargets = strtargets[:12]+'...]'
        return "<%s %s>"%(self.__class__.__name__, strtargets)

    def __len__(self):
        # number of engines this view addresses
        if isinstance(self.targets, list):
            return len(self.targets)
        elif isinstance(self.targets, int):
            return 1
        else:
            # e.g. targets is a slice or 'all': defer to the client
            return len(self.client)

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        Views determine behavior with a few attributes (`block`, `track`, etc.).
        These attributes can be set all at once by name with this method.

        Parameters
        ----------

        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.

        Raises
        ------
        KeyError
            if a keyword is not one of this view's recognized flag names.
        """
        for name, value in iteritems(kwargs):
            if name not in self._flag_names:
                raise KeyError("Invalid name: %r"%name)
            else:
                setattr(self, name, value)

    @contextmanager
    def temp_flags(self, **kwargs):
        """temporarily set flags, for use in `with` statements.

        See set_flags for permanent setting of flags

        Examples
        --------

        >>> view.track=False
        ...
        >>> with view.temp_flags(track=True):
        ...    ar = view.apply(dostuff, my_big_array)
        ...    ar.tracker.wait() # wait for send to finish
        >>> view.track
        False

        """
        # preflight: save flags, and set temporaries
        saved_flags = {}
        for f in self._flag_names:
            saved_flags[f] = getattr(self, f)
        self.set_flags(**kwargs)
        # yield to the with-statement block
        try:
            yield
        finally:
            # postflight: restore saved flags
            self.set_flags(**saved_flags)


    #----------------------------------------------------------------
    # apply
    #----------------------------------------------------------------

    def _sync_results(self):
        """to be called by @sync_results decorator

        after submitting any tasks.
        """
        # anything we were tracking that the client no longer considers
        # outstanding has completed; drop it from our outstanding set
        delta = self.outstanding.difference(self.client.outstanding)
        completed = self.outstanding.intersection(delta)
        self.outstanding = self.outstanding.difference(completed)

    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """wrapper for client.send_apply_request"""
        raise NotImplementedError("Implement in subclasses")

    def apply(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.

        Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult`
        instance if ``self.block`` is False, otherwise the return value of
        ``f(*args, **kwargs)``.
        """
        return self._really_apply(f, args, kwargs)

    def apply_async(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.

        Returns :class:`~IPython.parallel.client.asyncresult.AsyncResult` instance.
        """
        return self._really_apply(f, args, kwargs, block=False)

    @spin_after
    def apply_sync(self, f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
        returning the result.
        """
        return self._really_apply(f, args, kwargs, block=True)

    #----------------------------------------------------------------
    # wrappers for client and control methods
    #----------------------------------------------------------------
    @sync_results
    def spin(self):
        """spin the client, and sync"""
        self.client.spin()

    @sync_results
    def wait(self, jobs=None, timeout=-1):
        """waits on one or more `jobs`, for up to `timeout` seconds.

        Parameters
        ----------

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
        timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

        Returns
        -------

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        if jobs is None:
            # default: wait on everything this view has submitted
            jobs = self.history
        return self.client.wait(jobs, timeout)

    def abort(self, jobs=None, targets=None, block=None):
        """Abort jobs on my engines.

        Parameters
        ----------

        jobs : None, str, list of strs, optional
            if None: abort all jobs.
            else: abort specific msg_id(s).
        """
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        # default: abort everything this view still has outstanding
        jobs = jobs if jobs is not None else list(self.outstanding)

        return self.client.abort(jobs=jobs, targets=targets, block=block)

    def queue_status(self, targets=None, verbose=False):
        """Fetch the Queue status of my engines"""
        targets = targets if targets is not None else self.targets
        return self.client.queue_status(targets=targets, verbose=verbose)

    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results."""
        # NOTE: mutable default args are never mutated here, only forwarded
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.purge_results(jobs=jobs, targets=targets)

    def shutdown(self, targets=None, restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.
        """
        block = self.block if block is None else block
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block)

    @spin_after
    def get_result(self, indices_or_msg_ids=None):
        """return one or more results, specified by history index or msg_id.

        See :meth:`IPython.parallel.client.client.Client.get_result` for details.
        """

        if indices_or_msg_ids is None:
            # default: most recent submission
            indices_or_msg_ids = -1
        # translate history indices (ints) into msg_id strings
        if isinstance(indices_or_msg_ids, int):
            indices_or_msg_ids = self.history[indices_or_msg_ids]
        elif isinstance(indices_or_msg_ids, (list,tuple,set)):
            indices_or_msg_ids = list(indices_or_msg_ids)
            for i,index in enumerate(indices_or_msg_ids):
                if isinstance(index, int):
                    indices_or_msg_ids[i] = self.history[index]
        return self.client.get_result(indices_or_msg_ids)

    #-------------------------------------------------------------------
    # Map
    #-------------------------------------------------------------------

    @sync_results
    def map(self, f, *sequences, **kwargs):
        """override in subclasses"""
        raise NotImplementedError

    def map_async(self, f, *sequences, **kwargs):
        """Parallel version of builtin :func:`python:map`, using this view's engines.

        This is equivalent to ``map(...block=False)``.

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_async doesn't take a `block` keyword argument.")
        kwargs['block'] = False
        return self.map(f,*sequences,**kwargs)

    def map_sync(self, f, *sequences, **kwargs):
        """Parallel version of builtin :func:`python:map`, using this view's engines.

        This is equivalent to ``map(...block=True)``.

        See `self.map` for details.
        """
        if 'block' in kwargs:
            raise TypeError("map_sync doesn't take a `block` keyword argument.")
        kwargs['block'] = True
        return self.map(f,*sequences,**kwargs)

    def imap(self, f, *sequences, **kwargs):
        """Parallel version of :func:`itertools.imap`.

        See `self.map` for details.

        """

        return iter(self.map_async(f,*sequences, **kwargs))

    #-------------------------------------------------------------------
    # Decorators
    #-------------------------------------------------------------------

    def remote(self, block=None, **flags):
        """Decorator for making a RemoteFunction"""
        block = self.block if block is None else block
        return remote(self, block=block, **flags)

    def parallel(self, dist='b', block=None, **flags):
        """Decorator for making a ParallelFunction"""
        block = self.block if block is None else block
        return parallel(self, dist=dist, block=block, **flags)
389 390
@skip_doctest
class DirectView(View):
    """Direct Multiplexer View of one or more engines.

    These are created via indexed access to a client:

    >>> dv_1 = client[1]
    >>> dv_all = client[:]
    >>> dv_even = client[::2]
    >>> dv_some = client[1:3]

    This object provides dictionary access to engine namespaces:

    # push a=5:
    >>> dv['a'] = 5
    # pull 'foo':
    >>> dv['foo']

    """

    def __init__(self, client=None, socket=None, targets=None):
        super(DirectView, self).__init__(client=client, socket=socket, targets=targets)

    @property
    def importer(self):
        """sync_imports(local=True) as a property.

        See sync_imports for details.

        """
        return self.sync_imports(True)

    @contextmanager
    def sync_imports(self, local=True, quiet=False):
        """Context Manager for performing simultaneous local and remote imports.

        'import x as y' will *not* work.  The 'as y' part will simply be ignored.

        If `local=True`, then the package will also be imported locally.

        If `quiet=True`, no output will be produced when attempting remote
        imports.

        Note that remote-only (`local=False`) imports have not been implemented.

        >>> with view.sync_imports():
        ...    from numpy import recarray
        importing recarray from numpy on engine(s)

        """
        from IPython.utils.py3compat import builtin_mod
        local_import = builtin_mod.__import__
        modules = set()   # "name:fromlist" keys already sent to the engines
        results = []      # AsyncResults of the remote imports
        @util.interactive
        def remote_import(name, fromlist, level):
            """the function to be passed to apply, that actually performs the import
            on the engine, and loads up the user namespace.
            """
            import sys
            user_ns = globals()
            mod = __import__(name, fromlist=fromlist, level=level)
            if fromlist:
                for key in fromlist:
                    user_ns[key] = getattr(mod, key)
            else:
                user_ns[name] = sys.modules[name]

        def view_import(name, globals={}, locals={}, fromlist=[], level=0):
            """the drop-in replacement for __import__, that optionally imports
            locally as well.
            """
            # don't override nested imports
            save_import = builtin_mod.__import__
            builtin_mod.__import__ = local_import

            if imp.lock_held():
                # this is a side-effect import, don't do it remotely, or even
                # ignore the local effects
                return local_import(name, globals, locals, fromlist, level)

            imp.acquire_lock()
            if local:
                mod = local_import(name, globals, locals, fromlist, level)
            else:
                raise NotImplementedError("remote-only imports not yet implemented")
            imp.release_lock()

            # only ship each absolute import to the engines once per block
            key = name+':'+','.join(fromlist or [])
            if level <= 0 and key not in modules:
                modules.add(key)
                if not quiet:
                    if fromlist:
                        print("importing %s from %s on engine(s)"%(','.join(fromlist), name))
                    else:
                        print("importing %s on engine(s)"%name)
                results.append(self.apply_async(remote_import, name, fromlist, level))
            # restore override
            builtin_mod.__import__ = save_import

            return mod

        # override __import__
        builtin_mod.__import__ = view_import
        try:
            # enter the block
            yield
        except ImportError:
            if local:
                raise
            else:
                # ignore import errors if not doing local imports
                pass
        finally:
            # always restore __import__
            builtin_mod.__import__ = local_import

        for r in results:
            # raise possible remote ImportErrors here
            r.get()

    def use_dill(self):
        """Expand serialization support with dill

        adds support for closures, etc.

        This calls IPython.utils.pickleutil.use_dill() here and on each engine.
        """
        # enable dill locally first, then on every engine in this view
        pickleutil.use_dill()
        return self.apply(pickleutil.use_dill)


    @sync_results
    @save_ids
    def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None):
        """calls f(*args, **kwargs) on remote engines, returning the result.

        This method sets all of `apply`'s flags via this View's attributes.

        Parameters
        ----------

        f : callable

        args : list [default: empty]

        kwargs : dict [default: empty]

        targets : target list [default: self.targets]
            where to run
        block : bool [default: self.block]
            whether to block
        track : bool [default: self.track]
            whether to ask zmq to track the message, for safe non-copying sends

        Returns
        -------

        if self.block is False:
            returns AsyncResult
        else:
            returns actual result of f(*args, **kwargs) on the engine(s)
            This will be a list if self.targets is also a list (even length 1), or
            the single result if self.targets is an integer engine id
        """
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        targets = self.targets if targets is None else targets

        _idents, _targets = self.client._build_targets(targets)
        msg_ids = []
        trackers = []
        # one apply_request per engine identity
        for ident in _idents:
            msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
                                    ident=ident)
            if track:
                trackers.append(msg['tracker'])
            msg_ids.append(msg['header']['msg_id'])
        if isinstance(targets, int):
            # single-engine view: unwrap the one msg_id
            msg_ids = msg_ids[0]
        tracker = None if track is False else zmq.MessageTracker(*trackers)
        ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                # interrupted wait: fall through and return the AsyncResult
                pass
        return ar


    @sync_results
    def map(self, f, *sequences, **kwargs):
        """``view.map(f, *sequences, block=self.block)`` => list|AsyncMapResult

        Parallel version of builtin `map`, using this View's `targets`.

        There will be one task per target, so work will be chunked
        if the sequences are longer than `targets`.

        Results can be iterated as they are ready, but will become available in chunks.

        Parameters
        ----------

        f : callable
            function to be mapped
        *sequences: one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool
            whether to wait for the result or not [default self.block]

        Returns
        -------


        If block=False
            An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
        else
            A list, the result of ``map(f,*sequences)``

        Raises
        ------
        TypeError
            if a keyword other than ``block``/``track`` is passed.
        """

        block = kwargs.pop('block', self.block)
        for k in kwargs.keys():
            if k not in ['block', 'track']:
                raise TypeError("invalid keyword arg, %r"%k)

        assert len(sequences) > 0, "must have some sequences to map onto!"
        pf = ParallelFunction(self, f, block=block, **kwargs)
        return pf.map(*sequences)

    @sync_results
    @save_ids
    def execute(self, code, silent=True, targets=None, block=None):
        """Executes `code` on `targets` in blocking or nonblocking manner.

        ``execute`` is always `bound` (affects engine namespace)

        Parameters
        ----------

        code : str
            the code string to be executed
        block : bool
            whether or not to wait until done to return
            default: self.block
        """
        block = self.block if block is None else block
        targets = self.targets if targets is None else targets

        _idents, _targets = self.client._build_targets(targets)
        msg_ids = []
        trackers = []
        # one execute_request per engine identity
        for ident in _idents:
            msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
            msg_ids.append(msg['header']['msg_id'])
        if isinstance(targets, int):
            msg_ids = msg_ids[0]
        ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
        if block:
            try:
                ar.get()
            except KeyboardInterrupt:
                pass
        return ar

    def run(self, filename, targets=None, block=None):
        """Execute contents of `filename` on my engine(s).

        This simply reads the contents of the file and calls `execute`.

        Parameters
        ----------

        filename : str
            The path to the file
        targets : int/str/list of ints/strs
            the engines on which to execute
            default : all
        block : bool
            whether or not to wait until done
            default: self.block

        """
        with open(filename, 'r') as f:
            # add newline in case of trailing indented whitespace
            # which will cause SyntaxError
            code = f.read()+'\n'
        return self.execute(code, block=block, targets=targets)

    def update(self, ns):
        """update remote namespace with dict `ns`

        See `push` for details.
        """
        return self.push(ns, block=self.block, track=self.track)

    def push(self, ns, targets=None, block=None, track=None):
        """update remote namespace with dict `ns`

        Parameters
        ----------

        ns : dict
            dict of keys with which to update engine namespace(s)
        block : bool [default : self.block]
            whether to wait to be notified of engine receipt

        Raises
        ------
        TypeError
            if `ns` is not a dict.
        """

        block = block if block is not None else self.block
        track = track if track is not None else self.track
        targets = targets if targets is not None else self.targets
        # applier = self.apply_sync if block else self.apply_async
        if not isinstance(ns, dict):
            raise TypeError("Must be a dict, not %s"%type(ns))
        return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets)

    def get(self, key_s):
        """get object(s) by `key_s` from remote namespace

        see `pull` for details.
        """
        # block = block if block is not None else self.block
        return self.pull(key_s, block=True)

    def pull(self, names, targets=None, block=None):
        """get object(s) by `name` from remote namespace

        will return one object if it is a key.
        can also take a list of keys, in which case it will return a list of objects.

        Raises
        ------
        TypeError
            if `names` is not a str or an iterable of strs.
        """
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        # NOTE(review): `applier` is assigned but never used below;
        # dispatch actually goes through _really_apply with `block`.
        applier = self.apply_sync if block else self.apply_async
        if isinstance(names, string_types):
            pass
        elif isinstance(names, (list,tuple,set)):
            for key in names:
                if not isinstance(key, string_types):
                    raise TypeError("keys must be str, not type %r"%type(key))
        else:
            raise TypeError("names must be strs, not %r"%names)
        return self._really_apply(util._pull, (names,), block=block, targets=targets)

    def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None):
        """
        Partition a Python sequence and send the partitions to a set of engines.

        Returns an AsyncResult when ``block`` is False; otherwise waits for
        delivery and returns None.
        """
        block = block if block is not None else self.block
        track = track if track is not None else self.track
        targets = targets if targets is not None else self.targets

        # construct integer ID list:
        targets = self.client._build_targets(targets)[1]

        mapObject = Map.dists[dist]()
        nparts = len(targets)
        msg_ids = []
        trackers = []
        # push one partition of `seq` per engine
        for index, engineid in enumerate(targets):
            partition = mapObject.getPartition(seq, index, nparts)
            if flatten and len(partition) == 1:
                # unwrap single-element partitions when flatten is requested
                ns = {key: partition[0]}
            else:
                ns = {key: partition}
            r = self.push(ns, block=False, track=track, targets=engineid)
            msg_ids.extend(r.msg_ids)
            if track:
                trackers.append(r._tracker)

        if track:
            tracker = zmq.MessageTracker(*trackers)
        else:
            tracker = None

        r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
        if block:
            r.wait()
        else:
            return r

    @sync_results
    @save_ids
    def gather(self, key, dist='b', targets=None, block=None):
        """
        Gather a partitioned sequence on a set of engines as a single local seq.
        """
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        mapObject = Map.dists[dist]()
        msg_ids = []

        # construct integer ID list:
        targets = self.client._build_targets(targets)[1]

        # pull `key` from each engine; the map object reassembles the pieces
        for index, engineid in enumerate(targets):
            msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids)

        r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')

        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                pass
        return r

    def __getitem__(self, key):
        # dict-style pull: dv['a'] -> engine namespace value(s) for 'a'
        return self.get(key)

    def __setitem__(self,key, value):
        # dict-style push: dv['a'] = 5
        self.update({key:value})

    def clear(self, targets=None, block=None):
        """Clear the remote namespaces on my engines."""
        block = block if block is not None else self.block
        targets = targets if targets is not None else self.targets
        return self.client.clear(targets=targets, block=block)

    #----------------------------------------
    # activate for %px, %autopx, etc. magics
    #----------------------------------------

    def activate(self, suffix=''):
        """Activate IPython magics associated with this View

        Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`

        Parameters
        ----------

        suffix: str [default: '']
            The suffix, if any, for the magics.  This allows you to have
            multiple views associated with parallel magics at the same time.

            e.g. ``rc[::2].activate(suffix='_even')`` will give you
            the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
            on the even engines.
        """

        from IPython.parallel.client.magics import ParallelMagics

        try:
            # This is injected into __builtins__.
            ip = get_ipython()
        except NameError:
            # not running inside IPython: magics cannot be registered
            print("The IPython parallel magics (%px, etc.) only work within IPython.")
            return

        M = ParallelMagics(ip, self, suffix)
        ip.magics_manager.register(M)
837 848
838 849 @skip_doctest
839 850 class LoadBalancedView(View):
840 851 """An load-balancing View that only executes via the Task scheduler.
841 852
842 853 Load-balanced views can be created with the client's `view` method:
843 854
844 855 >>> v = client.load_balanced_view()
845 856
846 857 or targets can be specified, to restrict the potential destinations:
847 858
848 859 >>> v = client.client.load_balanced_view([1,3])
849 860
850 861 which would restrict loadbalancing to between engines 1 and 3.
851 862
852 863 """
853 864
854 865 follow=Any()
855 866 after=Any()
856 867 timeout=CFloat()
857 868 retries = Integer(0)
858 869
859 870 _task_scheme = Any()
860 871 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
861 872
862 873 def __init__(self, client=None, socket=None, **flags):
863 874 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
864 875 self._task_scheme=client._task_scheme
865 876
866 877 def _validate_dependency(self, dep):
867 878 """validate a dependency.
868 879
869 880 For use in `set_flags`.
870 881 """
871 882 if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)):
872 883 return True
873 884 elif isinstance(dep, (list,set, tuple)):
874 885 for d in dep:
875 886 if not isinstance(d, string_types + (AsyncResult,)):
876 887 return False
877 888 elif isinstance(dep, dict):
878 889 if set(dep.keys()) != set(Dependency().as_dict().keys()):
879 890 return False
880 891 if not isinstance(dep['msg_ids'], list):
881 892 return False
882 893 for d in dep['msg_ids']:
883 894 if not isinstance(d, string_types):
884 895 return False
885 896 else:
886 897 return False
887 898
888 899 return True
889 900
890 901 def _render_dependency(self, dep):
891 902 """helper for building jsonable dependencies from various input forms."""
892 903 if isinstance(dep, Dependency):
893 904 return dep.as_dict()
894 905 elif isinstance(dep, AsyncResult):
895 906 return dep.msg_ids
896 907 elif dep is None:
897 908 return []
898 909 else:
899 910 # pass to Dependency constructor
900 911 return list(Dependency(dep))
901 912
902 913 def set_flags(self, **kwargs):
903 914 """set my attribute flags by keyword.
904 915
905 916 A View is a wrapper for the Client's apply method, but with attributes
906 917 that specify keyword arguments, those attributes can be set by keyword
907 918 argument with this method.
908 919
909 920 Parameters
910 921 ----------
911 922
912 923 block : bool
913 924 whether to wait for results
914 925 track : bool
915 926 whether to create a MessageTracker to allow the user to
916 927 safely edit after arrays and buffers during non-copying
917 928 sends.
918 929
919 930 after : Dependency or collection of msg_ids
920 931 Only for load-balanced execution (targets=None)
921 932 Specify a list of msg_ids as a time-based dependency.
922 933 This job will only be run *after* the dependencies
923 934 have been met.
924 935
925 936 follow : Dependency or collection of msg_ids
926 937 Only for load-balanced execution (targets=None)
927 938 Specify a list of msg_ids as a location-based dependency.
928 939 This job will only be run on an engine where this dependency
929 940 is met.
930 941
931 942 timeout : float/int or None
932 943 Only for load-balanced execution (targets=None)
933 944 Specify an amount of time (in seconds) for the scheduler to
934 945 wait for dependencies to be met before failing with a
935 946 DependencyTimeout.
936 947
937 948 retries : int
938 949 Number of times a task will be retried on failure.
939 950 """
940 951
941 952 super(LoadBalancedView, self).set_flags(**kwargs)
942 953 for name in ('follow', 'after'):
943 954 if name in kwargs:
944 955 value = kwargs[name]
945 956 if self._validate_dependency(value):
946 957 setattr(self, name, value)
947 958 else:
948 959 raise ValueError("Invalid dependency: %r"%value)
949 960 if 'timeout' in kwargs:
950 961 t = kwargs['timeout']
951 962 if not isinstance(t, (int, float, type(None))):
952 963 if (not PY3) and (not isinstance(t, long)):
953 964 raise TypeError("Invalid type for timeout: %r"%type(t))
954 965 if t is not None:
955 966 if t < 0:
956 967 raise ValueError("Invalid timeout: %s"%t)
957 968 self.timeout = t
958 969
959 970 @sync_results
960 971 @save_ids
961 972 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
962 973 after=None, follow=None, timeout=None,
963 974 targets=None, retries=None):
964 975 """calls f(*args, **kwargs) on a remote engine, returning the result.
965 976
966 977 This method temporarily sets all of `apply`'s flags for a single call.
967 978
968 979 Parameters
969 980 ----------
970 981
971 982 f : callable
972 983
973 984 args : list [default: empty]
974 985
975 986 kwargs : dict [default: empty]
976 987
977 988 block : bool [default: self.block]
978 989 whether to block
979 990 track : bool [default: self.track]
980 991 whether to ask zmq to track the message, for safe non-copying sends
981 992
982 993 !!!!!! TODO: THE REST HERE !!!!
983 994
984 995 Returns
985 996 -------
986 997
987 998 if self.block is False:
988 999 returns AsyncResult
989 1000 else:
990 1001 returns actual result of f(*args, **kwargs) on the engine(s)
991 1002 This will be a list of self.targets is also a list (even length 1), or
992 1003 the single result if self.targets is an integer engine id
993 1004 """
994 1005
995 1006 # validate whether we can run
996 1007 if self._socket.closed:
997 1008 msg = "Task farming is disabled"
998 1009 if self._task_scheme == 'pure':
999 1010 msg += " because the pure ZMQ scheduler cannot handle"
1000 1011 msg += " disappearing engines."
1001 1012 raise RuntimeError(msg)
1002 1013
1003 1014 if self._task_scheme == 'pure':
1004 1015 # pure zmq scheme doesn't support extra features
1005 1016 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1006 1017 "follow, after, retries, targets, timeout"
1007 1018 if (follow or after or retries or targets or timeout):
1008 1019 # hard fail on Scheduler flags
1009 1020 raise RuntimeError(msg)
1010 1021 if isinstance(f, dependent):
1011 1022 # soft warn on functional dependencies
1012 1023 warnings.warn(msg, RuntimeWarning)
1013 1024
1014 1025 # build args
1015 1026 args = [] if args is None else args
1016 1027 kwargs = {} if kwargs is None else kwargs
1017 1028 block = self.block if block is None else block
1018 1029 track = self.track if track is None else track
1019 1030 after = self.after if after is None else after
1020 1031 retries = self.retries if retries is None else retries
1021 1032 follow = self.follow if follow is None else follow
1022 1033 timeout = self.timeout if timeout is None else timeout
1023 1034 targets = self.targets if targets is None else targets
1024 1035
1025 1036 if not isinstance(retries, int):
1026 1037 raise TypeError('retries must be int, not %r'%type(retries))
1027 1038
1028 1039 if targets is None:
1029 1040 idents = []
1030 1041 else:
1031 1042 idents = self.client._build_targets(targets)[0]
1032 1043 # ensure *not* bytes
1033 1044 idents = [ ident.decode() for ident in idents ]
1034 1045
1035 1046 after = self._render_dependency(after)
1036 1047 follow = self._render_dependency(follow)
1037 1048 metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1038 1049
1039 1050 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1040 1051 metadata=metadata)
1041 1052 tracker = None if track is False else msg['tracker']
1042 1053
1043 1054 ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
1044 1055
1045 1056 if block:
1046 1057 try:
1047 1058 return ar.get()
1048 1059 except KeyboardInterrupt:
1049 1060 pass
1050 1061 return ar
1051 1062
1052 1063 @sync_results
1053 1064 @save_ids
1054 1065 def map(self, f, *sequences, **kwargs):
1055 1066 """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
1056 1067
1057 1068 Parallel version of builtin `map`, load-balanced by this View.
1058 1069
1059 1070 `block`, and `chunksize` can be specified by keyword only.
1060 1071
1061 1072 Each `chunksize` elements will be a separate task, and will be
1062 1073 load-balanced. This lets individual elements be available for iteration
1063 1074 as soon as they arrive.
1064 1075
1065 1076 Parameters
1066 1077 ----------
1067 1078
1068 1079 f : callable
1069 1080 function to be mapped
1070 1081 *sequences: one or more sequences of matching length
1071 1082 the sequences to be distributed and passed to `f`
1072 1083 block : bool [default self.block]
1073 1084 whether to wait for the result or not
1074 1085 track : bool
1075 1086 whether to create a MessageTracker to allow the user to
1076 1087 safely edit after arrays and buffers during non-copying
1077 1088 sends.
1078 1089 chunksize : int [default 1]
1079 1090 how many elements should be in each task.
1080 1091 ordered : bool [default True]
1081 1092 Whether the results should be gathered as they arrive, or enforce
1082 1093 the order of submission.
1083 1094
1084 1095 Only applies when iterating through AsyncMapResult as results arrive.
1085 1096 Has no effect when block=True.
1086 1097
1087 1098 Returns
1088 1099 -------
1089 1100
1090 1101 if block=False
1091 1102 An :class:`~IPython.parallel.client.asyncresult.AsyncMapResult` instance.
1092 1103 An object like AsyncResult, but which reassembles the sequence of results
1093 1104 into a single list. AsyncMapResults can be iterated through before all
1094 1105 results are complete.
1095 1106 else
1096 1107 A list, the result of ``map(f,*sequences)``
1097 1108 """
1098 1109
1099 1110 # default
1100 1111 block = kwargs.get('block', self.block)
1101 1112 chunksize = kwargs.get('chunksize', 1)
1102 1113 ordered = kwargs.get('ordered', True)
1103 1114
1104 1115 keyset = set(kwargs.keys())
1105 1116 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1106 1117 if extra_keys:
1107 1118 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
1108 1119
1109 1120 assert len(sequences) > 0, "must have some sequences to map onto!"
1110 1121
1111 1122 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1112 1123 return pf.map(*sequences)
1113 1124
1114 1125 __all__ = ['LoadBalancedView', 'DirectView']
@@ -1,352 +1,382 b''
1 1 # encoding: utf-8
2 2
3 3 """Pickle related utilities. Perhaps this should be called 'can'."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import sys
21 21 from types import FunctionType
22 22
23 23 try:
24 24 import cPickle as pickle
25 25 except ImportError:
26 26 import pickle
27 27
28 28 from . import codeutil # This registers a hook when it's imported
29 29 from . import py3compat
30 30 from .importstring import import_item
31 31 from .py3compat import string_types, iteritems
32 32
33 33 from IPython.config import Application
34 34
35 35 if py3compat.PY3:
36 36 buffer = memoryview
37 37 class_type = type
38 38 else:
39 39 from types import ClassType
40 40 class_type = (type, ClassType)
41 41
42 42 #-------------------------------------------------------------------------------
43 # Functions
44 #-------------------------------------------------------------------------------
45
46
def use_dill():
    """use dill to expand serialization support

    adds support for object methods and closures to serialization.
    """
    # merely importing dill extends what the pickle protocol can handle
    import dill

    # dill extends plain `pickle`, not cPickle; rebind this module's
    # `pickle` name (and the kernel's serialize module, below) to dill

    global pickle
    pickle = dill

    try:
        from IPython.kernel.zmq import serialize
    except ImportError:
        # kernel machinery unavailable; only local canning is affected
        pass
    else:
        serialize.pickle = dill

    # disable special function handling, let dill take care of it
    can_map.pop(FunctionType, None)
70
71
72 #-------------------------------------------------------------------------------
43 73 # Classes
44 74 #-------------------------------------------------------------------------------
45 75
46 76
class CannedObject(object):
    def __init__(self, obj, keys=None, hook=None):
        """can an object for safe pickling

        Parameters
        ==========

        obj:
            The object to be canned
        keys: list (optional)
            list of attribute names that will be explicitly canned / uncanned
        hook: callable (optional)
            An optional extra callable,
            which can do additional processing of the uncanned object.

        large data may be offloaded into the buffers list,
        used for zero-copy transfers.
        """
        # default was a shared mutable `keys=[]`; use None sentinel instead
        self.keys = keys if keys is not None else []
        self.obj = copy.copy(obj)
        self.hook = can(hook)
        for key in self.keys:
            setattr(self.obj, key, can(getattr(obj, key)))

        self.buffers = []

    def get_object(self, g=None):
        """reconstruct the canned object, uncanning attributes in namespace `g`"""
        if g is None:
            g = {}
        obj = self.obj
        for key in self.keys:
            setattr(obj, key, uncan(getattr(obj, key), g))

        if self.hook:
            # the hook was canned in __init__; uncan before invoking
            self.hook = uncan(self.hook, g)
            self.hook(obj, g)
        return self.obj
84 114
85 115
class Reference(CannedObject):
    """object for wrapping a remote reference by name."""
    def __init__(self, name):
        # only names (strings) can be dereferenced on the far side
        if not isinstance(name, string_types):
            raise TypeError("illegal name: %r"%name)
        self.name = name
        self.buffers = []

    def __repr__(self):
        return "<Reference: %r>"%self.name

    def get_object(self, g=None):
        # evaluate the stored name in the supplied namespace
        namespace = {} if g is None else g
        return eval(self.name, namespace)
102 132
103 133
class CannedFunction(CannedObject):
    # Cans a plain function by capturing its code object, (canned) defaults,
    # and module name.  NOTE: the closure (f.__closure__) is not captured,
    # so closures are not supported here — use_dill() exists for that.

    def __init__(self, f):
        self._check_type(f)
        self.code = f.__code__
        if f.__defaults__:
            # default values may themselves need canning
            self.defaults = [ can(fd) for fd in f.__defaults__ ]
        else:
            self.defaults = None
        self.module = f.__module__ or '__main__'
        self.__name__ = f.__name__
        self.buffers = []

    def _check_type(self, obj):
        # only plain Python functions can be canned this way
        assert isinstance(obj, FunctionType), "Not a function type"

    def get_object(self, g=None):
        # try to load function back into its module:
        if not self.module.startswith('__'):
            # a real module: import it and use its namespace as globals,
            # overriding any `g` that was passed in
            __import__(self.module)
            g = sys.modules[self.module].__dict__

        if g is None:
            g = {}
        if self.defaults:
            defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
        else:
            defaults = None
        # rebuild the function from its code object (no closure)
        newFunc = FunctionType(self.code, g, self.__name__, defaults)
        return newFunc
134 164
class CannedClass(CannedObject):
    # Cans an interactively-defined class by canning its __dict__ contents
    # and its MRO parents, so it can be rebuilt with type() on the far side.

    def __init__(self, cls):
        self._check_type(cls)
        self.name = cls.__name__
        # Python 2 old-style classes have no mro()
        self.old_style = not isinstance(cls, type)
        self._canned_dict = {}
        for k,v in cls.__dict__.items():
            if k not in ('__weakref__', '__dict__'):
                self._canned_dict[k] = can(v)
        if self.old_style:
            mro = []
        else:
            mro = cls.mro()

        # can every base beyond the class itself
        self.parents = [ can(c) for c in mro[1:] ]
        self.buffers = []

    def _check_type(self, obj):
        assert isinstance(obj, class_type), "Not a class type"

    def get_object(self, g=None):
        # rebuild the class from uncanned parents and class dict
        parents = tuple(uncan(p, g) for p in self.parents)
        return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
159 189
class CannedArray(CannedObject):
    def __init__(self, obj):
        from numpy import ascontiguousarray
        self.shape = obj.shape
        # structured dtypes need the full descr; simple ones just the str
        self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
        if sum(obj.shape) == 0:
            # scalar or degenerate shape: just pickle it
            self.buffers = [pickle.dumps(obj, -1)]
        else:
            # ensure contiguous so the raw buffer matches the array layout
            obj = ascontiguousarray(obj, dtype=None)
            self.buffers = [buffer(obj)]

    def get_object(self, g=None):
        from numpy import frombuffer
        data = self.buffers[0]
        if sum(self.shape) == 0:
            # no shape, we just pickled it
            return pickle.loads(data)
        else:
            # reconstruct zero-copy from the raw buffer
            return frombuffer(data, dtype=self.dtype).reshape(self.shape)
181 211
182 212
class CannedBytes(CannedObject):
    """can a bytes object by stashing it directly in the buffers list"""
    wrap = bytes

    def __init__(self, obj):
        self.buffers = [obj]

    def get_object(self, g=None):
        # `wrap` restores the declared type from the raw buffer
        return self.wrap(self.buffers[0])
191 221
class CannedBuffer(CannedBytes):
    # was mistakenly declared with `def`, which made CannedBuffer a
    # function (taking CannedBytes as an argument) rather than a
    # CannedBytes subclass, so canned buffers could never round-trip.
    # As a class, `wrap` rebuilds a buffer/memoryview from the raw bytes.
    wrap = buffer
194 224
195 225 #-------------------------------------------------------------------------------
196 226 # Functions
197 227 #-------------------------------------------------------------------------------
198 228
def _logger():
    """get the logger for the current Application

    the root logger will be used if no Application is running
    """
    if not Application.initialized():
        root = logging.getLogger()
        if not root.handlers:
            # guarantee records are actually emitted somewhere
            logging.basicConfig()
        return root
    return Application.instance().log
212 242
def _import_mapping(mapping, original=None):
    """import any string-keys in a type mapping

    replaces dotted-name keys with the classes they refer to;
    unimportable keys are dropped.
    """
    log = _logger()
    log.debug("Importing canning map")
    for key, value in list(mapping.items()):
        if not isinstance(key, string_types):
            continue
        try:
            cls = import_item(key)
        except Exception:
            if original and key not in original:
                # only message on user-added classes
                log.error("canning class not importable: %r", key, exc_info=True)
            mapping.pop(key)
        else:
            mapping[cls] = mapping.pop(key)
230 260
def istype(obj, check):
    """like isinstance(obj, check), but strict

    This won't catch subclasses.
    """
    if isinstance(check, tuple):
        # exact-type match against any member of the tuple
        return any(type(obj) is cls for cls in check)
    return type(obj) is check
243 273
def can(obj):
    """prepare an object for pickling"""
    for cls, canner in iteritems(can_map):
        if isinstance(cls, string_types):
            # a string key means the map still holds unimported names;
            # resolve them and retry (usually happens at most once)
            _import_mapping(can_map, _original_can_map)
            return can(obj)
        if istype(obj, cls):
            return canner(obj)
    # no canner applies: send as-is
    return obj
263 293
def can_class(obj):
    """can interactively-defined classes (those living in __main__)"""
    interactive = isinstance(obj, class_type) and obj.__module__ == '__main__'
    return CannedClass(obj) if interactive else obj
269 299
def can_dict(obj):
    """can the *values* of a dict"""
    if not istype(obj, dict):
        return obj
    # keys pass through unchanged; only values are canned
    return dict((k, can(v)) for k, v in iteritems(obj))
279 309
280 310 sequence_types = (list, tuple, set)
281 311
def can_sequence(obj):
    """can the elements of a sequence"""
    if not istype(obj, sequence_types):
        return obj
    # rebuild with the same concrete type (list/tuple/set)
    return type(obj)(can(i) for i in obj)
289 319
def uncan(obj, g=None):
    """invert canning"""
    for cls, uncanner in iteritems(uncan_map):
        if isinstance(cls, string_types):
            # string key: map still holds unimported names; resolve and retry
            _import_mapping(uncan_map, _original_uncan_map)
            return uncan(obj, g)
        if isinstance(obj, cls):
            return uncanner(obj, g)
    # not canned: return unchanged
    return obj
308 338
def uncan_dict(obj, g=None):
    """uncan the *values* of a dict, resolving references in namespace `g`"""
    if not istype(obj, dict):
        return obj
    return dict((k, uncan(v, g)) for k, v in iteritems(obj))
317 347
def uncan_sequence(obj, g=None):
    """uncan the elements of a sequence"""
    if not istype(obj, sequence_types):
        return obj
    return type(obj)(uncan(i, g) for i in obj)
324 354
def _uncan_dependent_hook(dep, g=None):
    # post-uncan hook: verify the dependency's requirements right away
    dep.check_dependency()
327 357
def can_dependent(obj):
    # can a `dependent`: explicitly can its 'f' and 'df' attributes,
    # and re-check the dependency when uncanned
    return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
330 360
#-------------------------------------------------------------------------------
# API dictionaries
#-------------------------------------------------------------------------------

# These dicts can be extended for custom serialization of new objects

# maps type (or importable dotted name, resolved lazily by _import_mapping)
# to the canner used to prepare such objects for pickling
can_map = {
    'IPython.parallel.dependent' : can_dependent,
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
    class_type : can_class,
}

# maps canned type to the uncanner that reconstructs the original object
uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}

# for use in _import_mapping:
# snapshots let it distinguish user-added entries from the defaults
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
General Comments 0
You need to be logged in to leave comments. Login now