Coverage for custom_components/remote_logger/syslog/exporter.py: 98%

199 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-07 04:46 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextlib 

5import logging 

6import os 

7import socket 

8import ssl 

9import time 

10from dataclasses import dataclass 

11from typing import TYPE_CHECKING, Any, cast 

12 

13from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL 

14from homeassistant.core import Event, HomeAssistant, callback 

15 

16from custom_components.remote_logger.const import ( 

17 CONF_APP_NAME, 

18 CONF_BATCH_MAX_SIZE, 

19 CONF_CLIENT_TIMEOUT, 

20 CONF_FACILITY, 

21 CONF_USE_TLS, 

22 DEFAULT_CLIENT_TIMEOUT, 

23 EVENT_SYSTEM_LOG, 

24) 

25from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission 

26from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp 

27 

28from .const import ( 

29 DEFAULT_APP_NAME, 

30 DEFAULT_FACILITY, 

31 DEFAULT_SYSLOG_SEVERITY, 

32 PROTOCOL_UDP, 

33 SYSLOG_FACILITY_MAP, 

34 SYSLOG_SEVERITY_MAP, 

35) 

36 

37if TYPE_CHECKING: 

38 from homeassistant.config_entries import ConfigEntry 

39 

40_LOGGER = logging.getLogger(__name__) 

41 

42 

@dataclass
class SyslogMessage(LogMessage):
    """One syslog line, already encoded, waiting to be delivered."""

    # The complete syslog line as bytes (the exporter encodes it as UTF-8).
    payload: bytes

46 

47 

class SyslogSubmission(LogSubmission):
    """The set of syslog records handed to the transport in one send, plus the protocol used."""

    def __init__(self, records: list[SyslogMessage], protocol: str) -> None:
        """Remember the delivered records and the protocol name they were sent with."""
        self.protocol: str = protocol
        self.records: list[SyslogMessage] = records

    def for_display(self) -> dict[str, Any]:
        """Return the protocol and the decoded payloads joined with the OS line separator."""
        decoded_lines = [record.payload.decode("utf-8") for record in self.records]
        separator = cast("str", os.linesep)
        return {"protocol": self.protocol, "data": separator.join(decoded_lines)}

58 

59 

class SyslogExporter(LogExporter):
    """Buffers system_log_event records and flushes them as RFC 5424 syslog messages."""

    logger_type = "syslog"

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read transport and formatting settings from the config entry.

        Values in ``entry.options`` override those in ``entry.data``. No
        network connection is opened here; the UDP and TCP transports are
        created lazily on the first send.
        """
        super().__init__(hass)
        self.name = entry.title
        # Records taken from the buffer that have not yet been confirmed sent.
        self._in_progress: list[SyslogMessage] = []
        # Serialises flushes so that two tasks never send the same records.
        self._lock = asyncio.Lock()

        opts = {**entry.data, **entry.options}

        self._host = opts[CONF_HOST]
        self._port = opts[CONF_PORT]
        self._protocol = opts.get(CONF_PROTOCOL, PROTOCOL_UDP)
        self.destination = (self._host, str(self._port), self._protocol)
        self._use_tls = opts.get(CONF_USE_TLS, False)
        self._app_name = opts.get(CONF_APP_NAME, DEFAULT_APP_NAME)
        facility_name = opts.get(CONF_FACILITY, DEFAULT_FACILITY)
        # Unknown facility names fall back to numeric facility 1.
        self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1)
        self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 10)
        self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
        # "-" is the RFC 5424 NILVALUE: no hostname is reported.
        self._hostname = "-"

        # TCP connection state (lazily created)
        self._tcp_reader: asyncio.StreamReader | None = None
        self._tcp_writer: asyncio.StreamWriter | None = None

        # UDP transport state (lazily created)
        self._udp_transport: asyncio.DatagramTransport | None = None

        self.endpoint_desc = (
            f"syslog://{self._host}:{self._port} ({self._protocol.upper()}"
            f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})"
        )
        _LOGGER.info("remote_logger: syslog configured for %s", self.endpoint_desc)

    @callback
    def handle_event(self, event: Event) -> None:
        """Receive a system_log_event and buffer it.

        Events whose source path lies inside this syslog package are dropped
        so that the exporter's own log output cannot feed back into itself.
        """
        self.on_event()
        if (
            event.data
            and event.data.get("source")
            and len(event.data["source"]) == 2
            and "custom_components/remote_logger/syslog" in event.data["source"][0]
        ):
            # prevent log loops
            return
        self._buffer.append(self._to_log_record(event))
        self._flush_if_full()

    def _flush_if_full(self) -> None:
        """Schedule a flush task once the buffer has reached the configured batch size."""
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())

    def _priority(self, level: str) -> int:
        """Return the syslog PRI value (facility * 8 + severity) for a log level name."""
        severity = SYSLOG_SEVERITY_MAP.get(level.upper(), DEFAULT_SYSLOG_SEVERITY)
        return self._facility * 8 + severity

    def _encode_line(self, pri: int, timestamp: str, msgid: str, sd: str, msg: str) -> SyslogMessage:
        """Assemble one RFC 5424 line and wrap it, UTF-8 encoded, in a SyslogMessage."""
        # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG]
        # VERSION = 1, PROCID = -
        syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {msgid} {sd} {msg}"
        return SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace"))

    def _to_log_record(
        self,
        event: Event,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> SyslogMessage:
        """Convert an event payload to an RFC 5424 syslog message.

        For system log events the payload is expected to carry::

            "name": str
            "message": list(str)
            "level": str
            "source": (str, int)
            "timestamp": float
            "exception": str
            "count": int
            "first_occurred": float

        Any other event type has its data flattened into structured-data
        parameters and uses the event type as the MSGID.

        Args:
            event: The event to convert.
            message_override: Message parts to use instead of ``data["message"]``.
            level_override: Level name to use instead of ``data["level"]``.
            state_only: Passed through to ``flatten_event_data`` for non-system-log events.

        Returns:
            The encoded syslog record.
        """
        data = event.data
        # A missing or None level is treated as INFO; overrides are upper-cased
        # in _priority() just like levels read from the event.
        pri = self._priority(level_override or data.get("level") or "INFO")

        # RFC 3339 timestamp
        timestamp_s: float = data.get("timestamp", time.time())
        timestamp = isotimestamp(timestamp_s)

        # Message body
        messages: list[str] = message_override or data.get("message", [])
        msg = " ".join(messages) if messages else "-"

        # Structured data with meta info
        sd = "-"
        sd_params: list[str] = []
        if event.event_type == EVENT_SYSTEM_LOG:
            source = data.get("source")
            # Accept a tuple or list, but only unpack a well-formed (path, line) pair.
            if isinstance(source, (tuple, list)) and len(source) == 2:
                source_path, source_linenum = source
                sd_params.append(f'code.file.path="{_sd_escape(str(source_path))}"')
                sd_params.append(f'code.line.number="{_sd_escape(str(source_linenum))}"')
            logger_name = data.get("name")
            if logger_name:
                sd_params.append(f'code.function.name="{_sd_escape(logger_name)}"')
            count = data.get("count")
            if count:
                sd_params.append(f'exception.count="{_sd_escape(str(count))}"')
            first_occurred = data.get("first_occurred")
            if first_occurred:
                sd_params.append(f'exception.first_occurred="{isotimestamp(first_occurred)}"')

            exception = data.get("exception")
            if exception:
                # Tracebacks routinely contain quotes and brackets, so they must be escaped.
                sd_params.append(f'exception.stacktrace="{_sd_escape(str(exception))}"')
            msgid: str = "-"
        else:
            # SD-PARAM values must be double-quoted (RFC 5424 section 6.3.3).
            sd_params.append(f'eventName="{_sd_escape(str(event.event_type))}"')
            # Use HA event type as MSGID for non-system-log events; "-" otherwise
            msgid = event.event_type or "-"
            for k, v in data.items():
                prefix = f"event.data.{k}" if k != "event.data" else k
                for flat_key, flat_val in flatten_event_data(prefix, v, state_only):
                    sd_params.append(f'{_sd_escape(flat_key)}="{_sd_escape(str(flat_val))}"')

        if sd_params:
            sd = f"[opentelemetry {' '.join(sd_params)}]"

        return self._encode_line(pri, timestamp, msgid, sd, msg)

    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom syslog record without requiring a HA Event.

        Args:
            event_name: Used as the MSGID; an empty name becomes "-".
            message: The free-text message body.
            level: Log level name, matched case-insensitively against the severity map.
            attributes: Optional key/value pairs emitted as structured-data parameters.
        """
        pri = self._priority(level)
        timestamp = isotimestamp(time.time())
        sd = "-"
        if attributes:
            sd_params = [f'{_sd_escape(k)}="{_sd_escape(str(v))}"' for k, v in attributes.items()]
            sd = f"[opentelemetry {' '.join(sd_params)}]"
        # An empty MSGID would leave two adjacent spaces and break the header layout.
        record = self._encode_line(pri, timestamp, event_name or "-", sd, message)
        self._buffer.append(record)
        self.on_event()
        self._flush_if_full()

    async def flush(self) -> None:
        """Flush buffered log records to the syslog endpoint.

        Records left unsent by an earlier flush are retried first; new records
        are only taken from the buffer once that backlog is empty. The lock is
        held for the whole send so that overlapping flush tasks cannot transmit
        the same records twice.
        """
        async with self._lock:
            # Drop anything an earlier, interrupted flush already delivered.
            self._in_progress = [m for m in self._in_progress if not m.sent]
            if not self._in_progress:
                if not self._buffer:
                    return
                self._in_progress = cast("list[SyslogMessage]", self._buffer.copy())
                self._buffer.clear()
            # NOTE(review): while a backlog is stuck here the buffer keeps growing
            # without limit; consider capping it if the endpoint can be down for long.

            try:
                if self._protocol == PROTOCOL_UDP:
                    await self._send_udp(self._in_progress)
                else:
                    await self._send_tcp(self._in_progress)
                sent: list[SyslogMessage] = [m for m in self._in_progress if m.sent]
                if sent:
                    self.last_sent_payload = SyslogSubmission(sent, protocol=self._protocol)
                    self.on_success()
                self._in_progress = [m for m in self._in_progress if not m.sent]
            except Exception as e:
                # Top-level boundary: a failed flush must not take down the caller's task.
                _LOGGER.exception("remote_logger: unexpected error sending syslog messages")
                self.on_posting_error(str(e))

    async def _send_udp(self, messages: list[SyslogMessage]) -> None:
        """Send syslog messages over UDP, one datagram per message.

        A message is marked sent as soon as it has been handed to the
        transport. On ``OSError`` the transport is closed and discarded so the
        next call creates a fresh one; messages not yet marked stay unsent.
        """
        try:
            if self._udp_transport is None or self._udp_transport.is_closing():
                loop = asyncio.get_running_loop()
                self._udp_transport, _ = await loop.create_datagram_endpoint(
                    asyncio.DatagramProtocol,
                    remote_addr=(self._host, self._port),
                )
            for msg in messages:
                self._udp_transport.sendto(msg.payload)
                msg.sent = True
        except OSError as err:
            _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err)
            # Close before dropping the reference so the socket is not leaked.
            if self._udp_transport is not None:
                with contextlib.suppress(Exception):
                    self._udp_transport.close()
            self._udp_transport = None

    async def _send_tcp(self, messages: list[SyslogMessage]) -> None:
        """Send syslog messages over TCP with octet-counting framing (RFC 6587).

        Messages are marked sent only after the whole batch has been written
        and drained. On a connection error or timeout the connection is closed
        and every message stays unsent for a later retry.
        """
        try:
            if self._tcp_writer is None or self._tcp_writer.is_closing():
                await self._connect_tcp()

            writer: asyncio.StreamWriter | None = self._tcp_writer
            if writer is None:
                raise OSError("Unable to create TCP writer")  # Set by _connect_tcp above

            for msg in messages:
                # Octet-counting: "LEN SP MSG"
                frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload
                writer.write(frame)
            await writer.drain()
            for msg in messages:
                msg.sent = True
        except (OSError, asyncio.TimeoutError) as err:
            _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err)
            await self._close_tcp()

    async def _connect_tcp(self) -> None:
        """Establish a TCP (optionally TLS) connection to the syslog server.

        Raises:
            OSError: If the connection cannot be established.
            asyncio.TimeoutError: If connecting exceeds the client timeout.
        """
        ssl_ctx: ssl.SSLContext | None = None
        if self._use_tls:
            # create_default_context() reads the CA bundle from disk; keep that
            # blocking work off the event loop.
            ssl_ctx = await asyncio.get_running_loop().run_in_executor(None, ssl.create_default_context)

        self._tcp_reader, self._tcp_writer = await asyncio.wait_for(
            asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
            timeout=self._client_timeout,
        )

    async def _close_tcp(self) -> None:
        """Close the TCP connection, ignoring errors raised while closing."""
        writer = self._tcp_writer
        if writer is None:
            return
        # Clear the references first so a half-closed writer is never reused.
        self._tcp_writer = None
        self._tcp_reader = None
        with contextlib.suppress(Exception):
            writer.close()
            # Bound the wait: closing a TLS stream to an unresponsive peer can stall.
            await asyncio.wait_for(writer.wait_closed(), timeout=self._client_timeout)

    async def close(self) -> None:
        """Clean up transport resources."""
        if self._udp_transport is not None:
            self._udp_transport.close()
            self._udp_transport = None
        await self._close_tcp()

291 

292 

293def _sd_escape(value: str) -> str: 

294 """Escape special characters for RFC 5424 structured data values.""" 

295 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]") 

296 

297 

async def validate(
    hass: Any, host: str, port: int, protocol: str, use_tls: bool, timeout: float = 10
) -> str | None:
    """Test connectivity to a syslog endpoint.

    For UDP only name resolution is checked, because a datagram socket gives
    no delivery feedback. For TCP a real connection (with TLS when requested)
    is opened and closed again.

    Args:
        hass: Object exposing the running event loop as ``hass.loop``.
        host: Hostname or IP address of the syslog server.
        port: Port of the syslog server.
        protocol: Transport name; ``PROTOCOL_UDP`` selects the UDP check, anything else TCP.
        use_tls: Whether the TCP connection should be wrapped in TLS.
        timeout: Seconds allowed for the resolution or connection attempt.

    Returns:
        ``None`` on success, ``"cannot_connect"`` for network-level failures
        and timeouts, ``"unknown"`` for any other error.
    """
    loop = hass.loop
    try:
        if protocol == PROTOCOL_UDP:
            # Resolve with any address family, so that IPv6-only hosts are not
            # rejected by the check.
            await asyncio.wait_for(
                loop.getaddrinfo(host, port, type=socket.SOCK_DGRAM),
                timeout=timeout,
            )
        else:
            # TCP: actually connect
            ssl_ctx: ssl.SSLContext | None = None
            if use_tls:
                # Building the default context reads CA certificates from disk;
                # do it in the executor rather than on the event loop.
                ssl_ctx = await loop.run_in_executor(None, ssl.create_default_context)
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port, ssl=ssl_ctx),
                timeout=timeout,
            )
            writer.close()
            await writer.wait_closed()
    except (OSError, asyncio.TimeoutError) as err:
        # OSError also covers refused connections, DNS failures and TLS errors.
        _LOGGER.error("Syslog connect failed: %s", err)
        return "cannot_connect"
    except Exception as err:
        _LOGGER.error("Syslog connect unknown error: %s", err)
        return "unknown"
    return None