#!/usr/bin/env python
""" PickleShare - a small 'shelve' like datastore with concurrency support | |||
Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike | |||
shelve, many processes can access the database simultaneously. Changing a | |||
value in database is immediately visible to other processes accessing the | |||
same database. | |||
Concurrency is possible because the values are stored in separate files. Hence | |||
the "database" is a directory where *all* files are governed by PickleShare. | |||
Example usage:: | |||
from pickleshare import * | |||
db = PickleShareDB('~/testpickleshare') | |||
db.clear() | |||
print "Should be empty:",db.items() | |||
db['hello'] = 15 | |||
db['aku ankka'] = [1,2,313] | |||
db['paths/are/ok/key'] = [1,(5,46)] | |||
print db.keys() | |||
del db['aku ankka'] | |||
This module is certainly not ZODB, but can be used for low-load | |||
(non-mission-critical) situations where tiny code size trumps the | |||
advanced features of a "real" object database. | |||
Installation guide: easy_install pickleshare | |||
Author: Ville Vainio <vivainio@gmail.com> | |||
License: MIT open source license. | |||
""" | |||
from path import path as Path | |||
import os,stat,time | |||
import cPickle as pickle | |||
import UserDict | |||
import warnings | |||
import glob | |||
try:
    set
except NameError:
    # Python 2.3 compatibility; 'set' is a builtin from 2.4 onwards
    from sets import Set as set
def gethashfile(key):
    """ Map a key to one of 256 two-hex-digit bucket file names ('00'..'ff') """
    return "%02x" % (hash(key) % 256)
# unique marker object: distinguishes "no default supplied" from None
_sentinel = object()
class PickleShareDB(UserDict.DictMixin):
    """ The main 'connection' object for PickleShare database """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}
    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
            obj = pickle.load(fil.open())
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj
    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        pickle.dump(value,fil.open('w'))
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
            if e.errno != 2:  # errno 2 == ENOENT: file vanished, skipping cache is ok
                raise
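
    # A minimal illustrative sketch, not in the original file: the cache above
    # stays coherent across processes because __getitem__ compares the file's
    # mtime against the cached one. A second PickleShareDB (standing in for
    # another process) therefore sees writes made through the first. The
    # directory name is an arbitrary example.
    #
    #   db1 = PickleShareDB('~/demo_db')
    #   db2 = PickleShareDB('~/demo_db')
    #   db1['counter'] = 1
    #   print db2['counter']    # -> 1, read from disk
    #   db1['counter'] = 2
    #   print db2['counter']    # -> 2, mtime changed (granularity permitting),
    #                           #    so the stale cache entry is refreshed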
    def hset(self, hashroot, key, value):
        """ hashed set: store key/value in a bucket file under 'hashroot' """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d

    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get: look up 'key' in the bucket files under 'hashroot'

        With fast_only=True only the key's own bucket file is checked; pass
        fast_only=False to also scan data folded together by hcompress().
        """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)
                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # put the compressed 'xx' file first so that newer per-bucket
            # entries override the compressed data when the dicts are merged
            hfiles = [last] + hfiles[:-1]

        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).
        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()
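
    # A minimal illustrative sketch, not in the original file: how the hashed
    # API interacts with hcompress(). Directory and key names are arbitrary
    # examples.
    #
    #   db = PickleShareDB('~/demo_db')
    #   db.hset('hash', 'aku', 12)       # written to hash/<bucket file>
    #   print db.hget('hash', 'aku')     # -> 12, single bucket-file read
    #   db.hcompress('hash')             # folds all buckets into hash/xx
    #   db.hget('hash', 'aku')           # raises KeyError: the default
    #                                    # fast_only=True checks only the
    #                                    # (now removed) bucket file
    #   print db.hget('hash', 'aku', fast_only=False)  # -> 12, via hash/xx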

    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # not-found and permission-denied are ok - we lost the race,
            # the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""
        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.
        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited += wtimes[tries]
            if tries < len(wtimes) - 1:
                tries += 1
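
    # A minimal illustrative sketch, not in the original file: waitget() as a
    # simple cross-process rendezvous. Paths and keys are arbitrary examples.
    #
    #   # consumer process: blocks (polling) until 'answer' appears
    #   db = PickleShareDB('~/demo_jobdb')
    #   print db.waitget('answer')       # raises KeyError after 60s (default)
    #
    #   # producer process, run elsewhere:
    #   db = PickleShareDB('~/demo_jobdb')
    #   db['answer'] = 42                # consumer's waitget() returns 42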

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root

class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5
    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]

    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val

    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))

def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        # NOTE: eval() trusts the dump file; only load dumps you created
        data = eval(cont)
        db.clear()
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__== "__main__":
    main()