From f2e2535bff02839e0b6387062167140a4dcff6f9 Mon Sep 17 00:00:00 2001 From: Sarabjit Dhiman Date: Fri, 24 May 2024 19:37:23 +0530 Subject: [PATCH] Fix: Previous instances' sessions being overwritten Fix: session generation. A previous instance's session was being overwritten whenever a new instance was created. Removed _DEFAULT_SESSION from the config.py module. The _DEFAULT_SESSION configuration is not required at all; it was used for some initial testing and was unintentionally left in the config.py module all this time. --- setup.py | 2 +- tweeterpy/api_util.py | 9 ++--- tweeterpy/config.py | 1 - tweeterpy/login_util.py | 20 +++++------ tweeterpy/request_util.py | 6 ++-- tweeterpy/session_util.py | 18 +++++----- tweeterpy/tweeterpy.py | 70 ++++++++++++++++++++++----------------- 7 files changed, 67 insertions(+), 59 deletions(-) diff --git a/setup.py b/setup.py index 2c3928c..0ced576 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup -VERSION = "1.1.2" +VERSION = "1.1.3" SHORT_DESCRIPTION = "TweeterPy is a python library to extract data from Twitter. TweeterPy API lets you scrape data from a user's profile like username, userid, bio, followers/followings list, profile media, tweets, etc." with open("requirements.txt") as file: diff --git a/tweeterpy/api_util.py b/tweeterpy/api_util.py index c95ce10..cba0943 100644 --- a/tweeterpy/api_util.py +++ b/tweeterpy/api_util.py @@ -24,7 +24,8 @@ class ApiUpdater: Twitter updates its API quite frequently. Therefore, ApiUpdater checks for the latest updates and modifies the api_endpoints, feature_switches, path etc in constants.py """ - def __init__(self, update_api=True): + def __init__(self, update_api=True, session=None): + self.__session = session try: logger.debug('Updating API...') # fmt: off - Turns off formatting for this block of code. 
@@ -59,7 +60,7 @@ def __init__(self, update_api=True): # fmt: on def _get_home_page_source(self): - return str(make_request(Path.BASE_URL)) + return str(make_request(Path.BASE_URL, session=self.__session)) def _get_api_file_url(self, page_source=None): if page_source is None: @@ -88,12 +89,12 @@ def _get_main_file_url(self, page_source=None): def _get_api_file_content(self, file_url=None): if file_url is None: file_url = self._get_api_file_url() - return str(make_request(file_url)) + return str(make_request(file_url, session=self.__session)) def _get_main_file_content(self, file_url=None): if file_url is None: file_url = self._get_main_file_url() - return str(make_request(file_url)) + return str(make_request(file_url, session=self.__session)) def _js_to_py_dict(sel, page_source): if isinstance(page_source, list): diff --git a/tweeterpy/config.py b/tweeterpy/config.py index bce75af..e787f8d 100644 --- a/tweeterpy/config.py +++ b/tweeterpy/config.py @@ -1,5 +1,4 @@ # Configuration File -_DEFAULT_SESSION = None # Used to reuse generated session. DON'T CHANGE IT _RATE_LIMIT_STATS = None # Used to keep a track of api limits. DON'T CHANGE IT _USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' diff --git a/tweeterpy/login_util.py b/tweeterpy/login_util.py index cb5c0f5..6db97ef 100644 --- a/tweeterpy/login_util.py +++ b/tweeterpy/login_util.py @@ -5,8 +5,8 @@ class TaskHandler: - def __init__(self): - pass + def __init__(self, session=None): + self.__session = session def _create_task_mapper(self, username, password, verification_input_data): # fmt: off - Turns off formatting for this block of code. Just for the readability purpose. 
@@ -32,11 +32,11 @@ def _get_flow_token(self): 'phone_verification': 4, 'privacy_options': 1, 'security_key': 3, 'select_avatar': 4, 'select_banner': 2, 'settings_list': 7, 'show_code': 1, 'sign_up': 2, 'sign_up_review': 4, 'tweet_selection_urt': 1, 'update_users': 1, 'upload_media': 1, 'user_recommendations_list': 4, 'user_recommendations_urt': 1, 'wait_spinner': 3, 'web_modal': 1}} - return make_request(Path.TASK_URL, method="POST", params=params, json=payload) + return make_request(Path.TASK_URL, method="POST", params=params, json=payload, session=self.__session) def _get_javscript_instrumentation_subtask(self): params = {'c_name': 'ui_metrics'} - return make_request(Path.JAVSCRIPT_INSTRUMENTATION_URL, params=params) + return make_request(Path.JAVSCRIPT_INSTRUMENTATION_URL, params=params, session=self.__session) def _get_user_flow_token(self, flow_token, subtask_id="LoginJsInstrumentationSubtask"): payload = {'flow_token': flow_token, @@ -44,7 +44,7 @@ def _get_user_flow_token(self, flow_token, subtask_id="LoginJsInstrumentationSub 'js_instrumentation': { 'response': '', 'link': 'next_link'}}]} - return make_request(Path.TASK_URL, method="POST", json=payload) + return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session) @disable_logger def _get_password_flow_token(self, flow_token, subtask_id="LoginEnterUserIdentifierSSO", username=None): @@ -53,31 +53,31 @@ def _get_password_flow_token(self, flow_token, subtask_id="LoginEnterUserIdentif 'settings_list': { 'setting_responses': [{'key': 'user_identifier', 'response_data': {'text_data': {'result': username}}}], 'link': 'next_link'}}]} - return make_request(Path.TASK_URL, method="POST", json=payload) + return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session) @disable_logger def _get_account_duplication_flow_token(self, flow_token, subtask_id="LoginEnterPassword", password=None): payload = {'flow_token': flow_token, 'subtask_inputs': [{'subtask_id': 
subtask_id, 'enter_password': {'password': password, 'link': 'next_link'}}]} - return make_request(Path.TASK_URL, method="POST", json=payload) + return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session) def _check_suspicious_login(self, flow_token, subtask_id="DenyLoginSubtask"): payload = {"flow_token": flow_token, "subtask_inputs": [{"subtask_id": subtask_id, "cta": {"link": "next_link"}}]} - return make_request(Path.TASK_URL, method="POST", json=payload) + return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session) def _check_account_duplication(self, flow_token, subtask_id="AccountDuplicationCheck"): payload = {'flow_token': flow_token, 'subtask_inputs': [{'subtask_id': subtask_id, 'check_logged_in_account': {'link': 'AccountDuplicationCheck_false'}}]} - return make_request(Path.TASK_URL, method="POST", json=payload) + return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session) def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid",verification_input_data=None): payload = {"flow_token": flow_token, "subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": verification_input_data,"link":"next_link"}}]} handle_incorrect_input = True while handle_incorrect_input: - response = make_request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True) + response = make_request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True, session=self.__session) if isinstance(response, dict) and "errors" in response.keys(): error_message = "\n".join([error['message'] for error in response['errors']]) payload['subtask_inputs'][0]['enter_text']['text'] = str(input(f"{error_message} - Type again ==> ")) diff --git a/tweeterpy/request_util.py b/tweeterpy/request_util.py index a51d362..f227c40 100644 --- a/tweeterpy/request_util.py +++ b/tweeterpy/request_util.py @@ -9,12 +9,14 @@ def make_request(url, session=None, method=None, max_retries=None, 
timeout=None, skip_error_checking=False, **kwargs): + if session is None: + raise NameError("name 'session' is not defined.") + if not isinstance(session, requests.Session): + raise TypeError(f"Invalid session type. {session} is not a requests.Session Object...") if method is None: method = "GET" if max_retries is None: max_retries = config.MAX_RETRIES or 3 - if session is None: - session = config._DEFAULT_SESSION or requests.Session() if timeout is None: timeout = config.TIMEOUT or 30 logger.debug(f"{locals()}") diff --git a/tweeterpy/session_util.py b/tweeterpy/session_util.py index 345a1c2..fc90365 100644 --- a/tweeterpy/session_util.py +++ b/tweeterpy/session_util.py @@ -34,13 +34,10 @@ def _show_saved_sessions(directory_path=None): def save_session(filename=None, path=None, session=None): - if session is None or not isinstance(session, requests.Session): - logger.warn( - "No Session object given. Trying to save existing/default Session...") - if config._DEFAULT_SESSION: - session = config._DEFAULT_SESSION - else: - raise TypeError(f'{session} is not a requests Session Object...') + if session is None: + raise NameError("name 'session' is not defined.") + if not isinstance(session, requests.Session): + raise TypeError(f"Invalid session type. {session} is not a requests.Session Object...") if filename is None: filename = str( input("Enter Username/Account Name to Save the Session : ")).strip() @@ -54,15 +51,16 @@ def save_session(filename=None, path=None, session=None): def load_session(file_path=None, session=None): + if session is None: + raise NameError("name 'session' is not defined.") + if not isinstance(session, requests.Session): + raise TypeError(f"Invalid session type. 
{session} is not a requests.Session Object...") if file_path is None: file_path = _show_saved_sessions() with open(file_path, "rb") as file: headers, cookies = pickle.load(file) - if session is None: - session = config._DEFAULT_SESSION or requests.Session() session.headers = headers session.cookies = cookies - config._DEFAULT_SESSION = session return session diff --git a/tweeterpy/tweeterpy.py b/tweeterpy/tweeterpy.py index 950c03e..f988217 100644 --- a/tweeterpy/tweeterpy.py +++ b/tweeterpy/tweeterpy.py @@ -28,12 +28,12 @@ def __init__(self): set_log_level(logging.ERROR, external_only=disable_external_only) self.generate_session() # update api endpoints - self.__token = self.session.headers.pop("Authorization") + self.__token = self.__session.headers.pop("Authorization") try: - ApiUpdater(update_api=config.UPDATE_API) + ApiUpdater(update_api=config.UPDATE_API, session=self.__session) except Exception as error: logger.warn(error) - self.session.headers.update({"Authorization":self.__token}) + self.__session.headers.update({"Authorization":self.__token}) def _generate_request_data(self, endpoint, variables=None, **kwargs): # fmt: off - Turns off formatting for this block of code. Just for the readability purpose. @@ -45,11 +45,11 @@ def _generate_request_data(self, endpoint, variables=None, **kwargs): features = FeatureSwitch().get_query_features(endpoint) or util.generate_features(**kwargs) query_params["features"] = json.dumps(features) # fmt: on - request_payload = {"url": url, "params": query_params} + request_payload = {"url": url, "params": query_params, "session":self.__session} logger.debug(f"Request Payload => {request_payload}") return request_payload - def _handle_pagination(self, url, params, end_cursor=None, data_path=None, total=None, pagination=True): + def _handle_pagination(self, url, params, end_cursor=None, data_path=None, total=None, pagination=True, **kwargs): # fmt: off - Turns off formatting for this block of code. 
Just for the readability purpose. def filter_data(response): filtered_data = [] @@ -65,13 +65,14 @@ def filter_data(response): logger.warn("Either enable the pagination or disable total number of results.") raise Exception("pagination cannot be disabled while the total number of results are specified.") data_container = {"data": [],"cursor_endpoint": None, "has_next_page": True, "api_rate_limit":config._RATE_LIMIT_STATS} + session = kwargs.get("session", self.__session) while data_container["has_next_page"]: try: if end_cursor: variables = json.loads(params['variables']) variables['cursor'] = end_cursor params['variables'] = json.dumps(variables) - response = make_request(url, params=params) + response = make_request(url, params=params, session=session) data = [item for item in reduce( dict.get, data_path, response) if item['type'] == 'TimelineAddEntries'][0]['entries'] top_cursor = [ @@ -107,12 +108,11 @@ def filter_data(response): @property def session(self): - return self._session + return self.__session @session.setter def session(self, session): - self._session = session - config._DEFAULT_SESSION = session + self.__session = session @property def me(self): @@ -125,9 +125,11 @@ def me(self): "withSubscribedTab": True, "withCommunitiesCreation": True} request_payload = self._generate_request_data( Path.VIEWER_ENDPOINT, variables, user_data_features=True) - response = self.session.get(**request_payload) try: - return response.json() + response = make_request(**request_payload) + if not isinstance(response, dict): + raise Exception(response) + return response except: logger.info("Guest Session") return @@ -151,25 +153,29 @@ def generate_session(self, auth_token=None): """ try: logger.debug("Trying to generate a new session.") - self.session = requests.Session() + session = requests.Session() if config.PROXY is not None: - self.session.proxies = config.PROXY - self.session.verify = False - self.session.headers.update(util.generate_headers()) - # home_page = 
make_request(Path.BASE_URL, session=self.session) - home_page = util.handle_x_migration(session=self.session) - guest_token = make_request( - Path.GUEST_TOKEN_URL, method="POST", session=self.session).get('guest_token', util.find_guest_token(home_page)) - self.session.headers.update({'X-Guest-Token': guest_token}) - self.session.cookies.update({'gt': guest_token}) + session.proxies = config.PROXY + session.verify = False + session.headers.update(util.generate_headers()) + # home_page = make_request(Path.BASE_URL, session=session) + home_page = util.handle_x_migration(session=session) + try: + guest_token = make_request( + Path.GUEST_TOKEN_URL, method="POST", session=session).get('guest_token', util.find_guest_token(home_page)) + except Exception as error: + logger.error(error) + session.headers.update({'X-Guest-Token': guest_token}) + session.cookies.update({'gt': guest_token}) if auth_token: - self.session.cookies.update({'auth_token': auth_token}) - util.generate_headers(self.session) + session.cookies.update({'auth_token': auth_token}) + util.generate_headers(session) except Exception as error: logger.exception(f"Couldn't generate a new session.\n{error}\n") raise logger.debug("Session has been generated.") - return self.session + self.__session = session + return self.__session def save_session(self, session=None, session_name=None): """Save a logged in session to avoid frequent logins in future. @@ -182,7 +188,7 @@ def save_session(self, session=None, session_name=None): path: Saved session file path. """ if session is None: - session = self.session + session = self.__session if session_name is None: session_name = self.me['data']['viewer']['user_results']['result']['legacy']['screen_name'] return save_session(filename=session_name, session=session) @@ -197,9 +203,11 @@ def load_session(self, session_file_path=None, session=None): Returns: requests.Session: Restored session. 
""" - self.session = load_session( + if session is None: + session = self.generate_session() + self.__session = load_session( file_path=session_file_path, session=session) - return self.session + return self.__session def logged_in(self): """Check if the user is logged in. @@ -207,7 +215,7 @@ def logged_in(self): Returns: bool: Returns True if the user is logged in. """ - if "auth_token" in self.session.cookies.keys(): + if "auth_token" in self.__session.cookies.keys(): # logger.info('User is authenticated.') return True return False @@ -219,14 +227,14 @@ def login(self, username=None, password=None): username (str, optional): Twitter username or email. Defaults to None. password (str, optional): Password. Defaults to None. """ - if "auth_token" in self.session.cookies.keys(): + if "auth_token" in self.__session.cookies.keys(): self.generate_session() if username is None: username = str(input("Enter Your Username or Email : ")).strip() if password is None: password = getpass.getpass() - TaskHandler().login(username, password) - util.generate_headers(session=self.session) + TaskHandler(session=self.__session).login(username, password) + util.generate_headers(session=self.__session) try: user = self.me username = util.find_nested_key(user, 'screen_name')