Skip to content

Commit

Permalink
fix test_get. rockstor#688
Browse files Browse the repository at this point in the history
  • Loading branch information
schakrava committed Jun 29, 2015
1 parent ce0a480 commit 3139972
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 76 deletions.
7 changes: 5 additions & 2 deletions src/rockstor/storageadmin/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def setUp(self):
def tearDown(self):
self.client.logout()

def get_base(self, baseurl):
def get_base(self, baseurl, name=True):
"""
Test GET request
1. Get base URL
Expand All @@ -52,5 +52,8 @@ def get_base(self, baseurl):
self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response.data)

# get object that doesn't exist
response1 = self.client.get('%s/invalid' % baseurl)
if (name):
response1 = self.client.get('%s/invalid' % baseurl)
else:
response1 = self.client.get('%s/1234567' % baseurl)
self.assertEqual(response1.status_code, status.HTTP_404_NOT_FOUND, msg=response1)
139 changes: 69 additions & 70 deletions src/rockstor/storageadmin/tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def setUpClass(cls):
super(UserTests, cls).setUpClass()

# post mocks

cls.patch_getpwnam = patch('pwd.getpwnam')
cls.mock_getpwnam = cls.patch_getpwnam.start()
cls.mock_getpwnam.return_value = 1,2,3,4

cls.patch_useradd = patch('storageadmin.views.user.useradd')
cls.mock_useradd = cls.patch_useradd.start()
cls.mock_useradd.return_value = ([''], [''], 0)
Expand All @@ -59,11 +59,11 @@ def setUpClass(cls):
cls.patch_update_shell = patch('storageadmin.views.user.update_shell')
cls.mock_update_shell = cls.patch_update_shell.start()
cls.mock_update_shell.return_value = True

cls.patch_is_pub_key = patch('storageadmin.views.user.is_pub_key')
cls.mock_is_pub_key = cls.patch_is_pub_key.start()
cls.mock_is_pub_key.return_value = False


@classmethod
def tearDownClass(cls):
Expand All @@ -76,180 +76,179 @@ def test_get(self):
"""
# get base URL
self.get_base(self.BASE_URL)

# get user with username admin2
response = self.client.get('%s/admin2' % self.BASE_URL)
self.assertEqual(response.status_code, status.HTTP_200_OK, msg=response)
def test_post_requests(self):

def test_post_requests(self):
data = {'username': 'user1','password': 'pwuser1',}
invalid_user_names = ('User $', '-user', '.user', '', ' ',)
for uname in invalid_user_names:
data['username'] = uname
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Username is invalid. It must confirm to the regex: [A-Za-z][-a-zA-Z0-9_]*$")
e_msg = ("Username is invalid. It must confirm to the regex: [A-Za-z][-a-zA-Z0-9_]*$")
self.assertEqual(response.data['detail'], e_msg)
# username with more than 30 characters


# username with more than 30 characters
invalid_user_name = 'user'*11
data = {'username': invalid_user_name,'password': 'pwadmin',}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Username cannot be more than 30 characters long")
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Username cannot be more than 30 characters long")
self.assertEqual(response.data['detail'], e_msg)

# create user with no password
data = {'username': 'user1'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Password must be a valid string")
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Password must be a valid string")
self.assertEqual(response.data['detail'], e_msg)

# create user with invalid admin(not boolean)
data = {'username': 'user1','password': 'pwuser1','admin':'Y'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Admin(user type) must be a boolean")
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Admin(user type) must be a boolean")
self.assertEqual(response.data['detail'], e_msg)

# create user with invalid shell
data = {'username': 'user1','password': 'pwuser1','shell':'Y'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("shell(Y) is not valid. Valid shells are ('/opt/rock-dep/bin/rcli', '/bin/bash', '/sbin/nologin')")
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("shell(Y) is not valid. Valid shells are ('/opt/rock-dep/bin/rcli', '/bin/bash', '/sbin/nologin')")
self.assertEqual(response.data['detail'], e_msg)

# create user with existing username
data = {'username': 'admin','password': 'pwadmin',}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("user: admin already exists. Please choose a different username")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("user: admin already exists. Please choose a different username")
self.assertEqual(response.data['detail'], e_msg)

# create a user with existing uid (admin2 has uid 1001)
data = {'username': 'newUser','password': 'pwuser2', 'group': 'admin', 'uid':'1001','pubic_key':'xxx'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("uid: 1001 already exists. Please choose a different one.")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("uid: 1001 already exists. Please choose a different one.")
self.assertEqual(response.data['detail'], e_msg)

# happy path
data = {'username': 'newUser','password': 'pwuser2', 'group': 'admin', 'pubic_key':'xxx'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)
self.assertEqual(response.data['username'], 'newUser')
self.assertEqual(response.data['username'], 'newUser')

data = {'username': 'newUser2','password': 'pwuser2', 'uid':'5001', 'user':'newuser'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)
self.assertEqual(response.data['username'], 'newUser2')

def test_email_validation(self):

# create user with invalid email
data = {'username': 'user1','password': 'pwuser1','email':'123'}
response = self.client.post(self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)


def test_put_requests(self):

status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
self.assertEqual(response.data['detail'], "{'email': [u'Enter a valid email address.']}")


def test_put_requests(self):

# Edit user that does not exists
data = {'group':'admin'}
response = self.client.put('%s/admin5' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("User(admin5) does not exist")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("User(admin5) does not exist")
self.assertEqual(response.data['detail'], e_msg)

data = {'password': 'admin2','group':'admin'}
response = self.client.put('%s/bin' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Editing restricted user(bin) is not supported.")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("Editing restricted user(bin) is not supported.")
self.assertEqual(response.data['detail'], e_msg)

data = {'admin': True, 'group':'admin'}
response = self.client.put('%s/admin2' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("password reset is required to enable admin access. please provide a new password")
e_msg = ("password reset is required to enable admin access. please provide a new password")
self.assertEqual(response.data['detail'], e_msg)
# happy path

# happy path
data = {'password': 'admin2','group':'admin', 'admin': True}
response = self.client.put('%s/admin2' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)

data = {'password': 'admin2','group':'admin', 'admin': True, 'user':'uadmin2'}
response = self.client.put('%s/admin2' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)
status.HTTP_200_OK, msg=response.data)

data = {'password': 'admin2','group':'admin', 'user':'uadmin2','shell':'/bin/xyz', 'email':'admin2@xyz.com'}
response = self.client.put('%s/admin2' % self.BASE_URL, data=data)
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)
def test_delete_requests(self):
status.HTTP_200_OK, msg=response.data)

def test_delete_requests(self):

# delete user that does not exists
username = 'admin5'
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("User(admin5) does not exist")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("User(admin5) does not exist")
self.assertEqual(response.data['detail'], e_msg)

# delete user that does not exists
username = 'bin'
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Delete of restricted user(bin) is not supported.")
self.assertEqual(response.data['detail'], e_msg)
e_msg = ("Delete of restricted user(bin) is not supported.")
self.assertEqual(response.data['detail'], e_msg)


username = 'admin2'
self.mock_userdel.side_effect = KeyError('error')
self.mock_userdel.side_effect = KeyError('error')
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("A low level error occured while deleting the user: admin2")
e_msg = ("A low level error occured while deleting the user: admin2")
self.assertEqual(response.data['detail'], e_msg)

# delete currently logged in user
self.mock_userdel.side_effect = None
username = 'admin'
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_500_INTERNAL_SERVER_ERROR, msg=response.data)
e_msg = ("Cannot delete the currently logged in user")
e_msg = ("Cannot delete the currently logged in user")
self.assertEqual(response.data['detail'], e_msg)

# happy path
# delete user
# delete user
username = 'admin2'
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)

username = 'admin3'
response = self.client.delete('%s/%s' % (self.BASE_URL,username))
self.assertEqual(response.status_code,
status.HTTP_200_OK, msg=response.data)


status.HTTP_200_OK, msg=response.data)
8 changes: 4 additions & 4 deletions src/rockstor/storageadmin/views/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

from rest_framework.response import Response
from rest_framework import status
from django.db import transaction
from django.conf import settings
from storageadmin.util import handle_exception
Expand All @@ -35,6 +36,7 @@


class UserMixin(object):
serializer_class = SUserSerializer
exclude_list = ('root', 'nobody', 'bin', 'daemon', 'adm', 'sync',
'shutdown', 'halt', 'mail', 'operator', 'dbus', 'rpc',
'avahi', 'avahi-autoipd', 'rpcuser', 'nfsnobody',
Expand Down Expand Up @@ -92,8 +94,6 @@ def _validate_public_key(request):


class UserListView(UserMixin, rfc.GenericView):
serializer_class = SUserSerializer

def get_queryset(self, *args, **kwargs):
if ('username' in self.kwargs):
self.paginate_by = 0
Expand Down Expand Up @@ -177,8 +177,8 @@ def get(self, *args, **kwargs):
data = User.objects.get(username=self.kwargs['username'])
serialized_data = SUserSerializer(data)
return Response(serialized_data.data)
except:
return Response()
except User.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)

@transaction.atomic
def put(self, request, username):
Expand Down

0 comments on commit 3139972

Please sign in to comment.