Source code for aiosimon_io.installations

# aiosimon_io/installations.py
#
# Copyright (c) 2025 Datakatalyst
#
# This file is part of aiosimon-io.
#
# aiosimon-io is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# aiosimon-io is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with aiosimon-io. If not, see <https://www.gnu.org/licenses/>.

"""
Module for managing installations in the Simon iO system.

This module provides classes and methods to interact with installations,
retrieve their details, and manage associated devices.
"""

from __future__ import annotations

import asyncio
import base64
import logging
from datetime import datetime, timedelta
from typing import ClassVar, Dict, List, Literal, Optional

import aiohttp
from pydantic import BaseModel, Field

from .auth import AbstractAuth
from .const import (
    HUB_ELEMENTS_ENDPOINT,
    HUB_HARDWARE_TOKEN_ENDPOINT,
    INSTALLATIONS_ENDPOINT,
    SNS_ELEMENTS_ENDPOINT,
)
from .devices import Device

logger = logging.getLogger(__name__)


[docs]class Reachable(BaseModel): """Represents the reachability status of an installation.""" type: Literal["WAN", "LAN"] next_check: Optional[datetime] = None
[docs]class Elements(BaseModel): """Represents the elements associated with an installation.""" devices: Dict[str, "Device"] = {}
# experiences: Dict[str, Experience] = {}
[docs]class Installation(BaseModel): """Represents an installation in the Simon iO system. :canonical: aiosimon_io.installations.Installation """ _installations_endpoint: ClassVar[str] = INSTALLATIONS_ENDPOINT _sns_elements_endpoint: ClassVar[str] = SNS_ELEMENTS_ENDPOINT _hub_elements_endpoint: ClassVar[str] = HUB_ELEMENTS_ENDPOINT _api_client: ClassVar[AbstractAuth] id: str name: str icon: str role: Optional[str] = None mode: Literal["managed", "virtual"] status: Literal["up", "down"] username: Optional[str] = None password: Optional[str] = None hardwareToken: Optional[str] = None cleanMac: Optional[str] = None mac: Optional[str] = None apiVersion: Optional[str] = None softwareVersion: Optional[str] = None hardwareType: Optional[str] = None hardwareSubType: Optional[str] = None hardwareVersion: Optional[str] = None lanIp: Optional[str] = None wifiIp: Optional[str] = None port: Optional[int] = None currentNetwork: Optional[str] = None notificationSettings: Optional[dict] = None savedDateTime: Optional[str] = None mqttHost: Optional[str] = None mqttPort: Optional[int] = None mqttTls: Optional[bool] = None mdns: Optional[str] = None location: Optional[dict] = None backups: Optional[List[dict]] = None countryCode: Optional[str] = None elements: Elements = Field(default_factory=Elements) _reachable: Reachable = Reachable(type="WAN", next_check=None) _refresh_after: Optional[datetime] = None _ttl: int = 5 _refresh_future: Optional[asyncio.Future] = None def __init__(self, **data) -> None: """Initialize an Installation instance. Args: **data: Arbitrary keyword arguments representing installation attributes. """ super().__init__(**data) self._fetch_lock = asyncio.Lock()
[docs] @classmethod async def async_get_installations( cls, api_client: AbstractAuth, ttl: int = 5 ) -> List[Installation]: """ Retrieve all installations for the current user asynchronously. Args: api_client (aiosimon_io.auth.AbstractAuth): The API client for authentication. ttl (int): The time-to-live for the cache in seconds. Returns: List[Installation]: A list of installations. """ logger.debug("Getting installations") cls._api_client = api_client cls._ttl = ttl response: dict = await cls._api_client.async_request( "GET", cls._installations_endpoint ) return [cls(**data) for data in response]
[docs] @classmethod async def async_get_installation( cls, api_client: AbstractAuth, id: str, ttl: int = 5 ) -> Installation: """ Retrieve a specific installation by its ID asynchronously. Args: api_client (aiosimon_io.auth.AbstractAuth): The API client for authentication. id (str): The ID of the installation. ttl (int): The time-to-live for the cache in seconds. Returns: Installation: The installation object. """ logger.debug(f"Getting intallation with id {id}") cls._api_client = api_client cls._ttl = ttl response: dict = await cls._api_client.async_request( "GET", f"{cls._installations_endpoint}/{id}" ) return cls(**response)
[docs] async def async_get_devices(self) -> Dict[str, Device]: """ Retrieve all devices for the installation asynchronously. Returns: Dict[str, aiosimon_io.devices.Device]: A dictionary of devices associated with the installation. """ logger.debug(f"Getting devices for installation {self.name}") if self.elements.devices is None or self._refresh(): await self._fetch_elements_from_origin() return self.elements.devices
[docs] async def async_get_device(self, id: str) -> Optional[Device]: """ Retrieve a specific device by its ID asynchronously. Args: id (str): The ID of the device. Returns: Optional[aiosimon_io.devices.Device]: The device object if found, otherwise None. """ logger.debug(f"Getting device {id} for installation {self.name}") if self.elements.devices.get(id) is None or self._refresh(): await self._fetch_elements_from_origin() return self.elements.devices.get(id)
def _get_local_base_url(self) -> str: """ Get the base URL for the installation. Returns: str: The base URL. Raises: ValueError: If the installation is not in 'managed' mode or has invalid LAN IP or port. """ if self.mode != "managed": raise ValueError( f"Base URL is only available for 'managed' installations. Installation {self.name} is in mode '{self.mode}'." ) if not self.lanIp or not self.port: raise ValueError( f"Invalid LAN IP or port for installation {self.name}. LAN IP: {self.lanIp}, Port: {self.port}" ) return f"http://{self.lanIp}:{self.port}" def _get_local_basic_auth(self) -> aiohttp.BasicAuth: """ Get the basic authentication credentials for the installation. Returns: aiohttp.BasicAuth: The basic authentication object. Raises: ValueError: If the installation is not in 'managed' mode or has invalid credentials. """ if self.mode != "managed": raise ValueError( f"Basic Auth is only available for 'managed' installations. Installation {self.name} is in mode '{self.mode}'." ) if not self.username or not self.password: raise ValueError( f"Invalid username or password for installation {self.name}." ) try: decoded_password: str = base64.b64decode(self.password).decode("utf-8") return aiohttp.BasicAuth(self.username, decoded_password) except Exception as e: raise ValueError( f"Error getting Basic Auth credentials for installation {self.name}: {e}" ) def _refresh(self) -> bool: """ Check if the devices need to be refreshed. Returns: bool: True if refresh is needed, otherwise False. """ if self._refresh_after is None or self._refresh_after <= datetime.now(): return True return False async def _fetch_elements_from_origin(self) -> None: """ Fetch the elements from the origin and update the elements dictionary. Raises: Exception: If an error occurs during the fetch process. """ # If there is an ongoing fetch, wait for it to finish if self._refresh_future is not None: logger.debug(f"Waiting for an ongoing fetch for installation {self.id}") await self._refresh_future return # Shield the fetch with a lock async with self._fetch_lock: logger.debug( f"Acquiring 'fetch from origin' lock for installation {self.id}" ) # If the devices don't need to be refreshed, return if not self._refresh(): return # Create a new Future self._refresh_future = asyncio.Future() try: local_path = self._hub_elements_endpoint sns_path = ( f"{self._sns_elements_endpoint.format(installation_id=self.id)}" ) response: dict = await self._async_request_switcher( "GET", local_path, sns_path ) # Create, update or delete devices # 1. Get devices from response devices_from_response = { device["id"]: device for device in response.get("devices", []) } # 2. Update or create devices for device_id, device_data in devices_from_response.items(): if device_id in self.elements.devices: # Update existing device for key, value in device_data.items(): if hasattr(self.elements.devices[device_id], key): setattr(self.elements.devices[device_id], key, value) else: logger.warning( f"{device_id}: Attribute '{key}' does not exist in the Device class. Ignoring it." ) else: # Create new device self.elements.devices[device_id] = Device( **device_data, installation=self ) self.elements.devices[device_id].installation = self # 3. Delete devices that are no longer in the response existing_device_ids = set(self.elements.devices.keys()) response_device_ids = set(devices_from_response.keys()) devices_to_remove = existing_device_ids - response_device_ids for device_id in devices_to_remove: del self.elements.devices[device_id] # Set the Future result self._refresh_future.set_result(None) except Exception as e: logger.error(f"Error fetching elements for installation {self.id}: {e}") # Propagate the exception to the Future self._refresh_future.set_exception(e) raise finally: # Set a new refresh time self._refresh_after = datetime.now() + timedelta(seconds=self._ttl) # Clear the Future self._refresh_future = None async def _async_request_switcher( self, method, local_path, sns_path, **kwargs ) -> dict: """ Switch between local and S&S API requests based on the installation reachability. Args: method (str): The HTTP method (e.g., "GET", "POST"). local_path (str): The local API path. sns_path (str): The S&S API path. **kwargs: Additional arguments for the request. Returns: dict: The response from the API. """ response: dict = {} if await self._async_reachable_by_lan(): logger.debug("Make request to hub by local network") response = await self._api_client.async_request_hub( method=method, url=f"{self._get_local_base_url()}/{local_path}", failover_endpoint=sns_path, auth=self._get_local_basic_auth(), **kwargs, ) else: logger.debug("Make request to hub by S&S API") response = await self._api_client.async_request(method, sns_path, **kwargs) return response async def _async_reachable_by_lan(self) -> bool: """ Check if the installation is reachable by LAN. Returns: bool: True if reachable by LAN, otherwise False. """ if self.mode != "managed": return False if ( self._reachable.next_check is None or self._reachable.next_check <= datetime.now() ): url = self._get_local_base_url() auth = self._get_local_basic_auth() timeout = aiohttp.ClientTimeout(total=0.5) try: token = await self._api_client.async_request_hub( method="GET", url=f"{url}/{HUB_HARDWARE_TOKEN_ENDPOINT}", failover_endpoint=None, auth=auth, timeout=timeout, ) if token == self.hardwareToken: self._reachable.type = "LAN" self._reachable.next_check = datetime.now() + timedelta(days=1) return True else: self._reachable.type = "WAN" self._reachable.next_check = datetime.now() + timedelta(days=1) return False except aiohttp.ClientConnectionError as e: logger.debug(f"Installation {self.name} is unreachable by LAN: {e}") self._reachable.type = "WAN" self._reachable.next_check = datetime.now() + timedelta(days=1) return False except Exception as e: logger.debug( f"Error checking connection for installation {self.name}: {e}" ) self._reachable.type = "WAN" self._reachable.next_check = datetime.now() + timedelta(hours=1) return False else: return True if self._reachable.type == "LAN" else False
Elements.model_rebuild() Installation.model_rebuild() Device.model_rebuild()