Skip to content

Commit

Permalink
switch to a streaming parser to further minimise mem footprint during…
Browse files Browse the repository at this point in the history
… a reload
  • Loading branch information
3ll3d00d committed Sep 9, 2023
1 parent 34ed7d7 commit 34f4fc5
Show file tree
Hide file tree
Showing 4 changed files with 477 additions and 378 deletions.
16 changes: 10 additions & 6 deletions ezbeq/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime, timedelta
from typing import Optional, List, Callable, Union, Set

import ijson
import requests

from ezbeq.apis.ws import WsServer
Expand Down Expand Up @@ -271,11 +272,12 @@ def meta_msg(self) -> str:


class Catalogues:
def __init__(self, config_path: str, catalogue_url: str, ws: WsServer, refresh_seconds: float):
def __init__(self, config_path: str, catalogue_url: str, ws: WsServer, refresh_seconds: float, chunk_size: int):
self.__catalogue_url = catalogue_url
self.__version_file = os.path.join(config_path, 'version.txt')
self.__catalogue_file = os.path.join(config_path, 'database.json')
self.__db = os.path.join(config_path, 'ezbeq.db')
self.__chunk_size = chunk_size
logger.info(f'Using database at {self.__db}')
self.__ensure_db()
self.__refresh_interval = refresh_seconds
Expand All @@ -298,7 +300,7 @@ def __send_chunked_catalogue(self, sender: Callable[[str], None]):
with db_ops(self.__db) as cur:
res = cur.execute(count_sql).fetchone()
if res:
vals = {'count': res[0], 'limit': 500, 'offset': 0, 'start': time.time()}
vals = {'count': res[0], 'limit': self.__chunk_size, 'offset': 0, 'start': time.time()}
from twisted.internet import reactor
reactor.callInThread(lambda: self.__load_next_chunk(sender, catalogue.version, **vals))
else:
Expand Down Expand Up @@ -405,12 +407,13 @@ def __insert_catalogue(self, version: str) -> Optional[Catalogue]:
languages = set()
years = set()
extra_vals: tuple = (version, now)
with open(self.__catalogue_file, 'r') as infile:
with open(self.__catalogue_file, 'rb') as infile:
with db_ops(self.__db) as cur:
values = []
count = 0
insert_sql = f"INSERT INTO catalogue_entry({FIELDS_STR},version,loaded_at) VALUES({', '.join(['?'] * (len(FIELDS) + 2))})"
for idx, c in enumerate(json.load(infile)):
start = time.time()
for idx, c in enumerate(ijson.items(infile, 'item', use_float=True)):
count = count + 1
entry = CatalogueEntry(f"{version}_{idx}", c)
for v in entry.audio_types:
Expand All @@ -431,7 +434,7 @@ def __insert_catalogue(self, version: str) -> Optional[Catalogue]:
if values:
cur.executemany(insert_sql, values)
cur.connection.commit()
logger.info(f'Inserted {count} entries into {self.__db} for version {version}')
logger.info(f'Inserted {count} entries into {self.__db} for version {version} in {to_millis(start, time.time())}ms')

def insert_if(meta_type: str, vals: set):
if vals:
Expand Down Expand Up @@ -624,7 +627,8 @@ def __init__(self, config: Config, ws: WsServer):
self.__catalogues: Catalogues = Catalogues(config.config_path,
config.beqcatalogue_url,
ws,
config.catalogue_refresh_interval)
config.catalogue_refresh_interval,
config.chunk_size)

def find(self, entry_id: str, match_on_idx: Optional[bool] = None, as_dict: bool = False) -> Optional[
Union[CatalogueEntry, dict]]:
Expand Down
4 changes: 4 additions & 0 deletions ezbeq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def ensure_dir_exists(d) -> None:
def enable_metrics(self) -> bool:
return self.__enable_metrics

@property
def chunk_size(self) -> int:
return self.config.get('chunk_size', 1000)

@property
def beqcatalogue_url(self) -> str:
return self.__beqcatalogue_url
Expand Down
Loading

0 comments on commit 34f4fc5

Please sign in to comment.