|
|
|
#!/usr/bin/python3
|
|
|
|
|
|
|
|
import dbus
|
|
|
|
import dbus.mainloop.glib
|
|
|
|
import sys
|
|
|
|
from gi.repository import GLib
|
|
|
|
import os
|
|
|
|
|
|
|
|
def print_menu():
|
|
|
|
print("Select test case")
|
|
|
|
print("----------------------------------------------------------------")
|
|
|
|
print("[0] Activate cbs")
|
|
|
|
print("[1] Deactivate cbs")
|
|
|
|
print("[2] Get cbs properties")
|
|
|
|
print("[3] Set/Register topics")
|
|
|
|
print(" If several - give topics separated with comma. \
|
|
|
|
\n E.g. 20,50-51,60")
|
|
|
|
print("[4] Clear/Unregister topics")
|
|
|
|
print("[5] NetReg Base Station - Get current serving cell")
|
|
|
|
print("[x] Exit")
|
|
|
|
print("----------------------------------------------------------------")
|
|
|
|
|
|
|
|
def property_changed(property, value):
|
|
|
|
if value == "" and property == "Topics":
|
|
|
|
print("User selected Topics have been cleared. \
|
|
|
|
\nRegistered for emergency topics only.")
|
|
|
|
else:
|
|
|
|
print("Cell Broadcast property %s is changed to %s" % (property, value))
|
|
|
|
print("\nPress ENTER to continue")
|
|
|
|
|
|
|
|
def incoming_broadcast(text, topic):
|
|
|
|
print("Broadcast msg: %s \n Topic channel: %s" % (text, topic))
|
|
|
|
print("\nPress ENTER to continue")
|
|
|
|
|
|
|
|
def emergency_broadcast(text, properties):
|
|
|
|
emergType = properties["EmergencyType"]
|
|
|
|
emergAlert = properties["EmergencyAlert"]
|
|
|
|
|
|
|
|
print("Broadcast msg: %s \n\t Type: %s \n\t Alert: %s \n\t Popup: %s" \
|
|
|
|
% (text, emergType, emergAlert, popup))
|
|
|
|
|
|
|
|
if properties["Popup"] == True:
|
|
|
|
print("Popup required.")
|
|
|
|
|
|
|
|
print("\nPress ENTER to continue")
|
|
|
|
|
|
|
|
def set_cbs_state(cbs, state):
|
|
|
|
if state == True:
|
|
|
|
print("Activating cell broadcast...")
|
|
|
|
cbs.SetProperty("Powered", dbus.Boolean(1))
|
|
|
|
else:
|
|
|
|
print("Deactivating cell broadcast...")
|
|
|
|
cbs.SetProperty("Powered", dbus.Boolean(0))
|
|
|
|
print("-----------------------------------------------------------")
|
|
|
|
|
|
|
|
def print_cbs_properties(cbs):
|
|
|
|
properties = cbs.GetProperties()
|
|
|
|
print("---------------------PROPERTIES----------------------------")
|
|
|
|
for p in properties:
|
|
|
|
if len(properties[p].__str__()) > 0:
|
|
|
|
if p == "Powered":
|
|
|
|
if properties[p] == True:
|
|
|
|
print("Cell Broadcast is Activated.")
|
|
|
|
else:
|
|
|
|
print("Cell Broadcast is Deactivated.")
|
|
|
|
elif p == "Topics":
|
|
|
|
print("Currently set CBS %s are: %s" \
|
|
|
|
% (p, properties[p]))
|
|
|
|
topics_available = True
|
|
|
|
else:
|
|
|
|
print("Cell Broadcast %s value empty" % (p))
|
|
|
|
print("-----------------------------------------------------------")
|
|
|
|
|
|
|
|
def set_topics(cbs):
|
|
|
|
print_cbs_properties(cbs)
|
|
|
|
|
|
|
|
topicTemp = ""
|
|
|
|
invalidData = False;
|
|
|
|
index = 0
|
|
|
|
|
|
|
|
topics = input('Enter the topic ID(s) you want to register to: ')
|
|
|
|
|
|
|
|
while index < len(topics):
|
|
|
|
if topics[index] == ',' or topics[index] == '-':
|
|
|
|
topicTemp = ""
|
|
|
|
elif topics[index] >= '0' and topics[index] <= '9':
|
|
|
|
topicTemp = topicTemp + topics[index]
|
|
|
|
else:
|
|
|
|
print("Invalid char. \"%s\" entered. Topic not set." \
|
|
|
|
% (topics[index]))
|
|
|
|
invalidData = True
|
|
|
|
break
|
|
|
|
|
|
|
|
if topicTemp:
|
|
|
|
if int(topicTemp) > 999:
|
|
|
|
invalidData = True
|
|
|
|
print("Invalid Topic ID %s (range 0-999). \
|
|
|
|
\nCould not register." % topicTemp)
|
|
|
|
|
|
|
|
index = index + 1
|
|
|
|
|
|
|
|
if invalidData == False:
|
|
|
|
try:
|
|
|
|
print("Setting Cell Broadcast topics...")
|
|
|
|
cbs.SetProperty("Topics", topics);
|
|
|
|
except dbus.DBusException as e:
|
|
|
|
print("Unable to set topic: %s" % e)
|
|
|
|
|
|
|
|
print("-----------------------------------------------------------")
|
|
|
|
|
|
|
|
def get_serving_cell_name(netReg):
|
|
|
|
wasFound = False;
|
|
|
|
properties = netReg.GetProperties()
|
|
|
|
|
|
|
|
for p in properties:
|
|
|
|
if p == "BaseStation":
|
|
|
|
if len(properties[p].__str__()) > 0:
|
|
|
|
print("Current serving cell name: %s" \
|
|
|
|
% (properties["BaseStation"]))
|
|
|
|
wasFound = True;
|
|
|
|
else:
|
|
|
|
print("Current Serving cell name empty. \
|
|
|
|
Base Station CBS not available.")
|
|
|
|
|
|
|
|
if wasFound == False:
|
|
|
|
print("Base Station parameter not found. \
|
|
|
|
\nBase Station CBS not available.")
|
|
|
|
print("-----------------------------------------------------------")
|
|
|
|
|
|
|
|
def stdin_handler(channel, condition, cbs, netReg):
|
|
|
|
in_key = os.read(channel.unix_get_fd(), 160).rstrip().decode('UTF-8')
|
|
|
|
|
|
|
|
if in_key == '0':
|
|
|
|
set_cbs_state(cbs, True)
|
|
|
|
|
|
|
|
elif in_key == '1':
|
|
|
|
set_cbs_state(cbs, False)
|
|
|
|
|
|
|
|
elif in_key == '2':
|
|
|
|
print_cbs_properties(cbs)
|
|
|
|
|
|
|
|
elif in_key == '3':
|
|
|
|
set_topics(cbs)
|
|
|
|
|
|
|
|
elif in_key == '4':
|
|
|
|
cbs.SetProperty("Topics", "")
|
|
|
|
|
|
|
|
elif in_key == '5':
|
|
|
|
get_serving_cell_name(netReg)
|
|
|
|
|
|
|
|
elif in_key == 'x':
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
print('\n' * 2)
|
|
|
|
print_menu()
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
bus = dbus.SystemBus()
|
|
|
|
|
|
|
|
manager = dbus.Interface(bus.get_object('org.ofono', '/'),
|
|
|
|
'org.ofono.Manager')
|
|
|
|
|
|
|
|
modems = manager.GetModems()
|
|
|
|
path = modems[0][0]
|
|
|
|
|
|
|
|
cbs = dbus.Interface(bus.get_object('org.ofono', path),
|
|
|
|
'org.ofono.CellBroadcast')
|
|
|
|
|
|
|
|
netReg = dbus.Interface(bus.get_object('org.ofono', path),
|
|
|
|
'org.ofono.NetworkRegistration')
|
|
|
|
|
|
|
|
cbs.connect_to_signal("PropertyChanged", property_changed)
|
|
|
|
cbs.connect_to_signal("IncomingBroadcast", incoming_broadcast)
|
|
|
|
cbs.connect_to_signal("EmergencyBroadcast", emergency_broadcast)
|
|
|
|
|
|
|
|
print('\n' * 2)
|
|
|
|
|
|
|
|
print_menu()
|
|
|
|
|
|
|
|
GLib.io_add_watch(GLib.IOChannel(filedes=sys.stdin.fileno()),
|
|
|
|
GLib.PRIORITY_DEFAULT, GLib.IO_IN, stdin_handler, cbs, \
|
|
|
|
netReg)
|
|
|
|
|
|
|
|
mainloop = GLib.MainLoop()
|
|
|
|
mainloop.run()
|