diff --git a/floss/qs/db/expert.py b/floss/qs/db/expert.py index 738720f98..1fb9648b4 100644 --- a/floss/qs/db/expert.py +++ b/floss/qs/db/expert.py @@ -1,5 +1,6 @@ import re import pathlib +import pkgutil from typing import Set, Dict, List, Tuple, Literal, Sequence from dataclasses import dataclass @@ -51,13 +52,13 @@ def query(self, s: str) -> Set[str]: return ret @classmethod - def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase": + def load_database(cls, buf: bytes) -> "ExpertStringDatabase": string_rules: Dict[str, ExpertRule] = {} substring_rules: List[ExpertRule] = [] regex_rules: List[Tuple[ExpertRule, re.Pattern]] = [] decoder = msgspec.json.Decoder(type=ExpertRule) - buf = path.read_bytes() + for line in buf.split(b"\n"): if not line: continue @@ -81,9 +82,20 @@ def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase": regex_rules=regex_rules, ) + @classmethod + def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase": + return cls.load_database(path.read_bytes()) + + @classmethod + def from_pkgutil(cls, package: str, path: str) -> "ExpertStringDatabase": + return cls.load_database(pkgutil.get_data(package, path)) + -DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "expert" / "capa.jsonl",) +DEFAULT_PATHS = ("data/expert/capa.jsonl",) def get_default_databases() -> Sequence[ExpertStringDatabase]: - return [ExpertStringDatabase.from_file(path) for path in DEFAULT_PATHS] + # To use from_file + # return [ExpertStringDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS] + + return [ExpertStringDatabase.from_pkgutil("floss.qs.db", path) for path in DEFAULT_PATHS] diff --git a/floss/qs/db/gp.py b/floss/qs/db/gp.py index 0b2f503b6..92498893d 100644 --- a/floss/qs/db/gp.py +++ b/floss/qs/db/gp.py @@ -1,6 +1,7 @@ import gzip import hashlib import pathlib +import pkgutil import datetime from typing import Set, Dict, List, Literal, Optional, Sequence from collections import 
defaultdict @@ -57,13 +58,13 @@ def new_db(cls, note: Optional[str] = None): ) @classmethod - def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPrevalenceDatabase": + def load_database(cls, buf: bytes, compress: bool = True) -> "StringGlobalPrevalenceDatabase": metadata_by_string: Dict[str, List[StringGlobalPrevalence]] = defaultdict(list) if compress: - lines = gzip.decompress(path.read_bytes()).split(b"\n") + lines = gzip.decompress(buf).split(b"\n") else: - lines = path.read_bytes().split(b"\n") + lines = buf.split(b"\n") decoder = msgspec.json.Decoder(type=StringGlobalPrevalence) for line in lines[1:]: @@ -77,6 +78,14 @@ def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPr meta=msgspec.json.Decoder(type=Metadata).decode(lines[0]), metadata_by_string=metadata_by_string, ) + + @classmethod + def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPrevalenceDatabase": + return cls.load_database(path.read_bytes(), compress) + + @classmethod + def from_pkgutil(cls, package: str, path: str, compress: bool = True) -> "StringGlobalPrevalenceDatabase": + return cls.load_database(pkgutil.get_data(package, path), compress) def to_file(self, outfile: str, compress: bool = True): if compress: @@ -112,32 +121,46 @@ def __contains__(self, other: bytes | str) -> bool: raise ValueError("other must be bytes or str") @classmethod - def from_file(cls, path: pathlib.Path) -> "StringHashDatabase": + def load_database(cls, buf: bytes) -> "StringHashDatabase": string_hashes: Set[bytes] = set() - buf = path.read_bytes() - for i in range(0, len(buf), 8): string_hashes.add(buf[i : i + 8]) return cls( string_hashes=string_hashes, ) + + @classmethod + def from_file(cls, path: pathlib.Path) -> "StringHashDatabase": + return cls.load_database(path.read_bytes()) + + @classmethod + def from_pkgutil(cls, package: str, path: str) -> "StringHashDatabase": + return cls.load_database(pkgutil.get_data(package, path)) 
DEFAULT_PATHS = ( - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "gp.jsonl.gz", - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-native.jsonl.gz", - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-dotnet.jsonl.gz", - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "xaa-hashes.bin", - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "yaa-hashes.bin", + "data/gp/gp.jsonl.gz", + "data/gp/cwindb-native.jsonl.gz", + "data/gp/cwindb-dotnet.jsonl.gz", + "data/gp/xaa-hashes.bin", + "data/gp/yaa-hashes.bin", ) def get_default_databases() -> Sequence[StringGlobalPrevalenceDatabase | StringHashDatabase]: + # To use from_file + # return [ + # StringGlobalPrevalenceDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) + # if path.endswith(".jsonl.gz") + # else StringHashDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) + # for path in DEFAULT_PATHS + # ] + return [ - StringGlobalPrevalenceDatabase.from_file(path) - if path.name.endswith(".jsonl.gz") - else StringHashDatabase.from_file(path) - for path in DEFAULT_PATHS + StringGlobalPrevalenceDatabase.from_pkgutil("floss.qs.db", path) + if path.endswith(".jsonl.gz") + else StringHashDatabase.from_pkgutil("floss.qs.db", path) + for path in DEFAULT_PATHS ] diff --git a/floss/qs/db/oss.py b/floss/qs/db/oss.py index 1d0d4c72d..cfc6079e9 100644 --- a/floss/qs/db/oss.py +++ b/floss/qs/db/oss.py @@ -1,5 +1,6 @@ import gzip import pathlib +import pkgutil from typing import Dict, Sequence from dataclasses import dataclass @@ -25,10 +26,10 @@ def __len__(self) -> int: return len(self.metadata_by_string) @classmethod - def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase": + def load_database(cls, buf: bytes) -> "OpenSourceStringDatabase": metadata_by_string: Dict[str, OpenSourceString] = {} decoder = msgspec.json.Decoder(type=OpenSourceString) - for line in gzip.decompress(path.read_bytes()).split(b"\n"): + for 
line in gzip.decompress(buf).split(b"\n"): if not line: continue s = decoder.decode(line) @@ -36,6 +37,14 @@ def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase": return cls(metadata_by_string=metadata_by_string) + @classmethod + def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase": + return cls.load_database(path.read_bytes()) + + @classmethod + def from_pkgutil(cls, package: str, path: str) -> "OpenSourceStringDatabase": + return cls.load_database(pkgutil.get_data(package, path)) + DEFAULT_FILENAMES = ( "brotli.jsonl.gz", @@ -57,10 +66,11 @@ def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase": "zlib.jsonl.gz", ) -DEFAULT_PATHS = tuple( - pathlib.Path(floss.qs.db.__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES -) + (pathlib.Path(floss.qs.db.__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",) +DEFAULT_PATHS = tuple("data/oss/" + f for f in DEFAULT_FILENAMES) + ("data/crt/msvc_v143.jsonl.gz",) def get_default_databases() -> Sequence[OpenSourceStringDatabase]: - return [OpenSourceStringDatabase.from_file(path) for path in DEFAULT_PATHS] + # To use from_file + # return [OpenSourceStringDatabase.from_file(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS] + + return [OpenSourceStringDatabase.from_pkgutil("floss.qs.db", path) for path in DEFAULT_PATHS] diff --git a/floss/qs/db/winapi.py b/floss/qs/db/winapi.py index 2f176b630..36e3755d3 100644 --- a/floss/qs/db/winapi.py +++ b/floss/qs/db/winapi.py @@ -1,5 +1,6 @@ import gzip import pathlib +import pkgutil from typing import Set, Sequence from dataclasses import dataclass @@ -15,25 +16,34 @@ def __len__(self) -> int: return len(self.dll_names) + len(self.api_names) @classmethod - def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase": - dll_names: Set[str] = set() - api_names: Set[str] = set() - - for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines(): + def 
load_database(cls, buf: bytes) -> Set[str]: + names: Set[str] = set() + for line in gzip.decompress(buf).decode("utf-8").splitlines(): if not line: continue - dll_names.add(line) + names.add(line) - for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines(): - if not line: - continue - api_names.add(line) + return names + + @classmethod + def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase": + dll_names = cls.load_database((path / "dlls.txt.gz").read_bytes()) + api_names = cls.load_database((path / "apis.txt.gz").read_bytes()) return cls(dll_names=dll_names, api_names=api_names) + @classmethod + def from_pkgutil(cls, package: str, path: str) -> "WindowsApiStringDatabase": + dll_names = cls.load_database(pkgutil.get_data(package, (path + "dlls.txt.gz"))) + api_names = cls.load_database(pkgutil.get_data(package, (path + "apis.txt.gz"))) + return cls(dll_names=dll_names, api_names=api_names) -DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "winapi",) + +DEFAULT_PATHS = ("data/winapi/",) def get_default_databases() -> Sequence[WindowsApiStringDatabase]: - return [WindowsApiStringDatabase.from_dir(path) for path in DEFAULT_PATHS] + # To use from_dir + # return [WindowsApiStringDatabase.from_dir(pathlib.Path(floss.qs.db.__file__).parent / path) for path in DEFAULT_PATHS] + + return [WindowsApiStringDatabase.from_pkgutil("floss.qs.db", path) for path in DEFAULT_PATHS] diff --git a/floss/qs/main.py b/floss/qs/main.py index 8cc2a542c..78d43e647 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -475,8 +475,8 @@ def make_tagger(db, queryfn) -> Sequence[Tag]: raise ValueError(f"unexpected database type: {type(db)}") # supplement code analysis with a database of junk code strings - junk_db = StringGlobalPrevalenceDatabase.from_file( - pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "junk-code.jsonl.gz" + junk_db = StringGlobalPrevalenceDatabase.from_pkgutil( + "floss.qs.db", 
"data/gp/junk-code.jsonl.gz" ) ret.append(make_tagger(junk_db, query_code_string_database))