Coverage for custom_components/remote_logger/otel/exporter.py: 96%
179 statements
coverage.py v7.10.6, created at 2026-02-18 22:41 +0000

from __future__ import annotations

import asyncio
import json
import logging
import time
from typing import TYPE_CHECKING, Any

import aiohttp
from homeassistant.const import __version__ as hass_version
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession

from custom_components.remote_logger.const import (
    BATCH_FLUSH_INTERVAL_SECONDS,
    CONF_BATCH_MAX_SIZE,
    CONF_ENCODING,
    CONF_HOST,
    CONF_PORT,
    CONF_RESOURCE_ATTRIBUTES,
    CONF_USE_TLS,
)

from .const import (
    DEFAULT_RESOURCE_ATTRIBUTES,
    DEFAULT_SERVICE_NAME,
    DEFAULT_SEVERITY,
    ENCODING_JSON,
    ENCODING_PROTOBUF,
    OTLP_LOGS_PATH,
    SCOPE_NAME,
    SCOPE_VERSION,
    SEVERITY_MAP,
)

from .protobuf_encoder import encode_export_logs_request

if TYPE_CHECKING:
    from homeassistant.config_entries import ConfigEntry

_LOGGER = logging.getLogger(__name__)

def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
    """Parse 'key1=val1,key2=val2' into a list of (key, value) tuples.

    Raises ValueError if the format is invalid.
    """
    result: list[tuple[str, str]] = []
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair:
            continue
        if "=" not in pair:
            raise ValueError(f"Invalid attribute pair: {pair!r}")
        key, _, value = pair.partition("=")
        key = key.strip()
        value = value.strip()
        if not key:
            raise ValueError("Attribute key cannot be empty")
        result.append((key, value))
    return result
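
# Illustrative doctest-style sketch (not part of the module's API): whitespace
# around pairs is stripped, empty pairs are skipped, and a pair with no "=" or
# an empty key raises ValueError.
#
#     >>> parse_resource_attributes("env=prod, site=home, ")
#     [('env', 'prod'), ('site', 'home')]
#     >>> parse_resource_attributes("deployment.environment=dev,broken")
#     Traceback (most recent call last):
#         ...
#     ValueError: Invalid attribute pair: 'broken'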

def _kv(key: str, value: Any) -> dict[str, Any]:
    """Build an OTLP KeyValue attribute."""
    if isinstance(value, str):
        return {"key": key, "value": {"string_value": value}}
    # bool must be checked before int: bool is a subclass of int in Python.
    if isinstance(value, bool):
        return {"key": key, "value": {"bool_value": value}}
    if isinstance(value, int):
        return {"key": key, "value": {"int_value": value}}
    if isinstance(value, float):
        # The OTLP AnyValue field for floats is double_value.
        return {"key": key, "value": {"double_value": value}}
    if isinstance(value, bytes):
        # The OTLP AnyValue field for bytes is bytes_value.
        return {"key": key, "value": {"bytes_value": value}}
    return {"key": key, "value": {"string_value": str(value)}}
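
# Sketch of the shapes _kv produces (the keys and values here are made-up
# examples). The bool branch fires before the int branch by design:
#
#     >>> _kv("service.name", "home-assistant")
#     {'key': 'service.name', 'value': {'string_value': 'home-assistant'}}
#     >>> _kv("retrying", True)
#     {'key': 'retrying', 'value': {'bool_value': True}}
#     >>> _kv("code.line.number", 42)
#     {'key': 'code.line.number', 'value': {'int_value': 42}}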

async def validate(session: aiohttp.ClientSession, url: str, encoding: str) -> dict[str, str]:
    """Validate connectivity by posting an empty export request to the endpoint."""
    errors: dict[str, str] = {}
    if encoding == ENCODING_PROTOBUF:
        data: bytes = encode_export_logs_request({"resourceLogs": []})
        content_type = "application/x-protobuf"
    elif encoding == ENCODING_JSON:
        data = json.dumps({"resourceLogs": []}).encode("utf-8")
        content_type = "application/json"
    else:
        raise ValueError(f"Unknown encoding {encoding}")
    try:
        async with session.post(
            url,
            data=data,
            headers={"Content-Type": content_type},
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if 400 <= resp.status < 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS client connect failed (%s): %s", resp.status, await resp.text())
            elif resp.status >= 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS server connect failed (%s): %s", resp.status, await resp.text())
    except aiohttp.ClientError as err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client error: %s", err)
    except Exception as err:  # broad catch: validation must return errors, not raise
        errors["base"] = "unknown"
        _LOGGER.error("OTEL-LOGS connect unknown error: %s", err)
    return errors
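
# Hedged usage sketch: the shape of a config-flow validation call (the actual
# flow lives outside this module; host/port names below are illustrative):
#
#     session = async_get_clientsession(hass)
#     url = f"http://{host}:{port}{OTLP_LOGS_PATH}"
#     errors = await validate(session, url, ENCODING_JSON)
#     if errors:
#         # re-show the form; errors["base"] is "cannot_connect" or "unknown"
#         ...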

class OtlpLogExporter:
    """Buffers system_log_event records and flushes them over OTLP/HTTP (JSON or protobuf)."""

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        self._hass = hass

        self._buffer: list[dict[str, Any]] = []
        self._in_progress: dict[str, Any] | None = None
        self._lock = asyncio.Lock()
        if hass and hass.config and hass.config.api:
            self.server_address = hass.config.api.local_ip
            self.server_port = hass.config.api.port
        else:
            self.server_address = None
            self.server_port = None

        host = entry.data[CONF_HOST]
        port = entry.data[CONF_PORT]
        use_tls = entry.data[CONF_USE_TLS]
        scheme = "https" if use_tls else "http"
        self.endpoint_url = f"{scheme}://{host}:{port}{OTLP_LOGS_PATH}"
        self._use_tls = use_tls
        self._use_protobuf = entry.data.get(CONF_ENCODING) == ENCODING_PROTOBUF
        self._batch_max_size = entry.data.get(CONF_BATCH_MAX_SIZE, 100)

        self._resource = self._build_resource(entry)

        _LOGGER.info(
            "remote_logger: otel configured for %s, protobuf=%s",
            self.endpoint_url,
            self._use_protobuf,
        )

    def _build_resource(self, entry: ConfigEntry) -> dict[str, Any]:
        """Build the OTLP Resource object with attributes."""
        attrs: list[dict[str, Any]] = [
            _kv("service.name", DEFAULT_SERVICE_NAME),
            _kv("service.version", hass_version or "unknown"),
        ]
        if self.server_address:
            attrs.append(_kv("service.address", self.server_address))
        if self.server_port:
            attrs.append(_kv("service.port", self.server_port))

        raw = entry.data.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
        if raw and raw.strip():
            for key, value in parse_resource_attributes(raw):
                attrs.append(_kv(key, value))

        return {"attributes": attrs}
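
    # Resulting resource sketch: service.name and service.version are always
    # present; service.address/service.port appear when the HA HTTP API is
    # configured; user-supplied pairs come last, e.g. "site=home" contributes:
    #
    #     {"key": "site", "value": {"string_value": "home"}}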

    @callback
    def handle_event(self, event: Event) -> None:
        """Receive a system_log_event and buffer an OTLP logRecord."""
        if (
            event.data
            and event.data.get("source")
            and len(event.data["source"]) == 2
            and "custom_components/remote_logger/otel" in event.data["source"][0]
        ):
            # prevent log loops
            return
        try:
            record = self._to_log_record(event.data)
            self._buffer.append(record)

            if len(self._buffer) >= self._batch_max_size:
                self._hass.async_create_task(self.flush())
        except Exception as e:
            _LOGGER.error("remote_logger: otel handler failure %s on %s", e, event.data)

    def _to_log_record(self, data: Any) -> dict[str, Any]:
        """Convert a system_log_event payload to an OTLP logRecord dict.

        Expected payload fields:
            "name": str
            "message": list[str]
            "level": str
            "source": (str, int)
            "timestamp": float
            "exception": str
            "count": int
            "first_occurred": float
        """
        timestamp_s: float = data.get("timestamp", time.time())
        time_unix_nano = str(int(timestamp_s * 1_000_000_000))
        observed_time_unix_nano = str(int(time.time() * 1_000_000_000))

        level: str = data.get("level", "INFO").upper()
        severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)

        messages: list[str] = data.get("message", [])
        message: str = "\n".join(messages)

        attributes: list[dict[str, Any]] = []
        source = data.get("source")
        if source and isinstance(source, tuple):
            source_path, source_lineno = source
            attributes.append(_kv("code.file.path", source_path))
            attributes.append(_kv("code.line.number", source_lineno))
        logger_name = data.get("name")
        if data.get("count"):
            attributes.append(_kv("exception.count", data["count"]))
        if data.get("first_occurred"):
            attributes.append(_kv("exception.first_occurred", data["first_occurred"]))
        if logger_name:
            attributes.append(_kv("code.function.name", logger_name))
        exception = data.get("exception")
        if exception:
            attributes.append(_kv("exception.stacktrace", exception))

        return {
            "timeUnixNano": time_unix_nano,
            "observedTimeUnixNano": observed_time_unix_nano,
            "severityNumber": severity_number,
            "severityText": severity_text,
            "body": {"string_value": message},
            "attributes": attributes,
        }
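
    # Conversion sketch, assuming a hypothetical payload and that SEVERITY_MAP
    # maps "ERROR" to the conventional OTLP pair (17, "ERROR"):
    #
    #     data = {
    #         "name": "homeassistant.components.mqtt",
    #         "message": ["Connection lost"],
    #         "level": "ERROR",
    #         "source": ("homeassistant/components/mqtt/client.py", 512),
    #         "timestamp": 1700000000.0,
    #     }
    #
    # _to_log_record(data) then yields timeUnixNano "1700000000000000000",
    # severityNumber 17 / severityText "ERROR", body
    # {"string_value": "Connection lost"}, and code.file.path /
    # code.line.number / code.function.name attributes.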

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records."""
        try:
            while True:
                await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
                await self.flush()
        except asyncio.CancelledError:
            raise
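
    # Hedged sketch of how setup code might wire this exporter up (the actual
    # wiring lives elsewhere in the integration; this is illustrative only):
    #
    #     exporter = OtlpLogExporter(hass, entry)
    #     hass.bus.async_listen("system_log_event", exporter.handle_event)
    #     entry.async_create_background_task(
    #         hass, exporter.flush_loop(), "remote_logger_otel_flush"
    #     )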

    def generate_submission(self, records: list[dict[str, Any]]) -> dict[str, Any]:
        """Build the kwargs for session.post(): payload plus Content-Type header."""
        headers: dict[str, str] = {}
        result: dict[str, Any] = {"headers": headers}
        request = self._build_export_request(records)

        if self._use_protobuf:
            result["data"] = encode_export_logs_request(request)
            headers["Content-Type"] = "application/x-protobuf"
        else:
            result["json"] = request
            headers["Content-Type"] = "application/json"
        return result
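
    # flush() splats this dict into session.post(**msg); a JSON submission
    # looks like (sketch):
    #
    #     {
    #         "headers": {"Content-Type": "application/json"},
    #         "json": {"resourceLogs": [...]},
    #     }
    #
    # while a protobuf submission carries encoded bytes under "data" instead.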

    async def flush(self) -> None:
        """Flush all buffered log records to the OTLP endpoint."""
        records: list[dict[str, Any]] | None = None
        async with self._lock:
            if not self._in_progress:
                if not self._buffer:
                    return
                records = self._buffer.copy()
                self._buffer.clear()

        try:
            if records and not self._in_progress:
                msg: dict[str, Any] = self.generate_submission(records)
                # Stash the payload so a failed send is retried on the next flush.
                self._in_progress = msg
            elif self._in_progress:
                msg = self._in_progress
            else:
                return
            session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
            async with session.post(self.endpoint_url, timeout=aiohttp.ClientTimeout(total=10), **msg) as resp:
                if resp.status >= 400:
                    body = await resp.text()
                    _LOGGER.warning(
                        "remote_logger: OTLP endpoint returned HTTP %s: %s",
                        resp.status,
                        body[:200],
                    )
                if resp.ok or 400 <= resp.status < 500:
                    # Records were sent, or a client-side error made them
                    # unsendable; drop the payload. A 5xx keeps it for retry.
                    self._in_progress = None
        except aiohttp.ClientError as err:
            # Network failure: keep self._in_progress so the next flush retries.
            _LOGGER.warning("remote_logger: failed to send logs: %s", err)
        except Exception:
            _LOGGER.exception("remote_logger: unexpected error sending logs, skipping records")
            self._in_progress = None

    async def close(self) -> None:
        """Clean up resources (no-op for the HTTP-based exporter)."""

    def _build_export_request(self, records: list[dict[str, Any]]) -> dict[str, Any]:
        """Wrap logRecords in the ExportLogsServiceRequest envelope."""
        return {
            "resourceLogs": [
                {
                    "resource": self._resource,
                    "scopeLogs": [
                        {
                            "scope": {
                                "name": SCOPE_NAME,
                                "version": SCOPE_VERSION,
                            },
                            "logRecords": records,
                        }
                    ],
                }
            ],
        }