Revert "fix: Complete fixes: tests, workflows, coverage"

This reverts commit ed94d40e96.
This commit is contained in:
2025-10-02 12:47:35 +01:00
parent 089e046fd9
commit f4bea6863f
17 changed files with 1756 additions and 1081 deletions

View File

@@ -139,10 +139,6 @@ class AdGuardControlHubCoordinator(DataUpdateCoordinator):
results = await asyncio.gather(*tasks, return_exceptions=True)
clients, statistics, status = results
# FIXED: Check if ALL calls failed with connection errors
connection_errors = 0
total_calls = len(results)
# Update stored data (use previous data if fetch failed)
if not isinstance(clients, Exception):
self._clients = {
@@ -152,26 +148,16 @@ class AdGuardControlHubCoordinator(DataUpdateCoordinator):
}
else:
_LOGGER.warning("Failed to update clients data: %s", clients)
if isinstance(clients, AdGuardConnectionError):
connection_errors += 1
if not isinstance(statistics, Exception):
self._statistics = statistics
else:
_LOGGER.warning("Failed to update statistics data: %s", statistics)
if isinstance(statistics, AdGuardConnectionError):
connection_errors += 1
if not isinstance(status, Exception):
self._protection_status = status
else:
_LOGGER.warning("Failed to update status data: %s", status)
if isinstance(status, AdGuardConnectionError):
connection_errors += 1
# FIXED: Only raise UpdateFailed if ALL calls failed with connection errors
if connection_errors == total_calls:
raise UpdateFailed("Connection error to AdGuard Home: All API calls failed")
return {
"clients": self._clients,

View File

@@ -1,10 +1,10 @@
"""AdGuard Home API client."""
"""API wrapper for AdGuard Home."""
import asyncio
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
import aiohttp
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from aiohttp import BasicAuth, ClientError, ClientTimeout
from .const import API_ENDPOINTS
@@ -12,141 +12,228 @@ _LOGGER = logging.getLogger(__name__)
class AdGuardHomeError(Exception):
"""Base exception for AdGuard Home errors."""
"""Base exception for AdGuard Home API."""
class AdGuardConnectionError(AdGuardHomeError):
"""Connection error."""
"""Exception for connection errors."""
class AdGuardAuthError(AdGuardHomeError):
"""Authentication error."""
"""Exception for authentication errors."""
class AdGuardNotFoundError(AdGuardHomeError):
"""Exception for not found errors."""
class AdGuardTimeoutError(AdGuardHomeError):
"""Timeout error."""
"""Exception for timeout errors."""
class AdGuardHomeAPI:
"""AdGuard Home API client."""
"""API wrapper for AdGuard Home."""
def __init__(
self,
host: str,
port: int,
port: int = 3000,
username: Optional[str] = None,
password: Optional[str] = None,
ssl: bool = False,
verify_ssl: bool = True,
session: Optional[aiohttp.ClientSession] = None,
timeout: int = 30,
timeout: int = 10,
verify_ssl: bool = True,
) -> None:
"""Initialize the API client."""
"""Initialize the API wrapper."""
self.host = host
self.port = port
self.username = username
self.password = password
self.ssl = ssl
self.verify_ssl = verify_ssl
self.timeout = aiohttp.ClientTimeout(total=timeout)
self._session = session
self._auth = None
self._timeout = ClientTimeout(total=timeout)
protocol = "https" if ssl else "http"
self.base_url = f"{protocol}://{host}:{port}"
self._own_session = session is None
if username and password:
self._auth = aiohttp.BasicAuth(username, password)
async def __aenter__(self):
"""Async context manager entry."""
if self._own_session:
connector = aiohttp.TCPConnector(ssl=self.verify_ssl)
self._session = aiohttp.ClientSession(
timeout=self._timeout,
connector=connector
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self._own_session and self._session:
await self._session.close()
@property
def base_url(self) -> str:
"""Return the base URL."""
protocol = "https" if self.ssl else "http"
return f"{protocol}://{self.host}:{self.port}"
def session(self) -> aiohttp.ClientSession:
"""Get the session, creating one if needed."""
if not self._session:
connector = aiohttp.TCPConnector(ssl=self.verify_ssl)
self._session = aiohttp.ClientSession(
timeout=self._timeout,
connector=connector
)
return self._session
async def _request(
self, method: str, endpoint: str, **kwargs
) -> Dict[str, Any]:
"""Make a request to the API."""
async def _request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""Make an API request."""
url = f"{self.base_url}{endpoint}"
headers = {"Content-Type": "application/json"}
auth = None
if self.username and self.password:
auth = BasicAuth(self.username, self.password)
try:
async with self._session.request(
method,
url,
auth=self._auth,
timeout=self.timeout,
ssl=self.verify_ssl if self.ssl else None,
**kwargs
async with self.session.request(
method, url, json=data, headers=headers, auth=auth, ssl=self.verify_ssl
) as response:
if response.status == 401:
raise AdGuardAuthError("Authentication failed")
elif response.status == 404:
raise AdGuardConnectionError(f"Endpoint not found: {endpoint}")
elif response.status >= 400:
raise AdGuardConnectionError(f"HTTP {response.status}: {response.reason}")
raise AdGuardNotFoundError(f"Endpoint not found: {endpoint}")
elif response.status >= 500:
raise AdGuardConnectionError(f"Server error {response.status}")
return await response.json()
response.raise_for_status()
# Handle empty responses
if response.status == 204 or not response.content_length:
return {}
try:
return await response.json()
except (aiohttp.ContentTypeError, ValueError):
# If not JSON, return text response
text = await response.text()
return {"response": text}
except asyncio.TimeoutError as err:
raise AdGuardTimeoutError(f"Request timeout for {url}") from err
except aiohttp.ClientConnectorError as err:
raise AdGuardConnectionError(f"Connection failed to {url}: {err}") from err
except aiohttp.ClientError as err:
raise AdGuardConnectionError(f"Client error for {url}: {err}") from err
raise AdGuardTimeoutError(f"Request timeout: {err}") from err
except ClientError as err:
raise AdGuardConnectionError(f"Client error: {err}") from err
except Exception as err:
raise AdGuardHomeError(f"Unexpected error for {url}: {err}") from err
if isinstance(err, AdGuardHomeError):
raise
raise AdGuardHomeError(f"Unexpected error: {err}") from err
async def test_connection(self) -> bool:
"""Test the connection to AdGuard Home."""
try:
await self.get_status()
return True
except Exception as err:
_LOGGER.error("Connection test failed: %s", err)
response = await self._request("GET", API_ENDPOINTS["status"])
return isinstance(response, dict) and len(response) > 0
except Exception:
return False
async def get_status(self) -> Dict[str, Any]:
"""Get AdGuard Home status."""
"""Get server status information."""
return await self._request("GET", API_ENDPOINTS["status"])
async def get_clients(self) -> Dict[str, Any]:
"""Get clients list."""
"""Get all configured clients."""
return await self._request("GET", API_ENDPOINTS["clients"])
async def get_statistics(self) -> Dict[str, Any]:
"""Get DNS query statistics."""
return await self._request("GET", API_ENDPOINTS["stats"])
async def set_protection(self, enabled: bool) -> None:
"""Enable or disable protection."""
async def set_protection(self, enabled: bool) -> Dict[str, Any]:
"""Enable or disable AdGuard protection."""
data = {"enabled": enabled}
await self._request("POST", API_ENDPOINTS["protection"], json=data)
return await self._request("POST", API_ENDPOINTS["protection"], data)
async def get_client_by_name(self, name: str) -> Optional[Dict[str, Any]]:
"""Get client by name."""
clients_data = await self.get_clients()
for client in clients_data.get("clients", []):
if client.get("name") == name:
return client
return None
async def add_client(self, client_data: Dict[str, Any]) -> Dict[str, Any]:
"""Add a new client configuration."""
if "name" not in client_data:
raise ValueError("Client name is required")
if "ids" not in client_data or not client_data["ids"]:
raise ValueError("Client IDs are required")
return await self._request("POST", API_ENDPOINTS["clients_add"], client_data)
async def update_client(self, client_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing client configuration."""
if "name" not in client_data:
raise ValueError("Client name is required")
if "data" not in client_data:
raise ValueError("Client data is required")
return await self._request("POST", API_ENDPOINTS["clients_update"], client_data)
async def delete_client(self, client_name: str) -> Dict[str, Any]:
"""Delete a client configuration."""
if not client_name:
raise ValueError("Client name is required")
data = {"name": client_name}
return await self._request("POST", API_ENDPOINTS["clients_delete"], data)
async def get_client_by_name(self, client_name: str) -> Optional[Dict[str, Any]]:
"""Get a specific client by name."""
if not client_name:
return None
try:
clients_data = await self.get_clients()
clients = clients_data.get("clients", [])
for client in clients:
if client.get("name") == client_name:
return client
return None
except Exception as err:
_LOGGER.error("Error getting client %s: %s", client_name, err)
return None
async def update_client_blocked_services(
self, client_name: str, blocked_services: List[str]
) -> None:
"""Update blocked services for a client."""
self,
client_name: str,
blocked_services: list,
) -> Dict[str, Any]:
"""Update blocked services for a specific client."""
if not client_name:
raise ValueError("Client name is required")
client = await self.get_client_by_name(client_name)
if not client:
raise AdGuardConnectionError(f"Client '{client_name}' not found")
raise AdGuardNotFoundError(f"Client '{client_name}' not found")
# Update client with new blocked services
client_data = client.copy()
client_data["blocked_services"] = blocked_services
# Format blocked services data according to AdGuard Home API
blocked_services_data = {
"ids": blocked_services,
"schedule": {"time_zone": "Local"}
}
await self._request("POST", API_ENDPOINTS["clients_update"], json=client_data)
update_data = {
"name": client_name,
"data": {
**client,
"blocked_services": blocked_services_data
}
}
async def add_client(self, client_data: Dict[str, Any]) -> None:
"""Add a new client."""
await self._request("POST", API_ENDPOINTS["clients_add"], json=client_data)
return await self.update_client(update_data)
async def delete_client(self, client_name: str) -> None:
"""Delete a client."""
data = {"name": client_name}
await self._request("POST", API_ENDPOINTS["clients_delete"], json=data)
async def get_blocked_services_list(self) -> Dict[str, Any]:
"""Get list of available blocked services."""
try:
return await self._request("GET", API_ENDPOINTS["blocked_services_all"])
except Exception as err:
_LOGGER.error("Error getting blocked services list: %s", err)
return {}
async def close(self) -> None:
"""Close the API session if we own it."""
if self._own_session and self._session:
await self._session.close()

View File

@@ -1,19 +1,17 @@
"""AdGuard Control Hub binary sensor platform."""
"""Binary sensor platform for AdGuard Control Hub integration."""
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorDeviceClass,
)
from homeassistant.components.binary_sensor import BinarySensorEntity, BinarySensorDeviceClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from . import AdGuardControlHubCoordinator
from .api import AdGuardHomeAPI
from .const import DOMAIN, MANUFACTURER
from .const import DOMAIN, MANUFACTURER, ICON_PROTECTION, ICON_PROTECTION_OFF
_LOGGER = logging.getLogger(__name__)
@@ -27,168 +25,273 @@ async def async_setup_entry(
coordinator = hass.data[DOMAIN][config_entry.entry_id]["coordinator"]
api = hass.data[DOMAIN][config_entry.entry_id]["api"]
entities: List[BinarySensorEntity] = []
# Add main binary sensors
entities.extend([
entities = [
AdGuardProtectionBinarySensor(coordinator, api),
AdGuardServerRunningBinarySensor(coordinator, api),
AdGuardSafeBrowsingBinarySensor(coordinator, api),
AdGuardParentalControlBinarySensor(coordinator, api),
AdGuardSafeSearchBinarySensor(coordinator, api),
])
]
# Add client-specific binary sensors
for client_name in coordinator.clients:
for client_name in coordinator.clients.keys():
entities.extend([
AdGuardClientFilteringBinarySensor(coordinator, api, client_name),
AdGuardClientSafeBrowsingBinarySensor(coordinator, api, client_name),
])
async_add_entities(entities)
async_add_entities(entities, update_before_add=True)
class AdGuardBaseBinarySensor(CoordinatorEntity, BinarySensorEntity):
"""Base AdGuard binary sensor."""
"""Base class for AdGuard binary sensors."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator)
self.api = api
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return DeviceInfo(
identifiers={(DOMAIN, "adguard_home")},
name="AdGuard Home",
manufacturer=MANUFACTURER,
model="AdGuard Home",
configuration_url=self.api.base_url,
)
self._attr_device_info = {
"identifiers": {(DOMAIN, f"{api.host}:{api.port}")},
"name": f"AdGuard Control Hub ({api.host})",
"manufacturer": MANUFACTURER,
"model": "AdGuard Home",
"configuration_url": f"{'https' if api.ssl else 'http'}://{api.host}:{api.port}",
}
class AdGuardProtectionBinarySensor(AdGuardBaseBinarySensor):
"""AdGuard protection status binary sensor."""
"""Binary sensor to show AdGuard protection status."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_protection_enabled"
self._attr_name = "AdGuard Protection Status"
self._attr_unique_id = f"{DOMAIN}_protection_status"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_icon = "mdi:shield-check"
@property
def is_on(self) -> bool:
"""Return true if protection is enabled."""
return self.coordinator.protection_status.get("protection_enabled", False)
class AdGuardServerRunningBinarySensor(AdGuardBaseBinarySensor):
"""AdGuard server running binary sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Server Running"
self._attr_unique_id = f"{DOMAIN}_server_running"
self._attr_device_class = BinarySensorDeviceClass.RUNNING
self._attr_icon = "mdi:server"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> bool:
def is_on(self) -> Optional[bool]:
"""Return true if protection is enabled."""
return self.coordinator.protection_status.get("protection_enabled", False)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return ICON_PROTECTION if self.is_on else ICON_PROTECTION_OFF
@property
def available(self) -> bool:
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
status = self.coordinator.protection_status
return {
"dns_port": status.get("dns_port", "N/A"),
"version": status.get("version", "N/A"),
"running": status.get("running", False),
"dhcp_available": status.get("dhcp_available", False),
}
class AdGuardServerRunningBinarySensor(AdGuardBaseBinarySensor):
"""Binary sensor to show if AdGuard server is running."""
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_server_running"
self._attr_name = "AdGuard Server Running"
self._attr_device_class = BinarySensorDeviceClass.RUNNING
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> Optional[bool]:
"""Return true if server is running."""
return self.coordinator.protection_status.get("running", False)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:server" if self.is_on else "mdi:server-off"
@property
def available(self) -> bool:
"""Return if entity is available."""
return bool(self.coordinator.protection_status)
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
class AdGuardSafeBrowsingBinarySensor(AdGuardBaseBinarySensor):
"""AdGuard safe browsing binary sensor."""
"""Binary sensor to show SafeBrowsing status."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Safe Browsing"
self._attr_unique_id = f"{DOMAIN}_safe_browsing"
self._attr_unique_id = f"{api.host}_{api.port}_safebrowsing_enabled"
self._attr_name = "AdGuard SafeBrowsing"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_icon = "mdi:web-check"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> bool:
"""Return true if safe browsing is enabled."""
def is_on(self) -> Optional[bool]:
"""Return true if SafeBrowsing is enabled."""
return self.coordinator.protection_status.get("safebrowsing_enabled", False)
class AdGuardParentalControlBinarySensor(AdGuardBaseBinarySensor):
"""AdGuard parental control binary sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Parental Control"
self._attr_unique_id = f"{DOMAIN}_parental_control"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_icon = "mdi:account-child"
@property
def is_on(self) -> bool:
"""Return true if parental control is enabled."""
return self.coordinator.protection_status.get("parental_enabled", False)
class AdGuardSafeSearchBinarySensor(AdGuardBaseBinarySensor):
"""AdGuard safe search binary sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Safe Search"
self._attr_unique_id = f"{DOMAIN}_safe_search"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_icon = "mdi:magnify-scan"
@property
def is_on(self) -> bool:
"""Return true if safe search is enabled."""
return self.coordinator.protection_status.get("safesearch_enabled", False)
class AdGuardClientFilteringBinarySensor(CoordinatorEntity, BinarySensorEntity):
"""AdGuard client filtering binary sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI, client_name: str) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator)
self.api = api
self._client_name = client_name
self._attr_name = f"AdGuard {client_name} Filtering"
self._attr_unique_id = f"{DOMAIN}_{client_name.lower().replace(' ', '_')}_filtering"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_icon = "mdi:filter-check"
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return DeviceInfo(
identifiers={(DOMAIN, f"client_{self._client_name}")},
name=f"AdGuard Client: {self._client_name}",
manufacturer=MANUFACTURER,
model="AdGuard Client",
via_device=(DOMAIN, "adguard_home"),
)
@property
def is_on(self) -> bool:
"""Return true if client filtering is enabled."""
client = self.coordinator.clients.get(self._client_name, {})
return client.get("filtering_enabled", True)
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:shield-check" if self.is_on else "mdi:shield-off"
@property
def available(self) -> bool:
"""Return if entity is available."""
return self._client_name in self.coordinator.clients
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
class AdGuardParentalControlBinarySensor(AdGuardBaseBinarySensor):
"""Binary sensor to show Parental Control status."""
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_parental_enabled"
self._attr_name = "AdGuard Parental Control"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> Optional[bool]:
"""Return true if Parental Control is enabled."""
return self.coordinator.protection_status.get("parental_enabled", False)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:account-child" if self.is_on else "mdi:account-child-outline"
@property
def available(self) -> bool:
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
class AdGuardSafeSearchBinarySensor(AdGuardBaseBinarySensor):
"""Binary sensor to show Safe Search status."""
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_safesearch_enabled"
self._attr_name = "AdGuard Safe Search"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> Optional[bool]:
"""Return true if Safe Search is enabled."""
return self.coordinator.protection_status.get("safesearch_enabled", False)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:shield-search" if self.is_on else "mdi:magnify"
@property
def available(self) -> bool:
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
class AdGuardClientFilteringBinarySensor(AdGuardBaseBinarySensor):
"""Binary sensor to show client-specific filtering status."""
def __init__(
self,
coordinator: AdGuardControlHubCoordinator,
api: AdGuardHomeAPI,
client_name: str,
) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self.client_name = client_name
self._attr_unique_id = f"{api.host}_{api.port}_client_{client_name}_filtering"
self._attr_name = f"AdGuard {client_name} Filtering"
self._attr_device_class = BinarySensorDeviceClass.RUNNING
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> Optional[bool]:
"""Return true if client filtering is enabled."""
client = self.coordinator.clients.get(self.client_name, {})
return client.get("filtering_enabled", True)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:filter" if self.is_on else "mdi:filter-off"
@property
def available(self) -> bool:
"""Return if sensor is available."""
return (
self.coordinator.last_update_success
and self.client_name in self.coordinator.clients
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
client = self.coordinator.clients.get(self.client_name, {})
return {
"client_ids": client.get("ids", []),
"use_global_settings": client.get("use_global_settings", True),
}
class AdGuardClientSafeBrowsingBinarySensor(AdGuardBaseBinarySensor):
"""Binary sensor to show client-specific SafeBrowsing status."""
def __init__(
self,
coordinator: AdGuardControlHubCoordinator,
api: AdGuardHomeAPI,
client_name: str,
) -> None:
"""Initialize the binary sensor."""
super().__init__(coordinator, api)
self.client_name = client_name
self._attr_unique_id = f"{api.host}_{api.port}_client_{client_name}_safebrowsing"
self._attr_name = f"AdGuard {client_name} SafeBrowsing"
self._attr_device_class = BinarySensorDeviceClass.SAFETY
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def is_on(self) -> Optional[bool]:
"""Return true if client SafeBrowsing is enabled."""
client = self.coordinator.clients.get(self.client_name, {})
return client.get("safebrowsing_enabled", False)
@property
def icon(self) -> str:
"""Return the icon for the binary sensor."""
return "mdi:shield-account" if self.is_on else "mdi:shield-account-outline"
@property
def available(self) -> bool:
"""Return if sensor is available."""
return (
self.coordinator.last_update_success
and self.client_name in self.coordinator.clients
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
client = self.coordinator.clients.get(self.client_name, {})
return {
"parental_enabled": client.get("parental_enabled", False),
"safesearch_enabled": client.get("safesearch_enabled", False),
}

View File

@@ -1,6 +1,7 @@
"""Config flow for AdGuard Control Hub integration."""
import asyncio
import logging
import re
from typing import Any, Dict, Optional
import voluptuous as vol
@@ -32,6 +33,86 @@ STEP_USER_DATA_SCHEMA = vol.Schema({
})
def validate_host(host: str) -> str:
"""Validate and clean host input."""
host = host.strip()
if not host:
raise InvalidHost("Host cannot be empty")
# Remove protocol if present
if host.startswith(("http://", "https://")):
host = host.split("://", 1)[1]
# Remove path if present
if "/" in host:
host = host.split("/", 1)[0]
return host
async def validate_input(hass, data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate the user input allows us to connect."""
# Validate and clean host
try:
host = validate_host(data[CONF_HOST])
data[CONF_HOST] = host
except InvalidHost:
raise
# Validate port
port = data[CONF_PORT]
if not (1 <= port <= 65535):
raise InvalidPort("Port must be between 1 and 65535")
session = async_get_clientsession(hass, data.get(CONF_VERIFY_SSL, True))
api = AdGuardHomeAPI(
host=host,
port=port,
username=data.get(CONF_USERNAME),
password=data.get(CONF_PASSWORD),
ssl=data.get(CONF_SSL, False),
verify_ssl=data.get(CONF_VERIFY_SSL, True),
session=session,
timeout=10,
)
try:
if not await api.test_connection():
raise CannotConnect("Failed to connect to AdGuard Home")
try:
status = await api.get_status()
version = status.get("version", "unknown")
return {
"title": f"AdGuard Control Hub ({host})",
"version": version,
"host": host,
}
except Exception:
# If we can't get status but connection works, still proceed
return {
"title": f"AdGuard Control Hub ({host})",
"version": "unknown",
"host": host,
}
except AdGuardAuthError as err:
raise InvalidAuth from err
except AdGuardTimeoutError as err:
raise Timeout from err
except AdGuardConnectionError as err:
if "timeout" in str(err).lower():
raise Timeout from err
raise CannotConnect from err
except asyncio.TimeoutError as err:
raise Timeout from err
except Exception as err:
_LOGGER.exception("Unexpected error during validation")
raise CannotConnect from err
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for AdGuard Control Hub."""
@@ -46,42 +127,27 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
try:
# Basic validation
host = user_input[CONF_HOST].strip()
if not host:
errors[CONF_HOST] = "invalid_host"
info = await validate_input(self.hass, user_input)
# Test connection
session = async_get_clientsession(self.hass, user_input.get(CONF_VERIFY_SSL, True))
api = AdGuardHomeAPI(
host=host,
port=user_input[CONF_PORT],
username=user_input.get(CONF_USERNAME),
password=user_input.get(CONF_PASSWORD),
ssl=user_input.get(CONF_SSL, False),
verify_ssl=user_input.get(CONF_VERIFY_SSL, True),
session=session,
timeout=10,
unique_id = f"{info['host']}:{user_input[CONF_PORT]}"
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=info["title"],
data=user_input,
)
if not await api.test_connection():
errors["base"] = "cannot_connect"
else:
unique_id = f"{host}:{user_input[CONF_PORT]}"
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=f"AdGuard Control Hub ({host})",
data=user_input,
)
except AdGuardAuthError:
errors["base"] = "invalid_auth"
except AdGuardTimeoutError:
errors["base"] = "timeout"
except AdGuardConnectionError:
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except InvalidHost:
errors[CONF_HOST] = "invalid_host"
except InvalidPort:
errors[CONF_PORT] = "invalid_port"
except Timeout:
errors["base"] = "timeout"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
@@ -91,3 +157,23 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
data_schema=STEP_USER_DATA_SCHEMA,
errors=errors,
)
class CannotConnect(Exception):
"""Error to indicate we cannot connect."""
class InvalidAuth(Exception):
"""Error to indicate there is invalid auth."""
class InvalidHost(Exception):
"""Error to indicate invalid host."""
class InvalidPort(Exception):
"""Error to indicate invalid port."""
class Timeout(Exception):
"""Error to indicate connection timeout."""

View File

@@ -1,75 +1,96 @@
"""Constants for AdGuard Control Hub."""
from homeassistant.const import Platform
"""Constants for the AdGuard Control Hub integration."""
from typing import Final
# Integration metadata
DOMAIN = "adguard_hub"
MANUFACTURER = "AdGuard" # FIXED: Added missing MANUFACTURER constant
SCAN_INTERVAL = 30
DEFAULT_PORT = 3000
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True
# Integration details
DOMAIN: Final = "adguard_hub"
MANUFACTURER: Final = "AdGuard Control Hub"
INTEGRATION_NAME: Final = "AdGuard Control Hub"
# Configuration keys
CONF_SSL = "ssl"
CONF_VERIFY_SSL = "verify_ssl"
# Configuration
CONF_SSL: Final = "ssl"
CONF_VERIFY_SSL: Final = "verify_ssl"
# Defaults
DEFAULT_PORT: Final = 3000
DEFAULT_SSL: Final = False
DEFAULT_VERIFY_SSL: Final = True
SCAN_INTERVAL: Final = 30
# Platforms
PLATFORMS = [
Platform.SWITCH,
Platform.BINARY_SENSOR,
Platform.SENSOR,
PLATFORMS: Final = [
"switch",
"binary_sensor",
"sensor",
]
# Entity attributes
ATTR_CLIENT_NAME = "client_name"
ATTR_SERVICES = "services"
ATTR_DURATION = "duration"
ATTR_CLIENTS = "clients"
ATTR_ENABLED = "enabled"
# Service names
SERVICE_BLOCK_SERVICES = "block_services"
SERVICE_UNBLOCK_SERVICES = "unblock_services"
SERVICE_EMERGENCY_UNBLOCK = "emergency_unblock"
SERVICE_ADD_CLIENT = "add_client"
SERVICE_REMOVE_CLIENT = "remove_client"
SERVICE_REFRESH_DATA = "refresh_data"
# API endpoints
API_ENDPOINTS = {
# API Endpoints
API_ENDPOINTS: Final = {
"status": "/control/status",
"clients": "/control/clients",
"stats": "/control/stats",
"protection": "/control/protection",
"clients_add": "/control/clients/add",
"clients_update": "/control/clients/update",
"clients_update": "/control/clients/update",
"clients_delete": "/control/clients/delete",
"blocked_services_all": "/control/blocked_services/all",
"protection": "/control/protection",
"stats": "/control/stats",
"rewrite": "/control/rewrite/list",
"querylog": "/control/querylog",
}
# Available services for blocking
BLOCKED_SERVICES = {
# Available blocked services (common ones)
BLOCKED_SERVICES: Final = {
"youtube": "YouTube",
"facebook": "Facebook",
"netflix": "Netflix",
"gaming": "Gaming Services",
"facebook": "Facebook",
"twitter": "Twitter",
"instagram": "Instagram",
"snapchat": "Snapchat",
"telegram": "Telegram",
"whatsapp": "WhatsApp",
"discord": "Discord",
"skype": "Skype",
"linkedin": "LinkedIn",
"pinterest": "Pinterest",
"reddit": "Reddit",
"tiktok": "TikTok",
"amazon_prime": "Amazon Prime Video",
"twitter": "Twitter/X",
"snapchat": "Snapchat",
"reddit": "Reddit",
"disney_plus": "Disney+",
"hulu": "Hulu",
"spotify": "Spotify",
"twitch": "Twitch",
"steam": "Steam",
"epic_games": "Epic Games",
"xbox_live": "Xbox Live",
"whatsapp": "WhatsApp",
"telegram": "Telegram",
"discord": "Discord",
"amazon": "Amazon",
"ebay": "eBay",
"skype": "Skype",
"zoom": "Zoom",
"tinder": "Tinder",
"pinterest": "Pinterest",
"linkedin": "LinkedIn",
"dailymotion": "Dailymotion",
"vimeo": "Vimeo",
"viber": "Viber",
"wechat": "WeChat",
"ok": "Odnoklassniki",
"vk": "VKontakte",
}
# Service attributes
ATTR_CLIENT_NAME: Final = "client_name"
ATTR_SERVICES: Final = "services"
ATTR_DURATION: Final = "duration"
ATTR_CLIENTS: Final = "clients"
ATTR_ENABLED: Final = "enabled"
# Icons
ICON_PROTECTION: Final = "mdi:shield"
ICON_PROTECTION_OFF: Final = "mdi:shield-off"
ICON_CLIENT: Final = "mdi:devices"
ICON_STATISTICS: Final = "mdi:chart-line"
ICON_BLOCKED: Final = "mdi:shield-check"
ICON_QUERIES: Final = "mdi:dns"
ICON_PERCENTAGE: Final = "mdi:percent"
ICON_CLIENTS: Final = "mdi:account-multiple"
# Service names
SERVICE_BLOCK_SERVICES: Final = "block_services"
SERVICE_UNBLOCK_SERVICES: Final = "unblock_services"
SERVICE_EMERGENCY_UNBLOCK: Final = "emergency_unblock"
SERVICE_ADD_CLIENT: Final = "add_client"
SERVICE_REMOVE_CLIENT: Final = "remove_client"
SERVICE_REFRESH_DATA: Final = "refresh_data"

View File

@@ -10,5 +10,5 @@
"requirements": [
"aiohttp>=3.8.0"
],
"version": "1.0.2"
"version": "1.0.1"
}

View File

@@ -1,21 +1,18 @@
"""AdGuard Control Hub sensor platform."""
"""Sensor platform for AdGuard Control Hub integration."""
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from homeassistant.components.sensor import (
SensorEntity,
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import PERCENTAGE, UnitOfTime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from homeassistant.const import PERCENTAGE, UnitOfTime
from . import AdGuardControlHubCoordinator
from .api import AdGuardHomeAPI
from .const import DOMAIN, MANUFACTURER
from .const import DOMAIN, MANUFACTURER, ICON_STATISTICS, ICON_BLOCKED, ICON_QUERIES, ICON_PERCENTAGE, ICON_CLIENTS
_LOGGER = logging.getLogger(__name__)
@@ -29,191 +26,199 @@ async def async_setup_entry(
coordinator = hass.data[DOMAIN][config_entry.entry_id]["coordinator"]
api = hass.data[DOMAIN][config_entry.entry_id]["api"]
entities: List[SensorEntity] = []
# Add main sensors
entities.extend([
entities = [
AdGuardQueriesCounterSensor(coordinator, api),
AdGuardBlockedCounterSensor(coordinator, api),
AdGuardBlockingPercentageSensor(coordinator, api),
AdGuardClientsCountSensor(coordinator, api),
AdGuardClientCountSensor(coordinator, api),
AdGuardProcessingTimeSensor(coordinator, api),
AdGuardFilteringRulesSensor(coordinator, api),
AdGuardUpstreamServersSensor(coordinator, api),
AdGuardVersionSensor(coordinator, api),
])
]
async_add_entities(entities)
async_add_entities(entities, update_before_add=True)
class AdGuardBaseSensor(CoordinatorEntity, SensorEntity):
"""Base AdGuard sensor."""
"""Base class for AdGuard sensors."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self.api = api
self._attr_device_info = {
"identifiers": {(DOMAIN, f"{api.host}:{api.port}")},
"name": f"AdGuard Control Hub ({api.host})",
"manufacturer": MANUFACTURER,
"model": "AdGuard Home",
"configuration_url": f"{'https' if api.ssl else 'http'}://{api.host}:{api.port}",
}
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return DeviceInfo(
identifiers={(DOMAIN, "adguard_home")},
name="AdGuard Home",
manufacturer=MANUFACTURER,
model="AdGuard Home",
configuration_url=self.api.base_url,
)
def available(self) -> bool:
"""Return if sensor is available."""
return self.coordinator.last_update_success and bool(self.coordinator.statistics)
class AdGuardQueriesCounterSensor(AdGuardBaseSensor):
"""AdGuard DNS queries counter sensor."""
"""Sensor to track DNS queries count."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_dns_queries"
self._attr_name = "AdGuard DNS Queries"
self._attr_unique_id = f"{DOMAIN}_dns_queries"
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_icon = ICON_QUERIES
self._attr_state_class = SensorStateClass.TOTAL_INCREASING
self._attr_icon = "mdi:dns"
self._attr_native_unit_of_measurement = "queries"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> Optional[int]:
"""Return the state of the sensor."""
return self.coordinator.statistics.get("num_dns_queries", 0)
stats = self.coordinator.statistics
return stats.get("num_dns_queries", 0)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
stats = self.coordinator.statistics
return {
"queries_today": stats.get("num_dns_queries_today", 0),
"replaced_safebrowsing": stats.get("num_replaced_safebrowsing", 0),
"replaced_parental": stats.get("num_replaced_parental", 0),
"replaced_safesearch": stats.get("num_replaced_safesearch", 0),
}
class AdGuardBlockedCounterSensor(AdGuardBaseSensor):
"""AdGuard blocked queries counter sensor."""
"""Sensor to track blocked queries count."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_blocked_queries"
self._attr_name = "AdGuard Blocked Queries"
self._attr_unique_id = f"{DOMAIN}_blocked_queries"
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_icon = ICON_BLOCKED
self._attr_state_class = SensorStateClass.TOTAL_INCREASING
self._attr_icon = "mdi:shield-check"
self._attr_native_unit_of_measurement = "queries"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> Optional[int]:
"""Return the state of the sensor."""
return self.coordinator.statistics.get("num_blocked_filtering", 0)
stats = self.coordinator.statistics
return stats.get("num_blocked_filtering", 0)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
stats = self.coordinator.statistics
return {
"blocked_today": stats.get("num_blocked_filtering_today", 0),
"malware_phishing": stats.get("num_replaced_safebrowsing", 0),
"adult_websites": stats.get("num_replaced_parental", 0),
}
class AdGuardBlockingPercentageSensor(AdGuardBaseSensor):
"""AdGuard blocking percentage sensor."""
"""Sensor to track blocking percentage."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_blocking_percentage"
self._attr_name = "AdGuard Blocking Percentage"
self._attr_unique_id = f"{DOMAIN}_blocking_percentage"
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_icon = ICON_PERCENTAGE
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_native_unit_of_measurement = PERCENTAGE
self._attr_icon = "mdi:percent"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> Optional[float]:
"""Return the state of the sensor."""
total_queries = self.coordinator.statistics.get("num_dns_queries", 0)
blocked_queries = self.coordinator.statistics.get("num_blocked_filtering", 0)
stats = self.coordinator.statistics
total_queries = stats.get("num_dns_queries", 0)
blocked_queries = stats.get("num_blocked_filtering", 0)
if total_queries > 0:
return round((blocked_queries / total_queries) * 100, 2)
return 0.0
if total_queries == 0:
return 0.0
percentage = (blocked_queries / total_queries) * 100
return round(percentage, 2)
class AdGuardClientsCountSensor(AdGuardBaseSensor):
"""AdGuard clients count sensor."""
class AdGuardClientCountSensor(AdGuardBaseSensor):
"""Sensor to track active clients count."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_clients_count"
self._attr_name = "AdGuard Clients Count"
self._attr_unique_id = f"{DOMAIN}_clients_count"
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_icon = ICON_CLIENTS
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_icon = "mdi:account-multiple"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> int:
"""Return the state of the sensor."""
return len(self.coordinator.clients)
class AdGuardProcessingTimeSensor(AdGuardBaseSensor):
"""AdGuard average processing time sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Average Processing Time"
self._attr_unique_id = f"{DOMAIN}_avg_processing_time"
self._attr_device_class = SensorDeviceClass.DURATION
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_native_unit_of_measurement = UnitOfTime.MILLISECONDS
self._attr_icon = "mdi:speedometer"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> Optional[float]:
"""Return the state of the sensor."""
return self.coordinator.statistics.get("avg_processing_time", 0.0)
class AdGuardFilteringRulesSensor(AdGuardBaseSensor):
"""AdGuard filtering rules count sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Filtering Rules"
self._attr_unique_id = f"{DOMAIN}_filtering_rules"
self._attr_device_class = SensorDeviceClass.ENUM
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_icon = "mdi:filter"
self._attr_native_unit_of_measurement = "clients"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> Optional[int]:
"""Return the state of the sensor."""
return self.coordinator.protection_status.get("num_filtering_rules", 0)
return len(self.coordinator.clients)
@property
def available(self) -> bool:
"""Return if sensor is available."""
return self.coordinator.last_update_success
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
clients = self.coordinator.clients
protected_clients = sum(1 for c in clients.values() if c.get("filtering_enabled", True))
return {
"protected_clients": protected_clients,
"unprotected_clients": len(clients) - protected_clients,
"client_names": list(clients.keys()),
}
class AdGuardUpstreamServersSensor(AdGuardBaseSensor):
"""AdGuard upstream servers sensor."""
class AdGuardProcessingTimeSensor(AdGuardBaseSensor):
"""Sensor to track average processing time."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Upstream Servers"
self._attr_unique_id = f"{DOMAIN}_upstream_servers"
self._attr_icon = "mdi:server-network"
self._attr_unique_id = f"{api.host}_{api.port}_avg_processing_time"
self._attr_name = "AdGuard Average Processing Time"
self._attr_icon = "mdi:speedometer"
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_native_unit_of_measurement = UnitOfTime.MILLISECONDS
self._attr_entity_category = EntityCategory.DIAGNOSTIC
self._attr_device_class = SensorDeviceClass.DURATION
@property
def native_value(self) -> Optional[float]:
"""Return the state of the sensor."""
stats = self.coordinator.statistics
avg_time = stats.get("avg_processing_time", 0)
return round(avg_time, 2) if avg_time else 0
class AdGuardFilteringRulesSensor(AdGuardBaseSensor):
"""Sensor to track number of filtering rules."""
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_filtering_rules"
self._attr_name = "AdGuard Filtering Rules"
self._attr_icon = "mdi:filter"
self._attr_state_class = SensorStateClass.MEASUREMENT
self._attr_native_unit_of_measurement = "rules"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> str:
def native_value(self) -> Optional[int]:
"""Return the state of the sensor."""
servers = self.coordinator.protection_status.get("dns_addresses", [])
return ", ".join(servers) if servers else "Unknown"
class AdGuardVersionSensor(AdGuardBaseSensor):
"""AdGuard version sensor."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, api)
self._attr_name = "AdGuard Version"
self._attr_unique_id = f"{DOMAIN}_version"
self._attr_icon = "mdi:information"
self._attr_entity_category = EntityCategory.DIAGNOSTIC
@property
def native_value(self) -> str:
"""Return the state of the sensor."""
return self.coordinator.protection_status.get("version", "Unknown")
stats = self.coordinator.statistics
return stats.get("filtering_rules_count", 0)

View File

@@ -1,81 +1,94 @@
"""AdGuard Control Hub services."""
"""Service implementations for AdGuard Control Hub integration."""
import asyncio
import logging
from typing import Any, Dict, List
from typing import Any, Dict
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import config_validation as cv
import voluptuous as vol
from .api import AdGuardConnectionError, AdGuardHomeError
from .api import AdGuardHomeAPI, AdGuardHomeError
from .const import (
ATTR_CLIENT_NAME,
ATTR_CLIENTS,
ATTR_DURATION,
ATTR_SERVICES,
BLOCKED_SERVICES,
DOMAIN,
SERVICE_ADD_CLIENT,
BLOCKED_SERVICES,
ATTR_CLIENT_NAME,
ATTR_SERVICES,
ATTR_DURATION,
ATTR_CLIENTS,
ATTR_ENABLED,
SERVICE_BLOCK_SERVICES,
SERVICE_EMERGENCY_UNBLOCK,
SERVICE_REFRESH_DATA,
SERVICE_REMOVE_CLIENT,
SERVICE_UNBLOCK_SERVICES,
SERVICE_EMERGENCY_UNBLOCK,
SERVICE_ADD_CLIENT,
SERVICE_REMOVE_CLIENT,
SERVICE_REFRESH_DATA,
)
_LOGGER = logging.getLogger(__name__)
# Service schemas
SCHEMA_BLOCK_SERVICES = vol.Schema({
vol.Required(ATTR_CLIENT_NAME): cv.string,
vol.Required(ATTR_SERVICES): vol.All(cv.ensure_list, [vol.In(BLOCKED_SERVICES.keys())]),
})
SCHEMA_UNBLOCK_SERVICES = vol.Schema({
vol.Required(ATTR_CLIENT_NAME): cv.string,
vol.Required(ATTR_SERVICES): vol.All(cv.ensure_list, [vol.In(BLOCKED_SERVICES.keys())]),
})
SCHEMA_EMERGENCY_UNBLOCK = vol.Schema({
vol.Required(ATTR_DURATION): cv.positive_int,
vol.Optional(ATTR_CLIENTS, default=["all"]): vol.All(cv.ensure_list, [cv.string]),
})
SCHEMA_ADD_CLIENT = vol.Schema({
vol.Required("name"): cv.string,
vol.Required("ids"): vol.All(cv.ensure_list, [cv.string]),
vol.Optional("filtering_enabled", default=True): cv.boolean,
vol.Optional("safebrowsing_enabled", default=False): cv.boolean,
vol.Optional("parental_enabled", default=False): cv.boolean,
vol.Optional("safesearch_enabled", default=False): cv.boolean,
vol.Optional("use_global_blocked_services", default=True): cv.boolean,
vol.Optional("blocked_services", default=[]): vol.All(cv.ensure_list, [cv.string]),
})
SCHEMA_REMOVE_CLIENT = vol.Schema({
vol.Required("name"): cv.string,
})
SCHEMA_REFRESH_DATA = vol.Schema({})
class AdGuardControlHubServices:
"""AdGuard Control Hub services."""
"""Handle services for AdGuard Control Hub."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize services."""
"""Initialize the services."""
self.hass = hass
def register_services(self) -> None:
"""Register services."""
# FIXED: All service constants are now properly defined
self.hass.services.register(
DOMAIN,
SERVICE_BLOCK_SERVICES,
self.block_services,
)
"""Register all services."""
_LOGGER.debug("Registering AdGuard Control Hub services")
self.hass.services.register(
DOMAIN,
SERVICE_UNBLOCK_SERVICES,
self.unblock_services,
)
services = [
(SERVICE_BLOCK_SERVICES, self.block_services, SCHEMA_BLOCK_SERVICES),
(SERVICE_UNBLOCK_SERVICES, self.unblock_services, SCHEMA_UNBLOCK_SERVICES),
(SERVICE_EMERGENCY_UNBLOCK, self.emergency_unblock, SCHEMA_EMERGENCY_UNBLOCK),
(SERVICE_ADD_CLIENT, self.add_client, SCHEMA_ADD_CLIENT),
(SERVICE_REMOVE_CLIENT, self.remove_client, SCHEMA_REMOVE_CLIENT),
(SERVICE_REFRESH_DATA, self.refresh_data, SCHEMA_REFRESH_DATA),
]
self.hass.services.register(
DOMAIN,
SERVICE_EMERGENCY_UNBLOCK,
self.emergency_unblock,
)
self.hass.services.register(
DOMAIN,
SERVICE_ADD_CLIENT,
self.add_client,
)
self.hass.services.register(
DOMAIN,
SERVICE_REMOVE_CLIENT,
self.remove_client,
)
self.hass.services.register(
DOMAIN,
SERVICE_REFRESH_DATA,
self.refresh_data,
)
_LOGGER.info("AdGuard Control Hub services registered")
for service_name, service_func, schema in services:
if not self.hass.services.has_service(DOMAIN, service_name):
self.hass.services.register(DOMAIN, service_name, service_func, schema=schema)
_LOGGER.debug("Registered service: %s", service_name)
def unregister_services(self) -> None:
"""Unregister services."""
"""Unregister all services."""
_LOGGER.debug("Unregistering AdGuard Control Hub services")
services = [
SERVICE_BLOCK_SERVICES,
SERVICE_UNBLOCK_SERVICES,
@@ -85,163 +98,179 @@ class AdGuardControlHubServices:
SERVICE_REFRESH_DATA,
]
for service in services:
if self.hass.services.has_service(DOMAIN, service):
self.hass.services.remove(DOMAIN, service)
for service_name in services:
if self.hass.services.has_service(DOMAIN, service_name):
self.hass.services.remove(DOMAIN, service_name)
_LOGGER.debug("Unregistered service: %s", service_name)
_LOGGER.info("AdGuard Control Hub services unregistered")
def _get_api(self):
"""Get API instance from first available entry."""
for entry_id, entry_data in self.hass.data[DOMAIN].items():
def _get_api_instances(self) -> list[AdGuardHomeAPI]:
"""Get all API instances."""
apis = []
for entry_data in self.hass.data.get(DOMAIN, {}).values():
if isinstance(entry_data, dict) and "api" in entry_data:
return entry_data["api"]
raise AdGuardConnectionError("No AdGuard Control Hub API available")
def _get_coordinator(self):
"""Get coordinator instance from first available entry."""
for entry_id, entry_data in self.hass.data[DOMAIN].items():
if isinstance(entry_data, dict) and "coordinator" in entry_data:
return entry_data["coordinator"]
raise AdGuardConnectionError("No AdGuard Control Hub coordinator available")
apis.append(entry_data["api"])
return apis
async def block_services(self, call: ServiceCall) -> None:
"""Block services for a client."""
"""Block services for a specific client."""
client_name = call.data[ATTR_CLIENT_NAME]
services_to_block = call.data[ATTR_SERVICES]
services = call.data[ATTR_SERVICES]
try:
api = self._get_api()
client = await api.get_client_by_name(client_name)
_LOGGER.info("Blocking services %s for client %s", services, client_name)
if not client:
_LOGGER.error("Client '%s' not found", client_name)
return
success_count = 0
for api in self._get_api_instances():
try:
client = await api.get_client_by_name(client_name)
if client:
current_blocked = client.get("blocked_services", {})
if isinstance(current_blocked, dict):
current_services = current_blocked.get("ids", [])
else:
current_services = current_blocked or []
# Get current blocked services and add new ones
current_blocked = set(client.get("blocked_services", []))
current_blocked.update(services_to_block)
updated_services = list(set(current_services + services))
await api.update_client_blocked_services(client_name, updated_services)
success_count += 1
_LOGGER.info("Successfully blocked services for %s", client_name)
else:
_LOGGER.warning("Client %s not found", client_name)
except AdGuardHomeError as err:
_LOGGER.error("AdGuard error blocking services for %s: %s", client_name, err)
except Exception as err:
_LOGGER.exception("Unexpected error blocking services for %s: %s", client_name, err)
await api.update_client_blocked_services(
client_name, list(current_blocked)
)
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
_LOGGER.info(
"Blocked services %s for client '%s'", services_to_block, client_name
)
except AdGuardHomeError as err:
_LOGGER.error("Failed to block services for '%s': %s", client_name, err)
if success_count == 0:
_LOGGER.error("Failed to block services for %s on any instance", client_name)
async def unblock_services(self, call: ServiceCall) -> None:
"""Unblock services for a client."""
"""Unblock services for a specific client."""
client_name = call.data[ATTR_CLIENT_NAME]
services_to_unblock = call.data[ATTR_SERVICES]
services = call.data[ATTR_SERVICES]
try:
api = self._get_api()
client = await api.get_client_by_name(client_name)
_LOGGER.info("Unblocking services %s for client %s", services, client_name)
if not client:
_LOGGER.error("Client '%s' not found", client_name)
return
success_count = 0
for api in self._get_api_instances():
try:
client = await api.get_client_by_name(client_name)
if client:
current_blocked = client.get("blocked_services", {})
if isinstance(current_blocked, dict):
current_services = current_blocked.get("ids", [])
else:
current_services = current_blocked or []
# Get current blocked services and remove specified ones
current_blocked = set(client.get("blocked_services", []))
current_blocked.difference_update(services_to_unblock)
updated_services = [s for s in current_services if s not in services]
await api.update_client_blocked_services(client_name, updated_services)
success_count += 1
_LOGGER.info("Successfully unblocked services for %s", client_name)
else:
_LOGGER.warning("Client %s not found", client_name)
except AdGuardHomeError as err:
_LOGGER.error("AdGuard error unblocking services for %s: %s", client_name, err)
except Exception as err:
_LOGGER.exception("Unexpected error unblocking services for %s: %s", client_name, err)
await api.update_client_blocked_services(
client_name, list(current_blocked)
)
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
_LOGGER.info(
"Unblocked services %s for client '%s'", services_to_unblock, client_name
)
except AdGuardHomeError as err:
_LOGGER.error("Failed to unblock services for '%s': %s", client_name, err)
if success_count == 0:
_LOGGER.error("Failed to unblock services for %s on any instance", client_name)
async def emergency_unblock(self, call: ServiceCall) -> None:
"""Emergency unblock - disable protection temporarily."""
duration = call.data.get(ATTR_DURATION, 300)
clients = call.data.get(ATTR_CLIENTS, ["all"])
"""Emergency unblock - temporarily disable protection."""
duration = call.data[ATTR_DURATION]
clients = call.data[ATTR_CLIENTS]
try:
api = self._get_api()
_LOGGER.warning("Emergency unblock activated for %s seconds", duration)
if "all" in clients:
# Global protection disable
await api.set_protection(False)
_LOGGER.warning(
"Emergency unblock activated globally for %d seconds", duration
)
for api in self._get_api_instances():
try:
if "all" in clients:
await api.set_protection(False)
_LOGGER.warning("Protection disabled for %s:%s", api.host, api.port)
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
# Re-enable after duration
async def delayed_enable(api_instance: AdGuardHomeAPI):
await asyncio.sleep(duration)
try:
await api_instance.set_protection(True)
_LOGGER.info("Emergency unblock expired - protection re-enabled for %s:%s",
api_instance.host, api_instance.port)
except Exception as err:
_LOGGER.error("Failed to re-enable protection for %s:%s: %s",
api_instance.host, api_instance.port, err)
# Schedule re-enabling protection
async def restore_protection():
await asyncio.sleep(duration)
try:
if "all" in clients:
await api.set_protection(True)
asyncio.create_task(delayed_enable(api))
else:
# Individual client emergency unblock
for client_name in clients:
if client_name == "all":
continue
try:
client = await api.get_client_by_name(client_name)
if client:
update_data = {
"name": client_name,
"data": {**client, "filtering_enabled": False}
}
await api.update_client(update_data)
_LOGGER.info("Emergency unblock applied to client %s", client_name)
except Exception as err:
_LOGGER.error("Failed to emergency unblock client %s: %s", client_name, err)
await coordinator.async_request_refresh()
_LOGGER.info("Emergency unblock period ended, protection restored")
except Exception as err:
_LOGGER.error("Failed to restore protection after emergency unblock: %s", err)
# Schedule restoration
self.hass.async_create_task(restore_protection())
except AdGuardHomeError as err:
_LOGGER.error("Failed to activate emergency unblock: %s", err)
except AdGuardHomeError as err:
_LOGGER.error("AdGuard error during emergency unblock: %s", err)
except Exception as err:
_LOGGER.exception("Unexpected error during emergency unblock: %s", err)
async def add_client(self, call: ServiceCall) -> None:
"""Add a new client."""
client_data = dict(call.data)
try:
api = self._get_api()
await api.add_client(client_data)
_LOGGER.info("Adding new client: %s", client_data.get("name"))
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
success_count = 0
for api in self._get_api_instances():
try:
await api.add_client(client_data)
success_count += 1
_LOGGER.info("Successfully added client: %s", client_data.get("name"))
except AdGuardHomeError as err:
_LOGGER.error("AdGuard error adding client: %s", err)
except Exception as err:
_LOGGER.exception("Unexpected error adding client: %s", err)
_LOGGER.info("Added new client: %s", client_data["name"])
except AdGuardHomeError as err:
_LOGGER.error("Failed to add client '%s': %s", client_data["name"], err)
if success_count == 0:
_LOGGER.error("Failed to add client %s on any instance", client_data.get("name"))
async def remove_client(self, call: ServiceCall) -> None:
"""Remove a client."""
client_name = call.data["name"]
client_name = call.data.get("name")
try:
api = self._get_api()
await api.delete_client(client_name)
_LOGGER.info("Removing client: %s", client_name)
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
success_count = 0
for api in self._get_api_instances():
try:
await api.delete_client(client_name)
success_count += 1
_LOGGER.info("Successfully removed client: %s", client_name)
except AdGuardHomeError as err:
_LOGGER.error("AdGuard error removing client: %s", err)
except Exception as err:
_LOGGER.exception("Unexpected error removing client: %s", err)
_LOGGER.info("Removed client: %s", client_name)
except AdGuardHomeError as err:
_LOGGER.error("Failed to remove client '%s': %s", client_name, err)
if success_count == 0:
_LOGGER.error("Failed to remove client %s on any instance", client_name)
async def refresh_data(self, call: ServiceCall) -> None:
"""Refresh data from AdGuard Home."""
try:
coordinator = self._get_coordinator()
await coordinator.async_request_refresh()
"""Refresh data for all coordinators."""
_LOGGER.info("Manually refreshing AdGuard Control Hub data")
_LOGGER.info("Data refresh requested")
except Exception as err:
_LOGGER.error("Failed to refresh data: %s", err)
for entry_data in self.hass.data.get(DOMAIN, {}).values():
if isinstance(entry_data, dict) and "coordinator" in entry_data:
coordinator = entry_data["coordinator"]
try:
await coordinator.async_request_refresh()
_LOGGER.debug("Refreshed coordinator data")
except Exception as err:
_LOGGER.error("Failed to refresh coordinator: %s", err)

View File

@@ -6,7 +6,7 @@
"description": "Configure your AdGuard Home connection",
"data": {
"host": "Host",
"port": "Port",
"port": "Port",
"username": "Username (optional)",
"password": "Password (optional)",
"ssl": "Use SSL",

View File

@@ -1,16 +1,17 @@
"""AdGuard Control Hub switch platform."""
"""Switch platform for AdGuard Control Hub integration."""
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from homeassistant.components.switch import SwitchEntity
from homeassistant.components.switch import SwitchEntity, SwitchDeviceClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.entity import DeviceInfo
from .api import AdGuardHomeAPI, AdGuardConnectionError
from .const import DOMAIN, MANUFACTURER
from . import AdGuardControlHubCoordinator
from .api import AdGuardHomeAPI, AdGuardHomeError
from .const import DOMAIN, ICON_PROTECTION, ICON_PROTECTION_OFF, ICON_CLIENT, MANUFACTURER
_LOGGER = logging.getLogger(__name__)
@@ -24,122 +25,189 @@ async def async_setup_entry(
coordinator = hass.data[DOMAIN][config_entry.entry_id]["coordinator"]
api = hass.data[DOMAIN][config_entry.entry_id]["api"]
entities: List[SwitchEntity] = []
entities = [AdGuardProtectionSwitch(coordinator, api)]
# Add main protection switch
entities.append(AdGuardProtectionSwitch(coordinator, api))
# Add client switches
for client_name in coordinator.clients:
# Add client switches if clients exist
for client_name in coordinator.clients.keys():
entities.append(AdGuardClientSwitch(coordinator, api, client_name))
async_add_entities(entities)
async_add_entities(entities, update_before_add=True)
class AdGuardProtectionSwitch(CoordinatorEntity, SwitchEntity):
"""AdGuard Home protection switch."""
class AdGuardBaseSwitch(CoordinatorEntity, SwitchEntity):
"""Base class for AdGuard switches."""
def __init__(self, coordinator, api: AdGuardHomeAPI) -> None:
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the switch."""
super().__init__(coordinator)
self.api = api
self._attr_device_info = {
"identifiers": {(DOMAIN, f"{api.host}:{api.port}")},
"name": f"AdGuard Control Hub ({api.host})",
"manufacturer": MANUFACTURER,
"model": "AdGuard Home",
"configuration_url": f"{'https' if api.ssl else 'http'}://{api.host}:{api.port}",
}
@property
def available(self) -> bool:
"""Return if switch is available."""
return self.coordinator.last_update_success
class AdGuardProtectionSwitch(AdGuardBaseSwitch):
"""Switch to control global AdGuard protection."""
def __init__(self, coordinator: AdGuardControlHubCoordinator, api: AdGuardHomeAPI) -> None:
"""Initialize the switch."""
super().__init__(coordinator, api)
self._attr_unique_id = f"{api.host}_{api.port}_protection"
self._attr_name = "AdGuard Protection"
self._attr_unique_id = f"{DOMAIN}_protection"
self._attr_device_class = SwitchDeviceClass.SWITCH
self._attr_entity_category = EntityCategory.CONFIG
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return DeviceInfo(
identifiers={(DOMAIN, "adguard_home")},
name="AdGuard Home",
manufacturer=MANUFACTURER, # FIXED: Now uses imported MANUFACTURER
model="AdGuard Home",
configuration_url=self.api.base_url,
)
@property
def is_on(self) -> bool:
def is_on(self) -> Optional[bool]:
"""Return true if protection is enabled."""
return self.coordinator.protection_status.get("protection_enabled", False)
@property
def icon(self) -> str:
"""Return the icon."""
return "mdi:shield-check" if self.is_on else "mdi:shield-off"
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn on protection."""
try:
await self.api.set_protection(True)
await self.coordinator.async_request_refresh()
except AdGuardConnectionError as err:
_LOGGER.error("Failed to turn on protection: %s", err)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off protection."""
try:
await self.api.set_protection(False)
await self.coordinator.async_request_refresh()
except AdGuardConnectionError as err:
_LOGGER.error("Failed to turn off protection: %s", err)
class AdGuardClientSwitch(CoordinatorEntity, SwitchEntity):
"""AdGuard Home client switch."""
def __init__(self, coordinator, api: AdGuardHomeAPI, client_name: str) -> None:
"""Initialize the client switch."""
super().__init__(coordinator)
self.api = api
self._client_name = client_name
self._attr_name = f"AdGuard {client_name}"
self._attr_unique_id = f"{DOMAIN}_{client_name.lower().replace(' ', '_')}"
@property
def device_info(self) -> DeviceInfo:
"""Return device info."""
return DeviceInfo(
identifiers={(DOMAIN, f"client_{self._client_name}")},
name=f"AdGuard Client: {self._client_name}",
manufacturer=MANUFACTURER,
model="AdGuard Client",
via_device=(DOMAIN, "adguard_home"),
)
@property
def is_on(self) -> bool:
"""Return true if client filtering is enabled."""
client = self.coordinator.clients.get(self._client_name, {})
return not client.get("filtering_enabled", True) is False
@property
def icon(self) -> str:
"""Return the icon."""
return "mdi:devices" if self.is_on else "mdi:devices-off"
"""Return the icon for the switch."""
return ICON_PROTECTION if self.is_on else ICON_PROTECTION_OFF
@property
def available(self) -> bool:
"""Return if entity is available."""
return self._client_name in self.coordinator.clients
"""Return if switch is available."""
return self.coordinator.last_update_success and bool(self.coordinator.protection_status)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
status = self.coordinator.protection_status
return {
"dns_port": status.get("dns_port", "N/A"),
"version": status.get("version", "N/A"),
"running": status.get("running", False),
"dns_addresses": status.get("dns_addresses", []),
}
async def async_turn_on(self, **kwargs: Any) -> None:
"""Enable filtering for client."""
"""Turn on AdGuard protection."""
try:
client = await self.api.get_client_by_name(self._client_name)
if client:
client["filtering_enabled"] = True
await self.api._request("POST", "/control/clients/update", json=client)
await self.coordinator.async_request_refresh()
except AdGuardConnectionError as err:
_LOGGER.error("Failed to enable filtering for %s: %s", self._client_name, err)
await self.api.set_protection(True)
await self.coordinator.async_request_refresh()
_LOGGER.info("AdGuard protection enabled")
except AdGuardHomeError as err:
_LOGGER.error("Failed to enable AdGuard protection: %s", err)
raise
except Exception as err:
_LOGGER.exception("Unexpected error enabling AdGuard protection")
raise
async def async_turn_off(self, **kwargs: Any) -> None:
"""Disable filtering for client."""
"""Turn off AdGuard protection."""
try:
client = await self.api.get_client_by_name(self._client_name)
await self.api.set_protection(False)
await self.coordinator.async_request_refresh()
_LOGGER.warning("AdGuard protection disabled")
except AdGuardHomeError as err:
_LOGGER.error("Failed to disable AdGuard protection: %s", err)
raise
except Exception as err:
_LOGGER.exception("Unexpected error disabling AdGuard protection")
raise
class AdGuardClientSwitch(AdGuardBaseSwitch):
"""Switch to control client-specific protection."""
def __init__(
self,
coordinator: AdGuardControlHubCoordinator,
api: AdGuardHomeAPI,
client_name: str,
) -> None:
"""Initialize the switch."""
super().__init__(coordinator, api)
self.client_name = client_name
self._attr_unique_id = f"{api.host}_{api.port}_client_{client_name}"
self._attr_name = f"AdGuard {client_name}"
self._attr_icon = ICON_CLIENT
self._attr_device_class = SwitchDeviceClass.SWITCH
self._attr_entity_category = EntityCategory.CONFIG
@property
def is_on(self) -> Optional[bool]:
"""Return true if client protection is enabled."""
client = self.coordinator.clients.get(self.client_name, {})
return client.get("filtering_enabled", True)
@property
def available(self) -> bool:
"""Return if switch is available."""
return (
self.coordinator.last_update_success
and self.client_name in self.coordinator.clients
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return additional state attributes."""
client = self.coordinator.clients.get(self.client_name, {})
blocked_services = client.get("blocked_services", {})
if isinstance(blocked_services, dict):
blocked_list = blocked_services.get("ids", [])
else:
blocked_list = blocked_services or []
return {
"client_ids": client.get("ids", []),
"safebrowsing_enabled": client.get("safebrowsing_enabled", False),
"parental_enabled": client.get("parental_enabled", False),
"safesearch_enabled": client.get("safesearch_enabled", False),
"blocked_services_count": len(blocked_list),
"blocked_services": blocked_list,
}
async def async_turn_on(self, **kwargs: Any) -> None:
"""Enable protection for this client."""
try:
client = await self.api.get_client_by_name(self.client_name)
if client:
client["filtering_enabled"] = False
await self.api._request("POST", "/control/clients/update", json=client)
update_data = {
"name": self.client_name,
"data": {**client, "filtering_enabled": True}
}
await self.api.update_client(update_data)
await self.coordinator.async_request_refresh()
except AdGuardConnectionError as err:
_LOGGER.error("Failed to disable filtering for %s: %s", self._client_name, err)
_LOGGER.info("Enabled protection for client: %s", self.client_name)
else:
_LOGGER.error("Client not found: %s", self.client_name)
except AdGuardHomeError as err:
_LOGGER.error("Failed to enable protection for %s: %s", self.client_name, err)
raise
except Exception as err:
_LOGGER.exception("Unexpected error enabling protection for %s", self.client_name)
raise
async def async_turn_off(self, **kwargs: Any) -> None:
"""Disable protection for this client."""
try:
client = await self.api.get_client_by_name(self.client_name)
if client:
update_data = {
"name": self.client_name,
"data": {**client, "filtering_enabled": False}
}
await self.api.update_client(update_data)
await self.coordinator.async_request_refresh()
_LOGGER.info("Disabled protection for client: %s", self.client_name)
else:
_LOGGER.error("Client not found: %s", self.client_name)
except AdGuardHomeError as err:
_LOGGER.error("Failed to disable protection for %s: %s", self.client_name, err)
raise
except Exception as err:
_LOGGER.exception("Unexpected error disabling protection for %s", self.client_name)
raise