#!/usr/bin/python3 import dbus import dbus.mainloop.glib import sys from gi.repository import GLib import os def print_menu(): print("Select test case") print("----------------------------------------------------------------") print("[0] Activate cbs") print("[1] Deactivate cbs") print("[2] Get cbs properties") print("[3] Set/Register topics") print(" If several - give topics separated with comma. \ \n E.g. 20,50-51,60") print("[4] Clear/Unregister topics") print("[5] NetReg Base Station - Get current serving cell") print("[x] Exit") print("----------------------------------------------------------------") def property_changed(property, value): if value == "" and property == "Topics": print("User selected Topics have been cleared. \ \nRegistered for emergency topics only.") else: print("Cell Broadcast property %s is changed to %s" % (property, value)) print("\nPress ENTER to continue") def incoming_broadcast(text, topic): print("Broadcast msg: %s \n Topic channel: %s" % (text, topic)) print("\nPress ENTER to continue") def emergency_broadcast(text, properties): emergType = properties["EmergencyType"] emergAlert = properties["EmergencyAlert"] print("Broadcast msg: %s \n\t Type: %s \n\t Alert: %s \n\t Popup: %s" \ % (text, emergType, emergAlert, popup)) if properties["Popup"] == True: print("Popup required.") print("\nPress ENTER to continue") def set_cbs_state(cbs, state): if state == True: print("Activating cell broadcast...") cbs.SetProperty("Powered", dbus.Boolean(1)) else: print("Deactivating cell broadcast...") cbs.SetProperty("Powered", dbus.Boolean(0)) print("-----------------------------------------------------------") def print_cbs_properties(cbs): properties = cbs.GetProperties() print("---------------------PROPERTIES----------------------------") for p in properties: if len(properties[p].__str__()) > 0: if p == "Powered": if properties[p] == True: print("Cell Broadcast is Activated.") else: print("Cell Broadcast is Deactivated.") elif p == "Topics": print("Currently set CBS %s are: %s" \ % (p, properties[p])) topics_available = True else: print("Cell Broadcast %s value empty" % (p)) print("-----------------------------------------------------------") def set_topics(cbs): print_cbs_properties(cbs) topicTemp = "" invalidData = False; index = 0 topics = input('Enter the topic ID(s) you want to register to: ') while index < len(topics): if topics[index] == ',' or topics[index] == '-': topicTemp = "" elif topics[index] >= '0' and topics[index] <= '9': topicTemp = topicTemp + topics[index] else: print("Invalid char. \"%s\" entered. Topic not set." \ % (topics[index])) invalidData = True break if topicTemp: if int(topicTemp) > 999: invalidData = True print("Invalid Topic ID %s (range 0-999). \ \nCould not register." % topicTemp) index = index + 1 if invalidData == False: try: print("Setting Cell Broadcast topics...") cbs.SetProperty("Topics", topics); except dbus.DBusException as e: print("Unable to set topic: %s" % e) print("-----------------------------------------------------------") def get_serving_cell_name(netReg): wasFound = False; properties = netReg.GetProperties() for p in properties: if p == "BaseStation": if len(properties[p].__str__()) > 0: print("Current serving cell name: %s" \ % (properties["BaseStation"])) wasFound = True; else: print("Current Serving cell name empty. \ Base Station CBS not available.") if wasFound == False: print("Base Station parameter not found. \ \nBase Station CBS not available.") print("-----------------------------------------------------------") def stdin_handler(channel, condition, cbs, netReg): in_key = os.read(channel.unix_get_fd(), 160).rstrip().decode('UTF-8') if in_key == '0': set_cbs_state(cbs, True) elif in_key == '1': set_cbs_state(cbs, False) elif in_key == '2': print_cbs_properties(cbs) elif in_key == '3': set_topics(cbs) elif in_key == '4': cbs.SetProperty("Topics", "") elif in_key == '5': get_serving_cell_name(netReg) elif in_key == 'x': sys.exit(1) print('\n' * 2) print_menu() return True if __name__ == "__main__": dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object('org.ofono', '/'), 'org.ofono.Manager') modems = manager.GetModems() path = modems[0][0] cbs = dbus.Interface(bus.get_object('org.ofono', path), 'org.ofono.CellBroadcast') netReg = dbus.Interface(bus.get_object('org.ofono', path), 'org.ofono.NetworkRegistration') cbs.connect_to_signal("PropertyChanged", property_changed) cbs.connect_to_signal("IncomingBroadcast", incoming_broadcast) cbs.connect_to_signal("EmergencyBroadcast", emergency_broadcast) print('\n' * 2) print_menu() GLib.io_add_watch(GLib.IOChannel(filedes=sys.stdin.fileno()), GLib.PRIORITY_DEFAULT, GLib.IO_IN, stdin_handler, cbs, \ netReg) mainloop = GLib.MainLoop() mainloop.run()