pjproject/tests/pjsua/run.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

434 lines
13 KiB
Python
Raw Normal View History

import sys
import imp
import re
import os
import subprocess
import random
import telnetlib
import time
import threading
import traceback
import getopt
import inc_cfg as inc
import inc_const as const
# Module-wide settings; both may be overwritten during start-up below
G_INUNIX = False    # True when the test is running on a Unix-like host
G_EXE = ""          # Path of the pjsua executable ("" = not decided yet)

# Help text printed for -h/--help and whenever the arguments are invalid
usage = """
run.py - Automated test driver
Usage:
run.py [options] MODULE CONFIG
Options:
--exe, -e pjsua executable path
--null-audio, -n use null audio
Sample:
run.py -n mod_run.py scripts-run/100_simple.py
"""
# Parse arguments
#
# Options handled here:
#   -h / --help        print the usage text and exit
#   -n / --null-audio  use null audio (clears inc.HAS_SND_DEV)
#   -e / --exe PATH    explicit path of the pjsua executable
# Exactly two positional arguments must remain after the options.
try:
    opts, args = getopt.getopt(sys.argv[1:], "hne:", ["help", "null-audio", "exe="])
except getopt.GetoptError as err:
    # Unknown option or missing option argument
    print(str(err))
    print(usage)
    sys.exit(2)

for o, a in opts:
    if o in ("-h", "--help"):
        print(usage)
        sys.exit()
    elif o in ("-n", "--null-audio"):
        inc.HAS_SND_DEV = 0
    elif o in ("-e", "--exe"):
        G_EXE = a
    else:
        print("Unknown options")
        sys.exit(2)

if len(args) != 2:
    print("Invalid arguments")
    print(usage)
    sys.exit(2)

# Set global ARGS to be used by modules
inc.ARGS = args
# Get the pjsua executable name
#
# When no --exe was given the executable is looked up automatically:
#   * on win32, the most recently modified of a fixed list of build outputs
#     under ../../pjsip-apps/bin/ is used;
#   * elsewhere, the name is derived from the TARGET_NAME line of
#     ../../build.mak.
# In both cases G_INUNIX is set to match the platform.
if G_EXE == "":
    if "win32" in sys.platform:
        EXE_DIR = "../../pjsip-apps/bin/"
        EXECUTABLES = [ "pjsua_vc6d.exe",
                        "pjsua_vc6.exe",
                        "pjsua-i386-Win32-vc8-Debug.exe",
                        "pjsua-i386-Win32-vc8-Debug-Dynamic.exe",
                        "pjsua-i386-Win32-vc8-Debug-Static.exe",
                        "pjsua-i386-Win32-vc8-Release.exe",
                        "pjsua-i386-Win32-vc8-Release-Dynamic.exe",
                        "pjsua-i386-Win32-vc8-Release-Static.exe",
                        "pjsua-i386-Win32-vc14-Debug.exe",
                        "pjsua-i386-Win32-vc14-Debug-Dynamic.exe",
                        "pjsua-i386-Win32-vc14-Debug-Static.exe",
                        "pjsua-i386-Win32-vc14-Release.exe",
                        "pjsua-i386-Win32-vc14-Release-Dynamic.exe",
                        "pjsua-i386-Win32-vc14-Release-Static.exe"
                        ]
        # Of the candidates that exist, keep the one with the newest mtime
        e_ts = 0
        for e in EXECUTABLES:
            e = EXE_DIR + e
            if os.access(e, os.F_OK):
                st = os.stat(e)
                if e_ts == 0 or e_ts < st.st_mtime:
                    G_EXE = e
                    e_ts = st.st_mtime

        if G_EXE == "":
            print("Unable to find valid pjsua. Please build pjsip first")
            sys.exit(1)

        G_INUNIX = False
    else:
        try:
            # 'with' guarantees the file is closed on every exit path
            with open("../../build.mak", "r") as f:
                for line in f:
                    if "TARGET_NAME" not in line:
                        continue
                    parts = line.split(":= ")
                    if len(parts) < 2:
                        # Mentions TARGET_NAME but is not an assignment
                        # of the expected "NAME := value" form; keep looking
                        continue
                    print(line)
                    G_EXE = "../../pjsip-apps/bin/pjsua-" + parts[1]
                    break
        except IOError:
            # Missing/unreadable build.mak is reported just below
            pass

        if G_EXE == "":
            print("Unable to find ../../build.mak. Please build pjsip first")
            sys.exit(1)

        G_INUNIX = True
else:
    # Executable supplied by the user: only the platform flag is needed
    platform_name = sys.platform.lower()
    if "win32" in platform_name or "microsoft" in platform_name:
        G_INUNIX = False
    else:
        G_INUNIX = True

# The value taken from build.mak still carries its line ending
G_EXE = G_EXE.rstrip("\n\r \t")
###################################
# Poor man's 'expect'-like class
class Expect(threading.Thread):
    """Poor man's 'expect' for a single pjsua instance.

    The thread body (run) launches pjsua with subprocess.Popen and appends
    every line it produces to self.output.  Depending on
    inst_param.telnet_enabled the lines are read from the process' stdout
    or from a telnet connection to pjsua's CLI port.  Other threads use
    send() to issue commands and expect() to wait for a regular expression
    to show up in the collected output.
    """

    # Class-level defaults; __init__ replaces the per-instance ones
    proc = None             # subprocess.Popen object, set by run()
    telnet = None           # telnetlib.Telnet object (telnet mode only)
    use_telnet = False
    echo = False            # print each received line when True
    trace_enabled = False
    inst_param = None
    # Pre-compiled patterns; only 'ra' is used inside this class
    rh = re.compile(const.DESTROYED)
    ra = re.compile(const.ASSERT, re.I)
    rr = re.compile(const.STDOUT_REFRESH)
    # Evaluated once at class creation, so trace() timestamps of all
    # instances share the same origin
    t0 = time.time()
    output = ""             # text received but not yet consumed by expect()
    lock = None             # guards 'output'; created per instance
    running = False         # True while run() is reading pjsua output

    def __init__(self, inst_param):
        """Copy the settings of this instance from inst_param.

        inst_param must provide: name, echo_enabled, trace_enabled and
        telnet_enabled (run() additionally reads arg, telnet_port and
        enable_buffer).
        """
        threading.Thread.__init__(self)
        self.inst_param = inst_param
        self.name = inst_param.name
        self.echo = inst_param.echo_enabled
        self.trace_enabled = inst_param.trace_enabled
        self.use_telnet = inst_param.telnet_enabled
        self.telnet = None
        self.output = ""
        # One lock per instance: a lock created in the class body would be
        # shared by every Expect object and make them block each other.
        self.lock = threading.Lock()

    def run(self):
        """Thread body: start pjsua and collect its output until it ends."""
        if self.use_telnet:
            self._run_telnet()
        else:
            self._run_stdio()

    def _store_line(self, line):
        """Echo the line if requested and append it to the output buffer."""
        if self.echo:
            print(self.name + ": " + line.rstrip())
        with self.lock:
            self.output += line

    def _run_telnet(self):
        """Start pjsua with its telnet CLI and read lines over telnet."""
        fullcmd = G_EXE + " " + self.inst_param.arg + " --use-cli --no-cli-console --cli-telnet-port=%d" % (self.inst_param.telnet_port)
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX)

        # start telnet-ing to pjsua, raise exception if telnet fails after 5s
        started = time.time()
        while self.proc.poll() is None and self.telnet is None:
            try:
                time.sleep(0.01)
                self.telnet = telnetlib.Telnet('127.0.0.1', port=self.inst_param.telnet_port, timeout=60)
            except Exception as e:
                if int(time.time() - started) > 5:
                    raise inc.TestError(self.name + ": Timeout connecting to pjsua: " + repr(e))

        self.running = True
        try:
            while self.proc.poll() is None:
                line = self.telnet.read_until('\n', 60)
                # Stop on EOF/timeout or once pjsua reports it is destroyed
                if line == "" or const.DESTROYED in line:
                    break
                self._store_line(line)
        finally:
            # Always cleared so expect() can detect end of output even if
            # reading failed with an exception
            self.running = False

    def _run_stdio(self):
        """Start pjsua with piped stdin/stdout and read lines from stdout."""
        fullcmd = G_EXE + " " + self.inst_param.arg + " --stdout-refresh=5 --stdout-refresh-text=" + const.STDOUT_REFRESH
        if not self.inst_param.enable_buffer:
            fullcmd = fullcmd + " --stdout-no-buf"
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False)

        self.running = True
        try:
            while self.proc.poll() is None:
                line = self.proc.stdout.readline()
                if line == "":
                    break
                self._store_line(line)
        finally:
            self.running = False

    def send(self, cmd):
        """Send one command line to pjsua (telnet or stdin).

        Raises whatever the underlying write raises; in particular an
        AttributeError while run() has not created the process yet.
        """
        self.trace("send " + cmd)
        if self.use_telnet:
            self.telnet.write(cmd + '\r\n')
        else:
            # write(), not writelines(): the latter would iterate the
            # string and emit it one character at a time
            self.proc.stdin.write(cmd + "\n")
            self.proc.stdin.flush()

    def expect(self, pattern, raise_on_error=True, title="", timeout=15):
        """Wait until a collected output line matches 'pattern'.

        pattern is compiled case-insensitively and searched line by line.
        Output up to and including the matching line is consumed; when a
        scan finds no match, all output collected so far is discarded.

        Returns the matching line.  Returns None when the reader stopped or
        'timeout' seconds (whole-second resolution) elapsed and
        raise_on_error is False, and also immediately when const.PROMPT is
        requested in telnet mode.

        With raise_on_error set, inc.TestError is raised on timeout, on
        premature end of output, and when a line matching const.ASSERT is
        seen before the expected pattern.
        """
        # no prompt for telnet
        if self.use_telnet and pattern == const.PROMPT:
            return

        self.trace("expect " + pattern)
        r = re.compile(pattern, re.I)
        found_at = -1
        matched = None
        t0 = time.time()
        while found_at < 0:
            # The 'with' block releases the lock on the raise below too
            with self.lock:
                lines = self.output.splitlines()
                for i, line in enumerate(lines):
                    # Search for expected text
                    if r.search(line) is not None:
                        found_at = i
                        matched = line
                        break
                    # Trap assertion error
                    if raise_on_error and self.ra.search(line) is not None:
                        raise inc.TestError(self.name + ": " + line)
                if found_at >= 0:
                    self.output = '\n'.join(lines[found_at+1:]) + "\n"
                else:
                    self.output = ""

            if found_at >= 0:
                return matched

            if not self.running:
                if raise_on_error:
                    raise inc.TestError(self.name + ": Premature EOF")
                break

            dur = int(time.time() - t0)
            if dur > timeout:
                self.trace("Timed-out!")
                if raise_on_error:
                    raise inc.TestError(self.name + " " + title + ": Timeout expecting pattern: \"" + pattern + "\"")
                break
            time.sleep(0.01)
        return None

    def get_config(self, key_config):
        """Send the "dd" command and return the line matching key_config."""
        self.send("dd")
        line = self.expect(key_config)
        return line

    def sync_stdout(self):
        """Send a random echo command and wait for it (stdio mode only)."""
        if not self.use_telnet:
            self.trace("sync_stdout")
            cmd = "echo 1" + str(random.randint(1000,9999))
            self.send(cmd)
            self.expect(cmd)

    def wait(self):
        """Join the reader thread, wait for pjsua, close telnet if open."""
        self.trace("wait")
        self.join()
        self.proc.communicate()
        if self.telnet:
            self.telnet.close()

    def trace(self, s):
        """Print a banner with 's' and the elapsed seconds, if tracing."""
        if self.trace_enabled:
            now = time.time()
            # Only the timestamp goes through '%' formatting so that a '%'
            # inside 's' (a command line or a pattern) cannot break it
            stamp = " [at t=%03d]" % int(now - self.t0)
            print(self.name + ": " + "================== " + s + " ==================" + stamp)
#########################
# Error handling
def handle_error(errmsg, t, close_processes = True):
    """Report a test failure, stop the pjsua instances and exit with 1.

    errmsg          -- text included in the printed error lines
    t               -- test object; its 'process' list holds the running
                       instances (only read when close_processes is true)
    close_processes -- when true, every instance is asked to quit and is
                       terminated if it does not report const.DESTROYED

    Never returns: always ends with sys.exit(1).
    """
    print("====== Caught error: " + errmsg + " ======")
    if close_processes:
        time.sleep(1)
        for p in t.process:
            quit_cmds = ("shutdown",) if p.use_telnet else ("q", "q")
            # Protect against 'Broken pipe' exception
            try:
                for quit_cmd in quit_cmds:
                    p.send(quit_cmd)
            except:
                # Best effort only: the instance may already be gone
                pass

            # Did the instance confirm that it shut down?
            try:
                is_err = p.expect(const.DESTROYED, False) is None
            except:
                is_err = True

            still_running = is_err and p.proc.poll() is None
            if still_running and sys.hexversion >= 0x02060000:
                # No confirmation and the process is still alive
                p.proc.terminate()
            else:
                p.wait()
    print("Test completed with error: " + errmsg)
    sys.exit(1)
#########################
# MAIN
# Import the test script
script = imp.load_source("script", inc.ARGS[0])

# Init random seed
random.seed()

# Validate
if script.test is None:
    print("Error: no test defined")
    sys.exit(1)

if script.test.skip:
    print("Test " + script.test.title + " is skipped")
    sys.exit(0)

if len(script.test.inst_params) == 0:
    print("Error: test doesn't contain pjsua run descriptions")
    sys.exit(1)

# Instantiate pjsuas
print("====== Running " + script.test.title + " ======")
print("Using " + G_EXE + " as pjsua executable")

for inst_param in script.test.inst_params:
    # Each instance gets up to three start attempts
    retry = 0
    process_running = False
    while (not process_running) and retry < 3:
        p = None
        retry += 1
        try:
            # Create pjsua's Expect instance from the param
            p = Expect(inst_param)
            p.start()
        except inc.TestError as e:
            handle_error(e.desc, script.test)

        # wait process ready
        if not p.use_telnet:
            # send() fails until the reader thread has created the process
            # and its stdin accepts data, so keep trying.  Give up on this
            # attempt once the reader thread has ended, otherwise a pjsua
            # that failed to start would make this loop spin forever.
            while True:
                try:
                    p.send("echo 1")
                except Exception:
                    if not p.is_alive():
                        break
                    time.sleep(0.1)
                    continue
                process_running = True
                break
        else:
            # Allow about 5 seconds for the telnet connection to come up
            t0 = time.time()
            while p.telnet is None:
                time.sleep(0.1)
                dur = int(time.time() - t0)
                if dur > 5:
                    break
            process_running = p.telnet is not None

        # wait before retrying (only when another attempt will follow)
        if not process_running and retry < 3:
            time.sleep(2)

    if not process_running:
        handle_error("Failed running pjsua", script.test)

    # add running instance
    script.test.process.append(p)

for p in script.test.process:
    try:
        # Wait until registration completes
        if p.inst_param.have_reg:
            p.send("rr")
            p.expect(p.inst_param.uri+".*registration success")
        # Synchronize stdout
        if not p.use_telnet:
            p.send("")
            p.expect(const.PROMPT)
            p.send("echo 1")
            p.expect("echo 1")
    except inc.TestError as e:
        handle_error(e.desc, script.test)

# Run the test function
if script.test.test_func is not None:
    try:
        script.test.test_func(script.test)
    except inc.TestError as e:
        handle_error(e.desc, script.test)
    except:
        # Catch-all on purpose: whatever went wrong, the pjsua instances
        # must still be shut down by handle_error()
        handle_error("Unknown error: " + str(traceback.format_exc()), script.test)

# Shutdown all instances
for p in script.test.process:
    # Unregister if we have_reg to make sure that next tests
    # won't fail
    if p.inst_param.have_reg:
        p.send("ru")
        p.expect(p.inst_param.uri+".*unregistration success")

    if p.use_telnet:
        p.send("shutdown")
    else:
        p.send("q")

time.sleep(0.5)
for p in script.test.process:
    if p.running:
        p.expect(const.DESTROYED, False)
    p.wait()

# Run the post test function
if script.test.post_func is not None:
    try:
        script.test.post_func(script.test)
    except inc.TestError as e:
        # Instances are already stopped, so do not try to close them again
        handle_error(e.desc, script.test, False)

# Done
print("Test " + script.test.title + " completed successfully")
sys.exit(0)