Coverage for custom_components/remote_logger/otel/protobuf_encoder.py: 96%
106 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-07 04:46 +0000
1"""Minimal protobuf wire-format encoder for OTLP ExportLogsServiceRequest.
3Encodes the same dict structure used for JSON export into protobuf binary,
4without requiring the opentelemetry-proto compiled classes.
5"""
7from __future__ import annotations
9import logging
10import struct
11from typing import Any
13# reference:
14# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
15# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto
# Protobuf wire types, stored in the low 3 bits of every field key:
#   WIRE_VARINT           -> int32, int64, uint32, uint64, sint32, sint64, bool, enum
#   WIRE_64BIT            -> fixed64, sfixed64, double
#   WIRE_LENGTH_DELIMITED -> string, bytes, embedded messages, packed repeated fields
WIRE_VARINT, WIRE_64BIT, WIRE_LENGTH_DELIMITED = 0, 1, 2

_LOGGER = logging.getLogger(__name__)
24# --------------------------------------------------------------------------- #
25# Low-level protobuf primitives #
26# --------------------------------------------------------------------------- #
29def _encode_varint(value: int) -> bytes:
30 """Encode an unsigned integer as a protobuf varint."""
31 parts = []
32 while value > 0x7F:
33 parts.append((value & 0x7F) | 0x80)
34 value >>= 7
35 parts.append(value & 0x7F)
36 return bytes(parts)
def _tag(field_number: int, wire_type: int) -> bytes:
    """Build the varint-encoded key that precedes every protobuf field.

    The key places the field number above the low three bits and the wire
    type in those low three bits.
    """
    key = field_number << 3 | wire_type
    return _encode_varint(key)
def _encode_string_field(field_number: int, value: str) -> bytes:
    """Encode a string field (tag + length + UTF-8 bytes).

    Best-effort: this never raises for bad text.
    - Characters that cannot be encoded as UTF-8 (e.g. lone surrogates) are
      replaced with ``?``.
    - An object without a usable ``encode`` method is written as the text
      ``TYPE ERROR (<value>)``.

    Args:
        field_number: The protobuf field number.
        value: The text to encode; normally a ``str``.

    Returns:
        The encoded field bytes.
    """
    try:
        # errors="replace": a str holding lone surrogates would otherwise raise
        # UnicodeEncodeError here.
        data: bytes = value.encode("utf-8", errors="replace")
    except Exception:
        # Don't log here, or there'll be an infinite loop.
        # "replace" again, because the interpolated value may contain the
        # same unencodable characters.
        data = f"TYPE ERROR ({value})".encode("utf-8", errors="replace")
    return _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(data)) + data
def _encode_bytes_field(field_number: int, value: bytes) -> bytes:
    """Encode a length-delimited bytes field.

    Layout: the field key, then the payload length as a varint, then the raw
    payload bytes unchanged.
    """
    header = _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(value))
    return header + value
def _encode_submessage(field_number: int, data: bytes) -> bytes:
    """Encode an embedded message field.

    On the wire an embedded message is just a length-delimited field whose
    payload is the already-serialized submessage, so this produces exactly
    what ``_encode_bytes_field`` does.
    """
    return _encode_bytes_field(field_number, data)
def _encode_fixed64(field_number: int, value: int) -> bytes:
    """Encode a fixed64 field (tag + 8 bytes little-endian).

    protobuf ``fixed64`` is an unsigned 64-bit integer. Accepted range is
    ``[-2**63, 2**64)``:
    - Values in ``[2**63, 2**64)`` are valid fixed64 values and are now packed.
    - Negative values are written as their 64-bit two's-complement bit
      pattern, the same bytes signed packing would give.

    Raises:
        struct.error: if ``value`` falls outside the accepted range.
    """
    if -(1 << 63) <= value < 0:
        # Map to the unsigned bit pattern so "<Q" yields the bytes "<q" would.
        value += 1 << 64
    return _tag(field_number, WIRE_64BIT) + struct.pack("<Q", value)
def _encode_float64(field_number: int, value: float) -> bytes:
    """Encode a double field (tag + 8-byte little-endian IEEE 754 binary64).

    Uses wire type 64-bit. ``struct.pack("<d", ...)`` also accepts ints,
    converting them to float.
    """
    return _tag(field_number, WIRE_64BIT) + struct.pack("<d", value)
def _encode_uint32_field(field_number: int, value: int) -> bytes:
    """Encode an integer-valued field as a varint (wire type 0).

    Used in this module for enums, bools (as 0/1) and the int64
    ``AnyValue.int_value``. The name is historical.
    """
    key = _tag(field_number, WIRE_VARINT)
    payload = _encode_varint(value)
    return key + payload
79# --------------------------------------------------------------------------- #
80# OTLP message encoders #
81# Each function takes the same dict structure used for JSON and returns bytes. #
82# --------------------------------------------------------------------------- #
85def _encode_any_value(av: dict[str, Any]) -> bytes:
86 """Encode an AnyValue message."""
87 if "stringValue" in av:
88 return _encode_string_field(1, av["stringValue"])
89 if "intValue" in av:
90 return _encode_uint32_field(3, int(av["intValue"]))
91 if "bytesValue" in av:
92 return _encode_bytes_field(7, av["bytesValue"])
93 if "boolValue" in av:
94 return _encode_uint32_field(2, 1 if av["boolValue"] else 0)
95 if "doubleValue" in av:
96 return _encode_float64(4, av["doubleValue"])
97 return b""
def _encode_key_value(kv: dict[str, Any]) -> bytes:
    """Encode a KeyValue message.

    Fields: key = 1 (string, always written), value = 2 (AnyValue, written
    only when ``kv`` has a ``value`` entry).
    """
    encoded_key = _encode_string_field(1, kv["key"])
    if "value" not in kv:
        return encoded_key
    return encoded_key + _encode_submessage(2, _encode_any_value(kv["value"]))
109def _encode_resource(resource: dict[str, Any]) -> bytes:
110 """Encode a Resource message: attributes=1 (repeated KeyValue)."""
111 result = b""
112 for attr in resource.get("attributes", []):
113 result += _encode_submessage(1, _encode_key_value(attr))
114 return result
117def _encode_instrumentation_scope(scope: dict[str, Any]) -> bytes:
118 """Encode an InstrumentationScope: name=1, version=2."""
119 result = b""
120 if "name" in scope:
121 result += _encode_string_field(1, scope["name"])
122 if "version" in scope:
123 result += _encode_string_field(2, scope["version"])
124 return result
127def _encode_log_record(record: dict[str, Any]) -> bytes:
128 """Encode a LogRecord message.
130 Field mapping:
131 time_unix_nano = 1 (fixed64)
132 severity_number = 2 (enum/int32)
133 severity_text = 3 (string)
134 body = 5 (AnyValue)
135 attributes = 6 (repeated KeyValue)
136 trace_id = 9 (bytes)
137 span_id = 10 (bytes)
138 observed_time_unix_nano = 11 (fixed64)
139 event_name = 12 (string)
140 """
141 result = b""
143 if "timeUnixNano" in record:
144 result += _encode_fixed64(1, int(record["timeUnixNano"]))
146 if "severityNumber" in record:
147 result += _encode_uint32_field(2, record["severityNumber"])
149 if "severityText" in record:
150 result += _encode_string_field(3, record["severityText"])
152 if "body" in record:
153 result += _encode_submessage(5, _encode_any_value(record["body"]))
155 for attr in record.get("attributes", []):
156 result += _encode_submessage(6, _encode_key_value(attr))
158 if record.get("traceId"):
159 result += _encode_bytes_field(9, bytes.fromhex(record["traceId"]))
161 if record.get("spanId"):
162 result += _encode_bytes_field(10, bytes.fromhex(record["spanId"]))
164 if "observedTimeUnixNano" in record:
165 result += _encode_fixed64(11, int(record["observedTimeUnixNano"]))
167 if record.get("eventName"):
168 result += _encode_string_field(12, record["eventName"])
170 return result
173def _encode_scope_logs(scope_logs: dict[str, Any]) -> bytes:
174 """Encode a ScopeLogs: scope=1, log_records=2."""
175 result = b""
176 if "scope" in scope_logs:
177 result += _encode_submessage(1, _encode_instrumentation_scope(scope_logs["scope"]))
178 for record in scope_logs.get("logRecords", []):
179 result += _encode_submessage(2, _encode_log_record(record))
180 return result
183def _encode_resource_logs(rl: dict[str, Any]) -> bytes:
184 """Encode a ResourceLogs: resource=1, scope_logs=2."""
185 result = b""
186 if "resource" in rl:
187 result += _encode_submessage(1, _encode_resource(rl["resource"]))
188 for sl in rl.get("scopeLogs", []):
189 result += _encode_submessage(2, _encode_scope_logs(sl))
190 return result
def encode_export_logs_request(request: dict[str, Any]) -> bytes:
    """Encode an ExportLogsServiceRequest: resource_logs=1 (repeated).

    Takes the same dict structure as the JSON payload and returns the
    serialized protobuf bytes.

    Encoding is best-effort per ``resourceLogs`` entry:
    - An entry that fails to encode is logged via ``_LOGGER.exception``.
    - That entry is omitted from the output; the remaining entries are still
      encoded.
    """
    parts: list[bytes] = []
    for rl in request.get("resourceLogs", []):
        try:
            parts.append(_encode_submessage(1, _encode_resource_logs(rl)))
        except Exception as e:
            _LOGGER.exception("remote_logger: failed to build protobuf for %s: %s", rl, e)
    # One join instead of repeated ``bytes +=``, which is quadratic in payload size.
    return b"".join(parts)