Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QS: Migrate to pkgutil for databases #985

Open
wants to merge 3 commits into
base: quantumstrand
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions floss/qs/db/expert.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import pathlib
import pkgutil
from typing import Set, Dict, List, Tuple, Literal, Sequence
from dataclasses import dataclass

Expand Down Expand Up @@ -51,13 +52,13 @@ def query(self, s: str) -> Set[str]:
return ret

@classmethod
def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
def load_database(cls, buf: bytes) -> "ExpertStringDatabase":
string_rules: Dict[str, ExpertRule] = {}
substring_rules: List[ExpertRule] = []
regex_rules: List[Tuple[ExpertRule, re.Pattern]] = []

decoder = msgspec.json.Decoder(type=ExpertRule)
buf = path.read_bytes()

for line in buf.split(b"\n"):
if not line:
continue
Expand All @@ -81,9 +82,20 @@ def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
regex_rules=regex_rules,
)

@classmethod
def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
    """Build a database from a rule file on the local filesystem.

    The file at `path` is read in full as bytes and passed to
    `load_database`, which does the actual parsing.
    """
    raw = path.read_bytes()
    return cls.load_database(raw)

@classmethod
def from_pkgutil(cls, package: str, path: str) -> "ExpertStringDatabase":
    """Build a database from a resource shipped inside an importable package.

    Args:
        package: dotted name of the package that contains the resource.
        path: resource path relative to that package, using "/" separators.

    Returns:
        The database parsed by `load_database` from the resource bytes.

    Raises:
        FileNotFoundError: when `pkgutil.get_data` yields no data, i.e. the
            package cannot be located or its loader does not support get_data.
    """
    buf = pkgutil.get_data(package, path)
    if buf is None:
        # get_data reports "no package / no capable loader" by returning None
        # instead of raising; stop here rather than failing inside the parser.
        raise FileNotFoundError(f"cannot load resource {path!r} from package {package!r}")
    return cls.load_database(buf)


DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "expert" / "capa.jsonl",)
DEFAULT_PATHS = ( "data/expert/capa.jsonl",)


def get_default_databases() -> Sequence[ExpertStringDatabase]:
return [ExpertStringDatabase.from_file(path) for path in DEFAULT_PATHS]
# To use from_file
# return [ExpertStringDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS]

return [ExpertStringDatabase.from_pkgutil("floss.qs.db", path) for path in DEFAULT_PATHS]
53 changes: 38 additions & 15 deletions floss/qs/db/gp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gzip
import hashlib
import pathlib
import pkgutil
import datetime
from typing import Set, Dict, List, Literal, Optional, Sequence
from collections import defaultdict
Expand Down Expand Up @@ -57,13 +58,13 @@ def new_db(cls, note: Optional[str] = None):
)

@classmethod
def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
def load_database(cls, buf: bytes, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
metadata_by_string: Dict[str, List[StringGlobalPrevalence]] = defaultdict(list)

if compress:
lines = gzip.decompress(path.read_bytes()).split(b"\n")
lines = gzip.decompress(buf).split(b"\n")
else:
lines = path.read_bytes().split(b"\n")
lines = buf.split(b"\n")

decoder = msgspec.json.Decoder(type=StringGlobalPrevalence)
for line in lines[1:]:
Expand All @@ -77,6 +78,14 @@ def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPr
meta=msgspec.json.Decoder(type=Metadata).decode(lines[0]),
metadata_by_string=metadata_by_string,
)

@classmethod
def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
    """Load a global prevalence database from a file on disk.

    The whole file is read as bytes and forwarded, together with the
    `compress` flag, to `load_database`.
    """
    contents = path.read_bytes()
    return cls.load_database(contents, compress)

@classmethod
def from_pkgutil(cls, package: str, path: str, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
    """Load a global prevalence database from a resource bundled in a package.

    Args:
        package: dotted name of the package that contains the resource.
        path: resource path relative to that package, using "/" separators.
        compress: forwarded unchanged to `load_database`.

    Returns:
        The database that `load_database` builds from the resource bytes.

    Raises:
        FileNotFoundError: when `pkgutil.get_data` yields no data, i.e. the
            package cannot be located or its loader does not support get_data.
    """
    buf = pkgutil.get_data(package, path)
    if buf is None:
        # None (not an exception) is how get_data reports a missing package or
        # an unsupported loader; surface it before decompression is attempted.
        raise FileNotFoundError(f"cannot load resource {path!r} from package {package!r}")
    return cls.load_database(buf, compress)

def to_file(self, outfile: str, compress: bool = True):
if compress:
Expand Down Expand Up @@ -112,32 +121,46 @@ def __contains__(self, other: bytes | str) -> bool:
raise ValueError("other must be bytes or str")

@classmethod
def from_file(cls, path: pathlib.Path) -> "StringHashDatabase":
def load_database(cls, buf: bytes) -> "StringHashDatabase":
string_hashes: Set[bytes] = set()

buf = path.read_bytes()

for i in range(0, len(buf), 8):
string_hashes.add(buf[i : i + 8])

return cls(
string_hashes=string_hashes,
)

@classmethod
def from_file(cls, path: pathlib.Path) -> "StringHashDatabase":
    """Load a hash database from a file on disk.

    Reads every byte of `path` and delegates to `load_database`.
    """
    blob = path.read_bytes()
    return cls.load_database(blob)

@classmethod
def from_pkgutil(cls, package: str, path: str) -> "StringHashDatabase":
    """Load a hash database from a resource bundled in a package.

    Args:
        package: dotted name of the package that contains the resource.
        path: resource path relative to that package, using "/" separators.

    Returns:
        The database that `load_database` builds from the resource bytes.

    Raises:
        FileNotFoundError: when `pkgutil.get_data` yields no data, i.e. the
            package cannot be located or its loader does not support get_data.
    """
    buf = pkgutil.get_data(package, path)
    if buf is None:
        # Without this guard a None result would only fail later, with an
        # unrelated-looking TypeError from the parsing code.
        raise FileNotFoundError(f"cannot load resource {path!r} from package {package!r}")
    return cls.load_database(buf)


DEFAULT_PATHS = (
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "gp.jsonl.gz",
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-native.jsonl.gz",
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-dotnet.jsonl.gz",
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "xaa-hashes.bin",
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "yaa-hashes.bin",
"data/gp/gp.jsonl.gz",
"data/gp/cwindb-native.jsonl.gz",
"data/gp/cwindb-dotnet.jsonl.gz",
"data/gp/xaa-hashes.bin",
"data/gp/yaa-hashes.bin",
)


def get_default_databases() -> Sequence[StringGlobalPrevalenceDatabase | StringHashDatabase]:
# To use from_file
# return [
# StringGlobalPrevalenceDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path)
# if path.endswith(".jsonl.gz")
# else StringHashDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path)
# for path in DEFAULT_PATHS
# ]

return [
StringGlobalPrevalenceDatabase.from_file(path)
if path.name.endswith(".jsonl.gz")
else StringHashDatabase.from_file(path)
for path in DEFAULT_PATHS
StringGlobalPrevalenceDatabase.from_pkgutil("floss.qs.db", path)
if path.endswith(".jsonl.gz")
else StringHashDatabase.from_pkgutil("floss.qs.db", path)
for path in DEFAULT_PATHS
]
22 changes: 16 additions & 6 deletions floss/qs/db/oss.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import pathlib
import pkgutil
from typing import Dict, Sequence
from dataclasses import dataclass

Expand All @@ -25,17 +26,25 @@ def __len__(self) -> int:
return len(self.metadata_by_string)

@classmethod
def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase":
def load_database(cls, buf: bytes) -> "OpenSourceStringDatabase":
metadata_by_string: Dict[str, OpenSourceString] = {}
decoder = msgspec.json.Decoder(type=OpenSourceString)
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
for line in gzip.decompress(buf).split(b"\n"):
if not line:
continue
s = decoder.decode(line)
metadata_by_string[s.string] = s

return cls(metadata_by_string=metadata_by_string)

@classmethod
def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase":
    """Load an open-source string database from a file on disk.

    The file's bytes are read in one go and handed to `load_database`.
    """
    payload = path.read_bytes()
    return cls.load_database(payload)

@classmethod
def from_pkgutil(cls, package: str, path: str) -> "OpenSourceStringDatabase":
    """Load an open-source string database from a resource bundled in a package.

    Args:
        package: dotted name of the package that contains the resource.
        path: resource path relative to that package, using "/" separators.

    Returns:
        The database that `load_database` builds from the resource bytes.

    Raises:
        FileNotFoundError: when `pkgutil.get_data` yields no data, i.e. the
            package cannot be located or its loader does not support get_data.
    """
    buf = pkgutil.get_data(package, path)
    if buf is None:
        # get_data returns None instead of raising for an unlocatable package;
        # report that explicitly rather than letting decompression blow up.
        raise FileNotFoundError(f"cannot load resource {path!r} from package {package!r}")
    return cls.load_database(buf)


DEFAULT_FILENAMES = (
"brotli.jsonl.gz",
Expand All @@ -57,10 +66,11 @@ def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase":
"zlib.jsonl.gz",
)

DEFAULT_PATHS = tuple(
pathlib.Path(floss.qs.db.__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES
) + (pathlib.Path(floss.qs.db.__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",)
DEFAULT_PATHS = tuple("data/oss/" + f for f in DEFAULT_FILENAMES) + ("data/crt/msvc_v143.jsonl.gz",)


def get_default_databases() -> Sequence[OpenSourceStringDatabase]:
return [OpenSourceStringDatabase.from_file(path) for path in DEFAULT_PATHS]
# To use from_file
# return [OpenSourceStringDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS]

return [OpenSourceStringDatabase.from_pkgutil("floss.qs.db", path) for path in DEFAULT_PATHS]
34 changes: 22 additions & 12 deletions floss/qs/db/winapi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import pathlib
import pkgutil
from typing import Set, Sequence
from dataclasses import dataclass

Expand All @@ -15,25 +16,34 @@ def __len__(self) -> int:
return len(self.dll_names) + len(self.api_names)

@classmethod
def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase":
dll_names: Set[str] = set()
api_names: Set[str] = set()

for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
def load_database(cls, buf: bytes) -> Set[str]:
names: Set[str] = set()
for line in gzip.decompress(buf).decode("utf-8").splitlines():
if not line:
continue
dll_names.add(line)
names.add(line)

for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
if not line:
continue
api_names.add(line)
return names

@classmethod
def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase":
    """Load the database from a directory holding the two name lists.

    Reads `dlls.txt.gz` and then `apis.txt.gz` from `path`, passing each
    file's bytes through `load_database` to obtain the respective name set.
    """
    dlls_file = path / "dlls.txt.gz"
    apis_file = path / "apis.txt.gz"

    dlls = cls.load_database(dlls_file.read_bytes())
    apis = cls.load_database(apis_file.read_bytes())
    return cls(dll_names=dlls, api_names=apis)

@classmethod
def from_pkgutil(cls, package: str, path: str) -> "WindowsApiStringDatabase":
    """Load the database from two resources bundled inside a package.

    Args:
        package: dotted name of the package that contains the resources.
        path: package-relative directory (using "/" separators) that holds
            `dlls.txt.gz` and `apis.txt.gz`; a trailing "/" is optional.

    Returns:
        A database whose DLL and API name sets come from `load_database`.

    Raises:
        FileNotFoundError: when `pkgutil.get_data` yields no data, i.e. the
            package cannot be located or its loader does not support get_data.
    """
    import posixpath

    def read(filename: str) -> bytes:
        # Join with "/" semantics so "data/winapi" and "data/winapi/" both work;
        # plain string concatenation silently built a wrong name for the former.
        resource = posixpath.join(path, filename)
        buf = pkgutil.get_data(package, resource)
        if buf is None:
            # get_data returns None (rather than raising) for an unlocatable package.
            raise FileNotFoundError(f"cannot load resource {resource!r} from package {package!r}")
        return buf

    dll_names = cls.load_database(read("dlls.txt.gz"))
    api_names = cls.load_database(read("apis.txt.gz"))
    return cls(dll_names=dll_names, api_names=api_names)

DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "winapi",)

DEFAULT_PATHS = ("data/winapi/",)


def get_default_databases() -> Sequence[WindowsApiStringDatabase]:
return [WindowsApiStringDatabase.from_dir(path) for path in DEFAULT_PATHS]
# To use from_dir
# return [WindowsApiStringDatabase.from_dir(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS]

return [WindowsApiStringDatabase.from_pkgutil('floss.qs.db', path) for path in DEFAULT_PATHS]
4 changes: 2 additions & 2 deletions floss/qs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ def make_tagger(db, queryfn) -> Sequence[Tag]:
raise ValueError(f"unexpected database type: {type(db)}")

# supplement code analysis with a database of junk code strings
junk_db = StringGlobalPrevalenceDatabase.from_file(
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "junk-code.jsonl.gz"
junk_db = StringGlobalPrevalenceDatabase.from_pkgutil(
"floss.qs.db", "data/gp/junk-code.jsonl.gz"
)
ret.append(make_tagger(junk_db, query_code_string_database))

Expand Down
Loading