Coverage for custom_components/remote_logger/otel/protobuf_encoder.py: 96%

106 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-07 04:46 +0000

1"""Minimal protobuf wire-format encoder for OTLP ExportLogsServiceRequest. 

2 

3Encodes the same dict structure used for JSON export into protobuf binary, 

4without requiring the opentelemetry-proto compiled classes. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import struct 

11from typing import Any 

12 

13# reference: 

14# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto 

15# https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto 

16 

17# Protobuf wire types 

18WIRE_VARINT = 0 # int32, int64, uint32, uint64, sint32, sint64, bool, enum 

19WIRE_64BIT = 1 # fixed64, sfixed64, double 

20WIRE_LENGTH_DELIMITED = 2 # string, bytes, embedded messages, packed repeated fields 

21 

22_LOGGER = logging.getLogger(__name__) 

23 

24# --------------------------------------------------------------------------- # 

25# Low-level protobuf primitives # 

26# --------------------------------------------------------------------------- # 

27 

28 

29def _encode_varint(value: int) -> bytes: 

30 """Encode an unsigned integer as a protobuf varint.""" 

31 parts = [] 

32 while value > 0x7F: 

33 parts.append((value & 0x7F) | 0x80) 

34 value >>= 7 

35 parts.append(value & 0x7F) 

36 return bytes(parts) 

37 

38 

def _tag(field_number: int, wire_type: int) -> bytes:
    """Return the varint-encoded field key for *field_number* / *wire_type*.

    The key packs the field number above the 3-bit wire type.
    """
    key = field_number << 3 | wire_type
    return _encode_varint(key)

41 

42 

def _encode_string_field(field_number: int, value: str) -> bytes:
    """Encode a string field (tag + length + UTF-8 bytes).

    This is best-effort and is intended never to raise on bad input.

    - Code points UTF-8 cannot represent (e.g. lone surrogates from
      surrogateescape-decoded data) are replaced with "?".
    - A non-string *value* (anything without a usable ``encode``) is emitted
      as the text ``TYPE ERROR (<value>)``. If even formatting *value* fails,
      the text is just ``TYPE ERROR``.
    """
    try:
        data: bytes = value.encode("utf-8", errors="replace")
    except Exception:
        # Deliberately not logged: logging from inside the encoder can feed
        # back into the remote logger and loop forever.
        try:
            fallback = f"TYPE ERROR ({value})"
        except Exception:
            fallback = "TYPE ERROR"
        # errors="replace" so a surrogate inside str(value) cannot raise here.
        data = fallback.encode("utf-8", errors="replace")
    return _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(data)) + data

52 

53 

def _encode_bytes_field(field_number: int, value: bytes) -> bytes:
    """Serialize *value* as a length-delimited bytes field.

    The output is the field key, then the byte length, then the raw payload.
    """
    header = _tag(field_number, WIRE_LENGTH_DELIMITED) + _encode_varint(len(value))
    return header + value

57 

58 

def _encode_submessage(field_number: int, data: bytes) -> bytes:
    """Wrap an already-serialized message *data* as a length-delimited field."""
    key = _tag(field_number, WIRE_LENGTH_DELIMITED)
    length_prefix = _encode_varint(len(data))
    return b"".join((key, length_prefix, data))

62 

63 

def _encode_fixed64(field_number: int, value: int) -> bytes:
    """Encode a fixed64 field (tag + 8 bytes little-endian).

    fixed64 is an unsigned 64-bit type on the wire, so *value* is masked to
    64 bits and packed unsigned.

    - uint64 values >= 2**63 now encode instead of raising ``struct.error``.
    - Negative values still produce their two's-complement bytes, exactly
      as the previous signed packing did.
    """
    return _tag(field_number, WIRE_64BIT) + struct.pack("<Q", value & 0xFFFFFFFFFFFFFFFF)

67 

68 

def _encode_float64(field_number: int, value: float) -> bytes:
    """Encode a double field (tag + 8-byte little-endian IEEE 754 value).

    *value* is passed through ``float()``. Besides floats and ints, this
    accepts:

    - numeric strings;
    - the proto3-JSON special strings "NaN", "Infinity" and "-Infinity",
      which the JSON form of the same dict may carry.
    """
    return _tag(field_number, WIRE_64BIT) + struct.pack("<d", float(value))

72 

73 

def _encode_uint32_field(field_number: int, value: int) -> bytes:
    """Encode *value* as a varint-typed field (field key, then the varint).

    Despite the name, callers in this module also use it for enum values
    (severity number), bools (0/1) and AnyValue integer values.
    """
    return b"".join((_tag(field_number, WIRE_VARINT), _encode_varint(value)))

77 

78 

79# --------------------------------------------------------------------------- # 

80# OTLP message encoders # 

81# Each function takes the same dict structure used for JSON and returns bytes. # 

82# --------------------------------------------------------------------------- # 

83 

84 

85def _encode_any_value(av: dict[str, Any]) -> bytes: 

86 """Encode an AnyValue message.""" 

87 if "stringValue" in av: 

88 return _encode_string_field(1, av["stringValue"]) 

89 if "intValue" in av: 

90 return _encode_uint32_field(3, int(av["intValue"])) 

91 if "bytesValue" in av: 

92 return _encode_bytes_field(7, av["bytesValue"]) 

93 if "boolValue" in av: 

94 return _encode_uint32_field(2, 1 if av["boolValue"] else 0) 

95 if "doubleValue" in av: 

96 return _encode_float64(4, av["doubleValue"]) 

97 return b"" 

98 

99 

def _encode_key_value(kv: dict[str, Any]) -> bytes:
    """Serialize a KeyValue message.

    Field 1 holds the key string. Field 2 holds the AnyValue and is emitted
    only when *kv* has a "value" entry.
    """
    encoded_key = _encode_string_field(1, kv["key"])
    if "value" not in kv:
        return encoded_key
    return encoded_key + _encode_submessage(2, _encode_any_value(kv["value"]))

107 

108 

109def _encode_resource(resource: dict[str, Any]) -> bytes: 

110 """Encode a Resource message: attributes=1 (repeated KeyValue).""" 

111 result = b"" 

112 for attr in resource.get("attributes", []): 

113 result += _encode_submessage(1, _encode_key_value(attr)) 

114 return result 

115 

116 

117def _encode_instrumentation_scope(scope: dict[str, Any]) -> bytes: 

118 """Encode an InstrumentationScope: name=1, version=2.""" 

119 result = b"" 

120 if "name" in scope: 

121 result += _encode_string_field(1, scope["name"]) 

122 if "version" in scope: 

123 result += _encode_string_field(2, scope["version"]) 

124 return result 

125 

126 

127def _encode_log_record(record: dict[str, Any]) -> bytes: 

128 """Encode a LogRecord message. 

129 

130 Field mapping: 

131 time_unix_nano = 1 (fixed64) 

132 severity_number = 2 (enum/int32) 

133 severity_text = 3 (string) 

134 body = 5 (AnyValue) 

135 attributes = 6 (repeated KeyValue) 

136 trace_id = 9 (bytes) 

137 span_id = 10 (bytes) 

138 observed_time_unix_nano = 11 (fixed64) 

139 event_name = 12 (string) 

140 """ 

141 result = b"" 

142 

143 if "timeUnixNano" in record: 

144 result += _encode_fixed64(1, int(record["timeUnixNano"])) 

145 

146 if "severityNumber" in record: 

147 result += _encode_uint32_field(2, record["severityNumber"]) 

148 

149 if "severityText" in record: 

150 result += _encode_string_field(3, record["severityText"]) 

151 

152 if "body" in record: 

153 result += _encode_submessage(5, _encode_any_value(record["body"])) 

154 

155 for attr in record.get("attributes", []): 

156 result += _encode_submessage(6, _encode_key_value(attr)) 

157 

158 if record.get("traceId"): 

159 result += _encode_bytes_field(9, bytes.fromhex(record["traceId"])) 

160 

161 if record.get("spanId"): 

162 result += _encode_bytes_field(10, bytes.fromhex(record["spanId"])) 

163 

164 if "observedTimeUnixNano" in record: 

165 result += _encode_fixed64(11, int(record["observedTimeUnixNano"])) 

166 

167 if record.get("eventName"): 

168 result += _encode_string_field(12, record["eventName"]) 

169 

170 return result 

171 

172 

173def _encode_scope_logs(scope_logs: dict[str, Any]) -> bytes: 

174 """Encode a ScopeLogs: scope=1, log_records=2.""" 

175 result = b"" 

176 if "scope" in scope_logs: 

177 result += _encode_submessage(1, _encode_instrumentation_scope(scope_logs["scope"])) 

178 for record in scope_logs.get("logRecords", []): 

179 result += _encode_submessage(2, _encode_log_record(record)) 

180 return result 

181 

182 

183def _encode_resource_logs(rl: dict[str, Any]) -> bytes: 

184 """Encode a ResourceLogs: resource=1, scope_logs=2.""" 

185 result = b"" 

186 if "resource" in rl: 

187 result += _encode_submessage(1, _encode_resource(rl["resource"])) 

188 for sl in rl.get("scopeLogs", []): 

189 result += _encode_submessage(2, _encode_scope_logs(sl)) 

190 return result 

191 

192 

193def encode_export_logs_request(request: dict[str, Any]) -> bytes: 

194 """Encode an ExportLogsServiceRequest: resource_logs=1 (repeated). 

195 

196 Takes the same dict structure as the JSON payload and returns 

197 the serialized protobuf bytes. 

198 """ 

199 result = b"" 

200 for rl in request.get("resourceLogs", []): 

201 try: 

202 result += _encode_submessage(1, _encode_resource_logs(rl)) 

203 except Exception as e: 

204 _LOGGER.exception("remote_logger: failed to build protobuf for %s: %s", rl, e) 

205 return result