[FIX] /login is not webservice friendly, base_import_module uses it's own

Exceptions in /base_import_module/* will generate 500 pages with a short message

bzr revid: fme@openerp.com-20140326215709-esc4zkpfp8uzuww1
This commit is contained in:
Fabien Meghazi 2014-03-26 22:57:09 +01:00
parent b7fbef1241
commit e49b8f5ae3
2 changed files with 51 additions and 41 deletions

View File

@ -14,52 +14,41 @@ except ImportError:
session = requests.session()
def deploy_module(module_path, url, login, password, db=None):
if url.endswith('/'):
url = url[:-1]
def deploy_module(module_path, url, login, password, db=''):
url = url.rstrip('/')
authenticate(url, login, password, db)
check_import(url)
module_file = zip_module(module_path)
try:
return upload_module(url, module_file)
finally:
os.remove(module_file)
def check_import(server):
url = server +'/base_import_module/check'
res = session.get(url)
if res.status_code == 404:
raise Exception("The server %r does not have the 'base_import_module' installed." % server)
elif res.status_code != 200:
raise Exception("Server %r returned %s http error.", (server, res.status_code))
def upload_module(server, module_file):
print("Uploading module file...")
url = server + '/base_import_module/upload'
files = dict(mod_file=open(module_file, 'rb'))
res = session.post(url, files=files)
if res.status_code != 200:
raise Exception("Could not authenticate on server %r" % server)
raise Exception("Could not authenticate on server '%s'" % server)
return res.text
def authenticate(server, login, password, db):
print("Connecting to server %r" % server)
print("Waiting for server authentication...")
if db:
url = server + '/login'
args = dict(db=db, login=login, key=password)
res = session.get(url, params=args)
else:
url = server + '/web/login'
args = dict(login=login, password=password)
res = session.post(url, args)
if res.status_code != 200:
raise Exception("Could not authenticate to OpenERP server %r" % server)
def authenticate(server, login, password, db=''):
print("Authenticating on server '%s' ..." % server)
# Fixate session with a given db if any
session.get(server + '/web/login', params=dict(db=db))
args = dict(login=login, password=password, db=db)
res = session.post(server + '/base_import_module/login', args)
if res.status_code == 404:
raise Exception("The server '%s' does not have the 'base_import_module' installed." % server)
elif res.status_code != 200:
raise Exception(res.text)
def zip_module(path):
path = os.path.abspath(path)
if not os.path.isdir(path):
raise Exception("Could not find module directory %r" % path)
raise Exception("Could not find module directory '%s'" % path)
container, module_name = os.path.split(path)
temp = tempfile.mktemp(suffix='.zip')
try:
@ -78,7 +67,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Deploy a module on an OpenERP server.')
parser.add_argument('path', help="Path of the module to deploy")
parser.add_argument('--url', dest='url', help='Url of the server (default=http://localhost:8069)', default="http://localhost:8069")
parser.add_argument('--database', dest='database', help='Database to use if server does not use db-filter.')
parser.add_argument('--db', dest='db', help='Database to use if server does not use db-filter.')
parser.add_argument('--login', dest='login', default="admin", help='Login (default=admin)')
parser.add_argument('--password', dest='password', default="admin", help='Password (default=admin)')
parser.add_argument('--no-ssl-check', dest='no_ssl_check', action='store_true', help='Do not check ssl cert')
@ -91,7 +80,7 @@ if __name__ == '__main__':
session.verify = False
try:
result = deploy_module(args.path, args.url, args.login, args.password, args.database)
result = deploy_module(args.path, args.url, args.login, args.password, args.db)
print(result)
except Exception, e:
sys.exit("ERROR: %s" % e)

View File

@ -1,26 +1,47 @@
# -*- coding: utf-8 -*-
import functools
import os
import zipfile
from os.path import join as opj
import openerp
from openerp.http import Controller, route, request
from openerp.http import Controller, route, request, Response
MAX_FILE_SIZE = 100 * 1024 * 1024 # in megabytes
def webservice(f):
@functools.wraps(f)
def wrap(*args, **kw):
try:
return f(*args, **kw)
except Exception, e:
return Response(response=str(e), status=500)
return wrap
class ImportModule(Controller):
@route('/base_import_module/check', type='http', auth='user', methods=['GET'])
def check(self):
assert request.db # TODO: custom ensure_db?
assert request.uid == openerp.SUPERUSER_ID # TODO: check admin group
return 'ok'
def check_user(self, uid=None):
if uid is None:
uid = request.uid
is_admin = request.registry['res.users'].has_group(request.cr, uid, 'base.group_erp_manager')
if not is_admin:
raise openerp.exceptions.AccessError("Only administrators can upload a module")
@route('/base_import_module/login', type='http', auth='none', methods=['POST'])
@webservice
def login(self, login, password, db=None):
if db and db != request.db:
raise Exception("Could not select database '%s'" % db)
uid = request.session.authenticate(request.db, login, password)
if not uid:
return Response(response="Wrong login/password", status=401)
self.check_user(uid)
return "ok"
@route('/base_import_module/upload', type='http', auth='user', methods=['POST'])
@webservice
def upload(self, mod_file=None, **kw):
assert request.db # TODO: custom ensure_db?
assert request.uid == openerp.SUPERUSER_ID # TODO: check admin group
self.check_user()
imm = request.registry['ir.module.module']
if not mod_file:
@ -33,7 +54,7 @@ class ImportModule(Controller):
with zipfile.ZipFile(mod_file, "r") as z:
for zf in z.filelist:
if zf.file_size > MAX_FILE_SIZE:
raise Exception("File %r exceed maximum allowed file size" % zf.filename)
raise Exception("File '%s' exceed maximum allowed file size" % zf.filename)
with openerp.tools.osutil.tempdir() as module_dir:
z.extractall(module_dir)
@ -46,7 +67,7 @@ class ImportModule(Controller):
success.append(mod_name)
except Exception, e:
errors[mod_name] = str(e)
r = ["Successfully imported module %r" % mod for mod in success]
r = ["Successfully imported module '%s'" % mod for mod in success]
for mod, error in errors.items():
r.append("Error while importing module %r: %r" % (mod, error))
r.append("Error while importing module '%s': %r" % (mod, error))
return '\n'.join(r)