[IMP] mail_mail, mail_message: various improvements to try to improve message creation time

Replaced some read by browse
Moved get_reply_to from mail_mail to mail_message
Hint: specifying email_from, reply_to help to enhance computation time

[REF] mail: cleaned some tests, renamed a file, moved mail_group tests into a dedicated file

bzr revid: tde@openerp.com-20130827133058-ko0g0ib0f0jihmdk
This commit is contained in:
Thibault Delavallée 2013-08-27 15:30:58 +02:00
parent 3a67d7dc99
commit b35a44f66d
13 changed files with 346 additions and 332 deletions

View File

@ -20,10 +20,10 @@
##############################################################################
import base64
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
class test_message_compose(TestMailBase):
class test_message_compose(TestMail):
def setUp(self):
super(test_message_compose, self).setUp()

View File

@ -82,74 +82,11 @@ class mail_mail(osv.Model):
context = dict(context, default_type=None)
return super(mail_mail, self).default_get(cr, uid, fields, context=context)
def _get_reply_to(self, cr, uid, values, context=None):
""" Return a specific reply_to: alias of the document through message_get_reply_to
or take the email_from
"""
# if value specified: directly return it
if values.get('reply_to'):
return values.get('reply_to')
format_name = True # whether to use a 'Followers of Pigs <pigs@openerp.com' format
email_reply_to = None
ir_config_parameter = self.pool.get("ir.config_parameter")
catchall_domain = ir_config_parameter.get_param(cr, uid, "mail.catchall.domain", context=context)
# model, res_id, email_from: comes from values OR related message
model, res_id, email_from = values.get('model'), values.get('res_id'), values.get('email_from')
if values.get('mail_message_id'):
message = self.pool.get('mail.message').browse(cr, uid, values.get('mail_message_id'), context=context)
if message.reply_to:
email_reply_to = message.reply_to
format_name = False
if not model:
model = message.model
if not res_id:
res_id = message.res_id
if not email_from:
email_from = message.email_from
# if model and res_id: try to use ``message_get_reply_to`` that returns the document alias
if not email_reply_to and model and res_id and hasattr(self.pool[model], 'message_get_reply_to'):
email_reply_to = self.pool[model].message_get_reply_to(cr, uid, [res_id], context=context)[0]
# no alias reply_to -> catchall alias
if not email_reply_to:
catchall_alias = ir_config_parameter.get_param(cr, uid, "mail.catchall.alias", context=context)
if catchall_domain and catchall_alias:
email_reply_to = '%s@%s' % (catchall_alias, catchall_domain)
# still no reply_to -> reply_to will be the email_from
if not email_reply_to and email_from:
email_reply_to = email_from
# format 'Document name <email_address>'
if email_reply_to and model and res_id and format_name:
emails = tools.email_split(email_reply_to)
if emails:
email_reply_to = emails[0]
document_name = self.pool[model].name_get(cr, SUPERUSER_ID, [res_id], context=context)[0]
if document_name:
# sanitize document name
sanitized_doc_name = re.sub(r'[^\w+.]+', '-', document_name[1])
# generate reply to
email_reply_to = _('"Followers of %s" <%s>') % (sanitized_doc_name, email_reply_to)
return email_reply_to
def create(self, cr, uid, values, context=None):
# notification field: if not set, set if mail comes from an existing mail.message
if 'notification' not in values and values.get('mail_message_id'):
values['notification'] = True
mail_id = super(mail_mail, self).create(cr, uid, values, context=context)
# reply_to: if not set, set with default values that require creation values
# but delegate after creation because of mail_message.message_id automatic
# creation using existence of reply_to
if not values.get('reply_to'):
reply_to = self._get_reply_to(cr, uid, values, context=context)
if reply_to:
self.write(cr, uid, [mail_id], {'reply_to': reply_to}, context=context)
return mail_id
return super(mail_mail, self).create(cr, uid, values, context=context)
def unlink(self, cr, uid, ids, context=None):
# cascade-delete the parent message for all mails that are not created for a notification
@ -226,11 +163,6 @@ class mail_mail(osv.Model):
# mail_mail formatting, tools and send mechanism
#------------------------------------------------------
# TODO in 8.0(+): maybe factorize this to enable in modules link generation
# independently of mail_mail model
# TODO in 8.0(+): factorize doc name sanitized and 'Followers of ...' formatting
# because it begins to appear everywhere
def _get_partner_access_link(self, cr, uid, mail, partner=None, context=None):
""" Generate URLs for links in mails:
- partner is an user and has read access to the document: direct link to document with model, res_id

View File

@ -20,6 +20,8 @@
##############################################################################
import logging
import re
from openerp import tools
from email.header import decode_header
@ -87,6 +89,7 @@ class mail_message(osv.Model):
def _get_record_name(self, cr, uid, ids, name, arg, context=None):
""" Return the related document name, using name_get. It is done using
SUPERUSER_ID, to be sure to have the record name correctly stored. """
# return dict.fromkeys(ids, False)
# TDE note: regroup by model/ids, to have less queries to perform
result = dict.fromkeys(ids, False)
for message in self.read(cr, uid, ids, ['model', 'res_id'], context=context):
@ -206,11 +209,11 @@ class mail_message(osv.Model):
raise osv.except_osv(_('Invalid Action!'), _("Unable to send email, please configure the sender's email address or alias."))
def _get_default_author(self, cr, uid, context=None):
return self.pool.get('res.users').read(cr, uid, uid, ['partner_id'], context=context)['partner_id'][0]
return self.pool.get('res.users').browse(cr, SUPERUSER_ID, uid, context=context).partner_id.id
_defaults = {
'type': 'email',
'date': lambda *a: fields.datetime.now(),
'date': fields.datetime.now(),
'author_id': lambda self, cr, uid, ctx=None: self._get_default_author(cr, uid, ctx),
'body': '',
'email_from': lambda self, cr, uid, ctx=None: self._get_default_from(cr, uid, ctx),
@ -645,7 +648,8 @@ class mail_message(osv.Model):
elif not ids:
return ids
pid = self.pool.get('res.users').read(cr, uid, uid, ['partner_id'])['partner_id'][0]
# pid = self.pool.get('res.users').read(cr, uid, uid, ['partner_id'])['partner_id'][0]
pid = self.pool['res.users'].browse(cr, SUPERUSER_ID, uid, context=context).partner_id.id
author_ids, partner_ids, allowed_ids = set([]), set([]), set([])
model_ids = {}
@ -705,7 +709,7 @@ class mail_message(osv.Model):
ids = [ids]
not_obj = self.pool.get('mail.notification')
fol_obj = self.pool.get('mail.followers')
partner_id = self.pool.get('res.users').read(cr, uid, uid, ['partner_id'], context=None)['partner_id'][0]
partner_id = self.pool['res.users'].browse(cr, SUPERUSER_ID, uid, context=None).partner_id.id
# Read mail_message.ids to have their values
message_values = dict.fromkeys(ids)
@ -774,17 +778,66 @@ class mail_message(osv.Model):
_('The requested operation cannot be completed due to security restrictions. Please contact your system administrator.\n\n(Document type: %s, Operation: %s)') % \
(self._description, operation))
def _get_reply_to(self, cr, uid, values, context=None):
""" Return a specific reply_to: alias of the document through message_get_reply_to
or take the email_from
"""
email_reply_to = None
ir_config_parameter = self.pool.get("ir.config_parameter")
catchall_domain = ir_config_parameter.get_param(cr, uid, "mail.catchall.domain", context=context)
# model, res_id, email_from: comes from values OR related message
model, res_id, email_from = values.get('model'), values.get('res_id'), values.get('email_from')
# if model and res_id: try to use ``message_get_reply_to`` that returns the document alias
if not email_reply_to and model and res_id and catchall_domain and hasattr(self.pool[model], 'message_get_reply_to'):
email_reply_to = self.pool[model].message_get_reply_to(cr, uid, [res_id], context=context)[0]
# no alias reply_to -> catchall alias
if not email_reply_to and catchall_domain:
catchall_alias = ir_config_parameter.get_param(cr, uid, "mail.catchall.alias", context=context)
if catchall_alias:
email_reply_to = '%s@%s' % (catchall_alias, catchall_domain)
# still no reply_to -> reply_to will be the email_from
if not email_reply_to and email_from:
email_reply_to = email_from
# format 'Document name <email_address>'
if email_reply_to and model and res_id:
emails = tools.email_split(email_reply_to)
if emails:
email_reply_to = emails[0]
document_name = self.pool[model].name_get(cr, SUPERUSER_ID, [res_id], context=context)[0]
if document_name:
# sanitize document name
sanitized_doc_name = re.sub(r'[^\w+.]+', '-', document_name[1])
# generate reply to
email_reply_to = _('"Followers of %s" <%s>') % (sanitized_doc_name, email_reply_to)
return email_reply_to
def _get_message_id(self, cr, uid, values, context=None):
message_id = None
if not values.get('message_id') and values.get('reply_to'):
message_id = tools.generate_tracking_message_id('reply_to')
elif not values.get('message_id') and values.get('res_id') and values.get('model'):
message_id = tools.generate_tracking_message_id('%(res_id)s-%(model)s' % values)
elif not values.get('message_id'):
message_id = tools.generate_tracking_message_id('private')
return message_id
def create(self, cr, uid, values, context=None):
if context is None:
context = {}
default_starred = context.pop('default_starred', False)
# generate message_id, to redirect answers to the right discussion thread
if not values.get('message_id') and values.get('reply_to'):
values['message_id'] = tools.generate_tracking_message_id('reply_to')
elif not values.get('message_id') and values.get('res_id') and values.get('model'):
values['message_id'] = tools.generate_tracking_message_id('%(res_id)s-%(model)s' % values)
elif not values.get('message_id'):
values['message_id'] = tools.generate_tracking_message_id('private')
if not values.get('email_from'): # needed to compute reply_to
values['email_from'] = self._get_default_from(cr, uid, context=context)
if not values.get('message_id'):
values['message_id'] = self._get_message_id(cr, uid, values, context=context)
if not values.get('reply_to'):
values['reply_to'] = self._get_reply_to(cr, uid, values, context=context)
newid = super(mail_message, self).create(cr, uid, values, context)
self._notify(cr, uid, newid, context=context,
force_send=context.get('mail_notify_force_send', True),
@ -914,26 +967,28 @@ class mail_message(osv.Model):
if message.subtype_id and message.model and message.res_id:
fol_obj = self.pool.get("mail.followers")
# browse as SUPERUSER because rules could restrict the search results
fol_ids = fol_obj.search(cr, SUPERUSER_ID, [
('res_model', '=', message.model),
('res_id', '=', message.res_id),
('subtype_ids', 'in', message.subtype_id.id)
fol_ids = fol_obj.search(
cr, SUPERUSER_ID, [
('res_model', '=', message.model),
('res_id', '=', message.res_id),
('subtype_ids', 'in', message.subtype_id.id)
], context=context)
partners_to_notify |= set(fo.partner_id for fo in fol_obj.browse(cr, SUPERUSER_ID, fol_ids, context=context))
partners_to_notify |= set(fo.partner_id.id for fo in fol_obj.browse(cr, SUPERUSER_ID, fol_ids, context=context))
# remove me from notified partners, unless the message is written on my own wall
if message.subtype_id and message.author_id and message.model == "res.partner" and message.res_id == message.author_id.id:
partners_to_notify |= set([message.author_id])
partners_to_notify |= set([message.author_id.id])
elif message.author_id:
partners_to_notify -= set([message.author_id])
partners_to_notify -= set([message.author_id.id])
# all partner_ids of the mail.message have to be notified regardless of the above (even the author if explicitly added!)
if message.partner_ids:
partners_to_notify |= set(message.partner_ids)
partners_to_notify |= set([p.id for p in message.partner_ids])
# notify
if partners_to_notify:
notification_obj._notify(cr, uid, newid, partners_to_notify=[p.id for p in partners_to_notify], context=context,
force_send=force_send, user_signature=user_signature)
notification_obj._notify(
cr, uid, newid, partners_to_notify=list(partners_to_notify), context=context,
force_send=force_send, user_signature=user_signature
)
message.refresh()
# An error appear when a user receive a notification without notifying

View File

@ -19,9 +19,10 @@
#
##############################################################################
from . import test_mail_message, test_mail_features, test_mail_gateway, test_message_read, test_invite
from . import test_mail_group, test_mail_message, test_mail_features, test_mail_gateway, test_message_read, test_invite
checks = [
test_mail_group,
test_mail_message,
test_mail_features,
test_mail_gateway,

View File

@ -22,7 +22,7 @@
from openerp.tests import common
class TestMailBase(common.TransactionCase):
class TestMail(common.TransactionCase):
def _mock_smtp_gateway(self, *args, **kwargs):
return args[2]['Message-Id']
@ -39,7 +39,7 @@ class TestMailBase(common.TransactionCase):
return self._build_email(*args, **kwargs)
def setUp(self):
super(TestMailBase, self).setUp()
super(TestMail, self).setUp()
cr, uid = self.cr, self.uid
# Install mock SMTP gateway
@ -68,12 +68,46 @@ class TestMailBase(common.TransactionCase):
group_employee_ref = self.registry('ir.model.data').get_object_reference(cr, uid, 'base', 'group_user')
self.group_employee_id = group_employee_ref and group_employee_ref[1] or False
# Partner Data
# User Data: employee, noone
self.user_employee_id = self.res_users.create(cr, uid, {
'name': 'Ernest Employee',
'login': 'ernest',
'alias_name': 'ernest',
'email': 'e.e@example.com',
'signature': '--\nErnest',
'notification_email_send': 'comment',
'groups_id': [(6, 0, [self.group_employee_id])]
}, {'no_reset_password': True})
self.user_noone_id = self.res_users.create(cr, uid, {
'name': 'Noemie NoOne',
'login': 'noemie',
'alias_name': 'noemie',
'email': 'n.n@example.com',
'signature': '--\nNoemie',
'notification_email_send': 'comment',
'groups_id': [(6, 0, [])]
}, {'no_reset_password': True})
# Test users to use through the various tests
self.res_users.write(cr, uid, uid, {'name': 'Administrator'})
self.user_raoul_id = self.res_users.create(cr, uid,
{'name': 'Raoul Grosbedon', 'signature': 'SignRaoul', 'email': 'raoul@raoul.fr', 'login': 'raoul', 'alias_name': 'raoul', 'groups_id': [(6, 0, [self.group_employee_id])]})
self.user_bert_id = self.res_users.create(cr, uid,
{'name': 'Bert Tartignole', 'signature': 'SignBert', 'email': 'bert@bert.fr', 'login': 'bert', 'alias_name': 'bert', 'groups_id': [(6, 0, [])]})
self.user_raoul_id = self.res_users.create(cr, uid, {
'name': 'Raoul Grosbedon',
'signature': 'SignRaoul',
'email': 'raoul@raoul.fr',
'login': 'raoul',
'alias_name': 'raoul',
'groups_id': [(6, 0, [self.group_employee_id])]
})
self.user_bert_id = self.res_users.create(cr, uid, {
'name': 'Bert Tartignole',
'signature': 'SignBert',
'email': 'bert@bert.fr',
'login': 'bert',
'alias_name': 'bert',
'groups_id': [(6, 0, [])]
})
self.user_raoul = self.res_users.browse(cr, uid, self.user_raoul_id)
self.user_bert = self.res_users.browse(cr, uid, self.user_bert_id)
self.user_admin = self.res_users.browse(cr, uid, uid)
@ -82,13 +116,19 @@ class TestMailBase(common.TransactionCase):
self.partner_bert_id = self.user_bert.partner_id.id
# Test 'pigs' group to use through the various tests
self.group_pigs_id = self.mail_group.create(cr, uid,
self.group_pigs_id = self.mail_group.create(
cr, uid,
{'name': 'Pigs', 'description': 'Fans of Pigs, unite !', 'alias_name': 'group+pigs'},
{'mail_create_nolog': True})
{'mail_create_nolog': True}
)
self.group_pigs = self.mail_group.browse(cr, uid, self.group_pigs_id)
# Test mail.group: public to provide access to everyone
self.group_jobs_id = self.mail_group.create(cr, uid, {'name': 'Jobs', 'public': 'public'})
# Test mail.group: private to restrict access
self.group_priv_id = self.mail_group.create(cr, uid, {'name': 'Private', 'public': 'private'})
def tearDown(self):
# Remove mocks
self.registry('ir.mail_server').build_email = self._build_email
self.registry('ir.mail_server').send_email = self._send_email
super(TestMailBase, self).tearDown()
super(TestMail, self).tearDown()

View File

@ -19,10 +19,10 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
class test_invite(TestMailBase):
class test_invite(TestMail):
def test_00_basic_invite(self):
cr, uid = self.cr, self.uid

View File

@ -21,12 +21,12 @@
from openerp.addons.mail.mail_mail import mail_mail
from openerp.addons.mail.mail_thread import mail_thread
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
from openerp.tools import mute_logger, email_split
from openerp.tools.mail import html_sanitize
class test_mail(TestMailBase):
class test_mail(TestMail):
def test_000_alias_setup(self):
""" Test basic mail.alias setup works, before trying to use them for routing """

View File

@ -19,7 +19,7 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
from openerp.tools import mute_logger
MAIL_TEMPLATE = """Return-Path: <whatever-2a840@postmaster.twitter.com>
@ -143,173 +143,9 @@ dGVzdAo=
--089e01536c4ed4d17204e49b8e96--"""
class TestMailgateway(TestMailBase):
class TestMailgateway(TestMail):
def test_00_partner_find_from_email(self):
""" Tests designed for partner fetch based on emails. """
cr, uid, user_raoul, group_pigs = self.cr, self.uid, self.user_raoul, self.group_pigs
# --------------------------------------------------
# Data creation
# --------------------------------------------------
# 1 - Partner ARaoul
p_a_id = self.res_partner.create(cr, uid, {'name': 'ARaoul', 'email': 'test@test.fr'})
# --------------------------------------------------
# CASE1: without object
# --------------------------------------------------
# Do: find partner with email -> first partner should be found
partner_info = self.mail_thread.message_partner_info_from_emails(cr, uid, None, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['full_name'], 'Maybe Raoul <test@test.fr>',
'mail_thread: message_partner_info_from_emails did not handle email')
self.assertEqual(partner_info['partner_id'], p_a_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
# Data: add some data about partners
# 2 - User BRaoul
p_b_id = self.res_partner.create(cr, uid, {'name': 'BRaoul', 'email': 'test@test.fr', 'user_ids': [(4, user_raoul.id)]})
# Do: find partner with email -> first user should be found
partner_info = self.mail_thread.message_partner_info_from_emails(cr, uid, None, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['partner_id'], p_b_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
# --------------------------------------------------
# CASE1: with object
# --------------------------------------------------
# Do: find partner in group where there is a follower with the email -> should be taken
self.mail_group.message_subscribe(cr, uid, [group_pigs.id], [p_b_id])
partner_info = self.mail_group.message_partner_info_from_emails(cr, uid, group_pigs.id, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['partner_id'], p_b_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
def test_05_mail_message_mail_mail(self):
""" Tests designed for testing email values based on mail.message, aliases, ... """
cr, uid, user_raoul_id = self.cr, self.uid, self.user_raoul_id
# Data: update + generic variables
reply_to1 = '_reply_to1@example.com'
reply_to2 = '_reply_to2@example.com'
email_from1 = 'from@example.com'
alias_domain = 'schlouby.fr'
raoul_from = 'Raoul Grosbedon <raoul@raoul.fr>'
raoul_from_alias = 'Raoul Grosbedon <raoul@schlouby.fr>'
raoul_reply = '"Followers of Pigs" <raoul@raoul.fr>'
raoul_reply_alias = '"Followers of Pigs" <group+pigs@schlouby.fr>'
# Data: remove alias_domain to see emails with alias
param_ids = self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.domain')])
self.registry('ir.config_parameter').unlink(cr, uid, param_ids)
# Do: free message; specified values > default values
msg_id = self.mail_message.create(cr, user_raoul_id, {'reply_to': reply_to1, 'email_from': email_from1})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: message content
self.assertIn('reply_to', msg.message_id,
'mail_message: message_id should be specific to a mail_message with a given reply_to')
self.assertEqual(msg.reply_to, reply_to1,
'mail_message: incorrect reply_to: should come from values')
self.assertEqual(msg.email_from, email_from1,
'mail_message: incorrect email_from: should come from values')
# Do: create a mail_mail with the previous mail_message
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, reply_to1,
'mail_mail: incorrect reply_to: should come from mail.message')
self.assertEqual(mail.email_from, email_from1,
'mail_mail: incorrect email_from: should come from mail.message')
# Do: create a mail_mail with the previous mail_message + specified reply_to
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel', 'reply_to': reply_to2})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, reply_to2,
'mail_mail: incorrect reply_to: should come from values')
self.assertEqual(mail.email_from, email_from1,
'mail_mail: incorrect email_from: should come from mail.message')
# Do: mail_message attached to a document
msg_id = self.mail_message.create(cr, user_raoul_id, {'model': 'mail.group', 'res_id': self.group_pigs_id})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: message content
self.assertIn('mail.group', msg.message_id,
'mail_message: message_id should contain model')
self.assertIn('%s' % self.group_pigs_id, msg.message_id,
'mail_message: message_id should contain res_id')
self.assertFalse(msg.reply_to,
'mail_message: incorrect reply_to: should not be generated if not specified')
self.assertEqual(msg.email_from, raoul_from,
'mail_message: incorrect email_from: should be Raoul')
# Do: create a mail_mail based on the previous mail_message
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, raoul_reply,
'mail_mail: incorrect reply_to: should be Raoul')
# Data: set catchall domain
self.registry('ir.config_parameter').set_param(cr, uid, 'mail.catchall.domain', alias_domain)
self.registry('ir.config_parameter').unlink(cr, uid, self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.alias')]))
# Update message
self.mail_message.write(cr, user_raoul_id, [msg_id], {'email_from': False, 'reply_to': False})
msg.refresh()
# Do: create a mail_mail based on the previous mail_message
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, raoul_reply_alias,
'mail_mail: incorrect reply_to: should be Pigs alias')
# Update message: test alias on email_from
msg_id = self.mail_message.create(cr, user_raoul_id, {})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Do: create a mail_mail based on the previous mail_message
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, raoul_from_alias,
'mail_mail: incorrect reply_to: should be message email_from using Raoul alias')
# Update message
self.mail_message.write(cr, user_raoul_id, [msg_id], {'res_id': False, 'email_from': 'someone@schlouby.fr', 'reply_to': False})
msg.refresh()
# Do: create a mail_mail based on the previous mail_message
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, msg.email_from,
'mail_mail: incorrect reply_to: should be message email_from')
# Data: set catchall alias
self.registry('ir.config_parameter').set_param(self.cr, self.uid, 'mail.catchall.alias', 'gateway')
# Update message
self.mail_message.write(cr, uid, [msg_id], {'email_from': False, 'reply_to': False})
msg.refresh()
# Do: create a mail_mail based on the previous mail_message
mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
mail = self.mail_mail.browse(cr, uid, mail_id)
# Test: mail_mail Content-Type
self.assertEqual(mail.reply_to, 'gateway@schlouby.fr',
'mail_mail: reply_to should equal the catchall email alias')
# Do: create a mail_mail
mail_id = self.mail_mail.create(cr, uid, {'state': 'cancel'})
mail = self.mail_mail.browse(cr, uid, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, 'gateway@schlouby.fr',
'mail_mail: reply_to should equal the catchall email alias')
# Do: create a mail_mail
mail_id = self.mail_mail.create(cr, uid, {'state': 'cancel', 'reply_to': 'someone@example.com'})
mail = self.mail_mail.browse(cr, uid, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, 'someone@example.com',
'mail_mail: reply_to should equal the rpely_to given to create')
def test_09_message_parse(self):
def test_00_message_parse(self):
""" Testing incoming emails parsing """
cr, uid = self.cr, self.uid
@ -738,9 +574,7 @@ class TestMailgateway(TestMailBase):
'message_post: private discussion: incorrect notified recipients')
self.assertEqual(msg.model, False,
'message_post: private discussion: context key "thread_model" not correctly ignored when having no res_id')
# Test: message reply_to and message-id
self.assertFalse(msg.reply_to,
'message_post: private discussion: initial message should not have any reply_to specified')
# Test: message-id
self.assertIn('openerp-private', msg.message_id,
'message_post: private discussion: message-id should contain the private keyword')

View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# OpenERP, Open Source Business Applications
# Copyright (c) 2012-TODAY OpenERP S.A. <http://openerp.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from openerp.addons.mail.tests.common import TestMail
from openerp.osv.orm import except_orm
from openerp.tools import mute_logger
class TestMailGroup(TestMail):
@mute_logger('openerp.addons.base.ir.ir_model', 'openerp.osv.orm')
def test_00_mail_group_access_rights(self):
""" Testing mail_group access rights and basic mail_thread features """
cr, uid, user_noone_id, user_employee_id = self.cr, self.uid, self.user_noone_id, self.user_employee_id
# Do: Bert reads Jobs -> ok, public
self.mail_group.read(cr, user_noone_id, [self.group_jobs_id])
# Do: Bert read Pigs -> ko, restricted to employees
with self.assertRaises(except_orm):
self.mail_group.read(cr, user_noone_id, [self.group_pigs_id])
# Do: Raoul read Pigs -> ok, belong to employees
self.mail_group.read(cr, user_employee_id, [self.group_pigs_id])
# Do: Bert creates a group -> ko, no access rights
with self.assertRaises(except_orm):
self.mail_group.create(cr, user_noone_id, {'name': 'Test'})
# Do: Raoul creates a restricted group -> ok
new_group_id = self.mail_group.create(cr, user_employee_id, {'name': 'Test'})
# Do: Bert added in followers, read -> ok, in followers
self.mail_group.message_subscribe_users(cr, uid, [new_group_id], [user_noone_id])
self.mail_group.read(cr, user_noone_id, [new_group_id])
# Do: Raoul reads Priv -> ko, private
with self.assertRaises(except_orm):
self.mail_group.read(cr, user_employee_id, [self.group_priv_id])
# Do: Raoul added in follower, read -> ok, in followers
self.mail_group.message_subscribe_users(cr, uid, [self.group_priv_id], [user_employee_id])
self.mail_group.read(cr, user_employee_id, [self.group_priv_id])
# Do: Raoul write on Jobs -> ok
self.mail_group.write(cr, user_employee_id, [self.group_priv_id], {'name': 'modified'})
# Do: Bert cannot write on Private -> ko (read but no write)
with self.assertRaises(except_orm):
self.mail_group.write(cr, user_noone_id, [self.group_priv_id], {'name': 're-modified'})
# Test: Bert cannot unlink the group
with self.assertRaises(except_orm):
self.mail_group.unlink(cr, user_noone_id, [self.group_priv_id])
# Do: Raoul unlinks the group, there are no followers and messages left
self.mail_group.unlink(cr, user_employee_id, [self.group_priv_id])
fol_ids = self.mail_followers.search(cr, uid, [('res_model', '=', 'mail.group'), ('res_id', '=', self.group_priv_id)])
self.assertFalse(fol_ids, 'unlinked document should not have any followers left')
msg_ids = self.mail_message.search(cr, uid, [('model', '=', 'mail.group'), ('res_id', '=', self.group_priv_id)])
self.assertFalse(msg_ids, 'unlinked document should not have any followers left')

View File

@ -19,66 +19,147 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
from openerp.osv.orm import except_orm
from openerp.tools import mute_logger
class test_mail_access_rights(TestMailBase):
class TestMailMail(TestMail):
def setUp(self):
super(test_mail_access_rights, self).setUp()
cr, uid = self.cr, self.uid
def test_00_partner_find_from_email(self):
""" Tests designed for partner fetch based on emails. """
cr, uid, user_raoul, group_pigs = self.cr, self.uid, self.user_raoul, self.group_pigs
# Test mail.group: public to provide access to everyone
self.group_jobs_id = self.mail_group.create(cr, uid, {'name': 'Jobs', 'public': 'public'})
# Test mail.group: private to restrict access
self.group_priv_id = self.mail_group.create(cr, uid, {'name': 'Private', 'public': 'private'})
# --------------------------------------------------
# Data creation
# --------------------------------------------------
# 1 - Partner ARaoul
p_a_id = self.res_partner.create(cr, uid, {'name': 'ARaoul', 'email': 'test@test.fr'})
@mute_logger('openerp.addons.base.ir.ir_model', 'openerp.osv.orm')
def test_00_mail_group_access_rights(self):
""" Testing mail_group access rights and basic mail_thread features """
cr, uid, user_bert_id, user_raoul_id = self.cr, self.uid, self.user_bert_id, self.user_raoul_id
# --------------------------------------------------
# CASE1: without object
# --------------------------------------------------
# Do: Bert reads Jobs -> ok, public
self.mail_group.read(cr, user_bert_id, [self.group_jobs_id])
# Do: Bert read Pigs -> ko, restricted to employees
self.assertRaises(except_orm, self.mail_group.read,
cr, user_bert_id, [self.group_pigs_id])
# Do: Raoul read Pigs -> ok, belong to employees
self.mail_group.read(cr, user_raoul_id, [self.group_pigs_id])
# Do: find partner with email -> first partner should be found
partner_info = self.mail_thread.message_partner_info_from_emails(cr, uid, None, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['full_name'], 'Maybe Raoul <test@test.fr>',
'mail_thread: message_partner_info_from_emails did not handle email')
self.assertEqual(partner_info['partner_id'], p_a_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
# Do: Bert creates a group -> ko, no access rights
self.assertRaises(except_orm, self.mail_group.create,
cr, user_bert_id, {'name': 'Test'})
# Do: Raoul creates a restricted group -> ok
new_group_id = self.mail_group.create(cr, user_raoul_id, {'name': 'Test'})
# Do: Bert added in followers, read -> ok, in followers
self.mail_group.message_subscribe_users(cr, uid, [new_group_id], [user_bert_id])
self.mail_group.read(cr, user_bert_id, [new_group_id])
# Data: add some data about partners
# 2 - User BRaoul
p_b_id = self.res_partner.create(cr, uid, {'name': 'BRaoul', 'email': 'test@test.fr', 'user_ids': [(4, user_raoul.id)]})
# Do: Raoul reads Priv -> ko, private
self.assertRaises(except_orm, self.mail_group.read,
cr, user_raoul_id, [self.group_priv_id])
# Do: Raoul added in follower, read -> ok, in followers
self.mail_group.message_subscribe_users(cr, uid, [self.group_priv_id], [user_raoul_id])
self.mail_group.read(cr, user_raoul_id, [self.group_priv_id])
# Do: find partner with email -> first user should be found
partner_info = self.mail_thread.message_partner_info_from_emails(cr, uid, None, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['partner_id'], p_b_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
# Do: Raoul write on Jobs -> ok
self.mail_group.write(cr, user_raoul_id, [self.group_priv_id], {'name': 'modified'})
# Do: Bert cannot write on Private -> ko (read but no write)
self.assertRaises(except_orm, self.mail_group.write,
cr, user_bert_id, [self.group_priv_id], {'name': 're-modified'})
# Test: Bert cannot unlink the group
self.assertRaises(except_orm,
self.mail_group.unlink,
cr, user_bert_id, [self.group_priv_id])
# Do: Raoul unlinks the group, there are no followers and messages left
self.mail_group.unlink(cr, user_raoul_id, [self.group_priv_id])
fol_ids = self.mail_followers.search(cr, uid, [('res_model', '=', 'mail.group'), ('res_id', '=', self.group_priv_id)])
self.assertFalse(fol_ids, 'unlinked document should not have any followers left')
msg_ids = self.mail_message.search(cr, uid, [('model', '=', 'mail.group'), ('res_id', '=', self.group_priv_id)])
self.assertFalse(msg_ids, 'unlinked document should not have any followers left')
# --------------------------------------------------
# CASE1: with object
# --------------------------------------------------
# Do: find partner in group where there is a follower with the email -> should be taken
self.mail_group.message_subscribe(cr, uid, [group_pigs.id], [p_b_id])
partner_info = self.mail_group.message_partner_info_from_emails(cr, uid, group_pigs.id, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['partner_id'], p_b_id,
'mail_thread: message_partner_info_from_emails wrong partner found')
class TestMailMessage(TestMail):
def test_00_mail_message_values(self):
""" Tests designed for testing email values based on mail.message, aliases, ... """
cr, uid, user_raoul_id = self.cr, self.uid, self.user_raoul_id
# Data: update + generic variables
reply_to1 = '_reply_to1@example.com'
reply_to2 = '_reply_to2@example.com'
email_from1 = 'from@example.com'
alias_domain = 'schlouby.fr'
raoul_from = 'Raoul Grosbedon <raoul@raoul.fr>'
raoul_from_alias = 'Raoul Grosbedon <raoul@schlouby.fr>'
raoul_reply = '"Followers of Pigs" <raoul@raoul.fr>'
raoul_reply_alias = '"Followers of Pigs" <group+pigs@schlouby.fr>'
# --------------------------------------------------
# Case1: without alias_domain
# --------------------------------------------------
param_ids = self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.domain')])
self.registry('ir.config_parameter').unlink(cr, uid, param_ids)
# Do: free message; specified values > default values
msg_id = self.mail_message.create(cr, user_raoul_id, {'reply_to': reply_to1, 'email_from': email_from1})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: message content
self.assertIn('reply_to', msg.message_id,
'mail_message: message_id should be specific to a mail_message with a given reply_to')
self.assertEqual(msg.reply_to, reply_to1,
'mail_message: incorrect reply_to: should come from values')
self.assertEqual(msg.email_from, email_from1,
'mail_message: incorrect email_from: should come from values')
# Do: create a mail_mail with the previous mail_message + specified reply_to
mail_id = self.mail_mail.create(cr, user_raoul_id, {'mail_message_id': msg_id, 'state': 'cancel', 'reply_to': reply_to2})
mail = self.mail_mail.browse(cr, user_raoul_id, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, reply_to2,
'mail_mail: incorrect reply_to: should come from values')
self.assertEqual(mail.email_from, email_from1,
'mail_mail: incorrect email_from: should come from mail.message')
# Do: mail_message attached to a document
msg_id = self.mail_message.create(cr, user_raoul_id, {'model': 'mail.group', 'res_id': self.group_pigs_id})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: message content
self.assertIn('mail.group', msg.message_id,
'mail_message: message_id should contain model')
self.assertIn('%s' % self.group_pigs_id, msg.message_id,
'mail_message: message_id should contain res_id')
self.assertEqual(msg.reply_to, raoul_reply,
'mail_message: incorrect reply_to: should be Raoul')
self.assertEqual(msg.email_from, raoul_from,
'mail_message: incorrect email_from: should be Raoul')
# --------------------------------------------------
# Case2: with alias_domain, without catchall alias
# --------------------------------------------------
self.registry('ir.config_parameter').set_param(cr, uid, 'mail.catchall.domain', alias_domain)
self.registry('ir.config_parameter').unlink(cr, uid, self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.alias')]))
# Update message
msg_id = self.mail_message.create(cr, user_raoul_id, {'model': 'mail.group', 'res_id': self.group_pigs_id})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: generated reply_to
self.assertEqual(msg.reply_to, raoul_reply_alias,
'mail_mail: incorrect reply_to: should be Pigs alias')
# Update message: test alias on email_from
msg_id = self.mail_message.create(cr, user_raoul_id, {})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: generated reply_to
self.assertEqual(msg.reply_to, raoul_from_alias,
'mail_mail: incorrect reply_to: should be message email_from using Raoul alias')
# --------------------------------------------------
# Case2: with alias_domain and catchall alias
# --------------------------------------------------
self.registry('ir.config_parameter').set_param(self.cr, self.uid, 'mail.catchall.alias', 'gateway')
# Update message
msg_id = self.mail_message.create(cr, user_raoul_id, {})
msg = self.mail_message.browse(cr, user_raoul_id, msg_id)
# Test: generated reply_to
self.assertEqual(msg.reply_to, 'gateway@schlouby.fr',
'mail_mail: reply_to should equal the catchall email alias')
# Do: create a mail_mail
mail_id = self.mail_mail.create(cr, uid, {'state': 'cancel', 'reply_to': 'someone@example.com'})
mail = self.mail_mail.browse(cr, uid, mail_id)
# Test: mail_mail content
self.assertEqual(mail.reply_to, 'someone@example.com',
'mail_mail: reply_to should equal the rpely_to given to create')
@mute_logger('openerp.addons.base.ir.ir_model', 'openerp.osv.orm')
def test_10_mail_message_search_access_rights(self):

View File

@ -19,10 +19,10 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
class test_mail_access_rights(TestMailBase):
class test_mail_access_rights(TestMail):
def test_00_message_read(self):
""" Tests for message_read and expandables. """

View File

@ -19,12 +19,12 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
from openerp.osv.orm import except_orm
from openerp.tools.misc import mute_logger
class test_portal(TestMailBase):
class test_portal(TestMail):
def setUp(self):
super(test_portal, self).setUp()

View File

@ -19,10 +19,10 @@
#
##############################################################################
from openerp.addons.mail.tests.test_mail_base import TestMailBase
from openerp.addons.mail.tests.common import TestMail
class TestProjectBase(TestMailBase):
class TestProjectBase(TestMail):
def setUp(self):
super(TestProjectBase, self).setUp()