Skip to content

Commit

Permalink
Combine NFC, MXID, and phone number door access logic (#552)
Browse files Browse the repository at this point in the history
* Refactor NFC, MXID, and phone number door access into one

* Move phone_list next to list

* filter the user with the nfccard

---------

Co-authored-by: Tatu Wikman <tatu.wikman@gmail.com>
  • Loading branch information
drjaska and tswfi authored Feb 2, 2025
1 parent 336ac0f commit 5aa3c8c
Showing 1 changed file with 78 additions and 127 deletions.
205 changes: 78 additions & 127 deletions api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle
from rest_framework_tracking.mixins import LoggingMixin
from users.models import CustomUser, NFCCard, ServiceSubscription
from users.models import CustomUser, ServiceSubscription

from utils.phonenumber import normalize_number

Expand Down Expand Up @@ -49,173 +49,124 @@ class AccessViewSet(LoggingMixin, mixins.ListModelMixin, viewsets.GenericViewSet
# default queryset as none
queryset = CustomUser.objects.none

@action(detail=False, methods=["post"], throttle_classes=[VerySlowThrottle])
def phone(self, request, format=None):
"""
Check if the phone number is allowed to access and return some user data
to caller.
call with something like this
http POST http://127.0.0.1:8000/api/v1/access/phone/ deviceid=asdf payload=0440431918
returns 200 ok with some user data if everything is fine and 4XX for other situations
users with enough power will also get a list of all users with door access with this endpoint
"""

inserializer = AccessDataSerializer(data=request.data)
inserializer.is_valid(raise_exception=True)

# check that we know which device this is
deviceqs = AccessDevice.objects.all()
deviceid = inserializer.validated_data.get("deviceid")
device = get_object_or_404(deviceqs, deviceid=deviceid)
logging.debug(f"found device {device}")

# phone number comes in payload, but it is in a wrong format
# the number will most probably start with 00 instead of +

number = inserializer.validated_data.get("payload")
number = normalize_number(number)
qs = CustomUser.objects.filter(phone=number)

# nothing found, 480
if qs.count() == 0:
return Response(status=480)

# our user
user = qs.first()

# user does not have access rights
if not user.has_door_access():
user.log("Door access denied with phone")
door_access_denied.send(sender=self.__class__, user=user, method="phone")
outserializer = UserAccessSerializer(user)
return Response(outserializer.data, status=481)

user.log("Door opened with phone")
outserializer = UserAccessSerializer(user)
return Response(outserializer.data)

@phone.mapping.get
def phone_list(self, request, format=None):
"""
List all phone access users
"""
# only for superusers
if not request.user or not request.user.is_superuser:
return Response(status=status.HTTP_401_UNAUTHORIZED)

# collect list of all users that have door access
users_with_door_access = []
for ss in (
ServiceSubscription.objects.select_related("user")
.filter(service=config.DEFAULT_ACCOUNT_SERVICE)
.filter(state=ServiceSubscription.ACTIVE)
):
users_with_door_access.append(ss.user)

# and output it
outserializer = UserAccessSerializer(users_with_door_access, many=True)
return Response(outserializer.data)

@action(detail=False, methods=["post"], throttle_classes=[VerySlowThrottle])
def nfc(self, request, format=None):
"""
NFC card access
"""
def access_token_abstraction(self, request, format, method):
logentry = DeviceAccessLogEntry()
inserializer = AccessDataSerializer(data=request.data)
inserializer.is_valid(raise_exception=True)

# check that we know which device this is
deviceqs = AccessDevice.objects.all()
deviceid = inserializer.validated_data.get("deviceid")
device = get_object_or_404(deviceqs, deviceid=deviceid)
logging.debug(f"found device {device}")

cardid = inserializer.validated_data.get("payload")
qs = NFCCard.objects.filter(cardid=cardid)
access_token = inserializer.validated_data.get("payload")
if method == "phone":
# phone number comes in payload, but it is in a wrong format
# the number will most probably start with 00 instead of +
access_token = normalize_number(access_token)
users = []
if method == "phone":
users = CustomUser.objects.filter(phone=access_token)
elif method == "nfc":
users = CustomUser.objects.filter(nfccard__cardid=access_token)
elif method == "mxid":
users = CustomUser.objects.filter(mxid=access_token)

logentry.device = device
logentry.payload = cardid
logentry.payload = access_token

# 0 = success, any other = failure
response_status = 0

if qs.count() == 0:
response_status = 480
else:
logentry.nfccard = qs.first()
# nothing found, 480 (NO_CONTENT)
if users.count() == 0:
logentry.granted = False
logentry.save()
return Response(status=480)

# our user
user = qs.first().user
# planned database scheme says that
# phone numbers, MXIDs, nfc tags are/will be unique
user = users.first()

# user does not have access rights
if not user.has_door_access():
user.log("Door access denied with NFC")
door_access_denied.send(sender=self.__class__, user=user, method="nfc")
response_status = 481
# user does not have access rights
if not user.has_door_access():
response_status = 481

logentry.granted = response_status == 0

logentry.save()

if response_status == 0:
user.log("Door opened with NFC")
if method != "phone":
# uppercase NFC and MXID
user.log(f"Door opened with {method.upper()}")
else:
user.log(f"Door opened with {method}")
outserializer = UserAccessSerializer(user)
return Response(outserializer.data)

if response_status == 481:
if method != "phone":
# uppercase NFC and MXID
user.log(f"Door access denied with {method.upper()}")
else:
user.log(f"Door access denied with {method}")
door_access_denied.send(sender=self.__class__, user=user, method=method)
outserializer = UserAccessSerializer(user)
return Response(outserializer.data, status=response_status)

return Response(status=response_status)

@action(detail=False, methods=["post"], throttle_classes=[VerySlowThrottle])
def mxid(self, request, format=None):
"""
Matrix mxid access
def phone(self, request, format=None):
"""
logentry = DeviceAccessLogEntry()
inserializer = AccessDataSerializer(data=request.data)
inserializer.is_valid(raise_exception=True)

deviceqs = AccessDevice.objects.all()
deviceid = inserializer.validated_data.get("deviceid")
device = get_object_or_404(deviceqs, deviceid=deviceid)
logging.debug(f"found device {device}")

mxid = inserializer.validated_data.get("payload")
users = CustomUser.objects.filter(mxid=mxid)
Check if the phone number is allowed to access and return some user data
to caller.
logentry.device = device
logentry.payload = mxid
call with something like this
http POST http://127.0.0.1:8000/api/v1/access/phone/ deviceid=asdf payload=0440431918
# 0 = success, any other = failure
response_status = 0
returns 200 ok with some user data if everything is fine and 4XX for other situations
if users.count() != 1:
response_status = 480
else:
user = users.first()
users with enough power will also get a list of all users with door access with this endpoint
"""
return AccessViewSet.access_token_abstraction(self, request, format, "phone")

# user does not have access rights
if not user.has_door_access():
door_access_denied.send(sender=self.__class__, user=user, method="mxid")
response_status = 481
@action(detail=False, methods=["post"], throttle_classes=[VerySlowThrottle])
def nfc(self, request, format=None):
"""
NFC card access
"""
return AccessViewSet.access_token_abstraction(self, request, format, "nfc")

logentry.granted = response_status == 0
logentry.save()
@action(detail=False, methods=["post"], throttle_classes=[VerySlowThrottle])
def mxid(self, request, format=None):
"""
Matrix mxid access
"""
return AccessViewSet.access_token_abstraction(self, request, format, "mxid")

if response_status == 0:
outserializer = UserAccessSerializer(user)
return Response(outserializer.data)
@phone.mapping.get
def phone_list(self, request, format=None):
"""
List all phone access users
"""
# only for superusers
if not request.user or not request.user.is_superuser:
return Response(status=status.HTTP_401_UNAUTHORIZED)

if response_status == 481:
outserializer = UserAccessSerializer(user)
return Response(outserializer.data, status=response_status)
# collect list of all users that have door access
users_with_door_access = []
for ss in (
ServiceSubscription.objects.select_related("user")
.filter(service=config.DEFAULT_ACCOUNT_SERVICE)
.filter(state=ServiceSubscription.ACTIVE)
):
users_with_door_access.append(ss.user)

return Response(status=response_status)
# and output it
outserializer = UserAccessSerializer(users_with_door_access, many=True)
return Response(outserializer.data)

def list(self, request):
return Response(status=status.HTTP_501_NOT_IMPLEMENTED)

0 comments on commit 5aa3c8c

Please sign in to comment.