From 91624505697a3ec8171ab8cfdb57c81559b3590a Mon Sep 17 00:00:00 2001
From: Henry Lee
Date: Sat, 11 May 2024 11:59:28 +0800
Subject: [PATCH 1/4] feat(ods): scrape twitter post and insights data

---
 dags/ods/twitter_post_insights/dags.py |  37 ++++++
 dags/ods/twitter_post_insights/udfs.py | 162 +++++++++++++++++++++++++
 2 files changed, 199 insertions(+)
 create mode 100644 dags/ods/twitter_post_insights/dags.py
 create mode 100644 dags/ods/twitter_post_insights/udfs.py

diff --git a/dags/ods/twitter_post_insights/dags.py b/dags/ods/twitter_post_insights/dags.py
new file mode 100644
index 0000000..84a1d14
--- /dev/null
+++ b/dags/ods/twitter_post_insights/dags.py
@@ -0,0 +1,37 @@
+from datetime import datetime, timedelta
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from ods.twitter_post_insights import udfs
+
+
+DEFAULT_ARGS = {
+    "owner": "Henry Lee",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 6, 14, 0),
+    "retries": 2,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": lambda x: "Need to send notification to Discord!",
+}
+dag = DAG(
+    "TWITTER_POST_INSIGHTS_V1",
+    default_args=DEFAULT_ARGS,
+    schedule_interval="5 8 * * *",
+    max_active_runs=1,
+    catchup=True,
+)
+with dag:
+    CREATE_TABLE_IF_NEEDED = PythonOperator(
+        task_id="CREATE_TABLE_IF_NEEDED",
+        python_callable=udfs.create_table_if_needed,
+    )
+
+    SAVE_TWITTER_POSTS_AND_INSIGHTS = PythonOperator(
+        task_id="SAVE_TWITTER_POSTS_AND_INSIGHTS",
+        python_callable=udfs.save_twitter_posts_and_insights,
+    )
+
+    CREATE_TABLE_IF_NEEDED >> SAVE_TWITTER_POSTS_AND_INSIGHTS
+
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/dags/ods/twitter_post_insights/udfs.py b/dags/ods/twitter_post_insights/udfs.py
new file mode 100644
index 0000000..7f2357b
--- /dev/null
+++ b/dags/ods/twitter_post_insights/udfs.py
@@ -0,0 +1,162 @@
+import logging
+import os
+import requests
+from typing import List, Optional
+from airflow.models import Variable
+from google.cloud import bigquery
+from datetime import datetime
+
+
+logger = logging.getLogger(__name__)
+
+
+def create_table_if_needed() -> None:
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    post_sql = """
+        CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts` (
+            id STRING,
+            created_at TIMESTAMP,
+            message STRING
+        )
+    """
+    client.query(post_sql)
+    insights_sql = """
+        CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_twitter_posts_insights` (
+            post_id STRING,
+            query_time TIMESTAMP,
+            period STRING,
+            favorite NULLABLE INTEGER,
+            reply NULLABLE INTEGER,
+            retweet NULLABLE INTEGER,
+            views NULLABLE INTEGER
+        )
+    """
+    client.query(insights_sql)
+
+
+def save_twitter_posts_and_insights() -> None:
+    posts = request_posts_data()
+
+    last_post = query_last_post()
+    if last_post is None:
+        new_posts = posts
+    else:
+        new_posts = [
+            post for post in posts if post["timestamp"] > last_post["created_at"]
+        ]
+
+    if not dump_posts_to_bigquery(
+        [
+            {
+                "id": post["tweet_id"],
+                "created_at": post["timestamp"],
+                "message": post["text"],
+            }
+            for post in new_posts
+        ]
+    ):
+        raise RuntimeError("Failed to dump posts to BigQuery")
+
+    if not dump_posts_insights_to_bigquery(
+        [
+            {
+                "id": post["tweet_id"],
+                "query_time": datetime.now(),
+                "period": "lifetime",
+                "favorite": post["favorite_count"],
+                "reply": post["reply_count"],
+                "retweet": post["retweet_count"],
+                "views": post["views"],
+            }
+            for post in posts
+        ]
+    ):
+        raise RuntimeError("Failed to dump posts insights to BigQuery")
+
+
+def query_last_post() -> Optional[dict]:
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    sql = """
+        SELECT
+            *
+        FROM
+            `pycontw-225217.ods.ods_pycontw_twitter_posts`
+        ORDER BY
+            created_at DESC
+        LIMIT 1
+    """
+    result = client.query(sql)
+    data = list(result)
+    return data[0] if data else None
+
+
+def request_posts_data() -> List[dict]:
+    url = "https://twitter154.p.rapidapi.com/v2/UserTweets/"
+    # 499339900 is PyConTW's twitter id
+    querystring = {"id": "499339900", "count": "1"}
+    headers = {
+        "X-RapidAPI-Key": Variable.get("RAPIDAPIAPI_KEY"),
+        "X-RapidAPI-Host": "twitter154.p.rapidapi.com",
+    }
+    response = requests.get(url, headers=headers, params=querystring)
+    if response.ok:
+        return response.json()["results"]
+    raise RuntimeError(f"Failed to fetch posts data: {response.text}")
+
+
+def dump_posts_to_bigquery(posts: List[dict]) -> bool:
+    if not posts:
+        logger.info("No posts to dump!")
+        return True
+
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    job_config = bigquery.LoadJobConfig(
+        schema=[
+            bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
+            bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
+        ],
+        write_disposition="WRITE_APPEND",
+    )
+    try:
+        job = client.load_table_from_json(
+            posts,
+            "pycontw-225217.ods.ods_pycontw_twitter_posts",
+            job_config=job_config,
+        )
+        job.result()
+        return True
+    except Exception as e:
+        logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True)
+        return False
+
+
+def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool:
+    if not posts:
+        logger.info("No post insights to dump!")
+        return True
+
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    job_config = bigquery.LoadJobConfig(
+        schema=[
+            bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
+            bigquery.SchemaField("period", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("favorite", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("reply", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
+        ],
+        write_disposition="WRITE_APPEND",
+    )
+    try:
+        job = client.load_table_from_json(
+            posts,
+            "pycontw-225217.ods.ods_pycontw_twitter_posts_insights",
+            job_config=job_config,
+        )
+        job.result()
+        return True
+    except Exception as e:
+        logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True)
+        return False

From 831dc8954b646625f6cfa7a435e4ea7b973ee91f Mon Sep 17 00:00:00 2001
From: Henry Lee
Date: Sun, 12 May 2024 16:19:11 +0800
Subject: [PATCH 2/4] fix(ods): twitter post scraper runtime error

---
 dags/ods/twitter_post_insights/dags.py |  2 +-
 dags/ods/twitter_post_insights/udfs.py | 26 +++++++++++++++++---------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/dags/ods/twitter_post_insights/dags.py b/dags/ods/twitter_post_insights/dags.py
index 84a1d14..646a9a5 100644
--- a/dags/ods/twitter_post_insights/dags.py
+++ b/dags/ods/twitter_post_insights/dags.py
@@ -17,7 +17,7 @@
     default_args=DEFAULT_ARGS,
     schedule_interval="5 8 * * *",
     max_active_runs=1,
-    catchup=True,
+    catchup=False,
 )
 with dag:
     CREATE_TABLE_IF_NEEDED = PythonOperator(
diff --git a/dags/ods/twitter_post_insights/udfs.py b/dags/ods/twitter_post_insights/udfs.py
index 7f2357b..6f770e4 100644
--- a/dags/ods/twitter_post_insights/udfs.py
+++ b/dags/ods/twitter_post_insights/udfs.py
@@ -25,10 +25,10 @@ def create_table_if_needed() -> None:
             post_id STRING,
             query_time TIMESTAMP,
             period STRING,
-            favorite NULLABLE INTEGER,
-            reply NULLABLE INTEGER,
-            retweet NULLABLE INTEGER,
-            views NULLABLE INTEGER
+            favorite INTEGER,
+            reply INTEGER,
+            retweet INTEGER,
+            views INTEGER
         )
     """
     client.query(insights_sql)
@@ -42,7 +42,9 @@ def save_twitter_posts_and_insights() -> None:
         new_posts = posts
     else:
         new_posts = [
-            post for post in posts if post["timestamp"] > last_post["created_at"]
+            post
+            for post in posts
+            if post["timestamp"] > last_post["created_at"].timestamp()
         ]
 
     if not dump_posts_to_bigquery(
@@ -60,8 +62,8 @@ def save_twitter_posts_and_insights() -> None:
     if not dump_posts_insights_to_bigquery(
         [
             {
-                "id": post["tweet_id"],
-                "query_time": datetime.now(),
+                "post_id": post["tweet_id"],
+                "query_time": datetime.now().timestamp(),
                 "period": "lifetime",
                 "favorite": post["favorite_count"],
                 "reply": post["reply_count"],
@@ -91,9 +93,15 @@ def query_last_post() -> Optional[dict]:
 
 
 def request_posts_data() -> List[dict]:
-    url = "https://twitter154.p.rapidapi.com/v2/UserTweets/"
+    url = "https://twitter154.p.rapidapi.com/user/tweets"
     # 499339900 is PyConTW's twitter id
-    querystring = {"id": "499339900", "count": "1"}
+    querystring = {
+        "username": "pycontw",
+        "user_id": "96479162",
+        "limit": "40",
+        "include_replies": "false",
+        "include_pinned": "false",
+    }
     headers = {
         "X-RapidAPI-Key": Variable.get("RAPIDAPIAPI_KEY"),
         "X-RapidAPI-Host": "twitter154.p.rapidapi.com",

From 2df871ae769d0e7f9c9c12cb2bfdeac789f85d47 Mon Sep 17 00:00:00 2001
From: Henry Lee
Date: Sat, 18 May 2024 09:11:44 +0800
Subject: [PATCH 3/4] style(ods): formatting

---
 dags/ods/twitter_post_insights/dags.py | 5 ++---
 dags/ods/twitter_post_insights/udfs.py | 6 +++---
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/dags/ods/twitter_post_insights/dags.py b/dags/ods/twitter_post_insights/dags.py
index 646a9a5..3ae1f3f 100644
--- a/dags/ods/twitter_post_insights/dags.py
+++ b/dags/ods/twitter_post_insights/dags.py
@@ -1,9 +1,9 @@
 from datetime import datetime, timedelta
+
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
 from ods.twitter_post_insights import udfs
 
-
 DEFAULT_ARGS = {
     "owner": "Henry Lee",
     "depends_on_past": False,
@@ -21,8 +21,7 @@
 )
 with dag:
     CREATE_TABLE_IF_NEEDED = PythonOperator(
-        task_id="CREATE_TABLE_IF_NEEDED",
-        python_callable=udfs.create_table_if_needed,
+        task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed,
     )
 
     SAVE_TWITTER_POSTS_AND_INSIGHTS = PythonOperator(
diff --git a/dags/ods/twitter_post_insights/udfs.py b/dags/ods/twitter_post_insights/udfs.py
index 6f770e4..fe35230 100644
--- a/dags/ods/twitter_post_insights/udfs.py
+++ b/dags/ods/twitter_post_insights/udfs.py
@@ -1,11 +1,11 @@
 import logging
 import os
-import requests
+from datetime import datetime
 from typing import List, Optional
+
+import requests
 from airflow.models import Variable
 from google.cloud import bigquery
-from datetime import datetime
-
 
 logger = logging.getLogger(__name__)
 

From 12a49b68d3d3eb63b551ef51e5850459aca188b1 Mon Sep 17 00:00:00 2001
From: Henry Lee
Date: Wed, 22 May 2024 23:16:17 +0800
Subject: [PATCH 4/4] fix(ods): apply suggestion from code review

---
 dags/ods/twitter_post_insights/udfs.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dags/ods/twitter_post_insights/udfs.py b/dags/ods/twitter_post_insights/udfs.py
index fe35230..a68405b 100644
--- a/dags/ods/twitter_post_insights/udfs.py
+++ b/dags/ods/twitter_post_insights/udfs.py
@@ -80,7 +80,7 @@ def query_last_post() -> Optional[dict]:
     client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
     sql = """
         SELECT
-            *
+            created_at
         FROM
             `pycontw-225217.ods.ods_pycontw_twitter_posts`
         ORDER BY
@@ -97,7 +97,7 @@ def request_posts_data() -> List[dict]:
     # 499339900 is PyConTW's twitter id
     querystring = {
         "username": "pycontw",
-        "user_id": "96479162",
+        "user_id": "499339900",
         "limit": "40",
         "include_replies": "false",
         "include_pinned": "false",