使用自己的Python函数处理Protobuf中的字符串编码
我目前所在的项目是一个老项目,里面的字符串编码有点乱,数据库中有些是GB2312,有些是UTF8;代码中有些是GBK,有些是UTF8,代码中转来转去,经常是不太清楚当前这个字符串是什么编码,由于是老项目,也没去修改。最近合服脚本由项目上进行维护了,我拿到脚本看了看是Python写的,我之前也没学习过Python,只有现学现用。
数据库中使用了Protobuf,这里面也有字符串,编码也是有GBK,也有UTF8编码的,而且是交叉使用 ,有过合服经验的同学应该知道,这里会涉及一些修改,比如名字冲突需要改名。Protobuf中的名字修改就需要先解析出来修改了再序列化回去。这个时候问题来了,Protobuf默认是使用的UTF8编码进行解析(Decode)与序列化的(Encode),可以参见:google.protobuf.internal中的decoder.py中的函数:
1def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
2 """Returns a decoder for a string field."""
3
4 local_DecodeVarint = _DecodeVarint
5 local_unicode = unicode
6
7 assert not is_packed
8 if is_repeated:
9 tag_bytes = encoder.TagBytes(field_number,
10 wire_format.WIRETYPE_LENGTH_DELIMITED)
11 tag_len = len(tag_bytes)
12 def DecodeRepeatedField(buffer, pos, end, message, field_dict):
13 value = field_dict.get(key)
14 if value is None:
15 value = field_dict.setdefault(key, new_default(message))
16 while 1:
17 (size, pos) = local_DecodeVarint(buffer, pos)
18 new_pos = pos + size
19 if new_pos > end:
20 raise _DecodeError('Truncated string.')
21 value.append(local_unicode(buffer[pos:new_pos], 'utf-8'))
22 # Predict that the next tag is another copy of the same repeated field.
23 pos = new_pos + tag_len
24 if buffer[new_pos:pos] != tag_bytes or new_pos == end:
25 # Prediction failed. Return.
26 return new_pos
27 return DecodeRepeatedField
28 else:
29 def DecodeField(buffer, pos, end, message, field_dict):
30 (size, pos) = local_DecodeVarint(buffer, pos)
31 new_pos = pos + size
32 if new_pos > end:
33 raise _DecodeError('Truncated string.')
34 field_dict[key] = local_unicode(buffer[pos:new_pos], 'utf-8')
35 return new_pos
36 return DecodeField```
37
38
39
40
41以及encoder.py中的函数
42
43
44
45```python
46def StringEncoder(field_number, is_repeated, is_packed):
47 """Returns an encoder for a string field."""
48
49 tag = TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
50 local_EncodeVarint = _EncodeVarint
51 local_len = len
52 assert not is_packed
53 if is_repeated:
54 def EncodeRepeatedField(write, value):
55 for element in value:
56 encoded = element.encode('utf-8')
57 write(tag)
58 local_EncodeVarint(write, local_len(encoded))
59 write(encoded)
60 return EncodeRepeatedField
61 else:
62 def EncodeField(write, value):
63 encoded = value.encode('utf-8')
64 write(tag)
65 local_EncodeVarint(write, local_len(encoded))
66 return write(encoded)
67 return EncodeField```
68
69
70
71 如果Protobuf中的字符串编码为非UTF8编码,则在解析(Decode)的过程中会出现异常(有点奇怪的是我同事的电脑上没出现异常):
72
73
74
75'utf8' codec can't decode byte……
76
77我们有没有一个方法在不改变Protobuf原来的代码的情况下使用自己的函数来进行解析呢,这是我首先想到的,由于没学习过Python,恶补了一下Python基础后,研究发现Protobuf是把Decode的函数入口放在了一个数组中,在引入模块的时候就会自动初始化这些入口函数,然后保存到各个Protobuf类中,各个PB类都有一个decoders_by_tag字典,这个字典就存放了各种数据类型的解析函数入口地址。
78
79通过上面的代码可以看出,具体解析函数(DecodeField)是放在一个闭包中的,不能直接修改,所以必须整个(StringDecoder)替换。通过深入研究,终于发现了其设置的入口,在google.protobuf.internal的type_checkers.py中有这样一段代码:
80
81
82
83```python
84# Maps from field types to encoder constructors.
85TYPE_TO_ENCODER = {
86 _FieldDescriptor.TYPE_DOUBLE: encoder.DoubleEncoder,
87 _FieldDescriptor.TYPE_FLOAT: encoder.FloatEncoder,
88 _FieldDescriptor.TYPE_INT64: encoder.Int64Encoder,
89 _FieldDescriptor.TYPE_UINT64: encoder.UInt64Encoder,
90 _FieldDescriptor.TYPE_INT32: encoder.Int32Encoder,
91 _FieldDescriptor.TYPE_FIXED64: encoder.Fixed64Encoder,
92 _FieldDescriptor.TYPE_FIXED32: encoder.Fixed32Encoder,
93 _FieldDescriptor.TYPE_BOOL: encoder.BoolEncoder,
94 _FieldDescriptor.TYPE_STRING: encoder.StringEncoder,
95 _FieldDescriptor.TYPE_GROUP: encoder.GroupEncoder,
96 _FieldDescriptor.TYPE_MESSAGE: encoder.MessageEncoder,
97 _FieldDescriptor.TYPE_BYTES: encoder.BytesEncoder,
98 _FieldDescriptor.TYPE_UINT32: encoder.UInt32Encoder,
99 _FieldDescriptor.TYPE_ENUM: encoder.EnumEncoder,
100 _FieldDescriptor.TYPE_SFIXED32: encoder.SFixed32Encoder,
101 _FieldDescriptor.TYPE_SFIXED64: encoder.SFixed64Encoder,
102 _FieldDescriptor.TYPE_SINT32: encoder.SInt32Encoder,
103 _FieldDescriptor.TYPE_SINT64: encoder.SInt64Encoder,
104 }
105
106
107# Maps from field types to sizer constructors.
108TYPE_TO_SIZER = {
109 _FieldDescriptor.TYPE_DOUBLE: encoder.DoubleSizer,
110 _FieldDescriptor.TYPE_FLOAT: encoder.FloatSizer,
111 _FieldDescriptor.TYPE_INT64: encoder.Int64Sizer,
112 _FieldDescriptor.TYPE_UINT64: encoder.UInt64Sizer,
113 _FieldDescriptor.TYPE_INT32: encoder.Int32Sizer,
114 _FieldDescriptor.TYPE_FIXED64: encoder.Fixed64Sizer,
115 _FieldDescriptor.TYPE_FIXED32: encoder.Fixed32Sizer,
116 _FieldDescriptor.TYPE_BOOL: encoder.BoolSizer,
117 _FieldDescriptor.TYPE_STRING: encoder.StringSizer,
118 _FieldDescriptor.TYPE_GROUP: encoder.GroupSizer,
119 _FieldDescriptor.TYPE_MESSAGE: encoder.MessageSizer,
120 _FieldDescriptor.TYPE_BYTES: encoder.BytesSizer,
121 _FieldDescriptor.TYPE_UINT32: encoder.UInt32Sizer,
122 _FieldDescriptor.TYPE_ENUM: encoder.EnumSizer,
123 _FieldDescriptor.TYPE_SFIXED32: encoder.SFixed32Sizer,
124 _FieldDescriptor.TYPE_SFIXED64: encoder.SFixed64Sizer,
125 _FieldDescriptor.TYPE_SINT32: encoder.SInt32Sizer,
126 _FieldDescriptor.TYPE_SINT64: encoder.SInt64Sizer,
127 }
128
129
130# Maps from field type to a decoder constructor.
131TYPE_TO_DECODER = {
132 _FieldDescriptor.TYPE_DOUBLE: decoder.DoubleDecoder,
133 _FieldDescriptor.TYPE_FLOAT: decoder.FloatDecoder,
134 _FieldDescriptor.TYPE_INT64: decoder.Int64Decoder,
135 _FieldDescriptor.TYPE_UINT64: decoder.UInt64Decoder,
136 _FieldDescriptor.TYPE_INT32: decoder.Int32Decoder,
137 _FieldDescriptor.TYPE_FIXED64: decoder.Fixed64Decoder,
138 _FieldDescriptor.TYPE_FIXED32: decoder.Fixed32Decoder,
139 _FieldDescriptor.TYPE_BOOL: decoder.BoolDecoder,
140 _FieldDescriptor.TYPE_STRING: decoder.StringDecoder,
141 _FieldDescriptor.TYPE_GROUP: decoder.GroupDecoder,
142 _FieldDescriptor.TYPE_MESSAGE: decoder.MessageDecoder,
143 _FieldDescriptor.TYPE_BYTES: decoder.BytesDecoder,
144 _FieldDescriptor.TYPE_UINT32: decoder.UInt32Decoder,
145 _FieldDescriptor.TYPE_ENUM: decoder.EnumDecoder,
146 _FieldDescriptor.TYPE_SFIXED32: decoder.SFixed32Decoder,
147 _FieldDescriptor.TYPE_SFIXED64: decoder.SFixed64Decoder,
148 _FieldDescriptor.TYPE_SINT32: decoder.SInt32Decoder,
149 _FieldDescriptor.TYPE_SINT64: decoder.SInt64Decoder,
150 }```
151
152
153
154 第一个是序列化(Encoder)的函数入口,第二个是计算大小的函数入口,第三个就是解析(Decoder)的入口,我们可以看到这里映射了所有类型的处理函数入口,那我们把这个入口函数替换成我们自己的函数,就可以根据实际需要进行处理了。
155
156
157
158这里我们需要特别注意的是Protobuf中的各个类都是在模块导入的时候就初始化好了,所以,如果我们要修改入口函数,必须在PB各类引入之前进行修改。为此我写了一个模块文件:protobuf_hack.py,这个模块必须先于PB类import,其内容如下:
159
160
161
162```python
163from google.protobuf.internal import decoder
164from google.protobuf.internal import encoder
165from google.protobuf.internal import wire_format
166from google.protobuf.internal import type_checkers
167from google.protobuf import reflection
168from google.protobuf import message
169
170def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
171 """Returns a decoder for a string field."""
172
173 local_DecodeVarint = _DecodeVarint
174 local_unicode = unicode
175
176 assert not is_packed
177 if is_repeated:
178 tag_bytes = encoder.TagBytes(field_number,
179 wire_format.WIRETYPE_LENGTH_DELIMITED)
180 tag_len = len(tag_bytes)
181 def DecodeRepeatedField(buffer, pos, end, message, field_dict):
182 value = field_dict.get(key)
183 if value is None:
184 value = field_dict.setdefault(key, new_default(message))
185 while 1:
186 (size, pos) = local_DecodeVarint(buffer, pos)
187 new_pos = pos + size
188 if new_pos > end:
189 raise _DecodeError('Truncated string.')
190 value.append(local_unicode(buffer[pos:new_pos], 'gbk'))
191 # Predict that the next tag is another copy of the same repeated field.
192 pos = new_pos + tag_len
193 if buffer[new_pos:pos] != tag_bytes or new_pos == end:
194 # Prediction failed. Return.
195 return new_pos
196 return DecodeRepeatedField
197 else:
198 def DecodeField(buffer, pos, end, message, field_dict):
199 (size, pos) = local_DecodeVarint(buffer, pos)
200 new_pos = pos + size
201 if new_pos > end:
202 raise _DecodeError('Truncated string.')
203 field_dict[key] = local_unicode(buffer[pos:new_pos], 'gbk')
204 return new_pos
205 return DecodeField
206
207type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder
这样,我们可以把所有PB中的字符串解析按GBK编码解析了。但是项目中的字符串并不是所有的字符串都是GBK编码的,也有UTF8编码的,为了支持两种编码,我做了一个处理,就是先尝试使用一种编码解析,如果出现异常,再使用另一种编码进行解析,这样就保证了我们所有的字符串都可以正确解析。理想很丰满,现实很骨感,解析是正确了,但是如果我们序列化回去在服务器程序中去使用的时候就会出现乱码,因为原来的GBK或者UTF8统一成UTF8编码了,当然,我们也可以继续像Decoder调用自己的函数一样处理Encoder,但是在Encoder中我们并不知道这个字符串原来在数据库中是什么编码,也没有PB以及字段信息,无法差别处理。
至此,算是白忙活了,无法满足需要。
如果我们能够只修改我们指定的PB类的处理函数就好了,因为我们可以找出哪些PB的字符串是GBK编码的。再次经过深入研究,总算是做到了。
在这里有一个函数帮了我大忙,reflection.py中的ParseMessage函数,我们看一下:
1def ParseMessage(descriptor, byte_str):
2 """Generate a new Message instance from this Descriptor and a byte string.
3
4 Args:
5 descriptor: Protobuf Descriptor object
6 byte_str: Serialized protocol buffer byte string
7
8 Returns:
9 Newly created protobuf Message object.
10 """
11
12 class _ResultClass(message.Message):
13 __metaclass__ = GeneratedProtocolMessageType
14 DESCRIPTOR = descriptor
15
16 new_msg = _ResultClass()
17 new_msg.ParseFromString(byte_str)
18 return new_msg
这个函数其实就是通过描述符信息(descriptor)来解析二进制串,生成一个新的PB消息实例。这中间的关键就是函数中的那个动态生成类实例的代码,在这里会走一次PB类的初始化流程,即会初始化我们所需要的Decoder以及Encoder函数映射字典。为了工作需要,我修改一下这个函数:
1def ParseMessage(descriptor):
2 class _ResultClass(message.Message):
3 __metaclass__ = reflection.GeneratedProtocolMessageType
4 DESCRIPTOR = descriptor
5
6 new_msg = _ResultClass()
7 return new_msg
然后加入我们需要使用自定义函数处理的PB类,注意这里一定是所需要的最小的PB结构。
1def hacker(msg):
2 ParseMessage(msg.DESCRIPTOR)
3
4def hack_pb():
5 #修改默认的字符串处理函数入口为自定义函数
6 type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder
7 type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringEncoder
8 type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = StringSizer
9
10 try:
11 # 这里加入我们需要修改的PB类
12 hacker(DbProto.DB_FriendAssetEntry_PB)
13 except Exception as e:
14 print(e)
15
16 #还原字符串处理函数入口
17 type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = decoder.StringDecoder
18 type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringEncoder
19 type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringSizer
由于Encode的时候Protobuf是先计算字段的长度,然后再处理的各字段,所以我们还需要把计算大小的函数使用自定义函数,否则再次解析会出问题。
现在基本上满足了需要,算是大功告成了!
细心的读者,不知你发现没,这里还是有一个问题,目前无法解决的问题,就是如果我们一个最小的PB中如果有两个字符串字段,采用的不同的编码怎么办?一般情况下,正常的设计者不会这样做,但是就像我们项目中的编码混乱一样,如果一个不小心就搞成不一样的编码就悲剧了!如果哪位高手有此解决方案,欢迎分享!!!
把整个文件附上:
1from google.protobuf.internal import decoder
2from google.protobuf.internal import encoder
3from google.protobuf.internal import wire_format
4from google.protobuf.internal import type_checkers
5from google.protobuf import reflection
6from google.protobuf import message
7
8def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
9 """Returns a decoder for a string field."""
10
11 local_DecodeVarint = decoder._DecodeVarint
12 local_unicode = unicode
13
14 assert not is_packed
15 if is_repeated:
16 tag_bytes = encoder.TagBytes(field_number,
17 wire_format.WIRETYPE_LENGTH_DELIMITED)
18 tag_len = len(tag_bytes)
19
20 def DecodeRepeatedField(buffer, pos, end, message, field_dict):
21 value = field_dict.get(key)
22 if value is None:
23 value = field_dict.setdefault(key, new_default(message))
24 while 1:
25 (size, pos) = local_DecodeVarint(buffer, pos)
26 new_pos = pos + size
27 if new_pos > end:
28 raise decoder._DecodeError('Truncated string.')
29 str = '' #这里先尝试使用UTF8编码进行解析,如果出现异常则尝试使用GBK编码解析
30 try:
31 str = local_unicode(buffer[pos:new_pos], 'utf-8')
32 except Exception as e:
33 try:
34 str = local_unicode(buffer[pos:new_pos], 'gbk')
35 except Exception as e1:
36 str = ''
37
38 value.append(str)
39 # Predict that the next tag is another copy of the same repeated field.
40 pos = new_pos + tag_len
41 if buffer[new_pos:pos] != tag_bytes or new_pos == end:
42 # Prediction failed. Return.
43 return new_pos
44
45 return DecodeRepeatedField
46 else:
47 def DecodeField(buffer, pos, end, message, field_dict):
48 (size, pos) = local_DecodeVarint(buffer, pos)
49 new_pos = pos + size
50 if new_pos > end:
51 raise decoder._DecodeError('Truncated string.')
52
53 str = '' #这里先尝试使用UTF8编码进行解析,如果出现异常则尝试使用GBK编码解析
54 try:
55 str = local_unicode(buffer[pos:new_pos], 'utf-8')
56 except Exception as e:
57 try:
58 str = local_unicode(buffer[pos:new_pos], 'gbk')
59 except Exception as e1:
60 str = ''
61
62 field_dict[key] = str
63 return new_pos
64
65 return DecodeField
66
67
68def StringEncoder(field_number, is_repeated, is_packed):
69 """Returns an encoder for a string field."""
70
71 tag = encoder.TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
72 local_EncodeVarint = encoder._EncodeVarint
73 local_len = len
74 assert not is_packed
75 if is_repeated:
76 def EncodeRepeatedField(write, value):
77 for element in value:
78 encoded = element.encode('gbk') #序列化的时候就直接使用GBK编码了
79 write(tag)
80 local_EncodeVarint(write, local_len(encoded))
81 write(encoded)
82
83 return EncodeRepeatedField
84 else:
85 def EncodeField(write, value):
86 encoded = value.encode('gbk') #序列化的时候就直接使用GBK编码了
87 write(tag)
88 local_EncodeVarint(write, local_len(encoded))
89 return write(encoded)
90
91 return EncodeField
92
93def StringSizer(field_number, is_repeated, is_packed):
94 """Returns a sizer for a string field."""
95
96 tag_size = encoder._TagSize(field_number)
97 local_VarintSize = encoder._VarintSize
98 local_len = len
99 assert not is_packed
100 if is_repeated:
101 def RepeatedFieldSize(value):
102 result = tag_size * len(value)
103 for element in value:
104 l = local_len(element.encode('gbk')) #注意序列化前计算长度时也需要使用与序列化相同的编码,否则会出错
105 result += local_VarintSize(l) + l
106 return result
107
108 return RepeatedFieldSize
109 else:
110 def FieldSize(value):
111 l = local_len(value.encode('gbk')) #注意序列化前计算长度时也需要使用与序列化相同的编码,否则会出错
112 return tag_size + local_VarintSize(l) + l
113
114 return FieldSize
115
116def ParseMessage(descriptor):
117 class _ResultClass(message.Message):
118 __metaclass__ = reflection.GeneratedProtocolMessageType
119 DESCRIPTOR = descriptor
120
121 new_msg = _ResultClass()
122 return new_msg
123
124def hacker(msg):
125 ParseMessage(msg.DESCRIPTOR)
126
127def hack_pb():
128 # 修改默认的字符串处理函数入口为自定义函数
129 type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder
130 type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringEncoder
131 type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = StringSizer
132
133 try:
134 # 这里加入我们需要修改的PB类,注意这里需要自行import DbProto模块
135 hacker(DbProto.DB_FriendAssetEntry_PB)
136 except Exception as e:
137 print(e)
138
139 # 还原字符串处理函数入口
140 type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = decoder.StringDecoder
141 type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringEncoder
142 type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringSizer
143
144#这里让其在引入模块时自动执行
145hack_pb()
- 原文作者:Witton
- 原文链接:https://wittonbell.github.io/posts/2016/2016-11-04-使用自己的Python函数处理Protobuf中的字符串编码/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议. 进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。