Coverage for custom_components / remote_logger / config_flow.py: 80%
196 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-22 21:55 +0000
1"""Config flow for the remote_logger integration."""
3from __future__ import annotations
5import logging
6import re
7from typing import Any
9import voluptuous as vol
10from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow
11from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_PROTOCOL, CONF_TOKEN
12from homeassistant.core import callback
13from homeassistant.data_entry_flow import section
14from homeassistant.helpers import selector
15from homeassistant.helpers.aiohttp_client import async_get_clientsession
17from .const import (
18 BACKEND_OTEL,
19 BACKEND_SYSLOG,
20 CONF_BACKEND,
21 CONF_CUSTOM_EVENTS,
22 CONF_ENCODING,
23 CONF_EVENT_BASED_LOGGING,
24 CONF_LOG_HA_CORE_ACTIVITY,
25 CONF_LOG_HA_CORE_CHANGES,
26 CONF_LOG_HA_EVENT_BODY,
27 CONF_LOG_HA_FULL_STATE_CHANGES,
28 CONF_LOG_HA_LIFECYCLE,
29 CONF_LOG_HA_STATE_CHANGES,
30 CONF_LOG_LEVEL,
31 CONF_RESOURCE_ATTRIBUTES,
32 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME,
33 CONF_USE_TLS,
34 DEFAULT_LOG_LEVEL,
35 DOMAIN,
36)
37from .otel.const import CONF_TOKEN_TYPE, OTEL_DATA_SCHEMA, OTLP_LOGS_PATH, REAUTH_OTEL_DATA_SCHEMA, TOKEN_TYPE_BEARER
38from .otel.exporter import build_auth_header, parse_headers, parse_resource_attributes
39from .otel.exporter import validate as otel_validate
40from .syslog.const import SYSLOG_DATA_SCHEMA
41from .syslog.exporter import validate as syslog_validate
43_LOGGER = logging.getLogger(__name__)
46def _to_list(value: Any) -> list[str]:
47 """Coerce a stored string (comma/newline-separated) or list to a list of strings."""
48 if isinstance(value, list):
49 return value
50 if not value:
51 return []
52 return [v.strip() for v in re.split(r"[\n,]+", str(value)) if v.strip()]
55COMMON_DATA_SCHEMA = vol.Schema({
56 vol.Optional(CONF_EVENT_BASED_LOGGING, default=False): selector.BooleanSelector(),
57 vol.Optional(CONF_LOG_LEVEL, default=DEFAULT_LOG_LEVEL): selector.SelectSelector(
58 selector.SelectSelectorConfig(
59 options=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
60 mode=selector.SelectSelectorMode.DROPDOWN,
61 )
62 ),
63 vol.Required("ha_standard_events"): section(
64 vol.Schema({
65 vol.Optional(CONF_LOG_HA_LIFECYCLE, default=False): selector.BooleanSelector(),
66 vol.Optional(CONF_LOG_HA_CORE_CHANGES, default=False): selector.BooleanSelector(),
67 vol.Optional(CONF_LOG_HA_CORE_ACTIVITY, default=False): selector.BooleanSelector(),
68 vol.Optional(CONF_LOG_HA_STATE_CHANGES, default=False): selector.BooleanSelector(),
69 vol.Optional(CONF_LOG_HA_FULL_STATE_CHANGES, default=False): selector.BooleanSelector(),
70 }),
71 {"collapsed": False},
72 ),
73 vol.Optional(CONF_LOG_HA_EVENT_BODY, default=True): selector.BooleanSelector(),
74 vol.Optional(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, default=True): selector.BooleanSelector(),
75 vol.Optional(CONF_CUSTOM_EVENTS, default=[]): selector.TextSelector(selector.TextSelectorConfig(multiple=True)),
76})
79def _build_endpoint_url(host: str, port: int, use_tls: bool, path: str = OTLP_LOGS_PATH) -> str:
80 """Build the full OTLP endpoint URL."""
81 scheme = "https" if use_tls else "http"
82 return f"{scheme}://{host}:{port}{path}"
85class OtelLogsConfigFlow(ConfigFlow, domain=DOMAIN):
86 """Handle a config flow for OpenTelemetry Log Exporter."""
88 VERSION = 2
90 def __init__(self) -> None:
91 super().__init__()
92 self._pending_data: dict[str, Any] = {}
94 @staticmethod
95 @callback
96 def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
97 """Return the options flow handler."""
98 return RemoteLoggerOptionsFlow(config_entry)
100 async def async_step_user(
101 self,
102 user_input: dict[str, Any] | None = None, # noqa: ARG002
103 ) -> ConfigFlowResult:
104 """Show menu to choose backend type."""
105 return self.async_show_menu(
106 step_id="user",
107 menu_options=["otel", "syslog"],
108 )
110 async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
111 """Handle OpenTelemetry OTLP configuration."""
112 errors: dict[str, str] = {}
114 if user_input is not None:
115 host = user_input[CONF_HOST]
116 port = user_input[CONF_PORT]
117 use_tls = user_input[CONF_USE_TLS]
118 url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))
120 # Validate header fields format before connecting
121 extra_headers: dict[str, str] = {}
122 token = user_input.get(CONF_TOKEN, "").strip()
123 if token:
124 if not use_tls:
125 _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text")
126 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
127 extra_headers["Authorization"] = build_auth_header(token, token_type)
128 raw_headers = "\n".join(user_input.get(CONF_HEADERS, []))
129 if raw_headers:
130 try:
131 extra_headers.update(parse_headers(raw_headers))
132 except ValueError:
133 errors[CONF_HEADERS] = "invalid_headers"
135 # Validate connectivity
136 if not errors:
137 session = async_get_clientsession(self.hass, verify_ssl=use_tls)
138 errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)
139 # Validate resource attributes format
140 if not errors:
141 raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
142 if raw_attrs.strip():
143 try:
144 parse_resource_attributes(raw_attrs)
145 except ValueError:
146 errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"
148 if not errors:
149 await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_OTEL}_{host}_{port}")
150 self._abort_if_unique_id_configured()
151 self._pending_data = {**user_input, CONF_BACKEND: BACKEND_OTEL}
152 self._pending_data["_title"] = f"OTLP @ {host}:{port}"
153 return await self.async_step_common()
155 return self.async_show_form(
156 step_id="otel",
157 data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or {}),
158 errors=errors,
159 )
161 async def async_step_reauth(self, entry_data: dict[str, Any]) -> ConfigFlowResult: # noqa: ARG002
162 """Initiate reauth after authentication failure."""
163 return await self.async_step_reauth_otel()
165 async def async_step_reauth_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
166 """Handle re-entry of the bearer token after an authentication failure."""
167 reauth_entry = self._get_reauth_entry()
168 errors: dict[str, str] = {}
170 if user_input is not None:
171 token = user_input.get(CONF_TOKEN, "").strip()
172 extra_headers: dict[str, str] = {}
173 if token:
174 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
175 extra_headers["Authorization"] = build_auth_header(token, token_type)
176 raw_headers = "\n".join(reauth_entry.data.get(CONF_HEADERS, []))
177 if raw_headers:
178 extra_headers.update(parse_headers(raw_headers))
180 url = _build_endpoint_url(
181 reauth_entry.data[CONF_HOST],
182 reauth_entry.data[CONF_PORT],
183 reauth_entry.data[CONF_USE_TLS],
184 reauth_entry.data.get(CONF_PATH, OTLP_LOGS_PATH),
185 )
186 session = async_get_clientsession(self.hass, verify_ssl=reauth_entry.data[CONF_USE_TLS])
187 errors = await otel_validate(session, url, reauth_entry.data[CONF_ENCODING], extra_headers or None)
188 if not errors:
189 return self.async_update_reload_and_abort(
190 reauth_entry,
191 data_updates={
192 CONF_TOKEN: user_input[CONF_TOKEN],
193 CONF_TOKEN_TYPE: user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER),
194 },
195 )
197 return self.async_show_form(
198 step_id="reauth_otel",
199 data_schema=self.add_suggested_values_to_schema(REAUTH_OTEL_DATA_SCHEMA, user_input or {}),
200 errors=errors,
201 )
203 async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
204 """Handle Syslog RFC 5424 configuration."""
205 errors: dict[str, str] = {}
207 if user_input is not None:
208 host = user_input[CONF_HOST]
209 port = user_input[CONF_PORT]
210 protocol = user_input[CONF_PROTOCOL]
211 use_tls = user_input.get(CONF_USE_TLS, False)
213 # Validate connectivity
214 error = await syslog_validate(self.hass, host, port, protocol, use_tls)
215 if error:
216 errors["base"] = error
218 if not errors:
219 await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_SYSLOG}")
220 self._abort_if_unique_id_configured()
221 self._pending_data = {**user_input, CONF_BACKEND: BACKEND_SYSLOG}
222 self._pending_data["_title"] = f"Syslog @ {host}:{port} ({protocol.upper()})"
223 return await self.async_step_common()
225 return self.async_show_form(
226 step_id="syslog",
227 data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or {}),
228 errors=errors,
229 )
231 async def async_step_common(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
232 """Configure common event subscription options."""
233 errors: dict[str, str] = {}
234 if user_input is not None:
235 flat: dict[str, Any] = {k: v for k, v in user_input.items() if not isinstance(v, dict)}
236 for v in user_input.values():
237 if isinstance(v, dict):
238 flat.update(v)
239 if flat.get(CONF_LOG_HA_STATE_CHANGES) and flat.get(CONF_LOG_HA_FULL_STATE_CHANGES):
240 errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
241 else:
242 title = self._pending_data.pop("_title")
243 return self.async_create_entry(
244 title=title,
245 data={**self._pending_data, **flat},
246 )
248 return self.async_show_form(
249 step_id="common",
250 data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, user_input or {}),
251 errors=errors,
252 description_placeholders={
253 "learn_more": "[Learn about HA events](https://www.home-assistant.io/docs/configuration/events/)"
254 },
255 )
258class RemoteLoggerOptionsFlow(OptionsFlow):
259 """Allow editing connection details and event subscriptions after setup."""
261 def __init__(self, config_entry: ConfigEntry) -> None:
262 self._config_entry = config_entry
263 self._pending_options: dict[str, Any] = {}
265 async def async_step_init(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult: # noqa: ARG002
266 """Dispatch to the backend-specific connection form."""
267 backend = self._config_entry.data.get(CONF_BACKEND, BACKEND_OTEL)
268 if backend == BACKEND_SYSLOG:
269 return await self.async_step_syslog()
270 return await self.async_step_otel()
272 async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
273 """Handle OTLP connection settings."""
274 errors: dict[str, str] = {}
275 merged = {**self._config_entry.data, **self._config_entry.options}
277 if user_input is not None:
278 host = user_input[CONF_HOST]
279 port = user_input[CONF_PORT]
280 use_tls = user_input[CONF_USE_TLS]
281 url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))
283 extra_headers: dict[str, str] = {}
284 token = user_input.get(CONF_TOKEN, "").strip()
285 if token:
286 token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
287 extra_headers["Authorization"] = build_auth_header(token, token_type)
288 raw_headers = "\n".join(user_input.get(CONF_HEADERS, []))
289 if raw_headers:
290 try:
291 extra_headers.update(parse_headers(raw_headers))
292 except ValueError:
293 errors[CONF_HEADERS] = "invalid_headers"
295 if not errors:
296 session = async_get_clientsession(self.hass, verify_ssl=use_tls)
297 errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)
299 if not errors:
300 raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
301 if raw_attrs.strip():
302 try:
303 parse_resource_attributes(raw_attrs)
304 except ValueError:
305 errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"
307 if not errors:
308 self._pending_options = user_input
309 return await self.async_step_events()
311 suggested = user_input or {**merged, CONF_HEADERS: _to_list(merged.get(CONF_HEADERS, []))}
312 return self.async_show_form(
313 step_id="otel",
314 data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, suggested),
315 errors=errors,
316 )
318 async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
319 """Handle Syslog connection settings."""
320 errors: dict[str, str] = {}
321 merged = {**self._config_entry.data, **self._config_entry.options}
323 if user_input is not None:
324 host = user_input[CONF_HOST]
325 port = user_input[CONF_PORT]
326 protocol = user_input[CONF_PROTOCOL]
327 use_tls = user_input.get(CONF_USE_TLS, False)
329 error = await syslog_validate(self.hass, host, port, protocol, use_tls)
330 if error:
331 errors["base"] = error
333 if not errors:
334 self._pending_options = user_input
335 return await self.async_step_events()
337 return self.async_show_form(
338 step_id="syslog",
339 data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or merged),
340 errors=errors,
341 )
343 async def async_step_events(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
344 """Handle event subscription options."""
345 errors: dict[str, str] = {}
346 if user_input is not None:
347 flat: dict[str, Any] = {k: v for k, v in user_input.items() if not isinstance(v, dict)}
348 for v in user_input.values():
349 if isinstance(v, dict):
350 flat.update(v)
351 if flat.get(CONF_LOG_HA_STATE_CHANGES) and flat.get(CONF_LOG_HA_FULL_STATE_CHANGES):
352 errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
353 else:
354 return self.async_create_entry(title="", data={**self._pending_options, **flat})
356 merged = {**self._config_entry.data, **self._config_entry.options}
357 current = {
358 CONF_EVENT_BASED_LOGGING: merged.get(CONF_EVENT_BASED_LOGGING, False),
359 CONF_LOG_LEVEL: merged.get(CONF_LOG_LEVEL, DEFAULT_LOG_LEVEL),
360 "ha_standard_events": {
361 CONF_LOG_HA_LIFECYCLE: merged.get(CONF_LOG_HA_LIFECYCLE, False),
362 CONF_LOG_HA_CORE_CHANGES: merged.get(CONF_LOG_HA_CORE_CHANGES, False),
363 CONF_LOG_HA_CORE_ACTIVITY: merged.get(CONF_LOG_HA_CORE_ACTIVITY, False),
364 CONF_LOG_HA_STATE_CHANGES: merged.get(CONF_LOG_HA_STATE_CHANGES, False),
365 CONF_LOG_HA_FULL_STATE_CHANGES: merged.get(CONF_LOG_HA_FULL_STATE_CHANGES, False),
366 },
367 CONF_LOG_HA_EVENT_BODY: merged.get(CONF_LOG_HA_EVENT_BODY, False),
368 CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME: merged.get(CONF_SUPPRESS_SYSTEM_LOG_EVENT_NAME, True),
369 CONF_CUSTOM_EVENTS: _to_list(merged.get(CONF_CUSTOM_EVENTS, [])),
370 }
371 return self.async_show_form(
372 step_id="events",
373 data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, current),
374 errors=errors,
375 description_placeholders={
376 "learn_more": "[Learn about HA events](https://www.home-assistant.io/docs/configuration/events/)"
377 },
378 )