##// END OF EJS Templates
rm extra print
vivainio -
Show More
@@ -1,353 +1,354 b''
1 1 #!/usr/bin/env python
2 2
3 3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4 4
5 5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 6 shelve, many processes can access the database simultaneously. Changing a
7 7 value in database is immediately visible to other processes accessing the
8 8 same database.
9 9
10 10 Concurrency is possible because the values are stored in separate files. Hence
11 11 the "database" is a directory where *all* files are governed by PickleShare.
12 12
13 13 Example usage::
14 14
15 15 from pickleshare import *
16 16 db = PickleShareDB('~/testpickleshare')
17 17 db.clear()
18 18 print "Should be empty:",db.items()
19 19 db['hello'] = 15
20 20 db['aku ankka'] = [1,2,313]
21 21 db['paths/are/ok/key'] = [1,(5,46)]
22 22 print db.keys()
23 23 del db['aku ankka']
24 24
25 25 This module is certainly not ZODB, but can be used for low-load
26 26 (non-mission-critical) situations where tiny code size trumps the
27 27 advanced features of a "real" object database.
28 28
29 29 Installation guide: easy_install pickleshare
30 30
31 31 Author: Ville Vainio <vivainio@gmail.com>
32 32 License: MIT open source license.
33 33
34 34 """
35 35
36 36 from path import path as Path
37 37 import os,stat,time
38 38 import cPickle as pickle
39 39 import UserDict
40 40 import warnings
41 41 import glob
42 42
43 43 from sets import Set as set
44 44
def gethashfile(key):
    """Return the two-char hex bucket name ('00'-'ff') that *key* hashes to."""
    # Python's % on a positive modulus always yields 0..255, so "%02x"
    # produces exactly two characters.
    return "%02x" % (hash(key) % 256)
47 47
# Unique marker object distinguishing "no default supplied" from any real
# value a caller could pass (None included).
_sentinel = object()
49 49
class PickleShareDB(UserDict.DictMixin):
    """ The main 'connection' object for PickleShare database

    Behaves like a dict whose keys are relative file paths under `root`;
    each value is pickled into its own file, which is what makes access
    from several processes concurrently safe.
    """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}


    def __getitem__(self,key):
        """ db['key'] reading

        Raises KeyError when the backing file is missing or unreadable.
        """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        # Serve from cache only while the file's mtime is unchanged;
        # another process bumping the mtime invalidates our copy.
        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read from disk.
            # Close the handle explicitly instead of leaking it (the old
            # code passed fil.open() straight to pickle.load).
            f = fil.open()
            try:
                obj = pickle.load(f)
            finally:
                f.close()
        except Exception:
            # A concurrent writer may leave a truncated pickle, or the file
            # may vanish between stat() and open(); either way the key is
            # effectively absent right now.  (Was a bare except:, which
            # also swallowed KeyboardInterrupt/SystemExit.)
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # Write and close explicitly so the data reaches the file (and other
        # processes) immediately; the old code leaked the open handle and
        # bound pickle.dump's None return to an unused variable.
        f = fil.open('w')
        try:
            pickle.dump(value, f)
        finally:
            f.close()
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError as e:
            # errno 2 (ENOENT) means a competing process already deleted
            # the file - the other process wins the conflict, which is ok.
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set: store key/value inside bucket file hashroot/NN """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d



    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get

        With fast_only (the default) only the key's own bucket file is
        consulted; entries folded away by hcompress() will not be found.
        Pass fast_only=False to fall back to scanning the whole category.
        """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # Read the compressed 'xx' bucket first so the fresher per-key
            # buckets override its (possibly stale) entries.
            hfiles = [last] + hfiles[:-1]

        merged = {}

        for f in hfiles:
            merged.update(self[f])
            self.uncache(f)

        return merged

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        merged = {}
        for f in hfiles:
            merged.update(self[f])
            self.uncache(f)

        # Everything lives in the single 'xx' bucket from now on.
        self[hashroot + '/xx'] = merged
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()



    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        # Keys are always '/'-separated, even on Windows.
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operation to cause a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        # Back off gradually: three quick polls, two medium, then 1s forever.
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root
239 240
240 241
241 242
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        # Assign through __dict__ to bypass our own __setattr__ (which
        # would otherwise try to pickle db/keydir into the database).
        # The old code used __dict__.update(locals()), which also stored
        # a stray 'self' key and created a needless reference cycle.
        self.__dict__['db'] = db
        self.__dict__['keydir'] = keydir

    def __getattr__(self,key):
        # Attribute read == db lookup under our key directory.
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        # Attribute write == db store under our key directory.
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))
265 266
266 267
def test():
    """ Smoke test: exercises the dict API, hashed access and links.

    Writes into ~/testpickleshare (cleared first) and prints the expected
    values so the output can be checked by eye.
    """
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    # keys may be nested paths; directories are created on demand
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7
289 290
def stress():
    """ Concurrency stress test: run several copies of this in parallel.

    Hammers ~/fsdbtest with interleaved writes, deletes and hashed
    updates, tagging each entry with os.getpid() so conflicting
    processes can be told apart in the stored data.
    """
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            # early rounds occasionally delete keys to provoke conflicts
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            # brief sleeps give other processes a window to interleave
            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        # periodically drop the cache to force fresh reads from disk
        if i % 10 == 0:
            db.uncache()
310 311
311 312 def main():
312 313 import textwrap
313 314 usage = textwrap.dedent("""\
314 315 pickleshare - manage PickleShare databases
315 316
316 317 Usage:
317 318
318 319 pickleshare dump /path/to/db > dump.txt
319 320 pickleshare load /path/to/db < dump.txt
320 321 pickleshare test /path/to/db
321 322 """)
322 323 DB = PickleShareDB
323 324 import sys
324 325 if len(sys.argv) < 2:
325 326 print usage
326 327 return
327 328
328 329 cmd = sys.argv[1]
329 330 args = sys.argv[2:]
330 331 if cmd == 'dump':
331 332 if not args: args= ['.']
332 333 db = DB(args[0])
333 334 import pprint
334 335 pprint.pprint(db.items())
335 336 elif cmd == 'load':
336 337 cont = sys.stdin.read()
337 338 db = DB(args[0])
338 339 data = eval(cont)
339 340 db.clear()
340 341 for k,v in db.items():
341 342 db[k] = v
342 343 elif cmd == 'testwait':
343 344 db = DB(args[0])
344 345 db.clear()
345 346 print db.waitget('250')
346 347 elif cmd == 'test':
347 348 test()
348 349 stress()
349 350
350 351 if __name__== "__main__":
351 352 main()
352 353
353 354 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now