Extract raw CommunityNote data in the Extract phase #66

Merged 15 commits on May 30, 2024
etl/pyproject.toml: 2 changes (1 addition, 1 deletion)
@@ -11,7 +11,7 @@ authors = [
{name = "yu23ki14"}
]
dependencies = [
"birdxplorer_common @ git+https://github.com/codeforjapan/BirdXplorer.git@feature/issue-53-divide-python-packages#subdirectory=common",
"birdxplorer_common[dev] @ git+https://github.com/codeforjapan/BirdXplorer.git@etl/main#subdirectory=common",
"pandas",
"sqlalchemy",
"requests",
etl/src/birdxplorer_etl/extract.py: 26 changes (17 additions, 9 deletions)
@@ -4,39 +4,47 @@
import requests
import stringcase
from prefect import get_run_logger
from sqlalchemy.orm import Session

import birdxplorer_common
import birdxplorer_common.models
from birdxplorer_common.storage import RowNoteRecord

from .lib.sqlite.init import init_db
from . import settings


def extract_data():
def extract_data(db: Session):
    logger = get_run_logger()
    logger.info("Downloading community notes data")

    db = init_db()

    # Fetch the latest Note data
    # Fetch Note data and store it in SQLite
    date = datetime.now()
    latest_note = db.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first()

    while True:
        if (
            latest_note
            and int(latest_note.created_at_millis) / 1000
            > datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO
        ):
            break
        url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv'
        logger.info(url)
        res = requests.get(url)

        if res.status_code == 200:
            # Store res.content in the db's Note table
            tsv_data = res.content.decode("utf-8").splitlines()
            reader = csv.DictReader(tsv_data, delimiter="\t")
            reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

            for row in reader:
                db.add(row)
                db.add(RowNoteRecord(**row))
            break
        date = date - timedelta(days=1)

    db.commit()

    db.query(birdxplorer_common.models.Note).first()
    row1 = db.query(RowNoteRecord).first()
    logger.info(row1)

    # # Fetch tweet data linked to Notes
    # for note in notes_data:
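For orientation, below is a minimal, self-contained sketch of the download-and-parse step that extract_data performs above. The URL pattern, the snake_case header conversion, and the walk-back-one-day-on-miss behaviour mirror the diff; fetch_notes_rows and max_days_back are illustrative names, not part of the project.

import csv
from datetime import datetime, timedelta

import requests
import stringcase


def fetch_notes_rows(max_days_back: int = 7) -> list[dict]:
    # Try today's public Community Notes dump first, then walk backwards
    # one day at a time until a published file is found.
    date = datetime.now()
    for _ in range(max_days_back):
        url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv'
        res = requests.get(url)
        if res.status_code == 200:
            tsv_data = res.content.decode("utf-8").splitlines()
            reader = csv.DictReader(tsv_data, delimiter="\t")
            # The TSV headers are camelCase; converting them to snake_case
            # lets each row be splatted straight into RowNoteRecord(**row).
            reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]
            return list(reader)
        date = date - timedelta(days=1)
    return []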
etl/src/birdxplorer_etl/lib/sqlite/init.py: 13 changes (10 additions, 3 deletions)
@@ -2,19 +2,26 @@
import os

from prefect import get_run_logger
from sqlalchemy import create_engine
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker

from birdxplorer_common.storage import Base
from birdxplorer_common.storage import RowNoteRecord


def init_db():
    logger = get_run_logger()

    # ToDo: the db file needs to be stored externally, e.g. on S3.
    db_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "data", "note.db"))
    logger.info(f"Initializing database at {db_path}")
    engine = create_engine("sqlite:///" + db_path)
    Base.metadata.create_all(engine)

    # Create the tables for the temporary database
    # ToDo: add the other required tables besides the note table
    if not inspect(engine).has_table("note"):
        logger.info("Creating table note")
        RowNoteRecord.metadata.create_all(engine)

    Session = sessionmaker(bind=engine)

    return Session()
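As a side note, the create-if-missing guard added here follows the standard SQLAlchemy pattern sketched below; the toy Note model and the in-memory engine are illustrative stand-ins for RowNoteRecord and the note.db file, not the project's code.

from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Note(Base):  # illustrative stand-in for RowNoteRecord
    __tablename__ = "note"
    note_id = Column(Integer, primary_key=True)


engine = create_engine("sqlite:///:memory:")

# Create the table only when the inspector cannot find it, mirroring
# the inspect(engine).has_table("note") check in init_db.
if not inspect(engine).has_table("note"):
    Note.metadata.create_all(engine)

session = sessionmaker(bind=engine)()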
etl/src/birdxplorer_etl/main.py: 20 changes (14 additions, 6 deletions)
@@ -1,13 +1,21 @@
from prefect import flow, task
from sqlalchemy.orm import Session

from .extract import extract_data
from .lib.sqlite.init import init_db
from .load import load_data
from .transform import transform_data


@task
def extract():
    extract_data()
def initialize():
    db = init_db()
    return {"db": db}


@task
def extract(db: Session):
    extract_data(db)


@task
@@ -22,9 +30,9 @@ def load():

@flow
def run_etl():
    e = extract()
    t = transform()
    l = load()

    i = initialize()
    _ = extract(i["db"])
    _ = transform()
    _ = load()

run_etl()
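For reference, here is a stripped-down sketch of the wiring pattern the flow now uses: an initialize task returns a shared resource in a dict and downstream tasks receive it as an argument, which also gives Prefect the execution ordering. The demo task and flow names are illustrative, not the project's.

from prefect import flow, task


@task
def initialize() -> dict:
    # Stand-in for init_db(); returns the resource wrapped in a dict,
    # matching the {"db": db} shape used in main.py.
    return {"db": "fake-session"}


@task
def extract(db) -> None:
    # Stand-in for extract_data(db).
    print(f"extracting with {db!r}")


@flow
def demo_etl():
    i = initialize()
    extract(i["db"])


if __name__ == "__main__":
    demo_etl()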
etl/src/birdxplorer_etl/settings.py: 3 changes (3 additions, 0 deletions)
@@ -1,2 +1,5 @@
TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1577836800000
TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000

# How many days back the Extract step defines data as "latest". During development, 3 days back is easiest.
COMMUNITY_NOTE_DAYS_AGO = 3
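For context, COMMUNITY_NOTE_DAYS_AGO feeds the stop condition in extract_data shown above: downloads stop once the newest stored note is younger than "now minus that many days". A small sketch of that cutoff arithmetic follows; cutoff_seconds and the sample timestamp are illustrative.

from datetime import datetime

from birdxplorer_etl import settings

now = datetime.now()
# Same expression as the while-loop guard in extract_data, in seconds.
cutoff_seconds = datetime.timestamp(now) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO

created_at_millis = 1_700_000_000_000  # sample value for illustration
# A note newer than the cutoff counts as "latest", so no further
# TSV download is attempted.
is_fresh_enough = created_at_millis / 1000 > cutoff_seconds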