Skip to content

Commit

Permalink
Fix: Previous instances' sessions being overwritten
Browse files Browse the repository at this point in the history
Fix: session generation. A previous instance's session was being overwritten whenever a new instance was created. Removed _DEFAULT_SESSION from the config.py module. The _DEFAULT_SESSION setting is not required at all; it was used for some initial testing and was unintentionally left in config.py all this time.
  • Loading branch information
iSarabjitDhiman committed May 24, 2024
1 parent faa06e6 commit f2e2535
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 59 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup

VERSION = "1.1.2"
VERSION = "1.1.3"
SHORT_DESCRIPTION = "TweeterPy is a python library to extract data from Twitter. TweeterPy API lets you scrape data from a user's profile like username, userid, bio, followers/followings list, profile media, tweets, etc."

with open("requirements.txt") as file:
Expand Down
9 changes: 5 additions & 4 deletions tweeterpy/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class ApiUpdater:
Twitter updates its API quite frequently. Therefore, ApiUpdater checks for the latest updates and modifies the api_endpoints, feature_switches, path etc in constants.py
"""

def __init__(self, update_api=True):
def __init__(self, update_api=True, session=None):
self.__session = session
try:
logger.debug('Updating API...')
# fmt: off - Turns off formatting for this block of code.
Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(self, update_api=True):
# fmt: on

def _get_home_page_source(self):
return str(make_request(Path.BASE_URL))
return str(make_request(Path.BASE_URL, session=self.__session))

def _get_api_file_url(self, page_source=None):
if page_source is None:
Expand Down Expand Up @@ -88,12 +89,12 @@ def _get_main_file_url(self, page_source=None):
def _get_api_file_content(self, file_url=None):
if file_url is None:
file_url = self._get_api_file_url()
return str(make_request(file_url))
return str(make_request(file_url, session=self.__session))

def _get_main_file_content(self, file_url=None):
if file_url is None:
file_url = self._get_main_file_url()
return str(make_request(file_url))
return str(make_request(file_url, session=self.__session))

def _js_to_py_dict(sel, page_source):
if isinstance(page_source, list):
Expand Down
1 change: 0 additions & 1 deletion tweeterpy/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Configuration File
_DEFAULT_SESSION = None # Used to reuse generated session. DON'T CHANGE IT
_RATE_LIMIT_STATS = None # Used to keep a track of api limits. DON'T CHANGE IT

_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
Expand Down
20 changes: 10 additions & 10 deletions tweeterpy/login_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@


class TaskHandler:
def __init__(self):
pass
def __init__(self, session=None):
self.__session = session

def _create_task_mapper(self, username, password, verification_input_data):
# fmt: off - Turns off formatting for this block of code. Just for the readability purpose.
Expand All @@ -32,19 +32,19 @@ def _get_flow_token(self):
'phone_verification': 4, 'privacy_options': 1, 'security_key': 3, 'select_avatar': 4, 'select_banner': 2,
'settings_list': 7, 'show_code': 1, 'sign_up': 2, 'sign_up_review': 4, 'tweet_selection_urt': 1, 'update_users': 1,
'upload_media': 1, 'user_recommendations_list': 4, 'user_recommendations_urt': 1, 'wait_spinner': 3, 'web_modal': 1}}
return make_request(Path.TASK_URL, method="POST", params=params, json=payload)
return make_request(Path.TASK_URL, method="POST", params=params, json=payload, session=self.__session)

def _get_javscript_instrumentation_subtask(self):
params = {'c_name': 'ui_metrics'}
return make_request(Path.JAVSCRIPT_INSTRUMENTATION_URL, params=params)
return make_request(Path.JAVSCRIPT_INSTRUMENTATION_URL, params=params, session=self.__session)

def _get_user_flow_token(self, flow_token, subtask_id="LoginJsInstrumentationSubtask"):
payload = {'flow_token': flow_token,
'subtask_inputs': [{'subtask_id': subtask_id,
'js_instrumentation': {
'response': '',
'link': 'next_link'}}]}
return make_request(Path.TASK_URL, method="POST", json=payload)
return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session)

@disable_logger
def _get_password_flow_token(self, flow_token, subtask_id="LoginEnterUserIdentifierSSO", username=None):
Expand All @@ -53,31 +53,31 @@ def _get_password_flow_token(self, flow_token, subtask_id="LoginEnterUserIdentif
'settings_list': {
'setting_responses': [{'key': 'user_identifier', 'response_data': {'text_data': {'result': username}}}],
'link': 'next_link'}}]}
return make_request(Path.TASK_URL, method="POST", json=payload)
return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session)

@disable_logger
def _get_account_duplication_flow_token(self, flow_token, subtask_id="LoginEnterPassword", password=None):
payload = {'flow_token': flow_token,
'subtask_inputs': [{'subtask_id': subtask_id,
'enter_password': {'password': password, 'link': 'next_link'}}]}
return make_request(Path.TASK_URL, method="POST", json=payload)
return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session)

def _check_suspicious_login(self, flow_token, subtask_id="DenyLoginSubtask"):
payload = {"flow_token": flow_token,
"subtask_inputs": [{"subtask_id": subtask_id, "cta": {"link": "next_link"}}]}
return make_request(Path.TASK_URL, method="POST", json=payload)
return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session)

def _check_account_duplication(self, flow_token, subtask_id="AccountDuplicationCheck"):
payload = {'flow_token': flow_token,
'subtask_inputs': [{'subtask_id': subtask_id, 'check_logged_in_account': {'link': 'AccountDuplicationCheck_false'}}]}
return make_request(Path.TASK_URL, method="POST", json=payload)
return make_request(Path.TASK_URL, method="POST", json=payload, session=self.__session)

def _handle_suspicious_login(self, flow_token, subtask_id="LoginAcid",verification_input_data=None):
payload = {"flow_token": flow_token,
"subtask_inputs": [{"subtask_id": subtask_id, "enter_text": {"text": verification_input_data,"link":"next_link"}}]}
handle_incorrect_input = True
while handle_incorrect_input:
response = make_request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True)
response = make_request(Path.TASK_URL, method="POST", json=payload, skip_error_checking=True, session=self.__session)
if isinstance(response, dict) and "errors" in response.keys():
error_message = "\n".join([error['message'] for error in response['errors']])
payload['subtask_inputs'][0]['enter_text']['text'] = str(input(f"{error_message} - Type again ==> "))
Expand Down
6 changes: 4 additions & 2 deletions tweeterpy/request_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@


def make_request(url, session=None, method=None, max_retries=None, timeout=None, skip_error_checking=False, **kwargs):
if session is None:
raise NameError("name 'session' is not defined.")
if not isinstance(session, requests.Session):
raise TypeError(f"Invalid session type. {session} is not a requests.Session Object...")
if method is None:
method = "GET"
if max_retries is None:
max_retries = config.MAX_RETRIES or 3
if session is None:
session = config._DEFAULT_SESSION or requests.Session()
if timeout is None:
timeout = config.TIMEOUT or 30
logger.debug(f"{locals()}")
Expand Down
18 changes: 8 additions & 10 deletions tweeterpy/session_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ def _show_saved_sessions(directory_path=None):


def save_session(filename=None, path=None, session=None):
if session is None or not isinstance(session, requests.Session):
logger.warn(
"No Session object given. Trying to save existing/default Session...")
if config._DEFAULT_SESSION:
session = config._DEFAULT_SESSION
else:
raise TypeError(f'{session} is not a requests Session Object...')
if session is None:
raise NameError("name 'session' is not defined.")
if not isinstance(session, requests.Session):
raise TypeError(f"Invalid session type. {session} is not a requests.Session Object...")
if filename is None:
filename = str(
input("Enter Username/Account Name to Save the Session : ")).strip()
Expand All @@ -54,15 +51,16 @@ def save_session(filename=None, path=None, session=None):


def load_session(file_path=None, session=None):
if session is None:
raise NameError("name 'session' is not defined.")
if not isinstance(session, requests.Session):
raise TypeError(f"Invalid session type. {session} is not a requests.Session Object...")
if file_path is None:
file_path = _show_saved_sessions()
with open(file_path, "rb") as file:
headers, cookies = pickle.load(file)
if session is None:
session = config._DEFAULT_SESSION or requests.Session()
session.headers = headers
session.cookies = cookies
config._DEFAULT_SESSION = session
return session


Expand Down
70 changes: 39 additions & 31 deletions tweeterpy/tweeterpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ def __init__(self):
set_log_level(logging.ERROR, external_only=disable_external_only)
self.generate_session()
# update api endpoints
self.__token = self.session.headers.pop("Authorization")
self.__token = self.__session.headers.pop("Authorization")
try:
ApiUpdater(update_api=config.UPDATE_API)
ApiUpdater(update_api=config.UPDATE_API, session=self.__session)
except Exception as error:
logger.warn(error)
self.session.headers.update({"Authorization":self.__token})
self.__session.headers.update({"Authorization":self.__token})

def _generate_request_data(self, endpoint, variables=None, **kwargs):
# fmt: off - Turns off formatting for this block of code. Just for the readability purpose.
Expand All @@ -45,11 +45,11 @@ def _generate_request_data(self, endpoint, variables=None, **kwargs):
features = FeatureSwitch().get_query_features(endpoint) or util.generate_features(**kwargs)
query_params["features"] = json.dumps(features)
# fmt: on
request_payload = {"url": url, "params": query_params}
request_payload = {"url": url, "params": query_params, "session":self.__session}
logger.debug(f"Request Payload => {request_payload}")
return request_payload

def _handle_pagination(self, url, params, end_cursor=None, data_path=None, total=None, pagination=True):
def _handle_pagination(self, url, params, end_cursor=None, data_path=None, total=None, pagination=True, **kwargs):
# fmt: off - Turns off formatting for this block of code. Just for the readability purpose.
def filter_data(response):
filtered_data = []
Expand All @@ -65,13 +65,14 @@ def filter_data(response):
logger.warn("Either enable the pagination or disable total number of results.")
raise Exception("pagination cannot be disabled while the total number of results are specified.")
data_container = {"data": [],"cursor_endpoint": None, "has_next_page": True, "api_rate_limit":config._RATE_LIMIT_STATS}
session = kwargs.get("session", self.__session)
while data_container["has_next_page"]:
try:
if end_cursor:
variables = json.loads(params['variables'])
variables['cursor'] = end_cursor
params['variables'] = json.dumps(variables)
response = make_request(url, params=params)
response = make_request(url, params=params, session=session)
data = [item for item in reduce(
dict.get, data_path, response) if item['type'] == 'TimelineAddEntries'][0]['entries']
top_cursor = [
Expand Down Expand Up @@ -107,12 +108,11 @@ def filter_data(response):

@property
def session(self):
return self._session
return self.__session

@session.setter
def session(self, session):
self._session = session
config._DEFAULT_SESSION = session
self.__session = session

@property
def me(self):
Expand All @@ -125,9 +125,11 @@ def me(self):
"withSubscribedTab": True, "withCommunitiesCreation": True}
request_payload = self._generate_request_data(
Path.VIEWER_ENDPOINT, variables, user_data_features=True)
response = self.session.get(**request_payload)
try:
return response.json()
response = make_request(**request_payload)
if not isinstance(response, dict):
raise Exception(response)
return response
except:
logger.info("Guest Session")
return
Expand All @@ -151,25 +153,29 @@ def generate_session(self, auth_token=None):
"""
try:
logger.debug("Trying to generate a new session.")
self.session = requests.Session()
session = requests.Session()
if config.PROXY is not None:
self.session.proxies = config.PROXY
self.session.verify = False
self.session.headers.update(util.generate_headers())
# home_page = make_request(Path.BASE_URL, session=self.session)
home_page = util.handle_x_migration(session=self.session)
guest_token = make_request(
Path.GUEST_TOKEN_URL, method="POST", session=self.session).get('guest_token', util.find_guest_token(home_page))
self.session.headers.update({'X-Guest-Token': guest_token})
self.session.cookies.update({'gt': guest_token})
session.proxies = config.PROXY
session.verify = False
session.headers.update(util.generate_headers())
# home_page = make_request(Path.BASE_URL, session=session)
home_page = util.handle_x_migration(session=session)
try:
guest_token = make_request(
Path.GUEST_TOKEN_URL, method="POST", session=session).get('guest_token', util.find_guest_token(home_page))
except Exception as error:
logger.error(error)
session.headers.update({'X-Guest-Token': guest_token})
session.cookies.update({'gt': guest_token})
if auth_token:
self.session.cookies.update({'auth_token': auth_token})
util.generate_headers(self.session)
session.cookies.update({'auth_token': auth_token})
util.generate_headers(session)
except Exception as error:
logger.exception(f"Couldn't generate a new session.\n{error}\n")
raise
logger.debug("Session has been generated.")
return self.session
self.__session = session
return self.__session

def save_session(self, session=None, session_name=None):
"""Save a logged in session to avoid frequent logins in future.
Expand All @@ -182,7 +188,7 @@ def save_session(self, session=None, session_name=None):
path: Saved session file path.
"""
if session is None:
session = self.session
session = self.__session
if session_name is None:
session_name = self.me['data']['viewer']['user_results']['result']['legacy']['screen_name']
return save_session(filename=session_name, session=session)
Expand All @@ -197,17 +203,19 @@ def load_session(self, session_file_path=None, session=None):
Returns:
requests.Session: Restored session.
"""
self.session = load_session(
if session is None:
session = self.generate_session()
self.__session = load_session(
file_path=session_file_path, session=session)
return self.session
return self.__session

def logged_in(self):
"""Check if the user is logged in.
Returns:
bool: Returns True if the user is logged in.
"""
if "auth_token" in self.session.cookies.keys():
if "auth_token" in self.__session.cookies.keys():
# logger.info('User is authenticated.')
return True
return False
Expand All @@ -219,14 +227,14 @@ def login(self, username=None, password=None):
username (str, optional): Twitter username or email. Defaults to None.
password (str, optional): Password. Defaults to None.
"""
if "auth_token" in self.session.cookies.keys():
if "auth_token" in self.__session.cookies.keys():
self.generate_session()
if username is None:
username = str(input("Enter Your Username or Email : ")).strip()
if password is None:
password = getpass.getpass()
TaskHandler().login(username, password)
util.generate_headers(session=self.session)
TaskHandler(session=self.__session).login(username, password)
util.generate_headers(session=self.__session)
try:
user = self.me
username = util.find_nested_key(user, 'screen_name')
Expand Down

0 comments on commit f2e2535

Please sign in to comment.