Coverage for custom_components / remote_logger / otel / exporter.py: 92%

283 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 21:55 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import base64 

5import datetime as dt 

6import json 

7import logging 

8import re 

9import time 

10import typing 

11from abc import abstractmethod 

12from dataclasses import dataclass 

13 

14import aiohttp 

15from homeassistant.components.system_log import EVENT_SYSTEM_LOG 

16from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_TOKEN 

17from homeassistant.const import __version__ as hass_version 

18from homeassistant.helpers.aiohttp_client import async_get_clientsession 

19 

20from custom_components.remote_logger.const import ( 

21 CONF_BATCH_MAX_SIZE, 

22 CONF_CLIENT_TIMEOUT, 

23 CONF_ENCODING, 

24 CONF_RESOURCE_ATTRIBUTES, 

25 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, 

26 CONF_USE_TLS, 

27 DEFAULT_CLIENT_TIMEOUT, 

28) 

29from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission 

30from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp 

31 

32from .const import ( 

33 CONF_TOKEN_TYPE, 

34 DEFAULT_RESOURCE_ATTRIBUTES, 

35 DEFAULT_SERVICE_NAME, 

36 DEFAULT_SEVERITY, 

37 ENCODING_JSON, 

38 ENCODING_PROTOBUF, 

39 OTLP_LOGS_PATH, 

40 SCOPE_NAME, 

41 SCOPE_VERSION, 

42 SEVERITY_MAP, 

43 TOKEN_TYPE_API_KEY, 

44 TOKEN_TYPE_BASIC, 

45 TOKEN_TYPE_BEARER, 

46 TOKEN_TYPE_RAW_BASIC, 

47) 

48from .protobuf_encoder import encode_export_logs_request 

49 

50if typing.TYPE_CHECKING: 

51 from collections.abc import Mapping 

52 

53 from homeassistant.config_entries import ConfigEntry 

54 from homeassistant.core import HomeAssistant 

55 

56_LOGGER = logging.getLogger(__name__) 

57 

58 

def build_auth_header(token: str, token_type: str) -> str:
    """Return the ``Authorization`` header value for *token*.

    The scheme depends on *token_type*:

    * ``TOKEN_TYPE_BASIC``     -> ``Basic <base64 of token>``
    * ``TOKEN_TYPE_API_KEY``   -> ``ApiKey <token>``
    * ``TOKEN_TYPE_RAW_BASIC`` -> ``Basic <token>`` (token is sent unchanged)
    * anything else            -> ``Bearer <token>``
    """
    if token_type == TOKEN_TYPE_BASIC:
        # Plain credentials are base64-encoded here.
        scheme, credential = "Basic", base64.b64encode(token.encode()).decode()
    elif token_type == TOKEN_TYPE_API_KEY:
        scheme, credential = "ApiKey", token
    elif token_type == TOKEN_TYPE_RAW_BASIC:
        # Token is passed through without encoding.
        scheme, credential = "Basic", token
    else:
        scheme, credential = "Bearer", token
    return f"{scheme} {credential}"

69 

70 

def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
    """Split a comma-separated ``key=value`` string into ``(key, value)`` pairs.

    Whitespace around each entry, key and value is trimmed and blank entries
    are skipped. Only the first ``=`` of an entry separates key from value.

    Raises:
        ValueError: if an entry contains no ``=`` or its key is empty.
    """
    attributes: list[tuple[str, str]] = []
    for entry in (chunk.strip() for chunk in raw.split(",")):
        if not entry:
            continue
        name, separator, val = entry.partition("=")
        if not separator:
            # partition() yields an empty separator when "=" is absent
            raise ValueError(f"Invalid attribute pair: {entry!r}")
        name = name.strip()
        if not name:
            raise ValueError("Attribute key cannot be empty")
        attributes.append((name, val.strip()))
    return attributes

90 

91 

def parse_headers(raw: str) -> dict[str, str]:
    """Turn ``Name: value`` entries into a header dict.

    Entries are separated by newlines, or by a comma that is directly
    followed by something that looks like ``Name:`` (so commas inside a value
    do not split it). Names and values are stripped; a repeated name keeps
    the last value.

    Raises:
        ValueError: if an entry has no ``:`` or its name is empty.
    """
    headers: dict[str, str] = {}
    entries = re.split(r"\n|,(?=\s*[\w-]+\s*:)", raw)
    for entry in map(str.strip, entries):
        if not entry:
            continue
        header_name, colon, header_value = entry.partition(":")
        if not colon:
            # partition() yields an empty separator when ":" is absent
            raise ValueError(f"Invalid header line: {entry!r}")
        header_name = header_name.strip()
        if not header_name:
            raise ValueError("Header name cannot be empty")
        headers[header_name] = header_value.strip()
    return headers

111 

112 

113def _mask_auth_headers(headers: dict[str, str]) -> dict[str, str]: 

114 def _mask_credential(v: str) -> str: 

115 parts = v.split(" ", 1) 

116 if len(parts) == 2: 

117 scheme, token = parts 

118 return f"{scheme} {'*' * len(token)}" 

119 return "*" * len(v) 

120 

121 return {k: _mask_credential(v) if k.lower() == "authorization" else v for k, v in headers.items()} 

122 

123 

def append_attr(attrs: list[dict[str, typing.Any]], key: str, value: typing.Any, force_null: bool = False) -> None:
    """Append the OTLP KeyValue for ``key``/``value`` to *attrs* in place.

    Nothing is appended when ``_kv`` returns ``None``, which it does for a
    ``None`` value unless *force_null* is set.
    """
    entry: dict[str, typing.Any] | None = _kv(key, value, force_null=force_null)
    if entry is None:
        return
    attrs.append(entry)

128 

129 

130def _kv(key: str, value: typing.Any, force_null: bool = False) -> dict[str, typing.Any] | None: 

131 """Build an OTLP KeyValue attribute""" 

132 if value is None and not force_null: 

133 return None 

134 if isinstance(value, str): 

135 return {"key": key, "value": {"stringValue": value}} 

136 if isinstance(value, bool): 

137 return {"key": key, "value": {"boolValue": value}} 

138 if isinstance(value, int): 

139 int_val: int | str = value if -(2**31) <= value <= 2**31 - 1 else str(value) 

140 return {"key": key, "value": {"intValue": int_val}} 

141 if isinstance(value, float): 

142 return {"key": key, "value": {"doubleValue": None if value != value else value}} 

143 if isinstance(value, bytes): 

144 return {"key": key, "value": {"bytesValue": value}} 

145 return {"key": key, "value": {"stringValue": str(value)}} 

146 

147 

async def validate(
    session: aiohttp.ClientSession,
    url: str,
    encoding: str,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Probe *url* with an empty OTLP export request and report problems.

    An empty ``resourceLogs`` payload is POSTed in the requested *encoding*
    with a 10 second total timeout.

    Returns:
        An error dict: empty on success, otherwise ``{"base": "cannot_connect"}``
        or ``{"base": "unknown"}``.

    Raises:
        ValueError: if *encoding* is neither the protobuf nor the JSON encoding.
    """
    empty_request: dict[str, list[typing.Any]] = {"resourceLogs": []}
    if encoding == ENCODING_PROTOBUF:
        payload: bytes = encode_export_logs_request(empty_request)
        content_type = "application/x-protobuf"
    elif encoding == ENCODING_JSON:
        payload = json.dumps(empty_request).encode("utf-8")
        content_type = "application/json"
    else:
        raise ValueError(f"Unknown encoding {encoding}")

    request_headers = {"Content-Type": content_type}
    request_headers.update(extra_headers or {})

    errors: dict[str, str] = {}
    try:
        async with session.post(
            url,
            data=payload,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            status = resp.status
            if 400 <= status < 500 and status != 422:
                # 422 Unprocessable Entity means the endpoint is reachable but
                # rejected our empty validation payload, which is enough to
                # confirm connectivity, so it is not treated as an error.
                errors["base"] = "cannot_connect"
                _LOGGER.error("remote_logger: client connect failed (%s): %s", status, await resp.text())
            elif status >= 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("remote_logger: server connect failed (%s): %s", status, await resp.text())
    except aiohttp.ClientResponseError as response_err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("remote_logger: connect client response error: %s", response_err)
    except aiohttp.ClientError as client_err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("remote_logger: connect client error: %s", client_err)
    except Exception as unexpected_err:
        errors["base"] = "unknown"
        _LOGGER.error("remote_logger: connect unknown error: %s", unexpected_err)
    return errors

190 

191 

@dataclass
class OtlpMessage(LogMessage):
    """A single log record held in OTLP form."""

    # OTLP logRecord dict; placed as-is into the "logRecords" list of the export request
    payload: dict[str, typing.Any]

195 

196 

class OtlpSubmission(LogSubmission):
    """One batch of OTLP log records wrapped in an export request.

    Subclasses decide how the request is encoded by implementing ``body``.
    """

    def __init__(
        self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
    ) -> None:
        """Store the resource and headers and build the export request from *records*."""
        self.extra_headers = extra_headers if extra_headers else {}
        # resource must be assigned first: _build_export_request reads self.resource
        self.resource: dict[str, typing.Any] = resource
        self.request: dict[str, typing.Any] = self._build_export_request(records)

    @abstractmethod
    def body(self) -> dict[str, typing.Any]:
        """Return the encoded request; implemented by the encoding-specific subclasses."""

    def _build_export_request(self, records: list[OtlpMessage]) -> dict[str, typing.Any]:
        """Wrap the record payloads in the ExportLogsServiceRequest envelope."""
        scope_logs = {
            "scope": {"name": SCOPE_NAME, "version": SCOPE_VERSION},
            "logRecords": [record.payload for record in records],
        }
        resource_logs = {"resource": self.resource, "scopeLogs": [scope_logs]}
        return {"resourceLogs": [resource_logs]}

227 

228 

class OtlpJsonSubmission(OtlpSubmission):
    """OTLP submission sent as a JSON document."""

    def __init__(
        self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
    ) -> None:
        """Delegate to ``OtlpSubmission.__init__`` unchanged."""
        super().__init__(resource, records, extra_headers)

    def body(self) -> dict[str, typing.Any]:
        """Return ``headers`` and ``json`` entries for the request."""
        request_headers = {"Content-Type": "application/json"}
        # extra headers are applied last, so they may override Content-Type
        request_headers.update(self.extra_headers)
        return {"headers": request_headers, "json": self.request}

    def for_display(self) -> dict[str, typing.Any]:
        """Return ``body()`` with the Authorization header value masked."""
        shown = dict(self.body())
        shown["headers"] = _mask_auth_headers(shown["headers"])
        return shown

241 

242 

class OtlpProtobufSubmission(OtlpSubmission):
    """OTLP submission sent as an encoded protobuf message."""

    def __init__(
        self, resource: dict[str, typing.Any], records: list[OtlpMessage], extra_headers: dict[str, typing.Any] | None = None
    ) -> None:
        """Delegate to ``OtlpSubmission.__init__`` unchanged."""
        super().__init__(resource, records, extra_headers)

    def body(self) -> dict[str, typing.Any]:
        """Return ``headers`` and the encoded request under ``data``."""
        request_headers = {"Content-Type": "application/x-protobuf"}
        # extra headers are applied last, so they may override Content-Type
        request_headers.update(self.extra_headers)
        return {"headers": request_headers, "data": encode_export_logs_request(self.request)}

    def for_display(self) -> dict[str, typing.Any]:
        """Return a printable view: masked headers and the payload as lossy text.

        Bytes that are not valid UTF-8 are shown as ``?``.
        """
        encoded = self.body()
        readable = encoded["data"].decode("utf-8", errors="replace").replace("\ufffd", "?")
        return {
            "headers": _mask_auth_headers(encoded["headers"]),
            "data": readable,
        }

261 

262 

class OtlpLogExporter(LogExporter):
    """Buffers log records and flushes them to an OTLP/HTTP endpoint.

    Batches are sent as JSON or protobuf depending on the configured encoding.
    The buffer and the ``on_event`` / ``on_success`` / ``on_posting_error``
    hooks are not defined here; presumably they come from ``LogExporter``.
    """

    logger_type = "otel"

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read the endpoint, encoding, auth and batching settings from *entry*.

        ``entry.options`` override ``entry.data`` where both define a key.
        """
        super().__init__(hass)
        self.name = entry.title

        self._lock = asyncio.Lock()
        if hass and hass.config and hass.config.api:
            self.server_address = hass.config.api.local_ip
            self.server_port = hass.config.api.port
        else:
            self.server_address = None
            self.server_port = None

        opts = {**entry.data, **entry.options}

        host = opts[CONF_HOST]
        port = opts[CONF_PORT]
        encoding = opts[CONF_ENCODING]
        use_tls = opts[CONF_USE_TLS]
        scheme = "https" if use_tls else "http"
        path = opts.get(CONF_PATH, OTLP_LOGS_PATH)
        self.endpoint_url = f"{scheme}://{host}:{port}{path}"
        self.destination = (host, str(port), encoding)
        self._use_tls = use_tls
        self._use_protobuf = encoding == ENCODING_PROTOBUF
        self._entry = entry
        self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 100)
        self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
        self._suppress_system_log_event_name = opts.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True)
        self._extra_headers = self._build_extra_headers(opts)
        self._resource = self._build_resource(opts)

        # Lazy %-style arguments: the message is only formatted if INFO is enabled.
        _LOGGER.info("remote_logger: otel configured for %s, protobuf=%s", self.endpoint_url, self._use_protobuf)

    def _build_extra_headers(self, opts: dict[str, typing.Any]) -> dict[str, str]:
        """Build the extra request headers from the token and custom header options.

        Custom headers are applied after the token, so a configured
        ``Authorization`` header replaces the token-derived one.

        Raises:
            ValueError: propagated from ``parse_headers`` for a malformed entry.
        """
        headers: dict[str, str] = {}
        # `or ""` also covers an explicitly stored None token
        token = (opts.get(CONF_TOKEN) or "").strip()
        if token:
            token_type = opts.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
            headers["Authorization"] = build_auth_header(token, token_type)
        configured = opts.get(CONF_HEADERS) or []
        # A plain string must not be joined: that would insert a newline between
        # every character and make each one an invalid header line.
        raw_headers = configured if isinstance(configured, str) else "\n".join(configured)
        if raw_headers:
            headers.update(parse_headers(raw_headers))
        return headers

    def _build_resource(self, opts: dict[str, typing.Any]) -> dict[str, typing.Any]:
        """Build the OTLP Resource object with its attributes.

        Service name and version always come first, then the server address
        and port when known, then any configured ``key=value`` attributes.

        Raises:
            ValueError: propagated from ``parse_resource_attributes``.
        """
        attrs: list[dict[str, typing.Any]] = []
        append_attr(attrs, "service.name", DEFAULT_SERVICE_NAME)
        append_attr(attrs, "service.version", hass_version or "unknown")

        if self.server_address:
            append_attr(attrs, "service.address", self.server_address)
        if self.server_port:
            append_attr(attrs, "service.port", self.server_port)

        raw = opts.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
        if raw and raw.strip():
            for key, value in parse_resource_attributes(raw):
                append_attr(attrs, key, value)

        return {"attributes": attrs}

    def create_log_record(
        self,
        event_data: Mapping[str, typing.Any],
        event_type: str | None = None,
        time_fired: dt.datetime | None = None,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> OtlpMessage:
        """Convert an event payload to an OTLP logRecord.

        For system log events (or when *event_type* is ``None``) these keys of
        *event_data* are read::

            "name": str
            "message": list(str)
            "level": str
            "source": (str, int)
            "timestamp": float
            "exception": str
            "count": int
            "first_occurred": float

        For any other event type every item of *event_data* is flattened into
        ``event.data.*`` attributes via ``flatten_event_data``.

        Args:
            event_data: the event payload; ``None`` or empty is treated as ``{}``.
            event_type: event name; also emitted as ``eventName`` unless it is
                the system log event and that name is suppressed by config.
            time_fired: observed time; defaults to now.
            message_override: replaces ``event_data["message"]`` when truthy.
            level_override: replaces ``event_data["level"]`` when truthy.
            state_only: passed through to ``flatten_event_data``.
        """
        data: Mapping[str, typing.Any] | dict[str, typing.Any] = event_data or {}
        # self.tz is not set in this class; presumably provided by LogExporter.
        time_fired = time_fired or dt.datetime.now(tz=self.tz)
        timestamp_s = data.get("timestamp")
        if timestamp_s is None:
            timestamp_s = time.time()
        time_unix_nano = str(int(timestamp_s * 1_000_000_000))
        observed_timestamp: float = time_fired.timestamp()
        observed_time_unix_nano = str(int(observed_timestamp * 1_000_000_000))

        # Normalise the override too (log_direct does the same) so a lower-case
        # level still matches SEVERITY_MAP; a missing/None level means INFO.
        level: str = str(level_override or data.get("level") or "INFO").upper()
        severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)

        raw_message: typing.Any = message_override or data.get("message") or []
        if isinstance(raw_message, str):
            # Joining a plain string would put every character on its own line.
            message = raw_message
        elif isinstance(raw_message, (list, tuple)):
            message = "\n".join(str(part) for part in raw_message)
        else:
            message = str(raw_message)

        attributes: list[dict[str, typing.Any]] = []

        if event_type == EVENT_SYSTEM_LOG or event_type is None:
            source = data.get("source")
            # Accept a list as well as a tuple, and only unpack a real pair.
            if source and isinstance(source, (tuple, list)) and len(source) == 2:
                source_path, source_lineno = source
                append_attr(attributes, "code.file.path", source_path)
                append_attr(attributes, "code.line.number", source_lineno)
            logger_name = data.get("name")
            if data.get("count"):
                append_attr(attributes, "exception.count", data["count"])
            if data.get("first_occurred"):
                append_attr(attributes, "exception.first_occurred", isotimestamp(data["first_occurred"]))
            if logger_name:
                append_attr(attributes, "code.function.name", logger_name)
            exception = data.get("exception")
            if exception:
                append_attr(attributes, "exception.stacktrace", exception)

        else:
            for k, v in data.items():
                # a key already named "event.data" is not prefixed again
                prefix = f"event.data.{k}" if k != "event.data" else k
                for flat_key, flat_val in flatten_event_data(prefix, v, state_only):
                    append_attr(attributes, flat_key, flat_val)

        # https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
        payload: dict[str, typing.Any] = {
            "timeUnixNano": time_unix_nano,
            "observedTimeUnixNano": observed_time_unix_nano,
            "severityNumber": severity_number,
            "severityText": severity_text,
            "body": {"stringValue": message},
            "attributes": attributes,
        }
        if event_type is not None and (event_type != EVENT_SYSTEM_LOG or not self._suppress_system_log_event_name):
            payload["eventName"] = event_type
        return OtlpMessage(payload=payload)

    async def flush(self) -> None:
        """Flush all buffered log records to the OTLP endpoint.

        The buffer is emptied under the lock before sending, so a batch that
        fails to send is dropped rather than retried. HTTP 401/403 starts the
        config entry's reauth flow. Errors are logged and reported through
        ``on_posting_error``; nothing is raised to the caller for them.
        """
        async with self._lock:
            if not self._buffer:
                return
            records = typing.cast("list[OtlpMessage]", self._buffer.copy())
            self._buffer.clear()

        try:
            # Built inside the try so that encoding failures are handled below.
            if self._use_protobuf:
                submission: OtlpSubmission = OtlpProtobufSubmission(self._resource, records, self._extra_headers)
            else:
                submission = OtlpJsonSubmission(self._resource, records, self._extra_headers)
            session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
            timeout = aiohttp.ClientTimeout(total=self._client_timeout)
            async with session.post(self.endpoint_url, timeout=timeout, **submission.body()) as resp:
                if resp.status in (401, 403):
                    _LOGGER.warning("remote_logger: OTLP authentication failed (%s), triggering reauth", resp.status)
                    self._entry.async_start_reauth(self._hass)
                    return
                if resp.status >= 400:
                    body = await resp.text()
                    _LOGGER.warning(
                        "remote_logger: OTLP endpoint returned HTTP %s: %s",
                        resp.status,
                        body[:200],
                    )
                    self.on_posting_error(body)
                if resp.ok or (resp.status >= 400 and resp.status < 500):
                    # records were sent, or there was a client-side error
                    self.last_sent_payload = submission
                    self.on_success()

        except aiohttp.ClientError as err:
            _LOGGER.warning("remote_logger: failed to send logs: %s", err)
            self.on_posting_error(str(err))
        except RuntimeError as err:
            if "Session is closed" in str(err):
                _LOGGER.debug("remote_logger: session closed during flush (shutdown), dropping %d records", len(records))
            else:
                _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", err)
                self.on_posting_error(str(err))
        except Exception as e:
            _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", e)
            self.on_posting_error(str(e))

    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, typing.Any] | None = None) -> None:
        """Buffer a custom log record without requiring a HA Event.

        The current time is used for both the record time and the observed
        time. A flush task is scheduled once the buffer reaches the configured
        batch size.
        """
        now = time.time()
        time_unix_nano = str(int(now * 1_000_000_000))
        severity_number, severity_text = SEVERITY_MAP.get(level.upper(), DEFAULT_SEVERITY)
        attrs: list[dict[str, typing.Any]] = []
        for k, v in (attributes or {}).items():
            append_attr(attrs, k, v)

        record = OtlpMessage(
            payload={
                "timeUnixNano": time_unix_nano,
                "observedTimeUnixNano": time_unix_nano,
                "severityNumber": severity_number,
                "severityText": severity_text,
                "body": {"stringValue": message},
                "attributes": attrs,
                "eventName": event_name,
            }
        )
        self._buffer.append(record)
        self.on_event()
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())