Source code for aiosimon_io.auth

# aiosimon_io/auth.py
#
# Copyright (c) 2025 Datakatalyst
#
# This file is part of aiosimon-io.
#
# aiosimon-io is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# aiosimon-io is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with aiosimon-io. If not, see <https://www.gnu.org/licenses/>.

"""
Module for handling authentication and making authenticated requests.

This module provides an abstract base class and a concrete implementation
for managing authentication with the Simon Cloud API.
"""

from __future__ import annotations

import datetime
import logging
from abc import ABC, abstractmethod
from typing import Dict, Optional

import aiohttp

from .const import AUTH_BASE_URL, SNS_BASE_URL

logger = logging.getLogger(__name__)


[docs]class AbstractAuth(ABC): """Abstract base class for making authenticated requests. This class provides a framework for handling authentication and making HTTP requests with an access token. :canonical: aiosimon_io.auth.AbstractAuth """ def __init__(self, websession: aiohttp.ClientSession, host: str): """Initialize the AbstractAuth instance. Args: websession (aiohttp.ClientSession): The aiohttp session to use for requests. host (str): The base URL for the API. """ self.session = websession self.sns_url = host
[docs] @abstractmethod async def async_get_access_token(self) -> str: """Retrieve a valid access token. Returns: str: A valid access token. """
[docs] async def async_request(self, method, endpoint, **kwargs) -> dict: """Make an authenticated HTTP request. Args: method (str): The HTTP method (e.g., 'GET', 'POST'). endpoint (str): The API endpoint to call. **kwargs: Additional arguments to pass to the request. Returns: dict: The JSON response from the API. Raises: aiohttp.ClientConnectionError: If a connection error occurs. aiohttp.ClientResponseError: If the response contains an error. aiohttp.ClientError: For other client-related errors. Exception: For unexpected errors. """ try: token: str = await self.async_get_access_token() headers: Dict[str, str] = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } url: str = f"{self.sns_url}/{endpoint}" logger.debug(f"Making SNS request: {method} {url}") async with self.session.request( method, url, headers=headers, **kwargs ) as response: response.raise_for_status() logger.debug(f"SNS request successful. Status: {response.status}") return await response.json() except aiohttp.ClientConnectionError as e: logger.error(f"Connection error: {e}") raise except aiohttp.ClientResponseError as e: logger.error(f"Response error: {e}") raise except aiohttp.ClientError as e: logger.error(f"Client error: {e}") raise except Exception as e: logger.error(f"Unexpected error: {e}") raise
[docs] async def async_request_hub( self, method, url, failover_endpoint, auth, **kwargs ) -> dict: """Make a request to the hub, with failover support. Args: method (str): The HTTP method (e.g., 'GET', 'POST'). url (str): The primary URL for the request. failover_endpoint (str): The failover endpoint to use if the primary request fails. auth: Authentication information for the request. **kwargs: Additional arguments to pass to the request. Returns: dict: The JSON response from the API. Raises: Exception: If both the primary and failover requests fail. """ try: headers: Dict[str, str] = {"Content-Type": "application/json"} logger.debug(f"Making hub request: {method} {url}") async with self.session.request( method, url, headers=headers, auth=auth, **kwargs ) as response: response.raise_for_status() logger.debug(f"Hub request successful. Status: {response.status}") return await response.json() except Exception as e: logger.debug(f"Getting error in hub local request: {e}") if failover_endpoint: logger.debug(f"Failing over to {failover_endpoint} by S&S API.") try: return await self.async_request(method, failover_endpoint, **kwargs) except Exception as e: logger.error(f"Error in hub failover request: {e}") raise else: raise
[docs]class SimonAuth(AbstractAuth): """Handle authentication with the Simon Cloud API. This class manages access and refresh tokens for interacting with the Simon Cloud API. :canonical: aiosimon_io.auth.SimonAuth """ def __init__( self, client_id: str, client_secret: str, username: str, password: str, session: aiohttp.ClientSession, ) -> None: """Initialize the SimonAuth instance. Args: client_id (str): The client ID for the application. client_secret (str): The client secret for the application. username (str): The username for authentication. password (str): The password for authentication. session (aiohttp.ClientSession): The aiohttp session to use for requests. """ self.client_id: str = client_id self.client_secret: str = client_secret self.username: str = username self.password: str = password self.auth_base_url: str = AUTH_BASE_URL self.sns_base_url: str = SNS_BASE_URL self.access_token: Optional[str] = None self.refresh_token: Optional[str] = None self.token_expires_at: Optional[datetime.datetime] = None self.session: aiohttp.ClientSession = session super().__init__(self.session, self.sns_base_url) async def _async_authenticate(self) -> None: """Authenticate with the Simon Cloud API. Retrieves access and refresh tokens. Raises: aiohttp.ClientResponseError: If the authentication request fails. """ url: str = f"{self.auth_base_url}/oauth/v2/token" logger.debug(f"Authenticating with {url}") async with self.session.post( url, data={ "client_id": self.client_id, "client_secret": self.client_secret, "username": self.username, "password": self.password, "grant_type": "password", }, ) as response: if response.status != 200: logger.error( f"Authentication failed: {response.status} {await response.text()}" ) response.raise_for_status() logger.debug(f"Authentication successful: {response.status}") response_data: dict = await response.json() self.access_token = response_data.get("access_token") self.refresh_token = response_data.get("refresh_token") self.token_expires_at = self._parse_expires_in( response_data.get("expires_in", 0) )
[docs] async def async_refresh_access_token(self) -> None: """Refresh the access token using the refresh token. Raises: ValueError: If no refresh token is available. aiohttp.ClientResponseError: If the token refresh request fails. """ if not self.refresh_token: raise ValueError("No refresh token available.") url: str = f"{self.auth_base_url}/oauth/v2/token" logger.debug(f"Refreshing access token with {url}") async with self.session.post( url, data={ "client_id": self.client_id, "client_secret": self.client_secret, "grant_type": "refresh_token", "refresh_token": self.refresh_token, }, ) as response: if response.status != 200: logger.error( f"Access token refresh failed: {response.status} {await response.text()}" ) response.raise_for_status() logger.debug(f"Access token refresh successful: {response.status}") response_data: dict = await response.json() self.access_token = response_data.get("access_token") self.refresh_token = response_data.get("refresh_token") self.token_expires_at = self._parse_expires_in( response_data.get("expires_in", 0) )
def _parse_expires_in(self, expires_in: int) -> datetime.datetime: """Calculate the token expiration time. Args: expires_in (int): The number of seconds until the token expires. Returns: datetime.datetime: The calculated expiration time. Raises: ValueError: If no expires_in value is provided. """ if expires_in is None: raise ValueError("No expires_in value provided.") elif expires_in is not None and expires_in > 500: seconds = expires_in - 500 else: seconds = expires_in return datetime.datetime.now() + datetime.timedelta(seconds=seconds) def _is_token_expired(self) -> bool: """Check if the current access token is expired. Returns: bool: True if the token is expired, False otherwise. Raises: ValueError: If no refresh token is available. """ if self.refresh_token is not None and self.token_expires_at is not None: return True if datetime.datetime.now() >= self.token_expires_at else False else: raise ValueError("No refresh token available.")
[docs] async def async_get_access_token(self) -> str: """Retrieve the current access token, refreshing it if necessary. Returns: str: The current access token. Raises: ValueError: If no access token is available. """ if self.access_token is None: await self._async_authenticate() elif self._is_token_expired(): await self.async_refresh_access_token() if not self.access_token: raise ValueError("No access token available.") return self.access_token