-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhashed_file.py
87 lines (71 loc) · 2.89 KB
/
hashed_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations
import logging
import os
from collections import deque
from math import ceil
import xxhash
from PySide6.QtCore import QByteArray
from PySide6.QtCore import QObject, QFile, QIODevice
class HashedFile(QObject):
    """Read-only file wrapper that hashes its content lazily, block by block.

    The file is read in fixed-size blocks (512 bytes). A single xxh64 hasher
    is fed every block in order and is not reset between blocks, so the
    digest cached for block ``i`` covers all data from the start of the file
    up to and including block ``i``.

    The instance is a context manager: the underlying ``QFile`` is opened in
    ``__enter__`` and closed in ``__exit__``. Every method that touches the
    file handle (hashing, ``reset``, ``end_of_file`` and therefore ``==``)
    must be called while the file is open.
    """

    # Private names listed here are name-mangled the same way as the
    # attribute accesses below, so they refer to the same attributes.
    __slots__ = ('__blocksize', '__filepath', '__file_handle', '__hasher',
                 '__hash_data', '__filesize', '__max_blocks_amount')

    def __init__(self, path: str | os.PathLike):
        """Record size and block count for the file at *path*.

        The file is not opened here; use the instance as a context manager.

        Raises:
            FileNotFoundError: if *path* is not an existing regular file
                (missing path, directory, ...).
        """
        super().__init__()
        if not os.path.isfile(path):
            raise FileNotFoundError(
                f"Path {path} is not an existing regular file")
        self.__blocksize = 512
        self.__filepath = path
        # Defined up front so __exit__ and friends never hit a missing
        # attribute when the file was never opened.
        self.__file_handle = None
        self.__hasher = xxhash.xxh64()
        self.__hash_data = deque()
        self.__filesize = os.stat(self.__filepath).st_size
        self.__max_blocks_amount = ceil(self.__filesize / self.__blocksize)

    def __enter__(self):
        """Open the file read-only and return ``self``.

        Raises:
            OSError: if the file cannot be opened.
        """
        handle = QFile(self.get_file_path())
        # QFile.open() reports failure through its boolean return value
        # rather than by raising, so it has to be checked explicitly.
        if not handle.open(QIODevice.ReadOnly):
            message = f"Failed to open file: {handle.errorString()}"
            logging.error(message)
            raise OSError(message)
        self.__file_handle = handle
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file handle if one was opened; never suppress errors."""
        if self.__file_handle is not None:
            self.__file_handle.close()

    def __eq__(self, other: object) -> bool:
        """Compare two files by size, then by their per-block digests.

        Returns ``NotImplemented`` for operands that are not ``HashedFile``
        instances. Both files must be open, because missing block digests
        are computed on demand during the comparison.
        """
        if not isinstance(other, HashedFile):
            return NotImplemented
        if self is other:
            return True
        if self.get_file_size() != other.get_file_size():
            return False
        # all() stops at the first differing block, so no more of either
        # file is read than necessary.
        return all(
            self.get_hash_node(index) == other.get_hash_node(index)
            for index in range(self.__max_blocks_amount)
        )

    def __hash__(self):
        """Return a hash based on the absolute file path.

        NOTE(review): ``__eq__`` compares content while this hashes the
        path, so two equal-content files at different paths hash
        differently and hashed containers will treat them as distinct.
        """
        return hash(self.get_file_path())

    def get_file_path(self) -> str:
        """Return the absolute path of the file."""
        return os.path.abspath(self.__filepath)

    def get_file_size(self) -> int:
        """Return the file size in bytes as recorded at construction."""
        return self.__filesize

    def get_hash_node(self, block_index: int) -> str:
        """Return the digest cached for block *block_index*.

        Blocks that have not been hashed yet are hashed first, in order, up
        to and including the requested one.

        Raises:
            ValueError: if *block_index* is negative or not below the
                number of blocks in the file.
        """
        if not 0 <= block_index < self.__max_blocks_amount:
            logging.error(f"Invalid block index value: {block_index}")
            raise ValueError("Invalid block index value")
        # A loop rather than a single call: the requested block may be
        # several blocks ahead of what has been hashed so far.
        while len(self.__hash_data) <= block_index:
            self.get_chunk_hash()
        return self.__hash_data[block_index]

    def get_chunk_hash(self):
        """Hash the next unread block, cache its digest and return it.

        The read position is derived from the number of digests already
        cached, so successive calls walk through the file in order.
        Failures are logged and re-raised.
        """
        try:
            offset = self.__blocksize * len(self.__hash_data)
            self.__file_handle.seek(offset)
            data = self.__file_handle.read(self.__blocksize)
            self.__hasher.update(data.data())
            digest = self.__hasher.hexdigest()
            self.__hash_data.append(digest)
            logging.debug(
                f"Calculated hash for block {len(self.__hash_data) - 1}: {digest}")
            return digest
        except Exception as e:
            logging.error(f"Failed to calculate hash: {e}")
            raise

    def get_base_name(self):
        """Return the final component of the file path."""
        return os.path.basename(self.get_file_path())

    def reset(self):
        """Rewind the file and discard all cached digests.

        The hasher state is reset as well; it is cumulative, so leaving it
        untouched would make digests computed after the reset differ from
        those of the first pass.
        """
        self.__file_handle.seek(0)
        self.__hash_data.clear()
        self.__hasher.reset()

    def end_of_file(self) -> bool:
        """Return whether the file handle's read position is at the end."""
        return self.__file_handle.atEnd()