You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
231 lines
8.9 KiB
231 lines
8.9 KiB
#! /usr/bin/python |
|
# -*- coding: utf-8 -*- |
|
# |
|
# Protocol Buffers - Google's data interchange format |
|
# Copyright 2008 Google Inc. All rights reserved. |
|
# https://developers.google.com/protocol-buffers/ |
|
# |
|
# Redistribution and use in source and binary forms, with or without |
|
# modification, are permitted provided that the following conditions are |
|
# met: |
|
# |
|
# * Redistributions of source code must retain the above copyright |
|
# notice, this list of conditions and the following disclaimer. |
|
# * Redistributions in binary form must reproduce the above |
|
# copyright notice, this list of conditions and the following disclaimer |
|
# in the documentation and/or other materials provided with the |
|
# distribution. |
|
# * Neither the name of Google Inc. nor the names of its |
|
# contributors may be used to endorse or promote products derived from |
|
# this software without specific prior written permission. |
|
# |
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
|
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
|
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
|
|
"""Test for preservation of unknown fields in the pure Python implementation.""" |
|
|
|
__author__ = 'bohdank@google.com (Bohdan Koval)' |
|
|
|
from google.apputils import basetest |
|
from google.protobuf import unittest_mset_pb2 |
|
from google.protobuf import unittest_pb2 |
|
from google.protobuf.internal import encoder |
|
from google.protobuf.internal import missing_enum_values_pb2 |
|
from google.protobuf.internal import test_util |
|
from google.protobuf.internal import type_checkers |
|
|
|
|
|
class UnknownFieldsTest(basetest.TestCase): |
|
|
|
def setUp(self): |
|
self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR |
|
self.all_fields = unittest_pb2.TestAllTypes() |
|
test_util.SetAllFields(self.all_fields) |
|
self.all_fields_data = self.all_fields.SerializeToString() |
|
self.empty_message = unittest_pb2.TestEmptyMessage() |
|
self.empty_message.ParseFromString(self.all_fields_data) |
|
self.unknown_fields = self.empty_message._unknown_fields |
|
|
|
def GetField(self, name): |
|
field_descriptor = self.descriptor.fields_by_name[name] |
|
wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type] |
|
field_tag = encoder.TagBytes(field_descriptor.number, wire_type) |
|
result_dict = {} |
|
for tag_bytes, value in self.unknown_fields: |
|
if tag_bytes == field_tag: |
|
decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes][0] |
|
decoder(value, 0, len(value), self.all_fields, result_dict) |
|
return result_dict[field_descriptor] |
|
|
|
def testEnum(self): |
|
value = self.GetField('optional_nested_enum') |
|
self.assertEqual(self.all_fields.optional_nested_enum, value) |
|
|
|
def testRepeatedEnum(self): |
|
value = self.GetField('repeated_nested_enum') |
|
self.assertEqual(self.all_fields.repeated_nested_enum, value) |
|
|
|
def testVarint(self): |
|
value = self.GetField('optional_int32') |
|
self.assertEqual(self.all_fields.optional_int32, value) |
|
|
|
def testFixed32(self): |
|
value = self.GetField('optional_fixed32') |
|
self.assertEqual(self.all_fields.optional_fixed32, value) |
|
|
|
def testFixed64(self): |
|
value = self.GetField('optional_fixed64') |
|
self.assertEqual(self.all_fields.optional_fixed64, value) |
|
|
|
def testLengthDelimited(self): |
|
value = self.GetField('optional_string') |
|
self.assertEqual(self.all_fields.optional_string, value) |
|
|
|
def testGroup(self): |
|
value = self.GetField('optionalgroup') |
|
self.assertEqual(self.all_fields.optionalgroup, value) |
|
|
|
def testSerialize(self): |
|
data = self.empty_message.SerializeToString() |
|
|
|
# Don't use assertEqual because we don't want to dump raw binary data to |
|
# stdout. |
|
self.assertTrue(data == self.all_fields_data) |
|
|
|
def testCopyFrom(self): |
|
message = unittest_pb2.TestEmptyMessage() |
|
message.CopyFrom(self.empty_message) |
|
self.assertEqual(self.unknown_fields, message._unknown_fields) |
|
|
|
def testMergeFrom(self): |
|
message = unittest_pb2.TestAllTypes() |
|
message.optional_int32 = 1 |
|
message.optional_uint32 = 2 |
|
source = unittest_pb2.TestEmptyMessage() |
|
source.ParseFromString(message.SerializeToString()) |
|
|
|
message.ClearField('optional_int32') |
|
message.optional_int64 = 3 |
|
message.optional_uint32 = 4 |
|
destination = unittest_pb2.TestEmptyMessage() |
|
destination.ParseFromString(message.SerializeToString()) |
|
unknown_fields = destination._unknown_fields[:] |
|
|
|
destination.MergeFrom(source) |
|
self.assertEqual(unknown_fields + source._unknown_fields, |
|
destination._unknown_fields) |
|
|
|
def testClear(self): |
|
self.empty_message.Clear() |
|
self.assertEqual(0, len(self.empty_message._unknown_fields)) |
|
|
|
def testByteSize(self): |
|
self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize()) |
|
|
|
def testUnknownExtensions(self): |
|
message = unittest_pb2.TestEmptyMessageWithExtensions() |
|
message.ParseFromString(self.all_fields_data) |
|
self.assertEqual(self.empty_message._unknown_fields, |
|
message._unknown_fields) |
|
|
|
def testListFields(self): |
|
# Make sure ListFields doesn't return unknown fields. |
|
self.assertEqual(0, len(self.empty_message.ListFields())) |
|
|
|
def testSerializeMessageSetWireFormatUnknownExtension(self): |
|
# Create a message using the message set wire format with an unknown |
|
# message. |
|
raw = unittest_mset_pb2.RawMessageSet() |
|
|
|
# Add an unknown extension. |
|
item = raw.item.add() |
|
item.type_id = 1545009 |
|
message1 = unittest_mset_pb2.TestMessageSetExtension1() |
|
message1.i = 12345 |
|
item.message = message1.SerializeToString() |
|
|
|
serialized = raw.SerializeToString() |
|
|
|
# Parse message using the message set wire format. |
|
proto = unittest_mset_pb2.TestMessageSet() |
|
proto.MergeFromString(serialized) |
|
|
|
# Verify that the unknown extension is serialized unchanged |
|
reserialized = proto.SerializeToString() |
|
new_raw = unittest_mset_pb2.RawMessageSet() |
|
new_raw.MergeFromString(reserialized) |
|
self.assertEqual(raw, new_raw) |
|
|
|
def testEquals(self): |
|
message = unittest_pb2.TestEmptyMessage() |
|
message.ParseFromString(self.all_fields_data) |
|
self.assertEqual(self.empty_message, message) |
|
|
|
self.all_fields.ClearField('optional_string') |
|
message.ParseFromString(self.all_fields.SerializeToString()) |
|
self.assertNotEqual(self.empty_message, message) |
|
|
|
|
|
class UnknownFieldsTest(basetest.TestCase): |
|
|
|
def setUp(self): |
|
self.descriptor = missing_enum_values_pb2.TestEnumValues.DESCRIPTOR |
|
|
|
self.message = missing_enum_values_pb2.TestEnumValues() |
|
self.message.optional_nested_enum = ( |
|
missing_enum_values_pb2.TestEnumValues.ZERO) |
|
self.message.repeated_nested_enum.extend([ |
|
missing_enum_values_pb2.TestEnumValues.ZERO, |
|
missing_enum_values_pb2.TestEnumValues.ONE, |
|
]) |
|
self.message.packed_nested_enum.extend([ |
|
missing_enum_values_pb2.TestEnumValues.ZERO, |
|
missing_enum_values_pb2.TestEnumValues.ONE, |
|
]) |
|
self.message_data = self.message.SerializeToString() |
|
self.missing_message = missing_enum_values_pb2.TestMissingEnumValues() |
|
self.missing_message.ParseFromString(self.message_data) |
|
self.unknown_fields = self.missing_message._unknown_fields |
|
|
|
def GetField(self, name): |
|
field_descriptor = self.descriptor.fields_by_name[name] |
|
wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type] |
|
field_tag = encoder.TagBytes(field_descriptor.number, wire_type) |
|
result_dict = {} |
|
for tag_bytes, value in self.unknown_fields: |
|
if tag_bytes == field_tag: |
|
decoder = missing_enum_values_pb2.TestEnumValues._decoders_by_tag[ |
|
tag_bytes][0] |
|
decoder(value, 0, len(value), self.message, result_dict) |
|
return result_dict[field_descriptor] |
|
|
|
def testUnknownEnumValue(self): |
|
self.assertFalse(self.missing_message.HasField('optional_nested_enum')) |
|
value = self.GetField('optional_nested_enum') |
|
self.assertEqual(self.message.optional_nested_enum, value) |
|
|
|
def testUnknownRepeatedEnumValue(self): |
|
value = self.GetField('repeated_nested_enum') |
|
self.assertEqual(self.message.repeated_nested_enum, value) |
|
|
|
def testUnknownPackedEnumValue(self): |
|
value = self.GetField('packed_nested_enum') |
|
self.assertEqual(self.message.packed_nested_enum, value) |
|
|
|
def testRoundTrip(self): |
|
new_message = missing_enum_values_pb2.TestEnumValues() |
|
new_message.ParseFromString(self.missing_message.SerializeToString()) |
|
self.assertEqual(self.message, new_message) |
|
|
|
|
|
if __name__ == '__main__': |
|
basetest.main()
|
|
|