Coverage for custom_components/remote_logger/exporter.py: 95%

133 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-07 04:46 +0000

1import asyncio 

2import json 

3import logging 

4import threading 

5from abc import abstractmethod 

6from dataclasses import dataclass 

7from typing import TYPE_CHECKING, Any 

8 

9from homeassistant.auth import EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED, HomeAssistant 

10from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED 

11from homeassistant.components.script import EVENT_SCRIPT_STARTED 

12from homeassistant.const import EVENT_COMPONENT_LOADED, EVENT_STATE_CHANGED 

13from homeassistant.core import EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED, Event, callback 

14from homeassistant.util import dt as dt_util 

15 

16from custom_components.remote_logger.const import BATCH_FLUSH_INTERVAL_SECONDS 

17 

18if TYPE_CHECKING: 

19 import datetime as dt 

20 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Event data keys that remain individual entries when an event body is
# flattened; every other key is folded into one JSON string.
_HA_EVENT_BODY_ATTRIBUTE_KEYS: frozenset[str] = frozenset(("entity_id", "domain", "service"))

24 

25 

def _event_data_serializer(obj: Any) -> Any:
    """Fallback ``default=`` hook for ``json.dumps`` on HA event data.

    Resolution order:
      1. objects exposing ``as_dict`` -> the result of ``obj.as_dict()``
      2. objects exposing ``value`` (e.g. enum members) -> ``obj.value``
      3. anything else -> ``str(obj)``
    """
    if not hasattr(obj, "as_dict"):
        # No dict representation available: prefer a ``value`` attribute,
        # otherwise fall back to the plain string form.
        return obj.value if hasattr(obj, "value") else str(obj)
    return obj.as_dict()

33 

34 

@dataclass
class LogMessage:
    """One buffered log record.

    ``payload`` carries the record in whatever shape the concrete exporter
    builds; ``sent`` starts out ``False``.
    """

    # Exporter-specific record body (untyped on purpose).
    payload: Any
    # Presumably flipped to True once the record has been delivered -
    # TODO confirm against the concrete exporters.
    sent: bool = False

39 

40 

class LogSubmission:
    """Interface for a submitted batch that can be rendered for display.

    NOTE(review): the class has no ``abc.ABC`` base, so the
    ``@abstractmethod`` marker below is not enforced at instantiation time.
    """

    @abstractmethod
    def for_display(self) -> dict[str, Any]:
        """Return a dictionary view of this submission; subclasses override."""

45 

46 

class LogExporter:
    """Base class for log exporters.

    Records built from Home Assistant events are appended to an in-memory
    buffer. A flush task is scheduled whenever the buffer length reaches
    ``_batch_max_size``, and ``flush_loop`` additionally flushes on a fixed
    interval.

    Subclasses must provide ``logger_type`` (read in ``__init__``), assign
    ``_batch_max_size`` (only declared here, never given a value) and
    implement ``_to_log_record``, ``flush`` and ``log_direct``.

    NOTE(review): the class has no ``abc.ABC`` base, so the
    ``@abstractmethod`` markers are not enforced at instantiation time.
    """

    logger_type: str

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialise statistics counters and the empty record buffer.

        Args:
            hass: Home Assistant instance, used to schedule flush tasks.
        """
        self._hass: HomeAssistant = hass
        self.name: str = self.logger_type
        # Declared only; presumably assigned by the concrete exporter.
        self.destination: tuple[str, ...]

        # Declared only: the concrete exporter must assign a value before
        # any event is handled, otherwise the threshold check raises.
        self._batch_max_size: int
        self.event_count: int = 0
        self.last_event: dt.datetime | None = None
        self.posting_count: int = 0
        self.last_posting: dt.datetime | None = None
        self.format_error_count: int = 0
        self.last_format_error_message: str | None = None
        self.last_format_error: dt.datetime | None = None
        self.posting_error_count: int = 0
        self.last_posting_error_message: str | None = None
        self.last_posting_error: dt.datetime | None = None

        self._buffer: list[LogMessage] = []
        # Substring looked for in a log event's source to spot records
        # emitted by this exporter itself.
        self.self_source: str = f"custom_components/remote_logger/{self.logger_type}"
        self.last_sent_payload: LogSubmission | None = None
        # Run flag for flush_loop: set when the loop starts, cleared by
        # disable_buffer.
        self.flushing: threading.Event = threading.Event()

    async def disable_buffer(self) -> None:
        """Flush logs and prevent future buffering, use for shutdowns.

        With a batch size of 0 the threshold check in the handlers is always
        true, so every later record schedules a flush straight away. Clearing
        ``flushing`` makes ``flush_loop`` stop after its current iteration.
        """
        self._batch_max_size = 0
        self.flushing.clear()
        await self.flush()

    @callback
    def handle_event(self, event: Event) -> None:
        """Buffer a log event, skipping records produced by this exporter.

        Args:
            event: Event whose ``data`` may carry a two-element ``source``
                entry; when the first element contains ``self_source`` the
                event is dropped.

        Conversion or buffering failures are logged and counted through
        ``on_format_error``; they are never raised to the caller.
        """
        self.on_event()
        if (
            event.data
            and event.data.get("source")
            and len(event.data["source"]) == 2
            and self.self_source in event.data["source"][0]
        ):
            # prevent log loops
            return
        try:
            record: LogMessage = self._to_log_record(event)
            self._buffer.append(record)

            if len(self._buffer) >= self._batch_max_size:
                self._hass.async_create_task(self.flush())
        except Exception as e:
            _LOGGER.error("remote_logger: %s handler failure %s on %s", self.logger_type, e, event.data)
            self.on_format_error(str(e))

    @callback
    def handle_ha_event(self, event_type: str, event: Event, state_only: bool = False, event_body: bool = False) -> None:
        """Handle a non-system-log HA event (lifecycle, core change, or custom).

        A short message is assembled from the event type and the data fields
        relevant to that type, then the event is converted with
        ``_to_log_record`` at level ``"INFO"`` and buffered.

        Args:
            event_type: Name of the event being handled.
            event: The event; it is never modified.
            state_only: Passed through unchanged to ``_to_log_record``.
            event_body: When true, every data key other than ``entity_id``,
                ``domain`` and ``service`` is folded into one JSON string
                stored under the ``"event.data"`` key of the data handed to
                ``_to_log_record``.

        Failures are logged and counted through ``on_format_error``; they are
        never raised to the caller.
        """
        self.on_event()
        general_fields: list[str] = ["message", "id", "entity_id", "name", "component", "device_id"]
        try:
            if (
                event_type == EVENT_CALL_SERVICE
                and event.data.get("domain") == "system_log"
                and event.data.get("service") == "write"
            ):
                # don't double count log events
                return
            if event_type == EVENT_STATE_CHANGED:
                old_state: str = (event.data["old_state"] and event.data["old_state"].state) or "N/A"
                new_state: str = (event.data["new_state"] and event.data["new_state"].state) or "N/A"
                message = [event_type, ":", event.data["entity_id"], old_state, "->", new_state]
            elif event_type in (EVENT_CALL_SERVICE, EVENT_SERVICE_REGISTERED, EVENT_SERVICE_REMOVED):
                message = [event_type, ":", event.data["domain"], event.data["service"]]
            elif event_type == EVENT_COMPONENT_LOADED:
                message = [event_type, ":", event.data["component"]]
            elif event_type in (EVENT_SCRIPT_STARTED, EVENT_AUTOMATION_TRIGGERED):
                message = [event_type, ":", event.data["name"], event.data["entity_id"]]
            elif event_type in (EVENT_USER_ADDED, EVENT_USER_REMOVED, EVENT_USER_UPDATED):
                message = [event_type, ":", event.data["user_id"]]
            elif any(v in event.data for v in general_fields):
                message = [event_type, ":"] + [event.data[v] for v in general_fields if v in event.data]
            else:
                message = [event_type]

            record_event: Event = event
            if event_body:
                import copy

                body_fields: dict[str, Any] = {
                    k: v for k, v in event.data.items() if k not in _HA_EVENT_BODY_ATTRIBUTE_KEYS
                }
                attributes: dict[str, Any] = {
                    k: v for k, v in event.data.items() if k in _HA_EVENT_BODY_ATTRIBUTE_KEYS
                }
                if body_fields:
                    attributes["event.data"] = json.dumps(body_fields, default=_event_data_serializer, indent=2)
                # The same Event object is handed to every listener, so the
                # trimmed data goes on a shallow copy. Reassigning data on
                # the original would leak into later listeners and make a
                # second exporter re-encode the "event.data" blob.
                record_event = copy.copy(event)
                record_event.data = attributes

            record: LogMessage = self._to_log_record(
                record_event, message_override=message, level_override="INFO", state_only=state_only
            )
            self._buffer.append(record)
            if len(self._buffer) >= self._batch_max_size:
                self._hass.async_create_task(self.flush())
        except Exception as e:
            _LOGGER.error("remote_logger: %s ha_event handler failure %s on %s", self.logger_type, e, event_type)
            self.on_format_error(str(e))

    @abstractmethod
    def _to_log_record(
        self,
        event: Event,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> LogMessage:
        """Convert an event into a buffered record; subclasses override.

        Args:
            event: Source event.
            message_override: Message parts supplied by ``handle_ha_event``;
                ``None`` when called from ``handle_event``.
            level_override: Level supplied by ``handle_ha_event`` (``"INFO"``);
                ``None`` when called from ``handle_event``.
            state_only: Flag forwarded from ``handle_ha_event``.
        """

    # No @callback here: that decorator marks synchronous functions, and this
    # is a coroutine function.
    @abstractmethod
    async def flush(self) -> None:
        """Send the buffered records; subclasses override."""

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records.

        Sets the ``flushing`` flag, then sleeps for
        ``BATCH_FLUSH_INTERVAL_SECONDS`` and flushes, repeating for as long
        as the flag stays set. Cancellation and any exception raised by
        ``flush`` propagate to the caller and end the loop.
        """
        self.flushing.set()
        while self.flushing.is_set():
            await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
            await self.flush()

    async def close(self) -> None:
        """Clean up resources (no-op for HTTP-based exporter)."""

    def on_format_error(self, message: str) -> None:
        """Count a record-formatting failure and remember its message and time."""
        self.format_error_count += 1
        self.last_format_error_message = message
        self.last_format_error = dt_util.now()

    def on_posting_error(self, message: str) -> None:
        """Count a posting failure and remember its message and time."""
        self.posting_error_count += 1
        self.last_posting_error_message = message
        self.last_posting_error = dt_util.now()

    def on_success(self) -> None:
        """Count a successful posting and remember when it happened."""
        self.posting_count += 1
        self.last_posting = dt_util.now()

    def on_event(self) -> None:
        """Count a received event and remember when it arrived."""
        self.event_count += 1
        self.last_event = dt_util.now()

    @abstractmethod
    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom syslog record without requiring a HA Event."""
196 """Buffer a custom syslog record without requiring a HA Event."""