[MERGE] new ORM method for converting o2m commands sequences to record dicts, useful for addons (by Xavier Morel)

bzr revid: odo@openerp.com-20111011144927-mch6uy11z347fcku
This commit is contained in:
Olivier Dony 2011-10-11 16:49:27 +02:00
commit 7f34a19db6
3 changed files with 239 additions and 9 deletions

View File

@ -108,14 +108,8 @@ class res_partner_bank(osv.osv):
if not context.get('address'): if not context.get('address'):
return value return value
for _, id, address_data in context['address']: for address in self.pool.get('res.partner').resolve_o2m_commands_to_record_dicts(
if not (id or address): continue cursor, user, 'address', context['address'], ['type', field], context=context):
address = {}
if id:
address.update(self.pool['res.partner.address']
.read(cursor, user, [id], ['type', field], context=context)[0])
if address_data:
address.update(address_data)
if address.get('type') == 'default': if address.get('type') == 'default':
return address.get(field, value) return address.get(field, value)

View File

@ -4762,6 +4762,67 @@ class BaseModel(object):
return True return True
def resolve_o2m_commands_to_record_dicts(self, cr, uid, field_name, o2m_commands, fields=None, context=None):
""" Serializes o2m commands into record dictionaries (as if
all the o2m records came from the database via a read()), and
returns an iterable over these dictionaries.
Because o2m commands might be creation commands, not all
record ids will contain an ``id`` field. Commands matching an
existing record (``UPDATE`` and ``LINK_TO``) will have an id.
.. note:: ``CREATE``, ``UPDATE`` and ``LINK_TO`` stand for the
o2m command codes ``0``, ``1`` and ``4``
respectively
:param field_name: name of the o2m field matching the commands
:type field_name: str
:param o2m_commands: one2many commands to execute on ``field_name``
:type o2m_commands: list((int|False, int|False, dict|False))
:param fields: list of fields to read from the database, when applicable
:type fields: list(str)
:raises AssertionError: if a command is not ``CREATE``, ``UPDATE`` or ``LINK_TO``
:returns: o2m records in a shape similar to that returned by
``read()`` (except records may be missing the ``id``
field if they don't exist in db)
:rtype: ``list(dict)``
"""
o2m_model = self._all_columns[field_name].column._obj
# convert single ids and pairs to tripled commands
commands = []
for o2m_command in o2m_commands:
if not isinstance(o2m_command, (list, tuple)):
command = 4
commands.append((command, o2m_command, False))
elif len(o2m_command) == 1:
(command,) = o2m_command
commands.append((command, False, False))
elif len(o2m_command) == 2:
command, id = o2m_command
commands.append((command, id, False))
else:
command = o2m_command[0]
commands.append(o2m_command)
assert command in (0, 1, 4), \
"Only CREATE, UPDATE and LINK_TO commands are supported in resolver"
# extract records to read, by id, in a mapping dict
ids_to_read = [id for (command, id, _) in commands if command in (1, 4)]
records_by_id = dict(
(record['id'], record)
for record in self.pool.get(o2m_model).read(
cr, uid, ids_to_read, fields=fields, context=context))
record_dicts = []
# merge record from db with record provided by command
for command, id, record in commands:
item = {}
if command in (1, 4): item.update(records_by_id[id])
if command in (0, 1): item.update(record)
record_dicts.append(item)
return record_dicts
# keep this import here, at top it will cause dependency cycle errors # keep this import here, at top it will cause dependency cycle errors
import expression import expression
@ -4782,7 +4843,7 @@ class Model(BaseModel):
class TransientModel(BaseModel): class TransientModel(BaseModel):
"""Model super-class for transient records, meant to be temporarily """Model super-class for transient records, meant to be temporarily
persisted, and regularly vaccuum-cleaned. persisted, and regularly vaccuum-cleaned.
A TransientModel has a simplified access rights management, A TransientModel has a simplified access rights management,
all users can create new records, and may only access the all users can create new records, and may only access the
records they created. The super-user has unrestricted access records they created. The super-user has unrestricted access

175
tests/test_orm.py Normal file
View File

@ -0,0 +1,175 @@
import os
import unittest2
import openerp
UID = 1
DB = os.environ['OPENERP_DATABASE']
CREATE = lambda values: (0, False, values)
UPDATE = lambda id, values: (1, id, values)
DELETE = lambda id: (2, id, False)
FORGET = lambda id: (3, id, False)
LINK_TO = lambda id: (4, id, False)
DELETE_ALL = lambda: (5, False, False)
REPLACE_WITH = lambda ids: (6, False, ids)
def setUpModule():
openerp.tools.config['addons_path'] = os.environ['OPENERP_ADDONS_PATH']
class TestO2MSerialization(unittest2.TestCase):
def setUp(self):
self.cr = openerp.modules.registry.RegistryManager.get(DB).db.cursor()
self.partner = openerp.modules.registry.RegistryManager.get(DB)['res.partner']
self.address = openerp.modules.registry.RegistryManager.get(DB)['res.partner.address']
def tearDown(self):
self.cr.rollback()
self.cr.close()
def test_no_command(self):
" empty list of commands yields an empty list of records "
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [])
self.assertEqual(results, [])
def test_CREATE_commands(self):
" returns the VALUES dict as-is "
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address',
map(CREATE, [{'foo': 'bar'}, {'foo': 'baz'}, {'foo': 'baq'}]))
self.assertEqual(results, [
{'foo': 'bar'},
{'foo': 'baz'},
{'foo': 'baq'}
])
def test_LINK_TO_command(self):
" reads the records from the database, records are returned with their ids. "
ids = [
self.address.create(self.cr, UID, {'name': 'foo'}),
self.address.create(self.cr, UID, {'name': 'bar'}),
self.address.create(self.cr, UID, {'name': 'baz'})
]
commands = map(LINK_TO, ids)
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', commands, ['name'])
self.assertEqual(results, [
{'id': ids[0], 'name': 'foo'},
{'id': ids[1], 'name': 'bar'},
{'id': ids[2], 'name': 'baz'}
])
def test_bare_ids_command(self):
" same as the equivalent LINK_TO commands "
ids = [
self.address.create(self.cr, UID, {'name': 'foo'}),
self.address.create(self.cr, UID, {'name': 'bar'}),
self.address.create(self.cr, UID, {'name': 'baz'})
]
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', ids, ['name'])
self.assertEqual(results, [
{'id': ids[0], 'name': 'foo'},
{'id': ids[1], 'name': 'bar'},
{'id': ids[2], 'name': 'baz'}
])
def test_UPDATE_command(self):
" take the in-db records and merge the provided information in "
id_foo = self.address.create(self.cr, UID, {'name': 'foo'})
id_bar = self.address.create(self.cr, UID, {'name': 'bar'})
id_baz = self.address.create(self.cr, UID, {'name': 'baz', 'city': 'tag'})
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [
LINK_TO(id_foo),
UPDATE(id_bar, {'name': 'qux', 'city': 'tagtag'}),
UPDATE(id_baz, {'name': 'quux'})
], ['name', 'city'])
self.assertEqual(results, [
{'id': id_foo, 'name': 'foo', 'city': False},
{'id': id_bar, 'name': 'qux', 'city': 'tagtag'},
{'id': id_baz, 'name': 'quux', 'city': 'tag'}
])
def test_mixed_commands(self):
ids = [
self.address.create(self.cr, UID, {'name': name})
for name in ['NObar', 'baz', 'qux', 'NOquux', 'NOcorge', 'garply']
]
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [
CREATE({'name': 'foo'}),
UPDATE(ids[0], {'name': 'bar'}),
LINK_TO(ids[1]),
LINK_TO(ids[2]),
UPDATE(ids[3], {'name': 'quux',}),
UPDATE(ids[4], {'name': 'corge'}),
CREATE({'name': 'grault'}),
LINK_TO(ids[5])
], ['name'])
self.assertEqual(results, [
{'name': 'foo'},
{'id': ids[0], 'name': 'bar'},
{'id': ids[1], 'name': 'baz'},
{'id': ids[2], 'name': 'qux'},
{'id': ids[3], 'name': 'quux'},
{'id': ids[4], 'name': 'corge'},
{'name': 'grault'},
{'id': ids[5], 'name': 'garply'}
])
def test_LINK_TO_pairs(self):
"LINK_TO commands can be written as pairs, instead of triplets"
ids = [
self.address.create(self.cr, UID, {'name': 'foo'}),
self.address.create(self.cr, UID, {'name': 'bar'}),
self.address.create(self.cr, UID, {'name': 'baz'})
]
commands = map(lambda id: (4, id), ids)
results = self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', commands, ['name'])
self.assertEqual(results, [
{'id': ids[0], 'name': 'foo'},
{'id': ids[1], 'name': 'bar'},
{'id': ids[2], 'name': 'baz'}
])
def test_singleton_commands(self):
"DELETE_ALL can appear as a singleton"
try:
self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [(5,)], ['name'])
except AssertionError:
# 5 should fail with an assert error, but not e.g. a ValueError
pass
def test_invalid_commands(self):
"Commands with uncertain semantics in this context should be forbidden"
with self.assertRaises(AssertionError):
self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [DELETE(42)], ['name'])
with self.assertRaises(AssertionError):
self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [FORGET(42)], ['name'])
with self.assertRaises(AssertionError):
self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [DELETE_ALL()], ['name'])
with self.assertRaises(AssertionError):
self.partner.resolve_o2m_commands_to_record_dicts(
self.cr, UID, 'address', [REPLACE_WITH([42])], ['name'])