diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 624a4331..02732307 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -147,29 +147,73 @@ def __init__( self.dusk_offset = dusk_offset +@define(init=False, kw_only=True) +class DeviceIdentifier: + """Parsed components from a device URL.""" + + protocol: Protocol + gateway_id: str = field(repr=obfuscate_id) + device_address: str = field(repr=obfuscate_id) + subsystem_id: int | None = None + base_device_url: str = field(repr=obfuscate_id, init=False) + + def __init__( + self, + *, + protocol: Protocol, + gateway_id: str, + device_address: str, + subsystem_id: int | None = None, + ) -> None: + """Initialize DeviceIdentifier with required URL components.""" + self.protocol = protocol + self.gateway_id = gateway_id + self.device_address = device_address + self.subsystem_id = subsystem_id + self.base_device_url = f"{protocol}://{gateway_id}/{device_address}" + + @property + def is_sub_device(self) -> bool: + """Return True if this identifier represents a sub-device (subsystem_id > 1).""" + return self.subsystem_id is not None and self.subsystem_id > 1 + + @classmethod + def from_device_url(cls, device_url: str) -> DeviceIdentifier: + """Parse a device URL into its structured identifier components.""" + match = re.search(DEVICE_URL_RE, device_url) + if not match: + raise ValueError(f"Invalid device URL: {device_url}") + + subsystem_id = ( + int(match.group("subsystemId")) if match.group("subsystemId") else None + ) + + return cls( + protocol=Protocol(match.group("protocol")), + gateway_id=match.group("gatewayId"), + device_address=match.group("deviceAddress"), + subsystem_id=subsystem_id, + ) + + @define(init=False, kw_only=True) class Device: """Representation of a device in the setup including parsed fields and states.""" - id: str = field(repr=False) attributes: States available: bool enabled: bool label: str = field(repr=obfuscate_string) device_url: str = field(repr=obfuscate_id) - gateway_id: str | None = field(repr=obfuscate_id) - device_address: str | None = field(repr=obfuscate_id) - subsystem_id: int | None = None - is_sub_device: bool = False controllable_name: str definition: Definition data_properties: list[dict[str, Any]] | None = None - widget: UIWidget - ui_class: UIClass states: States type: ProductType place_oid: str | None = None - protocol: Protocol | None = field(init=False, repr=False) + identifier: DeviceIdentifier = field(init=False, repr=False) + _ui_class: UIClass | None = field(init=False, repr=False) + _widget: UIWidget | None = field(init=False, repr=False) def __init__( self, @@ -183,7 +227,6 @@ def __init__( definition: dict[str, Any], data_properties: list[dict[str, Any]] | None = None, widget: str | None = None, - widget_name: str | None = None, ui_class: str | None = None, states: list[dict[str, Any]] | None = None, type: int, @@ -191,7 +234,6 @@ def __init__( **_: Any, ) -> None: """Initialize Device and parse URL, protocol and nested definitions.""" - self.id = device_url self.attributes = States(attributes) self.available = available self.definition = Definition(**definition) @@ -204,33 +246,54 @@ def __init__( self.type = ProductType(type) self.place_oid = place_oid - self.protocol = None - self.gateway_id = None - self.device_address = None - self.subsystem_id = None - self.is_sub_device = False + self.identifier = DeviceIdentifier.from_device_url(device_url) - # Split :///[#] into multiple variables - match = re.search(DEVICE_URL_RE, device_url) + self._ui_class = UIClass(ui_class) if ui_class else None + self._widget = UIWidget(widget) if widget else None + + @property + def ui_class(self) -> UIClass: + """Return the UI class, falling back to the definition if available.""" + if self._ui_class is not None: + return self._ui_class + if self.definition.ui_class: + return UIClass(self.definition.ui_class) + raise ValueError(f"Device {self.device_url} has no UI class defined") + + @property + def widget(self) -> UIWidget: + """Return the widget, falling back to the definition if available.""" + if self._widget is not None: + return self._widget + if self.definition.widget_name: + return UIWidget(self.definition.widget_name) + raise ValueError(f"Device {self.device_url} has no widget defined") + + def get_supported_command_name( + self, commands: list[str | OverkizCommand] + ) -> str | None: + """Return the first command name that exists in this device's definition.""" + return self.definition.commands.select(commands) + + def has_supported_command(self, commands: list[str | OverkizCommand]) -> bool: + """Return True if any of the given commands exist in this device's definition.""" + return self.definition.commands.has_any(commands) - if match: - self.protocol = Protocol(match.group("protocol")) - self.gateway_id = match.group("gatewayId") - self.device_address = match.group("deviceAddress") + def get_state_value(self, states: list[str]) -> StateType | None: + """Return the value of the first state that exists with a non-None value.""" + return self.states.select_value(states) - if match.group("subsystemId"): - self.subsystem_id = int(match.group("subsystemId")) - self.is_sub_device = self.subsystem_id > 1 + def has_state_value(self, states: list[str]) -> bool: + """Return True if any of the given states exist with a non-None value.""" + return self.states.has_any(states) - if ui_class: - self.ui_class = UIClass(ui_class) - elif self.definition.ui_class: - self.ui_class = UIClass(self.definition.ui_class) + def get_state_definition(self, states: list[str]) -> StateDefinition | None: + """Return the first StateDefinition that matches, from the device definition.""" + return self.definition.get_state_definition(states) - if widget: - self.widget = UIWidget(widget) - elif self.definition.widget_name: - self.widget = UIWidget(self.definition.widget_name) + def get_attribute_value(self, attributes: list[str]) -> StateType: + """Return the value of the first attribute that exists with a non-None value.""" + return self.attributes.select_value(attributes) @define(init=False, kw_only=True) @@ -286,6 +349,18 @@ def __init__( self.ui_class = ui_class self.qualified_name = qualified_name + def get_state_definition(self, states: list[str]) -> StateDefinition | None: + """Return the first StateDefinition whose `qualified_name` matches, or None.""" + states_set = set(states) + for state_def in self.states: + if state_def.qualified_name in states_set: + return state_def + return None + + def has_state_definition(self, states: list[str]) -> bool: + """Return True if any of the given state definitions exist.""" + return self.get_state_definition(states) is not None + @define(init=False, kw_only=True) class CommandDefinition: @@ -328,6 +403,16 @@ def __len__(self) -> int: get = __getitem__ + def select(self, commands: list[str | OverkizCommand]) -> str | None: + """Return the first command name that exists in this definition, or None.""" + return next( + (str(command) for command in commands if str(command) in self), None + ) + + def has_any(self, commands: list[str | OverkizCommand]) -> bool: + """Return True if any of the given commands exist in this definition.""" + return self.select(commands) is not None + @define(init=False, kw_only=True) class State: @@ -466,6 +551,23 @@ def __len__(self) -> int: get = __getitem__ + def select(self, names: list[str]) -> State | None: + """Return the first State that exists and has a non-None value, or None.""" + for name in names: + if (state := self[name]) and state.value is not None: + return state + return None + + def select_value(self, names: list[str]) -> StateType: + """Return the value of the first State that exists with a non-None value.""" + if state := self.select(names): + return state.value + return None + + def has_any(self, names: list[str]) -> bool: + """Return True if any of the given state names exist with a non-None value.""" + return self.select(names) is not None + @define(init=False, kw_only=True) class Command: diff --git a/tests/test_client.py b/tests/test_client.py index 0276d3f4..b6f3edce 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -276,9 +276,9 @@ async def test_get_setup( assert len(setup.gateways) == gateway_count for device in setup.devices: - assert device.gateway_id - assert device.device_address - assert device.protocol + assert device.identifier.gateway_id + assert device.identifier.device_address + assert device.identifier.protocol @pytest.mark.parametrize( "fixture_name", diff --git a/tests/test_models.py b/tests/test_models.py index 5f76c32f..5b3c228d 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -6,7 +6,13 @@ import pytest from pyoverkiz.enums import DataType, Protocol -from pyoverkiz.models import Device, State, States +from pyoverkiz.models import ( + CommandDefinitions, + Definition, + Device, + State, + States, +) RAW_STATES = [ {"name": "core:NameState", "type": 3, "value": "alarm name"}, @@ -151,23 +157,6 @@ class TestDevice: 1, False, ), - # Wrong device urls: - ( - "foo://whatever-blah/12", - Protocol.UNKNOWN, - "whatever-blah", - "12", - None, - False, - ), - ( - "foo://whatever", - None, - None, - None, - None, - False, - ), ], ) def test_base_url_parsing( @@ -187,11 +176,22 @@ def test_base_url_parsing( hump_device = humps.decamelize(test_device) device = Device(**hump_device) - assert device.protocol == protocol - assert device.gateway_id == gateway_id - assert device.device_address == device_address - assert device.subsystem_id == subsystem_id - assert device.is_sub_device == is_sub_device + assert device.identifier.protocol == protocol + assert device.identifier.gateway_id == gateway_id + assert device.identifier.device_address == device_address + assert device.identifier.subsystem_id == subsystem_id + assert device.identifier.is_sub_device == is_sub_device + + def test_invalid_device_url_raises(self): + """Invalid device URLs should raise during identifier parsing.""" + test_device = { + **RAW_DEVICES, + **{"deviceURL": "foo://whatever"}, + } + hump_device = humps.decamelize(test_device) + + with pytest.raises(ValueError, match="Invalid device URL"): + Device(**hump_device) def test_none_states(self): """Devices without a `states` field should provide an empty States object.""" @@ -200,6 +200,90 @@ def test_none_states(self): device = Device(**hump_device) assert not device.states.get(STATE) + def test_get_supported_command_name(self): + """Device.get_supported_command_name() delegates to commands.select().""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + assert ( + device.get_supported_command_name(["nonexistent", "open", "close"]) + == "open" + ) + assert device.get_supported_command_name(["nonexistent"]) is None + + def test_has_supported_command(self): + """Device.has_supported_command() delegates to commands.has_any().""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + assert device.has_supported_command(["nonexistent", "open"]) + assert not device.has_supported_command(["nonexistent"]) + + def test_get_state_value(self): + """Device.get_state_value() returns the value of the first matching state.""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + value = device.get_state_value(["nonexistent", "core:ClosureState"]) + assert value == 100 + + def test_has_state_value(self): + """Device.has_state_value() returns True if any state exists with non-None value.""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + assert device.has_state_value(["nonexistent", "core:ClosureState"]) + assert not device.has_state_value(["nonexistent"]) + + def test_get_state_definition(self): + """Device.get_state_definition() returns the first matching StateDefinition.""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + state_def = device.get_state_definition(["nonexistent", "core:ClosureState"]) + assert state_def is not None + assert state_def.qualified_name == "core:ClosureState" + + def test_get_attribute_value_returns_first_match(self): + """Device.get_attribute_value() returns the value of the first matching attribute.""" + test_device = { + **RAW_DEVICES, + "attributes": [ + {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, + {"name": "core:Model", "type": 3, "value": "WINDOW 100"}, + ], + } + hump_device = humps.decamelize(test_device) + device = Device(**hump_device) + value = device.get_attribute_value( + ["nonexistent", "core:Model", "core:Manufacturer"] + ) + assert value == "WINDOW 100" + + def test_get_attribute_value_returns_none_when_no_match(self): + """Device.get_attribute_value() returns None when no attribute matches.""" + hump_device = humps.decamelize(RAW_DEVICES) + device = Device(**hump_device) + value = device.get_attribute_value(["nonexistent", "also_nonexistent"]) + assert value is None + + def test_get_attribute_value_empty_attributes(self): + """Device.get_attribute_value() returns None for devices with no attributes.""" + test_device = {**RAW_DEVICES, "attributes": []} + hump_device = humps.decamelize(test_device) + device = Device(**hump_device) + value = device.get_attribute_value(["core:Manufacturer"]) + assert value is None + + def test_get_attribute_value_with_none_values(self): + """Device.get_attribute_value() skips attributes with None values.""" + test_device = { + **RAW_DEVICES, + "attributes": [ + {"name": "core:Model", "type": 3, "value": None}, + {"name": "core:Manufacturer", "type": 3, "value": "VELUX"}, + ], + } + hump_device = humps.decamelize(test_device) + device = Device(**hump_device) + value = device.get_attribute_value(["core:Model", "core:Manufacturer"]) + assert value == "VELUX" + class TestStates: """Tests for the States container behaviour and getter semantics.""" @@ -231,6 +315,140 @@ def test_getter_missing(self): state = states.get("FooState") assert not state + def test_select_returns_first_match(self): + """select() returns the first state with a non-None value.""" + states = States(RAW_STATES) + state = states.select( + ["nonexistent", "core:NameState", "internal:AlarmDelayState"] + ) + assert state is not None + assert state.name == "core:NameState" + + def test_select_returns_none_when_no_match(self): + """select() returns None when no state matches.""" + states = States(RAW_STATES) + assert states.select(["nonexistent", "also_nonexistent"]) is None + + def test_select_value_returns_first_value(self): + """select_value() returns the value of the first matching state.""" + states = States(RAW_STATES) + value = states.select_value(["nonexistent", "core:NameState"]) + assert value == "alarm name" + + def test_select_value_returns_none_when_no_match(self): + """select_value() returns None when no state matches.""" + states = States(RAW_STATES) + assert states.select_value(["nonexistent"]) is None + + def test_has_any_true(self): + """has_any() returns True when at least one state exists.""" + states = States(RAW_STATES) + assert states.has_any(["nonexistent", "core:NameState"]) + + def test_has_any_false(self): + """has_any() returns False when no state exists.""" + states = States(RAW_STATES) + assert not states.has_any(["nonexistent", "also_nonexistent"]) + + +class TestCommandDefinitions: + """Tests for CommandDefinitions container and helper methods.""" + + def test_select_returns_first_match(self): + """select() returns the first command name that exists.""" + cmds = CommandDefinitions( + [ + {"command_name": "close", "nparams": 0}, + {"command_name": "open", "nparams": 0}, + {"command_name": "setPosition", "nparams": 1}, + ] + ) + assert cmds.select(["nonexistent", "open", "close"]) == "open" + + def test_select_returns_none_when_no_match(self): + """select() returns None when no command matches.""" + cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + assert cmds.select(["nonexistent", "also_nonexistent"]) is None + + def test_has_any_true(self): + """has_any() returns True when at least one command exists.""" + cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + assert cmds.has_any(["nonexistent", "close"]) + + def test_has_any_false(self): + """has_any() returns False when no command matches.""" + cmds = CommandDefinitions([{"command_name": "close", "nparams": 0}]) + assert not cmds.has_any(["nonexistent", "also_nonexistent"]) + + +class TestDefinition: + """Tests for Definition model and its helper methods.""" + + def test_get_state_definition_returns_first_match(self): + """get_state_definition() returns the first StateDefinition in definition.states. + + The definition is matched by `qualified_name` against the input list. + """ + definition = Definition( + commands=[], + states=[ + {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, + { + "qualified_name": "core:TargetClosureState", + "type": "ContinuousState", + }, + ], + ) + # Iterates definition.states in order, returns first match found + state_def = definition.get_state_definition( + ["core:TargetClosureState", "core:ClosureState"] + ) + assert state_def is not None + # core:ClosureState appears first in definition.states, so it's returned + assert state_def.qualified_name == "core:ClosureState" + + # Only asking for TargetClosureState works + state_def2 = definition.get_state_definition(["core:TargetClosureState"]) + assert state_def2 is not None + assert state_def2.qualified_name == "core:TargetClosureState" + + def test_get_state_definition_returns_none_when_no_match(self): + """get_state_definition() returns None when no state definition matches.""" + definition = Definition(commands=[], states=[]) + assert definition.get_state_definition(["nonexistent"]) is None + + def test_has_state_definition_returns_true(self): + """has_state_definition() returns True when a state definition matches.""" + definition = Definition( + commands=[], + states=[ + {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, + { + "qualified_name": "core:TargetClosureState", + "type": "ContinuousState", + }, + ], + ) + assert definition.has_state_definition(["core:ClosureState"]) + assert definition.has_state_definition( + ["nonexistent", "core:TargetClosureState"] + ) + + def test_has_state_definition_returns_false(self): + """has_state_definition() returns False when no state definition matches.""" + definition = Definition( + commands=[], + states=[ + {"qualified_name": "core:ClosureState", "type": "ContinuousState"}, + ], + ) + assert not definition.has_state_definition(["nonexistent", "also_nonexistent"]) + + def test_has_state_definition_empty_states(self): + """has_state_definition() returns False for definitions with no states.""" + definition = Definition(commands=[], states=[]) + assert not definition.has_state_definition(["core:ClosureState"]) + class TestState: """Unit tests for State value accessors and type validation."""