Coverage for custom_components/remote_logger/syslog/exporter.py: 97%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-18 22:41 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextlib 

5import logging 

6import socket 

7import ssl 

8import time 

9from dataclasses import dataclass 

10from datetime import UTC, datetime 

11from typing import TYPE_CHECKING, Any 

12 

13from homeassistant.core import Event, HomeAssistant, callback 

14 

15from custom_components.remote_logger.const import ( 

16 BATCH_FLUSH_INTERVAL_SECONDS, 

17 CONF_APP_NAME, 

18 CONF_BATCH_MAX_SIZE, 

19 CONF_FACILITY, 

20 CONF_HOST, 

21 CONF_PORT, 

22 CONF_PROTOCOL, 

23 CONF_USE_TLS, 

24) 

25 

26from .const import ( 

27 DEFAULT_APP_NAME, 

28 DEFAULT_FACILITY, 

29 DEFAULT_SYSLOG_SEVERITY, 

30 PROTOCOL_UDP, 

31 SYSLOG_FACILITY_MAP, 

32 SYSLOG_SEVERITY_MAP, 

33) 

34 

35if TYPE_CHECKING: 

36 from collections.abc import Mapping 

37 

38 from homeassistant.config_entries import ConfigEntry 

39 

40_LOGGER = logging.getLogger(__name__) 

41 

42 

43@dataclass 

44class Message: 

45 payload: bytes 

46 sent: bool = False 

47 

48 

49class SyslogExporter: 

50 """Buffers system_log_event records and flushes them as RFC 5424 syslog messages.""" 

51 

52 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: 

53 self._hass = hass 

54 self._buffer: list[Mapping[str, Any]] = [] 

55 self._in_progress: list[Message] = [] 

56 self._lock = asyncio.Lock() 

57 

58 self._host = entry.data[CONF_HOST] 

59 self._port = entry.data[CONF_PORT] 

60 self._protocol = entry.data.get(CONF_PROTOCOL, PROTOCOL_UDP) 

61 self._use_tls = entry.data.get(CONF_USE_TLS, False) 

62 self._app_name = entry.data.get(CONF_APP_NAME, DEFAULT_APP_NAME) 

63 facility_name = entry.data.get(CONF_FACILITY, DEFAULT_FACILITY) 

64 self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1) 

65 self._batch_max_size = entry.data.get(CONF_BATCH_MAX_SIZE, 10) 

66 self._hostname = "-" 

67 

68 # TCP connection state (lazily created) 

69 self._tcp_reader: asyncio.StreamReader | None = None 

70 self._tcp_writer: asyncio.StreamWriter | None = None 

71 

72 # UDP transport state (lazily created) 

73 self._udp_transport: asyncio.DatagramTransport | None = None 

74 

75 self.endpoint_desc = ( 

76 f"syslog://{self._host}:{self._port} ({self._protocol.upper()}" 

77 f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})" 

78 ) 

79 _LOGGER.info(f"remote_logger: syslog configured for {self.endpoint_desc}") 

80 

81 @callback 

82 def handle_event(self, event: Event) -> None: 

83 """Receive a system_log_event and buffer it.""" 

84 if ( 

85 event.data 

86 and event.data.get("source") 

87 and len(event.data["source"]) == 2 

88 and "custom_components/remote_logger/syslog" in event.data["source"][0] 

89 ): 

90 # prevent log loops 

91 return 

92 self._buffer.append(event.data) 

93 if len(self._buffer) >= self._batch_max_size: 

94 self._hass.async_create_task(self.flush()) 

95 

96 def _to_syslog_message(self, data: Mapping[str, Any]) -> Message: 

97 """Convert a system_log_event payload to an RFC 5424 syslog message.""" 

98 """ 

99 "name": str 

100 "message": list(str) 

101 "level": str 

102 "source": (str,int) 

103 "timestamp": float 

104 "exception": str 

105 "count": int 

106 "first_occurred": float 

107 """ 

108 level: str = data.get("level", "INFO").upper() 

109 severity = SYSLOG_SEVERITY_MAP.get(level, DEFAULT_SYSLOG_SEVERITY) 

110 pri = self._facility * 8 + severity 

111 

112 # RFC 5424 timestamp 

113 timestamp_s: float = data.get("timestamp", time.time()) 

114 dt = datetime.fromtimestamp(timestamp_s, tz=UTC) 

115 timestamp = dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 

116 

117 # Message body 

118 messages: list[str] = data.get("message", []) 

119 msg = " ".join(messages) if messages else "-" 

120 

121 # Structured data with meta info 

122 sd = "-" 

123 sd_params: list[str] = [] 

124 source = data.get("source") 

125 if source and isinstance(source, tuple): 

126 source_path, source_linenum = source 

127 sd_params.append(f'code.file.path="{_sd_escape(source_path)}"') 

128 sd_params.append(f'code.line.number="{source_linenum}"') 

129 logger_name = data.get("name") 

130 if logger_name: 

131 sd_params.append(f'code.function.name="{_sd_escape(logger_name)}"') 

132 if data.get("count"): 

133 sd_params.append(f'exception.count="{data["count"]}"') 

134 if data.get("first_occurred"): 

135 sd_params.append(f'exception.first_occurred="{data["first_occurred"]}"') 

136 

137 exception = data.get("exception") 

138 if exception: 

139 sd_params.append(f'exception.stacktrace="{data["exception"]}"') 

140 

141 if sd_params: 

142 sd = f"[opentelemetry {' '.join(sd_params)}]" 

143 

144 # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG] 

145 # VERSION = 1, PROCID = -, MSGID = - 

146 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - - {sd} {msg}" 

147 

148 return Message(syslog_line.encode("utf-8", errors="replace")) 

149 

150 async def flush_loop(self) -> None: 

151 """Periodically flush buffered log records.""" 

152 try: 

153 while True: 

154 await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS) 

155 await self.flush() 

156 except asyncio.CancelledError: 

157 raise 

158 

159 async def flush(self) -> None: 

160 """Flush all buffered log records to the syslog endpoint.""" 

161 records: list[Mapping[str, Any]] | None = None 

162 async with self._lock: 

163 if not self._in_progress: 

164 if not self._buffer: 

165 return 

166 records = self._buffer.copy() 

167 self._buffer.clear() 

168 

169 try: 

170 if records: 

171 self._in_progress = [self._to_syslog_message(r) for r in records] 

172 else: 

173 self._in_progress = [m for m in self._in_progress if not m.sent] 

174 

175 if self._protocol == PROTOCOL_UDP: 

176 await self._send_udp(self._in_progress) 

177 else: 

178 await self._send_tcp(self._in_progress) 

179 

180 self._in_progress = [m for m in self._in_progress if not m.sent] 

181 except Exception: 

182 _LOGGER.exception("remote_logger: unexpected error sending syslog messages") 

183 

184 async def _send_udp(self, messages: list[Message]) -> None: 

185 """Send syslog messages over UDP.""" 

186 try: 

187 if self._udp_transport is None or self._udp_transport.is_closing(): 

188 loop = asyncio.get_running_loop() 

189 self._udp_transport, _ = await loop.create_datagram_endpoint( 

190 asyncio.DatagramProtocol, 

191 remote_addr=(self._host, self._port), 

192 ) 

193 for msg in messages: 

194 self._udp_transport.sendto(msg.payload) 

195 msg.sent = True 

196 except OSError as err: 

197 _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err) 

198 self._udp_transport = None 

199 

200 async def _send_tcp(self, messages: list[Message]) -> None: 

201 """Send syslog messages over TCP with octet-counting framing (RFC 6587).""" 

202 try: 

203 if self._tcp_writer is None or self._tcp_writer.is_closing(): 

204 await self._connect_tcp() 

205 

206 writer: asyncio.StreamWriter | None = self._tcp_writer 

207 if writer is None: 

208 raise OSError("Unable to create TCP writer") # Set by _connect_tcp above 

209 

210 for msg in messages: 

211 # Octet-counting: "LEN SP MSG" 

212 frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload 

213 writer.write(frame) 

214 await writer.drain() 

215 for msg in messages: 

216 msg.sent = True 

217 except (OSError, ConnectionError) as err: 

218 _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err) 

219 await self._close_tcp() 

220 

221 async def _connect_tcp(self) -> None: 

222 """Establish a TCP connection to the syslog server.""" 

223 ssl_ctx: ssl.SSLContext | None = None 

224 if self._use_tls: 

225 ssl_ctx = ssl.create_default_context() 

226 

227 self._tcp_reader, self._tcp_writer = await asyncio.wait_for( 

228 asyncio.open_connection(self._host, self._port, ssl=ssl_ctx), 

229 timeout=10, 

230 ) 

231 

232 async def _close_tcp(self) -> None: 

233 """Close the TCP connection.""" 

234 if self._tcp_writer is not None: 

235 with contextlib.suppress(Exception): 

236 self._tcp_writer.close() 

237 await self._tcp_writer.wait_closed() 

238 self._tcp_writer = None 

239 self._tcp_reader = None 

240 

241 async def close(self) -> None: 

242 """Clean up transport resources.""" 

243 if self._udp_transport is not None: 

244 self._udp_transport.close() 

245 self._udp_transport = None 

246 await self._close_tcp() 

247 

248 

249def _sd_escape(value: str) -> str: 

250 """Escape special characters for RFC 5424 structured data values.""" 

251 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]") 

252 

253 

254async def validate(hass: Any, host: str, port: int, protocol: str, use_tls: bool) -> str | None: 

255 """Test connectivity to a syslog endpoint. Returns error key or None.""" 

256 loop = hass.loop 

257 try: 

258 if protocol == PROTOCOL_UDP: 

259 # Quick UDP test: just resolve and create a socket 

260 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

261 try: 

262 sock.setblocking(False) 

263 await loop.run_in_executor(None, lambda: socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_DGRAM)) 

264 finally: 

265 sock.close() 

266 else: 

267 # TCP: actually connect 

268 ssl_ctx = True if use_tls else None 

269 _, writer = await asyncio.wait_for( 

270 asyncio.open_connection(host, port, ssl=ssl_ctx), 

271 timeout=10, 

272 ) 

273 writer.close() 

274 await writer.wait_closed() 

275 except (OSError, TimeoutError, ConnectionRefusedError) as err: 

276 _LOGGER.error("Syslog connect failed: %s", err) 

277 return "cannot_connect" 

278 except Exception as err: 

279 _LOGGER.error("Syslog connect unknown error: %s", err) 

280 return "unknown" 

281 return None