Skip to content

Commit

Permalink
Deployment improvements (#95)
Browse files Browse the repository at this point in the history
* improve everything

* improve cli

* fix test

* fix
  • Loading branch information
srossross authored Sep 12, 2024
1 parent 6325bb2 commit a7a8984
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 62 deletions.
33 changes: 33 additions & 0 deletions example-packages.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"pypi": [
"flask",
"captchaai",
"dnsblock-update",
"django-projector",
"OAuthClientUser",
"dashing",
"hay-say-common",
"django-projector",
"junhao-sales",
"slacktools",
"obsidian-metadata",
"smartstatic",
"smartpip",
"smartdb",
"smartpivot",
"smartchart",
"smartui",
"dnsblock-update",
"dashing",
"nesterSPC",
"aonetest",
"obsidian-metadata",
"biogeme-optimization",
"nesterSPC",
"aonetest",
"django-passwordless"
],
"conda": [
"flask"
]
}
130 changes: 111 additions & 19 deletions score/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import click
import duckdb

import json
import pandas as pd
from .conda.get_conda_package_names import get_conda_package_names
from .conda.scrape_conda import scrape_conda
from .pypi.json_scraper import scrape_json
from .conda.scrape_conda import scrape_conda, conda_schema
from .pypi.json_scraper import scrape_json, pypi_schema
from .pypi.pypi_downloads import get_bulk_download_counts
from .logger import setup_logger
from .pypi.get_pypi_package_list import get_pypi_package_names
Expand Down Expand Up @@ -62,19 +63,25 @@ def cli():
default=PREP_PYPI_DIR,
help="The output directory to save the scraped data in hive partition",
)
@click.option("--only", "only_packages", multiple=True)
@partition_option
@num_partitions_option
def scrape_pypi(num_partitions, partition, output):
packages = get_pypi_package_names(num_partitions, partition)
def scrape_pypi(num_partitions, partition, output, only_packages):
if only_packages:
packages = only_packages
else:
packages = get_pypi_package_names(num_partitions, partition)

click.echo(
f"Will process {len(packages)} packages in partition {partition} of {num_partitions}"
)

df = scrape_json(packages)
df["partition"] = partition
df["insert_ts"] = pd.Timestamp.now()

click.echo(f"Saving data to {output}")
df.to_parquet(output, partition_cols=["partition"])
df.to_parquet(output, partition_cols=["partition"], schema=pypi_schema)
click.echo("Pypi Scraping complete")


Expand Down Expand Up @@ -109,19 +116,24 @@ def scrape_pypi_downloads(output):
default="conda-forge",
help="The conda channel to scrape packages from",
)
@click.option("--only", "only_packages", multiple=True)
@partition_option
@num_partitions_option
def conda(num_partitions, partition, output, channel):
packages = get_conda_package_names(num_partitions, partition, channel)
def conda(num_partitions, partition, output, channel, only_packages):
if only_packages:
packages = only_packages
else:
packages = get_conda_package_names(num_partitions, partition, channel)
click.echo(
f"Will process {len(packages)} packages in partition {partition} of {num_partitions}"
)
df = scrape_conda(channel, packages)
df["partition"] = partition
df["channel"] = channel
df["insert_ts"] = pd.Timestamp.now()

click.echo(f"Saving data to {output}")
df.to_parquet(output, partition_cols=["channel", "partition"])
df.to_parquet(output, partition_cols=["channel", "partition"], schema=conda_schema)
click.echo("Conda Scraping complete")


Expand Down Expand Up @@ -224,9 +236,12 @@ def git(input, num_partitions, partition, output):

df = scrape_git(urls)
df["partition"] = partition
df["insert_ts"] = pd.Timestamp.now()

click.echo(f"Saving data to {output}")

for field in git_schema:
if field.name not in df.columns:
df[field.name] = None
df.to_parquet(output, partition_cols=["partition"], schema=git_schema)
click.echo("Git Scraping complete")

Expand Down Expand Up @@ -281,13 +296,13 @@ def coalesce(
db.execute("CREATE SECRET (TYPE GCS);")

to_coalesce = [
("conda", conda_input, conda_output, None),
("pypi", pypi_input, pypi_output, None),
("conda", conda_input, conda_output, conda_schema),
("pypi", pypi_input, pypi_output, pypi_schema),
("git", git_input, git_output, git_schema),
]
for name, input_path, output_path, schema in to_coalesce:
click.echo(f"Reading data from {name} {input_path} into memory")
git_df = db.execute(
df = db.execute(
f"""
select * from read_parquet('{input_path}/**/*.parquet')
where (partition % {num_partitions}) = {partition}
Expand All @@ -296,8 +311,8 @@ def coalesce(

click.echo(f"Saving data to {output_path}")

git_df["partition"] = partition
git_df.to_parquet(output_path, partition_cols=["partition"], schema=schema)
df["partition"] = partition
df.to_parquet(output_path, partition_cols=["partition"], schema=schema)
click.echo("Coalesce complete")


Expand Down Expand Up @@ -336,14 +351,18 @@ def score(git_input, pypi_input, conda_input, output, note_output):
db.execute(
f"""
CREATE TABLE pypi AS
SELECT name, version, source_url FROM read_parquet('{pypi_input}/*/*.parquet')
SELECT name, version, release_date, source_url
FROM read_parquet('{pypi_input}/**/*.parquet')
QUALIFY ROW_NUMBER() OVER (PARTITION BY name ORDER BY insert_ts DESC) = 1
"""
)
click.echo(f"Reading data from conda {conda_input} into memory")
db.execute(
f"""
CREATE TABLE conda AS
SELECT full_name, latest_version, source_url FROM read_parquet('{conda_input}/**/*.parquet')
SELECT full_name, latest_version, release_date, source_url
FROM read_parquet('{conda_input}/**/*.parquet')
QUALIFY ROW_NUMBER() OVER (PARTITION BY full_name ORDER BY insert_ts DESC) = 1
"""
)

Expand All @@ -352,20 +371,93 @@ def score(git_input, pypi_input, conda_input, output, note_output):
db.execute(
f"""
CREATE TABLE git AS
select * from read_parquet('{git_input}/*/*.parquet')
select *
FROM read_parquet('{git_input}/**/*.parquet')
QUALIFY ROW_NUMBER() OVER (PARTITION BY source_url ORDER BY insert_ts DESC) = 1
"""
)

click.echo("Processing score")
print(db.query("select source_url, error from git").df())
df = create_scores(db)
click.echo(f"Saving data to {output}")
df.to_parquet(output, schema=score_schema)

note_df = notes.to_df()
click.echo(f"Saving data to {note_output}")
note_df.to_parquet(note_output)
click.echo("Score complete")


@cli.command()
@click.option("-o", "--output-root", default="full")
@click.option("--config", default="example-packages.json")
@click.pass_context
def full_run(ctx, config, output_root):
    """Run the whole scrape -> aggregate -> coalesce -> score pipeline locally.

    Reads the package lists from ``config`` (a JSON file with ``"pypi"`` and
    ``"conda"`` arrays of package names) and writes every intermediate and
    final artifact under ``output_root``. All stages run as a single
    partition (num_partitions=1, partition=0).
    """
    # Be explicit about the encoding: the platform default is not guaranteed
    # to be UTF-8, but JSON config files are expected to be.
    with open(config, encoding="utf-8") as fd:
        config_data = json.load(fd)

    # Intermediate "prep" outputs written by the scrape stages.
    prep_conda_path = f"{output_root}/prep/conda"
    prep_pypi_path = f"{output_root}/prep/pypi"
    prep_git_path = f"{output_root}/prep/git"
    # Coalesced (deduplicated / schema-enforced) outputs.
    conda_path = f"{output_root}/conda"
    pypi_path = f"{output_root}/pypi"
    git_path = f"{output_root}/git"

    source_url_path = f"{output_root}/source-urls.parquet"
    score_path = f"{output_root}/score.parquet"
    notes_path = f"{output_root}/notes.parquet"

    # Stage 1: scrape package metadata, restricted to the configured packages.
    ctx.invoke(
        scrape_pypi,
        output=prep_pypi_path,
        num_partitions=1,
        partition=0,
        only_packages=config_data["pypi"],
    )
    ctx.invoke(
        conda,
        output=prep_conda_path,
        num_partitions=1,
        partition=0,
        only_packages=config_data["conda"],
    )
    # Stage 2: collect the source-repository URLs referenced by the packages.
    ctx.invoke(
        agg_source_urls,
        output=source_url_path,
        pypi_input=prep_pypi_path,
        conda_input=prep_conda_path,
    )

    # Stage 3: scrape the git repositories behind those URLs.
    ctx.invoke(
        git,
        input=source_url_path,
        output=prep_git_path,
        num_partitions=1,
        partition=0,
    )

    # Stage 4: coalesce the prep outputs into their final partitioned form.
    ctx.invoke(
        coalesce,
        git_input=prep_git_path,
        pypi_input=prep_pypi_path,
        conda_input=prep_conda_path,
        git_output=git_path,
        pypi_output=pypi_path,
        conda_output=conda_path,
        num_partitions=1,
        partition=0,
    )

    # Stage 5: compute scores and notes from the coalesced data.
    ctx.invoke(
        score,
        git_input=git_path,
        pypi_input=pypi_path,
        conda_input=conda_path,
        output=score_path,
        note_output=notes_path,
    )


# Allow the CLI to be run directly (e.g. `python score/cli.py`) as well as
# through the installed console entry point.
if __name__ == "__main__":
    cli()
19 changes: 18 additions & 1 deletion score/conda/scrape_conda.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
from typing import List
import pandas as pd
import pyarrow as pa
from tqdm import tqdm

from dateutil.parser import parse as parsedate
from ..utils.request_session import get_session

CONDA_PACKAGE_URL_TEMPLATE = "https://api.anaconda.org/package/{channel}/{package}"


# Arrow schema enforced when writing scraped conda metadata to parquet.
# Keeping it explicit guards against pandas inferring different dtypes
# across partitions (e.g. all-null columns).
conda_schema = pa.schema(
    [
        pa.field("partition", pa.int32()),
        pa.field("insert_ts", pa.timestamp("ns")),
        pa.field("name", pa.string()),
        pa.field("channel", pa.string()),
        pa.field("full_name", pa.string()),
        pa.field("source_url", pa.string()),
        pa.field("latest_version", pa.string()),
        pa.field("ndownloads", pa.int64()),
        pa.field("release_date", pa.timestamp("ns")),
    ]
)


def scrape_conda(channel, package_names: List[str]) -> pd.DataFrame:
"""
Scrapes metadata for a list of Conda packages from the Anaconda API and returns the data as a DataFrame.
Expand Down Expand Up @@ -44,6 +60,7 @@ def scrape_conda(channel, package_names: List[str]) -> pd.DataFrame:
"source_url": source_url,
"latest_version": package_data["latest_version"],
"ndownloads": ndownloads,
"release_date": parsedate(package_data["modified_at"]),
}
)

Expand Down
1 change: 0 additions & 1 deletion score/git_vcs/license_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def identify_license(license_content: str) -> dict:

@lru_cache
def get_all_licenses():
print("license_dir", license_dir)
licenses = {}
for license_file in license_dir.glob("*"):
with open(license_file, "rb") as f:
Expand Down
13 changes: 8 additions & 5 deletions score/git_vcs/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import logging
import os
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor

from ..utils.map import do_map
from .license_detection import identify_license
from .check_url import check_url
from ..notes import Note
Expand Down Expand Up @@ -71,6 +70,7 @@ def clone_repo(url):
git_schema = pa.schema(
[
("partition", pa.int32()),
("insert_ts", pa.timestamp("ns")),
("source_url", pa.string()),
("error", pa.int32()),
("recent_authors_count", pa.int32()),
Expand Down Expand Up @@ -125,9 +125,8 @@ def scrape_git(urls: list) -> pd.DataFrame:
how closely the detected license matches the best-known license text (value between 0 and 1).
"""

exec = ThreadPoolExecutor(16)
all_data = list(
tqdm(exec.map(create_git_metadata, urls), total=len(urls), disable=None)
tqdm(do_map(create_git_metadata, urls), total=len(urls), disable=None)
)

df = pd.DataFrame(all_data)
Expand Down Expand Up @@ -230,5 +229,9 @@ def get_pypackage_name(repo: Repo) -> Optional[str]:
except (toml.TomlDecodeError, IndexError, IOError) as err:
log.error(f"Error reading pyproject.toml: {err}")
return None
name = data.get("project", {}).get("name")

if not name:
return None

return data.get("project", {}).get("name")
return name.lower().replace("_", "-")
2 changes: 1 addition & 1 deletion score/git_vcs/test_git_scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ def test_scrape_flask():
assert metadata
print(metadata)
assert metadata["license"]["kind"] == "BSD"
assert metadata["py_package"] == "Flask"
assert metadata["py_package"] == "flask"
4 changes: 4 additions & 0 deletions score/notes.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ def __init__(self, note):

LAST_COMMIT_OVER_A_YEAR = "The last commit was over a year ago"

PACKGE_SKEW_NOT_UPDATED = "Package is out of sync with the source code"

PACKGE_SKEW_NOT_RELEASED = "Package is ahead of the source code"


def to_df():
return pd.DataFrame.from_records(
Expand Down
2 changes: 1 addition & 1 deletion score/pypi/get_pypi_package_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_all_pypi_package_names():
package_names = re.findall(
r'<a href="/simple/([^/]+)/">', response.text
) # Extract package names
return package_names
return [p.lower().replace("_", "-") for p in package_names]


def get_pypi_package_names(num_partitions: int, partition: int):
Expand Down
Loading

0 comments on commit a7a8984

Please sign in to comment.