Coverage for custom_components/remote_logger/otel/exporter.py: 91%

278 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-07 04:46 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import base64 

5import json 

6import logging 

7import time 

8from abc import abstractmethod 

9from dataclasses import dataclass 

10from typing import TYPE_CHECKING, Any, cast 

11 

12import aiohttp 

13from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_TOKEN 

14from homeassistant.const import __version__ as hass_version 

15from homeassistant.helpers.aiohttp_client import async_get_clientsession 

16 

17from custom_components.remote_logger.const import ( 

18 CONF_BATCH_MAX_SIZE, 

19 CONF_CLIENT_TIMEOUT, 

20 CONF_ENCODING, 

21 CONF_RESOURCE_ATTRIBUTES, 

22 CONF_USE_TLS, 

23 DEFAULT_CLIENT_TIMEOUT, 

24 EVENT_SYSTEM_LOG, 

25) 

26from custom_components.remote_logger.exporter import LogExporter, LogMessage, LogSubmission 

27from custom_components.remote_logger.helpers import flatten_event_data, isotimestamp 

28 

29from .const import ( 

30 CONF_TOKEN_TYPE, 

31 DEFAULT_RESOURCE_ATTRIBUTES, 

32 DEFAULT_SERVICE_NAME, 

33 DEFAULT_SEVERITY, 

34 ENCODING_JSON, 

35 ENCODING_PROTOBUF, 

36 OTLP_LOGS_PATH, 

37 SCOPE_NAME, 

38 SCOPE_VERSION, 

39 SEVERITY_MAP, 

40 TOKEN_TYPE_API_KEY, 

41 TOKEN_TYPE_BASIC, 

42 TOKEN_TYPE_BEARER, 

43 TOKEN_TYPE_RAW_BASIC, 

44) 

45from .protobuf_encoder import encode_export_logs_request 

46 

47if TYPE_CHECKING: 

48 from homeassistant.config_entries import ConfigEntry 

49 from homeassistant.core import Event, HomeAssistant 

50 

51_LOGGER = logging.getLogger(__name__) 

52 

53 

def build_auth_header(token: str, token_type: str) -> str:
    """Return the Authorization header value for the configured token type."""
    if token_type == TOKEN_TYPE_BASIC:
        # Standard basic auth: the stored token is the raw credentials,
        # base64-encoded on the way out.
        encoded = base64.b64encode(token.encode()).decode()
        return f"Basic {encoded}"
    # Remaining types prepend a scheme word verbatim; unknown types
    # fall back to bearer auth.
    scheme_by_type = {
        TOKEN_TYPE_API_KEY: "ApiKey",
        TOKEN_TYPE_RAW_BASIC: "Basic",
    }
    scheme = scheme_by_type.get(token_type, "Bearer")
    return f"{scheme} {token}"

64 

65 

def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
    """Parse a comma-separated 'key=value' string into (key, value) tuples.

    Empty segments are skipped. Raises ValueError for a segment without
    '=' or with an empty key.
    """
    attributes: list[tuple[str, str]] = []
    for segment in (piece.strip() for piece in raw.split(",")):
        if not segment:
            continue
        if "=" not in segment:
            raise ValueError(f"Invalid attribute pair: {segment!r}")
        # partition keeps any further '=' characters inside the value.
        name, _, val = segment.partition("=")
        name = name.strip()
        if not name:
            raise ValueError("Attribute key cannot be empty")
        attributes.append((name, val.strip()))
    return attributes

85 

86 

def parse_headers(raw: str) -> dict[str, str]:
    """Parse newline-separated 'Name: value' lines into a header dict.

    Blank lines are skipped; a later duplicate name overwrites an earlier
    one. Raises ValueError for a line without ':' or with an empty name.
    """
    headers: dict[str, str] = {}
    for entry in map(str.strip, raw.splitlines()):
        if not entry:
            continue
        if ":" not in entry:
            raise ValueError(f"Invalid header line: {entry!r}")
        # partition splits on the first ':' so values may contain colons.
        name, _, value = entry.partition(":")
        name = name.strip()
        if not name:
            raise ValueError("Header name cannot be empty")
        headers[name] = value.strip()
    return headers

105 

106 

107def _mask_auth_headers(headers: dict[str, str]) -> dict[str, str]: 

108 def _mask_credential(v: str) -> str: 

109 parts = v.split(" ", 1) 

110 if len(parts) == 2: 

111 scheme, token = parts 

112 return f"{scheme} {'*' * len(token)}" 

113 return "*" * len(v) 

114 

115 return {k: _mask_credential(v) if k.lower() == "authorization" else v for k, v in headers.items()} 

116 

117 

def append_attr(attrs: list[dict[str, Any]], key: str, value: Any, force_null: bool = False) -> None:
    """Append the OTLP KeyValue for (key, value) to *attrs*, skipping omitted values."""
    encoded = _kv(key, value, force_null=force_null)
    if encoded is None:
        return
    attrs.append(encoded)

122 

123 

124def _kv(key: str, value: Any, force_null: bool = False) -> dict[str, Any] | None: 

125 """Build an OTLP KeyValue attribute""" 

126 if value is None and not force_null: 

127 return None 

128 if isinstance(value, str): 

129 return {"key": key, "value": {"stringValue": value}} 

130 if isinstance(value, bool): 

131 return {"key": key, "value": {"boolValue": value}} 

132 if isinstance(value, int): 

133 int_val: int | str = value if -(2**31) <= value <= 2**31 - 1 else str(value) 

134 return {"key": key, "value": {"intValue": int_val}} 

135 if isinstance(value, float): 

136 return {"key": key, "value": {"doubleValue": None if value != value else value}} 

137 if isinstance(value, bytes): 

138 return {"key": key, "value": {"bytesValue": value}} 

139 return {"key": key, "value": {"stringValue": str(value)}} 

140 

141 

async def validate(
    session: aiohttp.ClientSession,
    url: str,
    encoding: str,
    extra_headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Probe *url* with an empty OTLP export and return config-flow errors.

    An empty result dict means the endpoint accepted the probe. Any HTTP
    error status or client exception maps to {"base": "cannot_connect"};
    unexpected exceptions map to {"base": "unknown"}.
    """
    errors: dict[str, str] = {}
    # An empty ExportLogsServiceRequest is a valid, harmless probe payload.
    empty_request: dict[str, Any] = {"resourceLogs": []}
    if encoding == ENCODING_PROTOBUF:
        payload: bytes = encode_export_logs_request(empty_request)
        mime = "application/x-protobuf"
    elif encoding == ENCODING_JSON:
        payload = json.dumps(empty_request).encode("utf-8")
        mime = "application/json"
    else:
        raise ValueError(f"Unknown encoding {encoding}")
    request_headers = {"Content-Type": mime, **(extra_headers or {})}
    try:
        async with session.post(
            url,
            data=payload,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if 400 <= resp.status < 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS client connect failed (%s): %s", resp.status, await resp.text())
            if resp.status >= 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS server connect failed (%s): %s", resp.status, await resp.text())
    except aiohttp.ClientResponseError as response_err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client response error: %s", response_err)
    except aiohttp.ClientError as client_err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client error: %s", client_err)
    except Exception as unexpected_err:  # config-flow boundary: report, never raise
        errors["base"] = "unknown"
        _LOGGER.error("OTEL-LOGS connect unknown error: %s", unexpected_err)
    return errors

182 

183 

@dataclass
class OtlpMessage(LogMessage):
    """A single buffered log record carrying its fully-built OTLP logRecord dict."""

    # OTLP logRecord mapping: timeUnixNano, severityNumber/Text, body, attributes, ...
    payload: dict[str, Any]

187 

188 

class OtlpSubmission(LogSubmission):
    """Base submission: wraps buffered OtlpMessages in one OTLP export request.

    Subclasses decide the wire encoding by implementing body().
    """

    def __init__(
        self, resource: dict[str, Any], records: list[OtlpMessage], extra_headers: dict[str, Any] | None = None
    ) -> None:
        self.extra_headers = extra_headers or {}
        self.resource: dict[str, Any] = resource
        self.request: dict[str, Any] = self._build_export_request(records)

    @abstractmethod
    def body(self) -> dict[str, Any]:
        """Return keyword arguments for session.post() (headers plus payload)."""

    def _build_export_request(self, records: list[OtlpMessage]) -> dict[str, Any]:
        """Wrap the records' payloads in the ExportLogsServiceRequest envelope."""
        scope_log = {
            "scope": {"name": SCOPE_NAME, "version": SCOPE_VERSION},
            "logRecords": [message.payload for message in records],
        }
        resource_log = {"resource": self.resource, "scopeLogs": [scope_log]}
        return {"resourceLogs": [resource_log]}

219 

220 

class OtlpJsonSubmission(OtlpSubmission):
    """OTLP submission encoded as OTLP/HTTP JSON.

    The constructor is inherited unchanged from OtlpSubmission.
    """

    def body(self) -> dict[str, Any]:
        """Return session.post() kwargs: JSON content type plus the request as `json`."""
        headers = {"Content-Type": "application/json", **self.extra_headers}
        return {"headers": headers, "json": self.request}

    def for_display(self) -> dict[str, Any]:
        """Like body(), but with Authorization credentials masked for UI display."""
        displayed = self.body()
        displayed["headers"] = _mask_auth_headers(displayed["headers"])
        return displayed

233 

234 

class OtlpProtobufSubmission(OtlpSubmission):
    """OTLP submission encoded as OTLP/HTTP protobuf.

    The constructor is inherited unchanged from OtlpSubmission.
    """

    def body(self) -> dict[str, Any]:
        """Return session.post() kwargs: protobuf content type plus encoded bytes as `data`."""
        headers = {"Content-Type": "application/x-protobuf", **self.extra_headers}
        return {"headers": headers, "data": encode_export_logs_request(self.request)}

    def for_display(self) -> dict[str, Any]:
        """Masked-header view with the binary payload coerced to printable text."""
        raw = self.body()
        # Undecodable bytes become U+FFFD, then '?', so the preview is plain text.
        printable = raw["data"].decode("utf-8", errors="replace").replace("\ufffd", "?")
        return {"headers": _mask_auth_headers(raw["headers"]), "data": printable}

253 

254 

class OtlpLogExporter(LogExporter):
    """Buffers system_log_event records and flushes them as OTLP/HTTP JSON."""

    logger_type = "otel"

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read endpoint, auth, encoding and batching settings from the config entry."""
        super().__init__(hass)
        self.name = entry.title

        # Guards the buffer snapshot/clear in flush() against concurrent flushes.
        self._lock = asyncio.Lock()
        # Local HA API address/port become resource attributes (see _build_resource).
        if hass and hass.config and hass.config.api:
            self.server_address = hass.config.api.local_ip
            self.server_port = hass.config.api.port
        else:
            self.server_address = None
            self.server_port = None

        # Options overlay data, so reconfigured values win over initial setup values.
        opts = {**entry.data, **entry.options}

        host = opts[CONF_HOST]
        port = opts[CONF_PORT]
        encoding = opts[CONF_ENCODING]
        use_tls = opts[CONF_USE_TLS]
        scheme = "https" if use_tls else "http"
        path = opts.get(CONF_PATH, OTLP_LOGS_PATH)
        self.endpoint_url = f"{scheme}://{host}:{port}{path}"
        self.destination = (host, str(port), encoding)
        # NOTE(review): this flag is also passed as verify_ssl in flush(), which
        # ties certificate verification to whether TLS is enabled — confirm intended.
        self._use_tls = use_tls
        self._use_protobuf = encoding == ENCODING_PROTOBUF
        self._entry = entry
        # log_direct() schedules a flush once the buffer reaches this size.
        self._batch_max_size = opts.get(CONF_BATCH_MAX_SIZE, 100)
        self._client_timeout = opts.get(CONF_CLIENT_TIMEOUT, DEFAULT_CLIENT_TIMEOUT)
        self._extra_headers = self._build_extra_headers(opts)

        self._resource = self._build_resource(opts)

        _LOGGER.info(f"remote_logger: otel configured for {self.endpoint_url}, protobuf={self._use_protobuf}")

    def _build_extra_headers(self, opts: dict[str, Any]) -> dict[str, str]:
        """Combine a generated Authorization header (if a token is set) with user headers."""
        headers: dict[str, str] = {}
        token = opts.get(CONF_TOKEN, "").strip()
        if token:
            token_type = opts.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
            headers["Authorization"] = build_auth_header(token, token_type)
        raw_headers = opts.get(CONF_HEADERS, "").strip()
        if raw_headers:
            # Applied after the token, so a user-supplied Authorization header wins.
            headers.update(parse_headers(raw_headers))
        return headers

    def _build_resource(self, opts: dict[str, Any]) -> dict[str, Any]:
        """Build the OTLP Resource object with attributes."""
        attrs: list[dict[str, Any]] = []
        append_attr(attrs, "service.name", DEFAULT_SERVICE_NAME)
        append_attr(attrs, "service.version", hass_version or "unknown")

        if self.server_address:
            append_attr(attrs, "service.address", self.server_address)
        if self.server_port:
            append_attr(attrs, "service.port", self.server_port)

        raw = opts.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
        if raw and raw.strip():
            # NOTE(review): parse_resource_attributes raises ValueError on malformed
            # input — presumably the config flow validated it first; confirm.
            for key, value in parse_resource_attributes(raw):
                append_attr(attrs, key, value)

        return {"attributes": attrs}

    def _to_log_record(
        self,
        event: Event,
        message_override: list[str] | None = None,
        level_override: str | None = None,
        state_only: bool = False,
    ) -> OtlpMessage:
        """Convert a system_log_event payload to an OTLP logRecord dict."""
        """ HA System Log Event
        "name": str
        "message": list(str)
        "level": str
        "source": (str,int)
        "timestamp": float
        "exception": str
        "count": int
        "first_occurred": float
        """
        data = event.data or {}
        # timeUnixNano reflects when the log line happened; observedTimeUnixNano
        # reflects when HA fired the event carrying it.
        timestamp_s: float = data.get("timestamp", time.time())
        time_unix_nano = str(int(timestamp_s * 1_000_000_000))
        observed_timestamp: float = event.time_fired.timestamp()
        observed_time_unix_nano = str(int(observed_timestamp * 1_000_000_000))

        # Unknown level names fall back to DEFAULT_SEVERITY.
        level: str = level_override or data.get("level", "INFO").upper()
        severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)

        # system_log messages arrive as a list of lines; joined into one body.
        messages: list[str] = message_override or data.get("message", [])
        message: str = "\n".join(messages)

        attributes: list[dict[str, Any]] = []

        if event.event_type == EVENT_SYSTEM_LOG:
            # Map the well-known system_log fields onto OTel code.*/exception.* names.
            source = data.get("source")
            if source and isinstance(source, tuple):
                source_path, source_lineno = source
                append_attr(attributes, "code.file.path", source_path)
                append_attr(attributes, "code.line.number", source_lineno)
            logger_name = data.get("name")
            if data.get("count"):
                append_attr(attributes, "exception.count", data["count"])
            if data.get("first_occurred"):
                append_attr(attributes, "exception.first_occurred", isotimestamp(data["first_occurred"]))
            if logger_name:
                append_attr(attributes, "code.function.name", logger_name)
            exception = data.get("exception")
            if exception:
                append_attr(attributes, "exception.stacktrace", exception)

        else:
            # Arbitrary HA events: flatten every data field under event.data.*.
            for k, v in data.items():
                for flat_key, flat_val in flatten_event_data(f"event.data.{k}" if k != "event.data" else k, v, state_only):
                    append_attr(attributes, flat_key, flat_val)

        # https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
        payload: dict[str, Any] = {
            "timeUnixNano": time_unix_nano,
            "observedTimeUnixNano": observed_time_unix_nano,
            "severityNumber": severity_number,
            "severityText": severity_text,
            "body": {"stringValue": message},
            "attributes": attributes,
        }
        # Non-system-log events carry their HA event type as the OTLP eventName.
        if event.event_type != EVENT_SYSTEM_LOG:
            payload["eventName"] = event.event_type
        return OtlpMessage(payload=payload)

    async def flush(self) -> None:
        """Flush all buffered log records to the OTLP endpoint."""
        records: list[OtlpMessage] | None = None
        # Snapshot and clear under the lock so a concurrent flush cannot
        # double-send the same records.
        async with self._lock:
            if not self._buffer:
                return
            records = cast("list[OtlpMessage]", self._buffer.copy())
            self._buffer.clear()

        try:
            if records:
                # Encoding was fixed at setup time from CONF_ENCODING.
                if self._use_protobuf:
                    submission: OtlpSubmission = OtlpProtobufSubmission(self._resource, records, self._extra_headers)
                else:
                    submission = OtlpJsonSubmission(self._resource, records, self._extra_headers)
            else:
                return
            # NOTE(review): verify_ssl mirrors use_tls, so HTTPS endpoints always
            # verify certificates and plain HTTP passes verify_ssl=False — confirm.
            session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
            timeout = aiohttp.ClientTimeout(total=self._client_timeout)
            async with session.post(self.endpoint_url, timeout=timeout, **submission.body()) as resp:
                if resp.status in (401, 403):
                    # Credentials rejected: hand off to HA's reauth flow.
                    # The snapshot is dropped (not re-buffered).
                    _LOGGER.warning("remote_logger: OTLP authentication failed (%s), triggering reauth", resp.status)
                    self._entry.async_start_reauth(self._hass)
                    return
                if resp.status >= 400:
                    body = await resp.text()
                    _LOGGER.warning(
                        "remote_logger: OTLP endpoint returned HTTP %s: %s",
                        resp.status,
                        body[:200],
                    )
                    self.on_posting_error(body)
                if resp.ok or (resp.status >= 400 and resp.status < 500):
                    # records were sent, or there was a client-side error
                    # (4xx batches are not retried — they are treated as done).
                    if records:
                        self.last_sent_payload = submission
                    self.on_success()

        except aiohttp.ClientError as err:
            _LOGGER.warning("remote_logger: failed to send logs: %s", err)
            self.on_posting_error(str(err))
        except RuntimeError as err:
            # aiohttp raises RuntimeError("Session is closed") during HA shutdown;
            # that case is expected and the snapshot is silently dropped.
            if "Session is closed" in str(err):
                _LOGGER.debug("remote_logger: session closed during flush (shutdown), dropping %d records", len(records or []))
            else:
                _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", err)
                self.on_posting_error(str(err))
        except Exception as e:
            _LOGGER.exception("remote_logger: unexpected error %s sending logs, skipping records", e)
            self.on_posting_error(str(e))

    def log_direct(self, event_name: str, message: str, level: str, attributes: dict[str, Any] | None = None) -> None:
        """Buffer a custom log record without requiring a HA Event."""
        now = time.time()
        # No originating event exists, so observed time equals record time.
        time_unix_nano = str(int(now * 1_000_000_000))
        severity_number, severity_text = SEVERITY_MAP.get(level.upper(), DEFAULT_SEVERITY)
        attrs: list[dict[str, Any]] = []
        for k, v in (attributes or {}).items():
            append_attr(attrs, k, v)

        record = OtlpMessage(
            payload={
                "timeUnixNano": time_unix_nano,
                "observedTimeUnixNano": time_unix_nano,
                "severityNumber": severity_number,
                "severityText": severity_text,
                "body": {"stringValue": message},
                "attributes": attrs,
                "eventName": event_name,
            }
        )
        self._buffer.append(record)
        self.on_event()
        # Opportunistic flush once the configured batch size is reached.
        if len(self._buffer) >= self._batch_max_size:
            self._hass.async_create_task(self.flush())