Coverage for custom_components/remote_logger/config_flow.py: 80%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-07 04:46 +0000

1"""Config flow for the remote_logger integration.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Any 

7 

8import voluptuous as vol 

9from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow 

10from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_PROTOCOL, CONF_TOKEN 

11from homeassistant.core import callback 

12from homeassistant.helpers import selector 

13from homeassistant.helpers.aiohttp_client import async_get_clientsession 

14 

15from .const import ( 

16 BACKEND_OTEL, 

17 BACKEND_SYSLOG, 

18 CONF_BACKEND, 

19 CONF_CUSTOM_EVENTS, 

20 CONF_ENCODING, 

21 CONF_LOG_HA_CORE_ACTIVITY, 

22 CONF_LOG_HA_CORE_CHANGES, 

23 CONF_LOG_HA_EVENT_BODY, 

24 CONF_LOG_HA_FULL_STATE_CHANGES, 

25 CONF_LOG_HA_LIFECYCLE, 

26 CONF_LOG_HA_STATE_CHANGES, 

27 CONF_RESOURCE_ATTRIBUTES, 

28 CONF_USE_TLS, 

29 DOMAIN, 

30) 

31from .otel.const import CONF_TOKEN_TYPE, OTEL_DATA_SCHEMA, OTLP_LOGS_PATH, REAUTH_OTEL_DATA_SCHEMA, TOKEN_TYPE_BEARER 

32from .otel.exporter import build_auth_header, parse_headers, parse_resource_attributes 

33from .otel.exporter import validate as otel_validate 

34from .syslog.const import SYSLOG_DATA_SCHEMA 

35from .syslog.exporter import validate as syslog_validate 

36 

37_LOGGER = logging.getLogger(__name__) 

38 

39COMMON_DATA_SCHEMA = vol.Schema({ 

40 vol.Optional(CONF_LOG_HA_LIFECYCLE, default=False): selector.BooleanSelector(), 

41 vol.Optional(CONF_LOG_HA_CORE_CHANGES, default=False): selector.BooleanSelector(), 

42 vol.Optional(CONF_LOG_HA_CORE_ACTIVITY, default=False): selector.BooleanSelector(), 

43 vol.Optional(CONF_LOG_HA_STATE_CHANGES, default=False): selector.BooleanSelector(), 

44 vol.Optional(CONF_LOG_HA_FULL_STATE_CHANGES, default=False): selector.BooleanSelector(), 

45 vol.Optional(CONF_LOG_HA_EVENT_BODY, default=True): selector.BooleanSelector(), 

46 vol.Optional(CONF_CUSTOM_EVENTS, default=""): selector.TextSelector(selector.TextSelectorConfig(multiline=True)), 

47}) 

48 

49 

50def _build_endpoint_url(host: str, port: int, use_tls: bool, path: str = OTLP_LOGS_PATH) -> str: 

51 """Build the full OTLP endpoint URL.""" 

52 scheme = "https" if use_tls else "http" 

53 return f"{scheme}://{host}:{port}{path}" 

54 

55 

56class OtelLogsConfigFlow(ConfigFlow, domain=DOMAIN): 

57 """Handle a config flow for OpenTelemetry Log Exporter.""" 

58 

59 VERSION = 2 

60 

61 def __init__(self) -> None: 

62 super().__init__() 

63 self._pending_data: dict[str, Any] = {} 

64 

65 @staticmethod 

66 @callback 

67 def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow: 

68 """Return the options flow handler.""" 

69 return RemoteLoggerOptionsFlow(config_entry) 

70 

71 async def async_step_user( 

72 self, 

73 user_input: dict[str, Any] | None = None, # noqa: ARG002 

74 ) -> ConfigFlowResult: 

75 """Show menu to choose backend type.""" 

76 return self.async_show_menu( 

77 step_id="user", 

78 menu_options=["otel", "syslog"], 

79 ) 

80 

81 async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

82 """Handle OpenTelemetry OTLP configuration.""" 

83 errors: dict[str, str] = {} 

84 

85 if user_input is not None: 

86 host = user_input[CONF_HOST] 

87 port = user_input[CONF_PORT] 

88 use_tls = user_input[CONF_USE_TLS] 

89 url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH)) 

90 

91 # Validate header fields format before connecting 

92 extra_headers: dict[str, str] = {} 

93 token = user_input.get(CONF_TOKEN, "").strip() 

94 if token: 

95 if not use_tls: 

96 _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text") 

97 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER) 

98 extra_headers["Authorization"] = build_auth_header(token, token_type) 

99 raw_headers = user_input.get(CONF_HEADERS, "").strip() 

100 if raw_headers: 

101 try: 

102 extra_headers.update(parse_headers(raw_headers)) 

103 except ValueError: 

104 errors[CONF_HEADERS] = "invalid_headers" 

105 

106 # Validate connectivity 

107 if not errors: 

108 session = async_get_clientsession(self.hass, verify_ssl=use_tls) 

109 errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None) 

110 # Validate resource attributes format 

111 if not errors: 

112 raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "") 

113 if raw_attrs.strip(): 

114 try: 

115 parse_resource_attributes(raw_attrs) 

116 except ValueError: 

117 errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes" 

118 

119 if not errors: 

120 await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_OTEL}_{host}_{port}") 

121 self._abort_if_unique_id_configured() 

122 self._pending_data = {**user_input, CONF_BACKEND: BACKEND_OTEL} 

123 self._pending_data["_title"] = f"OTLP @ {host}:{port}" 

124 return await self.async_step_common() 

125 

126 return self.async_show_form( 

127 step_id="otel", 

128 data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or {}), 

129 errors=errors, 

130 ) 

131 

132 async def async_step_reauth(self, entry_data: dict[str, Any]) -> ConfigFlowResult: # noqa: ARG002 

133 """Initiate reauth after authentication failure.""" 

134 return await self.async_step_reauth_otel() 

135 

136 async def async_step_reauth_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

137 """Handle re-entry of the bearer token after an authentication failure.""" 

138 reauth_entry = self._get_reauth_entry() 

139 errors: dict[str, str] = {} 

140 

141 if user_input is not None: 

142 token = user_input.get(CONF_TOKEN, "").strip() 

143 extra_headers: dict[str, str] = {} 

144 if token: 

145 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER) 

146 extra_headers["Authorization"] = build_auth_header(token, token_type) 

147 raw_headers = reauth_entry.data.get(CONF_HEADERS, "").strip() 

148 if raw_headers: 

149 extra_headers.update(parse_headers(raw_headers)) 

150 

151 url = _build_endpoint_url( 

152 reauth_entry.data[CONF_HOST], 

153 reauth_entry.data[CONF_PORT], 

154 reauth_entry.data[CONF_USE_TLS], 

155 reauth_entry.data.get(CONF_PATH, OTLP_LOGS_PATH), 

156 ) 

157 session = async_get_clientsession(self.hass, verify_ssl=reauth_entry.data[CONF_USE_TLS]) 

158 errors = await otel_validate(session, url, reauth_entry.data[CONF_ENCODING], extra_headers or None) 

159 if not errors: 

160 return self.async_update_reload_and_abort( 

161 reauth_entry, 

162 data_updates={ 

163 CONF_TOKEN: user_input[CONF_TOKEN], 

164 CONF_TOKEN_TYPE: user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER), 

165 }, 

166 ) 

167 

168 return self.async_show_form( 

169 step_id="reauth_otel", 

170 data_schema=self.add_suggested_values_to_schema(REAUTH_OTEL_DATA_SCHEMA, user_input or {}), 

171 errors=errors, 

172 ) 

173 

174 async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

175 """Handle Syslog RFC 5424 configuration.""" 

176 errors: dict[str, str] = {} 

177 

178 if user_input is not None: 

179 host = user_input[CONF_HOST] 

180 port = user_input[CONF_PORT] 

181 protocol = user_input[CONF_PROTOCOL] 

182 use_tls = user_input.get(CONF_USE_TLS, False) 

183 

184 # Validate connectivity 

185 error = await syslog_validate(self.hass, host, port, protocol, use_tls) 

186 if error: 

187 errors["base"] = error 

188 

189 if not errors: 

190 await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_SYSLOG}") 

191 self._abort_if_unique_id_configured() 

192 self._pending_data = {**user_input, CONF_BACKEND: BACKEND_SYSLOG} 

193 self._pending_data["_title"] = f"Syslog @ {host}:{port} ({protocol.upper()})" 

194 return await self.async_step_common() 

195 

196 return self.async_show_form( 

197 step_id="syslog", 

198 data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or {}), 

199 errors=errors, 

200 ) 

201 

202 async def async_step_common(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

203 """Configure common event subscription options.""" 

204 errors: dict[str, str] = {} 

205 if user_input is not None: 

206 if user_input.get(CONF_LOG_HA_STATE_CHANGES) and user_input.get(CONF_LOG_HA_FULL_STATE_CHANGES): 

207 errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive" 

208 else: 

209 title = self._pending_data.pop("_title") 

210 return self.async_create_entry( 

211 title=title, 

212 data={**self._pending_data, **user_input}, 

213 ) 

214 

215 return self.async_show_form( 

216 step_id="common", 

217 data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, user_input or {}), 

218 errors=errors, 

219 ) 

220 

221 

222class RemoteLoggerOptionsFlow(OptionsFlow): 

223 """Allow editing connection details and event subscriptions after setup.""" 

224 

225 def __init__(self, config_entry: ConfigEntry) -> None: 

226 self._config_entry = config_entry 

227 self._pending_options: dict[str, Any] = {} 

228 

229 async def async_step_init(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: # noqa: ARG002 

230 """Dispatch to the backend-specific connection form.""" 

231 backend = self._config_entry.data.get(CONF_BACKEND, BACKEND_OTEL) 

232 if backend == BACKEND_SYSLOG: 

233 return await self.async_step_syslog() 

234 return await self.async_step_otel() 

235 

236 async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

237 """Handle OTLP connection settings.""" 

238 errors: dict[str, str] = {} 

239 merged = {**self._config_entry.data, **self._config_entry.options} 

240 

241 if user_input is not None: 

242 host = user_input[CONF_HOST] 

243 port = user_input[CONF_PORT] 

244 use_tls = user_input[CONF_USE_TLS] 

245 url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH)) 

246 

247 extra_headers: dict[str, str] = {} 

248 token = user_input.get(CONF_TOKEN, "").strip() 

249 if token: 

250 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER) 

251 extra_headers["Authorization"] = build_auth_header(token, token_type) 

252 raw_headers = user_input.get(CONF_HEADERS, "").strip() 

253 if raw_headers: 

254 try: 

255 extra_headers.update(parse_headers(raw_headers)) 

256 except ValueError: 

257 errors[CONF_HEADERS] = "invalid_headers" 

258 

259 if not errors: 

260 session = async_get_clientsession(self.hass, verify_ssl=use_tls) 

261 errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None) 

262 

263 if not errors: 

264 raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "") 

265 if raw_attrs.strip(): 

266 try: 

267 parse_resource_attributes(raw_attrs) 

268 except ValueError: 

269 errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes" 

270 

271 if not errors: 

272 self._pending_options = user_input 

273 return await self.async_step_events() 

274 

275 return self.async_show_form( 

276 step_id="otel", 

277 data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or merged), 

278 errors=errors, 

279 ) 

280 

281 async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

282 """Handle Syslog connection settings.""" 

283 errors: dict[str, str] = {} 

284 merged = {**self._config_entry.data, **self._config_entry.options} 

285 

286 if user_input is not None: 

287 host = user_input[CONF_HOST] 

288 port = user_input[CONF_PORT] 

289 protocol = user_input[CONF_PROTOCOL] 

290 use_tls = user_input.get(CONF_USE_TLS, False) 

291 

292 error = await syslog_validate(self.hass, host, port, protocol, use_tls) 

293 if error: 

294 errors["base"] = error 

295 

296 if not errors: 

297 self._pending_options = user_input 

298 return await self.async_step_events() 

299 

300 return self.async_show_form( 

301 step_id="syslog", 

302 data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or merged), 

303 errors=errors, 

304 ) 

305 

306 async def async_step_events(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: 

307 """Handle event subscription options.""" 

308 errors: dict[str, str] = {} 

309 if user_input is not None: 

310 if user_input.get(CONF_LOG_HA_STATE_CHANGES) and user_input.get(CONF_LOG_HA_FULL_STATE_CHANGES): 

311 errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive" 

312 else: 

313 return self.async_create_entry(title="", data={**self._pending_options, **user_input}) 

314 

315 merged = {**self._config_entry.data, **self._config_entry.options} 

316 current = { 

317 CONF_LOG_HA_CORE_ACTIVITY: merged.get(CONF_LOG_HA_CORE_ACTIVITY, False), 

318 CONF_LOG_HA_LIFECYCLE: merged.get(CONF_LOG_HA_LIFECYCLE, False), 

319 CONF_LOG_HA_CORE_CHANGES: merged.get(CONF_LOG_HA_CORE_CHANGES, False), 

320 CONF_LOG_HA_STATE_CHANGES: merged.get(CONF_LOG_HA_STATE_CHANGES, False), 

321 CONF_LOG_HA_FULL_STATE_CHANGES: merged.get(CONF_LOG_HA_FULL_STATE_CHANGES, False), 

322 CONF_LOG_HA_EVENT_BODY: merged.get(CONF_LOG_HA_EVENT_BODY, False), 

323 CONF_CUSTOM_EVENTS: merged.get(CONF_CUSTOM_EVENTS, ""), 

324 } 

325 return self.async_show_form( 

326 step_id="events", 

327 data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, current), 

328 errors=errors, 

329 )