-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
142 lines (113 loc) · 5.44 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import requests
from private import telegram_bot_url, ADMIN_CHAT_IDs, sponser_channel_ids, database_detail
import functools
import arrow
from datetime import datetime
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from posgres_manager import Client
from user.userManager import UserClient
import traceback
from telegram.ext import ConversationHandler
from wallet.wallet_core import WalletManage
main_page_keyboard = [
[InlineKeyboardButton('دوره ها 📚', callback_data='course_list_')],
[InlineKeyboardButton('کیف پول 👝', callback_data='wallet_page')],
[InlineKeyboardButton('زیرمجموعه گیری 👥', callback_data='referral_page')]
]
database_pool = Client(**database_detail)
user_manager = UserClient(database_pool)
wallet_manager = WalletManage('UserDetail', 'credit', database_pool, 'userID')
def report_problem_to_admin(msg: str):
return requests.post(url=telegram_bot_url, data={'chat_id': ADMIN_CHAT_IDs[0], 'text': msg})
def handle_error(func):
@functools.wraps(func)
async def warpper(update, context):
user_detail = update.effective_chat
try:
return await func(update, context)
except Exception as e:
err = ("🔴 Report Problem in Bot\n\n"
f"Something Went Wrong In <b>{func.__name__}</b> Section."
f"\nUser ID: {user_detail.id}"
f"\nError Type: {type(e).__name__}"
f"\nError Reason:\n{e}")
print(err)
report_problem_to_admin(err)
await context.bot.send_message(text='متاسفانه مشکلی وجود داشت، گزارش به ادمین ها ارسال شد!', chat_id=user_detail.id, parse_mode='html')
return warpper
def replace_with_space(txt):
return txt.replace('_', ' ')
def human_readable(date):
get_date = arrow.get(date)
return get_date.humanize()
def unix_time_to_datetime(date):
return datetime.fromtimestamp(date)
def check_join_in_channel(func):
async def wrapper(update, context):
user_detail = update.effective_chat
join_channel_button, text = [], ''
for sponser_channel_username, sponser_channel_address in sponser_channel_ids.items():
member = await context.bot.get_chat_member(sponser_channel_address[0], user_detail.id)
if member.status in ['left', 'kicked']:
text = 'شما در کانال یا گروه های زیر عضو نیستید!'
join_channel_button.append(
[InlineKeyboardButton(sponser_channel_username, url=f'https://t.me/{sponser_channel_address[1]}')])
if join_channel_button:
join_channel_button.append([InlineKeyboardButton('عضو شدم ✅', callback_data='check_join_to_channel')])
await context.bot.send_message(chat_id=user_detail.id, text=text,
reply_markup=InlineKeyboardMarkup(join_channel_button))
return
return await func(update, context)
return wrapper
async def report_problem(func_name, error, side, extra_message=None):
text = (f"🔴 BOT Report Problem [{side}]\n\n"
f"\nFunc Name: {func_name}"
f"\nError Type: {type(error).__name__}"
f"\nError Reason:\n{error}"
f"\nExtra Message:\n{extra_message}")
requests.post(url=telegram_bot_url, data={'chat_id': ADMIN_CHAT_IDs[0], 'text': text})
async def something_went_wrong(update, context):
text= "متاسفانه مشکلی وجود داشت!\nگزارش مشکل به ادمین ارسال شد."
if getattr(update, 'callback_query'):
query = update.callback_query
await query.answer(text)
else:
await update.message.reply_text(text)
def handle_telegram_exceptions(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
side = 'Telgram Func'
print(f"[{side}] An error occurred in {func.__name__}: {e}")
await report_problem(func.__name__, e, side, extra_message=traceback.format_exc())
await something_went_wrong(*args)
return wrapper
def handle_telegram_conversetion_exceptions(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
print(e)
await report_problem(func.__name__, e, 'Telegram Conversetion Func', extra_message=traceback.format_exc())
await something_went_wrong(*args)
return ConversationHandler.END
return wrapper
async def report_problem_to_admin_witout_context(text, chat_id, error, detail=None):
text = ("🔴 Report Problem in Bot\n\n"
f"Something Went Wrong In {text} Section."
f"\nUser ID: {chat_id}"
f"\nError Type: {type(error).__name__}"
f"\nError Reason:\n{error}")
text += f"\nDetail:\n {detail}" if detail else ''
requests.post(url=telegram_bot_url, data={'chat_id': ADMIN_CHAT_IDs[0], 'text': text})
print(f'* REPORT TO ADMIN SUCCESS: ERR: {error}')
def handle_telegram_exceptions_without_user_side(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
await report_problem(func.__name__, e, 'Telgram Func', extra_message=traceback.format_exc())
return wrapper