Coverage for custom_components/remote_logger/exporter.py: 95%
133 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
1import asyncio
2import json
3import logging
4import threading
5from abc import abstractmethod
6from dataclasses import dataclass
7from typing import TYPE_CHECKING, Any
9from homeassistant.auth import EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED, HomeAssistant
10from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED
11from homeassistant.components.script import EVENT_SCRIPT_STARTED
12from homeassistant.const import EVENT_COMPONENT_LOADED, EVENT_STATE_CHANGED
13from homeassistant.core import EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED, Event, callback
14from homeassistant.util import dt as dt_util
16from custom_components.remote_logger.const import BATCH_FLUSH_INTERVAL_SECONDS
18if TYPE_CHECKING:
19 import datetime as dt
# Module-level logger; handler failures in LogExporter are reported through it.
_LOGGER = logging.getLogger(__name__)

# Event data keys that stay as individual entries of ``event.data`` when
# ``LogExporter.handle_ha_event`` is called with ``event_body=True``; every
# other key is folded into a single JSON string stored under "event.data".
_HA_EVENT_BODY_ATTRIBUTE_KEYS: frozenset[str] = frozenset({"entity_id", "domain", "service"})
26def _event_data_serializer(obj: Any) -> Any:
27 """JSON serializer for HA event data objects."""
28 if hasattr(obj, "as_dict"):
29 return obj.as_dict()
30 if hasattr(obj, "value"):
31 return obj.value
32 return str(obj)
@dataclass
class LogMessage:
    """A single buffered log record awaiting export."""

    # Exporter-specific record content; its shape is decided by whichever
    # ``_to_log_record`` implementation produced it.
    payload: Any
    # Defaults to False. NOTE(review): presumably set to True once the record
    # has been delivered — the code that flips it is not in this module; confirm.
    sent: bool = False
class LogSubmission:
    """Interface for a submitted payload that can be rendered for display.

    NOTE(review): this class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` marker below is not enforced when instantiating.
    """

    @abstractmethod
    def for_display(self) -> dict[str, Any]:
        """Return a dict representation of the submission suitable for display."""
class LogExporter:
    """Base class for log exporters.

    Turns Home Assistant events into ``LogMessage`` records, collects them in an
    in-memory buffer and schedules ``flush()`` once the buffer reaches
    ``_batch_max_size``. It also keeps counters and timestamps for events,
    postings, format errors and posting errors.

    Subclasses are expected to provide ``logger_type``, to assign
    ``destination`` and ``_batch_max_size`` (both are only declared here, never
    given a value), and to implement ``_to_log_record``, ``flush`` and
    ``log_direct``.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers are not enforced at instantiation time.
    """

    logger_type: str

    # Event data keys probed, in this order, to build a message for event types
    # that have no dedicated message format in ``_ha_event_message``.
    _GENERAL_MESSAGE_FIELDS = ("message", "id", "entity_id", "name", "component", "device_id")

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialise statistics, the record buffer and the flush-loop flag.

        Args:
            hass: Home Assistant instance, used to schedule flush tasks.
        """
        self._hass: HomeAssistant = hass
        self.name: str = self.logger_type
        # Declared only; a subclass must assign these before events are handled.
        self.destination: tuple[str, ...]
        self._batch_max_size: int

        # Statistics, updated through the on_* hooks below.
        self.event_count: int = 0
        self.last_event: dt.datetime | None = None
        self.posting_count: int = 0
        self.last_posting: dt.datetime | None = None
        self.format_error_count: int = 0
        self.last_format_error_message: str | None = None
        self.last_format_error: dt.datetime | None = None
        self.posting_error_count: int = 0
        self.last_posting_error_message: str | None = None
        self.last_posting_error: dt.datetime | None = None

        self._buffer: list[LogMessage] = []
        # Substring looked for in an event's source to recognise our own log output.
        self.self_source: str = f"custom_components/remote_logger/{self.logger_type}"
        self.last_sent_payload: LogSubmission | None = None
        # Set while flush_loop() should keep running; cleared by disable_buffer().
        self.flushing: threading.Event = threading.Event()

    async def disable_buffer(self) -> None:
        """Flush logs and prevent future buffering, use for shutdowns."""
        # With a threshold of 0 every subsequently handled record triggers a flush.
        self._batch_max_size = 0
        # Lets flush_loop() exit after its current iteration.
        self.flushing.clear()
        await self.flush()

    @callback
    def handle_event(self, event: Event) -> None:
        """Convert an event into a log record and buffer it.

        Events whose ``source`` entry has exactly two elements and whose first
        element contains ``self_source`` are counted but not buffered, so that
        this exporter's own log output does not feed back into itself.
        Any exception raised while converting or buffering is logged and
        recorded as a format error instead of propagating.

        Args:
            event: The Home Assistant event to export.
        """
        self.on_event()
        source = event.data.get("source") if event.data else None
        if source and len(source) == 2 and self.self_source in source[0]:
            # prevent log loops
            return
        try:
            self._append_and_maybe_flush(self._to_log_record(event))
        except Exception as e:
            _LOGGER.error("remote_logger: %s handler failure %s on %s", self.logger_type, e, event.data)
            self.on_format_error(str(e))

    @callback
    def handle_ha_event(self, event_type: str, event: Event, state_only: bool = False, event_body: bool = False) -> None:
        """Handle a non-system-log HA event (lifecycle, core change, or custom).

        Args:
            event_type: Event type name; selects the message format.
            event: The Home Assistant event to export.
            state_only: Passed through unchanged to ``_to_log_record``.
            event_body: When true, every data key outside
                ``_HA_EVENT_BODY_ATTRIBUTE_KEYS`` is folded into one JSON string
                stored under the ``"event.data"`` key.

        Any exception raised while building or buffering the record is logged
        and recorded as a format error instead of propagating.
        """
        self.on_event()
        try:
            if (
                event_type == EVENT_CALL_SERVICE
                and event.data.get("domain") == "system_log"
                and event.data.get("service") == "write"
            ):
                # don't double count log events
                return

            message = self._ha_event_message(event_type, event)

            if event_body:
                flat_event: dict[str, Any] = {
                    k: v for k, v in event.data.items() if k not in _HA_EVENT_BODY_ATTRIBUTE_KEYS
                }
                # NOTE(review): this rebinds ``data`` on the event object that was passed in.
                # If the same Event instance is also handed to other listeners/exporters they
                # will see the rewritten data — confirm against the caller.
                event.data = {k: v for k, v in event.data.items() if k in _HA_EVENT_BODY_ATTRIBUTE_KEYS}
                if flat_event:
                    event.data["event.data"] = json.dumps(flat_event, default=_event_data_serializer, indent=2)

            record: LogMessage = self._to_log_record(
                event, message_override=message, level_override="INFO", state_only=state_only
            )
            self._append_and_maybe_flush(record)
        except Exception as e:
            _LOGGER.error("remote_logger: %s ha_event handler failure %s on %s", self.logger_type, e, event_type)
            self.on_format_error(str(e))

    @staticmethod
    def _state_text(state: Any) -> str:
        """Return ``state.state``, or "N/A" when the state object or its value is falsy."""
        return (state.state if state else None) or "N/A"

    def _ha_event_message(self, event_type: str, event: Event) -> list[Any]:
        """Build the message parts for an HA event according to its type.

        Raises:
            KeyError: if a key required by the event type's format is missing
                from ``event.data`` (handled by the caller's ``except``).
        """
        data = event.data
        if event_type == EVENT_STATE_CHANGED:
            old_state: str = self._state_text(data["old_state"])
            new_state: str = self._state_text(data["new_state"])
            return [event_type, ":", data["entity_id"], old_state, "->", new_state]
        if event_type in (EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED):
            return [event_type, ":", data["domain"], data["service"]]
        if event_type == EVENT_COMPONENT_LOADED:
            return [event_type, ":", data["component"]]
        if event_type in (EVENT_SCRIPT_STARTED, EVENT_AUTOMATION_TRIGGERED):
            return [event_type, ":", data["name"], data["entity_id"]]
        if event_type in (EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED):
            return [event_type, ":", data["user_id"]]

        # Unknown event type: use whichever general-purpose fields are present.
        present = [data[field] for field in self._GENERAL_MESSAGE_FIELDS if field in data]
        if present:
            return [event_type, ":", *present]
        return [event_type]

    def _append_and_maybe_flush(self, record: LogMessage) -> None:
        """Buffer ``record`` and schedule a flush once the batch size is reached."""
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())

    @abstractmethod
    def _to_log_record(
        self,
        event: Event,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> LogMessage:
        """Convert an event into an exporter-specific ``LogMessage``.

        ``handle_ha_event`` supplies ``message_override`` (the message parts),
        ``level_override="INFO"`` and ``state_only``; ``handle_event`` passes
        only the event.
        """

    # ``@callback`` was dropped here: it marks synchronous functions that are safe to run
    # in the event loop, whereas ``flush`` is a coroutine function that callers await or
    # wrap in a task.
    @abstractmethod
    async def flush(self) -> None:
        """Send the buffered records; implemented by subclasses."""

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records.

        Sets ``flushing`` and then sleeps ``BATCH_FLUSH_INTERVAL_SECONDS`` between
        flushes until the flag is cleared. Task cancellation is not intercepted
        and propagates to the caller.
        """
        self.flushing.set()
        while self.flushing.is_set():
            await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
            await self.flush()

    async def close(self) -> None:
        """Clean up resources (no-op for HTTP-based exporter)."""

    def on_format_error(self, message: str) -> None:
        """Record a failure to turn an event into a log record."""
        self.format_error_count += 1
        self.last_format_error_message = message
        self.last_format_error = dt_util.now()

    def on_posting_error(self, message: str) -> None:
        """Record a posting failure."""
        self.posting_error_count += 1
        self.last_posting_error_message = message
        self.last_posting_error = dt_util.now()

    def on_success(self) -> None:
        """Record a successful posting."""
        self.posting_count += 1
        self.last_posting = dt_util.now()

    def on_event(self) -> None:
        """Record that an event was received (counted before any filtering)."""
        self.event_count += 1
        self.last_event = dt_util.now()

    @abstractmethod
    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom syslog record without requiring a HA Event."""