Coverage for custom_components / remote_logger / syslog / exporter.py: 98%
196 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
1from __future__ import annotations
3import asyncio
4import contextlib
5import logging
6import os
7import socket
8import ssl
9import time
10from dataclasses import dataclass
11from typing import TYPE_CHECKING, Any, cast
13from homeassistant.components.system_log import EVENT_SYSTEM_LOG
14from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL
16from custom_components.remote_logger.const import (
17 CONF_APP_NAME,
18 CONF_BATCH_MAX_SIZE,
19 CONF_CLIENT_TIMEOUT,
20 CONF_FACILITY,
21 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME,
22 CONF_USE_TLS,
23 DEFAULT_CLIENT_TIMEOUT,
24)
25from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission
26from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp
28from .const import (
29 DEFAULT_APP_NAME,
30 DEFAULT_FACILITY,
31 DEFAULT_SYSLOG_SEVERITY,
32 PROTOCOL_UDP,
33 SYSLOG_FACILITY_MAP,
34 SYSLOG_SEVERITY_MAP,
35)
37if TYPE_CHECKING:
38 import datetime as dt
39 from collections.abc import Mapping
41 from homeassistant.config_entries import ConfigEntry
42 from homeassistant.core import HomeAssistant
44_LOGGER = logging.getLogger(__name__)
47@dataclass
48class SyslogMessage(LogMessage):
49 payload: bytes
52class SyslogSubmission(LogSubmission):
53 def __init__(self, records: list[SyslogMessage], protocol: str) -> None:
54 self.records: list[SyslogMessage] = records
55 self.protocol: str = protocol
57 def for_display(self) -> dict[str, Any]:
58 return {
59 "protocol": self.protocol,
60 "data": cast("str", os.linesep).join(r.payload.decode("utf-8") for r in self.records),
61 }
64class SyslogExporter(LogExporter):
65 """Buffers system_log_event records and flushes them as RFC 5424 syslog messages."""
67 logger_type = "syslog"
69 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
70 super().__init__(hass)
71 self.name = entry.title
72 self._in_progress: list[SyslogMessage] = []
73 self._lock = asyncio.Lock()
75 opts = {**entry.data, **entry.options}
77 self._host = opts[CONF_HOST]
78 self._port = opts[CONF_PORT]
79 self._protocol = opts.get(CONF_PROTOCOL, PROTOCOL_UDP)
80 self.destination = (self._host, str(self._port), self._protocol)
81 self._use_tls = opts.get(CONF_USE_TLS, False)
82 self._app_name = opts.get(CONF_APP_NAME, DEFAULT_APP_NAME)
83 facility_name = opts.get(CONF_FACILITY, DEFAULT_FACILITY)
84 self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1)
85 self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 10)
86 self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
87 self._suppress_system_log_event_name = opts.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True)
88 self._hostname = "-"
90 # TCP connection state (lazily created)
91 self._tcp_reader: asyncio.StreamReader | None = None
92 self._tcp_writer: asyncio.StreamWriter | None = None
94 # UDP transport state (lazily created)
95 self._udp_transport: asyncio.DatagramTransport | None = None
97 self.endpoint_desc = (
98 f"syslog://{self._host}:{self._port} ({self._protocol.upper()}"
99 f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})"
100 )
101 _LOGGER.info(f"remote_logger: syslog configured for {self.endpoint_desc}")
103 def create_log_record(
104 self,
105 event_data: Mapping[str, Any],
106 event_type: str | None = None,
107 time_fired: dt.datetime | None = None,
108 message_override: list[str] | None = None,
109 level_override: str | None = None,
110 state_only: bool = False,
111 ) -> SyslogMessage:
112 """Convert a system_log_event payload to an RFC 5424 syslog message."""
113 """
114 "name": str
115 "message": list(str)
116 "level": str
117 "source": (str,int)
118 "timestamp": float
119 "exception": str
120 "count": int
121 "first_occurred": float
122 """
123 data = event_data
124 level: str = level_override or data.get("level", "INFO").upper()
125 severity = SYSLOG_SEVERITY_MAP.get(level, DEFAULT_SYSLOG_SEVERITY)
126 pri = self._facility * 8 + severity
128 # RFC 3339 timestamp
129 timestamp_s: float = data.get("timestamp", time_fired.timestamp() if time_fired else time.time())
130 timestamp = isotimestamp(timestamp_s)
132 # Message body
133 messages: list[str] = message_override or data.get("message", [])
134 msg = " ".join(messages) if messages else "-"
136 # Structured data with meta info
137 sd = "-"
138 sd_params: list[str] = []
139 if event_type == EVENT_SYSTEM_LOG:
140 source = data.get("source")
141 if source and isinstance(source, tuple):
142 source_path, source_linenum = source
143 sd_params.append(f'code.file.path="{_sd_escape(source_path)}"')
144 sd_params.append(f'code.line.number="{source_linenum}"')
145 logger_name = data.get("name")
146 if logger_name:
147 sd_params.append(f'code.function.name="{_sd_escape(logger_name)}"')
148 if data.get("count"):
149 sd_params.append(f'exception.count="{data["count"]}"')
150 if data.get("first_occurred"):
151 sd_params.append(f'exception.first_occurred="{isotimestamp(data["first_occurred"])}"')
153 exception = data.get("exception")
154 if exception:
155 sd_params.append(f'exception.stacktrace="{data["exception"]}"')
156 if not self._suppress_system_log_event_name:
157 sd_params.append(f'eventName="{EVENT_SYSTEM_LOG}"')
158 msgid: str = EVENT_SYSTEM_LOG
159 else:
160 msgid = "-"
161 else:
162 if event_type is not None:
163 sd_params.append(f"eventName={event_type}")
164 # Use HA event type as MSGID for non-system-log events; "-" otherwise
165 msgid = event_type or "-"
166 for k, v in data.items():
167 for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only):
168 sd_params.append(f'{_sd_escape(flat_key)}="{_sd_escape(str(flat_val))}"')
170 if sd_params:
171 sd = f"[opentelemetry {' '.join(sd_params)}]"
173 # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG]
174 # VERSION = 1, PROCID = -
175 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {msgid} {sd} {msg}"
177 return SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace"))
179 def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
180 """Buffer a custom syslog record without requiring a HA Event."""
181 severity = SYSLOG_SEVERITY_MAP.get(level.upper(), DEFAULT_SYSLOG_SEVERITY)
182 pri = self._facility * 8 + severity
183 timestamp = isotimestamp(time.time())
184 sd = "-"
185 if attributes:
186 sd_params = [f'{_sd_escape(k)}="{_sd_escape(str(v))}"' for k, v in attributes.items()]
187 sd = f"[opentelemetry {' '.join(sd_params)}]"
188 syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - {event_name} {sd} {message}"
189 record = SyslogMessage(payload=syslog_line.encode("utf-8", errors="replace"))
190 self._buffer.append(record)
191 self.on_event()
192 if len(self._buffer) >= self._batch_max_size:
193 self._hass.async_create_task(self.flush())
195 async def flush(self) -> None:
196 """Flush all buffered log records to the syslog endpoint."""
197 records: list[SyslogMessage] | None = None
198 async with self._lock:
199 if not self._in_progress:
200 if not self._buffer:
201 return
202 records = cast("list[SyslogMessage]", self._buffer.copy())
203 self._buffer.clear()
205 try:
206 if records:
207 self._in_progress = records
208 else:
209 self._in_progress = [m for m in self._in_progress if not m.sent]
211 if self._protocol == PROTOCOL_UDP:
212 await self._send_udp(self._in_progress)
213 else:
214 await self._send_tcp(self._in_progress)
215 sent: list[SyslogMessage] = [m for m in self._in_progress if m.sent]
216 if sent:
217 self.last_sent_payload = SyslogSubmission(sent, protocol=self._protocol)
218 self.on_success()
219 self._in_progress = [m for m in self._in_progress if not m.sent]
220 except Exception as e:
221 _LOGGER.exception("remote_logger: unexpected error sending syslog messages")
222 self.on_posting_error(str(e))
224 async def _send_udp(self, messages: list[SyslogMessage]) -> None:
225 """Send syslog messages over UDP."""
226 try:
227 if self._udp_transport is None or self._udp_transport.is_closing():
228 loop = asyncio.get_running_loop()
229 self._udp_transport, _ = await loop.create_datagram_endpoint(
230 asyncio.DatagramProtocol,
231 remote_addr=(self._host, self._port),
232 )
233 for msg in messages:
234 self._udp_transport.sendto(msg.payload)
235 msg.sent = True
236 except OSError as err:
237 _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err)
238 self._udp_transport = None
240 async def _send_tcp(self, messages: list[SyslogMessage]) -> None:
241 """Send syslog messages over TCP with octet-counting framing (RFC 6587)."""
242 try:
243 if self._tcp_writer is None or self._tcp_writer.is_closing():
244 await self._connect_tcp()
246 writer: asyncio.StreamWriter | None = self._tcp_writer
247 if writer is None:
248 raise OSError("Unable to create TCP writer") # Set by _connect_tcp above
250 for msg in messages:
251 # Octet-counting: "LEN SP MSG"
252 frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload
253 writer.write(frame)
254 await writer.drain()
255 for msg in messages:
256 msg.sent = True
257 except (OSError, ConnectionError) as err:
258 _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err)
259 await self._close_tcp()
261 async def _connect_tcp(self) -> None:
262 """Establish a TCP connection to the syslog server."""
263 ssl_ctx: ssl.SSLContext | None = None
264 if self._use_tls:
265 ssl_ctx = ssl.create_default_context()
267 self._tcp_reader, self._tcp_writer = await asyncio.wait_for(
268 asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
269 timeout=self._client_timeout,
270 )
272 async def _close_tcp(self) -> None:
273 """Close the TCP connection."""
274 if self._tcp_writer is not None:
275 with contextlib.suppress(Exception):
276 self._tcp_writer.close()
277 await self._tcp_writer.wait_closed()
278 self._tcp_writer = None
279 self._tcp_reader = None
281 async def close(self) -> None:
282 """Clean up transport resources."""
283 if self._udp_transport is not None:
284 self._udp_transport.close()
285 self._udp_transport = None
286 await self._close_tcp()
289def _sd_escape(value: str) -> str:
290 """Escape special characters for RFC 5424 structured data values."""
291 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]")
294async def validate(hass: Any, host: str, port: int, protocol: str, use_tls: bool) -> str | None:
295 """Test connectivity to a syslog endpoint. Returns error key or None."""
296 loop = hass.loop
297 try:
298 if protocol == PROTOCOL_UDP:
299 # Quick UDP test: just resolve and create a socket
300 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
301 try:
302 sock.setblocking(False)
303 await loop.run_in_executor(None, lambda: socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_DGRAM))
304 finally:
305 sock.close()
306 else:
307 # TCP: actually connect
308 ssl_ctx = True if use_tls else None
309 _, writer = await asyncio.wait_for(
310 asyncio.open_connection(host, port, ssl=ssl_ctx),
311 timeout=10,
312 )
313 writer.close()
314 await writer.wait_closed()
315 except (OSError, TimeoutError, ConnectionRefusedError) as err:
316 _LOGGER.error("remote_logger: Syslog connect failed: %s", err)
317 return "cannot_connect"
318 except Exception as err:
319 _LOGGER.error("remote_logger: Syslog connect unknown error: %s", err)
320 return "unknown"
321 return None