(де)сериализатор для произвольных объектов в Python


Моя программа предназначена для (де-)сериализации произвольных объектов в Python. Это в основном замена для языка Python pickle модуль, который похож, но склонен к выполнению произвольного кода. Я думаю, что мне удалось сделать моей реализации более универсальным и более безопасным, чем pickleно я не специалист в безопасности.

Я пришел сюда, чтобы задать вопрос:

Мой алгоритм (де-)сериализации в безопасности? Есть ли риск произвольного выполнения кода, или что-нибудь еще, что может быть опасно?

Насколько я могу сказать, я избегал выполнения любой недоверенный код:

  • Классы сериализуются как полное имя нравится module.Class. Они десериализуются без импорта каких-либо модулей; модуль посмотрел сквозь sys.modules[module_name].
  • Необычные объекты имеют свои __dict__ сериализован. При десериализации, новый экземпляр создается путем вызова obj = object.__new__(cls) и obj.__dict__.update(serialized_dict).

- Я что-нибудь проглядел?


Код разделен на 3 файла.

Я прошу прощения за длину кода, но там не много я могу удалить. Что сказал, Я думаю, что "интересные" функции serialize, deserialize, deserialize_next и decode_type. Класс-специфические (де-)сериализации (как encode_dict и decode_dict) должна быть безопасной.

Во-первых, общедоступный интерфейс. Это Содержание public_api.py:

from .internals import *


def to_str(obj):
    """
    Serializes an object to a string.

    :param obj: the object to serialize
    :return: the serialized object
    """
    return serialize(obj)


def load(data):
    """
    Deserializes an object.
    The input can be a string, bytes, or file-like object.

    :param data: the data to deserialize, or a file containing that data
    :return: the deserialized object
    """

    if isinstance(data, str):
        pass
    elif isinstance(data, bytes):
        data = data.decode()
    else:
        return load(data.read())

    value, data = deserialize_next(data)
    if data:
        raise DeserializationException('left-over data remained after deserialization')

    return value

internals.py:

import sys
import ast
import inspect
import collections

from .exceptions import *


def serialize(obj):
    """
    serializes an object to a string of the form "CLASS,SIZE,VALUE", where

     - CLASS is the fully qualified name of the object's class
     - SIZE the length of VALUE in characters
     - VALUE is some sort of string representation of obj; the exact format depends on its class
    """

    cls = type(obj)
    try:
        serializer = SERIALIZERS[cls]
    except KeyError:
        # if this object isn't of a basic type, we'll serialize its __dict__
        try:
            obj = vars(obj)
        except TypeError:
            raise SerializationException('Cannot serialize object without a __dict__: {}'.format(obj))

        serializer = SERIALIZERS[dict]

    value = serializer(obj)
    return '{},{},{}'.format(encode_type(cls), len(value), value)


def deserialize(cls, data):
    try:
        deserializer = DESERIALIZERS[cls]
    except KeyError:
        # if this object isn't of a basic type, we'll instantiate a new
        # object of the correct type and update its __dict__.
        # None of this should execute any dangerous code.
        dic = deserialize(dict, data)
        obj = object.__new__(cls)
        obj.__dict__.update(dic)
    else:
        obj = deserializer(data)

    assert isinstance(obj, cls), (obj, cls)
    return obj


def deserialize_next(data):
    """
    Deserializes the first object in *data*, returning the deserialized object
    and the remaining data that needs to be deserialized.
    """
    # extract the class name
    i = data.find(',')
    if i == -1:
        raise DeserializationException('missing type field')

    clsname = data[:i]
    cls = decode_type(clsname)

    # extract the size in bytes
    j = data.find(',', i+1)
    if j == -1:
        raise DeserializationException('missing size field')

    try:
        size = int(data[i+1:j])
    except ValueError:
        raise DeserializationException('invalid value in size field (not a number)')

    # extract the value
    end = j+1+size
    value = data[j+1:end]
    value = deserialize(cls, value)

    return value, data[end:]


# class-specific (de-)serializers below
# =====================================

def encode_type(cls):
    module = inspect.getmodule(cls).__name__
    if module == 'builtins':
        return cls.__qualname__

    return '{}.{}'.format(module, cls.__qualname__)


def decode_type(qualname):
    names = qualname.split('.')

    if len(names) < 2:
        try:
            return getattr(sys.modules['builtins'], qualname)
        except AttributeError:
            raise DeserializationException('Missing module name in __qualname__: {}'.format(qualname))

    # look up the module in sys.modules; this avoids having to execute any import code
    obj = sys.modules[names[0]]
    for name in names[1:]:
        obj = getattr(obj, name)

    if not isinstance(obj, type):
        raise DeserializationException("Unknown type: {} (name resolved to {})".format(qualname, obj))

    return obj


def encode_list(lis):
    chunks = [serialize(value) for value in lis]
    return '[{}]'.format(', '.join(chunks))


def decode_list(data):
    lis = []

    if not data.startswith('['):
        raise DeserializationException('list should start with "[" character')
    data = data[1:]

    while True:
        value, data = deserialize_next(data)
        lis.append(value)

        if not data.startswith(', '):
            if data == ']':
                break
            raise DeserializationException('missing comma between list values')
        data = data[2:]

    return lis


def encode_dict(dic):
    chunks = []

    for key, value in dic.items():
        line = '{}: {}'.format(serialize(key), serialize(value))
        chunks.append(line)

    return '{{{}}}'.format(', '.join(chunks))


def decode_dict(data, dict_type=dict):
    dic = dict_type()

    if not data.startswith('{'):
        raise DeserializationException('dict should start with "{" character')
    data = data[1:]

    while True:
        key, data = deserialize_next(data)

        if not data.startswith(': '):
            raise DeserializationException('missing colon between dict key and value')
        data = data[2:]

        value, data = deserialize_next(data)
        dic[key] = value

        if not data.startswith(', '):
            if data == '}':
                break
            raise DeserializationException('missing comma between dict values')
        data = data[2:]

    return dic


SERIALIZERS = {int: str,
               float: str,
               complex: str,
               str: repr,
               bytes: repr,
               type: encode_type,
               list: encode_list,
               dict: encode_dict,
               collections.OrderedDict: encode_dict,
               }

DESERIALIZERS = {int: int,
                 float: float,
                 complex: complex,
                 str: ast.literal_eval,
                 bytes: ast.literal_eval,
                 type: decode_type,
                 list: decode_list,
                 dict: decode_dict,
                 collections.OrderedDict: lambda d: decode_dict(d, collections.OrderedDict),
                 }

И, наконец, exceptions.py:

class SerializationException(Exception):
    """Raised when data serialization fails"""


class DeserializationException(Exception):
    """Raised when data deserialization fails"""


150
4
задан 9 февраля 2018 в 11:02 Источник Поделиться
Комментарии