Coverage for custom_components/remote_logger/syslog/exporter.py: 97%
167 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-18 22:41 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-18 22:41 +0000
1from __future__ import annotations
3import asyncio
4import contextlib
5import logging
6import socket
7import ssl
8import time
9from dataclasses import dataclass
10from datetime import UTC, datetime
11from typing import TYPE_CHECKING, Any
13from homeassistant.core import Event, HomeAssistant, callback
15from custom_components.remote_logger.const import (
16 BATCH_FLUSH_INTERVAL_SECONDS,
17 CONF_APP_NAME,
18 CONF_BATCH_MAX_SIZE,
19 CONF_FACILITY,
20 CONF_HOST,
21 CONF_PORT,
22 CONF_PROTOCOL,
23 CONF_USE_TLS,
24)
26from .const import (
27 DEFAULT_APP_NAME,
28 DEFAULT_FACILITY,
29 DEFAULT_SYSLOG_SEVERITY,
30 PROTOCOL_UDP,
31 SYSLOG_FACILITY_MAP,
32 SYSLOG_SEVERITY_MAP,
33)
35if TYPE_CHECKING:
36 from collections.abc import Mapping
38 from homeassistant.config_entries import ConfigEntry
40_LOGGER = logging.getLogger(__name__)
@dataclass
class Message:
    """One encoded syslog line together with its delivery status."""

    # Encoded bytes of the syslog line, exactly as handed to the transport.
    payload: bytes
    # Set to True by the send routines once the payload has been handed off;
    # messages still False after a send attempt are kept for a later retry.
    sent: bool = False
class SyslogExporter:
    """Buffers system_log_event records and flushes them as RFC 5424 syslog messages.

    Events are collected by ``handle_event`` and sent by ``flush`` over UDP or
    TCP (optionally with TLS). Messages that could not be sent are kept in
    ``_in_progress`` and retried on the next flush.
    """

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read the connection settings from ``entry.data``.

        No network connection is opened here; the UDP and TCP transports are
        created lazily on the first send.
        """
        self._hass = hass
        self._buffer: list[Mapping[str, Any]] = []
        self._in_progress: list[Message] = []
        self._lock = asyncio.Lock()

        self._host = entry.data[CONF_HOST]
        self._port = entry.data[CONF_PORT]
        self._protocol = entry.data.get(CONF_PROTOCOL, PROTOCOL_UDP)
        self._use_tls = entry.data.get(CONF_USE_TLS, False)
        self._app_name = entry.data.get(CONF_APP_NAME, DEFAULT_APP_NAME)
        facility_name = entry.data.get(CONF_FACILITY, DEFAULT_FACILITY)
        self._facility = SYSLOG_FACILITY_MAP.get(facility_name, 1)
        self._batch_max_size = entry.data.get(CONF_BATCH_MAX_SIZE, 10)
        self._hostname = "-"

        # TCP connection state (lazily created)
        self._tcp_reader: asyncio.StreamReader | None = None
        self._tcp_writer: asyncio.StreamWriter | None = None

        # UDP transport state (lazily created)
        self._udp_transport: asyncio.DatagramTransport | None = None

        self.endpoint_desc = (
            f"syslog://{self._host}:{self._port} ({self._protocol.upper()}"
            f"{'+TLS' if self._use_tls and self._protocol != PROTOCOL_UDP else ''})"
        )
        # Lazy %-formatting: the string is only built if INFO is enabled.
        _LOGGER.info("remote_logger: syslog configured for %s", self.endpoint_desc)

    @callback
    def handle_event(self, event: Event) -> None:
        """Receive a system_log_event and buffer it.

        Events whose source path points into this syslog package are dropped
        so that our own log output is not re-exported in a loop. Once the
        buffer holds ``_batch_max_size`` records a flush task is scheduled.
        """
        if (
            event.data
            and event.data.get("source")
            and len(event.data["source"]) == 2
            and "custom_components/remote_logger/syslog" in event.data["source"][0]
        ):
            # prevent log loops
            return
        self._buffer.append(event.data)
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())

    def _to_syslog_message(self, data: Mapping[str, Any]) -> Message:
        """Convert a system_log_event payload to an RFC 5424 syslog message.

        Keys read from ``data`` (all optional):

            "name": str
            "message": list(str)
            "level": str
            "source": (str, int)
            "timestamp": float
            "exception": str
            "count": int
            "first_occurred": float

        Returns:
            A ``Message`` whose payload is the UTF-8 encoded syslog line.
        """
        # A missing or None level falls back to INFO instead of raising.
        level = str(data.get("level") or "INFO").upper()
        severity = SYSLOG_SEVERITY_MAP.get(level, DEFAULT_SYSLOG_SEVERITY)
        pri = self._facility * 8 + severity

        # RFC 5424 timestamp
        timestamp_s = data.get("timestamp")
        if timestamp_s is None:
            timestamp_s = time.time()
        dt = datetime.fromtimestamp(timestamp_s, tz=UTC)
        timestamp = dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        # Message body; a plain string is used as-is rather than being
        # joined character by character.
        raw_message = data.get("message")
        if isinstance(raw_message, str):
            msg = raw_message
        else:
            msg = " ".join(str(part) for part in raw_message or ())
        msg = msg or "-"

        # Structured data with meta info
        sd = "-"
        sd_params: list[str] = []
        source = data.get("source")
        # Only unpack a well-formed (path, line) pair.
        if isinstance(source, (tuple, list)) and len(source) == 2:
            source_path, source_linenum = source
            sd_params.append(f'code.file.path="{_sd_escape(str(source_path))}"')
            sd_params.append(f'code.line.number="{_sd_escape(str(source_linenum))}"')
        logger_name = data.get("name")
        if logger_name:
            sd_params.append(f'code.function.name="{_sd_escape(str(logger_name))}"')
        if data.get("count"):
            sd_params.append(f'exception.count="{_sd_escape(str(data["count"]))}"')
        if data.get("first_occurred"):
            sd_params.append(
                f'exception.first_occurred="{_sd_escape(str(data["first_occurred"]))}"'
            )

        exception = data.get("exception")
        if exception:
            # Tracebacks routinely contain quotes, backslashes and brackets;
            # unescaped they would terminate the SD-ELEMENT early.
            sd_params.append(f'exception.stacktrace="{_sd_escape(str(exception))}"')

        if sd_params:
            sd = f"[opentelemetry {' '.join(sd_params)}]"

        # RFC 5424: <PRI>VERSION SP TIMESTAMP SP HOSTNAME SP APP-NAME SP PROCID SP MSGID SP SD [SP MSG]
        # VERSION = 1, PROCID = -, MSGID = -
        syslog_line = f"<{pri}>1 {timestamp} {self._hostname} {self._app_name} - - {sd} {msg}"

        return Message(syslog_line.encode("utf-8", errors="replace"))

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records.

        Runs until the surrounding task is cancelled; the cancellation
        propagates to the caller.
        """
        while True:
            await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
            await self.flush()

    async def flush(self) -> None:
        """Flush all buffered log records to the syslog endpoint.

        If a previous flush left unsent messages in ``_in_progress`` those are
        retried first and the buffer is left untouched for a later flush.
        Any unexpected error is logged and swallowed.
        """
        # The lock is held for the whole cycle, including the network send.
        # Flushes can overlap (the periodic loop plus tasks scheduled by
        # handle_event); without serialising them a second flush could resend
        # the batch that the first one is still writing, or open a second
        # connection while the first is still connecting.
        async with self._lock:
            records: list[Mapping[str, Any]] | None = None
            if not self._in_progress:
                if not self._buffer:
                    return
                records = self._buffer.copy()
                self._buffer.clear()

            try:
                if records:
                    self._in_progress = [self._to_syslog_message(r) for r in records]
                else:
                    self._in_progress = [m for m in self._in_progress if not m.sent]

                if self._protocol == PROTOCOL_UDP:
                    await self._send_udp(self._in_progress)
                else:
                    await self._send_tcp(self._in_progress)

                # Keep only what still needs to be retried.
                self._in_progress = [m for m in self._in_progress if not m.sent]
            except Exception:
                _LOGGER.exception("remote_logger: unexpected error sending syslog messages")

    async def _send_udp(self, messages: list[Message]) -> None:
        """Send syslog messages over UDP.

        Each message is marked ``sent`` as soon as it is handed to the
        transport. On ``OSError`` the transport is closed and discarded so the
        next call creates a fresh one; the error is logged, not raised.
        """
        try:
            transport = self._udp_transport
            if transport is None or transport.is_closing():
                loop = asyncio.get_running_loop()
                transport, _ = await loop.create_datagram_endpoint(
                    asyncio.DatagramProtocol,
                    remote_addr=(self._host, self._port),
                )
                self._udp_transport = transport
            for msg in messages:
                transport.sendto(msg.payload)
                msg.sent = True
        except OSError as err:
            _LOGGER.warning("remote_logger: failed to send syslog via UDP: %s", err)
            # Close before dropping the reference so the socket is not leaked.
            if self._udp_transport is not None:
                with contextlib.suppress(Exception):
                    self._udp_transport.close()
            self._udp_transport = None

    async def _send_tcp(self, messages: list[Message]) -> None:
        """Send syslog messages over TCP with octet-counting framing (RFC 6587).

        Messages are marked ``sent`` only after the whole batch has been
        written and drained. On ``OSError`` the connection is closed and the
        error is logged, not raised; the messages stay unsent.
        """
        try:
            if self._tcp_writer is None or self._tcp_writer.is_closing():
                await self._connect_tcp()

            writer: asyncio.StreamWriter | None = self._tcp_writer
            if writer is None:
                raise OSError("Unable to create TCP writer")  # Set by _connect_tcp above

            for msg in messages:
                # Octet-counting: "LEN SP MSG"
                frame = f"{len(msg.payload)} ".encode("ascii") + msg.payload
                writer.write(frame)
            await writer.drain()
            for msg in messages:
                msg.sent = True
        except OSError as err:
            # ConnectionError, TimeoutError and ssl.SSLError are all OSError subclasses.
            _LOGGER.warning("remote_logger: failed to send syslog via TCP: %s", err)
            await self._close_tcp()

    async def _connect_tcp(self) -> None:
        """Establish a TCP connection to the syslog server.

        Uses a default TLS context when TLS is enabled and gives up after
        10 seconds. Connection errors propagate to the caller.
        """
        ssl_ctx: ssl.SSLContext | None = None
        if self._use_tls:
            # Building the default context loads the system CA certificates
            # from disk, so do it off the event loop.
            loop = asyncio.get_running_loop()
            ssl_ctx = await loop.run_in_executor(None, ssl.create_default_context)

        self._tcp_reader, self._tcp_writer = await asyncio.wait_for(
            asyncio.open_connection(self._host, self._port, ssl=ssl_ctx),
            timeout=10,
        )

    async def _close_tcp(self) -> None:
        """Close the TCP connection, ignoring errors raised while closing."""
        if self._tcp_writer is not None:
            with contextlib.suppress(Exception):
                self._tcp_writer.close()
                await self._tcp_writer.wait_closed()
        self._tcp_writer = None
        self._tcp_reader = None

    async def close(self) -> None:
        """Clean up transport resources."""
        if self._udp_transport is not None:
            self._udp_transport.close()
            self._udp_transport = None
        await self._close_tcp()
249def _sd_escape(value: str) -> str:
250 """Escape special characters for RFC 5424 structured data values."""
251 return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]")
async def validate(hass: Any, host: str, port: int, protocol: str, use_tls: bool) -> str | None:
    """Test connectivity to a syslog endpoint.

    For UDP only name resolution is checked, since nothing is sent. For TCP a
    real connection (with TLS when ``use_tls`` is set) is opened and closed
    again, with a 10 second timeout.

    Args:
        hass: Object exposing the running event loop as ``hass.loop``.
        host: Hostname or IP address of the syslog server.
        port: Port of the syslog server.
        protocol: Transport protocol; ``PROTOCOL_UDP`` selects the UDP check,
            anything else the TCP check.
        use_tls: Whether to wrap the TCP connection in TLS.

    Returns:
        ``None`` on success, ``"cannot_connect"`` for network-level failures,
        or ``"unknown"`` for any other error.
    """
    loop = hass.loop
    try:
        if protocol == PROTOCOL_UDP:
            # Quick UDP test: just resolve the address. The lookup is not
            # restricted to one address family, so IPv6-only hosts pass too.
            await loop.run_in_executor(
                None, socket.getaddrinfo, host, port, 0, socket.SOCK_DGRAM
            )
        else:
            # TCP: actually connect
            ssl_ctx: ssl.SSLContext | None = None
            if use_tls:
                # Building the default context loads the system CA
                # certificates from disk, so do it off the event loop.
                ssl_ctx = await loop.run_in_executor(None, ssl.create_default_context)
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port, ssl=ssl_ctx),
                timeout=10,
            )
            writer.close()
            await writer.wait_closed()
    except (OSError, TimeoutError) as err:
        # Covers resolution failures, refused connections, TLS errors and timeouts.
        _LOGGER.error("Syslog connect failed: %s", err)
        return "cannot_connect"
    except Exception:
        # Keep the traceback: this branch means something unanticipated happened.
        _LOGGER.exception("Syslog connect unknown error")
        return "unknown"
    return None