notification.py
219 lines
| 7.9 KiB
| text/x-python
|
PythonLexer
r1702 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.model.notification | ||||
r1839 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |||
r1702 | ||||
Model for notifications | ||||
r1800 | ||||
r1702 | :created_on: Nov 20, 2011 | |||
:author: marcink | ||||
r1824 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | |||
r1702 | :license: GPLv3, see COPYING for more details. | |||
""" | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
r1717 | import os | |||
r1702 | import logging | |||
import traceback | ||||
r1717 | import datetime | |||
r1702 | ||||
from pylons.i18n.translation import _ | ||||
r1723 | import rhodecode | |||
r1717 | from rhodecode.lib import helpers as h | |||
r1702 | from rhodecode.model import BaseModel | |||
from rhodecode.model.db import Notification, User, UserNotification | ||||
r1712 | log = logging.getLogger(__name__) | |||
r1702 | ||||
r1716 | ||||
r1702 | class NotificationModel(BaseModel): | |||
r1712 | def __get_user(self, user): | |||
r1716 | if isinstance(user, basestring): | |||
r1712 | return User.get_by_username(username=user) | |||
else: | ||||
r1716 | return self._get_instance(User, user) | |||
r1712 | ||||
def __get_notification(self, notification): | ||||
if isinstance(notification, Notification): | ||||
return notification | ||||
elif isinstance(notification, int): | ||||
return Notification.get(notification) | ||||
else: | ||||
if notification: | ||||
raise Exception('notification must be int or Instance' | ||||
' of Notification got %s' % type(notification)) | ||||
r1731 | def create(self, created_by, subject, body, recipients=None, | |||
type_=Notification.TYPE_MESSAGE, with_email=True, | ||||
email_kwargs={}): | ||||
r1703 | """ | |||
r1800 | ||||
r1703 | Creates notification of given type | |||
r1800 | ||||
r1703 | :param created_by: int, str or User instance. User who created this | |||
notification | ||||
:param subject: | ||||
:param body: | ||||
r1818 | :param recipients: list of int, str or User objects, when None | |||
r1731 | is given send to all admins | |||
r1703 | :param type_: type of notification | |||
r1731 | :param with_email: send email with this notification | |||
:param email_kwargs: additional dict to pass as args to email template | ||||
r1703 | """ | |||
r1722 | from rhodecode.lib.celerylib import tasks, run_task | |||
r1702 | ||||
r1731 | if recipients and not getattr(recipients, '__iter__', False): | |||
r1702 | raise Exception('recipients must be a list of iterable') | |||
r1712 | created_by_obj = self.__get_user(created_by) | |||
r1702 | ||||
r1731 | if recipients: | |||
recipients_objs = [] | ||||
for u in recipients: | ||||
obj = self.__get_user(u) | ||||
if obj: | ||||
recipients_objs.append(obj) | ||||
recipients_objs = set(recipients_objs) | ||||
else: | ||||
# empty recipients means to all admins | ||||
recipients_objs = User.query().filter(User.admin == True).all() | ||||
r1717 | ||||
notif = Notification.create(created_by=created_by_obj, subject=subject, | ||||
body=body, recipients=recipients_objs, | ||||
type_=type_) | ||||
r1731 | if with_email is False: | |||
return notif | ||||
r1717 | # send email with notification | |||
for rec in recipients_objs: | ||||
email_subject = NotificationModel().make_description(notif, False) | ||||
r1731 | type_ = type_ | |||
r1717 | email_body = body | |||
r1800 | kwargs = {'subject': subject, 'body': h.rst_w_mentions(body)} | |||
r1731 | kwargs.update(email_kwargs) | |||
r1717 | email_body_html = EmailNotificationModel()\ | |||
r1731 | .get_email_tmpl(type_, **kwargs) | |||
r1722 | run_task(tasks.send_email, rec.email, email_subject, email_body, | |||
r1717 | email_body_html) | |||
return notif | ||||
r1702 | ||||
r1713 | def delete(self, user, notification): | |||
# we don't want to remove actual notification just the assignment | ||||
r1712 | try: | |||
r1713 | notification = self.__get_notification(notification) | |||
user = self.__get_user(user) | ||||
if notification and user: | ||||
r1717 | obj = UserNotification.query()\ | |||
.filter(UserNotification.user == user)\ | ||||
.filter(UserNotification.notification | ||||
== notification)\ | ||||
.one() | ||||
r1713 | self.sa.delete(obj) | |||
r1712 | return True | |||
except Exception: | ||||
log.error(traceback.format_exc()) | ||||
raise | ||||
r1702 | ||||
r1713 | def get_for_user(self, user): | |||
user = self.__get_user(user) | ||||
return user.notifications | ||||
r1702 | ||||
r1791 | def mark_all_read_for_user(self, user): | |||
user = self.__get_user(user) | ||||
UserNotification.query()\ | ||||
.filter(UserNotification.read==False)\ | ||||
.update({'read': True}) | ||||
r1713 | def get_unread_cnt_for_user(self, user): | |||
user = self.__get_user(user) | ||||
r1702 | return UserNotification.query()\ | |||
r1712 | .filter(UserNotification.read == False)\ | |||
r1713 | .filter(UserNotification.user == user).count() | |||
r1702 | ||||
r1713 | def get_unread_for_user(self, user): | |||
user = self.__get_user(user) | ||||
r1702 | return [x.notification for x in UserNotification.query()\ | |||
r1712 | .filter(UserNotification.read == False)\ | |||
r1713 | .filter(UserNotification.user == user).all()] | |||
r1712 | ||||
def get_user_notification(self, user, notification): | ||||
user = self.__get_user(user) | ||||
notification = self.__get_notification(notification) | ||||
return UserNotification.query()\ | ||||
.filter(UserNotification.notification == notification)\ | ||||
.filter(UserNotification.user == user).scalar() | ||||
r1717 | def make_description(self, notification, show_age=True): | |||
r1712 | """ | |||
Creates a human readable description based on properties | ||||
of notification object | ||||
""" | ||||
_map = {notification.TYPE_CHANGESET_COMMENT:_('commented on commit'), | ||||
notification.TYPE_MESSAGE:_('sent message'), | ||||
r1731 | notification.TYPE_MENTION:_('mentioned you'), | |||
notification.TYPE_REGISTRATION:_('registered in RhodeCode')} | ||||
r1717 | DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" | |||
r1712 | ||||
tmpl = "%(user)s %(action)s %(when)s" | ||||
r1717 | if show_age: | |||
when = h.age(notification.created_on) | ||||
else: | ||||
DTF = lambda d: datetime.datetime.strftime(d, DATETIME_FORMAT) | ||||
when = DTF(notification.created_on) | ||||
r1712 | data = dict(user=notification.created_by_user.username, | |||
action=_map[notification.type_], | ||||
r1717 | when=when) | |||
r1712 | return tmpl % data | |||
r1717 | ||||
class EmailNotificationModel(BaseModel): | ||||
r1732 | TYPE_CHANGESET_COMMENT = Notification.TYPE_CHANGESET_COMMENT | |||
r1717 | TYPE_PASSWORD_RESET = 'passoword_link' | |||
r1732 | TYPE_REGISTRATION = Notification.TYPE_REGISTRATION | |||
r1717 | TYPE_DEFAULT = 'default' | |||
def __init__(self): | ||||
r1723 | self._template_root = rhodecode.CONFIG['pylons.paths']['templates'][0] | |||
self._tmpl_lookup = rhodecode.CONFIG['pylons.app_globals'].mako_lookup | ||||
r1717 | ||||
self.email_types = { | ||||
self.TYPE_CHANGESET_COMMENT:'email_templates/changeset_comment.html', | ||||
self.TYPE_PASSWORD_RESET:'email_templates/password_reset.html', | ||||
self.TYPE_REGISTRATION:'email_templates/registration.html', | ||||
self.TYPE_DEFAULT:'email_templates/default.html' | ||||
} | ||||
def get_email_tmpl(self, type_, **kwargs): | ||||
""" | ||||
return generated template for email based on given type | ||||
r1818 | ||||
r1717 | :param type_: | |||
""" | ||||
r1723 | ||||
r1732 | base = self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT]) | |||
r1723 | email_template = self._tmpl_lookup.get_template(base) | |||
r1717 | # translator inject | |||
_kwargs = {'_':_} | ||||
_kwargs.update(kwargs) | ||||
log.debug('rendering tmpl %s with kwargs %s' % (base, _kwargs)) | ||||
return email_template.render(**_kwargs) | ||||