[MERGE] trunk-bug-related-field-rco (fix and simplify related fields, to handle field chains of length 1)

bzr revid: rco@openerp.com-20121105141051-1333e5jdrfxulalj
This commit is contained in:
Raphael Collet 2012-11-05 15:10:51 +01:00
commit 0db0876322
3 changed files with 153 additions and 101 deletions

View File

@ -1152,96 +1152,51 @@ class related(function):
"""
def _fnct_search(self, tobj, cr, uid, obj=None, name=None, domain=None, context=None):
self._field_get2(cr, uid, obj, context)
i = len(self._arg)-1
sarg = name
while i>0:
if type(sarg) in [type([]), type( (1,) )]:
where = [(self._arg[i], 'in', sarg)]
else:
where = [(self._arg[i], '=', sarg)]
if domain:
where = map(lambda x: (self._arg[i],x[1], x[2]), domain)
domain = []
sarg = obj.pool.get(self._relations[i]['object']).search(cr, uid, where, context=context)
i -= 1
return [(self._arg[0], 'in', sarg)]
# assume self._arg = ('foo', 'bar', 'baz')
# domain = [(name, op, val)] => search [('foo.bar.baz', op, val)]
field = '.'.join(self._arg)
return map(lambda x: (field, x[1], x[2]), domain)
def _fnct_write(self,obj,cr, uid, ids, field_name, values, args, context=None):
self._field_get2(cr, uid, obj, context=context)
if type(ids) != type([]):
ids=[ids]
objlst = obj.browse(cr, uid, ids)
for data in objlst:
t_id = data.id
t_data = data
for i in range(len(self.arg)):
if not t_data: break
field_detail = self._relations[i]
if not t_data[self.arg[i]]:
if self._type not in ('one2many', 'many2many'):
t_id = t_data['id']
t_data = False
elif field_detail['type'] in ('one2many', 'many2many'):
if self._type != "many2one":
t_id = t_data.id
t_data = t_data[self.arg[i]][0]
else:
t_data = False
else:
t_id = t_data['id']
t_data = t_data[self.arg[i]]
else:
model = obj.pool.get(self._relations[-1]['object'])
model.write(cr, uid, [t_id], {args[-1]: values}, context=context)
if isinstance(ids, (int, long)):
ids = [ids]
for record in obj.browse(cr, uid, ids, context=context):
# traverse all fields except the last one
for field in self.arg[:-1]:
record = record[field] or False
if not record:
break
elif isinstance(record, list):
# record is the result of a one2many or many2many field
record = record[0]
if record:
# write on the last field
record.write({self.arg[-1]: values})
def _fnct_read(self, obj, cr, uid, ids, field_name, args, context=None):
self._field_get2(cr, uid, obj, context)
if not ids: return {}
relation = obj._name
if self._type in ('one2many', 'many2many'):
res = dict([(i, []) for i in ids])
else:
res = {}.fromkeys(ids, False)
objlst = obj.browse(cr, SUPERUSER_ID, ids, context=context)
for data in objlst:
if not data:
continue
t_data = data
relation = obj._name
for i in range(len(self.arg)):
field_detail = self._relations[i]
relation = field_detail['object']
try:
if not t_data[self.arg[i]]:
t_data = False
break
except:
t_data = False
res = {}
for record in obj.browse(cr, SUPERUSER_ID, ids, context=context):
value = record
for field in self.arg:
if isinstance(value, list):
value = value[0]
value = value[field] or False
if not value:
break
if field_detail['type'] in ('one2many', 'many2many') and i != len(self.arg) - 1:
t_data = t_data[self.arg[i]][0]
elif t_data:
t_data = t_data[self.arg[i]]
if type(t_data) == type(objlst[0]):
res[data.id] = t_data.id
elif t_data:
res[data.id] = t_data
if self._type=='many2one':
ids = filter(None, res.values())
if ids:
# name_get as root, as seeing the name of a related
# object depends on access right of source document,
# not target, so user may not have access.
ng = dict(obj.pool.get(self._obj).name_get(cr, SUPERUSER_ID, ids, context=context))
for r in res:
if res[r]:
res[r] = (res[r], ng[res[r]])
res[record.id] = value
if self._type == 'many2one':
# res[id] is a browse_record or False; convert it to (id, name) or False.
# Perform name_get as root, as seeing the name of a related object depends on
# access right of source document, not target, so user may not have access.
value_ids = list(set(value.id for value in res.itervalues() if value))
value_name = dict(obj.pool.get(self._obj).name_get(cr, SUPERUSER_ID, value_ids, context=context))
res = dict((id, value and (value.id, value_name[value.id])) for id, value in res.iteritems())
elif self._type in ('one2many', 'many2many'):
for r in res:
if res[r]:
res[r] = [x.id for x in res[r]]
# res[id] is a list of browse_record or False; convert it to a list of ids
res = dict((id, value and map(int, value) or []) for id, value in res.iteritems())
return res
def __init__(self, *arg, **args):
@ -1252,22 +1207,6 @@ class related(function):
# TODO: improve here to change self.store = {...} according to related objects
pass
def _field_get2(self, cr, uid, obj, context=None):
if self._relations:
return
result = []
obj_name = obj._name
for i in range(len(self._arg)):
f = obj.pool.get(obj_name).fields_get(cr, uid, [self._arg[i]], context=context)[self._arg[i]]
result.append({
'object': obj_name,
'type': f['type']
})
if f.get('relation',False):
obj_name = f['relation']
result[-1]['relation'] = f['relation']
self._relations = result
class sparse(function):

View File

@ -9,7 +9,7 @@ See the :ref:`test-framework` section in the :ref:`features` list.
"""
from . import test_expression, test_html_sanitize, test_ir_sequence, test_orm,\
test_basecase, \
test_fields, test_basecase, \
test_view_validation, test_uninstall, test_misc, test_db_cursor
fast_suite = [
@ -21,6 +21,7 @@ checks = [
test_html_sanitize,
test_db_cursor,
test_orm,
test_fields,
test_basecase,
test_view_validation,
test_misc,

View File

@ -0,0 +1,112 @@
#
# test cases for fields access, etc.
#
import unittest2
import common
import openerp
from openerp.osv import fields
class TestRelatedField(common.TransactionCase):
def setUp(self):
super(TestRelatedField, self).setUp()
self.partner = self.registry('res.partner')
self.company = self.registry('res.company')
def test_0_related(self):
""" test an usual related field """
# add a related field test_related_company_id on res.partner
old_columns = self.partner._columns
self.partner._columns = dict(old_columns)
self.partner._columns.update({
'related_company_partner_id': fields.related('company_id', 'partner_id', type='many2one', obj='res.partner'),
})
# find a company with a non-null partner_id
ids = self.company.search(self.cr, self.uid, [('partner_id', '!=', False)], limit=1)
id = ids[0]
# find partners that satisfy [('partner_id.company_id', '=', id)]
company_ids = self.company.search(self.cr, self.uid, [('partner_id', '=', id)])
partner_ids1 = self.partner.search(self.cr, self.uid, [('company_id', 'in', company_ids)])
partner_ids2 = self.partner.search(self.cr, self.uid, [('related_company_partner_id', '=', id)])
self.assertEqual(partner_ids1, partner_ids2)
# restore res.partner fields
self.partner._columns = old_columns
def do_test_company_field(self, field):
# get a partner with a non-null company_id
ids = self.partner.search(self.cr, self.uid, [('company_id', '!=', False)], limit=1)
partner = self.partner.browse(self.cr, self.uid, ids[0])
# check reading related field
self.assertEqual(partner[field], partner.company_id)
# check that search on related field is equivalent to original field
ids1 = self.partner.search(self.cr, self.uid, [('company_id', '=', partner.company_id.id)])
ids2 = self.partner.search(self.cr, self.uid, [(field, '=', partner.company_id.id)])
self.assertEqual(ids1, ids2)
def test_1_single_related(self):
""" test a related field with a single indirection like fields.related('foo') """
# add a related field test_related_company_id on res.partner
old_columns = self.partner._columns
self.partner._columns = dict(old_columns)
self.partner._columns.update({
'single_related_company_id': fields.related('company_id', type='many2one', obj='res.company'),
})
self.do_test_company_field('single_related_company_id')
# restore res.partner fields
self.partner._columns = old_columns
def test_2_related_related(self):
""" test a related field referring to a related field """
# add a related field on a related field on res.partner
old_columns = self.partner._columns
self.partner._columns = dict(old_columns)
self.partner._columns.update({
'single_related_company_id': fields.related('company_id', type='many2one', obj='res.company'),
'related_related_company_id': fields.related('single_related_company_id', type='many2one', obj='res.company'),
})
self.do_test_company_field('related_related_company_id')
# restore res.partner fields
self.partner._columns = old_columns
def test_3_read_write(self):
""" write on a related field """
# add a related field test_related_company_id on res.partner
old_columns = self.partner._columns
self.partner._columns = dict(old_columns)
self.partner._columns.update({
'related_company_partner_id': fields.related('company_id', 'partner_id', type='many2one', obj='res.partner'),
})
# find a company with a non-null partner_id
company_ids = self.company.search(self.cr, self.uid, [('partner_id', '!=', False)], limit=1)
company = self.company.browse(self.cr, self.uid, company_ids[0])
# find partners that satisfy [('partner_id.company_id', '=', company.id)]
partner_ids = self.partner.search(self.cr, self.uid, [('related_company_partner_id', '=', company.id)])
partner = self.partner.browse(self.cr, self.uid, partner_ids[0])
# create a new partner, and assign it to company
new_partner_id = self.partner.create(self.cr, self.uid, {'name': 'Foo'})
partner.write({'related_company_partner_id': new_partner_id})
company = self.company.browse(self.cr, self.uid, company_ids[0])
self.assertEqual(company.partner_id.id, new_partner_id)
partner = self.partner.browse(self.cr, self.uid, partner_ids[0])
self.assertEqual(partner.related_company_partner_id.id, new_partner_id)
# restore res.partner fields
self.partner._columns = old_columns
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: