我目前所在的项目是一个老项目,里面的字符串编码有点乱,数据库中有些是GB2312,有些是UTF8;代码中有些是GBK,有些是UTF8,代码中转来转去,经常是不太清楚当前这个字符串是什么编码,由于是老项目,也没去修改。最近合服脚本由项目上进行维护了,我拿到脚本看了看是Python写的,我之前也没学习过Python,只有现学现用。

数据库中使用了Protobuf,这里面也有字符串,编码也是有GBK,也有UTF8编码的,而且是交叉使用 ,有过合服经验的同学应该知道,这里会涉及一些修改,比如名字冲突需要改名。Protobuf中的名字修改就需要先解析出来修改了再序列化回去。这个时候问题来了,Protobuf默认是使用的UTF8编码进行解析(Decode)与序列化的(Encode),可以参见:google.protobuf.internal中的decoder.py中的函数:

 

  1def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
  2  """Returns a decoder for a string field."""
  3
  4  local_DecodeVarint = _DecodeVarint
  5  local_unicode = unicode
  6
  7  assert not is_packed
  8  if is_repeated:
  9    tag_bytes = encoder.TagBytes(field_number,
 10                                 wire_format.WIRETYPE_LENGTH_DELIMITED)
 11    tag_len = len(tag_bytes)
 12    def DecodeRepeatedField(buffer, pos, end, message, field_dict):
 13      value = field_dict.get(key)
 14      if value is None:
 15        value = field_dict.setdefault(key, new_default(message))
 16      while 1:
 17        (size, pos) = local_DecodeVarint(buffer, pos)
 18        new_pos = pos + size
 19        if new_pos > end:
 20          raise _DecodeError('Truncated string.')
 21        value.append(local_unicode(buffer[pos:new_pos], 'utf-8'))
 22        # Predict that the next tag is another copy of the same repeated field.
 23        pos = new_pos + tag_len
 24        if buffer[new_pos:pos] != tag_bytes or new_pos == end:
 25          # Prediction failed.  Return.
 26          return new_pos
 27    return DecodeRepeatedField
 28  else:
 29    def DecodeField(buffer, pos, end, message, field_dict):
 30      (size, pos) = local_DecodeVarint(buffer, pos)
 31      new_pos = pos + size
 32      if new_pos > end:
 33        raise _DecodeError('Truncated string.')
 34      field_dict[key] = local_unicode(buffer[pos:new_pos], 'utf-8')
 35      return new_pos
 36    return DecodeField```
 37
 38
 39 
 40
 41以及encoder.py中的函数
 42
 43 
 44
 45```python
 46def StringEncoder(field_number, is_repeated, is_packed):
 47  """Returns an encoder for a string field."""
 48
 49  tag = TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
 50  local_EncodeVarint = _EncodeVarint
 51  local_len = len
 52  assert not is_packed
 53  if is_repeated:
 54    def EncodeRepeatedField(write, value):
 55      for element in value:
 56        encoded = element.encode('utf-8')
 57        write(tag)
 58        local_EncodeVarint(write, local_len(encoded))
 59        write(encoded)
 60    return EncodeRepeatedField
 61  else:
 62    def EncodeField(write, value):
 63      encoded = value.encode('utf-8')
 64      write(tag)
 65      local_EncodeVarint(write, local_len(encoded))
 66      return write(encoded)
 67    return EncodeField```
 68
 69
 70
 71 如果Protobuf中的字符串编码为非UTF8编码则在解析Decode的过程中会出现异常有点奇怪的是我同事的电脑上没出现异常):
 72
 73 
 74
 75'utf8' codec can't decode byte……
 76
 77我们有没有一个方法在不改变Protobuf原来的代码的情况下使用自己的函数来进行解析呢这是我首先想到的由于没学习过Python恶补了一下Python基础后研究发现Protobuf是把Decode的函数入口放在了一个数组中在引入模块的时候就会自动初始化这些入口函数然后保存到各个Protobuf类中各个PB类都有一个decoders_by_tag字典这个字典就存放了各种数据类型的解析函数入口地址
 78
 79通过上面的代码可以看出具体解析函数DecodeField)是放在一个闭包中的不能直接修改所以必须整个StringDecoder替换通过深入研究终于发现了其设置的入口在google.protobuf.internal的type_checkers.py中有这样一段代码
 80
 81 
 82
 83```python
 84# Maps from field types to encoder constructors.
 85TYPE_TO_ENCODER = {
 86    _FieldDescriptor.TYPE_DOUBLE: encoder.DoubleEncoder,
 87    _FieldDescriptor.TYPE_FLOAT: encoder.FloatEncoder,
 88    _FieldDescriptor.TYPE_INT64: encoder.Int64Encoder,
 89    _FieldDescriptor.TYPE_UINT64: encoder.UInt64Encoder,
 90    _FieldDescriptor.TYPE_INT32: encoder.Int32Encoder,
 91    _FieldDescriptor.TYPE_FIXED64: encoder.Fixed64Encoder,
 92    _FieldDescriptor.TYPE_FIXED32: encoder.Fixed32Encoder,
 93    _FieldDescriptor.TYPE_BOOL: encoder.BoolEncoder,
 94    _FieldDescriptor.TYPE_STRING: encoder.StringEncoder,
 95    _FieldDescriptor.TYPE_GROUP: encoder.GroupEncoder,
 96    _FieldDescriptor.TYPE_MESSAGE: encoder.MessageEncoder,
 97    _FieldDescriptor.TYPE_BYTES: encoder.BytesEncoder,
 98    _FieldDescriptor.TYPE_UINT32: encoder.UInt32Encoder,
 99    _FieldDescriptor.TYPE_ENUM: encoder.EnumEncoder,
100    _FieldDescriptor.TYPE_SFIXED32: encoder.SFixed32Encoder,
101    _FieldDescriptor.TYPE_SFIXED64: encoder.SFixed64Encoder,
102    _FieldDescriptor.TYPE_SINT32: encoder.SInt32Encoder,
103    _FieldDescriptor.TYPE_SINT64: encoder.SInt64Encoder,
104    }
105
106
107# Maps from field types to sizer constructors.
108TYPE_TO_SIZER = {
109    _FieldDescriptor.TYPE_DOUBLE: encoder.DoubleSizer,
110    _FieldDescriptor.TYPE_FLOAT: encoder.FloatSizer,
111    _FieldDescriptor.TYPE_INT64: encoder.Int64Sizer,
112    _FieldDescriptor.TYPE_UINT64: encoder.UInt64Sizer,
113    _FieldDescriptor.TYPE_INT32: encoder.Int32Sizer,
114    _FieldDescriptor.TYPE_FIXED64: encoder.Fixed64Sizer,
115    _FieldDescriptor.TYPE_FIXED32: encoder.Fixed32Sizer,
116    _FieldDescriptor.TYPE_BOOL: encoder.BoolSizer,
117    _FieldDescriptor.TYPE_STRING: encoder.StringSizer,
118    _FieldDescriptor.TYPE_GROUP: encoder.GroupSizer,
119    _FieldDescriptor.TYPE_MESSAGE: encoder.MessageSizer,
120    _FieldDescriptor.TYPE_BYTES: encoder.BytesSizer,
121    _FieldDescriptor.TYPE_UINT32: encoder.UInt32Sizer,
122    _FieldDescriptor.TYPE_ENUM: encoder.EnumSizer,
123    _FieldDescriptor.TYPE_SFIXED32: encoder.SFixed32Sizer,
124    _FieldDescriptor.TYPE_SFIXED64: encoder.SFixed64Sizer,
125    _FieldDescriptor.TYPE_SINT32: encoder.SInt32Sizer,
126    _FieldDescriptor.TYPE_SINT64: encoder.SInt64Sizer,
127    }
128
129
130# Maps from field type to a decoder constructor.
131TYPE_TO_DECODER = {
132    _FieldDescriptor.TYPE_DOUBLE: decoder.DoubleDecoder,
133    _FieldDescriptor.TYPE_FLOAT: decoder.FloatDecoder,
134    _FieldDescriptor.TYPE_INT64: decoder.Int64Decoder,
135    _FieldDescriptor.TYPE_UINT64: decoder.UInt64Decoder,
136    _FieldDescriptor.TYPE_INT32: decoder.Int32Decoder,
137    _FieldDescriptor.TYPE_FIXED64: decoder.Fixed64Decoder,
138    _FieldDescriptor.TYPE_FIXED32: decoder.Fixed32Decoder,
139    _FieldDescriptor.TYPE_BOOL: decoder.BoolDecoder,
140    _FieldDescriptor.TYPE_STRING: decoder.StringDecoder,
141    _FieldDescriptor.TYPE_GROUP: decoder.GroupDecoder,
142    _FieldDescriptor.TYPE_MESSAGE: decoder.MessageDecoder,
143    _FieldDescriptor.TYPE_BYTES: decoder.BytesDecoder,
144    _FieldDescriptor.TYPE_UINT32: decoder.UInt32Decoder,
145    _FieldDescriptor.TYPE_ENUM: decoder.EnumDecoder,
146    _FieldDescriptor.TYPE_SFIXED32: decoder.SFixed32Decoder,
147    _FieldDescriptor.TYPE_SFIXED64: decoder.SFixed64Decoder,
148    _FieldDescriptor.TYPE_SINT32: decoder.SInt32Decoder,
149    _FieldDescriptor.TYPE_SINT64: decoder.SInt64Decoder,
150    }```
151
152
153
154 第一个是序列化Encoder的函数入口第二个是计算大小的函数入口第三个就是解析Decoder的入口我们可以看到这里映射了所有类型的处理函数入口那我们把这个入口函数替换成我们自己的函数就可以根据实际需要进行处理了
155
156 
157
158这里我们需要特别注意的是Protobuf中的各个类都是在模块导入的时候就初始化好了所以如果我们要修改入口函数必须在PB各类引入之前进行修改为此我写了一个模块文件protobuf_hack.py这个模块必须先于PB类import其内容如下
159
160 
161
162```python
163from google.protobuf.internal import decoder
164from google.protobuf.internal import encoder
165from google.protobuf.internal import wire_format
166from google.protobuf.internal import type_checkers
167from google.protobuf import reflection
168from google.protobuf import message
169
170def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
171  """Returns a decoder for a string field."""
172
173  local_DecodeVarint = _DecodeVarint
174  local_unicode = unicode
175
176  assert not is_packed
177  if is_repeated:
178    tag_bytes = encoder.TagBytes(field_number,
179                                 wire_format.WIRETYPE_LENGTH_DELIMITED)
180    tag_len = len(tag_bytes)
181    def DecodeRepeatedField(buffer, pos, end, message, field_dict):
182      value = field_dict.get(key)
183      if value is None:
184        value = field_dict.setdefault(key, new_default(message))
185      while 1:
186        (size, pos) = local_DecodeVarint(buffer, pos)
187        new_pos = pos + size
188        if new_pos > end:
189          raise _DecodeError('Truncated string.')
190        value.append(local_unicode(buffer[pos:new_pos], 'gbk'))
191        # Predict that the next tag is another copy of the same repeated field.
192        pos = new_pos + tag_len
193        if buffer[new_pos:pos] != tag_bytes or new_pos == end:
194          # Prediction failed.  Return.
195          return new_pos
196    return DecodeRepeatedField
197  else:
198    def DecodeField(buffer, pos, end, message, field_dict):
199      (size, pos) = local_DecodeVarint(buffer, pos)
200      new_pos = pos + size
201      if new_pos > end:
202        raise _DecodeError('Truncated string.')
203      field_dict[key] = local_unicode(buffer[pos:new_pos], 'gbk')
204      return new_pos
205    return DecodeField
206
207type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder 

   

这样,我们可以把所有PB中的字符串解析按GBK编码解析了。但是项目中的字符串并不是所有的字符串都是GBK编码的,也有UTF8编码的,为了支持两种编码,我做了一个处理,就是先尝试使用一种编码解析,如果出现异常,再使用另一种编码进行解析,这样就保证了我们所有的字符串都可以正确解析。理想很丰满,现实很骨感,解析是正确了,但是如果我们序列化回去在服务器程序中去使用的时候就会出现乱码,因为原来的GBK或者UTF8统一成UTF8编码了,当然,我们也可以继续像Decoder调用自己的函数一样处理Encoder,但是在Encoder中我们并不知道这个字符串原来在数据库中是什么编码,也没有PB以及字段信息,无法差别处理。

 

至此,算是白忙活了,无法满足需要。

如果我们能够只修改我们指定的PB类的处理函数就好了,因为我们可以找出哪些PB的字符串是GBK编码的。再次经过深入研究,总算是做到了。

在这里有一个函数帮了我大忙,reflection.py中的ParseMessage函数,我们看一下:

 

 1def ParseMessage(descriptor, byte_str):
 2  """Generate a new Message instance from this Descriptor and a byte string.
 3
 4  Args:
 5    descriptor: Protobuf Descriptor object
 6    byte_str: Serialized protocol buffer byte string
 7
 8  Returns:
 9    Newly created protobuf Message object.
10  """
11
12  class _ResultClass(message.Message):
13    __metaclass__ = GeneratedProtocolMessageType
14    DESCRIPTOR = descriptor
15
16  new_msg = _ResultClass()
17  new_msg.ParseFromString(byte_str)
18  return new_msg

这个函数其实就是通过描述符信息(descriptor)来解析二进制串,生成一个新的PB消息实例。这中间的关键就是函数中的那个动态生成类实例的代码,在这里会走一次PB类的初始化流程,即会初始化我们所需要的Decoder以及Encoder函数映射字典。为了工作需要,我修改一下这个函数:

 

 

1def ParseMessage(descriptor):
2  class _ResultClass(message.Message):
3    __metaclass__ = reflection.GeneratedProtocolMessageType
4    DESCRIPTOR = descriptor
5
6  new_msg = _ResultClass()
7  return new_msg

然后加入我们需要使用自定义函数处理的PB类,注意这里一定是所需要的最小的PB结构。

 1def hacker(msg):
 2    ParseMessage(msg.DESCRIPTOR)
 3	
 4def hack_pb():
 5    #修改默认的字符串处理函数入口为自定义函数
 6    type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder
 7    type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringEncoder
 8    type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = StringSizer
 9
10    try:
11        # 这里加入我们需要修改的PB类
12        hacker(DbProto.DB_FriendAssetEntry_PB)
13    except Exception as e:
14        print(e)
15
16    #还原字符串处理函数入口
17    type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = decoder.StringDecoder
18    type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringEncoder
19    type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringSizer

由于Encode的时候Protobuf是先计算字段的长度,然后再处理的各字段,所以我们还需要把计算大小的函数使用自定义函数,否则再次解析会出问题。

现在基本上满足了需要,算是大功告成了!

细心的读者,不知你发现没,这里还是有一个问题,目前无法解决的问题,就是如果我们一个最小的PB中如果有两个字符串字段,采用的不同的编码怎么办?一般情况下,正常的设计者不会这样做,但是就像我们项目中的编码混乱一样,如果一个不小心就搞成不一样的编码就悲剧了!如果哪位高手有此解决方案,欢迎分享!!!

把整个文件附上:

 

 

  1from google.protobuf.internal import decoder
  2from google.protobuf.internal import encoder
  3from google.protobuf.internal import wire_format
  4from google.protobuf.internal import type_checkers
  5from google.protobuf import reflection
  6from google.protobuf import message
  7
  8def StringDecoder(field_number, is_repeated, is_packed, key, new_default):
  9    """Returns a decoder for a string field."""
 10
 11    local_DecodeVarint = decoder._DecodeVarint
 12    local_unicode = unicode
 13
 14    assert not is_packed
 15    if is_repeated:
 16        tag_bytes = encoder.TagBytes(field_number,
 17                                     wire_format.WIRETYPE_LENGTH_DELIMITED)
 18        tag_len = len(tag_bytes)
 19
 20        def DecodeRepeatedField(buffer, pos, end, message, field_dict):
 21            value = field_dict.get(key)
 22            if value is None:
 23                value = field_dict.setdefault(key, new_default(message))
 24            while 1:
 25                (size, pos) = local_DecodeVarint(buffer, pos)
 26                new_pos = pos + size
 27                if new_pos > end:
 28                    raise decoder._DecodeError('Truncated string.')
 29                str = '' #这里先尝试使用UTF8编码进行解析,如果出现异常则尝试使用GBK编码解析
 30                try:
 31                    str = local_unicode(buffer[pos:new_pos], 'utf-8')
 32                except Exception as e:
 33                    try:
 34                        str = local_unicode(buffer[pos:new_pos], 'gbk')
 35                    except Exception as e1:
 36                        str = ''
 37
 38                value.append(str)
 39                # Predict that the next tag is another copy of the same repeated field.
 40                pos = new_pos + tag_len
 41                if buffer[new_pos:pos] != tag_bytes or new_pos == end:
 42                    # Prediction failed.  Return.
 43                    return new_pos
 44
 45        return DecodeRepeatedField
 46    else:
 47        def DecodeField(buffer, pos, end, message, field_dict):
 48            (size, pos) = local_DecodeVarint(buffer, pos)
 49            new_pos = pos + size
 50            if new_pos > end:
 51                raise decoder._DecodeError('Truncated string.')
 52
 53            str = '' #这里先尝试使用UTF8编码进行解析,如果出现异常则尝试使用GBK编码解析
 54            try:
 55                str = local_unicode(buffer[pos:new_pos], 'utf-8')
 56            except Exception as e:
 57                try:
 58                    str = local_unicode(buffer[pos:new_pos], 'gbk')
 59                except Exception as e1:
 60                    str = ''
 61
 62            field_dict[key] = str
 63            return new_pos
 64
 65        return DecodeField
 66
 67
 68def StringEncoder(field_number, is_repeated, is_packed):
 69    """Returns an encoder for a string field."""
 70
 71    tag = encoder.TagBytes(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED)
 72    local_EncodeVarint = encoder._EncodeVarint
 73    local_len = len
 74    assert not is_packed
 75    if is_repeated:
 76        def EncodeRepeatedField(write, value):
 77            for element in value:
 78                encoded = element.encode('gbk') #序列化的时候就直接使用GBK编码了
 79                write(tag)
 80                local_EncodeVarint(write, local_len(encoded))
 81                write(encoded)
 82
 83        return EncodeRepeatedField
 84    else:
 85        def EncodeField(write, value):
 86            encoded = value.encode('gbk') #序列化的时候就直接使用GBK编码了
 87            write(tag)
 88            local_EncodeVarint(write, local_len(encoded))
 89            return write(encoded)
 90
 91        return EncodeField
 92
 93def StringSizer(field_number, is_repeated, is_packed):
 94    """Returns a sizer for a string field."""
 95
 96    tag_size = encoder._TagSize(field_number)
 97    local_VarintSize = encoder._VarintSize
 98    local_len = len
 99    assert not is_packed
100    if is_repeated:
101        def RepeatedFieldSize(value):
102            result = tag_size * len(value)
103            for element in value:
104                l = local_len(element.encode('gbk')) #注意序列化前计算长度时也需要使用与序列化相同的编码,否则会出错
105                result += local_VarintSize(l) + l
106            return result
107
108        return RepeatedFieldSize
109    else:
110        def FieldSize(value):
111            l = local_len(value.encode('gbk')) #注意序列化前计算长度时也需要使用与序列化相同的编码,否则会出错
112            return tag_size + local_VarintSize(l) + l
113
114        return FieldSize
115
116def ParseMessage(descriptor):
117  class _ResultClass(message.Message):
118    __metaclass__ = reflection.GeneratedProtocolMessageType
119    DESCRIPTOR = descriptor
120
121  new_msg = _ResultClass()
122  return new_msg
123
124def hacker(msg):
125    ParseMessage(msg.DESCRIPTOR)
126
127def hack_pb():
128    # 修改默认的字符串处理函数入口为自定义函数
129    type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringDecoder
130    type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = StringEncoder
131    type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = StringSizer
132
133    try:
134        # 这里加入我们需要修改的PB类,注意这里需要自行import DbProto模块
135        hacker(DbProto.DB_FriendAssetEntry_PB)
136    except Exception as e:
137        print(e)
138
139    # 还原字符串处理函数入口
140    type_checkers.TYPE_TO_DECODER[type_checkers._FieldDescriptor.TYPE_STRING] = decoder.StringDecoder
141    type_checkers.TYPE_TO_ENCODER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringEncoder
142    type_checkers.TYPE_TO_SIZER[type_checkers._FieldDescriptor.TYPE_STRING] = encoder.StringSizer
143
144#这里让其在引入模块时自动执行
145hack_pb()