[IMP] reimplement BaseModel.import_data on top of BaseModel.load

adapt tests to assert the corrected behavior of load instead of import_data's broken behavior

bzr revid: xmo@openerp.com-20120927154244-e56ygz2yytullg2l
This commit is contained in:
Xavier Morel 2012-09-27 17:42:44 +02:00
parent 87af7a6365
commit def7e61901
3 changed files with 98 additions and 277 deletions

View File

@ -1254,189 +1254,43 @@ class BaseModel(object):
:returns: 4-tuple in the form (return_code, errored_resource, error_message, unused)
:rtype: (int, dict or 0, str or 0, str or 0)
"""
if not context:
context = {}
context = dict(context) if context is not None else {}
context['_import_current_module'] = current_module
fields = map(fix_import_export_id_paths, fields)
ir_model_data_obj = self.pool.get('ir.model.data')
# mode: id (XML id) or .id (database id) or False for name_get
def _get_id(model_name, id, current_module=False, mode='id'):
if mode=='.id':
id = int(id)
obj_model = self.pool.get(model_name)
ids = obj_model.search(cr, uid, [('id', '=', int(id))])
if not len(ids):
raise Exception(_("Database ID doesn't exist: %s : %s") %(model_name, id))
elif mode=='id':
if '.' in id:
module, xml_id = id.rsplit('.', 1)
else:
module, xml_id = current_module, id
record_id = ir_model_data_obj._get_id(cr, uid, module, xml_id)
ir_model_data = ir_model_data_obj.read(cr, uid, [record_id], ['res_id'])
if not ir_model_data:
raise ValueError('No references to %s.%s' % (module, xml_id))
id = ir_model_data[0]['res_id']
else:
obj_model = self.pool.get(model_name)
ids = obj_model.name_search(cr, uid, id, operator='=', context=context)
if not ids:
raise ValueError('No record found for %s' % (id,))
id = ids[0][0]
return id
def log(m):
if m['type'] == 'error':
raise Exception(m['message'])
# IN:
# datas: a list of records, each record is defined by a list of values
# prefix: a list of prefix fields ['line_ids']
# position: the line to process, skip is False if it's the first line of the current record
# OUT:
# (res, position, warning, res_id) with
# res: the record for the next line to process (including it's one2many)
# position: the new position for the next line
# res_id: the ID of the record if it's a modification
def process_liness(self, datas, prefix, current_module, model_name, fields_def, position=0, skip=0):
line = datas[position]
row = {}
warning = []
data_res_id = False
xml_id = False
nbrmax = position+1
done = {}
for i, field in enumerate(fields):
res = False
if i >= len(line):
raise Exception(_('Please check that all your lines have %d columns.'
'Stopped around line %d having %d columns.') % \
(len(fields), position+2, len(line)))
if not line[i]:
continue
if field[:len(prefix)] <> prefix:
if line[i] and skip:
return False
continue
field_name = field[len(prefix)]
#set the mode for m2o, o2m, m2m : xml_id/id/name
if len(field) == len(prefix)+1:
mode = False
else:
mode = field[len(prefix)+1]
# TODO: improve this by using csv.csv_reader
def many_ids(line, relation, current_module, mode):
res = []
for db_id in line.split(config.get('csv_internal_sep')):
res.append(_get_id(relation, db_id, current_module, mode))
return [(6,0,res)]
# ID of the record using a XML ID
if field_name == 'id':
try:
data_res_id = _get_id(model_name, line[i], current_module)
except ValueError:
pass
xml_id = line[i]
continue
# ID of the record using a database ID
elif field_name == '.id':
data_res_id = _get_id(model_name, line[i], current_module, '.id')
continue
field_type = fields_def[field_name]['type']
# recursive call for getting children and returning [(0,0,{})] or [(1,ID,{})]
if field_type == 'one2many':
if field_name in done:
continue
done[field_name] = True
relation = fields_def[field_name]['relation']
relation_obj = self.pool.get(relation)
newfd = relation_obj.fields_get( cr, uid, context=context )
pos = position
res = []
first = 0
while pos < len(datas):
res2 = process_liness(self, datas, prefix + [field_name], current_module, relation_obj._name, newfd, pos, first)
if not res2:
break
(newrow, pos, w2, data_res_id2, xml_id2) = res2
nbrmax = max(nbrmax, pos)
warning += w2
first += 1
if (not newrow) or not reduce(lambda x, y: x or y, newrow.values(), 0):
break
res.append( (data_res_id2 and 1 or 0, data_res_id2 or 0, newrow) )
elif field_type == 'many2one':
relation = fields_def[field_name]['relation']
res = _get_id(relation, line[i], current_module, mode)
elif field_type == 'many2many':
relation = fields_def[field_name]['relation']
res = many_ids(line[i], relation, current_module, mode)
elif field_type == 'integer':
res = line[i] and int(line[i]) or 0
elif field_type == 'boolean':
res = line[i].lower() not in ('0', 'false', 'off')
elif field_type == 'float':
res = line[i] and float(line[i]) or 0.0
elif field_type == 'selection':
for key, val in fields_def[field_name]['selection']:
if tools.ustr(line[i]) in [tools.ustr(key), tools.ustr(val)]:
res = key
break
if line[i] and not res:
_logger.warning(
_("key '%s' not found in selection field '%s'"),
tools.ustr(line[i]), tools.ustr(field_name))
warning.append(_("Key/value '%s' not found in selection field '%s'") % (
tools.ustr(line[i]), tools.ustr(field_name)))
else:
res = line[i]
row[field_name] = res or False
return row, nbrmax, warning, data_res_id, xml_id
fields_def = self.fields_get(cr, uid, context=context)
position = 0
if config.get('import_partial') and filename:
with open(config.get('import_partial'), 'rb') as partial_import_file:
data = pickle.load(partial_import_file)
position = data.get(filename, 0)
while position<len(datas):
(res, position, warning, res_id, xml_id) = \
process_liness(self, datas, [], current_module, self._name, fields_def, position=position)
if len(warning):
cr.rollback()
return -1, res, 'Line ' + str(position) +' : ' + '!\n'.join(warning), ''
try:
position = 0
try:
for res_id, xml_id, res, info in self._convert_records(cr, uid,
self._extract_records(cr, uid, fields, datas,
context=context, log=log),
context=context, log=log):
ir_model_data_obj._update(cr, uid, self._name,
current_module, res, mode=mode, xml_id=xml_id,
noupdate=noupdate, res_id=res_id, context=context)
except Exception, e:
return -1, res, 'Line ' + str(position) + ' : ' + tools.ustr(e), ''
if config.get('import_partial') and filename and (not (position%100)):
with open(config.get('import_partial'), 'rb') as partial_import:
data = pickle.load(partial_import)
data[filename] = position
with open(config.get('import_partial'), 'wb') as partial_import:
pickle.dump(data, partial_import)
if context.get('defer_parent_store_computation'):
self._parent_store_compute(cr)
cr.commit()
position = info.get('rows', {}).get('to', 0) + 1
if config.get('import_partial') and filename and (not (position%100)):
with open(config.get('import_partial'), 'rb') as partial_import:
data = pickle.load(partial_import)
data[filename] = position
with open(config.get('import_partial'), 'wb') as partial_import:
pickle.dump(data, partial_import)
if context.get('defer_parent_store_computation'):
self._parent_store_compute(cr)
cr.commit()
except Exception, e:
cr.rollback()
return -1, {}, 'Line %d : %s' % (position + 1, tools.ustr(e)), ''
if context.get('defer_parent_store_computation'):
self._parent_store_compute(cr)

View File

@ -80,9 +80,9 @@ class test_ids_stuff(ImporterCase):
model_name = 'export.integer'
def test_create_with_id(self):
self.assertRaises(
Exception, # dammit
self.import_, ['.id', 'value'], [['42', '36']])
self.assertEqual(
self.import_(['.id', 'value'], [['42', '36']]),
error(1, u"Unknown database identifier '42'"))
def test_create_with_xid(self):
self.assertEqual(
self.import_(['id', 'value'], [['somexmlid', '42']]),
@ -136,27 +136,25 @@ class test_boolean_field(ImporterCase):
self.assertEqual(
self.import_(['value'], [
[u'0'],
[u'off'],
[u'no'],
[u'false'],
[u'FALSE'],
[u'OFF'],
[u''],
]),
ok(6))
ok(5))
self.assertEqual([
False,
False,
False,
False,
False,
False,
],
values(self.read()))
def test_trues(self):
self.assertEqual(
self.import_(['value'], [
['no'],
['off'],
['None'],
['nil'],
['()'],
@ -164,10 +162,11 @@ class test_boolean_field(ImporterCase):
['#f'],
# Problem: OpenOffice (and probably excel) output localized booleans
['VRAI'],
[u'OFF'],
]),
ok(7))
ok(8))
self.assertEqual(
[True] * 7,
[True] * 8,
values(self.read()))
class test_integer_field(ImporterCase):
@ -226,21 +225,20 @@ class test_integer_field(ImporterCase):
def test_out_of_range(self):
self.assertEqual(
self.import_(['value'], [[str(2**31)]]),
error(1, "integer out of range\n", value=2**31))
error(1, "integer out of range\n"))
# auto-rollbacks if error is in process_liness, but not during
# ir.model.data write. Can differentiate because former ends lines
# error lines with "!"
self.cr.rollback()
self.assertEqual(
self.import_(['value'], [[str(-2**32)]]),
error(1, "integer out of range\n", value=-2**32))
error(1, "integer out of range\n"))
def test_nonsense(self):
# FIXME: shit error reporting, exceptions half the time, messages the other half
self.assertRaises(
ValueError,
self.import_, ['value'], [['zorglub']])
self.assertEqual(
self.import_(['value'], [['zorglub']]),
error(1, u"'zorglub' does not seem to be an integer for field 'unknown'"))
class test_float_field(ImporterCase):
model_name = 'export.float'
@ -298,9 +296,9 @@ class test_float_field(ImporterCase):
], values(self.read()))
def test_nonsense(self):
self.assertRaises(
ValueError,
self.import_, ['value'], [['foobar']])
self.assertEqual(
self.import_(['value'], [['foobar']]),
error(1, u"'foobar' does not seem to be a number for field 'unknown'"))
class test_string_field(ImporterCase):
model_name = 'export.string.bounded'
@ -405,8 +403,6 @@ class test_selection(ImporterCase):
'value': value
})
# FIXME: can't import an exported selection field label if lang != en_US
# (see test_export.test_selection.test_localized_export)
self.assertEqual(
self.import_(['value'], [
['toto'],
@ -417,27 +413,23 @@ class test_selection(ImporterCase):
self.assertEqual([3, 1, 2], values(self.read()))
self.assertEqual(
self.import_(['value'], [['Foo']], context={'lang': 'fr_FR'}),
error(1, "Key/value 'Foo' not found in selection field 'value'",
value=False))
ok(1))
def test_invalid(self):
self.assertEqual(
self.import_(['value'], [['Baz']]),
error(1, "Key/value 'Baz' not found in selection field 'value'",
# what the fuck?
value=False))
error(1, u"Value 'Baz' not found in selection field 'unknown'"))
self.cr.rollback()
self.assertEqual(
self.import_(['value'], [[42]]),
error(1, "Key/value '42' not found in selection field 'value'",
value=False))
error(1, u"Value '42' not found in selection field 'unknown'"))
class test_selection_function(ImporterCase):
model_name = 'export.selection.function'
translations_fr = [
("Corge", "toto"),
("Grault", "titi"),
("Whee", "tete"),
("Wheee", "tete"),
("Moog", "tutu"),
]
@ -482,8 +474,7 @@ class test_selection_function(ImporterCase):
['toto'],
['tete'],
], context={'lang': 'fr_FR'}),
error(1, "Key/value 'toto' not found in selection field 'value'",
value=False))
ok(2))
self.assertEqual(
self.import_(['value'], [['Wheee']], context={'lang': 'fr_FR'}),
ok(1))
@ -555,7 +546,6 @@ class test_m2o(ImporterCase):
self.assertEqual(
self.import_(['value'], [[name2]]),
ok(1))
# FIXME: is it really normal import does not care for name_search collisions?
self.assertEqual([
(integer_id1, name1)
], values(self.read()))
@ -569,35 +559,35 @@ class test_m2o(ImporterCase):
integer_id2 = self.registry('export.integer').create(
self.cr, openerp.SUPERUSER_ID, {'value': 36})
self.assertRaises(
ValueError, # Because name_search all the things. Fallback schmallback
self.import_, ['value'], [
self.assertEqual(
self.import_(['value'], [
# import by id, without specifying it
[integer_id1],
[integer_id2],
[integer_id1],
])
]),
error(1, u"No matching record found for name '%s' in field 'unknown'" % integer_id1))
def test_sub_field(self):
""" Does not implicitly create the record, does not warn that you can't
import m2o subfields (at all)...
"""
self.assertRaises(
ValueError, # No record found for 42, name_searches the bloody thing
self.import_, ['value/value'], [['42']])
self.assertEqual(
self.import_(['value/value'], [['42']]),
error(1, u"Can not create Many-To-One records indirectly, import the field separately"))
def test_fail_noids(self):
self.assertRaises(
ValueError,
self.import_, ['value'], [['nameisnoexist:3']])
self.assertEqual(
self.import_(['value'], [['nameisnoexist:3']]),
error(1, u"No matching record found for name 'nameisnoexist:3' in field 'unknown'"))
self.cr.rollback()
self.assertRaises(
ValueError,
self.import_, ['value/id'], [['noxidhere']]),
self.assertEqual(
self.import_(['value/id'], [['noxidhere']]),
error(1, u"No matching record found for external id 'noxidhere' in field 'unknown'"))
self.cr.rollback()
self.assertRaises(
Exception, # FIXME: Why can't you be a ValueError like everybody else?
self.import_, ['value/.id'], [[66]])
self.assertEqual(
self.import_(['value/.id'], [[66]]),
error(1, u"No matching record found for database id '66' in field 'unknown'"))
class test_m2m(ImporterCase):
model_name = 'export.many2many'
@ -635,12 +625,9 @@ class test_m2m(ImporterCase):
self.assertEqual(values(b[2].value), [3, 44, 84])
def test_noids(self):
try:
self.import_(['value/.id'], [['42']])
self.fail("Should have raised an exception")
except Exception, e:
self.assertIs(type(e), Exception,
"test should be fixed on exception subclass")
self.assertEqual(
self.import_(['value/.id'], [['42']]),
error(1, u"No matching record found for database id '42' in field 'unknown'"))
def test_xids(self):
M2O_o = self.registry('export.many2many.other')
@ -662,9 +649,9 @@ class test_m2m(ImporterCase):
self.assertEqual(values(b[0].value), [3, 44])
self.assertEqual(values(b[2].value), [44, 84])
def test_noxids(self):
self.assertRaises(
ValueError,
self.import_, ['value/id'], [['noxidforthat']])
self.assertEqual(
self.import_(['value/id'], [['noxidforthat']]),
error(1, u"No matching record found for external id 'noxidforthat' in field 'unknown'"))
def test_names(self):
M2O_o = self.registry('export.many2many.other')
@ -689,9 +676,9 @@ class test_m2m(ImporterCase):
self.assertEqual(values(b[2].value), [3, 9])
def test_nonames(self):
self.assertRaises(
ValueError,
self.import_, ['value'], [['wherethem2mhavenonames']])
self.assertEqual(
self.import_(['value'], [['wherethem2mhavenonames']]),
error(1, u"No matching record found for name 'wherethem2mhavenonames' in field 'unknown'"))
def test_import_to_existing(self):
M2O_o = self.registry('export.many2many.other')
@ -717,13 +704,13 @@ class test_o2m(ImporterCase):
model_name = 'export.one2many'
def test_name_get(self):
# FIXME: bloody hell why can't this just name_create the record?
self.assertRaises(
IndexError,
self.import_,
['const', 'value'],
[['5', u'Java is a DSL for taking large XML files'
u' and converting them to stack traces']])
s = u'Java is a DSL for taking large XML files and converting them to' \
u' stack traces'
self.assertEqual(
self.import_(
['const', 'value'],
[['5', s]]),
error(1, u"No matching record found for name '%s' in field 'unknown'" % s))
def test_single(self):
self.assertEqual(
@ -813,14 +800,11 @@ class test_o2m(ImporterCase):
]),
ok(2))
# No record values alongside id => o2m resolution skipped altogether,
# creates 2 records => remove/don't import columns sideshow columns,
# get completely different semantics
b, b1 = self.browse()
[b] = self.browse()
self.assertEqual(b.const, 42)
self.assertEqual(values(b.value), [])
self.assertEqual(b1.const, 4)
self.assertEqual(values(b1.value), [])
# automatically forces link between core record and o2ms
self.assertEqual(values(b.value), [109, 262])
self.assertEqual(values(b.value, field='parent_id'), [b, b])
def test_link_2(self):
O2M_c = self.registry('export.one2many.child')
@ -838,21 +822,10 @@ class test_o2m(ImporterCase):
]),
ok(2))
(b,) = self.browse()
# if an id (db or xid) is provided, expectations that objects are
# *already* linked and emits UPDATE (1, id, {}).
# Noid => CREATE (0, ?, {})
# TODO: xid ignored aside from getting corresponding db id?
[b] = self.browse()
self.assertEqual(b.const, 42)
self.assertEqual(values(b.value), [])
# FIXME: updates somebody else's records?
self.assertEqual(
O2M_c.read(self.cr, openerp.SUPERUSER_ID, id1),
{'id': id1, 'str': 'Bf', 'value': 1, 'parent_id': False})
self.assertEqual(
O2M_c.read(self.cr, openerp.SUPERUSER_ID, id2),
{'id': id2, 'str': 'Me', 'value': 2, 'parent_id': False})
self.assertEqual(values(b.value), [1, 2])
self.assertEqual(values(b.value, field='parent_id'), [b, b])
class test_o2m_multiple(ImporterCase):
model_name = 'export.one2many.multiple'
@ -866,16 +839,10 @@ class test_o2m_multiple(ImporterCase):
['', '14', ''],
]),
ok(4))
# Oh yeah, that's the stuff
(b, b1, b2) = self.browse()
self.assertEqual(values(b.child1), [11])
self.assertEqual(values(b.child2), [21])
self.assertEqual(values(b1.child1), [12])
self.assertEqual(values(b1.child2), [22])
self.assertEqual(values(b2.child1), [13, 14])
self.assertEqual(values(b2.child2), [23])
[b] = self.browse()
self.assertEqual(values(b.child1), [11, 12, 13, 14])
self.assertEqual(values(b.child2), [21, 22, 23])
def test_multi(self):
self.assertEqual(
@ -888,11 +855,10 @@ class test_o2m_multiple(ImporterCase):
['', '', '23'],
]),
ok(6))
# What the actual fuck?
(b, b1) = self.browse()
[b] = self.browse()
self.assertEqual(values(b.child1), [11, 12, 13, 14])
self.assertEqual(values(b.child2), [21])
self.assertEqual(values(b1.child2), [22, 23])
self.assertEqual(values(b.child2), [21, 22, 23])
def test_multi_fullsplit(self):
self.assertEqual(
@ -906,12 +872,11 @@ class test_o2m_multiple(ImporterCase):
['', '', '23'],
]),
ok(7))
# oh wow
(b, b1) = self.browse()
[b] = self.browse()
self.assertEqual(b.const, 5)
self.assertEqual(values(b.child1), [11, 12, 13, 14])
self.assertEqual(b1.const, 36)
self.assertEqual(values(b1.child2), [21, 22, 23])
self.assertEqual(values(b.child2), [21, 22, 23])
# function, related, reference: written to db as-is...
# => function uses @type for value coercion/conversion

View File

@ -394,12 +394,14 @@ class test_unbound_string_field(ImporterCase):
class test_required_string_field(ImporterCase):
model_name = 'export.string.required'
@mute_logger('openerp.sql_db')
def test_empty(self):
result = self.import_(['value'], [[]])
self.assertEqual(result['messages'], [message(
u"Missing required value for the field 'unknown'")])
self.assertIs(result['ids'], False)
@mute_logger('openerp.sql_db')
def test_not_provided(self):
result = self.import_(['const'], [['12']])
self.assertEqual(result['messages'], [message(
@ -920,7 +922,7 @@ class test_o2m_multiple(ImporterCase):
])
self.assertFalse(result['messages'])
self.assertEqual(len(result['ids']), 1)
# Oh yeah, that's the stuff
[b] = self.browse()
self.assertEqual(values(b.child1), [11, 12, 13, 14])
self.assertEqual(values(b.child2), [21, 22, 23])