doc webdav: more tests in lib, yaml
bzr revid: p_christ@hol.gr-20101103112534-7vdnxjxr9wb3736t
This commit is contained in:
parent
23103d89cf
commit
872f6be340
|
@ -11,3 +11,29 @@
|
|||
dc.get_creds(self, cr, uid)
|
||||
dc.gd_options(path=cr.dbname, expect={'DAV': ['1',]})
|
||||
-
|
||||
I will test the propnames at the document root
|
||||
-
|
||||
!python {model: ir.attachment}: |
|
||||
from document_webdav import test_davclient as te
|
||||
dc = te.DAVClient()
|
||||
dc.get_creds(self, cr, uid)
|
||||
dc.gd_propname(path=cr.dbname+'/Documents/')
|
||||
-
|
||||
I will test the ETags of the document root
|
||||
-
|
||||
!python {model: ir.attachment}: |
|
||||
from document_webdav import test_davclient as te
|
||||
dc = te.DAVClient()
|
||||
dc.get_creds(self, cr, uid)
|
||||
dc.gd_getetag(path=cr.dbname+'/Documents/')
|
||||
|
||||
-
|
||||
I will now ls -l the document root.
|
||||
-
|
||||
!python {model: ir.attachment}: |
|
||||
from document_webdav import test_davclient as te
|
||||
dc = te.DAVClient()
|
||||
dc.get_creds(self, cr, uid)
|
||||
res = dc.gd_lsl(path=cr.dbname+'/Documents/')
|
||||
for lin in res:
|
||||
print "%(type)s\t%(uid)s\t%(gid)s\t%(size)s\t%(mtime)s\t%(name)s" % lin
|
||||
|
|
|
@ -323,7 +323,7 @@ class DAVClient(object):
|
|||
"""An instance of a WebDAV client, connected to the OpenERP server
|
||||
"""
|
||||
|
||||
def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False):
|
||||
def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False):
|
||||
if use_ssl:
|
||||
self.host = config.get_misc('httpsd', 'interface', False)
|
||||
self.port = config.get_misc('httpsd', 'port', 8071)
|
||||
|
@ -345,6 +345,9 @@ class DAVClient(object):
|
|||
self.user = user
|
||||
self.passwd = passwd
|
||||
self.dbg = dbg
|
||||
self.hdrs = {}
|
||||
if useragent:
|
||||
self.set_useragent(useragent)
|
||||
|
||||
def get_creds(self, obj, cr, uid):
|
||||
"""Read back the user credentials from cr, uid
|
||||
|
@ -362,11 +365,25 @@ class DAVClient(object):
|
|||
self.passwd = res[0]['password']
|
||||
return True
|
||||
|
||||
def set_useragent(self, uastr):
|
||||
""" Set the user-agent header to something meaningful.
|
||||
Some shorthand names will be replaced by stock strings.
|
||||
"""
|
||||
if uastr in ('KDE4', 'Korganizer'):
|
||||
self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)"
|
||||
elif uastr == 'iPhone3':
|
||||
self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117"
|
||||
elif uastr == "MacOS":
|
||||
self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)"
|
||||
else:
|
||||
self.hdrs['User-Agent'] = uastr
|
||||
|
||||
def _http_request(self, path, method='GET', hdrs=None, body=None):
|
||||
if not hdrs:
|
||||
hdrs = {}
|
||||
import base64
|
||||
dbg = self.dbg
|
||||
hdrs.update(self.hdrs)
|
||||
log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path)
|
||||
conn = httplib.HTTPConnection(self.host, port=self.port)
|
||||
conn.set_debuglevel(dbg)
|
||||
|
@ -454,7 +471,169 @@ class DAVClient(object):
|
|||
|
||||
if expect:
|
||||
self._assert_headers(expect, m)
|
||||
|
||||
def _parse_prop_response(self, data):
|
||||
""" Parse a propfind/propname response
|
||||
"""
|
||||
def getText(node):
|
||||
rc = []
|
||||
for node in node.childNodes:
|
||||
if node.nodeType == node.TEXT_NODE:
|
||||
rc.append(node.data)
|
||||
return ''.join(rc)
|
||||
|
||||
def getElements(node, namespaces=None, strict=False):
|
||||
for cnod in node.childNodes:
|
||||
if cnod.nodeType != node.ELEMENT_NODE:
|
||||
if strict:
|
||||
log.debug("Found %r inside <%s>", cnod, node.tagName)
|
||||
continue
|
||||
if namespaces and (cnod.namespaceURI not in namespaces):
|
||||
log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName)
|
||||
continue
|
||||
yield cnod
|
||||
|
||||
nod = xml.dom.minidom.parseString(data)
|
||||
nod_r = nod.documentElement
|
||||
res = {}
|
||||
assert nod_r.localName == 'multistatus', nod_r.tagName
|
||||
for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'):
|
||||
href = None
|
||||
status = 200
|
||||
res_nss = {}
|
||||
for cno in getElements(resp, namespaces=['DAV:',]):
|
||||
if cno.localName == 'href':
|
||||
assert href is None, "Second href in same response"
|
||||
href = getText(cno)
|
||||
elif cno.localName == 'propstat':
|
||||
for pno in getElements(cno, namespaces=['DAV:',]):
|
||||
rstatus = None
|
||||
if pno.localName == 'prop':
|
||||
for prop in getElements(pno):
|
||||
key = prop.localName
|
||||
tval = getText(prop).strip()
|
||||
val = tval or (True, rstatus or status)
|
||||
if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype':
|
||||
val = 'plain'
|
||||
for rte in getElements(prop, namespaces=['DAV:',]):
|
||||
# Note: we only look at DAV:... elements, we
|
||||
# actually expect only one DAV:collection child
|
||||
val = rte.localName
|
||||
res_nss.setdefault(prop.namespaceURI,{})[key] = val
|
||||
elif pno.localName == 'status':
|
||||
rstr = getText(pno)
|
||||
htver, sta, msg = rstr.split(' ', 3)
|
||||
assert htver == 'HTTP/1.1'
|
||||
rstatus = int(sta)
|
||||
else:
|
||||
log.debug("What is <%s> inside a <propstat>?", pno.tagName)
|
||||
|
||||
else:
|
||||
log.debug("Unknown node: %s", cno.tagName)
|
||||
|
||||
res.setdefault(href,[]).append((status, res_nss))
|
||||
|
||||
return res
|
||||
|
||||
def gd_propfind(self, path, props=None, depth=0):
|
||||
if not props:
|
||||
propstr = '<allprop/>'
|
||||
else:
|
||||
propstr = '<prop>'
|
||||
nscount = 0
|
||||
for p in props:
|
||||
ns = None
|
||||
if isinstance(p, tuple):
|
||||
p, ns = p
|
||||
if ns is None or ns == 'DAV:':
|
||||
propstr += '<%s/>' % p
|
||||
else:
|
||||
propstr += '<ns%d:%s xmlns:ns%d="%s" />' %(nscount, p, nscount, ns)
|
||||
nscount += 1
|
||||
propstr += '</prop>'
|
||||
|
||||
body="""<?xml version="1.0" encoding="utf-8"?>
|
||||
<propfind xmlns="DAV:">%s</propfind>""" % propstr
|
||||
hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
|
||||
'Accept': 'text/xml',
|
||||
'Depth': depth,
|
||||
}
|
||||
|
||||
s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
|
||||
hdrs=hdrs, body=body)
|
||||
assert s == 207, "Bad status: %s" % s
|
||||
ctype = m.getheader('Content-Type').split(';',1)[0]
|
||||
assert ctype == 'text/xml', m.getheader('Content-Type')
|
||||
res = self._parse_prop_response(d)
|
||||
if depth == 0:
|
||||
assert len(res) == 1
|
||||
res = res.values()[0]
|
||||
else:
|
||||
assert len(res) >= 1
|
||||
return res
|
||||
|
||||
|
||||
def gd_propname(self, path, depth=0):
|
||||
body="""<?xml version="1.0" encoding="utf-8"?>
|
||||
<propfind xmlns="DAV:"><propname/></propfind>"""
|
||||
hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
|
||||
'Accept': 'text/xml',
|
||||
'Depth': depth
|
||||
}
|
||||
s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
|
||||
hdrs=hdrs, body=body)
|
||||
assert s == 207, "Bad status: %s" % s
|
||||
ctype = m.getheader('Content-Type').split(';',1)[0]
|
||||
assert ctype == 'text/xml', m.getheader('Content-Type')
|
||||
res = self._parse_prop_response(d)
|
||||
if depth == 0:
|
||||
assert len(res) == 1
|
||||
res = res.values()[0]
|
||||
else:
|
||||
assert len(res) >= 1
|
||||
return res
|
||||
|
||||
def gd_getetag(self, path, depth=0):
|
||||
return self.gd_propfind(path, props=['getetag',], depth=depth)
|
||||
|
||||
def gd_lsl(self, path):
|
||||
""" Return a list of 'ls -l' kind of data for a folder
|
||||
|
||||
This is based on propfind.
|
||||
"""
|
||||
|
||||
lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'),
|
||||
('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'),
|
||||
('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'),
|
||||
('mime', 'getcontenttype', 'application/data'), ]
|
||||
|
||||
propnames = [ l[1] for l in lspairs]
|
||||
propres = self.gd_propfind(path, props=propnames, depth=1)
|
||||
|
||||
res = []
|
||||
for href, pr in propres.items():
|
||||
lsline = {}
|
||||
for st, nsdic in pr:
|
||||
davprops = nsdic['DAV:']
|
||||
if st == 200:
|
||||
for lsp in lspairs:
|
||||
if lsp[1] in davprops:
|
||||
if lsp[1] == 'resourcetype':
|
||||
if davprops[lsp[1]] == 'collection':
|
||||
lsline[lsp[0]] = 'dr-xr-x---'
|
||||
else:
|
||||
lsline[lsp[0]] = '-r-xr-x---'
|
||||
else:
|
||||
lsline[lsp[0]] = davprops[lsp[1]]
|
||||
elif st in (404, 403):
|
||||
for lsp in lspairs:
|
||||
if lsp[1] in davprops:
|
||||
lsline[lsp[0]] = lsp[2]
|
||||
else:
|
||||
log.debug("Strange status: %s", st)
|
||||
|
||||
res.append(lsline)
|
||||
|
||||
return res
|
||||
|
||||
#eof
|
Loading…
Reference in New Issue