Ensure files are closed after reading.
Bradley M. Froehle
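The change wraps each `fil.open(...)` call in `__getitem__` and `__setitem__` in a `with` block, so the file handle is closed as soon as the read or write finishes (even if unpickling raises), rather than whenever the interpreter happens to garbage-collect the file object. A minimal self-contained sketch of the pattern, using plain `open` instead of PickleShare's `Path` objects:

    import pickle

    # Write a sample pickle so the sketch is self-contained.
    with open('data.pkl', 'wb') as f:
        pickle.dump({'hello': 15}, f, protocol=2)

    # Before the fix: the handle is closed only when the file object is
    # garbage-collected; non-refcounting interpreters may delay that
    # indefinitely, leaking file descriptors.
    obj = pickle.loads(open('data.pkl', 'rb').read())

    # After the fix: the with-statement closes the handle on block exit,
    # even if pickle.loads() raises.
    with open('data.pkl', 'rb') as f:
        obj = pickle.loads(f.read())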
@@ -1,364 +1,366 @@
#!/usr/bin/env python

""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""

from IPython.external.path import path as Path
import os,stat,time
import collections
import cPickle as pickle
import glob

def gethashfile(key):
    return ("%02x" % abs(hash(key) % 256))[-2:]

_sentinel = object()

class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}

    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
-            obj = pickle.loads(fil.open("rb").read())
+            with fil.open("rb") as f:
+                obj = pickle.loads(f.read())
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # We specify protocol 2, so that we can mostly go between Python 2
        # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
-        pickled = pickle.dump(value,fil.open('wb'), protocol=2)
+        with fil.open('wb') as f:
+            pickle.dump(value, f, protocol=2)
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d


    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # print "using xx"
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            # print "using",f
            try:
                all.update(self[f])
            except KeyError:
                print "Corrupt",f,"deleted - hset is not threadsafe!"
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()

    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root

class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))

def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
        db.clear()
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__== "__main__":
    main()
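For context on why the read path matters: `__getitem__` keeps an in-memory cache of `{file : (obj, mtime)}` and trusts a cached object only while the file's modification time is unchanged; a write from another process bumps the mtime and forces a re-read. A rough standalone sketch of that invalidation rule (the `load` helper here is hypothetical, not part of PickleShare's API):

    import os, pickle

    _cache = {}  # {path : (obj, mtime_at_read)}

    def load(path):
        mtime = os.stat(path).st_mtime
        hit = _cache.get(path)
        if hit is not None and hit[1] == mtime:
            return hit[0]              # file unchanged since we cached it
        with open(path, 'rb') as f:    # stale or missing: re-read and re-cache
            obj = pickle.loads(f.read())
        _cache[path] = (obj, mtime)
        return obj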
General Comments 0
You need to be logged in to leave comments. Login now
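The hset/hget family is worth a usage note as well: keys are spread across at most 256 bucket files per hashroot (gethashfile picks the bucket), so concurrent writers usually touch different files, and hcompress later folds every bucket into a single 'xx' file. Mirroring the module's own test(), roughly:

    from pickleshare import *

    db = PickleShareDB('~/testpickleshare')
    db.hset('hash', 'aku', 12)      # stored as a dict entry in hash/<bucket>
    db.hset('hash', 'ankka', 313)
    print db.hget('hash', 'aku')    # 12, read from a single bucket file
    print db.hdict('hash')          # merge of every bucket
    db.hcompress('hash')            # folds all buckets into hash/xx
    print db.hget('hash', 'aku', fast_only=False)  # still 12, via the slow path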