Coverage for custom_components/remote_logger/otel/protobuf_encoder.py: 96%
102 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-18 22:41 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-02-18 22:41 +0000
"""Minimal protobuf wire-format encoder for OTLP ExportLogsServiceRequest.

Encodes the same dict structure used for JSON export into protobuf binary,
without requiring the opentelemetry-proto compiled classes.
"""
7from __future__ import annotations
9import logging
10import struct
11from typing import Any
# References (message and field definitions used by the encoders below):
# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto
# Protobuf wire types. The wire type occupies the low three bits of every
# field tag; the field number is stored in the bits above it.
WIRE_VARINT = 0  # int32, int64, uint32, uint64, sint32, sint64, bool, enum
WIRE_64BIT = 1  # fixed64, sfixed64, double
WIRE_LENGTH_DELIMITED = 2  # string, bytes, embedded messages, packed repeated fields

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# --------------------------------------------------------------------------- #
# Low-level protobuf primitives                                               #
# --------------------------------------------------------------------------- #
29def _encode_varint(value: int) -> bytes:
30 """Encode an unsigned integer as a protobuf varint."""
31 parts = []
32 while value > 0x7F:
33 parts.append((value & 0x7F) | 0x80)
34 value >>= 7
35 parts.append(value & 0x7F)
36 return bytes(parts)
def _tag(field_number: int, wire_type: int) -> bytes:
    """Build the varint key that prefixes a field on the wire.

    The key packs the wire type into the low three bits and the field
    number into the bits above them.
    """
    key = (field_number << 3) | wire_type
    return _encode_varint(key)
def _encode_string_field(field_number: int, value: str) -> bytes:
    """Encode a string field (tag + length + UTF-8 bytes).

    This function is best-effort and must not raise for awkward input:

    * A string that is not valid Unicode text (for example one containing
      lone surrogates) is encoded with the ``backslashreplace`` error
      handler, so the offending code points appear as ``\\udXXX`` escapes.
    * A value that cannot be encoded at all (for example a non-string
      without an ``encode`` method) is replaced by the text
      ``TYPE ERROR (<value>)``.

    Args:
        field_number: Protobuf field number to tag the payload with.
        value: The text to encode.

    Returns:
        The serialized length-delimited field.
    """
    try:
        data: bytes = value.encode("utf-8")
    except UnicodeEncodeError:
        # The value is a str but contains code points UTF-8 cannot represent.
        data = value.encode("utf-8", errors="backslashreplace")
    except Exception:
        # Deliberately not logged: this encoder is fed by the logging
        # pipeline itself, so logging here could loop forever.
        # The fallback text is encoded leniently too, so it cannot fail on
        # unencodable characters in the value's string form.
        data = f"TYPE ERROR ({value})".encode("utf-8", errors="backslashreplace")
    return _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(data)) + data
def _encode_bytes_field(field_number: int, value: bytes) -> bytes:
    """Serialize a bytes field as tag, varint length, then the raw payload."""
    header = _tag(field_number, WIRE_LENGTH_DELIMITED)
    length_prefix = _encode_varint(len(value))
    return header + length_prefix + value
def _encode_submessage(field_number: int, data: bytes) -> bytes:
    """Wrap an already-serialized message as a length-delimited field.

    The output is the field tag, the varint byte length of ``data``, and
    then ``data`` itself.
    """
    header = _tag(field_number, WIRE_LENGTH_DELIMITED)
    size = _encode_varint(len(data))
    return header + size + data
def _encode_fixed64(field_number: int, value: int | float) -> bytes:
    """Encode a 64-bit fixed-width field (tag + 8 bytes little-endian).

    The payload layout depends on the Python type of ``value``:

    * ``float`` is written as an IEEE 754 double (protobuf ``double``).
    * Anything else is converted with ``int()`` and written as an unsigned
      64-bit integer (protobuf ``fixed64``). Negative integers are stored
      as 64-bit two's complement (``sfixed64``).

    Previously every value was packed as a double, so integer fields such
    as nanosecond timestamps were serialized as floating-point bit
    patterns rather than as fixed64 integers.

    Args:
        field_number: Protobuf field number to tag the payload with.
        value: The number to encode. Pass a ``float`` for double fields.

    Returns:
        The serialized field: tag followed by eight payload bytes.
    """
    if isinstance(value, float):
        payload = struct.pack("<d", value)
    else:
        # Mask to 64 bits so negatives become two's complement instead of
        # making struct.pack raise.
        payload = struct.pack("<Q", int(value) & 0xFFFFFFFFFFFFFFFF)
    return _tag(field_number, WIRE_64BIT) + payload
def _encode_uint32_field(field_number: int, value: int) -> bytes:
    """Serialize an integer scalar (uint32/int32/enum) as tag + varint."""
    prefix = _tag(field_number, WIRE_VARINT)
    return prefix + _encode_varint(value)
# --------------------------------------------------------------------------- #
# OTLP message encoders                                                       #
# Each function takes the same dict structure used for JSON and returns bytes. #
# --------------------------------------------------------------------------- #
def _encode_any_value(av: dict[str, Any]) -> bytes:
    """Encode an AnyValue message.

    The dict is probed for these keys in order and the first one present
    determines the encoding; any further keys are ignored:

        string_value -> field 1 (string)
        int_value    -> field 3 (varint)
        byte_value   -> field 7 (bytes)
        bool_value   -> field 2 (varint 0/1)
        float_value  -> field 4 (double)

    Args:
        av: Mapping holding one of the keys listed above.

    Returns:
        The serialized AnyValue body, or ``b""`` when none of the keys is
        present.
    """
    if "string_value" in av:
        return _encode_string_field(1, av["string_value"])
    if "int_value" in av:
        return _encode_uint32_field(3, av["int_value"])
    if "byte_value" in av:
        return _encode_bytes_field(7, av["byte_value"])
    if "bool_value" in av:
        return _encode_uint32_field(2, 1 if av["bool_value"] else 0)
    if "float_value" in av:
        # Field 4 is a double: coerce explicitly so an integer-typed number
        # is still written as IEEE 754 rather than relying on how the
        # 64-bit helper treats its argument type.
        return _encode_fixed64(4, float(av["float_value"]))
    return b""
def _encode_key_value(kv: dict[str, Any]) -> bytes:
    """Serialize a KeyValue message.

    Field 1 carries the key as a string. Field 2 carries the AnyValue
    submessage and is only emitted when the dict has a "value" entry.
    """
    encoded_key = _encode_string_field(1, kv["key"])
    if "value" not in kv:
        return encoded_key
    encoded_value = _encode_any_value(kv["value"])
    return encoded_key + _encode_submessage(2, encoded_value)
def _encode_resource(resource: dict[str, Any]) -> bytes:
    """Serialize a Resource message.

    Each entry of the optional "attributes" list becomes one KeyValue
    submessage in field 1, in list order.
    """
    return b"".join(
        _encode_submessage(1, _encode_key_value(attribute))
        for attribute in resource.get("attributes", [])
    )
def _encode_instrumentation_scope(scope: dict[str, Any]) -> bytes:
    """Serialize an InstrumentationScope message.

    "name" goes to string field 1 and "version" to string field 2; a key
    that is absent from the dict is simply not emitted.
    """
    string_fields = ((1, "name"), (2, "version"))
    return b"".join(
        _encode_string_field(number, scope[key])
        for number, key in string_fields
        if key in scope
    )
def _encode_log_record(record: dict[str, Any]) -> bytes:
    """Serialize a LogRecord message.

    Dict key to protobuf field mapping (fields are written in this order):
        timeUnixNano         -> 1  time_unix_nano (fixed64)
        severityNumber       -> 2  severity_number (enum/int32)
        severityText         -> 3  severity_text (string)
        body                 -> 5  body (AnyValue)
        attributes           -> 6  attributes (repeated KeyValue)
        traceId              -> 9  trace_id (bytes, given as a hex string)
        spanId               -> 10 span_id (bytes, given as a hex string)
        observedTimeUnixNano -> 11 observed_time_unix_nano (fixed64)

    Keys missing from the dict are skipped. traceId and spanId are also
    skipped when present but empty.
    """
    chunks: list[bytes] = []

    if "timeUnixNano" in record:
        timestamp = int(record["timeUnixNano"])
        chunks.append(_encode_fixed64(1, timestamp))

    if "severityNumber" in record:
        chunks.append(_encode_uint32_field(2, record["severityNumber"]))

    if "severityText" in record:
        chunks.append(_encode_string_field(3, record["severityText"]))

    if "body" in record:
        body_bytes = _encode_any_value(record["body"])
        chunks.append(_encode_submessage(5, body_bytes))

    for attribute in record.get("attributes", []):
        chunks.append(_encode_submessage(6, _encode_key_value(attribute)))

    # The identifiers arrive hex-encoded and go out as raw bytes.
    if record.get("traceId"):
        trace_bytes = bytes.fromhex(record["traceId"])
        chunks.append(_encode_bytes_field(9, trace_bytes))

    if record.get("spanId"):
        span_bytes = bytes.fromhex(record["spanId"])
        chunks.append(_encode_bytes_field(10, span_bytes))

    if "observedTimeUnixNano" in record:
        observed = int(record["observedTimeUnixNano"])
        chunks.append(_encode_fixed64(11, observed))

    return b"".join(chunks)
def _encode_scope_logs(scope_logs: dict[str, Any]) -> bytes:
    """Serialize a ScopeLogs message.

    The optional "scope" dict is written as field 1, followed by one
    field 2 submessage per entry of the optional "logRecords" list.
    """
    chunks: list[bytes] = []
    if "scope" in scope_logs:
        scope_bytes = _encode_instrumentation_scope(scope_logs["scope"])
        chunks.append(_encode_submessage(1, scope_bytes))
    for log_record in scope_logs.get("logRecords", []):
        chunks.append(_encode_submessage(2, _encode_log_record(log_record)))
    return b"".join(chunks)
def _encode_resource_logs(rl: dict[str, Any]) -> bytes:
    """Serialize a ResourceLogs message.

    The optional "resource" dict is written as field 1, followed by one
    field 2 submessage per entry of the optional "scopeLogs" list.
    """
    chunks: list[bytes] = []
    if "resource" in rl:
        resource_bytes = _encode_resource(rl["resource"])
        chunks.append(_encode_submessage(1, resource_bytes))
    for scope_logs in rl.get("scopeLogs", []):
        chunks.append(_encode_submessage(2, _encode_scope_logs(scope_logs)))
    return b"".join(chunks)
def encode_export_logs_request(request: dict[str, Any]) -> bytes:
    """Serialize an ExportLogsServiceRequest to protobuf bytes.

    Accepts the same dict structure as the JSON payload. Each entry of the
    optional "resourceLogs" list is written as a field 1 submessage.

    Encoding is best-effort per entry: if one entry raises, the error is
    logged, that entry is left out, and the remaining entries are still
    encoded.
    """
    encoded_entries: list[bytes] = []
    for resource_logs in request.get("resourceLogs", []):
        try:
            entry = _encode_submessage(1, _encode_resource_logs(resource_logs))
        except Exception as err:
            _LOGGER.error("remote_logger: failed to build protobuf: %s", err)
        else:
            encoded_entries.append(entry)
    return b"".join(encoded_entries)