import asyncio import inspect import logging import re import xml.etree.ElementTree as ET from dataclasses import dataclass from functools import lru_cache from typing import Callable, Coroutine, Dict, List, Optional, Type, Union from . import introspection as intr from . import message_bus from ._private.util import replace_idx_with_fds from .constants import ErrorType, MessageType from .errors import DBusError, InterfaceNotFoundError from .message import Message from .unpack import unpack_variants as unpack from .validators import assert_bus_name_valid, assert_object_path_valid @dataclass class SignalHandler: """Signal handler.""" fn: Callable unpack_variants: bool class BaseProxyInterface: """An abstract class representing a proxy to an interface exported on the bus by another client. Implementations of this class are not meant to be constructed directly by users. Use :func:`BaseProxyObject.get_interface` to get a proxy interface. Each message bus implementation provides its own proxy interface implementation that will be returned by that method. Proxy interfaces can be used to call methods, get properties, and listen to signals on the interface. Proxy interfaces are created dynamically with a family of methods for each of these operations based on what members the interface exposes. Each proxy interface implementation exposes these members in a different way depending on the features of the backend. See the documentation of the proxy interface implementation you use for more details. :ivar bus_name: The name of the bus this interface is exported on. :vartype bus_name: str :ivar path: The object path exported on the client that owns the bus name. :vartype path: str :ivar introspection: Parsed introspection data for the proxy interface. :vartype introspection: :class:`Node <dbus_fast.introspection.Interface>` :ivar bus: The message bus this proxy interface is connected to. :vartype bus: :class:`BaseMessageBus <dbus_fast.message_bus.BaseMessageBus>` """ def __init__( self, bus_name: str, path: str, introspection: intr.Interface, bus: "message_bus.BaseMessageBus", ) -> None: self.bus_name = bus_name self.path = path self.introspection = introspection self.bus = bus self._signal_handlers: Dict[str, List[SignalHandler]] = {} self._signal_match_rule = f"type='signal',sender={bus_name},interface={introspection.name},path={path}" _underscorer1 = re.compile(r"(.)([A-Z][a-z]+)") _underscorer2 = re.compile(r"([a-z0-9])([A-Z])") @staticmethod @lru_cache(maxsize=128) def _to_snake_case(member: str) -> str: subbed = BaseProxyInterface._underscorer1.sub(r"\1_\2", member) return BaseProxyInterface._underscorer2.sub(r"\1_\2", subbed).lower() @staticmethod def _check_method_return(msg: Message, signature: Optional[str] = None): if msg.message_type == MessageType.ERROR: raise DBusError._from_message(msg) elif msg.message_type != MessageType.METHOD_RETURN: raise DBusError( ErrorType.CLIENT_ERROR, "method call didnt return a method return", msg ) elif signature is not None and msg.signature != signature: raise DBusError( ErrorType.CLIENT_ERROR, f'method call returned unexpected signature: "{msg.signature}"', msg, ) def _add_method(self, intr_method: intr.Method) -> None: raise NotImplementedError("this must be implemented in the inheriting class") def _add_property(self, intr_property: intr.Property) -> None: raise NotImplementedError("this must be implemented in the inheriting class") def _message_handler(self, msg: Message) -> None: if ( msg.message_type != MessageType.SIGNAL or msg.interface != self.introspection.name or msg.path != self.path or msg.member not in self._signal_handlers ): return if ( msg.sender != self.bus_name and self.bus._name_owners.get(self.bus_name, "") != msg.sender ): # The sender is always a unique name, but the bus name given might # be a well known name. If the sender isn't an exact match, check # to see if it owns the bus_name we were given from the cache kept # on the bus for this purpose. return match = [s for s in self.introspection.signals if s.name == msg.member] if not len(match): return intr_signal = match[0] if intr_signal.signature != msg.signature: logging.warning( f'got signal "{self.introspection.name}.{msg.member}" with unexpected signature "{msg.signature}"' ) return body = replace_idx_with_fds(msg.signature, msg.body, msg.unix_fds) no_sig = None for handler in self._signal_handlers[msg.member]: if handler.unpack_variants: if not no_sig: no_sig = unpack(body) data = no_sig else: data = body cb_result = handler.fn(*data) if isinstance(cb_result, Coroutine): asyncio.create_task(cb_result) def _add_signal(self, intr_signal: intr.Signal, interface: intr.Interface) -> None: def on_signal_fn(fn: Callable, *, unpack_variants: bool = False): fn_signature = inspect.signature(fn) if 0 < len( [ par for par in fn_signature.parameters.values() if par.kind == inspect.Parameter.KEYWORD_ONLY and par.default == inspect.Parameter.empty ] ): raise TypeError( "reply_notify cannot have required keyword only parameters" ) positional_params = [ par.kind for par in fn_signature.parameters.values() if par.kind not in [inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.VAR_KEYWORD] ] if len(positional_params) != len(intr_signal.args) and ( inspect.Parameter.VAR_POSITIONAL not in positional_params or len(positional_params) - 1 > len(intr_signal.args) ): raise TypeError( f"reply_notify must be a function with {len(intr_signal.args)} positional parameters" ) if not self._signal_handlers: self.bus._add_match_rule(self._signal_match_rule) self.bus.add_message_handler(self._message_handler) if intr_signal.name not in self._signal_handlers: self._signal_handlers[intr_signal.name] = [] self._signal_handlers[intr_signal.name].append( SignalHandler(fn, unpack_variants) ) def off_signal_fn(fn: Callable, *, unpack_variants: bool = False) -> None: try: i = self._signal_handlers[intr_signal.name].index( SignalHandler(fn, unpack_variants) ) del self._signal_handlers[intr_signal.name][i] if not self._signal_handlers[intr_signal.name]: del self._signal_handlers[intr_signal.name] except (KeyError, ValueError): return if not self._signal_handlers: self.bus._remove_match_rule(self._signal_match_rule) self.bus.remove_message_handler(self._message_handler) snake_case = BaseProxyInterface._to_snake_case(intr_signal.name) setattr(interface, f"on_{snake_case}", on_signal_fn) setattr(interface, f"off_{snake_case}", off_signal_fn) class BaseProxyObject: """An abstract class representing a proxy to an object exported on the bus by another client. Implementations of this class are not meant to be constructed directly. Use :func:`BaseMessageBus.get_proxy_object() <dbus_fast.message_bus.BaseMessageBus.get_proxy_object>` to get a proxy object. Each message bus implementation provides its own proxy object implementation that will be returned by that method. The primary use of the proxy object is to select a proxy interface to act on. Information on what interfaces are available is provided by introspection data provided to this class. This introspection data can either be included in your project as an XML file (recommended) or retrieved from the ``org.freedesktop.DBus.Introspectable`` interface at runtime. :ivar bus_name: The name of the bus this object is exported on. :vartype bus_name: str :ivar path: The object path exported on the client that owns the bus name. :vartype path: str :ivar introspection: Parsed introspection data for the proxy object. :vartype introspection: :class:`Node <dbus_fast.introspection.Node>` :ivar bus: The message bus this proxy object is connected to. :vartype bus: :class:`BaseMessageBus <dbus_fast.message_bus.BaseMessageBus>` :ivar ~.ProxyInterface: The proxy interface class this proxy object uses. :vartype ~.ProxyInterface: Type[:class:`BaseProxyInterface <dbus_fast.proxy_object.BaseProxyObject>`] :ivar child_paths: A list of absolute object paths of the children of this object. :vartype child_paths: list(str) :raises: - :class:`InvalidBusNameError <dbus_fast.InvalidBusNameError>` - If the given bus name is not valid. - :class:`InvalidObjectPathError <dbus_fast.InvalidObjectPathError>` - If the given object path is not valid. - :class:`InvalidIntrospectionError <dbus_fast.InvalidIntrospectionError>` - If the introspection data for the node is not valid. """ def __init__( self, bus_name: str, path: str, introspection: Union[intr.Node, str, ET.Element], bus: "message_bus.BaseMessageBus", ProxyInterface: Type[BaseProxyInterface], ) -> None: assert_object_path_valid(path) assert_bus_name_valid(bus_name) if not isinstance(bus, message_bus.BaseMessageBus): raise TypeError("bus must be an instance of BaseMessageBus") if not issubclass(ProxyInterface, BaseProxyInterface): raise TypeError("ProxyInterface must be an instance of BaseProxyInterface") if type(introspection) is intr.Node: self.introspection = introspection elif type(introspection) is str: self.introspection = intr.Node.parse(introspection) elif type(introspection) is ET.Element: self.introspection = intr.Node.from_xml(introspection) else: raise TypeError( "introspection must be xml node introspection or introspection.Node class" ) self.bus_name = bus_name self.path = path self.bus = bus self.ProxyInterface = ProxyInterface self.child_paths = [f"{path}/{n.name}" for n in self.introspection.nodes] self._interfaces = {} # lazy loaded by get_children() self._children = None def get_interface(self, name: str) -> BaseProxyInterface: """Get an interface exported on this proxy object and connect it to the bus. :param name: The name of the interface to retrieve. :type name: str :raises: - :class:`InterfaceNotFoundError <dbus_fast.InterfaceNotFoundError>` - If there is no interface by this name exported on the bus. """ if name in self._interfaces: return self._interfaces[name] try: intr_interface = next( i for i in self.introspection.interfaces if i.name == name ) except StopIteration: raise InterfaceNotFoundError(f"interface not found on this object: {name}") interface = self.ProxyInterface( self.bus_name, self.path, intr_interface, self.bus ) for intr_method in intr_interface.methods: interface._add_method(intr_method) for intr_property in intr_interface.properties: interface._add_property(intr_property) for intr_signal in intr_interface.signals: interface._add_signal(intr_signal, interface) def get_owner_notify(msg: Message, err: Optional[Exception]) -> None: if err: logging.error(f'getting name owner for "{name}" failed, {err}') return if msg.message_type == MessageType.ERROR: if msg.error_name != ErrorType.NAME_HAS_NO_OWNER.value: logging.error( f'getting name owner for "{name}" failed, {msg.body[0]}' ) return self.bus._name_owners[self.bus_name] = msg.body[0] if self.bus_name[0] != ":" and not self.bus._name_owners.get(self.bus_name, ""): self.bus._call( Message( destination="org.freedesktop.DBus", interface="org.freedesktop.DBus", path="/org/freedesktop/DBus", member="GetNameOwner", signature="s", body=[self.bus_name], ), get_owner_notify, ) self._interfaces[name] = interface return interface def get_children(self) -> List["BaseProxyObject"]: """Get the child nodes of this proxy object according to the introspection data.""" if self._children is None: self._children = [ self.__class__(self.bus_name, self.path, child, self.bus) for child in self.introspection.nodes ] return self._children