Coverage for custom_components/remote_logger/otel/exporter.py: 96%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-18 22:41 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import json 

5import logging 

6import time 

7from typing import TYPE_CHECKING, Any 

8 

9import aiohttp 

10from homeassistant.const import __version__ as hass_version 

11from homeassistant.core import Event, HomeAssistant, callback 

12from homeassistant.helpers.aiohttp_client import async_get_clientsession 

13 

14from custom_components.remote_logger.const import ( 

15 BATCH_FLUSH_INTERVAL_SECONDS, 

16 CONF_BATCH_MAX_SIZE, 

17 CONF_ENCODING, 

18 CONF_HOST, 

19 CONF_PORT, 

20 CONF_RESOURCE_ATTRIBUTES, 

21 CONF_USE_TLS, 

22) 

23 

24from .const import ( 

25 DEFAULT_RESOURCE_ATTRIBUTES, 

26 DEFAULT_SERVICE_NAME, 

27 DEFAULT_SEVERITY, 

28 ENCODING_JSON, 

29 ENCODING_PROTOBUF, 

30 OTLP_LOGS_PATH, 

31 SCOPE_NAME, 

32 SCOPE_VERSION, 

33 SEVERITY_MAP, 

34) 

35from .protobuf_encoder import encode_export_logs_request 

36 

37if TYPE_CHECKING: 

38 from homeassistant.config_entries import ConfigEntry 

39 

40_LOGGER = logging.getLogger(__name__) 

41 

42 

def parse_resource_attributes(raw: str) -> list[tuple[str, str]]:
    """Parse 'key1=val1,key2=val2' into a list of (key, value) tuples.

    Blank segments (e.g. trailing commas) are skipped. Values may be empty
    and may themselves contain '=' characters; only the first '=' splits.

    Raises ValueError if a segment has no '=' or an empty key.
    """
    attributes: list[tuple[str, str]] = []
    for chunk in (segment.strip() for segment in raw.split(",")):
        if not chunk:
            continue
        if "=" not in chunk:
            raise ValueError(f"Invalid attribute pair: {chunk!r}")
        name, _, value = chunk.partition("=")
        name = name.strip()
        if not name:
            raise ValueError("Attribute key cannot be empty")
        attributes.append((name, value.strip()))
    return attributes

62 

63 

64def _kv(key: str, value: Any) -> dict[str, Any]: 

65 """Build an OTLP KeyValue attribute""" 

66 if isinstance(value, str): 

67 return {"key": key, "value": {"string_value": value}} 

68 if isinstance(value, bool): 

69 return {"key": key, "value": {"bool_value": value}} 

70 if isinstance(value, int): 

71 return {"key": key, "value": {"int_value": value}} 

72 if isinstance(value, float): 

73 return {"key": key, "value": {"float_value": value}} 

74 if isinstance(value, bytes): 

75 return {"key": key, "value": {"byte_value": value}} 

76 return {"key": key, "value": {"string_value": str(value)}} 

77 

78 

async def validate(session: aiohttp.ClientSession, url: str, encoding: str) -> dict[str, str]:
    """Check connectivity by POSTing an empty OTLP export to *url*.

    Returns a config-flow style errors dict: empty on success, otherwise
    {"base": "cannot_connect"} or {"base": "unknown"}.
    """
    errors: dict[str, str] = {}

    # Build an empty ExportLogsServiceRequest in the requested encoding.
    if encoding == ENCODING_PROTOBUF:
        payload: bytes = encode_export_logs_request({"resourceLogs": []})
        content_type = "application/x-protobuf"
    elif encoding == ENCODING_JSON:
        payload = json.dumps({"resourceLogs": []}).encode("utf-8")
        content_type = "application/json"
    else:
        raise ValueError(f"Unknown encoding {encoding}")

    try:
        async with session.post(
            url,
            data=payload,
            headers={"Content-Type": content_type},
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if 400 <= resp.status < 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS client connect failed (%s): %s", resp.status, await resp.text())
            if resp.status >= 500:
                errors["base"] = "cannot_connect"
                _LOGGER.error("OTEL-LOGS server connect failed (%s): %s", resp.status, await resp.text())
    except aiohttp.ClientError as client_err:
        errors["base"] = "cannot_connect"
        _LOGGER.error("OTEL-LOGS connect client error: %s", client_err)
    except Exception as unknown_err:
        # Broad catch is deliberate: any failure here must surface as a
        # config-flow error rather than crash the flow.
        errors["base"] = "unknown"
        _LOGGER.error("OTEL-LOGS connect unknown error: %s", unknown_err)

    return errors

110 

111 

class OtlpLogExporter:
    """Buffers system_log_event records and flushes them as OTLP/HTTP JSON."""

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Read endpoint settings from the config entry and build the resource."""
        self._hass = hass

        # Pending log records not yet turned into a submission.
        self._buffer: list[dict[str, Any]] = []
        # A generated submission that has not been confirmed delivered yet;
        # it is retried on the next flush before any newer records are sent.
        self._in_progress: dict[str, Any] | None = None
        self._lock = asyncio.Lock()
        if hass and hass.config and hass.config.api:
            self.server_address = hass.config.api.local_ip
            self.server_port = hass.config.api.port
        else:
            self.server_address = None
            self.server_port = None

        host = entry.data[CONF_HOST]
        port = entry.data[CONF_PORT]
        use_tls = entry.data[CONF_USE_TLS]
        scheme = "https" if use_tls else "http"
        self.endpoint_url = f"{scheme}://{host}:{port}{OTLP_LOGS_PATH}"
        self._use_tls = use_tls
        self._use_protobuf = entry.data.get(CONF_ENCODING) == ENCODING_PROTOBUF
        self._batch_max_size = entry.data.get(CONF_BATCH_MAX_SIZE, 100)

        self._resource = self._build_resource(entry)

        # Lazy %-args instead of an f-string so the message is only rendered
        # when INFO logging is enabled.
        _LOGGER.info(
            "remote_logger: otel configured for %s, protobuf=%s",
            self.endpoint_url,
            self._use_protobuf,
        )

    def _build_resource(self, entry: ConfigEntry) -> dict[str, Any]:
        """Build the OTLP Resource object with attributes."""
        attrs: list[dict[str, Any]] = [
            _kv("service.name", DEFAULT_SERVICE_NAME),
            _kv("service.version", hass_version or "unknown"),
        ]
        if self.server_address:
            attrs.append(_kv("service.address", self.server_address))
        if self.server_port:
            attrs.append(_kv("service.port", self.server_port))

        # User-supplied 'key=value,...' pairs from the config entry.
        raw = entry.data.get(CONF_RESOURCE_ATTRIBUTES, DEFAULT_RESOURCE_ATTRIBUTES)
        if raw and raw.strip():
            for key, value in parse_resource_attributes(raw):
                attrs.append(_kv(key, value))

        return {"attributes": attrs}

    @callback
    def handle_event(self, event: Event) -> None:
        """Receive a system_log_event and buffer an OTLP logRecord."""
        if (
            event.data
            and event.data.get("source")
            and len(event.data["source"]) == 2
            and "custom_components/remote_logger/otel" in event.data["source"][0]
        ):
            # prevent log loops: drop records emitted by this integration itself
            return
        try:
            record = self._to_log_record(event.data)
            self._buffer.append(record)

            # Flush early when the batch is full; otherwise flush_loop
            # delivers it on the periodic interval.
            if len(self._buffer) >= self._batch_max_size:
                self._hass.async_create_task(self.flush())
        except Exception as e:
            # Never let a malformed payload propagate out of the event handler.
            _LOGGER.error("remote_logger: otel handler failure %s on %s", e, event.data)

    def _to_log_record(self, data: Any) -> dict[str, Any]:
        """Convert a system_log_event payload to an OTLP logRecord dict.

        Expected payload keys (from HA's system_log integration):
            name: str, message: list[str], level: str, source: (str, int),
            timestamp: float, exception: str, count: int, first_occurred: float
        """
        timestamp_s: float = data.get("timestamp", time.time())
        time_unix_nano = str(int(timestamp_s * 1_000_000_000))
        observed_time_unix_nano = str(int(time.time() * 1_000_000_000))

        level: str = data.get("level", "INFO").upper()
        severity_number, severity_text = SEVERITY_MAP.get(level, DEFAULT_SEVERITY)

        messages: list[str] = data.get("message", [])
        message: str = "\n".join(messages)

        attributes: list[dict[str, Any]] = []
        source = data.get("source")
        if source and isinstance(source, tuple):
            source_path, source_lineno = source
            attributes.append(_kv("code.file.path", source_path))
            attributes.append(_kv("code.line.number", source_lineno))
        logger_name = data.get("name")
        if data.get("count"):
            attributes.append(_kv("exception.count", data["count"]))
        if data.get("first_occurred"):
            attributes.append(_kv("exception.first_occurred", data["first_occurred"]))
        if logger_name:
            attributes.append(_kv("code.function.name", logger_name))
        exception = data.get("exception")
        if exception:
            attributes.append(_kv("exception.stacktrace", exception))

        return {
            "timeUnixNano": time_unix_nano,
            "observedTimeUnixNano": observed_time_unix_nano,
            "severityNumber": severity_number,
            "severityText": severity_text,
            "body": {"string_value": message},
            "attributes": attributes,
        }

    async def flush_loop(self) -> None:
        """Periodically flush buffered log records until cancelled.

        asyncio.CancelledError propagates naturally out of sleep()/flush().
        """
        while True:
            await asyncio.sleep(BATCH_FLUSH_INTERVAL_SECONDS)
            await self.flush()

    def generate_submission(self, records: list[dict[str, Any]]) -> dict[str, Any]:
        """Build kwargs for session.post(): encoded payload plus Content-Type."""
        result: dict[str, str | bytes | dict[str, Any]] = {"headers": {}}
        request = self._build_export_request(records)

        if self._use_protobuf:
            result["data"] = encode_export_logs_request(request)
            result["headers"]["Content-Type"] = "application/x-protobuf"  # type: ignore
        else:
            result["json"] = request
            result["headers"]["Content-Type"] = "application/json"  # type: ignore
        return result

    async def flush(self) -> None:
        """Flush all buffered log records to the OTLP endpoint.

        A submission that fails with a network error or a 5xx response is
        kept in self._in_progress and retried on the next flush; successful
        sends and 4xx responses (permanent client errors) clear it.
        """
        records: list[dict[str, Any]] | None = None
        async with self._lock:
            if not self._in_progress:
                if not self._buffer:
                    return
                records = self._buffer.copy()
                self._buffer.clear()

        try:
            if records and not self._in_progress:
                msg: dict[str, Any] = self.generate_submission(records)
                # Remember the submission so a transient failure (5xx or
                # network error) is retried instead of silently dropped.
                self._in_progress = msg
            elif self._in_progress:
                msg = self._in_progress
            else:
                return
            session: aiohttp.ClientSession = async_get_clientsession(self._hass, verify_ssl=self._use_tls)
            async with session.post(self.endpoint_url, timeout=aiohttp.ClientTimeout(total=10), **msg) as resp:
                if resp.status >= 400:
                    body = await resp.text()
                    _LOGGER.warning(
                        "remote_logger: OTLP endpoint returned HTTP %s: %s",
                        resp.status,
                        body[:200],
                    )
                if resp.ok or (resp.status >= 400 and resp.status < 500):
                    # records were sent, or there was a client-side error
                    self._in_progress = None

        except aiohttp.ClientError as err:
            # Keep self._in_progress so the batch is retried next flush.
            _LOGGER.warning("remote_logger: failed to send logs: %s", err)
        except Exception:
            _LOGGER.exception("remote_logger: unexpected error sending logs, skipping records")
            self._in_progress = None

    async def close(self) -> None:
        """Clean up resources (no-op for HTTP-based exporter)."""

    def _build_export_request(self, records: list[dict[str, Any]]) -> dict[str, Any]:
        """Wrap logRecords in the ExportLogsServiceRequest envelope."""
        return {
            "resourceLogs": [
                {
                    "resource": self._resource,
                    "scopeLogs": [
                        {
                            "scope": {
                                "name": SCOPE_NAME,
                                "version": SCOPE_VERSION,
                            },
                            "logRecords": records,
                        }
                    ],
                }
            ],
        }