-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
71 lines (63 loc) · 2.16 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import csv
import os
from crypto import AES_Cipher
from firebase_functions import db
firebase_db = db.reference("data")
class Database:
'''
initialize the database instance
'''
def __init__(self, database_name) -> None:
self.database_name = database_name
self.appendStream = open(database_name, "a+")
self.readStream = open(database_name, "r+")
self.writer = csv.writer(self.appendStream, delimiter=",", lineterminator="\n")
self.reader = csv.DictReader(
self.readStream, delimiter=",", lineterminator="\n"
)
self.rows = [*self.reader]
'''
write a row to the database
'''
def write(self, row):
if os.stat(self.database_name).st_size == 0:
self.writer.writerow(row.keys())
self.writer.writerow(row.values())
print(self.writer)
self.rows.append(row)
self.appendStream.flush()
firebase_db.push(row)
'''
prints all the rows in the database
'''
def show(self):
print(self.rows)
'''
find operaton on the database, loops through all the encrypted data to find the matching row
'''
def find(self, primary_keys,config):
print(primary_keys)
for row in self.rows:
if all(
AES_Cipher.verify(details["data"], row[key], details["isEncrypted"])
for key, details in primary_keys.items()
):
return {
key: row[key]
if not config["form_fields"][key]["isEncrypted"]
else AES_Cipher.decrypt(row[key])
for key in row
}
data = firebase_db.get()
for row in data:
if all(
AES_Cipher.verify(details["data"], row[key], details["isEncrypted"])
for key, details in primary_keys.items()
):
return {
key: row[key]
if not primary_keys[key]["isEncrypted"]
else AES_Cipher.decrypt(row[key])
for key in row
}
raise Exception("Row not found")