Coverage for custom_components / remote_logger / syslog / exporter.py: 98%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 21:55 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextlib 

5import logging 

6import os 

7import socket 

8import ssl 

9import time 

10from dataclasses import dataclass 

11from typing import TYPE_CHECKING, Any, cast 

12 

13from homeassistant.components.system_log import EVENT_SYSTEM_LOG 

14from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL 

15 

16from custom_components.remote_logger.const import ( 

17 CONF_APP_NAME, 

18 CONF_BATCH_MAX_SIZE, 

19 CONF_CLIENT_TIMEOUT, 

20 CONF_FACILITY, 

21 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, 

22 CONF_USE_TLS, 

23 DEFAULT_CLIENT_TIMEOUT, 

24) 

25from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission 

26from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp 

27 

28from .const import ( 

29 DEFAULT_APP_NAME, 

30 DEFAULT_FACILITY, 

31 DEFAULT_SYSLOG_SEVERITY, 

32 PROTOCOL_UDP, 

33 SYSLOG_FACILITY_MAP, 

34 SYSLOG_SEVERITY_MAP, 

35) 

36 

37if TYPE_CHECKING: 

38 import datetime as dt 

39 from collections.abc import Mapping 

40 

41 from homeassistant.config_entries import ConfigEntry 

42 from homeassistant.core import HomeAssistant 

43 

44_LOGGER = logging.getLogger(__name__) 

45 

46 

47@dataclass 

48class SyslogMessage(LogMessage): 

49 payload: bytes 

50 

51 

52class SyslogSubmission(LogSubmission): 

53 def __init__(self, records: list[SyslogMessage], protocol: str) -> None: 

54 self.records: list[SyslogMessage] = records 

55 self.protocol: str = protocol 

56 

57 def for_display(self) -> dict[str, Any]: 

58 return { 

59 "protocol": self.protocol, 

60 "data": cast("str", os.linesep).join(r.payload.decode("utf-8") for r in self.records), 

61 } 

62 

63 

64class SyslogExporter(LogExporter): 

65 """Buffers system_log_event records and flushes them as RFC 5424 syslog messages.""" 

66 

67 logger_type = "syslog" 

68 

69 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: 

70 super().__init__(hass) 

71 self.name = entry.title 

72 self._in_progress: list[SyslogMessage] = [] 

73 self._lock = asyncio.Lock() 

74 

75 opts = {**entry.data, **entry.options} 

76 

77 self._host = opts[CONF_HOST] 

78 self._port = opts[CONF_PORT] 

79 self._protocol = opts.get(CONF_PROTOCOL, PROTOCOL_UDP) 

80 self.destination = (self._host, str(self._port), self._protocol) 

81 self._use_tls = opts.get(CONF_USE_TLS, False) 

82 self._app_name = opts.get(CONF_APP_NAME, DEFAULT_APP_NAME) 

83 facility_name = opts.get(CONF_FACILITY, DEFAULT_FACILITY) 

84 self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1) 

85 self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 10) 

86 self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT) 

87 self._suppress_system_log_event_name = opts.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True) 

88 self._hostname = "-" 

89 

90 # TCP connection state (lazily created) 

91 self._tcp_reader: asyncio.StreamReader | None = None 

92 self._tcp_writer: asyncio.StreamWriter | None = None 

93 

94 # UDP transport state (lazily created) 

95 self._udp_transport: asyncio.DatagramTransport | None = None 

96 

97 self.endpoint_desc = ( 

98 f"syslog://{self._host}:{self._port} ({self._protocol.upper()}" 

99 f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})" 

100 ) 

101 _LOGGER.info(f"remote_logger: syslog configured for {self.endpoint_desc}") 

102 

103 def create_log_record( 

104 self, 

105 event_data: Mapping[str, Any], 

106 event_type: str | None = None, 

107 time_fired: dt.datetime | None = None, 

108 message_override: list[str] | None = None, 

109 level_override: str | None = None, 

110 state_only: bool = False, 

111 ) -> SyslogMessage: 

112 """Convert a system_log_event payload to an RFC 5424 syslog message.""" 

113 """ 

114 "name": str 

115 "message": list(str) 

116 "level": str 

117 "source": (str,int) 

118 "timestamp": float 

119 "exception": str 

120 "count": int 

121 "first_occurred": float 

122 """ 

123 data = event_data 

124 level: str = level_override or data.get("level", "INFO").upper() 

125 severity = SYSLOG_SEVERITY_MAP.get(level, DEFAULT_SYSLOG_SEVERITY) 

126 pri = self._facility * 8 + severity 

127 

128 # RFC 3339 timestamp 

129 timestamp_s: float = data.get("timestamp", time_fired.timestamp() if time_fired else time.time()) 

130 timestamp = isotimestamp(timestamp_s) 

131 

132 # Message body 

133 messages: list[str] = message_override or data.get("message", []) 

134 msg = " ".join(messages) if messages else "-" 

135 

136 # Structured data with meta info 

137 sd = "-" 

138 sd_params: list[str] = [] 

139 if event_type == EVENT_SYSTEM_LOG: 

140 source = data.get("source") 

141 if source and isinstance(source, tuple): 

142 source_path, source_linenum = source 

143 sd_params.append(f'code.file.path="{_sd_escape(source_path)}"') 

144 sd_params.append(f'code.line.number="{source_linenum}"') 

145 logger_name = data.get("name") 

146 if logger_name: 

147 sd_params.append(f'code.function.name="{_sd_escape(logger_name)}"') 

148 if data.get("count"): 

149 sd_params.append(f'exception.count="{data["count"]}"') 

150 if data.get("first_occurred"): 

151 sd_params.append(f'exception.first_occurred="{isotimestamp(data["first_occurred"])}"') 

152 

153 exception = data.get("exception") 

154 if exception: 

155 sd_params.append(f'exception.stacktrace="{data["exception"]}"') 

156 if not self._suppress_system_log_event_name: 

157 sd_params.append(f'eventName="{EVENT_SYSTEM_LOG}"') 

158 msgid: str = EVENT_SYSTEM_LOG 

159 else: 

160 msgid = "-" 

161 else: 

162 if event_type is not None: 

163 sd_params.append(f"eventName={event_type}") 

164 # Use HA event type as MSGID for non-system-log events; "-" otherwise 

165 msgid = event_type or "-" 

166 for k, v in data.items(): 

167 for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only): 

168 sd_params.append(f'{_sd_escape(flat_key)}="{_sd_escape(str(flat_val))}"') 

169 

170 if sd_params: 

171 sd = f"[opentelemetry {' '.join(sd_params)}]" 

172 

173 # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG] 

174 # VERSION = 1, PROCID = - 

175 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {msgid} {sd} {msg}" 

176 

177 return SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace")) 

178 

179 def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None: 

180 """Buffer a custom syslog record without requiring a HA Event.""" 

181 severity = SYSLOG_SEVERITY_MAP.get(level.upper(), DEFAULT_SYSLOG_SEVERITY) 

182 pri = self._facility * 8 + severity 

183 timestamp = isotimestamp(time.time()) 

184 sd = "-" 

185 if attributes: 

186 sd_params = [f'{_sd_escape(k)}="{_sd_escape(str(v))}"' for k, v in attributes.items()] 

187 sd = f"[opentelemetry {' '.join(sd_params)}]" 

188 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {event_name} {sd} {message}" 

189 record = SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace")) 

190 self._buffer.append(record) 

191 self.on_event() 

192 if len(self._buffer) >= self._batch_max_size: 

193 self._hass.async_create_task(self.flush()) 

194 

195 async def flush(self) -> None: 

196 """Flush all buffered log records to the syslog endpoint.""" 

197 records: list[SyslogMessage] | None = None 

198 async with self._lock: 

199 if not self._in_progress: 

200 if not self._buffer: 

201 return 

202 records = cast("list[SyslogMessage]", self._buffer.copy()) 

203 self._buffer.clear() 

204 

205 try: 

206 if records: 

207 self._in_progress = records 

208 else: 

209 self._in_progress = [m for m in self._in_progress if not m.sent] 

210 

211 if self._protocol == PROTOCOL_UDP: 

212 await self._send_udp(self._in_progress) 

213 else: 

214 await self._send_tcp(self._in_progress) 

215 sent: list[SyslogMessage] = [m for m in self._in_progress if m.sent] 

216 if sent: 

217 self.last_sent_payload = SyslogSubmission(sent, protocol=self._protocol) 

218 self.on_success() 

219 self._in_progress = [m for m in self._in_progress if not m.sent] 

220 except Exception as e: 

221 _LOGGER.exception("remote_logger: unexpected error sending syslog messages") 

222 self.on_posting_error(str(e)) 

223 

224 async def _send_udp(self, messages: list[SyslogMessage]) -> None: 

225 """Send syslog messages over UDP.""" 

226 try: 

227 if self._udp_transport is None or self._udp_transport.is_closing(): 

228 loop = asyncio.get_running_loop() 

229 self._udp_transport, _ = await loop.create_datagram_endpoint( 

230 asyncio.DatagramProtocol, 

231 remote_addr=(self._host, self._port), 

232 ) 

233 for msg in messages: 

234 self._udp_transport.sendto(msg.payload) 

235 msg.sent = True 

236 except OSError as err: 

237 _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err) 

238 self._udp_transport = None 

239 

240 async def _send_tcp(self, messages: list[SyslogMessage]) -> None: 

241 """Send syslog messages over TCP with octet-counting framing (RFC 6587).""" 

242 try: 

243 if self._tcp_writer is None or self._tcp_writer.is_closing(): 

244 await self._connect_tcp() 

245 

246 writer: asyncio.StreamWriter | None = self._tcp_writer 

247 if writer is None: 

248 raise OSError("Unable to create TCP writer") # Set by _connect_tcp above 

249 

250 for msg in messages: 

251 # Octet-counting: "LEN SP MSG" 

252 frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload 

253 writer.write(frame) 

254 await writer.drain() 

255 for msg in messages: 

256 msg.sent = True 

257 except (OSError, ConnectionError) as err: 

258 _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err) 

259 await self._close_tcp() 

260 

261 async def _connect_tcp(self) -> None: 

262 """Establish a TCP connection to the syslog server.""" 

263 ssl_ctx: ssl.SSLContext | None = None 

264 if self._use_tls: 

265 ssl_ctx = ssl.create_default_context() 

266 

267 self._tcp_reader, self._tcp_writer = await asyncio.wait_for( 

268 asyncio.open_connection(self._host, self._port, ssl=ssl_ctx), 

269 timeout=self._client_timeout, 

270 ) 

271 

272 async def _close_tcp(self) -> None: 

273 """Close the TCP connection.""" 

274 if self._tcp_writer is not None: 

275 with contextlib.suppress(Exception): 

276 self._tcp_writer.close() 

277 await self._tcp_writer.wait_closed() 

278 self._tcp_writer = None 

279 self._tcp_reader = None 

280 

281 async def close(self) -> None: 

282 """Clean up transport resources.""" 

283 if self._udp_transport is not None: 

284 self._udp_transport.close() 

285 self._udp_transport = None 

286 await self._close_tcp() 

287 

288 

289def _sd_escape(value: str) -> str: 

290 """Escape special characters for RFC 5424 structured data values.""" 

291 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]") 

292 

293 

294async def validate(hass: Any, host: str, port: int, protocol: str, use_tls: bool) -> str | None: 

295 """Test connectivity to a syslog endpoint. Returns error key or None.""" 

296 loop = hass.loop 

297 try: 

298 if protocol == PROTOCOL_UDP: 

299 # Quick UDP test: just resolve and create a socket 

300 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

301 try: 

302 sock.setblocking(False) 

303 await loop.run_in_executor(None, lambda: socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_DGRAM)) 

304 finally: 

305 sock.close() 

306 else: 

307 # TCP: actually connect 

308 ssl_ctx = True if use_tls else None 

309 _, writer = await asyncio.wait_for( 

310 asyncio.open_connection(host, port, ssl=ssl_ctx), 

311 timeout=10, 

312 ) 

313 writer.close() 

314 await writer.wait_closed() 

315 except (OSError, TimeoutError, ConnectionRefusedError) as err: 

316 _LOGGER.error("remote_logger: Syslog connect failed: %s", err) 

317 return "cannot_connect" 

318 except Exception as err: 

319 _LOGGER.error("remote_logger: Syslog connect unknown error: %s", err) 

320 return "unknown" 

321 return None