Coverage for custom_components/remote_logger/otel/exporter.py: 91%
278 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
1from __future__ import annotations
3import asyncio
4import base64
5import json
6import logging
7import time
8from abc import abstractmethod
9from dataclasses import dataclass
10from typing import TYPE_CHECKING, Any, cast
12import aiohttp
13from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_TOKEN
14from homeassistant.const import __version__ as hass_version
15from homeassistant.helpers.aiohttp_client import async_get_clientsession
17from custom_components.remote_logger.const import (
18 CONF_BATCH_MAX_SIZE,
19 CONF_CLIENT_TIMEOUT,
20 CONF_ENCODING,
21 CONF_RESOURCE_ATTRIBUTES,
22 CONF_USE_TLS,
23 DEFAULT_CLIENT_TIMEOUT,
24 EVENT_SYSTEM_LOG,
25)
26from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission
27from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp
29from .const import (
30 CONF_TOKEN_TYPE,
31 DEFAULT_RESOURCE_ATTRIBUTES,
32 DEFAULT_SERVICE_NAME,
33 DEFAULT_SEVERITY,
34 ENCODING_JSON,
35 ENCODING_PROTOBUF,
36 OTLP_LOGS_PATH,
37 SCOPE_NAME,
38 SCOPE_VERSION,
39 SEVERITY_MAP,
40 TOKEN_TYPE_API_KEY,
41 TOKEN_TYPE_BASIC,
42 TOKEN_TYPE_BEARER,
43 TOKEN_TYPE_RAW_BASIC,
44)
45from .protobuf_encoder import encode_export_logs_request
47if TYPE_CHECKING:
48 from homeassistant.config_entries import ConfigEntry
49 from homeassistant.core import Event, HomeAssistant
51_LOGGER = logging.getLogger(__name__)
def build_auth_header(token: str, token_type: str) -> str:
    """Build the Authorization header value for bearer or basic auth."""
    if token_type == TOKEN_TYPE_BASIC:
        # Standard basic auth: token is "user:pass", base64-encode it.
        encoded = base64.b64encode(token.encode()).decode()
        return f"Basic {encoded}"
    if token_type == TOKEN_TYPE_RAW_BASIC:
        # Token is already base64-encoded credentials.
        return f"Basic {token}"
    if token_type == TOKEN_TYPE_API_KEY:
        return f"ApiKey {token}"
    # Default scheme when no other token type matched.
    return f"Bearer {token}"
def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
    """Parse 'key1=val1,key2=val2' into a list of (key, value) tuples.

    Raises ValueError if the format is invalid.
    """
    parsed: list[tuple[str, str]] = []
    for chunk in raw.split(","):
        chunk = chunk.strip()
        if not chunk:
            # Skip empty segments (e.g. trailing commas).
            continue
        if "=" not in chunk:
            raise ValueError(f"Invalid attribute pair: {chunk!r}")
        # Split on the FIRST '=' so values may themselves contain '='.
        key, _, value = chunk.partition("=")
        key, value = key.strip(), value.strip()
        if not key:
            raise ValueError("Attribute key cannot be empty")
        parsed.append((key, value))
    return parsed
def parse_headers(raw: str) -> dict[str, str]:
    """Parse 'Name: value' lines (newline-separated) into a dict.

    Raises ValueError if a line is malformed.
    """
    headers: dict[str, str] = {}
    for entry in raw.splitlines():
        entry = entry.strip()
        if not entry:
            # Blank lines are permitted and ignored.
            continue
        if ":" not in entry:
            raise ValueError(f"Invalid header line: {entry!r}")
        # Split on the FIRST ':' so values may contain colons.
        name, _, value = entry.partition(":")
        name = name.strip()
        if not name:
            raise ValueError("Header name cannot be empty")
        headers[name] = value.strip()
    return headers
107def _mask_auth_headers(headers: dict[str, str]) -> dict[str, str]:
108 def _mask_credential(v: str) -> str:
109 parts = v.split(" ", 1)
110 if len(parts) == 2:
111 scheme, token = parts
112 return f"{scheme} {'*' * len(token)}"
113 return "*" * len(v)
115 return {k: _mask_credential(v) if k.lower() == "authorization" else v for k, v in headers.items()}
def append_attr(attrs: list[dict[str, Any]], key: str, value: Any, force_null: bool = False) -> None:
    """Append the OTLP KeyValue for (key, value) to *attrs*, skipping unset values."""
    encoded = _kv(key, value, force_null=force_null)
    if encoded is not None:
        attrs.append(encoded)
124def _kv(key: str, value: Any, force_null: bool = False) -> dict[str, Any] | None:
125 """Build an OTLP KeyValue attribute"""
126 if value is None and not force_null:
127 return None
128 if isinstance(value, str):
129 return {"key": key, "value": {"stringValue": value}}
130 if isinstance(value, bool):
131 return {"key": key, "value": {"boolValue": value}}
132 if isinstance(value, int):
133 int_val: int | str = value if -(2**31) <= value <= 2**31 - 1 else str(value)
134 return {"key": key, "value": {"intValue": int_val}}
135 if isinstance(value, float):
136 return {"key": key, "value": {"doubleValue": None if value != value else value}}
137 if isinstance(value, bytes):
138 return {"key": key, "value": {"bytesValue": value}}
139 return {"key": key, "value": {"stringValue": str(value)}}
async def validate(
    session: aiohttp.ClientSession,
    url: str,
    encoding: str,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Probe the OTLP endpoint with an empty export request.

    Returns a config-flow style errors dict; empty means the endpoint accepted
    the request.
    """
    errors: dict[str, str] = {}
    if encoding == ENCODING_PROTOBUF:
        payload: bytes = encode_export_logs_request({"resourceLogs": []})
        content_type = "application/x-protobuf"
    elif encoding == ENCODING_JSON:
        payload = json.dumps({"resourceLogs": []}).encode("utf-8")
        content_type = "application/json"
    else:
        raise ValueError(f"Unknown encoding {encoding}")
    request_headers = {"Content-Type": content_type, **(extra_headers or {})}
    try:
        async with session.post(
            url,
            data=payload,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if resp.status >= 400:
                # Both 4xx and 5xx surface as "cannot_connect"; only the log
                # message distinguishes client vs server failures.
                errors["base"] = "cannot_connect"
                text = await resp.text()
                if resp.status < 500:
                    _LOGGER.error("OTEL-LOGS client connect failed (%s): %s", resp.status, text)
                else:
                    _LOGGER.error("OTEL-LOGS server connect failed (%s): %s", resp.status, text)
    except aiohttp.ClientResponseError as e1:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client response error: %s", e1)
    except aiohttp.ClientError as e2:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client error: %s", e2)
    except Exception as e3:
        errors["base"] = "unknown"
        _LOGGER.error("OTEL-LOGS connect unknown error: %s", e3)
    return errors
@dataclass
class OtlpMessage(LogMessage):
    """A single buffered log record carrying its fully-built OTLP logRecord payload."""

    # OTLP logRecord dict (timeUnixNano, severityNumber, body, attributes, ...)
    # per opentelemetry-proto logs/v1/logs.proto.
    payload: dict[str, Any]
class OtlpSubmission(LogSubmission):
    """Base submission: wraps OTLP logRecords in an ExportLogsServiceRequest."""

    def __init__(
        self, resource: dict[str, Any], records: list[OtlpMessage], extra_headers: dict[str, Any] | None = None
    ) -> None:
        self.extra_headers = extra_headers or {}
        self.resource: dict[str, Any] = resource
        self.request: dict[str, Any] = self._build_export_request(records)

    @abstractmethod
    def body(self) -> dict[str, Any]:
        """Return the session.post keyword arguments (headers plus payload)."""

    def _build_export_request(self, records: list[OtlpMessage]) -> dict[str, Any]:
        """Wrap logRecords in the ExportLogsServiceRequest envelope."""
        scope_log = {
            "scope": {
                "name": SCOPE_NAME,
                "version": SCOPE_VERSION,
            },
            "logRecords": [record.payload for record in records],
        }
        resource_log = {"resource": self.resource, "scopeLogs": [scope_log]}
        return {"resourceLogs": [resource_log]}
class OtlpJsonSubmission(OtlpSubmission):
    """OTLP submission serialized as JSON (application/json).

    The base-class constructor already has the exact signature this class
    needs, so the previous pass-through ``__init__`` override was redundant
    and has been removed.
    """

    def body(self) -> dict[str, Any]:
        """Return session.post kwargs: JSON content-type headers plus the request as ``json=``."""
        return {"headers": {"Content-Type": "application/json", **self.extra_headers}, "json": self.request}

    def for_display(self) -> dict[str, Any]:
        """Return the request body with Authorization credentials masked for UI/log display."""
        body = self.body()
        return {**body, "headers": _mask_auth_headers(body["headers"])}
class OtlpProtobufSubmission(OtlpSubmission):
    """OTLP submission serialized as protobuf (application/x-protobuf).

    The base-class constructor already has the exact signature this class
    needs, so the previous pass-through ``__init__`` override was redundant
    and has been removed.
    """

    def body(self) -> dict[str, Any]:
        """Return session.post kwargs: protobuf content-type headers plus encoded bytes as ``data=``."""
        return {
            "headers": {"Content-Type": "application/x-protobuf", **self.extra_headers},
            "data": encode_export_logs_request(self.request),
        }

    def for_display(self) -> dict[str, Any]:
        """Return a displayable form: masked headers and the binary payload coerced to text."""
        base = self.body()
        return {
            "headers": _mask_auth_headers(base["headers"]),
            # Replace undecodable bytes with '?' so the payload is printable.
            "data": base["data"].decode("utf-8", errors="replace").replace("\ufffd", "?"),
        }
class OtlpLogExporter(LogExporter):
    """Buffers system_log_event records and flushes them as OTLP/HTTP JSON."""

    logger_type = "otel"

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read endpoint, auth, encoding, and batching options from the config entry."""
        super().__init__(hass)
        self.name = entry.title

        self._lock = asyncio.Lock()
        # Record our own address/port (when HA exposes them) for resource attributes.
        if hass and hass.config and hass.config.api:
            self.server_address = hass.config.api.local_ip
            self.server_port = hass.config.api.port
        else:
            self.server_address = None
            self.server_port = None

        # Options take precedence over the initial config-entry data.
        opts = {**entry.data, **entry.options}

        host = opts[CONF_HOST]
        port = opts[CONF_PORT]
        encoding = opts[CONF_ENCODING]
        use_tls = opts[CONF_USE_TLS]
        scheme = "https" if use_tls else "http"
        path = opts.get(CONF_PATH, OTLP_LOGS_PATH)
        self.endpoint_url = f"{scheme}://{host}:{port}{path}"
        self.destination = (host, str(port), encoding)
        self._use_tls = use_tls
        self._use_protobuf = encoding == ENCODING_PROTOBUF
        self._entry = entry
        self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 100)
        self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
        self._extra_headers = self._build_extra_headers(opts)

        self._resource = self._build_resource(opts)

        # Lazy %-style args (not an f-string) so formatting is skipped when
        # INFO logging is disabled, matching the rest of this module.
        _LOGGER.info(
            "remote_logger: otel configured for %s, protobuf=%s", self.endpoint_url, self._use_protobuf
        )

    def _build_extra_headers(self, opts: dict[str, Any]) -> dict[str, str]:
        """Build Authorization plus user-supplied extra HTTP headers from options."""
        headers: dict[str, str] = {}
        token = opts.get(CONF_TOKEN, "").strip()
        if token:
            token_type = opts.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
            headers["Authorization"] = build_auth_header(token, token_type)
        raw_headers = opts.get(CONF_HEADERS, "").strip()
        if raw_headers:
            # User-supplied headers can override Authorization.
            headers.update(parse_headers(raw_headers))
        return headers

    def _build_resource(self, opts: dict[str, Any]) -> dict[str, Any]:
        """Build the OTLP Resource object with attributes."""
        attrs: list[dict[str, Any]] = []
        append_attr(attrs, "service.name", DEFAULT_SERVICE_NAME)
        append_attr(attrs, "service.version", hass_version or "unknown")

        if self.server_address:
            append_attr(attrs, "service.address", self.server_address)
        if self.server_port:
            append_attr(attrs, "service.port", self.server_port)

        # User-supplied "key=value,..." resource attributes.
        raw = opts.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
        if raw and raw.strip():
            for key, value in parse_resource_attributes(raw):
                append_attr(attrs, key, value)

        return {"attributes": attrs}

    def _to_log_record(
        self,
        event: Event,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> OtlpMessage:
        """Convert a system_log_event payload to an OTLP logRecord dict.

        HA system_log_event data fields:
            "name": str
            "message": list(str)
            "level": str
            "source": (str, int)
            "timestamp": float
            "exception": str
            "count": int
            "first_occurred": float
        """
        data = event.data or {}
        timestamp_s: float = data.get("timestamp", time.time())
        time_unix_nano = str(int(timestamp_s * 1_000_000_000))
        observed_timestamp: float = event.time_fired.timestamp()
        observed_time_unix_nano = str(int(observed_timestamp * 1_000_000_000))

        # NOTE(review): an explicit level_override is used as-is (not upper-cased);
        # callers presumably pass uppercase levels — confirm.
        level: str = level_override or data.get("level", "INFO").upper()
        severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)

        messages: list[str] = message_override or data.get("message", [])
        message: str = "\n".join(messages)

        attributes: list[dict[str, Any]] = []

        if event.event_type == EVENT_SYSTEM_LOG:
            source = data.get("source")
            # NOTE(review): only tuples pass this check; if HA ever delivers
            # source as a list the attribute is silently skipped — confirm.
            if source and isinstance(source, tuple):
                source_path, source_lineno = source
                append_attr(attributes, "code.file.path", source_path)
                append_attr(attributes, "code.line.number", source_lineno)
            logger_name = data.get("name")
            if data.get("count"):
                append_attr(attributes, "exception.count", data["count"])
            if data.get("first_occurred"):
                append_attr(attributes, "exception.first_occurred", isotimestamp(data["first_occurred"]))
            if logger_name:
                append_attr(attributes, "code.function.name", logger_name)
            exception = data.get("exception")
            if exception:
                append_attr(attributes, "exception.stacktrace", exception)
        else:
            # Arbitrary HA event: flatten its data dict into OTLP attributes.
            for k, v in data.items():
                for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only):
                    append_attr(attributes, flat_key, flat_val)

        # https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
        payload: dict[str, Any] = {
            "timeUnixNano": time_unix_nano,
            "observedTimeUnixNano": observed_time_unix_nano,
            "severityNumber": severity_number,
            "severityText": severity_text,
            "body": {"stringValue": message},
            "attributes": attributes,
        }
        if event.event_type != EVENT_SYSTEM_LOG:
            payload["eventName"] = event.event_type
        return OtlpMessage(payload=payload)

    async def flush(self) -> None:
        """Flush all buffered log records to the OTLP endpoint."""
        records: list[OtlpMessage] | None = None
        # Take the buffered records under the lock so concurrent flushes
        # never send the same record twice.
        async with self._lock:
            if not self._buffer:
                return
            records = cast("list[OtlpMessage]", self._buffer.copy())
            self._buffer.clear()

        try:
            if records:
                if self._use_protobuf:
                    submission: OtlpSubmission = OtlpProtobufSubmission(self._resource, records, self._extra_headers)
                else:
                    submission = OtlpJsonSubmission(self._resource, records, self._extra_headers)
            else:
                return
            session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
            timeout = aiohttp.ClientTimeout(total=self._client_timeout)
            async with session.post(self.endpoint_url, timeout=timeout, **submission.body()) as resp:
                if resp.status in (401, 403):
                    # Credentials rejected: hand the problem to the config flow.
                    _LOGGER.warning("remote_logger: OTLP authentication failed (%s), triggering reauth", resp.status)
                    self._entry.async_start_reauth(self._hass)
                    return
                if resp.status >= 400:
                    body = await resp.text()
                    _LOGGER.warning(
                        "remote_logger: OTLP endpoint returned HTTP %s: %s",
                        resp.status,
                        body[:200],
                    )
                    self.on_posting_error(body)
                if resp.ok or (resp.status >= 400 and resp.status < 500):
                    # records were sent, or there was a client-side error
                    # (a 4xx will not succeed on retry) — either way, drop them.
                    if records:
                        self.last_sent_payload = submission
                    self.on_success()
        except aiohttp.ClientError as err:
            _LOGGER.warning("remote_logger: failed to send logs: %s", err)
            self.on_posting_error(str(err))
        except RuntimeError as err:
            if "Session is closed" in str(err):
                # Normal during HA shutdown; drop records quietly.
                _LOGGER.debug("remote_logger: session closed during flush (shutdown), dropping %d records", len(records or []))
            else:
                _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", err)
                self.on_posting_error(str(err))
        except Exception as e:
            _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", e)
            self.on_posting_error(str(e))

    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom log record without requiring a HA Event."""
        now = time.time()
        time_unix_nano = str(int(now * 1_000_000_000))
        severity_number, severity_text = SEVERITY_MAP.get(level.upper(), DEFAULT_SEVERITY)
        attrs: list[dict[str, Any]] = []
        for k, v in (attributes or {}).items():
            append_attr(attrs, k, v)

        record = OtlpMessage(
            payload={
                "timeUnixNano": time_unix_nano,
                "observedTimeUnixNano": time_unix_nano,
                "severityNumber": severity_number,
                "severityText": severity_text,
                "body": {"stringValue": message},
                "attributes": attrs,
                "eventName": event_name,
            }
        )
        self._buffer.append(record)
        self.on_event()
        # Flush eagerly once the batch limit is reached.
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())