Coverage for custom_components/remote_logger/config_flow.py: 80%
179 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
1"""Config flow for the remote_logger integration."""
3from __future__ import annotations
5import logging
6from typing import Any
8import voluptuous as vol
9from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow
10from homeassistant.const import CONF_HEADERS, CONF_HOST, CONF_PATH, CONF_PORT, CONF_PROTOCOL, CONF_TOKEN
11from homeassistant.core import callback
12from homeassistant.helpers import selector
13from homeassistant.helpers.aiohttp_client import async_get_clientsession
15from .const import (
16 BACKEND_OTEL,
17 BACKEND_SYSLOG,
18 CONF_BACKEND,
19 CONF_CUSTOM_EVENTS,
20 CONF_ENCODING,
21 CONF_LOG_HA_CORE_ACTIVITY,
22 CONF_LOG_HA_CORE_CHANGES,
23 CONF_LOG_HA_EVENT_BODY,
24 CONF_LOG_HA_FULL_STATE_CHANGES,
25 CONF_LOG_HA_LIFECYCLE,
26 CONF_LOG_HA_STATE_CHANGES,
27 CONF_RESOURCE_ATTRIBUTES,
28 CONF_USE_TLS,
29 DOMAIN,
30)
31from .otel.const import CONF_TOKEN_TYPE, OTEL_DATA_SCHEMA, OTLP_LOGS_PATH, REAUTH_OTEL_DATA_SCHEMA, TOKEN_TYPE_BEARER
32from .otel.exporter import build_auth_header, parse_headers, parse_resource_attributes
33from .otel.exporter import validate as otel_validate
34from .syslog.const import SYSLOG_DATA_SCHEMA
35from .syslog.exporter import validate as syslog_validate
_LOGGER = logging.getLogger(__name__)

# Form schema shared by the initial setup ("common" step) and the options flow
# ("events" step). Key order is preserved because it determines the order in
# which fields are inserted into the schema.
COMMON_DATA_SCHEMA = vol.Schema(
    {
        # Boolean toggles, expressed as (option key, default value) pairs.
        **{
            vol.Optional(option, default=enabled): selector.BooleanSelector()
            for option, enabled in (
                (CONF_LOG_HA_LIFECYCLE, False),
                (CONF_LOG_HA_CORE_CHANGES, False),
                (CONF_LOG_HA_CORE_ACTIVITY, False),
                (CONF_LOG_HA_STATE_CHANGES, False),
                (CONF_LOG_HA_FULL_STATE_CHANGES, False),
                (CONF_LOG_HA_EVENT_BODY, True),
            )
        },
        # Free-form multi-line text listing additional custom events.
        vol.Optional(CONF_CUSTOM_EVENTS, default=""): selector.TextSelector(
            selector.TextSelectorConfig(multiline=True)
        ),
    }
)
def _build_endpoint_url(host: str, port: int, use_tls: bool, path: str = OTLP_LOGS_PATH) -> str:
    """Build the full OTLP endpoint URL.

    Args:
        host: Hostname or IP address of the collector. A bare IPv6 literal is
            wrapped in square brackets so the port separator stays unambiguous.
        port: TCP port of the collector.
        use_tls: Select the ``https`` scheme when true, ``http`` otherwise.
        path: URL path of the logs endpoint. A non-empty path that lacks a
            leading ``/`` gets one prepended; an empty path is left empty.

    Returns:
        The URL in the form ``scheme://host:port/path``.
    """
    import ipaddress  # local import: keeps this block self-contained

    scheme = "https" if use_tls else "http"

    # "http://::1:4318/..." is not a valid authority; IPv6 literals must be
    # bracketed (RFC 3986 section 3.2.2). Hostnames and IPv4 pass through.
    try:
        if ipaddress.ip_address(host).version == 6:
            host = f"[{host}]"
    except ValueError:
        pass  # not an IP literal (hostname, or already bracketed)

    # Without the separator the path would be glued onto the port number.
    if path and not path.startswith("/"):
        path = f"/{path}"

    return f"{scheme}://{host}:{port}{path}"
class OtelLogsConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for OpenTelemetry Log Exporter.

    Steps:
        user        -> menu offering the "otel" and "syslog" backends
        otel/syslog -> backend connection settings, validated before continuing
        common      -> event subscription options, then the entry is created
        reauth      -> re-entry of the OTLP token via the "reauth_otel" form
    """

    VERSION = 2

    def __init__(self) -> None:
        """Initialise the flow with an empty holding area for backend settings."""
        super().__init__()
        # Backend settings gathered by the otel/syslog step, kept until the
        # common step creates the entry. Also carries the private "_title" key.
        self._pending_data: dict[str, Any] = {}

    @staticmethod
    @callback
    def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
        """Return the options flow handler."""
        return RemoteLoggerOptionsFlow(config_entry)

    async def async_step_user(
        self,
        user_input: dict[str, Any] | None = None,  # noqa: ARG002
    ) -> ConfigFlowResult:
        """Show menu to choose backend type."""
        return self.async_show_menu(
            step_id="user",
            menu_options=["otel", "syslog"],
        )

    async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle OpenTelemetry OTLP configuration.

        With no input the form is shown. With input, the headers are parsed,
        the endpoint is validated through ``otel_validate`` and the resource
        attributes are parsed; on success the flow continues to the common
        step, otherwise the form is shown again with the errors.
        """
        errors: dict[str, str] = {}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            use_tls = user_input[CONF_USE_TLS]
            url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))

            # Validate header fields format before connecting
            extra_headers: dict[str, str] = {}
            token = user_input.get(CONF_TOKEN, "").strip()
            if token:
                if not use_tls:
                    _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text")
                token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
                extra_headers["Authorization"] = build_auth_header(token, token_type)
            raw_headers = user_input.get(CONF_HEADERS, "").strip()
            if raw_headers:
                try:
                    # Custom headers are applied after the Authorization header,
                    # so a user-supplied header of the same name replaces it.
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    errors[CONF_HEADERS] = "invalid_headers"

            # Validate connectivity
            if not errors:
                session = async_get_clientsession(self.hass, verify_ssl=use_tls)
                errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)

            # Validate resource attributes format
            if not errors:
                raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
                if raw_attrs.strip():
                    try:
                        parse_resource_attributes(raw_attrs)
                    except ValueError:
                        errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"

            if not errors:
                # One entry per OTLP host/port pair.
                await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_OTEL}_{host}_{port}")
                self._abort_if_unique_id_configured()
                self._pending_data = {**user_input, CONF_BACKEND: BACKEND_OTEL}
                self._pending_data["_title"] = f"OTLP @ {host}:{port}"
                return await self.async_step_common()

        return self.async_show_form(
            step_id="otel",
            data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_reauth(self, entry_data: dict[str, Any]) -> ConfigFlowResult:  # noqa: ARG002
        """Initiate reauth after authentication failure."""
        return await self.async_step_reauth_otel()

    async def async_step_reauth_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle re-entry of the token after an authentication failure.

        The new token is validated against the entry's current endpoint. On
        success the token and token type are stored and the entry is reloaded;
        otherwise the form is shown again with the errors.
        """
        reauth_entry = self._get_reauth_entry()
        errors: dict[str, str] = {}

        if user_input is not None:
            # The options flow stores edited connection settings in
            # entry.options, so the endpoint actually in use is data overlaid
            # with options, not data alone.
            current = {**reauth_entry.data, **reauth_entry.options}

            token = user_input.get(CONF_TOKEN, "").strip()
            token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
            extra_headers: dict[str, str] = {}
            if token:
                extra_headers["Authorization"] = build_auth_header(token, token_type)
            raw_headers = current.get(CONF_HEADERS, "").strip()
            if raw_headers:
                try:
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    # The reauth form has no headers field to attach this to.
                    errors["base"] = "invalid_headers"

            if not errors:
                use_tls = current[CONF_USE_TLS]
                url = _build_endpoint_url(
                    current[CONF_HOST],
                    current[CONF_PORT],
                    use_tls,
                    current.get(CONF_PATH, OTLP_LOGS_PATH),
                )
                session = async_get_clientsession(self.hass, verify_ssl=use_tls)
                errors = await otel_validate(session, url, current[CONF_ENCODING], extra_headers or None)

            if not errors:
                credentials = {
                    CONF_TOKEN: user_input.get(CONF_TOKEN, ""),
                    CONF_TOKEN_TYPE: token_type,
                }
                update_kwargs: dict[str, Any] = {"data_updates": credentials}
                # If the options already carry credentials they take precedence
                # over data, so refresh them too or the old token stays active.
                if any(key in reauth_entry.options for key in credentials):
                    update_kwargs["options"] = {**reauth_entry.options, **credentials}
                return self.async_update_reload_and_abort(reauth_entry, **update_kwargs)

        return self.async_show_form(
            step_id="reauth_otel",
            data_schema=self.add_suggested_values_to_schema(REAUTH_OTEL_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle Syslog RFC 5424 configuration.

        With input, connectivity is checked through ``syslog_validate``; any
        error it reports is shown as a form-level ("base") error. On success
        the flow continues to the common step.
        """
        errors: dict[str, str] = {}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            protocol = user_input[CONF_PROTOCOL]
            use_tls = user_input.get(CONF_USE_TLS, False)

            # Validate connectivity
            error = await syslog_validate(self.hass, host, port, protocol, use_tls)
            if error:
                errors["base"] = error

            if not errors:
                # NOTE(review): unlike the OTLP step, this unique id omits
                # host/port, so a second syslog entry aborts as already
                # configured. Confirm that a single syslog entry is intended.
                await self.async_set_unique_id(f"{DOMAIN}_{BACKEND_SYSLOG}")
                self._abort_if_unique_id_configured()
                self._pending_data = {**user_input, CONF_BACKEND: BACKEND_SYSLOG}
                self._pending_data["_title"] = f"Syslog @ {host}:{port} ({protocol.upper()})"
                return await self.async_step_common()

        return self.async_show_form(
            step_id="syslog",
            data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )

    async def async_step_common(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Configure common event subscription options.

        Rejects enabling both state-change options at once; otherwise creates
        the entry from the pending backend settings plus these options.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            if user_input.get(CONF_LOG_HA_STATE_CHANGES) and user_input.get(CONF_LOG_HA_FULL_STATE_CHANGES):
                errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
            else:
                # "_title" is flow-internal; remove it before the data is stored.
                title = self._pending_data.pop("_title")
                return self.async_create_entry(
                    title=title,
                    data={**self._pending_data, **user_input},
                )

        return self.async_show_form(
            step_id="common",
            data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, user_input or {}),
            errors=errors,
        )
class RemoteLoggerOptionsFlow(OptionsFlow):
    """Allow editing connection details and event subscriptions after setup.

    Steps:
        init        -> dispatches on the entry's backend
        otel/syslog -> connection settings, validated before continuing
        events      -> event subscription options, then the options are saved
    """

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Remember the entry being edited and start with no pending options."""
        self._config_entry = config_entry
        # Connection settings accepted by the otel/syslog step, held until the
        # events step saves everything together.
        self._pending_options: dict[str, Any] = {}

    async def async_step_init(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:  # noqa: ARG002
        """Dispatch to the backend-specific connection form."""
        # Entries without a stored backend are treated as OTLP.
        backend = self._config_entry.data.get(CONF_BACKEND, BACKEND_OTEL)
        if backend == BACKEND_SYSLOG:
            return await self.async_step_syslog()
        return await self.async_step_otel()

    async def async_step_otel(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle OTLP connection settings.

        With no input the form is pre-filled from the entry's data overlaid
        with its options. With input, the headers are parsed, the endpoint is
        validated through ``otel_validate`` and the resource attributes are
        parsed; on success the flow continues to the events step.
        """
        errors: dict[str, str] = {}
        merged = {**self._config_entry.data, **self._config_entry.options}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            use_tls = user_input[CONF_USE_TLS]
            url = _build_endpoint_url(host, port, use_tls, user_input.get(CONF_PATH, OTLP_LOGS_PATH))

            extra_headers: dict[str, str] = {}
            token = user_input.get(CONF_TOKEN, "").strip()
            if token:
                if not use_tls:
                    # Same warning the initial setup flow emits.
                    _LOGGER.warning("remote_logger: token configured without TLS; token will be sent in plain text")
                token_type = user_input.get(CONF_TOKEN_TYPE, TOKEN_TYPE_BEARER)
                extra_headers["Authorization"] = build_auth_header(token, token_type)
            raw_headers = user_input.get(CONF_HEADERS, "").strip()
            if raw_headers:
                try:
                    extra_headers.update(parse_headers(raw_headers))
                except ValueError:
                    errors[CONF_HEADERS] = "invalid_headers"

            # Only contact the endpoint once the headers are well-formed.
            if not errors:
                session = async_get_clientsession(self.hass, verify_ssl=use_tls)
                errors = await otel_validate(session, url, user_input[CONF_ENCODING], extra_headers or None)

            if not errors:
                raw_attrs = user_input.get(CONF_RESOURCE_ATTRIBUTES, "")
                if raw_attrs.strip():
                    try:
                        parse_resource_attributes(raw_attrs)
                    except ValueError:
                        errors[CONF_RESOURCE_ATTRIBUTES] = "invalid_attributes"

            if not errors:
                self._pending_options = user_input
                return await self.async_step_events()

        return self.async_show_form(
            step_id="otel",
            data_schema=self.add_suggested_values_to_schema(OTEL_DATA_SCHEMA, user_input or merged),
            errors=errors,
        )

    async def async_step_syslog(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle Syslog connection settings.

        With input, connectivity is checked through ``syslog_validate``; any
        error it reports is shown as a form-level ("base") error. On success
        the flow continues to the events step.
        """
        errors: dict[str, str] = {}
        merged = {**self._config_entry.data, **self._config_entry.options}

        if user_input is not None:
            host = user_input[CONF_HOST]
            port = user_input[CONF_PORT]
            protocol = user_input[CONF_PROTOCOL]
            use_tls = user_input.get(CONF_USE_TLS, False)

            error = await syslog_validate(self.hass, host, port, protocol, use_tls)
            if error:
                errors["base"] = error

            if not errors:
                self._pending_options = user_input
                return await self.async_step_events()

        return self.async_show_form(
            step_id="syslog",
            data_schema=self.add_suggested_values_to_schema(SYSLOG_DATA_SCHEMA, user_input or merged),
            errors=errors,
        )

    async def async_step_events(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
        """Handle event subscription options.

        Rejects enabling both state-change options at once; otherwise saves
        the pending connection settings together with these options.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            if user_input.get(CONF_LOG_HA_STATE_CHANGES) and user_input.get(CONF_LOG_HA_FULL_STATE_CHANGES):
                errors[CONF_LOG_HA_FULL_STATE_CHANGES] = "state_changes_exclusive"
            else:
                return self.async_create_entry(title="", data={**self._pending_options, **user_input})

        merged = {**self._config_entry.data, **self._config_entry.options}
        # Fallbacks mirror the defaults declared in COMMON_DATA_SCHEMA, so an
        # entry that lacks a key shows the same value a fresh setup would.
        current = {
            CONF_LOG_HA_CORE_ACTIVITY: merged.get(CONF_LOG_HA_CORE_ACTIVITY, False),
            CONF_LOG_HA_LIFECYCLE: merged.get(CONF_LOG_HA_LIFECYCLE, False),
            CONF_LOG_HA_CORE_CHANGES: merged.get(CONF_LOG_HA_CORE_CHANGES, False),
            CONF_LOG_HA_STATE_CHANGES: merged.get(CONF_LOG_HA_STATE_CHANGES, False),
            CONF_LOG_HA_FULL_STATE_CHANGES: merged.get(CONF_LOG_HA_FULL_STATE_CHANGES, False),
            CONF_LOG_HA_EVENT_BODY: merged.get(CONF_LOG_HA_EVENT_BODY, True),
            CONF_CUSTOM_EVENTS: merged.get(CONF_CUSTOM_EVENTS, ""),
        }
        if user_input is not None:
            # Reached only after a validation error: keep what the user just
            # entered instead of resetting the form to the stored values.
            current.update(user_input)
        return self.async_show_form(
            step_id="events",
            data_schema=self.add_suggested_values_to_schema(COMMON_DATA_SCHEMA, current),
            errors=errors,
        )