Библиотека алгоритма декомпрессии


Меня попросили написать библиотеку декомпрессор для процесса интервью. Это библиотека декомпрессии по алгоритму LZW. Я заставил его работать и пытались его осуществить. Я ищу какие-либо отзывы о структура, презентация, читабельность от профессионалов, которые работали в качестве инженеров программного обеспечения.

На LZW объяснение. Во второй таблице в разделе распаковки объясняет, как словарь построен и размер ключа ограничивается 12-бит в моем случае.

"""
Decompressor library for Lempel–Ziv–Welch compression algorithm.
Main Functions:
    decompress(file_path)
    extract_archive_to_file(input_path, output_path)
Test code in the end.
"""


def read_compressed_codes_from_archive(file_path: str) -> [int]:
    """
    Reads file at specified path and returns content as an int array.
    :param file_path: path to archive
    :return: array of int codes
    """
    compressed_codes = []
    with open(file_path, "rb") as archive:
        # Read the 3 bytes, which contain two 12-bit codes
        bytes = archive.read(3)
        while bytes:
            if len(bytes) == 3:
                code1, code2 = unpack_codes_from_3_bytes(bytes)

                compressed_codes.append(code1)
                compressed_codes.append(code2)
            elif len(bytes) == 2:
                # Odd number of codes in the end of the file
                # Extract a single 12-bit code which is padded to 16-bits
                code = (0b0000111111111111 & (bytes[0] << 8)) | bytes[1]
                compressed_codes.append(code)

            bytes = archive.read(3)

    return compressed_codes


def unpack_codes_from_3_bytes(packed_bytes: [bytes]) -> (int, int):
    """
    Extracts two 12-bit numbers from 3 bytes (24-bits)
    :param packed_bytes: array of 3 int bytes
    :return: extracted int codes
    """
    # Create 16-bit int vars to store extracted codes in
    code1 = 0b0000111111111111
    code2 = 0b0000111111111111

    # Copy over 8 most significant bits and shift them to correct place
    code1 = code1 & packed_bytes[0] << 4
    # Copy over the 4 least significant bits from second byte
    code1 = code1 | (packed_bytes[1] >> 4)

    # Copy over the 4 most significant bits from second byte
    code2 = code2 & packed_bytes[1] << 8
    # Copy over the 8 least significant bits from third byte
    code2 = code2 | packed_bytes[2]

    return code1, code2


def convert_codes_to_text(compressed_codes: [int]) -> str:
    """
    Decompresses codes into text data.
    :param compressed_codes: compressed codes from archive
    :return: string data from compressed codes
    """
    # Create initial dictionary with individual char compressed_codes
    dictionary_size = 256
    dictionary = {i: chr(i) for i in range(dictionary_size)}

    text_data = ""
    previous_code = None

    # Loop over compressed compressed_codes and convert them to text values.
    for code in compressed_codes:
        if code in dictionary:
            text_data += dictionary[code]

            if previous_code:
                # Make sure new value is not already in the dict,
                # otherwise add it to the end.
                if dictionary[previous_code] + dictionary[code][0] not in \
                        dictionary.keys():
                    dictionary[dictionary_size] = dictionary[previous_code] \
                                                + dictionary[code][0]

                    dictionary_size += 1
                    # If dictionary is full, reset back to initial dict
                    # That is 4096 in binary ie one and 12 zeros
                    if dictionary_size == 0b1000000000000:
                        dictionary_size = 256
        else:
            assert ("Decompression error. Code {} not present in "
                    "dictionary".format(code))

        previous_code = code

    return text_data


def decompress(file_path: str) -> str:
    """
    Decompress an archive and get the text data.
    :param filepath: path to archive
    :return: text data as string
    """
    codes = read_compressed_codes_from_archive(file_path)
    text_data = convert_codes_to_text(codes)
    return text_data


def extract_archive_to_file(input_path: str, output_path: str):
    """
    Extracts an archive and saves it on disk.
    :param input_path: path to archive
    :param output_path: path to extracted file
    :return: none
    """
    text_data = decompress(input_path)
    with open(output_path, "w") as output_file:
        output_file.write(text_data)


def test_library():
    """
    Example test function.
    """
    file_path = "LzwInputData/compressedfile3.z"
    text_data = decompress(file_path)
    print(text_data)

    output_path = "LzwInputData/compressedfile3.txt"
    extract_archive_to_file(file_path, output_path)

test_library()

Спасибо



681
5
задан 16 марта 2018 в 01:03 Источник Поделиться
Комментарии
2 ответа

Вообще это похоже на хорошо написанный, хорошо документированы, чистый код Python - так в целом хорошая работа! Но если я должен быть придирчивым, есть несколько вещей, которые я мог бы упомянуть...

code1, code2 = unpack_codes_from_3_bytes(bytes)
compressed_codes.append(code1)
compressed_codes.append(code2)

Может быть сокращен до:

compressed_codes.extend(unpack_codes_from_3_bytes(bytes))

Я немного неясно, почему вы делаете:

if dictionary_size == 0b1000000000000:

В начале, вы определяете его как int (dictionary_size = 256), то единственным изменением инт Дополнение (dictionary_size += 1) так почему двоичное сравнение?

Вы можете объяснить, почему вы используете assert()? В модуле, как это я обычно предпочитаю либо исключений явно - даже так далеко, чтобы создать пользовательское исключение занятия, так что конечный пользователь модуля может поймать их легко - или если это для отчетности, использование logging.error или похожие.

Есть несколько мест, где можно создавать переменные и используются один раз, например:

text_data = convert_codes_to_text(codes)
return text_data

может быть:

return convert_codes_to_text(codes)

или:

text_data = decompress(input_path)
with open(output_path, "w") as output_file:
output_file.write(text_data)

может быть:

with open(output_path, "w") as output_file:
output_file.write(decompress(input_path))

Наконец, not in dictionary.keys(): может просто быть not in dictionary:

3
ответ дан 16 марта 2018 в 01:03 Источник Поделиться

Во-первых, я бы рекомендовал вам изменить интерфейс API read_compressed_codes_from_archive чтобы не брать пути к файлу, но файл-как объект, см. Этот пост. Это оставляет вас намного больше свободы, если вы когда-нибудь хотели уйти от только чтения из файла. В качестве примера, приложение (не в Python), который я использовал, чтобы работать на в итоге столкнулся с вопросами, где 3-участник API, которые мы использовали только имена, и в итоге мы получили сообщения об ошибке, что он разбился на большие файлы. Получается, что так, как они разработали свои библиотеки, они могут только обработать файл при загрузке всю вещь сразу, а не через поток или загрузка файла последовательно. Явно поддерживая поток или что-то подобное с самого начала-это хороший способ справиться с этим.

Далее, вы должны сделать читатель генератора, по очень схожим причинам. В целом, я бы, наверное, переписать код, чтобы выглядеть так (Обратите внимание, я не знаю, что правильное DOC и синтаксис ввода для генераторов, поэтому я оставил как есть).

import typing

def read_compressed_codes_from_archive(archive: typing.BinaryIO) -> [int]:
"""
Reads file at specified path and returns content as an int array.
:param file_path: path to archive
:return: array of int codes
"""

bytes = archive.read(3)

while bytes:
if len(bytes) == 3:
code1, code2 = unpack_codes_from_3_bytes(bytes)

yield code1
yield code2
elif len(bytes) == 2:
yield (TWELVE_BIT_MASK & (bytes[0] << 8)) | bytes[1]

bytes = archive.read(3)

Я также определена глобальная Константа, TWELVE_BIT_MASK = 0b0000111111111111 так будет более понятно, что там происходит в действительности. Тогда мы можем упростить бит-сложа немного; получение первых 12-битный номер, который не требует какой-либо маскировки. Оттуда, я хотел тогда двигаться все немного вертел в unpack_codes_from_3_bytesи использовать yield from чтобы получить все значения, независимо от регистра. Наконец, мы можем написать функцию для обработки каждого случая применение получают первые двенадцать и двенадцать бит, вот так:

import typing

def read_compressed_codes_from_archive(archive: typing.BinaryIO) -> [int]
bytes = archive.read(3)

while bytes:
yield from unpack_codes_from_3_bytes(bytes)
bytes = archive.read(3)

def unpack_codes_from_3_bytes(packed_bytes: [bytes]) -> int:

if len(bytes) == 3:
yield get_first_twelve_bits(packed_bytes)
yield get_last_twelve_bits(packed_bytes)

def get_first_twelve_bits(packed_bytes: [bytes]) -> int:
return (packed_bytes[0] << 4) | (packed_bytes[1] >> 4)

def get_last_twelve_bits(packed_bytes: [bytes]) -> int:
return (TWELVE_BIT_MASK & packed_bytes[-2] << 8) | packed_bytes[-1]

Вы могли бы даже рассмотреть создание класса, представляющего архив на LZW повторяемое, как так (с верхней части моей головы, поэтому не может быть правильным)

class LzwArchive:

def __init__(self, archive: typing.BinaryIO):
self.archive = archive

def __iter__(self):
codes = self.archive.read(3)

while codes:
yield from LzwArchive._unpack_codes(codes)
codes = self.archive.read(3)

@staticmethod
def _unpack_codes(codes: [bytes]) -> [int]:
if len(codes) == 3:
yield LzwArchive._get_first_twelve_bits(codes)
yield LzwArchive._get_last_twelve_bits(codes)

@staticmethod
def _get_first_twelve_bits(packed_bytes: [bytes]) -> int:
return (packed_bytes[0] << 4) | (packed_bytes[1] >> 4)

@staticmethod
def _get_last_twelve_bits(packed_bytes: [bytes]) -> int:
return (TWELVE_BIT_MASK & packed_bytes[-2] << 8) | packed_bytes[-1]

Тогда вы могли бы просто сделать что-то вроде этого:

with open(myfile, 'rb') as f:
archive = LzwArchive(f)
for code in archive:
process(code)

Давайте держать генератора! Мы можем сделать convert_codes_to_text повторяемое также

def convert_codes_to_text(compressed_codes: [int]) -> str:
dictionary_size = MIN_DICT_SIZE
dictionary = {i: chr(i) for i in range(dictionary_size)}

text_data = ""
previous_code = None

# Loop over compressed compressed_codes and convert them to text values.
for code in compressed_codes:
try:
yield dictionary[code]
except KeyError:
# Do your thing here, but something better than an assert

if previous_code:
new_code = dictionary[code][0] + dictionary[previous_code]
if new_code not in dictionary:
dictionary[dictionary_size] = new_code
dictionary_size += 1

if dictionary_size = MAX_DICT_SIZE:
dictionary_size = MIN_DICT_SIZE

previous_code = code

Он чувствует, как мы делаем слишком много работы, чтобы поддерживать здесь наш словарь, так что я думаю, мы можем сделать вполне разумный структуры данных. Мы могли бы баловаться с LzwDictionary чтобы сделать его подклассом dict и действительно крякающий dict (на самом деле, подклассы defaultdict и/или OrderedDict может вам что-то очень круто) это больше, чем нужно прямо сейчас.

def convert_codes_to_text(compressed_codes: [int]) -> str:
dictionary = LzwDictionary()
previous_code = None

for code in compressed_codes:
try:
yield dictionary[code]
except KeyError:
# Do your thing here, but something better than an assert

if previous_code:
new_code = dictionary[code][0] + dictionary[previous_code]
if new_code not in dictionary:
dictionary.add_code(new_code)

previous_code = code

class LzwDictionary:
MAX_SIZE = 4096
MIN_SIZE = 256

def __init__(self):
self.data = {i: chr(i) for i in range(LzwDictionary.MIN_SIZE)}
self.size = LzwDictionary.MIN_SIZE

def __getitem__(self, key):
return self.data[key]

def __contains__(self, key):
return key in self.data

def add_code(self, code):
self.data[self.size] = code
self.size += 1
self._check_size()

def _check_size(self):
if self.size > LzwDictionary.MAX_SIZE:
self.size = LzwDictionary.MIN_SIZE

На данный момент мы просто должны использовать итератор, вы могли бы сделать ''.join(convert_codes_to_text(codes)) или напишите пошагово. Я бы, наверное, получиться что-то вроде этого:

def extract_archive_to_file(input_path, output_path):

with open(input_path, 'rb') as lz_file:
archive = LzwArchive(lz_file)
decompressed = convert_codes_to_text(archive)
with open(output_path, 'w') as out_file:
for character in decompressed:
out_file.write(character)

Писал все это без доступа в Python, поэтому, пожалуйста, простите любые опечатки или ошибки.

2
ответ дан 19 марта 2018 в 09:03 Источник Поделиться