mirror of git://git.sysmocom.de/ofono
155 lines
3.9 KiB
Python
Executable File
155 lines
3.9 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
import gobject
|
|
|
|
import dbus
|
|
import dbus.mainloop.glib
|
|
|
|
_dbus2py = {
|
|
dbus.String : unicode,
|
|
dbus.UInt32 : int,
|
|
dbus.Int32 : int,
|
|
dbus.Int16 : int,
|
|
dbus.UInt16 : int,
|
|
dbus.UInt64 : int,
|
|
dbus.Int64 : int,
|
|
dbus.Byte : int,
|
|
dbus.Boolean : bool,
|
|
dbus.ByteArray : str,
|
|
dbus.ObjectPath : str
|
|
}
|
|
|
|
def dbus2py(d):
|
|
t = type(d)
|
|
if t in _dbus2py:
|
|
return _dbus2py[t](d)
|
|
if t is dbus.Dictionary:
|
|
return dict([(dbus2py(k), dbus2py(v)) for k, v in d.items()])
|
|
if t is dbus.Array and d.signature == "y":
|
|
return "".join([chr(b) for b in d])
|
|
if t is dbus.Array or t is list:
|
|
return [dbus2py(v) for v in d]
|
|
if t is dbus.Struct or t is tuple:
|
|
return tuple([dbus2py(v) for v in d])
|
|
return d
|
|
|
|
def pretty(d):
|
|
d = dbus2py(d)
|
|
t = type(d)
|
|
|
|
if t in (dict, tuple, list) and len(d) > 0:
|
|
if t is dict:
|
|
d = ", ".join(["%s = %s" % (k, pretty(v))
|
|
for k, v in d.items()])
|
|
return "{ %s }" % d
|
|
|
|
d = " ".join([pretty(e) for e in d])
|
|
|
|
if t is tuple:
|
|
return "( %s )" % d
|
|
|
|
if t is str:
|
|
return "%s" % d
|
|
|
|
return str(d)
|
|
|
|
def property_changed(name, value, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s = %s" % (iface, path, name, pretty(value))
|
|
|
|
def added(name, value, member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s %s" % (iface, member, name, pretty(value))
|
|
|
|
def removed(name, member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s" % (iface, name, member)
|
|
|
|
def event(member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s" % (iface, path, member)
|
|
|
|
def message(msg, args, member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s %s (%s)" % (iface, path, member,
|
|
msg, pretty(args))
|
|
|
|
def ussd(msg, member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s %s" % (iface, path, member, msg)
|
|
|
|
def value(value, member, path, interface):
|
|
iface = interface[interface.rfind(".") + 1:]
|
|
print "{%s} [%s] %s %s" % (iface, path, member, str(value))
|
|
|
|
if __name__ == '__main__':
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
bus = dbus.SystemBus()
|
|
|
|
bus.add_signal_receiver(property_changed,
|
|
bus_name="org.ofono",
|
|
signal_name = "PropertyChanged",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["IncomingBarringInEffect",
|
|
"OutgoingBarringInEffect",
|
|
"NearMaximumWarning"]:
|
|
bus.add_signal_receiver(event,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["ModemAdded",
|
|
"ContextAdded",
|
|
"CallAdded",
|
|
"MessageAdded"]:
|
|
bus.add_signal_receiver(added,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["ModemRemoved",
|
|
"ContextRemoved",
|
|
"CallRemoved",
|
|
"MessageRemoved"]:
|
|
bus.add_signal_receiver(removed,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["DisconnectReason", "Forwarded", "BarringActive"]:
|
|
bus.add_signal_receiver(value,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["IncomingBroadcast", "EmergencyBroadcast",
|
|
"IncomingMessage", "ImmediateMessage"]:
|
|
bus.add_signal_receiver(message,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
for member in ["NotificationReceived", "RequestReceived"]:
|
|
bus.add_signal_receiver(ussd,
|
|
bus_name="org.ofono",
|
|
signal_name = member,
|
|
member_keyword="member",
|
|
path_keyword="path",
|
|
interface_keyword="interface")
|
|
|
|
mainloop = gobject.MainLoop()
|
|
mainloop.run()
|