[MERGE] Forward-port of 7.0 bugfixes, up to rev. 5251

revision-id: odo@openerp.com-20140304152927-havybom9x1jgdzae

bzr revid: odo@openerp.com-20140305213115-w55dqs3xra6ejmni
Olivier Dony 2014-03-05 22:31:15 +01:00
commit 07fbbe288c
4 changed files with 164 additions and 73 deletions

View File

@@ -33,7 +33,7 @@ from openerp.osv.orm import Model
from openerp.tools.safe_eval import safe_eval as eval
from openerp.tools import config
from openerp.tools.translate import _
from openerp.osv.orm import except_orm, browse_record
from openerp.osv.orm import except_orm, browse_record, MAGIC_COLUMNS
_logger = logging.getLogger(__name__)
@@ -300,6 +300,8 @@ class ir_model_fields(osv.osv):
def _drop_column(self, cr, uid, ids, context=None):
for field in self.browse(cr, uid, ids, context):
if field.name in MAGIC_COLUMNS:
continue
model = self.pool[field.model]
cr.execute('select relkind from pg_class where relname=%s', (model._table,))
result = cr.fetchone()
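For context, a quick standalone illustration of what the new MAGIC_COLUMNS guard protects. The column names below are the usual automatic ones ('id' plus the log-access columns); the authoritative definition lives in openerp/osv/orm.py, so treat this as a sketch:

MAGIC_COLUMNS = ['id', 'create_uid', 'create_date', 'write_uid', 'write_date']  # illustrative

def may_drop_column(field_name):
    # Mirrors the new check in ir_model_fields._drop_column: automatic
    # columns are never dropped when their ir.model.fields record is deleted.
    return field_name not in MAGIC_COLUMNS

assert may_drop_column('partner_id') is True     # regular column: eligible for DROP
assert may_drop_column('create_date') is False   # magic column: skipped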

View File

@@ -9,6 +9,7 @@ class test_base(common.TransactionCase):
super(test_base,self).setUp()
self.res_partner = self.registry('res.partner')
self.res_users = self.registry('res.users')
self.res_partner_title = self.registry('res.partner.title')
# samples use effective TLDs from the Mozilla public suffix
# list at http://publicsuffix.org
@@ -285,27 +286,99 @@ class test_base(common.TransactionCase):
def test_60_read_group(self):
cr, uid = self.cr, self.uid
for user_data in [
{'name': 'Alice', 'login': 'alice', 'color': 1, 'function': 'Friend'},
{'name': 'Bob', 'login': 'bob', 'color': 2, 'function': 'Friend'},
{'name': 'Eve', 'login': 'eve', 'color': 3, 'function': 'Eavesdropper'},
{'name': 'Nab', 'login': 'nab', 'color': 2, 'function': '5$ Wrench'},
]:
self.res_users.create(cr, uid, user_data)
title_sir = self.res_partner_title.create(cr, uid, {'name': 'Sir', 'domain': 'contact'})
title_lady = self.res_partner_title.create(cr, uid, {'name': 'Lady', 'domain': 'contact'})
test_users = [
{'name': 'Alice', 'login': 'alice', 'color': 1, 'function': 'Friend', 'date': '2015-03-28', 'title': title_lady},
{'name': 'Alice', 'login': 'alice2', 'color': 0, 'function': 'Friend', 'date': '2015-01-28', 'title': title_lady},
{'name': 'Bob', 'login': 'bob', 'color': 2, 'function': 'Friend', 'date': '2015-03-02', 'title': title_sir},
{'name': 'Eve', 'login': 'eve', 'color': 3, 'function': 'Eavesdropper', 'date': '2015-03-20', 'title': title_lady},
{'name': 'Nab', 'login': 'nab', 'color': -3, 'function': '5$ Wrench', 'date': '2014-09-10', 'title': title_sir},
{'name': 'Nab', 'login': 'nab-she', 'color': 6, 'function': '5$ Wrench', 'date': '2014-01-02', 'title': title_lady},
]
ids = [self.res_users.create(cr, uid, u) for u in test_users]
domain = [('id', 'in', ids)]
groups_data = self.res_users.read_group(cr, uid, domain=[('login', 'in', ('alice', 'bob', 'eve'))], fields=['name', 'color', 'function'], groupby='function')
self.assertEqual(len(groups_data), 2, "Incorrect number of results when grouping on a field")
# group on local char field without domain and without active_test (-> empty WHERE clause)
groups_data = self.res_users.read_group(cr, uid, [], fields=['login'], groupby=['login'], orderby='login DESC', context={'active_test': False})
self.assertGreater(len(groups_data), 6, "Incorrect number of results when grouping on a field")
# group on local char field with limit
groups_data = self.res_users.read_group(cr, uid, domain, fields=['login'], groupby=['login'], orderby='login DESC', limit=3, offset=3)
self.assertEqual(len(groups_data), 3, "Incorrect number of results when grouping on a field with limit")
self.assertEqual(['bob', 'alice2', 'alice'], [g['login'] for g in groups_data], 'Result mismatch')
# group on inherited char field, aggregate on int field (second groupby ignored on purpose)
groups_data = self.res_users.read_group(cr, uid, domain, fields=['name', 'color', 'function'], groupby=['function', 'login'])
self.assertEqual(len(groups_data), 3, "Incorrect number of results when grouping on a field")
self.assertEqual(['5$ Wrench', 'Eavesdropper', 'Friend'], [g['function'] for g in groups_data], 'incorrect read_group order')
for group_data in groups_data:
self.assertIn('color', group_data, "Aggregated data for the column 'color' is not present in read_group return values")
self.assertEqual(group_data['color'], 3, "Incorrect sum for aggregated data for the column 'color'")
self.assertIn('color', group_data, "Aggregated data for the column 'color' is not present in read_group return values")
self.assertEqual(group_data['color'], 3, "Incorrect sum for aggregated data for the column 'color'")
groups_data = self.res_users.read_group(cr, uid, domain=[('login', 'in', ('alice', 'bob', 'eve'))], fields=['name', 'color'], groupby='name', orderby='name DESC, color asc')
self.assertEqual(len(groups_data), 3, "Incorrect number of results when grouping on a field")
self.assertEqual([user['name'] for user in groups_data], ['Eve', 'Bob', 'Alice'], 'Incorrect ordering of the list')
# group on inherited char field, reverse order
groups_data = self.res_users.read_group(cr, uid, domain, fields=['name', 'color'], groupby='name', orderby='name DESC')
self.assertEqual(['Nab', 'Eve', 'Bob', 'Alice'], [g['name'] for g in groups_data], 'Incorrect ordering of the list')
# group on int field, default ordering
groups_data = self.res_users.read_group(cr, uid, domain, fields=['color'], groupby='color')
self.assertEqual([-3, 0, 1, 2, 3, 6], [g['color'] for g in groups_data], 'Incorrect ordering of the list')
# multi group, second level is int field, should still be summed in first level grouping
groups_data = self.res_users.read_group(cr, uid, domain, fields=['name', 'color'], groupby=['name', 'color'], orderby='name DESC')
self.assertEqual(['Nab', 'Eve', 'Bob', 'Alice'], [g['name'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([3, 3, 2, 1], [g['color'] for g in groups_data], 'Incorrect ordering of the list')
# group on inherited char field, multiple orders with directions
groups_data = self.res_users.read_group(cr, uid, domain, fields=['name', 'color'], groupby='name', orderby='color DESC, name')
self.assertEqual(len(groups_data), 4, "Incorrect number of results when grouping on a field")
self.assertEqual(['Eve', 'Nab', 'Bob', 'Alice'], [g['name'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([1, 2, 1, 2], [g['name_count'] for g in groups_data], 'Incorrect number of results')
# group on inherited date column (res_partner.date) -> Year-Month, default ordering
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'date'], groupby=['date'])
self.assertEqual(len(groups_data), 4, "Incorrect number of results when grouping on a field")
self.assertEqual(['January 2014', 'September 2014', 'January 2015', 'March 2015'], [g['date'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([1, 1, 1, 3], [g['date_count'] for g in groups_data], 'Incorrect number of results')
# group on inherited date column (res_partner.date) -> Year-Month, custom order
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'date'], groupby=['date'], orderby='date DESC')
self.assertEqual(len(groups_data), 4, "Incorrect number of results when grouping on a field")
self.assertEqual(['March 2015', 'January 2015', 'September 2014', 'January 2014'], [g['date'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([3, 1, 1, 1], [g['date_count'] for g in groups_data], 'Incorrect number of results')
# group on inherited many2one (res_partner.title), default order
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'title'], groupby=['title'])
self.assertEqual(len(groups_data), 2, "Incorrect number of results when grouping on a field")
# m2o is returned as a (id, label) pair
self.assertEqual([(title_lady, 'Lady'), (title_sir, 'Sir')], [g['title'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([4, 2], [g['title_count'] for g in groups_data], 'Incorrect number of results')
self.assertEqual([10, -1], [g['color'] for g in groups_data], 'Incorrect aggregation of int column')
# group on inherited many2one (res_partner.title), reversed natural order
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'title'], groupby=['title'], orderby="title desc")
self.assertEqual(len(groups_data), 2, "Incorrect number of results when grouping on a field")
# m2o is returned as a (id, label) pair
self.assertEqual([(title_sir, 'Sir'), (title_lady, 'Lady')], [g['title'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([2, 4], [g['title_count'] for g in groups_data], 'Incorrect number of results')
self.assertEqual([-1, 10], [g['color'] for g in groups_data], 'Incorrect aggregation of int column')
# group on inherited many2one (res_partner.title), multiple orders with m2o in second position
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'title'], groupby=['title'], orderby="color desc, title desc")
self.assertEqual(len(groups_data), 2, "Incorrect number of results when grouping on a field")
# m2o is returned as a (id, label) pair
self.assertEqual([(title_lady, 'Lady'), (title_sir, 'Sir')], [g['title'] for g in groups_data], 'Incorrect ordering of the result')
self.assertEqual([4, 2], [g['title_count'] for g in groups_data], 'Incorrect number of results')
self.assertEqual([10, -1], [g['color'] for g in groups_data], 'Incorrect aggregation of int column')
# group on inherited many2one (res_partner.title), ordered by other inherited field (color)
groups_data = self.res_users.read_group(cr, uid, domain, fields=['function', 'color', 'title'], groupby=['title'], orderby='color')
self.assertEqual(len(groups_data), 2, "Incorrect number of results when grouping on a field")
# m2o is returned as a (id, label) pair
self.assertEqual([(title_sir, 'Sir'), (title_lady, 'Lady')], [g['title'] for g in groups_data], 'Incorrect ordering of the list')
self.assertEqual([2, 4], [g['title_count'] for g in groups_data], 'Incorrect number of results')
self.assertEqual([-1, 10], [g['color'] for g in groups_data], 'Incorrect aggregation of int column')
groups_data = self.res_users.read_group(cr, uid, domain=[('login', 'in', ('alice', 'bob', 'eve', 'nab'))], fields=['function', 'color'], groupby='function', orderby='color ASC')
self.assertEqual(len(groups_data), 3, "Incorrect number of results when grouping on a field")
self.assertEqual(groups_data, sorted(groups_data, key=lambda x: x['color']), 'Incorrect ordering of the list')
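For readers unfamiliar with the API exercised above, here is roughly what one of these calls returns for the fixtures created at the top of the test: read_group(cr, uid, domain, fields=['name', 'color'], groupby='name', orderby='name DESC') yields one dict per group, carrying the grouped value, a <groupby>_count key and the summed integer aggregates (bookkeeping keys such as '__domain' are omitted from this sketch):

expected_groups = [
    {'name': 'Nab',   'name_count': 2, 'color': 3},   # -3 + 6
    {'name': 'Eve',   'name_count': 1, 'color': 3},
    {'name': 'Bob',   'name_count': 1, 'color': 2},
    {'name': 'Alice', 'name_count': 2, 'color': 1},   # 1 + 0
]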
class test_partner_recursion(common.TransactionCase):

View File

@@ -74,7 +74,7 @@ _schema = logging.getLogger(__name__ + '.schema')
# List of etree._Element subclasses that we choose to ignore when parsing XML.
from openerp.tools import SKIPPED_ELEMENT_TYPES
regex_order = re.compile('^(([a-z0-9_]+|"[a-z0-9_]+")( *desc| *asc)?( *, *|))+$', re.I)
regex_order = re.compile('^( *([a-z0-9_]+|"[a-z0-9_]+")( *desc| *asc)?( *, *|))+$', re.I)
regex_object_name = re.compile(r'^[a-z0-9_.]+$')
# TODO for trunk, raise the value to 1000
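The regex_order change above only inserts ` *` in front of each term, but that is exactly what read_group needs: it splits a specification such as 'color desc, title desc' on commas, so everything after the first comma reaches _generate_order_by/_check_qorder with a leading space, which the old pattern rejected. Both patterns verbatim, exercised standalone:

import re

old = re.compile('^(([a-z0-9_]+|"[a-z0-9_]+")( *desc| *asc)?( *, *|))+$', re.I)
new = re.compile('^( *([a-z0-9_]+|"[a-z0-9_]+")( *desc| *asc)?( *, *|))+$', re.I)

part = ' title desc'              # second part of 'color desc, title desc'
print(bool(old.match(part)))      # False -> the old pattern made _check_qorder reject it
print(bool(new.match(part)))      # True  -> accepted after this fix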
@@ -1124,7 +1124,7 @@ class BaseModel(object):
def _get_xml_id(self, cr, uid, r):
model_data = self.pool.get('ir.model.data')
data_ids = model_data.search(cr, uid, [('model', '=', r._table_name), ('res_id', '=', r['id'])])
data_ids = model_data.search(cr, uid, [('model', '=', r._model._name), ('res_id', '=', r['id'])])
if len(data_ids):
d = model_data.read(cr, uid, data_ids, ['name', 'module'])[0]
if d['module']:
@@ -1134,13 +1134,13 @@ class BaseModel(object):
else:
postfix = 0
while True:
n = self._table+'_'+str(r['id']) + (postfix and ('_'+str(postfix)) or '' )
n = r._model._table+'_'+str(r['id']) + (postfix and ('_'+str(postfix)) or '' )
if not model_data.search(cr, uid, [('name', '=', n)]):
break
postfix += 1
model_data.create(cr, SUPERUSER_ID, {
'name': n,
'model': self._name,
'model': r._model._name,
'res_id': r['id'],
'module': '__export__',
})
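When no ir.model.data record exists yet, the fallback above builds a fresh __export__ name from the record's table and id, appending a numeric postfix until the name is unused. A standalone sketch of that loop, with a plain set standing in for the model_data.search() lookup:

def next_export_name(table, res_id, existing_names):
    # Mirrors the postfix loop in _get_xml_id; existing_names replaces the
    # ir.model.data search on ('name', '=', n).
    postfix = 0
    while True:
        n = table + '_' + str(res_id) + (postfix and ('_' + str(postfix)) or '')
        if n not in existing_names:
            return n
        postfix += 1

print(next_export_name('export_integer', 42, set()))                  # export_integer_42
print(next_export_name('export_integer', 42, {'export_integer_42'}))  # export_integer_42_1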
@@ -2604,36 +2604,42 @@ class BaseModel(object):
r['__fold'] = folded.get(r[groupby] and r[groupby][0], False)
return result
def _read_group_generate_order_by(self, orderby, aggregated_fields, groupby, query):
def _read_group_prepare(self, orderby, aggregated_fields, groupby, qualified_groupby_field, query, groupby_type=None):
"""
Generates the ORDER BY sql clause for the read group method. Adds the missing JOIN clause
Prepares the GROUP BY and ORDER BY terms for the read_group method. Adds the missing JOIN clause
to the query if order should be computed against m2o field.
:param orderby: the orderby definition in the form "%(field)s %(order)s"
:param aggregated_fields: list of aggregated fields in the query
:param groupby: the current groupby field name
:param query: the query object used to construct the query afterwards
:param qualified_groupby_field: the fully qualified SQL name for the grouped field
:param osv.Query query: the query under construction
:param groupby_type: the type of the grouped field
:return: (groupby_terms, orderby_terms)
"""
orderby_list = []
ob = []
for order_splits in orderby.split(','):
order_split = order_splits.split()
orderby_field = order_split[0]
fields = openerp.osv.fields
if isinstance(self._all_columns[orderby_field].column, (fields.date, fields.datetime)):
continue
orderby_dir = len(order_split) == 2 and order_split[1].upper() == 'ASC' and 'ASC' or 'DESC'
if orderby_field == groupby:
orderby_item = self._generate_order_by(order_splits, query).replace('ORDER BY ', '')
if orderby_item:
orderby_list.append(orderby_item)
ob += [obi.split()[0] for obi in orderby_item.split(',')]
elif orderby_field in aggregated_fields:
orderby_list.append('%s %s' % (orderby_field,orderby_dir))
orderby_terms = []
groupby_terms = [qualified_groupby_field] if groupby else []
if not orderby:
return groupby_terms, orderby_terms
if orderby_list:
return ' ORDER BY %s' % (','.join(orderby_list)), ob and ','.join(ob) or ''
else:
return '', ''
self._check_qorder(orderby)
for order_part in orderby.split(','):
order_split = order_part.split()
order_field = order_split[0]
if order_field == groupby:
if groupby_type == 'many2one':
order_clause = self._generate_order_by(order_part, query).replace('ORDER BY ', '')
if order_clause:
orderby_terms.append(order_clause)
groupby_terms += [order_term.split()[0] for order_term in order_clause.split(',')]
else:
orderby_terms.append(order_part)
elif order_field in aggregated_fields:
orderby_terms.append(order_part)
else:
# Cannot order by a field that will not appear in the results (needs to be grouped or aggregated)
_logger.warn('%s: read_group order by `%s` ignored, cannot sort on empty columns (not grouped/aggregated)',
self._name, order_part)
return groupby_terms, orderby_terms
def read_group(self, cr, uid, domain, fields, groupby, offset=0, limit=None, context=None, orderby=False):
"""
@@ -2695,9 +2701,9 @@ class BaseModel(object):
# TODO it seems fields_get can be replaced by _all_columns (no need for translation)
fget = self.fields_get(cr, uid, fields)
flist = ''
group_count = group_by = groupby
group_by_params = {}
select_terms = []
groupby_type = None
if groupby:
if fget.get(groupby):
groupby_type = fget[groupby]['type']
@@ -2717,12 +2723,9 @@ class BaseModel(object):
'interval': interval,
}
qualified_groupby_field = "to_char(%s,%%s)" % qualified_groupby_field
flist = "%s as %s " % (qualified_groupby_field, groupby)
elif groupby_type == 'boolean':
qualified_groupby_field = "coalesce(%s,false)" % qualified_groupby_field
flist = "%s as %s " % (qualified_groupby_field, groupby)
else:
flist = qualified_groupby_field
select_terms.append("%s as %s " % (qualified_groupby_field, groupby))
else:
# Don't allow arbitrary values, as this would be a SQL injection vector!
raise except_orm(_('Invalid group_by'),
@@ -2730,36 +2733,50 @@ class BaseModel(object):
aggregated_fields = [
f for f in fields
if f not in ('id', 'sequence')
if f not in ('id', 'sequence', groupby)
if fget[f]['type'] in ('integer', 'float')
if (f in self._all_columns and getattr(self._all_columns[f].column, '_classic_write'))]
for f in aggregated_fields:
group_operator = fget[f].get('group_operator', 'sum')
if flist:
flist += ', '
qualified_field = self._inherits_join_calc(f, query)
flist += "%s(%s) AS %s" % (group_operator, qualified_field, f)
select_terms.append("%s(%s) AS %s" % (group_operator, qualified_field, f))
order = orderby or groupby
orderby_clause = ''
ob = ''
if order:
orderby_clause, ob = self._read_group_generate_order_by(order, aggregated_fields, groupby, query)
gb = groupby and (' GROUP BY ' + qualified_groupby_field) or ''
order = orderby or groupby or ''
groupby_terms, orderby_terms = self._read_group_prepare(order, aggregated_fields, groupby, qualified_groupby_field, query, groupby_type)
from_clause, where_clause, where_clause_params = query.get_sql()
if group_by_params and group_by_params.get('groupby_format'):
where_clause_params = [group_by_params['groupby_format']] + where_clause_params + [group_by_params['groupby_format']]
where_clause = where_clause and ' WHERE ' + where_clause
limit_str = limit and ' limit %d' % limit or ''
offset_str = offset and ' offset %d' % offset or ''
if len(groupby_list) < 2 and context.get('group_by_no_leaf'):
group_count = '_'
cr.execute('SELECT min(%s.id) AS id, count(%s.id) AS %s_count' % (self._table, self._table, group_count) + (flist and ',') + flist + ' FROM ' + from_clause + where_clause + gb + (ob and ',') + ob + orderby_clause + limit_str + offset_str, where_clause_params)
alldata = {}
groupby = group_by
count_field = '_'
else:
count_field = groupby
prefix_terms = lambda prefix, terms: (prefix + " " + ",".join(terms)) if terms else ''
prefix_term = lambda prefix, term: ('%s %s' % (prefix, term)) if term else ''
query = """
SELECT min(%(table)s.id) AS id, count(%(table)s.id) AS %(count_field)s_count
%(extra_fields)s
FROM %(from)s
%(where)s
%(groupby)s
%(orderby)s
%(limit)s
%(offset)s
""" % {
'table': self._table,
'count_field': count_field,
'extra_fields': prefix_terms(',', select_terms),
'from': from_clause,
'where': prefix_term('WHERE', where_clause),
'groupby': prefix_terms('GROUP BY', groupby_terms),
'orderby': prefix_terms('ORDER BY', orderby_terms),
'limit': prefix_term('LIMIT', int(limit) if limit else None),
'offset': prefix_term('OFFSET', int(offset) if limit else None),
}
cr.execute(query, where_clause_params)
alldata = {}
fetched_data = cr.dictfetchall()
data_ids = []
@@ -2770,8 +2787,6 @@ class BaseModel(object):
data_ids.append(r['id'])
del r['id']
if groupby:
data = self.read(cr, uid, data_ids, [groupby], context=context)
# restore order of the search as read() uses the default _order (this is only for groups, so the footprint of data should be small):
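The prefix_terms/prefix_term helpers introduced above carry the whole assembly trick: a clause keyword is only emitted when it has content, so empty GROUP BY, ORDER BY, LIMIT or OFFSET parts vanish from the statement instead of producing broken SQL. Copied verbatim and exercised standalone (the column names are only illustrative):

prefix_terms = lambda prefix, terms: (prefix + " " + ",".join(terms)) if terms else ''
prefix_term = lambda prefix, term: ('%s %s' % (prefix, term)) if term else ''

print(prefix_terms('GROUP BY', ['"res_partner"."function"']))  # GROUP BY "res_partner"."function"
print(prefix_terms('ORDER BY', []))                            # '' -> clause omitted entirely
print(prefix_term('LIMIT', 80))                                # LIMIT 80
print(prefix_term('OFFSET', None))                             # '' -> no offset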

View File

@@ -309,8 +309,9 @@ class test_m2o(CreatorCase):
def test_external_id(self):
integer_id = self.registry('export.integer').create(
self.cr, openerp.SUPERUSER_ID, {'value': 42})
# __export__.$class.$id
external_id = u'__export__.export_many2one_%d' % integer_id
# Expecting the m2o target model name in the external id,
# not this model's name
external_id = u'__export__.export_integer_%d' % integer_id
self.assertEqual(
self.export(integer_id, fields=['value/id']),
[[external_id]])
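Put side by side, the naming change this assertion now checks for (illustration only; integer_id stands for whatever database id the created export.integer record receives):

integer_id = 42  # placeholder for the id returned by create() above

# Before the fix, _get_xml_id derived the name from the *exporting* model's table:
old_external_id = u'__export__.export_many2one_%d' % integer_id   # __export__.export_many2one_42

# After the fix, it uses the table of the record actually pointed to,
# i.e. the many2one's target model:
new_external_id = u'__export__.export_integer_%d' % integer_id    # __export__.export_integer_42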