ipy_profile_zope.py
324 lines
| 9.1 KiB
| text/x-python
|
PythonLexer
vivainio
|
r822 | # -*- coding: utf-8 -*- | ||
Fernando Perez
|
r1875 | """ An ipython profile for zope and plone. | ||
Some ideas stolen from http://www.tomster.org. | ||||
Authors | ||||
------- | ||||
- Stefan Eletzhofer <stefan.eletzhofer@inquant.de> | ||||
""" | ||||
vivainio
|
r822 | # File: ipy_profile_zope.py | ||
# | ||||
# Copyright (c) InQuant GmbH | ||||
# | ||||
# | ||||
vivainio
|
r823 | # Distributed under the terms of the BSD License. The full license is in | ||
# the file COPYING, distributed as part of this software. | ||||
vivainio
|
r822 | |||
Brian Granger
|
r2027 | from IPython.core import ipapi | ||
Brian Granger
|
r2043 | from IPython.core import release | ||
vivainio
|
r822 | from types import StringType | ||
import sys | ||||
import os | ||||
import textwrap | ||||
Kiorky
|
r2580 | sys_oldstdin = sys.stdin | ||
vivainio
|
r822 | # The import below effectively obsoletes your old-style ipythonrc[.ini], | ||
# so consider yourself warned! | ||||
# import ipy_defaults | ||||
_marker = [] | ||||
def shasattr(obj, attr, acquire=False): | ||||
""" See Archetypes/utils.py | ||||
""" | ||||
if not acquire: | ||||
obj = obj.aq_base | ||||
return getattr(obj, attr, _marker) is not _marker | ||||
class ZopeDebug(object): | ||||
def __init__(self): | ||||
self.instancehome = os.environ.get( "INSTANCE_HOME" ) | ||||
configfile = os.environ.get( "CONFIG_FILE" ) | ||||
if configfile is None and self.instancehome is not None: | ||||
configfile = os.path.join( self.instancehome, "etc", "zope.conf" ) | ||||
if configfile is None: | ||||
raise RuntimeError( "CONFIG_FILE env not set" ) | ||||
print "CONFIG_FILE=", configfile | ||||
print "INSTANCE_HOME=", self.instancehome | ||||
self.configfile = configfile | ||||
try: | ||||
from Zope2 import configure | ||||
except ImportError: | ||||
from Zope import configure | ||||
configure( configfile ) | ||||
try: | ||||
import Zope2 | ||||
app = Zope2.app() | ||||
except ImportError: | ||||
import Zope | ||||
app = Zope.app() | ||||
from Testing.makerequest import makerequest | ||||
self.app = makerequest( app ) | ||||
try: | ||||
self._make_permissive() | ||||
print "Permissive security installed" | ||||
except: | ||||
print "Permissive security NOT installed" | ||||
self._pwd = self.portal or self.app | ||||
try: | ||||
from zope.component import getSiteManager | ||||
from zope.component import getGlobalSiteManager | ||||
from zope.app.component.hooks import setSite | ||||
if self.portal is not None: | ||||
setSite( self.portal ) | ||||
gsm = getGlobalSiteManager() | ||||
sm = getSiteManager() | ||||
if sm is gsm: | ||||
print "ERROR SETTING SITE!" | ||||
except: | ||||
pass | ||||
@property | ||||
def utils(self): | ||||
class Utils(object): | ||||
commit = self.commit | ||||
sync = self.sync | ||||
objectInfo = self.objectInfo | ||||
ls = self.ls | ||||
pwd = self.pwd | ||||
cd = self.cd | ||||
su = self.su | ||||
getCatalogInfo = self.getCatalogInfo | ||||
@property | ||||
def cwd(self): | ||||
return self.pwd() | ||||
return Utils() | ||||
@property | ||||
def namespace(self): | ||||
return dict( utils=self.utils, app=self.app, portal=self.portal ) | ||||
@property | ||||
def portal(self): | ||||
portals = self.app.objectValues( "Plone Site" ) | ||||
if len(portals): | ||||
return portals[0] | ||||
else: | ||||
raise KeyError( "No Plone Site found.") | ||||
def pwd(self): | ||||
return self._pwd | ||||
def _make_permissive(self): | ||||
""" | ||||
Make a permissive security manager with all rights. Hell, | ||||
we're developers, aren't we? Security is for whimps. :) | ||||
""" | ||||
from Products.CMFCore.tests.base.security import PermissiveSecurityPolicy | ||||
import AccessControl | ||||
from AccessControl.SecurityManagement import newSecurityManager | ||||
from AccessControl.SecurityManager import setSecurityPolicy | ||||
_policy = PermissiveSecurityPolicy() | ||||
self.oldpolicy = setSecurityPolicy(_policy) | ||||
newSecurityManager(None, AccessControl.User.system) | ||||
def su(self, username): | ||||
""" Change to named user. | ||||
""" | ||||
# TODO Make it easy to change back to permissive security. | ||||
user = self.portal.acl_users.getUser(username) | ||||
if not user: | ||||
print "Can't find %s in %s" % (username, self.portal.acl_users) | ||||
return | ||||
from AccessControl import ZopeSecurityPolicy | ||||
import AccessControl | ||||
from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager | ||||
from AccessControl.SecurityManager import setSecurityPolicy | ||||
_policy = ZopeSecurityPolicy | ||||
self.oldpolicy = setSecurityPolicy(_policy) | ||||
wrapped_user = user.__of__(self.portal.acl_users) | ||||
newSecurityManager(None, user) | ||||
print 'User changed.' | ||||
return getSecurityManager().getUser() | ||||
def getCatalogInfo(self, obj=None, catalog='portal_catalog', query=None, sort_on='created', sort_order='reverse' ): | ||||
""" Inspect portal_catalog. Pass an object or object id for a | ||||
default query on that object, or pass an explicit query. | ||||
""" | ||||
if obj and query: | ||||
print "Ignoring %s, using query." % obj | ||||
catalog = self.portal.get(catalog) | ||||
if not catalog: | ||||
return 'No catalog' | ||||
indexes = catalog._catalog.indexes | ||||
if not query: | ||||
if type(obj) is StringType: | ||||
cwd = self.pwd() | ||||
obj = cwd.unrestrictedTraverse( obj ) | ||||
# If the default in the signature is mutable, its value will | ||||
# persist across invocations. | ||||
query = {} | ||||
if indexes.get('path'): | ||||
from string import join | ||||
path = join(obj.getPhysicalPath(), '/') | ||||
query.update({'path': path}) | ||||
if indexes.get('getID'): | ||||
query.update({'getID': obj.id, }) | ||||
if indexes.get('UID') and shasattr(obj, 'UID'): | ||||
query.update({'UID': obj.UID(), }) | ||||
if indexes.get(sort_on): | ||||
query.update({'sort_on': sort_on, 'sort_order': sort_order}) | ||||
if not query: | ||||
return 'Empty query' | ||||
results = catalog(**query) | ||||
result_info = [] | ||||
for r in results: | ||||
rid = r.getRID() | ||||
if rid: | ||||
result_info.append( | ||||
{'path': catalog.getpath(rid), | ||||
'metadata': catalog.getMetadataForRID(rid), | ||||
'indexes': catalog.getIndexDataForRID(rid), } | ||||
) | ||||
else: | ||||
result_info.append({'missing': rid}) | ||||
if len(result_info) == 1: | ||||
return result_info[0] | ||||
return result_info | ||||
def commit(self): | ||||
""" | ||||
Commit the transaction. | ||||
""" | ||||
try: | ||||
import transaction | ||||
transaction.get().commit() | ||||
except ImportError: | ||||
get_transaction().commit() | ||||
def sync(self): | ||||
""" | ||||
Sync the app's view of the zodb. | ||||
""" | ||||
self.app._p_jar.sync() | ||||
def objectInfo( self, o ): | ||||
""" | ||||
Return a descriptive string of an object | ||||
""" | ||||
Title = "" | ||||
t = getattr( o, 'Title', None ) | ||||
if t: | ||||
Title = t() | ||||
return {'id': o.getId(), | ||||
'Title': Title, | ||||
'portal_type': getattr( o, 'portal_type', o.meta_type), | ||||
'folderish': o.isPrincipiaFolderish | ||||
} | ||||
def cd( self, path ): | ||||
""" | ||||
Change current dir to a specific folder. | ||||
cd( ".." ) | ||||
cd( "/plone/Members/admin" ) | ||||
cd( portal.Members.admin ) | ||||
etc. | ||||
""" | ||||
if type(path) is not StringType: | ||||
path = '/'.join(path.getPhysicalPath()) | ||||
cwd = self.pwd() | ||||
x = cwd.unrestrictedTraverse( path ) | ||||
if x is None: | ||||
raise KeyError( "Can't cd to %s" % path ) | ||||
print "%s -> %s" % ( self.pwd().getId(), x.getId() ) | ||||
self._pwd = x | ||||
def ls( self, x=None ): | ||||
""" | ||||
List object(s) | ||||
""" | ||||
if type(x) is StringType: | ||||
cwd = self.pwd() | ||||
x = cwd.unrestrictedTraverse( x ) | ||||
if x is None: | ||||
x = self.pwd() | ||||
if x.isPrincipiaFolderish: | ||||
return [self.objectInfo(o) for id, o in x.objectItems()] | ||||
else: | ||||
return self.objectInfo( x ) | ||||
zope_debug = None | ||||
def ipy_set_trace(): | ||||
Brian Granger
|
r2015 | from IPython.core import debugger | ||
debugger.Pdb().set_trace() | ||||
vivainio
|
r822 | |||
def main(): | ||||
global zope_debug | ||||
ip = ipapi.get() | ||||
o = ip.options | ||||
# autocall to "full" mode (smart mode is default, I like full mode) | ||||
SOFTWARE_HOME = os.environ.get( "SOFTWARE_HOME" ) | ||||
sys.path.append( SOFTWARE_HOME ) | ||||
print "SOFTWARE_HOME=%s\n" % SOFTWARE_HOME | ||||
zope_debug = ZopeDebug() | ||||
# <HACK ALERT> | ||||
import pdb; | ||||
pdb.set_trace = ipy_set_trace | ||||
# </HACK ALERT> | ||||
# I like my banner minimal. | ||||
Brian Granger
|
r2043 | o.banner = "ZOPE Py %s IPy %s\n" % (sys.version.split('\n')[0],release.version) | ||
vivainio
|
r822 | |||
print textwrap.dedent("""\ | ||||
ZOPE mode iPython shell. | ||||
Bound names: | ||||
app | ||||
portal | ||||
utils.{ %s } | ||||
Uses the $SOFTWARE_HOME and $CONFIG_FILE environment | ||||
variables. | ||||
""" % ( ",".join([ x for x in dir(zope_debug.utils) if not x.startswith("_") ] ) ) ) | ||||
Kiorky
|
r2580 | sys.stdin = sys_oldstdin | ||
vivainio
|
r822 | ip.user_ns.update( zope_debug.namespace ) | ||
main() | ||||
# vim: set ft=python ts=4 sw=4 expandtab : | ||||