Coverage for custom_components / remote_logger / otel / exporter.py: 92%
283 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
1from __future__ import annotations
3import asyncio
4import base64
5import datetime as dt
6import json
7import logging
8import re
9import time
10import typing
11from abc import abstractmethod
12from dataclasses import dataclass
14import aiohttp
15from homeassistant.components.system_log import EVENT_SYSTEM_LOG
16from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_TOKEN
17from homeassistant.const import __version__ as hass_version
18from homeassistant.helpers.aiohttp_client import async_get_clientsession
20from custom_components.remote_logger.const import (
21 CONF_BATCH_MAX_SIZE,
22 CONF_CLIENT_TIMEOUT,
23 CONF_ENCODING,
24 CONF_RESOURCE_ATTRIBUTES,
25 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME,
26 CONF_USE_TLS,
27 DEFAULT_CLIENT_TIMEOUT,
28)
29from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission
30from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp
32from .const import (
33 CONF_TOKEN_TYPE,
34 DEFAULT_RESOURCE_ATTRIBUTES,
35 DEFAULT_SERVICE_NAME,
36 DEFAULT_SEVERITY,
37 ENCODING_JSON,
38 ENCODING_PROTOBUF,
39 OTLP_LOGS_PATH,
40 SCOPE_NAME,
41 SCOPE_VERSION,
42 SEVERITY_MAP,
43 TOKEN_TYPE_API_KEY,
44 TOKEN_TYPE_BASIC,
45 TOKEN_TYPE_BEARER,
46 TOKEN_TYPE_RAW_BASIC,
47)
48from .protobuf_encoder import encode_export_logs_request
50if typing.TYPE_CHECKING:
51 from collections.abc import Mapping
53 from homeassistant.config_entries import ConfigEntry
54 from homeassistant.core import HomeAssistant
56_LOGGER = logging.getLogger(__name__)
59def build_auth_header(token: str, token_type: str) -> str:
60 """Build the Authorization header value for bearer or basic auth."""
61 if token_type == TOKEN_TYPE_BASIC:
62 credentials = base64.b64encode(token.encode()).decode()
63 return f"Basic {credentials}"
64 if token_type == TOKEN_TYPE_API_KEY:
65 return f"ApiKey {token}"
66 if token_type == TOKEN_TYPE_RAW_BASIC:
67 return f"Basic {token}"
68 return f"Bearer {token}"
71def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
72 """Parse 'key1=val1,key2=val2' into a list of (key, value) tuples.
74 Raises ValueError if the format is invalid.
75 """
76 result = []
77 for pair in raw.split(","):
78 pair = pair.strip()
79 if not pair:
80 continue
81 if "=" not in pair:
82 raise ValueError(f"Invalid attribute pair: {pair!r}")
83 key, _, value = pair.partition("=")
84 key = key.strip()
85 value = value.strip()
86 if not key:
87 raise ValueError("Attribute key cannot be empty")
88 result.append((key, value))
89 return result
92def parse_headers(raw: str) -> dict[str, str]:
93 """Parse 'Name: value' entries into a dict.
95 Entries may be newline- or comma-separated (commas inside values are safe).
96 Raises ValueError if an entry is malformed.
97 """
98 result: dict[str, str] = {}
99 for line in re.split(r"\n|,(?=\s*[\w-]+\s*:)", raw):
100 line = line.strip()
101 if not line:
102 continue
103 if ":" not in line:
104 raise ValueError(f"Invalid header line: {line!r}")
105 name, _, value = line.partition(":")
106 name = name.strip()
107 if not name:
108 raise ValueError("Header name cannot be empty")
109 result[name] = value.strip()
110 return result
113def _mask_auth_headers(headers: dict[str, str]) -> dict[str, str]:
114 def _mask_credential(v: str) -> str:
115 parts = v.split(" ", 1)
116 if len(parts) == 2:
117 scheme, token = parts
118 return f"{scheme} {'*' * len(token)}"
119 return "*" * len(v)
121 return {k: _mask_credential(v) if k.lower() == "authorization" else v for k, v in headers.items()}
124def append_attr(attrs: list[dict[str, typing.Any]], key: str, value: typing.Any, force_null: bool = False) -> None:
125 attr: dict[str, typing.Any] | None = _kv(key, value, force_null=force_null)
126 if attr is not None:
127 attrs.append(attr)
130def _kv(key: str, value: typing.Any, force_null: bool = False) -> dict[str, typing.Any] | None:
131 """Build an OTLP KeyValue attribute"""
132 if value is None and not force_null:
133 return None
134 if isinstance(value, str):
135 return {"key": key, "value": {"stringValue": value}}
136 if isinstance(value, bool):
137 return {"key": key, "value": {"boolValue": value}}
138 if isinstance(value, int):
139 int_val: int | str = value if -(2**31) <= value <= 2**31 - 1 else str(value)
140 return {"key": key, "value": {"intValue": int_val}}
141 if isinstance(value, float):
142 return {"key": key, "value": {"doubleValue": None if value != value else value}}
143 if isinstance(value, bytes):
144 return {"key": key, "value": {"bytesValue": value}}
145 return {"key": key, "value": {"stringValue": str(value)}}
148async def validate(
149 session: aiohttp.ClientSession,
150 url: str,
151 encoding: str,
152 extra_headers: dict[str, str] | None = None,
153) -> dict[str, str]:
154 # Validate connectivity
155 errors: dict[str, str] = {}
156 if encoding == ENCODING_PROTOBUF:
157 data: bytes = encode_export_logs_request({"resourceLogs": []})
158 content_type = "application/x-protobuf"
159 elif encoding == ENCODING_JSON:
160 data = json.dumps({"resourceLogs": []}).encode("utf-8")
161 content_type = "application/json"
162 else:
163 raise ValueError(f"Unknown encoding {encoding}")
164 headers = {"Content-Type": content_type, **(extra_headers or {})}
165 try:
166 async with session.post(
167 url,
168 data=data,
169 headers=headers,
170 timeout=aiohttp.ClientTimeout(total=10),
171 ) as resp:
172 if resp.status >= 400 and resp.status < 500 and resp.status != 422:
173 # 422 Unprocessable Entity means the endpoint is reachable but rejected
174 # our empty validation payload — that confirms connectivity.
175 errors["base"] = "cannot_connect"
176 _LOGGER.error("remote_logger: client connect failed (%s): %s", resp.status, await resp.text())
177 if resp.status >= 500:
178 errors["base"] = "cannot_connect"
179 _LOGGER.error("remote_logger: server connect failed (%s): %s", resp.status, await resp.text())
180 except aiohttp.ClientResponseError as e1:
181 errors["base"] = "cannot_connect"
182 _LOGGER.error("remote_logger: connect client response error: %s", e1)
183 except aiohttp.ClientError as e2:
184 errors["base"] = "cannot_connect"
185 _LOGGER.error("remote_logger: connect client error: %s", e2)
186 except Exception as e3:
187 errors["base"] = "unknown"
188 _LOGGER.error("remote_logger: connect unknown error: %s", e3)
189 return errors
192@dataclass
193class OtlpMessage(LogMessage):
194 payload: dict[str, typing.Any]
197class OtlpSubmission(LogSubmission):
198 def __init__(
199 self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
200 ) -> None:
201 self.extra_headers = extra_headers or {}
202 self.resource: dict[str, typing.Any] = resource
203 self.request: dict[str, typing.Any] = self._build_export_request(records)
205 @abstractmethod
206 def body(self) -> dict[str, typing.Any]:
207 pass
209 def _build_export_request(self, records: list[OtlpMessage]) -> dict[str, typing.Any]:
210 """Wrap logRecords in the ExportLogsServiceRequest envelope."""
211 return {
212 "resourceLogs": [
213 {
214 "resource": self.resource,
215 "scopeLogs": [
216 {
217 "scope": {
218 "name": SCOPE_NAME,
219 "version": SCOPE_VERSION,
220 },
221 "logRecords": [r.payload for r in records],
222 }
223 ],
224 }
225 ],
226 }
229class OtlpJsonSubmission(OtlpSubmission):
230 def __init__(
231 self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
232 ) -> None:
233 super().__init__(resource, records, extra_headers)
235 def body(self) -> dict[str, typing.Any]:
236 return {"headers": {"Content-Type": "application/json", **self.extra_headers}, "json": self.request}
238 def for_display(self) -> dict[str, typing.Any]:
239 body = self.body()
240 return {**body, "headers": _mask_auth_headers(body["headers"])}
243class OtlpProtobufSubmission(OtlpSubmission):
244 def __init__(
245 self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
246 ) -> None:
247 super().__init__(resource, records, extra_headers)
249 def body(self) -> dict[str, typing.Any]:
250 return {
251 "headers": {"Content-Type": "application/x-protobuf", **self.extra_headers},
252 "data": encode_export_logs_request(self.request),
253 }
255 def for_display(self) -> dict[str, typing.Any]:
256 base = self.body()
257 return {
258 "headers": _mask_auth_headers(base["headers"]),
259 "data": base["data"].decode("utf-8", errors="replace").replace("\ufffd", "?"),
260 }
263class OtlpLogExporter(LogExporter):
264 """Buffers system_log_event records and flushes them as OTLP/HTTP JSON."""
266 logger_type = "otel"
268 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
269 super().__init__(hass)
270 self.name = entry.title
272 self._lock = asyncio.Lock()
273 if hass and hass.config and hass.config.api:
274 self.server_address = hass.config.api.local_ip
275 self.server_port = hass.config.api.port
276 else:
277 self.server_address = None
278 self.server_port = None
280 opts = {**entry.data, **entry.options}
282 host = opts[CONF_HOST]
283 port = opts[CONF_PORT]
284 encoding = opts[CONF_ENCODING]
285 use_tls = opts[CONF_USE_TLS]
286 scheme = "https" if use_tls else "http"
287 path = opts.get(CONF_PATH, OTLP_LOGS_PATH)
288 self.endpoint_url = f"{scheme}://{host}:{port}{path}"
289 self.destination = (host, str(port), encoding)
290 self._use_tls = use_tls
291 self._use_protobuf = encoding == ENCODING_PROTOBUF
292 self._entry = entry
293 self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 100)
294 self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
295 self._suppress_system_log_event_name = opts.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True)
296 self._extra_headers = self._build_extra_headers(opts)
297 self._resource = self._build_resource(opts)
299 _LOGGER.info(f"remote_logger: otel configured for {self.endpoint_url}, protobuf={self._use_protobuf}")
301 def _build_extra_headers(self, opts: dict[str, typing.Any]) -> dict[str, str]:
302 headers: dict[str, str] = {}
303 token = opts.get(CONF_TOKEN, "").strip()
304 if token:
305 token_type = opts.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
306 headers["Authorization"] = build_auth_header(token, token_type)
307 raw_headers = "\n".join(opts.get(CONF_HEADERS, []))
308 if raw_headers:
309 headers.update(parse_headers(raw_headers))
310 return headers
312 def _build_resource(self, opts: dict[str, typing.Any]) -> dict[str, typing.Any]:
313 """Build the OTLP Resource object with attributes."""
314 attrs: list[dict[str, typing.Any]] = []
315 append_attr(attrs, "service.name", DEFAULT_SERVICE_NAME)
316 append_attr(attrs, "service.version", hass_version or "unknown")
318 if self.server_address:
319 append_attr(attrs, "service.address", self.server_address)
320 if self.server_port:
321 append_attr(attrs, "service.port", self.server_port)
323 raw = opts.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
324 if raw and raw.strip():
325 for key, value in parse_resource_attributes(raw):
326 append_attr(attrs, key, value)
328 return {"attributes": attrs}
330 def create_log_record(
331 self,
332 event_data: Mapping[str, typing.Any],
333 event_type: str | None = None,
334 time_fired: dt.datetime | None = None,
335 message_override: list[str] | None = None,
336 level_override: str | None = None,
337 state_only: bool = False,
338 ) -> OtlpMessage:
339 """Convert a system_log_event payload to an OTLP logRecord dict."""
340 """ HA System Log Event
341 "name": str
342 "message": list(str)
343 "level": str
344 "source": (str,int)
345 "timestamp": float
346 "exception": str
347 "count": int
348 "first_occurred": float
349 """
350 data: Mapping[str, typing.Any] | dict[str, typing.Any] = event_data or {}
351 time_fired = time_fired or dt.datetime.now(tz=self.tz)
352 timestamp_s: float = data.get("timestamp", time.time())
353 time_unix_nano = str(int(timestamp_s * 1_000_000_000))
354 observed_timestamp: float = time_fired.timestamp()
355 observed_time_unix_nano = str(int(observed_timestamp * 1_000_000_000))
357 level: str = level_override or data.get("level", "INFO").upper()
358 severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)
360 messages: list[str] = message_override or data.get("message", [])
361 message: str = "\n".join(messages)
363 attributes: list[dict[str, typing.Any]] = []
365 if event_type == EVENT_SYSTEM_LOG or event_type is None:
366 source = data.get("source")
367 if source and isinstance(source, tuple):
368 source_path, source_lineno = source
369 append_attr(attributes, "code.file.path", source_path)
370 append_attr(attributes, "code.line.number", source_lineno)
371 logger_name = data.get("name")
372 if data.get("count"):
373 append_attr(attributes, "exception.count", data["count"])
374 if data.get("first_occurred"):
375 append_attr(attributes, "exception.first_occurred", isotimestamp(data["first_occurred"]))
376 if logger_name:
377 append_attr(attributes, "code.function.name", logger_name)
378 exception = data.get("exception")
379 if exception:
380 append_attr(attributes, "exception.stacktrace", exception)
382 else:
383 for k, v in data.items():
384 for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only):
385 append_attr(attributes, flat_key, flat_val)
387 # https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
388 payload: dict[str, typing.Any] = {
389 "timeUnixNano": time_unix_nano,
390 "observedTimeUnixNano": observed_time_unix_nano,
391 "severityNumber": severity_number,
392 "severityText": severity_text,
393 "body": {"stringValue": message},
394 "attributes": attributes,
395 }
396 if event_type is not None and (event_type != EVENT_SYSTEM_LOG or not self._suppress_system_log_event_name):
397 payload["eventName"] = event_type
398 return OtlpMessage(payload=payload)
400 async def flush(self) -> None:
401 """Flush all buffered log records to the OTLP endpoint."""
402 records: list[OtlpMessage] | None = None
403 async with self._lock:
404 if not self._buffer:
405 return
406 records = typing.cast("list[OtlpMessage]", self._buffer.copy())
407 self._buffer.clear()
409 try:
410 if records:
411 if self._use_protobuf:
412 submission: OtlpSubmission = OtlpProtobufSubmission(self._resource, records, self._extra_headers)
413 else:
414 submission = OtlpJsonSubmission(self._resource, records, self._extra_headers)
415 else:
416 return
417 session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
418 timeout = aiohttp.ClientTimeout(total=self._client_timeout)
419 async with session.post(self.endpoint_url, timeout=timeout, **submission.body()) as resp:
420 if resp.status in (401, 403):
421 _LOGGER.warning("remote_logger: OTLP authentication failed (%s), triggering reauth", resp.status)
422 self._entry.async_start_reauth(self._hass)
423 return
424 if resp.status >= 400:
425 body = await resp.text()
426 _LOGGER.warning(
427 "remote_logger: OTLP endpoint returned HTTP %s: %s",
428 resp.status,
429 body[:200],
430 )
431 self.on_posting_error(body)
432 if resp.ok or (resp.status >= 400 and resp.status < 500):
433 # records were sent, or there was a client-side error
434 if records:
435 self.last_sent_payload = submission
436 self.on_success()
438 except aiohttp.ClientError as err:
439 _LOGGER.warning("remote_logger: failed to send logs: %s", err)
440 self.on_posting_error(str(err))
441 except RuntimeError as err:
442 if "Session is closed" in str(err):
443 _LOGGER.debug("remote_logger: session closed during flush (shutdown), dropping %d records", len(records or []))
444 else:
445 _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", err)
446 self.on_posting_error(str(err))
447 except Exception as e:
448 _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", e)
449 self.on_posting_error(str(e))
451 def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, typing.Any] | None = None) -> None:
452 """Buffer a custom log record without requiring a HA Event."""
453 now = time.time()
454 time_unix_nano = str(int(now * 1_000_000_000))
455 severity_number, severity_text = SEVERITY_MAP.get(level.upper(), DEFAULT_SEVERITY)
456 attrs: list[dict[str, typing.Any]] = []
457 for k, v in (attributes or {}).items():
458 append_attr(attrs, k, v)
460 record = OtlpMessage(
461 payload={
462 "timeUnixNano": time_unix_nano,
463 "observedTimeUnixNano": time_unix_nano,
464 "severityNumber": severity_number,
465 "severityText": severity_text,
466 "body": {"stringValue": message},
467 "attributes": attrs,
468 "eventName": event_name,
469 }
470 )
471 self._buffer.append(record)
472 self.on_event()
473 if len(self._buffer) >= self._batch_max_size:
474 self._hass.async_create_task(self.flush())