[FIX] Allow missing opcodes, harden check for private attributes (dunder), check inside embedded code objects.

bzr revid: jke@openerp.com-20140409151659-xwttchbtbj02v1w7
This commit is contained in:
Kersten Jeremy 2014-04-09 17:16:59 +02:00
parent 1578776814
commit 3285feab53
2 changed files with 66 additions and 23 deletions

View File

@ -172,7 +172,7 @@ class Cursor(object):
self.sql_log_count = 0 self.sql_log_count = 0
self.__closed = True # avoid the call of close() (by __del__) if an exception self.__closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations # is raised by any of the following initialisations
self._pool = pool self.__pool = pool
self.dbname = dbname self.dbname = dbname
# Whether to enable snapshot isolation level for this cursor. # Whether to enable snapshot isolation level for this cursor.
@ -318,7 +318,7 @@ class Cursor(object):
chosen_template = tools.config['db_template'] chosen_template = tools.config['db_template']
templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template])) templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
keep_in_pool = self.dbname not in templates_list keep_in_pool = self.dbname not in templates_list
self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool) self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
@check @check
def autocommit(self, on): def autocommit(self, on):
@ -476,12 +476,12 @@ class Connection(object):
def __init__(self, pool, dbname): def __init__(self, pool, dbname):
self.dbname = dbname self.dbname = dbname
self._pool = pool self.__pool = pool
def cursor(self, serialized=True): def cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or '' cursor_type = serialized and 'serialized ' or ''
_logger.debug('create %scursor to %r', cursor_type, self.dbname) _logger.debug('create %scursor to %r', cursor_type, self.dbname)
return Cursor(self._pool, self.dbname, serialized=serialized) return Cursor(self.__pool, self.dbname, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default # serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor serialized_cursor = cursor

View File

@ -66,7 +66,8 @@ _SAFE_OPCODES = _EXPR_OPCODES.union(set(opmap[x] for x in [
'MAKE_FUNCTION', 'SLICE+0', 'SLICE+1', 'SLICE+2', 'SLICE+3', 'MAKE_FUNCTION', 'SLICE+0', 'SLICE+1', 'SLICE+2', 'SLICE+3',
# New in Python 2.7 - http://bugs.python.org/issue4715 : # New in Python 2.7 - http://bugs.python.org/issue4715 :
'JUMP_IF_FALSE_OR_POP', 'JUMP_IF_TRUE_OR_POP', 'POP_JUMP_IF_FALSE', 'JUMP_IF_FALSE_OR_POP', 'JUMP_IF_TRUE_OR_POP', 'POP_JUMP_IF_FALSE',
'POP_JUMP_IF_TRUE', 'SETUP_EXCEPT', 'END_FINALLY' 'POP_JUMP_IF_TRUE', 'SETUP_EXCEPT', 'END_FINALLY', 'LOAD_FAST',
'LOAD_GLOBAL', # Only allows access to restricted globals
] if x in opmap)) ] if x in opmap))
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -81,16 +82,65 @@ def _get_opcodes(codeobj):
[100, 100, 23, 100, 100, 102, 103, 83] [100, 100, 23, 100, 100, 102, 103, 83]
""" """
i = 0 i = 0
opcodes = []
byte_codes = codeobj.co_code byte_codes = codeobj.co_code
while i < len(byte_codes): while i < len(byte_codes):
code = ord(byte_codes[i]) code = ord(byte_codes[i])
opcodes.append(code) yield code
if code >= HAVE_ARGUMENT: if code >= HAVE_ARGUMENT:
i += 3 i += 3
else: else:
i += 1 i += 1
return opcodes
def assert_no_dunder_name(code_obj, expr):
""" assert_no_dunder_name(code_obj, expr) -> None
Asserts that the code object does not refer to any "dunder name"
(__$name__), so that safe_eval prevents access to any internal-ish Python
attribute or method (both are loaded via LOAD_ATTR which uses a name, not a
const or a var).
Checks that no such name exists in the provided code object (co_names).
:param code_obj: code object to name-validate
:type code_obj: CodeType
:param str expr: expression corresponding to the code object, for debugging
purposes
:raises NameError: in case a forbidden name (containing two underscores)
is found in ``code_obj``
.. note:: actually forbids every name containing 2 underscores
"""
for name in code_obj.co_names:
if "__" in name:
raise NameError('Access to forbidden name %r (%r)' % (name, expr))
def assert_valid_codeobj(allowed_codes, code_obj, expr):
""" Asserts that the provided code object validates against the bytecode
and name constraints.
Recursively validates the code objects stored in its co_consts in case
lambdas are being created/used (lambdas generate their own separated code
objects and don't live in the root one)
:param allowed_codes: list of permissible bytecode instructions
:type allowed_codes: set(int)
:param code_obj: code object to name-validate
:type code_obj: CodeType
:param str expr: expression corresponding to the code object, for debugging
purposes
:raises ValueError: in case of forbidden bytecode in ``code_obj``
:raises NameError: in case a forbidden name (containing two underscores)
is found in ``code_obj``
"""
assert_no_dunder_name(code_obj, expr)
for opcode in _get_opcodes(code_obj):
if opcode not in allowed_codes:
raise ValueError(
"opcode %s not allowed (%r)" % (opname[opcode], expr))
for const in code_obj.co_consts:
if isinstance(const, CodeType):
assert_valid_codeobj(allowed_codes, const, 'lambda')
def test_expr(expr, allowed_codes, mode="eval"): def test_expr(expr, allowed_codes, mode="eval"):
"""test_expr(expression, allowed_codes[, mode]) -> code_object """test_expr(expression, allowed_codes[, mode]) -> code_object
@ -105,15 +155,14 @@ def test_expr(expr, allowed_codes, mode="eval"):
# eval() does not like leading/trailing whitespace # eval() does not like leading/trailing whitespace
expr = expr.strip() expr = expr.strip()
code_obj = compile(expr, "", mode) code_obj = compile(expr, "", mode)
except (SyntaxError, TypeError): except (SyntaxError, TypeError, ValueError):
_logger.debug('Invalid eval expression', exc_info=True) _logger.debug('Invalid eval expression', exc_info=True)
raise raise
except Exception: except Exception:
_logger.debug('Disallowed or invalid eval expression', exc_info=True) _logger.debug('Disallowed or invalid eval expression', exc_info=True)
raise ValueError("%s is not a valid expression" % expr) raise ValueError("%s is not a valid expression" % expr)
for code in _get_opcodes(code_obj):
if code not in allowed_codes: assert_valid_codeobj(allowed_codes, code_obj, expr)
raise ValueError("opcode %s not allowed (%r)" % (opname[code], expr))
return code_obj return code_obj
@ -182,19 +231,13 @@ def safe_eval(expr, globals_dict=None, locals_dict=None, mode="eval", nocopy=Fal
This can be used to e.g. evaluate This can be used to e.g. evaluate
an OpenERP domain expression from an untrusted source. an OpenERP domain expression from an untrusted source.
Throws TypeError, SyntaxError or ValueError (not allowed) accordingly. :throws TypeError: If the expression provided is a code object
:throws SyntaxError: If the expression provided is not valid Python
>>> safe_eval("__import__('sys').modules") :throws NameError: If the expression provided accesses forbidden names
Traceback (most recent call last): :throws ValueError: If the expression provided uses forbidden bytecode
...
ValueError: opcode LOAD_NAME not allowed
""" """
if isinstance(expr, CodeType): if isinstance(expr, CodeType):
raise ValueError("safe_eval does not allow direct evaluation of code objects.") raise TypeError("safe_eval does not allow direct evaluation of code objects.")
if '__subclasses__' in expr:
raise ValueError('expression not allowed (__subclasses__)')
if globals_dict is None: if globals_dict is None:
globals_dict = {} globals_dict = {}