-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.py
123 lines (104 loc) · 3.97 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import requests
import logging
import re
# Emit INFO-level messages so the script's progress is visible.
logging.basicConfig(level=logging.INFO)

# Connection settings: replace each placeholder before running.
DOMAIN = 'YOUR_DOMAIN'
PORT = 'YOUR_PORT'
USERNAME = 'YOUR_USERNAME'
PASSWORD = 'YOUR_PASSWORD'
HTTPS = True  # True builds https:// URLs, False builds http:// URLs

# Which users to target: E = Expired, L = Limited, D = Disabled, A = Active.
USER_STATUS = 'E'

# Human-readable label for each USER_STATUS code (used in log messages).
STATUS_MAPPING = dict(E='Expired', L='Limited', D='Disabled', A='Active')

# Single HTTP session shared by every request made in this file.
session = requests.Session()
def get_access_token(username, password):
    """Log in through ``/api/admin/token`` and return the access token.

    Args:
        username: Admin username sent in the login form.
        password: Admin password sent in the login form.

    Returns:
        The ``access_token`` value from the JSON response, or ``None`` when
        the request fails or the response does not contain a token.
    """
    use_protocol = 'https' if HTTPS else 'http'
    url = f'{use_protocol}://{DOMAIN}:{PORT}/api/admin/token'
    # Send the credentials the caller passed in; the module-level constants
    # are only the caller's defaults, not something to read here.
    data = {
        'username': username,
        'password': password,
    }
    headers = {
        "accept": "application/json",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    try:
        response = session.post(url, data=data, headers=headers)
        response.raise_for_status()
        access_token = response.json()['access_token']
    except (requests.exceptions.RequestException,
            KeyError, TypeError, ValueError) as e:
        # KeyError/TypeError: JSON body without an 'access_token' entry.
        # ValueError: body is not JSON (raised by older requests versions).
        logging.error(f'Error occurred while obtaining access token: {e}')
        return None
    logging.info(".:Logged in Successfully:.")
    return access_token
def get_users_list(access_token):
    """Fetch the users whose status matches the configured USER_STATUS.

    Args:
        access_token: Bearer token sent in the Authorization header.

    Returns:
        The decoded JSON body of ``GET /api/users?status=<status>``, or
        ``None`` when USER_STATUS is not one of E/L/D/A or the request fails.
    """
    # Query-string value for each one-letter USER_STATUS code.
    status_by_code = {
        'E': 'expired',
        'L': 'limited',
        'D': 'disabled',
        'A': 'active',
    }
    status = status_by_code.get(USER_STATUS.upper())
    if status is None:
        logging.error(f'Invalid USER_STATUS: {USER_STATUS}')
        return None

    scheme = 'https' if HTTPS else 'http'
    url = f'{scheme}://{DOMAIN}:{PORT}/api/users?status={status}'
    request_headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {access_token}',
    }
    try:
        response = session.get(url, headers=request_headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f'Error occurred while retrieving users list: {e}')
        return None
def remove_users(access_token, user):
    """Delete one user through ``DELETE /api/user/<user>``.

    Args:
        access_token: Bearer token sent in the Authorization header.
        user: Username placed in the request path.

    Returns:
        ``True`` when the server answers with a success status, ``False``
        when the request fails or the server returns an error status.
    """
    use_protocol = 'https' if HTTPS else 'http'
    url = f'{use_protocol}://{DOMAIN}:{PORT}/api/user/{user}'
    headers = {
        'accept': 'application/json',
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    try:
        response = session.delete(url, headers=headers)
        # The status code alone decides success. The response body is not
        # parsed: its value was never used, and decoding an empty or
        # non-JSON body would turn a successful delete into an error.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f'Error occurred while removing user: {e}')
        return False
    return True
def _collect_usernames(users_list):
    """Return the usernames listed in a users-list payload.

    Reads each entry's top-level ``'username'`` key directly. Matching a
    regex against ``str(user)`` skipped names containing non-word characters
    and could pick up a ``'username'`` nested inside another field instead.

    Args:
        users_list: Mapping with a ``'users'`` list of per-user dicts.

    Returns:
        List of non-empty string usernames, in payload order.
    """
    usernames = []
    for user in users_list.get('users', []):
        username = user.get('username') if isinstance(user, dict) else None
        if isinstance(username, str) and username:
            usernames.append(username)
    return usernames


def main():
    """Log in, list the users matching USER_STATUS, and delete each one."""
    access_token = get_access_token(USERNAME, PASSWORD)
    if not access_token:
        return

    users_list = get_users_list(access_token)
    if not users_list:
        return

    users = _collect_usernames(users_list)

    # Count and report the number of users for the configured USER_STATUS.
    status_text = STATUS_MAPPING.get(USER_STATUS.upper(), 'Unknown')
    logging.info(f'Total {status_text} Users: {len(users)} Found.')

    for username in users:
        if remove_users(access_token, username):
            logging.info(f'User {username} has been successfully removed.')
        else:
            logging.error(f'Failed to remove user {username}')
    logging.info(f'All {status_text} users have been processed.')


if __name__ == "__main__":
    main()
# @AMLDevelopment in Telegram
# Github : https://github.com/itsAML