Coverage for custom_components / remote_logger / config_flow.py: 80%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-22 21:55 +0000

1"""Config flow for the remote_logger integration.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import re 

7from typing import Any 

8 

9import voluptuous as vol 

10from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow 

11from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_PROTOCOL, CONF_TOKEN 

12from homeassistant.core import callback 

13from homeassistant.data_entry_flow import section 

14from homeassistant.helpers import selector 

15from homeassistant.helpers.aiohttp_client import async_get_clientsession 

16 

17from .const import ( 

18 BACKEND_OTEL, 

19 BACKEND_SYSLOG, 

20 CONF_BACKEND, 

21 CONF_CUSTOM_EVENTS, 

22 CONF_ENCODING, 

23 CONF_EVENT_BASED_LOGGING, 

24 CONF_LOG_HA_CORE_ACTIVITY, 

25 CONF_LOG_HA_CORE_CHANGES, 

26 CONF_LOG_HA_EVENT_BODY, 

27 CONF_LOG_HA_FULL_STATE_CHANGES, 

28 CONF_LOG_HA_LIFECYCLE, 

29 CONF_LOG_HA_STATE_CHANGES, 

30 CONF_LOG_LEVEL, 

31 CONF_RESOURCE_ATTRIBUTES, 

32 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, 

33 CONF_USE_TLS, 

34 DEFAULT_LOG_LEVEL, 

35 DOMAIN, 

36) 

37from .otel.const import CONF_TOKEN_TYPE, OTEL_DATA_SCHEMA, OTLP_LOGS_PATH, REAUTH_OTEL_DATA_SCHEMA, TOKEN_TYPE_BEARER 

38from .otel.exporter import build_auth_header, parse_headers, parse_resource_attributes 

39from .otel.exporter import validate as otel_validate 

40from .syslog.const import SYSLOG_DATA_SCHEMA 

41from .syslog.exporter import validate as syslog_validate 

42 

43_LOGGER = logging.getLogger(__name__) 

44 

45 

46def _to_list(value: Any) -> list[str]: 

47 """Coerce a stored string (comma/newline-separated) or list to a list of strings.""" 

48 if isinstance(value, list): 

49 return value 

50 if not value: 

51 return [] 

52 return [v.strip() for v in re.split(r"[\n,]+", str(value)) if v.strip()] 

53 

54 

# Form schema for the event-subscription settings. It is rendered by both the
# initial config flow ("common" step) and the options flow ("events" step).
COMMON_DATA_SCHEMA = vol.Schema({
    vol.Optional(CONF_EVENT_BASED_LOGGING, default=False): selector.BooleanSelector(),
    vol.Optional(CONF_LOG_LEVEL, default=DEFAULT_LOG_LEVEL): selector.SelectSelector(
        selector.SelectSelectorConfig(
            options=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
            mode=selector.SelectSelectorMode.DROPDOWN,
        )
    ),
    # Toggles grouped in a form section; submitted values arrive as a nested
    # dict under "ha_standard_events" and are flattened by the flow steps.
    vol.Required("ha_standard_events"): section(
        vol.Schema({
            vol.Optional(CONF_LOG_HA_LIFECYCLE, default=False): selector.BooleanSelector(),
            vol.Optional(CONF_LOG_HA_CORE_CHANGES, default=False): selector.BooleanSelector(),
            vol.Optional(CONF_LOG_HA_CORE_ACTIVITY, default=False): selector.BooleanSelector(),
            # The two state-change toggles below are rejected by the flow
            # steps when both are enabled ("state_changes_exclusive").
            vol.Optional(CONF_LOG_HA_STATE_CHANGES, default=False): selector.BooleanSelector(),
            vol.Optional(CONF_LOG_HA_FULL_STATE_CHANGES, default=False): selector.BooleanSelector(),
        }),
        {"collapsed": False},
    ),
    vol.Optional(CONF_LOG_HA_EVENT_BODY, default=True): selector.BooleanSelector(),
    vol.Optional(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, default=True): selector.BooleanSelector(),
    # ``default=list`` (a factory) rather than ``default=[]``: a literal list
    # would be the same object for every validated input, so a mutation by one
    # consumer would leak into all others.
    vol.Optional(CONF_CUSTOM_EVENTS, default=list): selector.TextSelector(selector.TextSelectorConfig(multiple=True)),
})

77 

78 

def _build_endpoint_url(host: str, port: int, use_tls: bool, path: str = OTLP_LOGS_PATH) -> str:
    """Assemble the OTLP endpoint URL from its connection parts.

    The scheme is ``https`` when ``use_tls`` is truthy and ``http`` otherwise;
    ``host``, ``port`` and ``path`` are concatenated verbatim after it.
    """
    # NOTE(review): ``path`` is appended as-is, so it presumably has to carry
    # its own leading "/" -- confirm the form schema / callers guarantee that.
    if use_tls:
        scheme = "https"
    else:
        scheme = "http"
    authority = f"{host}:{port}"
    return f"{scheme}://{authority}{path}"

83 

84 

class OtelLogsConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for OpenTelemetry Log Exporter.

    The flow starts with a menu ("otel" or "syslog"), validates the chosen
    backend's connection settings, then collects the shared event-subscription
    settings in the "common" step before creating the entry. A separate
    reauth path lets the user re-enter the OTLP token.
    """

    VERSION = 2

    def __init__(self) -> None:
        """Initialise the flow with an empty buffer for backend settings."""
        super().__init__()
        # Validated backend settings, plus a temporary "_title" key, carried
        # from the backend step to the "common" step.
        self._pending_data: dict[str, Any] = {}

    @staticmethod
    @callback
    def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
        """Return the options flow handler."""
        return RemoteLoggerOptionsFlow(config_entry)

    async def async_step_user(
        self,
        user_input: dict[str, Any] | None = None,  # noqa: ARG002
    ) -> ConfigFlowResult:
        """Show menu to choose backend type."""
        return self.async_show_menu(
            step_id="user",
            menu_options=["otel", "syslog"],
        )

    async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle OpenTelemetry OTLP configuration.

        On submission the header format is checked, then the endpoint is
        probed via ``otel_validate``, then the resource attributes are parsed.
        The first failing stage populates ``errors`` and the form is shown
        again with the submitted values; otherwise the flow continues to the
        "common" step.
        """
        errors: dict[str, str] = {}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            use_tls = user_input[CONF_USE_TLS]
            url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))

            # Validate header fields format before connecting
            extra_headers: dict[str, str] = {}
            token = user_input.get(CONF_TOKEN, "").strip()
            if token:
                if not use_tls:
                    _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text")
                token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
                extra_headers["Authorization"] = build_auth_header(token, token_type)
            raw_headers = "\n".join(user_input.get(CONF_HEADERS, []))
            if raw_headers:
                try:
                    # Applied after the token, so an explicit header of the
                    # same name replaces the generated Authorization value.
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    errors[CONF_HEADERS] = "invalid_headers"

            # Validate connectivity
            if not errors:
                session = async_get_clientsession(self.hass, verify_ssl=use_tls)
                errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)
            # Validate resource attributes format
            if not errors:
                raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
                if raw_attrs.strip():
                    try:
                        parse_resource_attributes(raw_attrs)
                    except ValueError:
                        errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"

            if not errors:
                await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_OTEL}_{host}_{port}")
                self._abort_if_unique_id_configured()
                self._pending_data = {**user_input, CONF_BACKEND: BACKEND_OTEL}
                self._pending_data["_title"] = f"OTLP @ {host}:{port}"
                return await self.async_step_common()

        return self.async_show_form(
            step_id="otel",
            data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_reauth(self, entry_data: dict[str, Any]) -> ConfigFlowResult:  # noqa: ARG002
        """Initiate reauth after authentication failure."""
        return await self.async_step_reauth_otel()

    async def async_step_reauth_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle re-entry of the bearer token after an authentication failure.

        The new token is combined with the headers stored on the entry and the
        stored endpoint is probed via ``otel_validate``. On success the entry's
        token and token type are updated and the entry is reloaded; otherwise
        the form is shown again with the reported errors.
        """
        reauth_entry = self._get_reauth_entry()
        errors: dict[str, str] = {}
        # NOTE(review): only ``reauth_entry.data`` is consulted here, while the
        # options flow merges ``data`` with ``options``. If connection settings
        # edited via options live in ``entry.options``, this step may validate
        # against a stale endpoint -- confirm against the entry setup code.

        if user_input is not None:
            # ``.get`` keeps an omitted token from raising KeyError further
            # down; the value is stored exactly as submitted.
            submitted_token = user_input.get(CONF_TOKEN, "")
            token = submitted_token.strip()
            token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)

            extra_headers: dict[str, str] = {}
            if token:
                extra_headers["Authorization"] = build_auth_header(token, token_type)

            # Stored headers may be a list or a legacy delimited string;
            # joining a bare string would split it into single characters.
            raw_headers = "\n".join(_to_list(reauth_entry.data.get(CONF_HEADERS, [])))
            if raw_headers:
                try:
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    # This form has no headers field, so report on the form
                    # itself instead of letting the exception abort the flow.
                    _LOGGER.warning("remote_logger: stored headers could not be parsed during reauth")
                    errors["base"] = "invalid_headers"

            if not errors:
                url = _build_endpoint_url(
                    reauth_entry.data[CONF_HOST],
                    reauth_entry.data[CONF_PORT],
                    reauth_entry.data[CONF_USE_TLS],
                    reauth_entry.data.get(CONF_PATH, OTLP_LOGS_PATH),
                )
                session = async_get_clientsession(self.hass, verify_ssl=reauth_entry.data[CONF_USE_TLS])
                errors = await otel_validate(session, url, reauth_entry.data[CONF_ENCODING], extra_headers or None)
            if not errors:
                return self.async_update_reload_and_abort(
                    reauth_entry,
                    data_updates={
                        CONF_TOKEN: submitted_token,
                        CONF_TOKEN_TYPE: token_type,
                    },
                )

        return self.async_show_form(
            step_id="reauth_otel",
            data_schema=self.add_suggested_values_to_schema(REAUTH_OTEL_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle Syslog RFC 5424 configuration.

        Probes the target via ``syslog_validate``; a returned error is shown
        as a form-level ("base") error, otherwise the flow continues to the
        "common" step.
        """
        errors: dict[str, str] = {}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            protocol = user_input[CONF_PROTOCOL]
            use_tls = user_input.get(CONF_USE_TLS, False)

            # Validate connectivity
            error = await syslog_validate(self.hass, host, port, protocol, use_tls)
            if error:
                errors["base"] = error

            if not errors:
                # The unique id carries no host/port, so a second syslog entry
                # aborts as already configured.
                await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_SYSLOG}")
                self._abort_if_unique_id_configured()
                self._pending_data = {**user_input, CONF_BACKEND: BACKEND_SYSLOG}
                self._pending_data["_title"] = f"Syslog @ {host}:{port} ({protocol.upper()})"
                return await self.async_step_common()

        return self.async_show_form(
            step_id="syslog",
            data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_common(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Configure common event subscription options.

        Section values (nested dicts) are flattened into a single mapping.
        Enabling both state-change toggles is rejected; otherwise the entry is
        created from the buffered backend settings plus the flattened options.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            # Top-level scalars first, then the contents of every section.
            flat: dict[str, Any] = {k: v for k, v in user_input.items() if not isinstance(v, dict)}
            for v in user_input.values():
                if isinstance(v, dict):
                    flat.update(v)
            if flat.get(CONF_LOG_HA_STATE_CHANGES) and flat.get(CONF_LOG_HA_FULL_STATE_CHANGES):
                errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
            else:
                # "_title" is flow-internal and must not end up in entry data.
                title = self._pending_data.pop("_title")
                return self.async_create_entry(
                    title=title,
                    data={**self._pending_data, **flat},
                )

        return self.async_show_form(
            step_id="common",
            data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, user_input or {}),
            errors=errors,
            description_placeholders={
                "learn_more": "[Learn about HA events](https://www.home-assistant.io/docs/configuration/events/)"
            },
        )

256 

257 

class RemoteLoggerOptionsFlow(OptionsFlow):
    """Allow editing connection details and event subscriptions after setup.

    The flow shows the backend-specific connection form first (OTLP or
    syslog, chosen from the entry's stored backend), validates it, and then
    shows the shared event-subscription form.
    """

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Remember the entry being edited and start with no pending options."""
        self._config_entry = config_entry
        # Validated connection settings carried over to the "events" step.
        self._pending_options: dict[str, Any] = {}

    async def async_step_init(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:  # noqa: ARG002
        """Dispatch to the backend-specific connection form."""
        # Entries without a stored backend are treated as OTLP.
        backend = self._config_entry.data.get(CONF_BACKEND, BACKEND_OTEL)
        if backend == BACKEND_SYSLOG:
            return await self.async_step_syslog()
        return await self.async_step_otel()

    async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle OTLP connection settings.

        Mirrors the initial setup: header format check, connectivity probe
        via ``otel_validate``, then resource-attribute parsing. On success the
        submission is buffered and the flow continues to the "events" step.
        """
        errors: dict[str, str] = {}
        # Options take precedence over the original entry data.
        merged = {**self._config_entry.data, **self._config_entry.options}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            use_tls = user_input[CONF_USE_TLS]
            url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))

            extra_headers: dict[str, str] = {}
            token = user_input.get(CONF_TOKEN, "").strip()
            if token:
                if not use_tls:
                    # Same warning the initial config flow emits for this case.
                    _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text")
                token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
                extra_headers["Authorization"] = build_auth_header(token, token_type)
            raw_headers = "\n".join(user_input.get(CONF_HEADERS, []))
            if raw_headers:
                try:
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    errors[CONF_HEADERS] = "invalid_headers"

            if not errors:
                session = async_get_clientsession(self.hass, verify_ssl=use_tls)
                errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)

            if not errors:
                raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
                if raw_attrs.strip():
                    try:
                        parse_resource_attributes(raw_attrs)
                    except ValueError:
                        errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"

            if not errors:
                self._pending_options = user_input
                return await self.async_step_events()

        # Stored headers may be a legacy delimited string; the form field
        # expects a list, hence the coercion.
        suggested = user_input or {**merged, CONF_HEADERS: _to_list(merged.get(CONF_HEADERS, []))}
        return self.async_show_form(
            step_id="otel",
            data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, suggested),
            errors=errors,
        )

    async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle Syslog connection settings.

        Probes the target via ``syslog_validate``; on success the submission
        is buffered and the flow continues to the "events" step.
        """
        errors: dict[str, str] = {}
        merged = {**self._config_entry.data, **self._config_entry.options}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            protocol = user_input[CONF_PROTOCOL]
            use_tls = user_input.get(CONF_USE_TLS, False)

            error = await syslog_validate(self.hass, host, port, protocol, use_tls)
            if error:
                errors["base"] = error

            if not errors:
                self._pending_options = user_input
                return await self.async_step_events()

        return self.async_show_form(
            step_id="syslog",
            data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or merged),
            errors=errors,
        )

    async def async_step_events(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle event subscription options.

        Section values (nested dicts) are flattened into a single mapping.
        Enabling both state-change toggles is rejected; otherwise the flow
        finishes with the buffered connection settings plus the flattened
        event options.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            # Top-level scalars first, then the contents of every section.
            flat: dict[str, Any] = {k: v for k, v in user_input.items() if not isinstance(v, dict)}
            for v in user_input.values():
                if isinstance(v, dict):
                    flat.update(v)
            if flat.get(CONF_LOG_HA_STATE_CHANGES) and flat.get(CONF_LOG_HA_FULL_STATE_CHANGES):
                errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
            else:
                return self.async_create_entry(title="", data={**self._pending_options, **flat})

        if user_input is not None:
            # Validation failed: show the submission again so the user's
            # changes are not replaced by the stored settings.
            current = user_input
        else:
            merged = {**self._config_entry.data, **self._config_entry.options}
            current = {
                CONF_EVENT_BASED_LOGGING: merged.get(CONF_EVENT_BASED_LOGGING, False),
                CONF_LOG_LEVEL: merged.get(CONF_LOG_LEVEL, DEFAULT_LOG_LEVEL),
                "ha_standard_events": {
                    CONF_LOG_HA_LIFECYCLE: merged.get(CONF_LOG_HA_LIFECYCLE, False),
                    CONF_LOG_HA_CORE_CHANGES: merged.get(CONF_LOG_HA_CORE_CHANGES, False),
                    CONF_LOG_HA_CORE_ACTIVITY: merged.get(CONF_LOG_HA_CORE_ACTIVITY, False),
                    CONF_LOG_HA_STATE_CHANGES: merged.get(CONF_LOG_HA_STATE_CHANGES, False),
                    CONF_LOG_HA_FULL_STATE_CHANGES: merged.get(CONF_LOG_HA_FULL_STATE_CHANGES, False),
                },
                # NOTE(review): this fallback is False while COMMON_DATA_SCHEMA
                # defaults the same key to True -- confirm which one matches the
                # runtime default for entries that lack the key.
                CONF_LOG_HA_EVENT_BODY: merged.get(CONF_LOG_HA_EVENT_BODY, False),
                CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME: merged.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True),
                CONF_CUSTOM_EVENTS: _to_list(merged.get(CONF_CUSTOM_EVENTS, [])),
            }
        return self.async_show_form(
            step_id="events",
            data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, current),
            errors=errors,
            description_placeholders={
                "learn_more": "[Learn about HA events](https://www.home-assistant.io/docs/configuration/events/)"
            },
        )