pickleshare.py
362 lines
| 9.9 KiB
| text/x-python
|
PythonLexer
vivainio
|
#!/usr/bin/env python
""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""
vivainio
|
r964 | from IPython.external.path import path as Path | ||
vivainio
|
r165 | import os,stat,time | ||
Thomas Kluyver
|
r4754 | import collections | ||
vivainio
|
r165 | import cPickle as pickle | ||
import glob | ||||
vivainio
|
def gethashfile(key):
    """ Return the two-character hex bucket name ('00'..'ff') for `key`.

    The bucket is derived from the builtin hash() of the key, so keys with
    equal hashes always land in the same bucket file.
    """
    bucket = abs(hash(key) % 256)
    hexname = "%02x" % bucket
    return hexname[-2:]
vivainio
|
# Unique marker object meaning "no value / no default supplied"; it is
# compared by identity (``is _sentinel``) in PickleShareDB.hget().
_sentinel = object()
Thomas Kluyver
|
class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database.

    Keys are file paths relative to the database root directory; each value
    is pickled into its own file below that directory.
    """

    def __init__(self, root):
        """ Return a db object that will manage the specified directory """
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { file path under root : (obj, orig_mod_time) }
        self.cache = {}

    def __getitem__(self, key):
        """ db['key'] reading

        Raises KeyError when the file does not exist or cannot be unpickled.
        """
        fil = self.root / key
        try:
            mtime = fil.stat()[stat.ST_MTIME]
        except OSError:
            raise KeyError(key)

        cached = self.cache.get(fil)
        if cached is not None and cached[1] == mtime:
            return cached[0]
        try:
            # The cached item has expired (or was never read), need to read.
            # The file is closed explicitly instead of being leaked.
            with fil.open("rb") as f:
                obj = pickle.load(f)
        except Exception:
            # Unreadable or half-written file: report it as a missing key.
            # (Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not turned into KeyError.)
            raise KeyError(key)

        self.cache[fil] = (obj, mtime)
        return obj

    def __setitem__(self, key, value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # Close the file before reading its modification time below.
        with fil.open('wb') as f:
            pickle.dump(value, f)
        try:
            self.cache[fil] = (value, fil.mtime)
        except OSError as e:
            # errno 2 is "no such file": the file disappeared again before
            # its mtime could be read, so simply skip caching.
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set

        Store `value` under `key` inside the bucket file that
        gethashfile(key) selects below the 'hashroot' directory.
        """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update({key: value})
        self[hfile] = d

    def hget(self, hashroot, key, default=_sentinel, fast_only=True):
        """ hashed get

        Look `key` up in its bucket file.  When the bucket file is missing
        and `fast_only` is false, fall back to scanning the whole category
        with hdict().  Returns `default` for a missing key, or raises
        KeyError when no default was given.
        """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel)
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)
                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        value = d.get(key, default)
        if value is _sentinel:
            # Never hand the private marker object back to the caller.
            raise KeyError(key)
        return value

    def _ordered_hfiles(self, hashroot):
        """ Sorted bucket files of 'hashroot', a trailing '*xx' file first """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = hfiles[-1] if hfiles else ''
        if last.endswith('xx'):
            # Load the compressed file first so that the regular bucket
            # files, merged afterwards, override its entries.
            hfiles = [last] + hfiles[:-1]
        return hfiles

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        merged = {}
        for f in self._ordered_hfiles(hashroot):
            try:
                merged.update(self[f])
            except KeyError:
                print("Corrupt %s deleted - hset is not threadsafe!" % f)
                del self[f]

            self.uncache(f)

        return merged

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        # Same merge order as hdict(): an existing 'xx' file is read first so
        # it cannot override values from the regular bucket files.
        hfiles = self._ordered_hfiles(hashroot)
        merged = {}
        for f in hfiles:
            merged.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = merged
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()

    def __delitem__(self, key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil, None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\', '/')

    def keys(self, globpat=None):
        """ All keys in DB, or all keys matching a glob"""
        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root / globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        """ Iterate over all keys currently in the database """
        return iter(self.keys())

    def __len__(self):
        """ Number of keys currently in the database """
        return len(self.keys())

    def uncache(self, *items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it, None)
            # Cache entries are stored under the path joined with the root
            # (see __getitem__), so also drop that form of the key.
            self.cache.pop(self.root / it, None)

    def waitget(self, key, maxwaittime=60):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operation to cause a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """
        # Poll quickly at first, then back off to once per second.
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while True:
            try:
                return self[key]
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited += wtimes[tries]
            if tries < len(wtimes) - 1:
                tries += 1

    def getlink(self, folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root
Bernardo B. Marques
|
r4872 | |||
vivainio
|
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    Attribute reads and writes are forwarded to the database under the key
    ``keydir + '/' + attribute``; a missing key therefore surfaces as the
    database's KeyError.
    """
    def __init__(self, db, keydir):
        # Write straight into __dict__: ordinary attribute assignment is
        # redirected to the database by __setattr__.  Only 'db' and 'keydir'
        # are stored (no stray 'self' entry, hence no reference cycle).
        self.__dict__['db'] = db
        self.__dict__['keydir'] = keydir

    def __getattr__(self, key):
        # Only called for names not found in __dict__, i.e. database items.
        return self.__dict__['db'][self.__dict__['keydir'] + '/' + key]

    def __setattr__(self, key, val):
        self.db[self.keydir + '/' + key] = val

    def __repr__(self):
        db = self.__dict__['db']
        keydir = self.__dict__['keydir']
        keys = db.keys(keydir + "/*")
        return "<PickleShareLink '%s': %s>" % (
            keydir,
            ";".join([Path(k).basename() for k in keys]))
Bernardo B. Marques
|
r4872 | |||
vivainio
|
r165 | def test(): | ||
db = PickleShareDB('~/testpickleshare') | ||||
db.clear() | ||||
print "Should be empty:",db.items() | ||||
db['hello'] = 15 | ||||
db['aku ankka'] = [1,2,313] | ||||
db['paths/nest/ok/keyname'] = [1,(5,46)] | ||||
vivainio
|
r720 | db.hset('hash', 'aku', 12) | ||
db.hset('hash', 'ankka', 313) | ||||
print "12 =",db.hget('hash','aku') | ||||
print "313 =",db.hget('hash','ankka') | ||||
print "all hashed",db.hdict('hash') | ||||
vivainio
|
r165 | print db.keys() | ||
print db.keys('paths/nest/ok/k*') | ||||
print dict(db) # snapsot of whole db | ||||
db.uncache() # frees memory, causes re-reads later | ||||
# shorthand for accessing deeply nested files | ||||
lnk = db.getlink('myobjects/test') | ||||
lnk.foo = 2 | ||||
lnk.bar = lnk.foo + 5 | ||||
print lnk.bar # 7 | ||||
def stress(): | ||||
db = PickleShareDB('~/fsdbtest') | ||||
import time,sys | ||||
for i in range(1000): | ||||
vivainio
|
r720 | for j in range(1000): | ||
vivainio
|
r165 | if i % 15 == 0 and i < 200: | ||
if str(j) in db: | ||||
del db[str(j)] | ||||
continue | ||||
if j%33 == 0: | ||||
time.sleep(0.02) | ||||
Bernardo B. Marques
|
r4872 | |||
vivainio
|
r165 | db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())] | ||
vivainio
|
r720 | db.hset('hash',j, db.hget('hash',j,15) + 1 ) | ||
Bernardo B. Marques
|
r4872 | |||
vivainio
|
r165 | print i, | ||
sys.stdout.flush() | ||||
if i % 10 == 0: | ||||
db.uncache() | ||||
Bernardo B. Marques
|
r4872 | |||
vivainio
|
def main():
    """ Command-line entry point.

    Commands (first argument), each followed by the database directory:

    - dump: pretty-print all (key, value) items of the database.
    - load: replace the database contents with a dump read from stdin.
    - testwait: clear the database, then wait for key '250' and print it.
    - test: run test() and stress().

    Prints the usage text when the command is missing or unknown, or when
    'load' / 'testwait' are given no database directory.
    """
    import sys
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    if len(sys.argv) < 2:
        print(usage)
        return

    DB = PickleShareDB
    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args:
            args = ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        if not args:
            # 'load' clears the target first, so never guess a directory.
            print(usage)
            return
        cont = sys.stdin.read()
        db = DB(args[0])
        # NOTE(security): eval() executes arbitrary code contained in the
        # dump; only load dumps that come from a trusted source.
        data = eval(cont)
        db.clear()
        # 'dump' writes a list of (key, value) pairs; accept a dict as well.
        pairs = data.items() if isinstance(data, dict) else data
        for k, v in pairs:
            db[k] = v
    elif cmd == 'testwait':
        if not args:
            print(usage)
            return
        db = DB(args[0])
        db.clear()
        print(db.waitget('250'))
    elif cmd == 'test':
        test()
        stress()
    else:
        print(usage)
Bernardo B. Marques
|
r4872 | |||
vivainio
|
# Run the command-line interface when this file is executed as a script.
if __name__== "__main__":
    main()
Bernardo B. Marques
|
r4872 | |||