Coverage for custom_components / remote_logger / exporter.py: 91%
171 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
1import asyncio
2import json
3import logging
4import threading
5from abc import abstractmethod
6from collections.abc import Mapping
7from dataclasses import dataclass
8from typing import TYPE_CHECKING, Any
10from homeassistant.auth import EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED, HomeAssistant
11from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED
12from homeassistant.components.script import EVENT_SCRIPT_STARTED
13from homeassistant.const import EVENT_COMPONENT_LOADED, EVENT_STATE_CHANGED
14from homeassistant.core import EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED, Event, callback
15from homeassistant.helpers.area_registry import EVENT_AREA_REGISTRY_UPDATED
16from homeassistant.helpers.category_registry import EVENT_CATEGORY_REGISTRY_UPDATED
17from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
18from homeassistant.helpers.entity_registry import EVENT_ENTITY_REGISTRY_UPDATED
19from homeassistant.helpers.floor_registry import EVENT_FLOOR_REGISTRY_UPDATED
20from homeassistant.helpers.label_registry import EVENT_LABEL_REGISTRY_UPDATED
21from homeassistant.util import dt as dt_util
23from custom_components.remote_logger.const import BATCH_FLUSH_INTERVAL_SECONDS
25if TYPE_CHECKING:
26 import datetime as dt
27 from collections.abc import Mapping
29_LOGGER = logging.getLogger(__name__)
31_HA_EVENT_BODY_ATTRIBUTE_KEYS: frozenset[str] = frozenset({"entity_id", "domain", "service"})
34def _event_data_serializer(obj: Any) -> Any:
35 """JSON serializer for HA event data objects."""
36 if hasattr(obj, "as_dict"):
37 return obj.as_dict()
38 if hasattr(obj, "value"):
39 return obj.value
40 return str(obj)
43@dataclass
44class LogMessage:
45 payload: Any
46 sent: bool = False
49class LogSubmission:
50 @abstractmethod
51 def for_display(self) -> dict[str, Any]:
52 pass
55class LogExporter:
56 """Base class for log exporters"""
58 logger_type: str
60 def __init__(self, hass: HomeAssistant) -> None:
61 self._hass: HomeAssistant = hass
62 self.name: str = self.logger_type
63 self.destination: tuple[str, ...]
64 self.tz = dt_util.get_default_time_zone()
66 self._batch_max_size: int
67 self.event_count: int = 0
68 self.last_event: dt.datetime | None = None
69 self.posting_count: int = 0
70 self.last_posting: dt.datetime | None = None
71 self.format_error_count: int = 0
72 self.last_format_error_message: str | None = None
73 self.last_format_error: dt.datetime | None = None
74 self.posting_error_count: int = 0
75 self.last_posting_error_message: str | None = None
76 self.last_posting_error: dt.datetime | None = None
78 self._buffer: list[LogMessage] = []
79 self.self_source: str = f"custom_components/remote_logger/{self.logger_type}"
80 self.last_sent_payload: LogSubmission | None = None
81 self.flushing: threading.Event = threading.Event()
83 async def disable_buffer(self) -> None:
84 """Flush logs and prevent future buffering, use for shutdowns"""
85 self._batch_max_size = 0
86 self.flushing.clear()
87 await self.flush()
89 @callback
90 def handle_event(self, event: Event) -> None:
91 self.on_event()
92 if (
93 event.data
94 and event.data.get("source")
95 and len(event.data["source"]) == 2
96 and self.self_source in event.data["source"][0]
97 ):
98 # prevent log loops
99 return
100 try:
101 record: LogMessage = self.create_log_record(event.data, event.event_type, event.time_fired)
102 self._buffer.append(record)
104 if len(self._buffer) >= self._batch_max_size:
105 self._hass.async_create_task(self.flush())
106 except Exception as e:
107 _LOGGER.error("remote_logger: %s event handler failure %s on %s", self.logger_type, e, event.data)
108 self.on_format_error(str(e))
110 def handle_entry(self, entry: Mapping[str, Any], time_fired: dt.datetime) -> None:
111 self.on_event()
112 if entry and entry.get("source") and len(entry["source"]) == 2 and self.self_source in entry["source"][0]:
113 # prevent log loops
114 return
115 try:
116 record: LogMessage = self.create_log_record(entry, None, time_fired)
117 self._buffer.append(record)
119 if len(self._buffer) >= self._batch_max_size:
120 try:
121 asyncio.get_running_loop().create_task(self.flush())
122 except RuntimeError:
123 self._hass.create_task(self.flush())
124 except Exception as e:
125 _LOGGER.error("remote_logger: %s entry handler failure %s on %s", self.logger_type, e, entry)
126 self.on_format_error(str(e))
128 @abstractmethod
129 def create_log_record(
130 self,
131 event_data: Mapping[str, Any],
132 event_type: str | None = None,
133 time_fired: dt.datetime | None = None,
134 message_override: list[str] | None = None,
135 level_override: str | None = None,
136 state_only: bool = False,
137 ) -> LogMessage:
138 pass
140 @callback
141 def handle_ha_event(self, event_type: str, event: Event, state_only: bool = False, event_body: bool = False) -> None:
142 """Handle a non-system-log HA event (lifecycle, core change, or custom)."""
143 self.on_event()
144 title_fields: list[str] = ["message", "id", "entity_id", "name", "component", "device_id"]
145 try:
146 if (
147 event_type == EVENT_CALL_SERVICE
148 and event.data.get("domain") == "system_log"
149 and event.data.get("service") == "write"
150 ):
151 # don't double count log events
152 return
153 if event_type == EVENT_STATE_CHANGED:
154 old_state: str = (event.data["old_state"] and event.data["old_state"].state) or "N/A"
155 new_state: str = (event.data["new_state"] and event.data["new_state"].state) or "N/A"
156 message: list[str] = [event_type, ":", event.data["entity_id"], old_state, "->", new_state]
157 elif event_type in (EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED):
158 message = [event_type, ":", event.data["domain"], event.data["service"]]
159 elif event_type == EVENT_COMPONENT_LOADED:
160 message = [event_type, ":", event.data["component"]]
161 elif event_type in (EVENT_SCRIPT_STARTED, EVENT_AUTOMATION_TRIGGERED):
162 message = [event_type, ":", event.data["entity_id"]]
163 elif event_type == EVENT_DEVICE_REGISTRY_UPDATED:
164 message = [event_type, ":", event.data["device_id"], event.data.get("action", "???")]
165 elif event_type == EVENT_ENTITY_REGISTRY_UPDATED:
166 message = [event_type, ":", event.data["entity_id"], event.data.get("action", "???")]
167 elif event_type == EVENT_LABEL_REGISTRY_UPDATED:
168 message = [event_type, ":", event.data["label_id"], event.data.get("action", "???")]
169 elif event_type == EVENT_AREA_REGISTRY_UPDATED:
170 message = [event_type, ":", event.data["area_id"], event.data.get("action", "???")]
171 elif event_type == EVENT_CATEGORY_REGISTRY_UPDATED:
172 message = [event_type, ":", event.data["category_id"], event.data.get("action", "???")]
173 elif event_type == EVENT_FLOOR_REGISTRY_UPDATED:
174 message = [event_type, ":", event.data["floor_id"], event.data.get("action", "???")]
175 elif event_type in (EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED):
176 message = [event_type, ":", event.data["user_id"]]
177 elif event_type == "autoarm_change":
178 message = [
179 event_type,
180 ":",
181 event.data["original_state"],
182 "->",
183 event.data["new_state"],
184 " from ",
185 event.data["change_source"],
186 ]
187 elif any(v in event.data for v in title_fields):
188 message = [event_type, ":"] + [event.data[v] for v in title_fields if v in event.data]
189 else:
190 message = [event_type]
192 if event_body:
193 flat_event: dict[str, Any] = {k: v for k, v in event.data.items() if k not in _HA_EVENT_BODY_ATTRIBUTE_KEYS}
194 event_data: dict[str, Any] = {k: v for k, v in event.data.items() if k in _HA_EVENT_BODY_ATTRIBUTE_KEYS}
195 if flat_event:
196 event_data["event.data"] = json.dumps(flat_event, default=_event_data_serializer, indent=2)
197 else:
198 event_data = dict(event.data)
200 record: LogMessage = self.create_log_record(
201 event_data,
202 event.event_type,
203 event.time_fired,
204 message_override=message,
205 level_override="INFO",
206 state_only=state_only,
207 )
208 self._buffer.append(record)
209 if len(self._buffer) >= self._batch_max_size:
210 self._hass.async_create_task(self.flush())
211 except Exception as e:
212 _LOGGER.error("remote_logger: %s ha_event handler failure %s on %s", self.logger_type, e, event_type)
213 self.on_format_error(str(e))
215 @callback
216 @abstractmethod
217 async def flush(self) -> None:
218 pass
220 async def flush_loop(self) -> None:
221 """Periodically flush buffered log records."""
222 self.flushing.set()
223 try:
224 while self.flushing.is_set():
225 await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
226 await self.flush()
227 except asyncio.CancelledError:
228 raise
230 async def close(self) -> None:
231 """Clean up resources (no-op for HTTP-based exporter)."""
232 pass
234 def on_format_error(self, message: str) -> None:
235 self.format_error_count += 1
236 self.last_format_error_message = message
237 self.last_format_error = dt_util.now()
239 def on_posting_error(self, message: str) -> None:
240 self.posting_error_count += 1
241 self.last_posting_error_message = message
242 self.last_posting_error = dt_util.now()
244 def on_success(self) -> None:
245 self.posting_count += 1
246 self.last_posting = dt_util.now()
248 def on_event(self) -> None:
249 self.event_count += 1
250 self.last_event = dt_util.now()
252 @abstractmethod
253 def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
254 """Buffer a custom syslog record without requiring a HA Event."""