Coverage for custom_components/remote_logger/syslog/exporter.py: 98%
199 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
1from __future__ import annotations
3import asyncio
4import contextlib
5import logging
6import os
7import socket
8import ssl
9import time
10from dataclasses import dataclass
11from typing import TYPE_CHECKING, Any, cast
13from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL
14from homeassistant.core import Event, HomeAssistant, callback
16from custom_components.remote_logger.const import (
17 CONF_APP_NAME,
18 CONF_BATCH_MAX_SIZE,
19 CONF_CLIENT_TIMEOUT,
20 CONF_FACILITY,
21 CONF_USE_TLS,
22 DEFAULT_CLIENT_TIMEOUT,
23 EVENT_SYSTEM_LOG,
24)
25from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission
26from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp
28from .const import (
29 DEFAULT_APP_NAME,
30 DEFAULT_FACILITY,
31 DEFAULT_SYSLOG_SEVERITY,
32 PROTOCOL_UDP,
33 SYSLOG_FACILITY_MAP,
34 SYSLOG_SEVERITY_MAP,
35)
37if TYPE_CHECKING:
38 from homeassistant.config_entries import ConfigEntry
40_LOGGER = logging.getLogger(__name__)
43@dataclass
44class SyslogMessage(LogMessage):
45 payload: bytes
48class SyslogSubmission(LogSubmission):
49 def __init__(self, records: list[SyslogMessage], protocol: str) -> None:
50 self.records: list[SyslogMessage] = records
51 self.protocol: str = protocol
53 def for_display(self) -> dict[str, Any]:
54 return {
55 "protocol": self.protocol,
56 "data": cast("str", os.linesep).join(r.payload.decode("utf-8") for r in self.records),
57 }
60class SyslogExporter(LogExporter):
61 """Buffers system_log_event records and flushes them as RFC 5424 syslog messages."""
63 logger_type = "syslog"
65 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
66 super().__init__(hass)
67 self.name = entry.title
68 self._in_progress: list[SyslogMessage] = []
69 self._lock = asyncio.Lock()
71 opts = {**entry.data, **entry.options}
73 self._host = opts[CONF_HOST]
74 self._port = opts[CONF_PORT]
75 self._protocol = opts.get(CONF_PROTOCOL, PROTOCOL_UDP)
76 self.destination = (self._host, str(self._port), self._protocol)
77 self._use_tls = opts.get(CONF_USE_TLS, False)
78 self._app_name = opts.get(CONF_APP_NAME, DEFAULT_APP_NAME)
79 facility_name = opts.get(CONF_FACILITY, DEFAULT_FACILITY)
80 self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1)
81 self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 10)
82 self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
83 self._hostname = "-"
85 # TCP connection state (lazily created)
86 self._tcp_reader: asyncio.StreamReader | None = None
87 self._tcp_writer: asyncio.StreamWriter | None = None
89 # UDP transport state (lazily created)
90 self._udp_transport: asyncio.DatagramTransport | None = None
92 self.endpoint_desc = (
93 f"syslog://{self._host}:{self._port} ({self._protocol.upper()}"
94 f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})"
95 )
96 _LOGGER.info(f"remote_logger: syslog configured for {self.endpoint_desc}")
98 @callback
99 def handle_event(self, event: Event) -> None:
100 """Receive a system_log_event and buffer it."""
101 self.on_event()
102 if (
103 event.data
104 and event.data.get("source")
105 and len(event.data["source"]) == 2
106 and "custom_components/remote_logger/syslog" in event.data["source"][0]
107 ):
108 # prevent log loops
109 return
110 self._buffer.append(self._to_log_record(event))
111 if len(self._buffer) >= self._batch_max_size:
112 self._hass.async_create_task(self.flush())
114 def _to_log_record(
115 self,
116 event: Event,
117 message_override: list[str] | None = None,
118 level_override: str | None = None,
119 state_only: bool = False,
120 ) -> SyslogMessage:
121 """Convert a system_log_event payload to an RFC 5424 syslog message."""
122 """
123 "name": str
124 "message": list(str)
125 "level": str
126 "source": (str,int)
127 "timestamp": float
128 "exception": str
129 "count": int
130 "first_occurred": float
131 """
132 data = event.data
133 level: str = level_override or data.get("level", "INFO").upper()
134 severity = SYSLOG_SEVERITY_MAP.get(level, DEFAULT_SYSLOG_SEVERITY)
135 pri = self._facility * 8 + severity
137 # RFC 3339 timestamp
138 timestamp_s: float = data.get("timestamp", time.time())
139 timestamp = isotimestamp(timestamp_s)
141 # Message body
142 messages: list[str] = message_override or data.get("message", [])
143 msg = " ".join(messages) if messages else "-"
145 # Structured data with meta info
146 sd = "-"
147 sd_params: list[str] = []
148 if event.event_type == EVENT_SYSTEM_LOG:
149 source = data.get("source")
150 if source and isinstance(source, tuple):
151 source_path, source_linenum = source
152 sd_params.append(f'code.file.path="{_sd_escape(source_path)}"')
153 sd_params.append(f'code.line.number="{source_linenum}"')
154 logger_name = data.get("name")
155 if logger_name:
156 sd_params.append(f'code.function.name="{_sd_escape(logger_name)}"')
157 if data.get("count"):
158 sd_params.append(f'exception.count="{data["count"]}"')
159 if data.get("first_occurred"):
160 sd_params.append(f'exception.first_occurred="{isotimestamp(data["first_occurred"])}"')
162 exception = data.get("exception")
163 if exception:
164 sd_params.append(f'exception.stacktrace="{data["exception"]}"')
165 msgid: str = "-"
166 else:
167 sd_params.append(f"eventName={event.event_type}")
168 # Use HA event type as MSGID for non-system-log events; "-" otherwise
169 msgid = event.event_type or "-"
170 for k, v in data.items():
171 for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only):
172 sd_params.append(f'{_sd_escape(flat_key)}="{_sd_escape(str(flat_val))}"')
174 if sd_params:
175 sd = f"[opentelemetry {' '.join(sd_params)}]"
177 # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG]
178 # VERSION = 1, PROCID = -
179 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {msgid} {sd} {msg}"
181 return SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace"))
183 def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
184 """Buffer a custom syslog record without requiring a HA Event."""
185 severity = SYSLOG_SEVERITY_MAP.get(level.upper(), DEFAULT_SYSLOG_SEVERITY)
186 pri = self._facility * 8 + severity
187 timestamp = isotimestamp(time.time())
188 sd = "-"
189 if attributes:
190 sd_params = [f'{_sd_escape(k)}="{_sd_escape(str(v))}"' for k, v in attributes.items()]
191 sd = f"[opentelemetry {' '.join(sd_params)}]"
192 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {event_name} {sd} {message}"
193 record = SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace"))
194 self._buffer.append(record)
195 self.on_event()
196 if len(self._buffer) >= self._batch_max_size:
197 self._hass.async_create_task(self.flush())
199 async def flush(self) -> None:
200 """Flush all buffered log records to the syslog endpoint."""
201 records: list[SyslogMessage] | None = None
202 async with self._lock:
203 if not self._in_progress:
204 if not self._buffer:
205 return
206 records = cast("list[SyslogMessage]", self._buffer.copy())
207 self._buffer.clear()
209 try:
210 if records:
211 self._in_progress = records
212 else:
213 self._in_progress = [m for m in self._in_progress if not m.sent]
215 if self._protocol == PROTOCOL_UDP:
216 await self._send_udp(self._in_progress)
217 else:
218 await self._send_tcp(self._in_progress)
219 sent: list[SyslogMessage] = [m for m in self._in_progress if m.sent]
220 if sent:
221 self.last_sent_payload = SyslogSubmission(sent, protocol=self._protocol)
222 self.on_success()
223 self._in_progress = [m for m in self._in_progress if not m.sent]
224 except Exception as e:
225 _LOGGER.exception("remote_logger: unexpected error sending syslog messages")
226 self.on_posting_error(str(e))
228 async def _send_udp(self, messages: list[SyslogMessage]) -> None:
229 """Send syslog messages over UDP."""
230 try:
231 if self._udp_transport is None or self._udp_transport.is_closing():
232 loop = asyncio.get_running_loop()
233 self._udp_transport, _ = await loop.create_datagram_endpoint(
234 asyncio.DatagramProtocol,
235 remote_addr=(self._host, self._port),
236 )
237 for msg in messages:
238 self._udp_transport.sendto(msg.payload)
239 msg.sent = True
240 except OSError as err:
241 _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err)
242 self._udp_transport = None
244 async def _send_tcp(self, messages: list[SyslogMessage]) -> None:
245 """Send syslog messages over TCP with octet-counting framing (RFC 6587)."""
246 try:
247 if self._tcp_writer is None or self._tcp_writer.is_closing():
248 await self._connect_tcp()
250 writer: asyncio.StreamWriter | None = self._tcp_writer
251 if writer is None:
252 raise OSError("Unable to create TCP writer") # Set by _connect_tcp above
254 for msg in messages:
255 # Octet-counting: "LEN SP MSG"
256 frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload
257 writer.write(frame)
258 await writer.drain()
259 for msg in messages:
260 msg.sent = True
261 except (OSError, ConnectionError) as err:
262 _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err)
263 await self._close_tcp()
265 async def _connect_tcp(self) -> None:
266 """Establish a TCP connection to the syslog server."""
267 ssl_ctx: ssl.SSLContext | None = None
268 if self._use_tls:
269 ssl_ctx = ssl.create_default_context()
271 self._tcp_reader, self._tcp_writer = await asyncio.wait_for(
272 asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
273 timeout=self._client_timeout,
274 )
276 async def _close_tcp(self) -> None:
277 """Close the TCP connection."""
278 if self._tcp_writer is not None:
279 with contextlib.suppress(Exception):
280 self._tcp_writer.close()
281 await self._tcp_writer.wait_closed()
282 self._tcp_writer = None
283 self._tcp_reader = None
285 async def close(self) -> None:
286 """Clean up transport resources."""
287 if self._udp_transport is not None:
288 self._udp_transport.close()
289 self._udp_transport = None
290 await self._close_tcp()
293def _sd_escape(value: str) -> str:
294 """Escape special characters for RFC 5424 structured data values."""
295 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]")
298async def validate(hass: Any, host: str, port: int, protocol: str, use_tls: bool) -> str | None:
299 """Test connectivity to a syslog endpoint. Returns error key or None."""
300 loop = hass.loop
301 try:
302 if protocol == PROTOCOL_UDP:
303 # Quick UDP test: just resolve and create a socket
304 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
305 try:
306 sock.setblocking(False)
307 await loop.run_in_executor(None, lambda: socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_DGRAM))
308 finally:
309 sock.close()
310 else:
311 # TCP: actually connect
312 ssl_ctx = True if use_tls else None
313 _, writer = await asyncio.wait_for(
314 asyncio.open_connection(host, port, ssl=ssl_ctx),
315 timeout=10,
316 )
317 writer.close()
318 await writer.wait_closed()
319 except (OSError, TimeoutError, ConnectionRefusedError) as err:
320 _LOGGER.error("Syslog connect failed: %s", err)
321 return "cannot_connect"
322 except Exception as err:
323 _LOGGER.error("Syslog connect unknown error: %s", err)
324 return "unknown"
325 return None