Coverage for custom_components/remote_logger/otel/protobuf_encoder.py: 96%

102 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-02-18 22:41 +0000

1"""Minimal protobuf wire-format encoder for OTLP ExportLogsServiceRequest. 

2 

3Encodes the same dict structure used for JSON export into protobuf binary, 

4without requiring the opentelemetry-proto compiled classes. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import struct 

11from typing import Any 

12 

13# reference: 

14# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto 

15# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto 

16 

# Protobuf wire types (combined with the field number to form each field tag)
WIRE_VARINT = 0  # int32, int64, uint32, uint64, sint32, sint64, bool, enum
WIRE_64BIT = 1  # fixed64, sfixed64, double
WIRE_LENGTH_DELIMITED = 2  # string, bytes, embedded messages, packed repeated fields

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

23 

24# --------------------------------------------------------------------------- # 

25# Low-level protobuf primitives # 

26# --------------------------------------------------------------------------- # 

27 

28 

29def _encode_varint(value: int) -> bytes: 

30 """Encode an unsigned integer as a protobuf varint.""" 

31 parts = [] 

32 while value > 0x7F: 

33 parts.append((value & 0x7F) | 0x80) 

34 value >>= 7 

35 parts.append(value & 0x7F) 

36 return bytes(parts) 

37 

38 

def _tag(field_number: int, wire_type: int) -> bytes:
    """Build a field key: the field number shifted left by three bits,
    OR-ed with the wire type, encoded as a varint."""
    key = (field_number << 3) | wire_type
    return _encode_varint(key)

41 

42 

def _encode_string_field(field_number: int, value: str) -> bytes:
    """Encode a string field (tag + length + UTF-8 bytes).

    Encoding is best-effort and is meant never to raise on odd input:

    * characters that cannot be represented in UTF-8 (e.g. lone
      surrogates) are replaced rather than raising ``UnicodeEncodeError``;
    * a value that cannot be encoded at all (typically because it is not
      a string) is substituted with the text ``TYPE ERROR (<value>)``.

    The length prefix is the byte length of the encoded payload.
    """
    try:
        payload: bytes = value.encode("utf-8", errors="replace")
    except Exception:
        # Do not log here: per the original note, logging from this path
        # would cause an infinite loop.
        # The fallback is encoded leniently too, so it cannot fail on the
        # same unencodable characters that triggered it.
        payload = f"TYPE ERROR ({value})".encode("utf-8", errors="replace")
    return _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(payload)) + payload

52 

53 

def _encode_bytes_field(field_number: int, value: bytes) -> bytes:
    """Serialize a bytes field: tag, varint length prefix, then the raw payload."""
    header = _tag(field_number, WIRE_LENGTH_DELIMITED)
    length_prefix = _encode_varint(len(value))
    return header + length_prefix + value

57 

58 

def _encode_submessage(field_number: int, data: bytes) -> bytes:
    """Wrap an already-serialized message as a length-delimited field.

    Output layout: field tag, varint byte count of ``data``, then ``data``.
    """
    prefix = _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(data))
    return prefix + data

62 

63 

def _encode_fixed64(field_number: int, value: int) -> bytes:
    """Encode a 64-bit fixed-width field (tag + 8 bytes little-endian).

    Integer values are written as an unsigned 64-bit integer, which is
    what a protobuf ``fixed64`` field (such as a nanosecond timestamp)
    requires. Packing them with the double format instead would put the
    IEEE-754 bit pattern of the number on the wire, and a reader would
    decode a completely different integer.

    ``float`` values are still written as an IEEE-754 double, which shares
    the same wire type, so callers that pass a float for a ``double``
    field keep working.
    """
    if isinstance(value, float):
        payload = struct.pack("<d", value)
    else:
        # Mask to 64 bits so negative integers become two's complement
        # instead of making struct.pack raise.
        payload = struct.pack("<Q", value & 0xFFFFFFFFFFFFFFFF)
    return _tag(field_number, WIRE_64BIT) + payload

67 

68 

def _encode_uint32_field(field_number: int, value: int) -> bytes:
    """Emit a varint-typed field (uint32, int32 or enum): tag, then the value as a varint."""
    header = _tag(field_number, WIRE_VARINT)
    body = _encode_varint(value)
    return header + body

72 

73 

74# --------------------------------------------------------------------------- # 

75# OTLP message encoders # 

76# Each function takes the same dict structure used for JSON and returns bytes. # 

77# --------------------------------------------------------------------------- # 

78 

79 

def _encode_any_value(av: dict[str, Any]) -> bytes:
    """Encode an AnyValue message.

    Exactly one variant is emitted, chosen by the first key found in this
    order: ``string_value`` (field 1), ``int_value`` (field 3),
    ``byte_value`` (field 7), ``bool_value`` (field 2), ``float_value``
    (field 4). A dict containing none of these keys yields an empty
    message (``b""``).
    """
    if "string_value" in av:
        return _encode_string_field(1, av["string_value"])
    if "int_value" in av:
        # int_value is an int64 on the wire: a negative number must be sent
        # as its 64-bit two's-complement varint, so mask before encoding.
        return _encode_uint32_field(3, av["int_value"] & 0xFFFFFFFFFFFFFFFF)
    if "byte_value" in av:
        return _encode_bytes_field(7, av["byte_value"])
    if "bool_value" in av:
        return _encode_uint32_field(2, 1 if av["bool_value"] else 0)
    if "float_value" in av:
        # A double is packed here directly (8 bytes, little-endian IEEE-754)
        # rather than through the fixed64 integer helper; float() also
        # accepts a whole-number int supplied as the value.
        return _tag(4, WIRE_64BIT) + struct.pack("<d", float(av["float_value"]))
    return b""

93 

94 

def _encode_key_value(kv: dict[str, Any]) -> bytes:
    """Serialize a KeyValue pair: field 1 is the key string, field 2 the AnyValue.

    The value field is left out entirely when ``kv`` has no "value" entry.
    """
    encoded_key = _encode_string_field(1, kv["key"])
    if "value" not in kv:
        return encoded_key
    encoded_value = _encode_any_value(kv["value"])
    return encoded_key + _encode_submessage(2, encoded_value)

102 

103 

def _encode_resource(resource: dict[str, Any]) -> bytes:
    """Serialize a Resource: each entry of "attributes" becomes a field-1 KeyValue submessage.

    A resource without an "attributes" entry serializes to ``b""``.
    """
    return b"".join(
        _encode_submessage(1, _encode_key_value(attribute))
        for attribute in resource.get("attributes", [])
    )

110 

111 

def _encode_instrumentation_scope(scope: dict[str, Any]) -> bytes:
    """Serialize an InstrumentationScope: "name" as field 1, "version" as field 2.

    Either string is written only if its key is present in ``scope``.
    """
    chunks = []
    for field_number, key in ((1, "name"), (2, "version")):
        if key in scope:
            chunks.append(_encode_string_field(field_number, scope[key]))
    return b"".join(chunks)

120 

121 

def _encode_log_record(record: dict[str, Any]) -> bytes:
    """Serialize a LogRecord from its JSON-style dict.

    Fields are written in the order below, each one only when present:

        timeUnixNano          -> field 1  (fixed64, converted with int())
        severityNumber        -> field 2  (enum/int32 varint)
        severityText          -> field 3  (string)
        body                  -> field 5  (AnyValue submessage)
        attributes            -> field 6  (one KeyValue submessage per entry)
        traceId               -> field 9  (bytes, decoded from a hex string)
        spanId                -> field 10 (bytes, decoded from a hex string)
        observedTimeUnixNano  -> field 11 (fixed64, converted with int())

    traceId and spanId are skipped when missing or empty.
    """
    chunks: list[bytes] = []

    if "timeUnixNano" in record:
        chunks.append(_encode_fixed64(1, int(record["timeUnixNano"])))
    if "severityNumber" in record:
        chunks.append(_encode_uint32_field(2, record["severityNumber"]))
    if "severityText" in record:
        chunks.append(_encode_string_field(3, record["severityText"]))
    if "body" in record:
        body_bytes = _encode_any_value(record["body"])
        chunks.append(_encode_submessage(5, body_bytes))

    chunks.extend(
        _encode_submessage(6, _encode_key_value(attribute))
        for attribute in record.get("attributes", [])
    )

    trace_id = record.get("traceId")
    if trace_id:
        chunks.append(_encode_bytes_field(9, bytes.fromhex(trace_id)))
    span_id = record.get("spanId")
    if span_id:
        chunks.append(_encode_bytes_field(10, bytes.fromhex(span_id)))

    if "observedTimeUnixNano" in record:
        chunks.append(_encode_fixed64(11, int(record["observedTimeUnixNano"])))

    return b"".join(chunks)

162 

163 

def _encode_scope_logs(scope_logs: dict[str, Any]) -> bytes:
    """Serialize a ScopeLogs: optional "scope" as field 1, then each of "logRecords" as field 2."""
    chunks = []
    if "scope" in scope_logs:
        scope_bytes = _encode_instrumentation_scope(scope_logs["scope"])
        chunks.append(_encode_submessage(1, scope_bytes))
    chunks.extend(
        _encode_submessage(2, _encode_log_record(entry))
        for entry in scope_logs.get("logRecords", [])
    )
    return b"".join(chunks)

172 

173 

def _encode_resource_logs(rl: dict[str, Any]) -> bytes:
    """Serialize a ResourceLogs: optional "resource" as field 1, then each of "scopeLogs" as field 2."""
    chunks = []
    if "resource" in rl:
        resource_bytes = _encode_resource(rl["resource"])
        chunks.append(_encode_submessage(1, resource_bytes))
    chunks.extend(
        _encode_submessage(2, _encode_scope_logs(scope_logs))
        for scope_logs in rl.get("scopeLogs", [])
    )
    return b"".join(chunks)

182 

183 

def encode_export_logs_request(request: dict[str, Any]) -> bytes:
    """Serialize an ExportLogsServiceRequest (resource_logs = field 1, repeated).

    ``request`` uses the same dict layout as the JSON payload; the return
    value is the protobuf binary form.

    Encoding is best-effort per "resourceLogs" entry: if one entry raises,
    the error is logged, that entry is left out, and the remaining entries
    are still encoded.
    """
    encoded: list[bytes] = []
    for resource_logs in request.get("resourceLogs", []):
        try:
            entry_bytes = _encode_submessage(1, _encode_resource_logs(resource_logs))
        except Exception as err:
            _LOGGER.error("remote_logger: failed to build protobuf: %s", err)
        else:
            encoded.append(entry_bytes)
    return b"".join(encoded)
196 return result