##// END OF EJS Templates
Specify protocol 2 for pickleshare, so objects stored by Python 3 can be used in Python 2.
Thomas Kluyver -
Show More
@@ -1,362 +1,364 b''
1 1 #!/usr/bin/env python
2 2
3 3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4 4
5 5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 6 shelve, many processes can access the database simultaneously. Changing a
7 7 value in database is immediately visible to other processes accessing the
8 8 same database.
9 9
10 10 Concurrency is possible because the values are stored in separate files. Hence
11 11 the "database" is a directory where *all* files are governed by PickleShare.
12 12
13 13 Example usage::
14 14
15 15 from pickleshare import *
16 16 db = PickleShareDB('~/testpickleshare')
17 17 db.clear()
18 18 print "Should be empty:",db.items()
19 19 db['hello'] = 15
20 20 db['aku ankka'] = [1,2,313]
21 21 db['paths/are/ok/key'] = [1,(5,46)]
22 22 print db.keys()
23 23 del db['aku ankka']
24 24
25 25 This module is certainly not ZODB, but can be used for low-load
26 26 (non-mission-critical) situations where tiny code size trumps the
27 27 advanced features of a "real" object database.
28 28
29 29 Installation guide: easy_install pickleshare
30 30
31 31 Author: Ville Vainio <vivainio@gmail.com>
32 32 License: MIT open source license.
33 33
34 34 """
35 35
36 36 from IPython.external.path import path as Path
37 37 import os,stat,time
38 38 import collections
39 39 import cPickle as pickle
40 40 import glob
41 41
def gethashfile(key):
    """ Map *key* to one of 256 two-character hex bucket names ('00'..'ff'). """
    bucket = abs(hash(key) % 256)
    hexname = "%02x" % bucket
    return hexname[-2:]
44 44
# Unique marker used to tell "no default supplied" apart from any real stored
# value (None included) in get()/hget() below.
_sentinel = object()
46 46
class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database

    Acts like a dict whose values are pickled to individual files under a
    root directory, so several processes can share the same database.
    """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}


    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read.
            # Plain binary mode here: the previous "rbU" (universal
            # newlines) mode would translate newline bytes inside the
            # binary protocol-2 pickle stream, corrupting it, and "rbU"
            # is rejected outright by Python 3.
            with fil.open("rb") as f:
                obj = pickle.loads(f.read())
        except Exception:
            # A corrupt or half-written file (e.g. a concurrent writer
            # caught mid-dump) surfaces as a missing key; callers can
            # retry via waitget().  Narrowed from a bare except so that
            # KeyboardInterrupt/SystemExit still propagate.
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # We specify protocol 2, so that we can mostly go between Python 2
        # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
        # 'with' closes the handle deterministically; the old code left
        # closing to the garbage collector and bound dump()'s None return
        # value to an unused variable.
        with fil.open('wb') as f:
            pickle.dump(value, f, protocol=2)
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError as e:
            # errno 2 == ENOENT: another process removed the file between
            # the write and the stat; losing that race is acceptable.
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set: store key/value inside a bucketed dict file """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d



    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get: fetch a value stored with hset()

        With fast_only=True only the per-key bucket file is consulted;
        pass fast_only=False to also search data merged by hcompress().
        """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # Merge the compressed 'xx' bucket first so per-key buckets
            # written after an hcompress() override its stale entries.
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            try:
                all.update(self[f])
            except KeyError:
                # Single-string form prints identically on Python 2 and 3.
                print("Corrupt %s deleted - hset is not threadsafe!" % f)
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()



    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        # Bug fix: the old body returned iter(keys), referencing an
        # undefined name and raising NameError on any iteration attempt.
        return iter(self.keys())

    def __len__(self):
        # Bug fix: same undefined-name problem as __iter__ (len(keys)).
        return len(self.keys())

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operation to cause a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        # Back off gradually: three fast polls, two medium, then 1s forever.
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

    def getlink(self,folder):
        """ Get a convenient link for accessing items  """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root
248 250
249 251
250 252
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        # Write through __dict__ on purpose: a plain 'self.db = db' would be
        # intercepted by __setattr__ below and stored in the database itself.
        # Note locals() also puts a 'self' entry into __dict__.
        self.__dict__.update(locals())

    def __getattr__(self,key):
        # Only called for names missing from __dict__; forwards the lookup
        # to the underlying db under the key '<keydir>/<key>'.
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        # Every attribute assignment becomes a database write.  Reading
        # self.db / self.keydir here is safe: both live in __dict__, so
        # __getattr__ is not triggered.
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))
274 276
275 277
276 278 def test():
277 279 db = PickleShareDB('~/testpickleshare')
278 280 db.clear()
279 281 print "Should be empty:",db.items()
280 282 db['hello'] = 15
281 283 db['aku ankka'] = [1,2,313]
282 284 db['paths/nest/ok/keyname'] = [1,(5,46)]
283 285 db.hset('hash', 'aku', 12)
284 286 db.hset('hash', 'ankka', 313)
285 287 print "12 =",db.hget('hash','aku')
286 288 print "313 =",db.hget('hash','ankka')
287 289 print "all hashed",db.hdict('hash')
288 290 print db.keys()
289 291 print db.keys('paths/nest/ok/k*')
290 292 print dict(db) # snapsot of whole db
291 293 db.uncache() # frees memory, causes re-reads later
292 294
293 295 # shorthand for accessing deeply nested files
294 296 lnk = db.getlink('myobjects/test')
295 297 lnk.foo = 2
296 298 lnk.bar = lnk.foo + 5
297 299 print lnk.bar # 7
298 300
299 301 def stress():
300 302 db = PickleShareDB('~/fsdbtest')
301 303 import time,sys
302 304 for i in range(1000):
303 305 for j in range(1000):
304 306 if i % 15 == 0 and i < 200:
305 307 if str(j) in db:
306 308 del db[str(j)]
307 309 continue
308 310
309 311 if j%33 == 0:
310 312 time.sleep(0.02)
311 313
312 314 db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
313 315 db.hset('hash',j, db.hget('hash',j,15) + 1 )
314 316
315 317 print i,
316 318 sys.stdout.flush()
317 319 if i % 10 == 0:
318 320 db.uncache()
319 321
def main():
    """ Command-line entry point: dump, load, or test a PickleShare db.

    Prints usage and returns when invoked without arguments.
    """
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        # Parenthesized single expression prints the same on Python 2 and 3.
        print(usage)
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        # SECURITY NOTE: eval() of stdin executes arbitrary code; only feed
        # this dumps from trusted sources.
        data = eval(cont)
        db.clear()
        # Bug fix: populate from the parsed dump.  The old loop iterated
        # db.items() right after db.clear(), so 'load' was a silent no-op.
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print(db.waitget('250'))
    elif cmd == 'test':
        test()
        stress()
358 360
# Allow use as a command-line tool; see main() for the supported commands.
if __name__== "__main__":
    main()
361 363
362 364
General Comments 0
You need to be logged in to leave comments. Login now