Skip to content

Add lempel ziv compression #2107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions compression/lempel_ziv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
One of the several implementations of Lempel–Ziv–Welch compression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import os
import sys


def read_file_binary(file_path: str) -> str:
"""
Reads given file as bytes and returns them as a long string
"""
result = ""
try:
with open(file_path, "rb") as binary_file:
data = binary_file.read()
for dat in data:
curr_byte = "{0:08b}".format(dat)
result += curr_byte
return result
except IOError:
print("File not accessible")
sys.exit()


def add_key_to_lexicon(
lexicon: dict, curr_string: str, index: int, last_match_id: int
) -> None:
"""
Adds new strings (curr_string + "0", curr_string + "1") to the lexicon
"""
lexicon.pop(curr_string)
lexicon[curr_string + "0"] = last_match_id

if math.log2(index).is_integer():
for curr_key in lexicon:
lexicon[curr_key] = "0" + lexicon[curr_key]

lexicon[curr_string + "1"] = bin(index)[2:]


def compress_data(data_bits: str) -> str:
"""
Compresses given data_bits using Lempel–Ziv–Welch compression algorithm
and returns the result as a string
"""
lexicon = {"0": "0", "1": "1"}
result, curr_string = "", ""
index = len(lexicon)

for i in range(len(data_bits)):
curr_string += data_bits[i]
if curr_string not in lexicon:
continue

last_match_id = lexicon[curr_string]
result += last_match_id
add_key_to_lexicon(lexicon, curr_string, index, last_match_id)
index += 1
curr_string = ""

while curr_string != "" and curr_string not in lexicon:
curr_string += "0"

if curr_string != "":
last_match_id = lexicon[curr_string]
result += last_match_id

return result


def add_file_length(source_path: str, compressed: str) -> str:
"""
Adds given file's length in front (using Elias gamma coding) of the compressed
string
"""
file_length = os.path.getsize(source_path)
file_length_binary = bin(file_length)[2:]
length_length = len(file_length_binary)

return "0" * (length_length - 1) + file_length_binary + compressed


def write_file_binary(file_path: str, to_write: str) -> None:
"""
Writes given to_write string (should only consist of 0's and 1's) as bytes in the
file
"""
byte_length = 8
try:
with open(file_path, "wb") as opened_file:
result_byte_array = [
to_write[i : i + byte_length]
for i in range(0, len(to_write), byte_length)
]

if len(result_byte_array[-1]) % byte_length == 0:
result_byte_array.append("10000000")
else:
result_byte_array[-1] += "1" + "0" * (
byte_length - len(result_byte_array[-1]) - 1
)

for elem in result_byte_array:
opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
except IOError:
print("File not accessible")
sys.exit()


def compress(source_path, destination_path: str) -> None:
"""
Reads source file, compresses it and writes the compressed result in destination
file
"""
data_bits = read_file_binary(source_path)
compressed = compress_data(data_bits)
compressed = add_file_length(source_path, compressed)
write_file_binary(destination_path, compressed)


if __name__ == "__main__":
compress(sys.argv[1], sys.argv[2])
111 changes: 111 additions & 0 deletions compression/lempel_ziv_decompress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
One of the several implementations of Lempel–Ziv–Welch decompression algorithm
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch
"""

import math
import sys


def read_file_binary(file_path: str) -> str:
"""
Reads given file as bytes and returns them as a long string
"""
result = ""
try:
with open(file_path, "rb") as binary_file:
data = binary_file.read()
for dat in data:
curr_byte = "{0:08b}".format(dat)
result += curr_byte
return result
except IOError:
print("File not accessible")
sys.exit()


def decompress_data(data_bits: str) -> str:
"""
Decompresses given data_bits using Lempel–Ziv–Welch compression algorithm
and returns the result as a string
"""
lexicon = {"0": "0", "1": "1"}
result, curr_string = "", ""
index = len(lexicon)

for i in range(len(data_bits)):
curr_string += data_bits[i]
if curr_string not in lexicon:
continue

last_match_id = lexicon[curr_string]
result += last_match_id
lexicon[curr_string] = last_match_id + "0"

if math.log2(index).is_integer():
newLex = {}
for curr_key in list(lexicon):
newLex["0" + curr_key] = lexicon.pop(curr_key)
lexicon = newLex

lexicon[bin(index)[2:]] = last_match_id + "1"
index += 1
curr_string = ""
return result


def write_file_binary(file_path: str, to_write: str) -> None:
"""
Writes given to_write string (should only consist of 0's and 1's) as bytes in the
file
"""
byte_length = 8
try:
with open(file_path, "wb") as opened_file:
result_byte_array = [
to_write[i : i + byte_length]
for i in range(0, len(to_write), byte_length)
]

if len(result_byte_array[-1]) % byte_length == 0:
result_byte_array.append("10000000")
else:
result_byte_array[-1] += "1" + "0" * (
byte_length - len(result_byte_array[-1]) - 1
)

for elem in result_byte_array[:-1]:
opened_file.write(int(elem, 2).to_bytes(1, byteorder="big"))
except IOError:
print("File not accessible")
sys.exit()


def remove_prefix(data_bits: str) -> str:
"""
Removes size prefix, that compressed file should have
Returns the result
"""
counter = 0
for letter in data_bits:
if letter == "1":
break
counter += 1

data_bits = data_bits[counter:]
data_bits = data_bits[counter + 1 :]
return data_bits


def compress(source_path: str, destination_path: str) -> None:
"""
Reads source file, decompresses it and writes the result in destination file
"""
data_bits = read_file_binary(source_path)
data_bits = remove_prefix(data_bits)
decompressed = decompress_data(data_bits)
write_file_binary(destination_path, decompressed)


if __name__ == "__main__":
compress(sys.argv[1], sys.argv[2])