Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified User Route for User Management Redesign #2595

Merged
merged 14 commits into from
Dec 10, 2024
1 change: 1 addition & 0 deletions care/facility/api/viewsets/facility_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class UserFilter(filters.FilterSet):
choices=[(key, key) for key in User.TYPE_VALUE_MAP],
coerce=lambda role: User.TYPE_VALUE_MAP[role],
)
username = filters.CharFilter(field_name="username", lookup_expr="icontains")
vigneshhari marked this conversation as resolved.
Show resolved Hide resolved

class Meta:
model = User
Expand Down
2 changes: 2 additions & 0 deletions care/users/api/serializers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class Meta:
"pf_auth",
"read_profile_picture_url",
"user_flags",
"last_login",
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved
)
read_only_fields = (
"is_superuser",
Expand All @@ -347,6 +348,7 @@ class Meta:
"pf_endpoint",
"pf_p256dh",
"pf_auth",
"last_login",
)

extra_kwargs = {"url": {"lookup_field": "username"}}
Expand Down
29 changes: 28 additions & 1 deletion care/users/api/viewsets/change_password.py
vigneshhari marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,26 @@ class ChangePasswordView(UpdateAPIView):
model = User

def update(self, request, *args, **kwargs):
self.object = self.request.user
username = request.data.get("username")
if not username:
return Response(
{"message": ["Username is required"]},
status=status.HTTP_400_BAD_REQUEST,
)
self.object = User.objects.filter(username=username).first()
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved
if not self.object:
return Response(
{"message": ["User not found"]}, status=status.HTTP_404_NOT_FOUND
)
if not self.has_permission(request, self.object):
return Response(
{
"message": [
"User does not have elevated permissions to change password"
]
},
status=status.HTTP_403_FORBIDDEN,
)
serializer = self.get_serializer(data=request.data)

if serializer.is_valid():
Expand All @@ -48,3 +67,11 @@ def update(self, request, *args, **kwargs):
return Response({"message": "Password updated successfully"})

return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def has_permission(self, request, user):
authuser = request.user
return (
authuser == user
or authuser.is_superuser
or authuser.user_type >= User.TYPE_VALUE_MAP["DistrictAdmin"]
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved
)
35 changes: 34 additions & 1 deletion care/users/api/viewsets/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ def last_active_after(self, queryset, name, value):
return queryset.filter(last_login__gte=date)


class UserViewSetPermission(DRYPermissions):
def has_permission(self, request, view):
if request.method == "GET" and "username" in view.kwargs:
return True
return super().has_permission(request, view)

def has_object_permission(self, request, view, obj):
if request.method == "GET" and "username" in view.kwargs:
return True
return super().has_object_permission(request, view, obj)


class UserViewSet(
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
Expand All @@ -113,7 +125,7 @@ class UserViewSet(
queryset = queryset.filter(Q(asset__isnull=True))
lookup_field = "username"
lookup_value_regex = "[^/]+"
permission_classes = (IsAuthenticated, DRYPermissions)
permission_classes = (IsAuthenticated, UserViewSetPermission)
filter_backends = (
filters.DjangoFilterBackend,
rest_framework_filters.OrderingFilter,
Expand Down Expand Up @@ -155,6 +167,16 @@ def get_queryset(self):

def get_object(self) -> User:
try:
if self.request.method == "GET" and not self.kwargs.get("username"):
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved
username = self.request.query_params.get("username")
if not username:
raise ValidationError({"message": "Username is required"})
user = get_object_or_404(self.get_queryset(), username=username)
if not self.has_permission(user):
raise ValidationError(
{"message": "You do not have permission to access this user"}
)
return user
return super().get_object()
except Http404 as e:
error = "User not found"
Expand Down Expand Up @@ -203,6 +225,17 @@ def destroy(self, request, *args, **kwargs):
user.save(update_fields=["is_active"])
return Response(status=status.HTTP_204_NO_CONTENT)

def has_permission(self, user):
requesting_user = self.request.user
return (
requesting_user == user
or requesting_user.is_superuser
or (
requesting_user.state == user.state
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved
or requesting_user.district == user.district
)
)
Jacobjeevan marked this conversation as resolved.
Show resolved Hide resolved

@extend_schema(tags=["users"])
@action(detail=False, methods=["POST"])
def add_user(self, request, *args, **kwargs):
Expand Down
134 changes: 131 additions & 3 deletions care/users/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_detail_representation(self, obj=None) -> dict:
"video_connect_link": obj.video_connect_link,
"read_profile_picture_url": obj.profile_picture_url,
"user_flags": [],
"last_login": obj.last_login,
**self.get_local_body_district_state_representation(obj),
}

Expand Down Expand Up @@ -137,10 +138,38 @@ def setUpTestData(cls) -> None:
cls.user_2 = cls.create_user(**cls.data_2)

cls.data_3 = cls.get_user_data(cls.district)
cls.data_3.update({"username": "user_3", "password": "password"})
cls.data_3.update(
{
"username": "user_3",
"password": "password",
"user_type": User.TYPE_VALUE_MAP["Doctor"],
}
)
cls.user_3 = cls.create_user(**cls.data_3)
cls.link_user_with_facility(cls.user_3, cls.facility, cls.super_user)

cls.data_4 = cls.get_user_data(cls.district)
cls.data_4.update(
{
"username": "user_4",
"password": "password",
"user_type": User.TYPE_VALUE_MAP["DistrictAdmin"],
}
)
cls.user_4 = cls.create_user(**cls.data_4)
cls.link_user_with_facility(cls.user_4, cls.facility, cls.super_user)

cls.data_5 = cls.get_user_data(cls.district)
cls.data_5.update(
{
"username": "user_5",
"password": "password",
"user_type": User.TYPE_VALUE_MAP["WardAdmin"],
}
)
cls.user_5 = cls.create_user(**cls.data_5)
cls.link_user_with_facility(cls.user_5, cls.facility, cls.super_user)

def test_user_can_access_url(self):
"""Test user can access the url by location"""
username = self.user.username
Expand All @@ -152,7 +181,7 @@ def test_user_can_read_all_users_within_accessible_facility(self):
response = self.client.get("/api/v1/users/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
res_data_json = response.json()
self.assertEqual(res_data_json["count"], 2)
self.assertEqual(res_data_json["count"], 3)
results = res_data_json["results"]
self.assertIn(self.user.id, {r["id"] for r in results})
self.assertIn(self.user_3.id, {r["id"] for r in results})
Expand All @@ -177,12 +206,19 @@ def test_user_can_modify_themselves(self):
)

def test_user_cannot_read_others(self):
"""Test 1 user can read the attributes of the other user"""
"""Test 1 user can read the attributes of the other user not in the same ditrict/state"""
username = self.data_2["username"]
response = self.client.get(f"/api/v1/users/{username}/")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.json()["detail"], "User not found")

def test_user_can_read_others_in_same_district_or_state(self):
"""Test 1 user can read the attributes of the other user in the same district or state"""
username = self.user_3.username
response = self.client.get(f"/api/v1/users/{username}/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.json()["first_name"], self.user_3.first_name)

def test_user_cannot_modify_others(self):
"""Test a user can't modify others"""
username = self.data_2["username"]
Expand All @@ -207,6 +243,98 @@ def test_user_cannot_delete_others(self):
User.objects.get(username=self.data_2[field]).username,
)

def test_user_cannot_change_password_of_others(self):
"""Test a user cannot change password of others"""
username = self.data_2["username"]
password = self.data_2["password"]
response = self.client.put(
"/api/v1/password_change/",
{
"username": username,
"old_password": password,
"new_password": "password2",
},
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

def test_user_with_districtadmin_access_can_modify_others(self):
"""Test a user with district admin access can modify others underneath the hierarchy"""
self.client.force_authenticate(self.user_4)
username = self.data_2["username"]
response = self.client.patch(
f"/api/v1/users/{username}/",
{
"date_of_birth": date(2005, 4, 1),
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.json()["date_of_birth"], "2005-04-01")

def test_user_with_districtadmin_access_can_change_password_of_others(self):
"""Test a user with district admin perms can change the password of other users underneath the hierarchy"""
self.client.force_authenticate(self.user_4)
username = self.data_2["username"]
password = self.data_2["password"]
response = self.client.put(
"/api/v1/password_change/",
{
"username": username,
"old_password": password,
"new_password": "password2",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)

def test_user_with_districtadmin_access_cannot_change_password_of_others_without_username(
self,
):
"""Test a user with district admin access cannot change the password of other users without username"""
self.client.force_authenticate(self.user_4)
response = self.client.put(
"/api/v1/password_change/",
{"old_password": "password", "new_password": "password2"},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.data["message"][0], "Username is required")

def test_user_with_district_admin_cannot_change_password_of_non_existing_user(self):
"""Test a user with district admin access cannot change the password of a non existing user"""
self.client.force_authenticate(self.user_4)
response = self.client.put(
"/api/v1/password_change/",
{
"username": "foobar",
"old_password": "password",
"new_password": "password2",
},
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.data["message"][0], "User not found")

def test_user_with_district_admin_cannot_change_password_of_others_with_invalid_old_password(
self,
):
"""Test a user with district admin access cannot change the password of other users with invalid old password"""
self.client.force_authenticate(self.user_4)
response = self.client.put(
"/api/v1/password_change/",
{
"username": self.data_2["username"],
"old_password": "wrong_password",
"new_password": "password2",
},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(
response.data["old_password"][0],
"Wrong password entered. Please check your password.",
)

def test_user_gets_error_when_accessing_user_details_with_invalid_username(self):
"""Test a user gets error when accessing user details with invalid username"""
response = self.client.get("/api/v1/users/foobar/")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)


class TestUserFilter(TestUtils, APITestCase):
@classmethod
Expand Down
Loading