Coverage for custom_components / remote_logger / exporter.py: 91%

171 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 21:55 +0000

1import asyncio 

2import json 

3import logging 

4import threading 

5from abc import abstractmethod 

6from collections.abc import Mapping 

7from dataclasses import dataclass 

8from typing import TYPE_CHECKING, Any 

9 

10from homeassistant.auth import EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED, HomeAssistant 

11from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED 

12from homeassistant.components.script import EVENT_SCRIPT_STARTED 

13from homeassistant.const import EVENT_COMPONENT_LOADED, EVENT_STATE_CHANGED 

14from homeassistant.core import EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED, Event, callback 

15from homeassistant.helpers.area_registry import EVENT_AREA_REGISTRY_UPDATED 

16from homeassistant.helpers.category_registry import EVENT_CATEGORY_REGISTRY_UPDATED 

17from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED 

18from homeassistant.helpers.entity_registry import EVENT_ENTITY_REGISTRY_UPDATED 

19from homeassistant.helpers.floor_registry import EVENT_FLOOR_REGISTRY_UPDATED 

20from homeassistant.helpers.label_registry import EVENT_LABEL_REGISTRY_UPDATED 

21from homeassistant.util import dt as dt_util 

22 

23from custom_components.remote_logger.const import BATCH_FLUSH_INTERVAL_SECONDS 

24 

25if TYPE_CHECKING: 

26 import datetime as dt 

27 from collections.abc import Mapping 

28 

29_LOGGER = logging.getLogger(__name__) 

30 

31_HA_EVENT_BODY_ATTRIBUTE_KEYS: frozenset[str] = frozenset({"entity_id", "domain", "service"}) 

32 

33 

def _event_data_serializer(obj: Any) -> Any:
    """Fallback ``default=`` hook for ``json.dumps`` on HA event data.

    Resolution order:
      1. objects exposing ``as_dict`` -> the result of calling ``obj.as_dict()``
      2. objects exposing ``value`` -> ``obj.value``
      3. anything else -> ``str(obj)``
    """
    if not hasattr(obj, "as_dict"):
        # No dict form available: fall back to ``.value`` and finally to ``str``.
        return obj.value if hasattr(obj, "value") else str(obj)
    return obj.as_dict()

41 

42 

@dataclass
class LogMessage:
    """A single buffered log record.

    ``payload`` is whatever ``create_log_record`` produced for the exporter;
    ``sent`` starts out ``False``.
    """

    # Exporter-specific, already formatted record.
    payload: Any
    # Delivery flag; nothing in this module sets it to True.
    sent: bool = False

47 

48 

class LogSubmission:
    """Interface for a submitted payload that can be rendered for display."""

    # NOTE: the class does not use ABCMeta, so ``@abstractmethod`` only marks
    # the method; it does not prevent instantiation.
    @abstractmethod
    def for_display(self) -> dict[str, Any]:
        """Return a dict representation of the submission for display."""

53 

54 

class LogExporter:
    """Base class for log exporters.

    Formats incoming events into ``LogMessage`` records, buffers them in
    memory, and schedules ``flush`` once the buffer holds at least
    ``_batch_max_size`` records (``flush_loop`` additionally flushes on a
    timer). It also keeps counters and timestamps for events seen, postings
    made, and format/posting errors.

    ``logger_type``, ``destination`` and ``_batch_max_size`` are only declared
    here, never assigned, so concrete exporters have to provide them.
    ``create_log_record``, ``flush`` and ``log_direct`` are marked abstract;
    the class does not use ``ABCMeta``, so that is not enforced on
    instantiation.
    """

    logger_type: str

    # Event types whose headline is "<event_type> : <value of each listed key>".
    # Every listed key is required; a missing key raises KeyError.
    _HEADLINE_KEYS: Mapping[str, tuple[str, ...]] = {
        EVENT_CALL_SERVICE: ("domain", "service"),
        EVENT_SERVICE_REGISTERED: ("domain", "service"),
        EVENT_SERVICE_REMOVED: ("domain", "service"),
        EVENT_COMPONENT_LOADED: ("component",),
        EVENT_SCRIPT_STARTED: ("entity_id",),
        EVENT_AUTOMATION_TRIGGERED: ("entity_id",),
        EVENT_USER_ADDED: ("user_id",),
        EVENT_USER_REMOVED: ("user_id",),
        EVENT_USER_UPDATED: ("user_id",),
    }

    # Registry update events: headline is "<event_type> : <id> <action>",
    # where the action falls back to "???" when absent.
    _REGISTRY_ID_KEYS: Mapping[str, str] = {
        EVENT_DEVICE_REGISTRY_UPDATED: "device_id",
        EVENT_ENTITY_REGISTRY_UPDATED: "entity_id",
        EVENT_LABEL_REGISTRY_UPDATED: "label_id",
        EVENT_AREA_REGISTRY_UPDATED: "area_id",
        EVENT_CATEGORY_REGISTRY_UPDATED: "category_id",
        EVENT_FLOOR_REGISTRY_UPDATED: "floor_id",
    }

    # Optional fields used, in this order, to title events of any other type.
    _TITLE_FIELDS: tuple[str, ...] = ("message", "id", "entity_id", "name", "component", "device_id")

    def __init__(self, hass: HomeAssistant) -> None:
        """Set up statistics, the record buffer and the flush-loop flag."""
        self._hass: HomeAssistant = hass
        self.name: str = self.logger_type
        self.destination: tuple[str, ...]  # declared only; set by the subclass
        self.tz = dt_util.get_default_time_zone()

        self._batch_max_size: int  # declared only; set by the subclass
        self.event_count: int = 0
        self.last_event: dt.datetime | None = None
        self.posting_count: int = 0
        self.last_posting: dt.datetime | None = None
        self.format_error_count: int = 0
        self.last_format_error_message: str | None = None
        self.last_format_error: dt.datetime | None = None
        self.posting_error_count: int = 0
        self.last_posting_error_message: str | None = None
        self.last_posting_error: dt.datetime | None = None

        self._buffer: list[LogMessage] = []
        # Substring looked for in a record's source path to detect our own logs.
        self.self_source: str = f"custom_components/remote_logger/{self.logger_type}"
        self.last_sent_payload: LogSubmission | None = None
        self.flushing: threading.Event = threading.Event()
        # Strong references to flush tasks created directly on a running loop,
        # so they cannot be garbage collected before they finish.
        self._flush_tasks: set[asyncio.Task[None]] = set()

    async def disable_buffer(self) -> None:
        """Flush logs and prevent future buffering, use for shutdowns"""
        # A threshold of 0 makes every subsequently buffered record trigger a flush.
        self._batch_max_size = 0
        # Lets flush_loop exit after its current sleep/flush cycle.
        self.flushing.clear()
        await self.flush()

    def _is_self_sourced(self, data: Mapping[str, Any] | None) -> bool:
        """Return True when ``data["source"]`` points back at this exporter.

        The source is expected to be a two-item sequence whose first item
        contains ``self.self_source``. A source of any other shape is treated
        as "not ours" instead of raising.
        """
        if not data:
            return False
        source = data.get("source")
        if not source:
            return False
        try:
            return len(source) == 2 and self.self_source in source[0]
        except (TypeError, KeyError, IndexError):
            return False

    def _buffer_record(self, record: LogMessage) -> bool:
        """Append ``record`` to the buffer; return True when a flush is due."""
        self._buffer.append(record)
        return len(self._buffer) >= self._batch_max_size

    def _schedule_flush_any_context(self) -> None:
        """Schedule ``flush`` whether or not an event loop runs in this thread."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: hand the coroutine over to Home Assistant.
            self._hass.create_task(self.flush())
            return
        task = loop.create_task(self.flush())
        # The event loop only keeps weak references to tasks; hold one
        # ourselves until the task is done.
        self._flush_tasks.add(task)
        task.add_done_callback(self._flush_tasks.discard)

    @callback
    def handle_event(self, event: Event) -> None:
        """Buffer a log record built from ``event``.

        Events whose source is this exporter are counted but not recorded,
        to prevent log loops. Any failure while formatting or buffering is
        logged and counted as a format error instead of propagating.
        """
        self.on_event()
        if self._is_self_sourced(event.data):
            # prevent log loops
            return
        try:
            record: LogMessage = self.create_log_record(event.data, event.event_type, event.time_fired)
            if self._buffer_record(record):
                self._hass.async_create_task(self.flush())
        except Exception as e:
            # NOTE(review): this error is logged from the base module; confirm
            # its source path is covered by ``self_source`` so it cannot loop.
            _LOGGER.error("remote_logger: %s event handler failure %s on %s", self.logger_type, e, event.data)
            self.on_format_error(str(e))

    def handle_entry(self, entry: Mapping[str, Any], time_fired: dt.datetime) -> None:
        """Buffer a log record built from a raw ``entry`` mapping.

        Same loop protection and error handling as ``handle_event``. A due
        flush is scheduled on the running loop when there is one, otherwise
        through ``hass.create_task``.
        """
        self.on_event()
        if self._is_self_sourced(entry):
            # prevent log loops
            return
        try:
            record: LogMessage = self.create_log_record(entry, None, time_fired)
            if self._buffer_record(record):
                self._schedule_flush_any_context()
        except Exception as e:
            _LOGGER.error("remote_logger: %s entry handler failure %s on %s", self.logger_type, e, entry)
            self.on_format_error(str(e))

    @abstractmethod
    def create_log_record(
        self,
        event_data: Mapping[str, Any],
        event_type: str | None = None,
        time_fired: dt.datetime | None = None,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> LogMessage:
        """Build the exporter-specific ``LogMessage``; implemented by subclasses."""

    @staticmethod
    def _state_text(state: Any) -> str:
        """Return ``state.state``, or "N/A" when the state or its value is falsy."""
        value = state.state if state else None
        return value or "N/A"

    def _ha_event_message(self, event_type: str, data: Mapping[str, Any]) -> list[Any]:
        """Build the headline parts for an HA event.

        Raises KeyError when a field required for ``event_type`` is missing.
        Parts are taken from the event data unchanged, so they are not
        guaranteed to be strings.
        """
        if event_type == EVENT_STATE_CHANGED:
            old_state = self._state_text(data["old_state"])
            new_state = self._state_text(data["new_state"])
            return [event_type, ":", data["entity_id"], old_state, "->", new_state]
        if event_type in self._HEADLINE_KEYS:
            return [event_type, ":", *(data[key] for key in self._HEADLINE_KEYS[event_type])]
        if event_type in self._REGISTRY_ID_KEYS:
            return [event_type, ":", data[self._REGISTRY_ID_KEYS[event_type]], data.get("action", "???")]
        if event_type == "autoarm_change":
            return [
                event_type,
                ":",
                data["original_state"],
                "->",
                data["new_state"],
                " from ",
                data["change_source"],
            ]
        # Any other event type: use whichever well-known title fields are present.
        titles = [data[field] for field in self._TITLE_FIELDS if field in data]
        if titles:
            return [event_type, ":", *titles]
        return [event_type]

    @staticmethod
    def _ha_event_attributes(data: Mapping[str, Any], event_body: bool) -> dict[str, Any]:
        """Return the attribute dict handed to ``create_log_record``.

        With ``event_body`` set, only the keys in
        ``_HA_EVENT_BODY_ATTRIBUTE_KEYS`` stay as attributes and the rest is
        serialized to JSON under ``"event.data"`` (omitted when empty).
        Otherwise a shallow copy of ``data`` is returned.
        """
        if not event_body:
            return dict(data)
        attributes: dict[str, Any] = {k: v for k, v in data.items() if k in _HA_EVENT_BODY_ATTRIBUTE_KEYS}
        body: dict[str, Any] = {k: v for k, v in data.items() if k not in _HA_EVENT_BODY_ATTRIBUTE_KEYS}
        if body:
            attributes["event.data"] = json.dumps(body, default=_event_data_serializer, indent=2)
        return attributes

    @callback
    def handle_ha_event(self, event_type: str, event: Event, state_only: bool = False, event_body: bool = False) -> None:
        """Handle a non-system-log HA event (lifecycle, core change, or custom)."""
        self.on_event()
        try:
            data = event.data
            if (
                event_type == EVENT_CALL_SERVICE
                and data.get("domain") == "system_log"
                and data.get("service") == "write"
            ):
                # don't double count log events
                return

            message = self._ha_event_message(event_type, data)
            event_data = self._ha_event_attributes(data, event_body)

            record: LogMessage = self.create_log_record(
                event_data,
                event.event_type,
                event.time_fired,
                message_override=message,
                level_override="INFO",
                state_only=state_only,
            )
            if self._buffer_record(record):
                self._hass.async_create_task(self.flush())
        except Exception as e:
            _LOGGER.error("remote_logger: %s ha_event handler failure %s on %s", self.logger_type, e, event_type)
            self.on_format_error(str(e))

    # Not decorated with @callback: that marker is meant for synchronous
    # functions, and this is a coroutine.
    @abstractmethod
    async def flush(self) -> None:
        """Flush the buffer; the delivery itself is implemented by subclasses."""

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records."""
        self.flushing.set()
        # Runs until ``flushing`` is cleared (see disable_buffer) or the task
        # is cancelled. An exception raised by flush() ends the loop.
        while self.flushing.is_set():
            await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
            await self.flush()

    async def close(self) -> None:
        """Clean up resources (no-op for HTTP-based exporter)."""

    def on_format_error(self, message: str) -> None:
        """Record a record-formatting failure and when it happened."""
        self.format_error_count += 1
        self.last_format_error_message = message
        self.last_format_error = dt_util.now()

    def on_posting_error(self, message: str) -> None:
        """Record a posting failure and when it happened."""
        self.posting_error_count += 1
        self.last_posting_error_message = message
        self.last_posting_error = dt_util.now()

    def on_success(self) -> None:
        """Record a successful posting and when it happened."""
        self.posting_count += 1
        self.last_posting = dt_util.now()

    def on_event(self) -> None:
        """Count an incoming event and remember when it arrived."""
        self.event_count += 1
        self.last_event = dt_util.now()

    @abstractmethod
    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom syslog record without requiring a HA Event."""