-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb_handler.py
163 lines (145 loc) · 6.08 KB
/
db_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import sqlalchemy.exc
from sqlalchemy import create_engine, ForeignKey, Column, String, Integer, CHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from exceptions import UserAlreadyExists, IncorrectCredentials
from dotenv import load_dotenv
import os
from argon2 import PasswordHasher
import password_hasher as ph
# Load settings from a .env file into the process environment (python-dotenv).
# This has to run before os.getenv("DB_URL") is read further down in this module.
load_dotenv()
# Declarative base class for the ORM models. It must be created here, ahead of
# the model classes, because they subclass it.
# (Original author's note: placing it elsewhere caused an error that was not recorded.)
Base = declarative_base()
# ORM models (the tables themselves are created by Base.metadata.create_all further down)
class User(Base):
    """ORM model for one row of the ``users`` table.

    Every column is required (``nullable=False``); ``email`` additionally
    carries a unique constraint.
    """

    __tablename__ = "users"

    # Auto-incrementing surrogate key
    id = Column(Integer, autoincrement=True, primary_key=True)

    # Account details
    name = Column(String, nullable=False)
    email = Column(String, nullable=False, unique=True)
    # db_handler.register stores the result of ph.hash_pw(...) here
    password = Column(String, nullable=False)
    # db_handler.register writes "parent" here
    account_type = Column(String, nullable=False)

    # Profile details
    address = Column(String, nullable=False)
    school = Column(String, nullable=False)
class Admin(Base):
    """ORM model for one row of the ``admins`` table.

    Every column is required (``nullable=False``); ``email`` additionally
    carries a unique constraint. Unlike ``User`` there is no address column.
    """

    __tablename__ = "admins"

    # Auto-incrementing surrogate key
    id = Column(Integer, autoincrement=True, primary_key=True)

    # Account details
    name = Column(String, nullable=False)
    email = Column(String, nullable=False, unique=True)
    # db_handler.admin_register stores the result of ph.hash_pw(...) here
    password = Column(String, nullable=False)
    # db_handler.admin_register writes "admin" here
    account_type = Column(String, nullable=False)

    school = Column(String, nullable=False)
class db_handler:
    """Registration, authentication and profile lookups for users and admins.

    All methods are static and operate on the module-level ``session``.
    Failed lookups and failed password checks are both reported as
    ``IncorrectCredentials``; duplicate registrations are reported as
    ``UserAlreadyExists``.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_by_email(model, email: str):
        """Return the first ``model`` row whose email equals ``email``, or ``None``."""
        return session.query(model).filter(model.email == email).first()

    @staticmethod
    def _insert(record) -> None:
        """Persist ``record``, translating integrity violations.

        Raises:
            UserAlreadyExists: if the commit fails with an ``IntegrityError``.
                Any integrity violation is reported this way, not only a
                duplicate email.
        """
        # Kept from the original: start from a clean transaction in case an
        # earlier operation left the shared session in a failed state.
        session.rollback()
        try:
            session.add(record)
            session.commit()
        except sqlalchemy.exc.IntegrityError as err:
            # A failed commit leaves the session unusable until it is rolled
            # back; do it here so the next call (which may not roll back
            # itself) does not fail on the leftover transaction.
            session.rollback()
            raise UserAlreadyExists("User already exists!", status_code=422) from err

    @staticmethod
    def _authenticate(model, email: str, password: str) -> bool:
        """Check ``password`` against the stored hash of the ``model`` row for ``email``.

        Returns:
            ``True`` when a matching row exists and ``ph.verify`` accepts the password.

        Raises:
            IncorrectCredentials: if no row matches or verification is falsy.
        """
        account = db_handler._find_by_email(model, email)
        # Stored hashes are deliberately never printed or logged.
        if account is not None and ph.verify(account.password, password, email):
            return True
        raise IncorrectCredentials("Incorrect Credentials")

    @staticmethod
    def _field(model, email: str, attribute: str):
        """Return ``attribute`` of the ``model`` row for ``email``.

        Raises:
            IncorrectCredentials: if no row with that email exists.
        """
        account = db_handler._find_by_email(model, email)
        if account is None:
            raise IncorrectCredentials("Incorrect Credentials")
        return getattr(account, attribute)

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    @staticmethod
    def register(name: str, email: str, password: str, address: str, school: str) -> bool:
        """Create a ``User`` with account type ``"parent"``.

        The password is stored as the result of ``ph.hash_pw(password, email)``.

        Returns:
            ``True`` once the row has been committed.

        Raises:
            UserAlreadyExists: if the insert violates a database constraint
                (e.g. the unique email).
        """
        hashed_passw = ph.hash_pw(password, email)
        user = User(
            name=name,
            email=email,
            password=hashed_passw,
            account_type="parent",
            address=address,
            school=school,
        )
        db_handler._insert(user)
        return True  # "User Added Successfully"

    @staticmethod
    def admin_register(name: str, email: str, password: str, school: str) -> bool:
        """Create an ``Admin`` with account type ``"admin"``.

        The password is stored as the result of ``ph.hash_pw(password, email)``.

        Returns:
            ``True`` once the row has been committed.

        Raises:
            UserAlreadyExists: if the insert violates a database constraint
                (e.g. the unique email). The underlying ``IntegrityError`` is
                attached as ``__cause__`` rather than printed, because its text
                includes the statement parameters.
        """
        hashed_passw = ph.hash_pw(password, email)
        admin = Admin(
            name=name,
            email=email,
            password=hashed_passw,
            account_type="admin",
            school=school,
        )
        db_handler._insert(admin)
        return True  # "User Added Successfully"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    @staticmethod
    def admin_login(email: str, password: str) -> bool:
        """Authenticate an admin.

        Returns:
            ``True`` on success.

        Raises:
            IncorrectCredentials: if the email is unknown or the password is wrong.
        """
        return db_handler._authenticate(Admin, email, password)

    @staticmethod
    def login(email: str, password: str) -> bool:
        """Authenticate a user.

        Returns:
            ``True`` on success.

        Raises:
            IncorrectCredentials: if the email is unknown or the password is wrong.
        """
        # Kept from the original: reset the session before running the lookup.
        session.rollback()
        return db_handler._authenticate(User, email, password)

    # ------------------------------------------------------------------
    # Profile lookups (all raise IncorrectCredentials for an unknown email)
    # ------------------------------------------------------------------

    @staticmethod
    def get_address(email: str):
        """Return the ``address`` of the user registered under ``email``."""
        return db_handler._field(User, email, "address")

    @staticmethod
    def get_school(email: str):
        """Return the ``school`` of the user registered under ``email``."""
        return db_handler._field(User, email, "school")

    @staticmethod
    def get_name(email: str):
        """Return the ``name`` of the user registered under ``email``."""
        return db_handler._field(User, email, "name")

    @staticmethod
    def get_admin_name(email: str):
        """Return the ``name`` of the admin registered under ``email``."""
        return db_handler._field(Admin, email, "name")

    @staticmethod
    def get_admin_school(email: str):
        """Return the ``school`` of the admin registered under ``email``."""
        return db_handler._field(Admin, email, "school")

    @staticmethod
    def get_addrs() -> list:
        """Return every user's address as a list.

        The elements are the row objects yielded by
        ``session.query(User.address)`` (one-column rows, not bare strings);
        this shape is unchanged from the original implementation.
        """
        return list(session.query(User.address))
# SQLAlchemy setup: engine, schema creation and the shared session.
# This runs at import time and must stay below the model classes so that
# Base.metadata already knows about their tables when create_all() is called.
_db_url = os.getenv("DB_URL")
if not _db_url:
    # Fail with an actionable message instead of passing None / "" on to
    # create_engine, whose own error does not mention the missing variable.
    raise sqlalchemy.exc.ArgumentError(
        "DB_URL environment variable is not set; "
        "define it in the environment or in the .env file"
    )
engine = create_engine(_db_url)
# Create any tables declared on Base that do not exist yet
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
# Single module-level session used by every db_handler method
session = Session()