From cdca4bc29981e0b14b61856626dbc9ff77d76a79 Mon Sep 17 00:00:00 2001 From: Oscar Batori Date: Sat, 11 Apr 2020 22:07:09 -0700 Subject: [PATCH 1/2] WIP --- doltpy/core/dolt.py | 274 ++++++++++++++++----- doltpy/core/tests/dolt_testing_fixtures.py | 9 +- doltpy/core/tests/test_dolt.py | 12 +- 3 files changed, 225 insertions(+), 70 deletions(-) diff --git a/doltpy/core/dolt.py b/doltpy/core/dolt.py index 608c456..d1bd2a6 100644 --- a/doltpy/core/dolt.py +++ b/doltpy/core/dolt.py @@ -19,6 +19,9 @@ class DoltException(Exception): + """ + A class representing a Dolt exception. + """ def __init__(self, exec_args, stdout, stderr, exitcode): self.exec_args = exec_args self.stdout = stdout @@ -26,9 +29,39 @@ def __init__(self, exec_args, stdout, stderr, exitcode): self.exitcode = exitcode +def init_new_repo(repo_dir: str) -> 'Dolt': + """ + Creates a new repository in the directory specified, creating the directory if `create_dir` is passed, and returns + a `Dolt` object representing the newly created repo. + :return: + """ + logger.info("Initializing a new repository in {}".format(repo_dir)) + args = ["dolt", "init"] + + _execute(args=args, cwd=repo_dir) + + return Dolt(repo_dir) + + +# TODO we need to sort out where stuff gets cloned and ensure that clone actually takes an argument correctly. The +# function should return a Dolt object tied to the repo that was just cloned +def clone_repo(repo_url: str): + """ + Clones a repository into the repository specified, currently only supports DoltHub as a remote. + :return: + """ + args = ["dolt", "clone", repo_url, './'] + + _execute(args=args, cwd='.') + + class DoltCommitSummary: - def __init__(self, hash: str, ts: datetime, author: str): - self.hash = hash + """ + Represents metadata about a commit, including a ref, timestamp, and author, to make it easier to sort and present + to the user. 
+    """
+    def __init__(self, ref: str, ts: datetime, author: str):
+        self.hash = ref
         self.ts = ts
         self.author = author
 
@@ -36,6 +69,8 @@ def __str__(self):
         return '{}: {} @ {}'.format(self.hash, self.author, self.ts)
 
 
+# For now Doltpy works by dispatching into the shell. We will change this in an upcoming release, but these functions
+# wrap calls to Popen, the main Python API for launching a subprocess.
 def _execute(args, cwd):
     proc = Popen(args=args, cwd=cwd, stdout=PIPE, stderr=PIPE)
     out, err = proc.communicate()
@@ -53,75 +88,108 @@ def _execute_restart_serve_if_needed(dlt, args):
         was_serving = True
         dlt.stop_server()
 
-    _execute(args=args, cwd=dlt.repo_dir)
+    _execute(args=args, cwd=dlt.repo_dir())
 
     if was_serving:
         dlt.start_server()
 
 
 class Dolt(object):
+    """
+    This is the top level object for interacting with a Dolt database. A Dolt database lives at a path in the file system
+    and there should be a 1:1 mapping between Dolt objects and repositories you want to interact with. The two pieces
+    of state information this object stores are the repo path and a connection to the MySQL Server (if it is running).
+
+    In the docstrings for the various functions "this repository" or "this database" refers to the Dolt repo that exists at
+    the directory returned by `self.repo_dir()`. Most of the functions on this object will throw an exception if the underlying `dolt` command fails.
+
+    Note, it is not recommended to recycle objects, as this could lead to peculiar results. For example if you have Dolt
+    databases in `~/db1` and `~/db2` the following will be strange:
+    ```
+    >>> from doltpy.core import Dolt
+    >>> db = Dolt('~/db1')
+    >>> db.start_server()
+    >>> db._repo_dir = '~/db2'
+    ```
+    In this case calls to db that use the SQL server, for example `pandas_read_sql(...)` will reference the Dolt repo
+    in `~/db1`, and calls that use the CLI will reference `~/db2`.
+
+    Instead simply create a separate object for each Dolt database to avoid this confusion.
+ """ def __init__(self, repo_dir): - self.repo_dir = repo_dir - self.dir_exists = os.path.exists(repo_dir) + """ + + :param repo_dir: + """ + self._repo_dir = repo_dir self.server = None self.cnx = None - def config(self, is_global, user_name, user_email): + def repo_dir(self) -> str: + return self._repo_dir + + def config(self, is_global: bool, user_name: str, user_email: str): + """ + Exposes a way to set the user name and email to be associated with commit messages. Can be either global, or + local to this repo. + :param is_global: + :param user_name: + :param user_email: + :return: + """ args = ["dolt", "config", "add"] if is_global: args.append("--global") - elif not self.dir_exists: - raise Exception("{} does not exist. Cannot configure local options without a valid directory.".format(self.repo_dir)) name_args = args email_args = args.copy() - name_args.append(["user.name", user_name]) - email_args.append(["user.email", user_email]) + name_args.extend(["user.name", user_name]) + email_args.extend(["user.email", user_email]) if is_global: _execute(args=name_args, cwd=None) _execute(args=email_args, cwd=None) else: - _execute(args=name_args, cwd=self.repo_dir) - _execute(args=email_args, cwd=self.repo_dir) - - def clone(self, repo_url: str): - if self.dir_exists: - raise Exception(self.repo_dir + " .") - - os.makedirs(self.repo_dir) - self.dir_exists = True - - args = ["dolt", "clone", repo_url, self.repo_dir] - - _execute(args=args, cwd=self.repo_dir) - - def init_new_repo(self): - args = ["dolt", "init"] - - _execute(args=args, cwd=self.repo_dir) + _execute(args=name_args, cwd=self.repo_dir()) + _execute(args=email_args, cwd=self.repo_dir()) - def create_branch(self, branch_name, commit_ref=None): + def create_branch(self, branch_name: str, commit_ref: str = None): + """ + Creates a new branch in this repo with the name branch_name. If commit_ref is None the ref is the HEAD of the + currently checked out branch. 
+ :param branch_name: + :param commit_ref: + :return: + """ args = ["dolt", "branch", branch_name] if commit_ref is not None: args.append(commit_ref) - _execute(args=args, cwd=self.repo_dir) + _execute(args=args, cwd=self.repo_dir()) - def checkout(self, branch_name): + def checkout(self, branch_name: str): + """ + Check out the repo in `self.repo_dir()` at the specified branch. + :param branch_name: the branch to checkout at + :return: + """ assert branch_name in self.get_branch_list(), 'Cannot checkout of non-existent branch {}'.format(branch_name) args = ["dolt", "checkout", branch_name] _execute_restart_serve_if_needed(self, args) def start_server(self): + """ + Start a MySQL Server instance for the Dolt repo in `self.repo_dir()` running on port 3306. Note this function is + very alpha, as it doesn't yet support specifying the port. + """ if self.server is not None: raise Exception("already running") args = ['dolt', 'sql-server', '-t', '0'] - proc = Popen(args=args, cwd=self.repo_dir, stdout=PIPE, stderr=STDOUT) + proc = Popen(args=args, cwd=self.repo_dir(), stdout=PIPE, stderr=STDOUT) # make sure the thread has started, this is a bit hacky @retry(exceptions=Exception, backoff=2) @@ -133,10 +201,20 @@ def get_connection(): self.cnx = cnx def repo_is_clean(self): - res = _execute(['dolt', 'status'], self.repo_dir) + """ + Returns true if the repo is clean, which is to say the working set has no changes, and false otherwise. This + is directly analogous to the Git concept of "clean". + :return: + """ + res = _execute(['dolt', 'status'], self.repo_dir()) return 'clean' in str(res) def stop_server(self): + """ + Stop the MySQL Server instance running on port 3306, returns an error if the server is not running at on that + port. 
+        :return:
+        """
         if self.server is None:
             raise Exception("never started.")
 
@@ -146,7 +224,12 @@ def stop_server(self):
         self.server.kill()
         self.server = None
 
-    def query_server(self, query):
+    def query_server(self, query: str):
+        """
+        Execute the specified query against the MySQL Server instance running on port 3306.
+        :param query: the query to execute
+        :return:
+        """
         if self.server is None:
             raise Exception("never started.")
 
@@ -156,18 +239,37 @@
         return cursor
 
     def execute_sql_stmt(self, stmt: str):
+        """
+        Execute the specified query via the `dolt sql -q` command line interface. This function will be deprecated in
+        an upcoming release as the MySQL Server supports all statements that can be executed via the client.
+        :param stmt: the statement to execute
+        :return:
+        """
         logger.info('Executing the following SQL statement via CLI:\n{}\n'.format(stmt))
-        _execute(['dolt', 'sql', '-q', stmt], cwd=self.repo_dir)
+        _execute(['dolt', 'sql', '-q', stmt], cwd=self.repo_dir())
 
-    def pandas_read_sql(self, query):
+    def pandas_read_sql(self, query: str):
+        """
+        Execute a SQL statement against the MySQL Server running on port 3306 and return the result as a Pandas
+        `DataFrame` object. This is a higher level version of `query_server` where the object returned is a `DataFrame` rather than a cursor.
+        :param query:
+        :return:
+        """
         if self.server is None:
             raise Exception("never started.")
 
         return pd.read_sql(query, con=self.cnx)
 
     def read_table(self, table_name: str, delimiter: str = ',') -> pd.DataFrame:
+        """
+        Reads the contents of a table and returns it as a Pandas `DataFrame`. Under the hood this uses export and the
+        filesystem, in short order we are likely to replace this with use of the MySQL Server.
+ :param table_name: + :param delimiter: + :return: + """ fp = tempfile.NamedTemporaryFile(suffix='.csv') - _execute(['dolt', 'table', 'export', table_name, fp.name, '-f'], self.repo_dir) + _execute(['dolt', 'table', 'export', table_name, fp.name, '-f'], self.repo_dir()) result = pd.read_csv(fp.name, delimiter=delimiter) return result @@ -196,9 +298,11 @@ def bulk_import(self, table_name: str, data: io.StringIO, primary_keys: List[str], - import_mode: str): + import_mode: str) -> None: """ - This takes a file-handle like object and produces a + This takes a file like object representing a CSV and imports it to the table specified. Note that you must + specify the primary key, and the import mode. The import mode is one of the keys of IMPORT_MODES_TO_FLAGS. + Choosing the wrong import mode will throw an error, for example `CREATE` on an existing table. :param table_name: :param data: :param primary_keys: @@ -215,26 +319,43 @@ def _import_helper(self, table_name: str, write_import_file: Callable[[str], None], primary_keys: List[str], - import_mode: str): + import_mode: str) -> None: import_modes = IMPORT_MODES_TO_FLAGS.keys() assert import_mode in import_modes, 'update_mode must be one of: {}'.format(import_modes) import_flags = IMPORT_MODES_TO_FLAGS[import_mode] logger.info('Importing to table {} in dolt directory located in {}, import mode {}'.format(table_name, - self.repo_dir, + self.repo_dir(), import_mode)) fp = tempfile.NamedTemporaryFile(suffix='.csv') write_import_file(fp.name) args = ['dolt', 'table', 'import', table_name, '--pk={}'.format(','.join(primary_keys))] + import_flags - _execute(args + [fp.name], self.repo_dir) + _execute(args + [fp.name], self.repo_dir()) - def add_table_to_next_commit(self, table_name): - _execute_restart_serve_if_needed(self, ["dolt", "add", table_name]) + def add_table_to_next_commit(self, *table_names: str): + """ + Stage the tables specified in table_names to be committed. 
+ :param table_names: + :return: + """ + _execute_restart_serve_if_needed(self, ["dolt", "add"] + list(table_names)) def commit(self, commit_message): + """ + Create a commit from the current working set the HEAD of the checked out branch to the value of the commit hash. + :param commit_message: + :return: + """ _execute_restart_serve_if_needed(self, ["dolt", "commit", "-m", commit_message]) def push(self, remote: str, branch: str): + """ + Push to the remote specified. If either the branch or the remote do not exist then an `AssertionError` will be + thrown. + :param remote: + :param branch: + :return: + """ def _assertion_helper(name: str, required: str, existing: List[str]): assert required in existing, 'cannot push to {} that does not exist, {} not in {}'.format(name, required, @@ -244,10 +365,15 @@ def _assertion_helper(name: str, required: str, existing: List[str]): _execute_restart_serve_if_needed(self, ['dolt', 'push', remote, branch]) def pull(self, remote: str = 'origin'): - _execute(['dolt', 'pull', remote], self.repo_dir) + _execute(['dolt', 'pull', remote], self.repo_dir()) def get_commits(self) -> List[DoltCommitSummary]: - output = _execute(['dolt', 'log'], self.repo_dir).split('\n') + """ + Returns a list of `DoltCommitSummary`, representing the list of commits on the currently checked out branch, + ordered by the timestamp associated with the commit. + :return: + """ + output = _execute(['dolt', 'log'], self.repo_dir()).split('\n') current_commit, author, date = None, None, None for line in output: if line.startswith('commit'): @@ -264,10 +390,16 @@ def get_commits(self) -> List[DoltCommitSummary]: pass def get_dirty_tables(self) -> Tuple[Mapping[str, bool], Mapping[str, bool]]: + """ + Returns a tuple of maps, the first element is a map keyed on the names of newly created tables, and a second + is keyed on modified tables, with the values being boolean flags to indicate whether changes have been stage for + commit. 
+        :return:
+        """
         new_tables, changes = {}, {}
 
         if not self.repo_is_clean():
-            output = [line.lstrip() for line in _execute(['dolt', 'status'], self.repo_dir).split('\n')]
+            output = [line.lstrip() for line in _execute(['dolt', 'status'], self.repo_dir()).split('\n')]
             staged = False
             for line in output:
                 if line.startswith('Changes to be committed'):
@@ -285,37 +417,61 @@ def get_dirty_tables(self) -> Tuple[Mapping[str, bool], Mapping[str, bool]]:
 
         return new_tables, changes
 
-    def clean_local(self):
+    def clean_local(self) -> None:
+        """
+        Wipes out the uncommitted tables in the working set, useful for scripting "all or nothing" data imports.
+        :return:
+        """
         new_tables, changes = self.get_dirty_tables()
 
         for table in [table for table, is_staged in list(new_tables.items()) + list(changes.items()) if is_staged]:
             logger.info('Resetting table {}'.format(table))
-            _execute(['dolt', 'reset', table], self.repo_dir)
+            _execute(['dolt', 'reset', table], self.repo_dir())
 
         for table in new_tables.keys():
             logger.info('Removing newly created table {}'.format(table))
-            _execute(['dolt', 'table', 'rm', table], self.repo_dir)
+            _execute(['dolt', 'table', 'rm', table], self.repo_dir())
 
         for table in changes.keys():
             logger.info('Discarding local changes to table {}'.format(table))
-            _execute(['dolt', 'checkout', table], self.repo_dir)
+            _execute(['dolt', 'checkout', table], self.repo_dir())
 
         assert self.repo_is_clean(), 'Something went wrong, repo is not clean'
 
     def get_existing_tables(self) -> List[str]:
-        return [line.lstrip() for line in _execute(['dolt', 'ls'], self.repo_dir).split('\n')[1:] if line]
+        """
+        Get the list of tables in the working set.
+        :return:
+        """
+        return [line.lstrip() for line in _execute(['dolt', 'ls'], self.repo_dir()).split('\n')[1:] if line]
 
-    def get_last_commit_time(self):
+    def get_last_commit_time(self) -> datetime:
+        """
+        Returns the time stamp associated with the ref corresponding to HEAD on the currently checked out branch.
+        :return:
+        """
         return max([commit.ts for commit in self.get_commits()])
 
-    def get_branch_list(self):
+    def get_branch_list(self) -> List[str]:
+        """
+        Returns a list of branches in the repository in the directory returned by `self.repo_dir()`
+        :return:
+        """
         return [line.replace('*', '').lstrip().rstrip()
-                for line in _execute(['dolt', 'branch'], self.repo_dir).split('\n') if line]
 
-    def get_remote_list(self):
-        return [line.rstrip() for line in _execute(['dolt', 'remote'], self.repo_dir).split('\n') if line]
+                for line in _execute(['dolt', 'branch'], self.repo_dir()).split('\n') if line]
+
+    def get_remote_list(self) -> List[str]:
+        """
+        Returns a list of remotes that have been added to the repository corresponding to self.
+        :return: list of remotes
+        """
+        return [line.rstrip() for line in _execute(['dolt', 'remote'], self._repo_dir).split('\n') if line]
 
-    def get_current_branch(self):
-        for line in _execute(['dolt', 'branch'], self.repo_dir).split('\n'):
+    def get_current_branch(self) -> str:
+        """
+        Returns the currently checked out branch of the Dolt repository corresponding to self.
+ :return: the checked out branch + """ + for line in _execute(['dolt', 'branch'], self._repo_dir).split('\n'): if line.lstrip().startswith('*'): return line.replace('*', '').lstrip().rstrip() diff --git a/doltpy/core/tests/dolt_testing_fixtures.py b/doltpy/core/tests/dolt_testing_fixtures.py index 6644b88..9d08c58 100644 --- a/doltpy/core/tests/dolt_testing_fixtures.py +++ b/doltpy/core/tests/dolt_testing_fixtures.py @@ -1,5 +1,5 @@ import os -from doltpy.core.dolt import Dolt, _execute +from doltpy.core.dolt import init_new_repo, Dolt, _execute import pytest import shutil from typing import Tuple @@ -13,10 +13,9 @@ def get_repo_path_tmp_path(path: str) -> Tuple[str, str]: def init_repo(tmp_path) -> Dolt: repo_path, repo_data_dir = get_repo_path_tmp_path(tmp_path) assert not os.path.exists(repo_data_dir) - repo = Dolt(repo_path) - repo.init_new_repo() - _execute(['rm', 'LICENSE.md'], repo.repo_dir) - _execute(['rm', 'README.md'], repo.repo_dir) + repo = init_new_repo(repo_path) + _execute(['rm', 'LICENSE.md'], repo.repo_dir()) + _execute(['rm', 'README.md'], repo.repo_dir()) yield repo if os.path.exists(repo_data_dir): shutil.rmtree(repo_data_dir) diff --git a/doltpy/core/tests/test_dolt.py b/doltpy/core/tests/test_dolt.py index ab403d1..124fa0e 100644 --- a/doltpy/core/tests/test_dolt.py +++ b/doltpy/core/tests/test_dolt.py @@ -1,5 +1,5 @@ import pytest -from doltpy.core.dolt import Dolt, _execute, DoltException, UPDATE +from doltpy.core.dolt import init_new_repo, Dolt, _execute, DoltException, UPDATE import shutil import pandas as pd import uuid @@ -28,7 +28,7 @@ def create_test_table(init_repo, create_test_data) -> Tuple[Dolt, str]: ''') repo.import_df('test_players', pd.read_csv(test_data_path), ['id'], UPDATE) yield repo, 'test_players' - _execute(['dolt', 'table', 'rm', 'test_players'], repo.repo_dir) + _execute(['dolt', 'table', 'rm', 'test_players'], repo.repo_dir()) @pytest.fixture @@ -42,8 +42,7 @@ def run_serve_mode(init_repo): def 
test_init_new_repo(tmp_path): repo_path, repo_data_dir = get_repo_path_tmp_path(tmp_path) assert not os.path.exists(repo_data_dir) - dolt = Dolt(repo_path) - dolt.init_new_repo() + init_new_repo(repo_path) assert os.path.exists(repo_data_dir) shutil.rmtree(repo_data_dir) @@ -112,6 +111,7 @@ def test_clean_local(create_test_table): assert repo.repo_is_clean() +# TODO test datetime types here def test_sql_server(create_test_table, run_serve_mode): repo, test_table = create_test_table data = repo.pandas_read_sql('SELECT * FROM {}'.format(test_table)) @@ -129,9 +129,9 @@ def test_branch_list(create_test_table): def test_remote_list(create_test_table): repo, _ = create_test_table - _execute(['dolt', 'remote', 'add', 'origin', 'blah-blah'], repo.repo_dir) + _execute(['dolt', 'remote', 'add', 'origin', 'blah-blah'], repo.repo_dir()) assert repo.get_remote_list() == ['origin'] - _execute(['dolt', 'remote', 'add', 'another-origin', 'la-la-land'], repo.repo_dir) + _execute(['dolt', 'remote', 'add', 'another-origin', 'la-la-land'], repo.repo_dir()) assert set(repo.get_remote_list()) == {'origin', 'another-origin'} From 94cfbadb418f5a6246a85af3a9cd1f14bde9ac7b Mon Sep 17 00:00:00 2001 From: Oscar Batori Date: Mon, 13 Apr 2020 14:36:50 -0700 Subject: [PATCH 2/2] Add more doc strings and fix some typos. 
--- doltpy/core/__init__.py | 2 +- doltpy/etl/cli_logging_config_helper.py | 4 +++ doltpy/etl/dolt_loader.py | 13 ++++++++- doltpy/etl/dolthub_loader.py | 22 ++++++++++++-- doltpy/etl/loaders.py | 39 ++++++++++++++++++++++--- setup.py | 2 +- 6 files changed, 72 insertions(+), 10 deletions(-) diff --git a/doltpy/core/__init__.py b/doltpy/core/__init__.py index a500e5e..ac0ea13 100644 --- a/doltpy/core/__init__.py +++ b/doltpy/core/__init__.py @@ -1 +1 @@ -from .dolt import Dolt, DoltException, DoltCommitSummary +from .dolt import Dolt, DoltException, DoltCommitSummary, clone_repo, init_new_repo diff --git a/doltpy/etl/cli_logging_config_helper.py b/doltpy/etl/cli_logging_config_helper.py index 531ca0c..58c30e0 100644 --- a/doltpy/etl/cli_logging_config_helper.py +++ b/doltpy/etl/cli_logging_config_helper.py @@ -2,6 +2,10 @@ def config_cli_logger(): + """ + A helper to provide a nicely configured logger across loaders. + :return: + """ logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y%m%d %H:%M:%S', level=logging.INFO) \ No newline at end of file diff --git a/doltpy/etl/dolt_loader.py b/doltpy/etl/dolt_loader.py index d992900..49a9203 100644 --- a/doltpy/etl/dolt_loader.py +++ b/doltpy/etl/dolt_loader.py @@ -1,6 +1,5 @@ import argparse from doltpy.core import Dolt -from typing import List from doltpy.etl.loaders import resolve_function, DoltLoaderBuilder import logging from doltpy.etl.cli_logging_config_helper import config_cli_logger @@ -9,6 +8,14 @@ def loader(loader_builder: DoltLoaderBuilder, dolt_dir: str, dry_run: bool): + """ + This function takes a `DoltLoaderBuilder`, repo and remote settings, and attempts to execute the loaders returned + by the builder. 
+ :param loader_builder: + :param dolt_dir: + :param dry_run: + :return: + """ logger.info( '''Commencing load to Dolt with the following options: - dolt_dir {dolt_dir} @@ -22,6 +29,10 @@ def loader(loader_builder: DoltLoaderBuilder, dolt_dir: str, dry_run: bool): def main(): + """ + Used as a function backing shim for surfacing command line tool. + :return: + """ config_cli_logger() parser = argparse.ArgumentParser() parser.add_argument('dolt_load_module', help='Fully qualified path to a module providing a set of loaders') diff --git a/doltpy/etl/dolthub_loader.py b/doltpy/etl/dolthub_loader.py index 02f4201..4f79130 100644 --- a/doltpy/etl/dolthub_loader.py +++ b/doltpy/etl/dolthub_loader.py @@ -1,5 +1,5 @@ import argparse -from doltpy.core import Dolt +from doltpy.core import Dolt, clone_repo import os import tempfile from doltpy.etl.loaders import resolve_function, DoltLoaderBuilder @@ -16,14 +16,26 @@ def loader(loader_builder: DoltLoaderBuilder, remote_name: str, dry_run: bool, remote_url: str): + """ + This function takes a `DoltLoaderBuilder`, repo and remote settings, and attempts to execute the loaders returned + by the builder. + :param loader_builder: + :param dolt_dir: + :param clone: + :param push: + :param remote_name: + :param dry_run: + :param remote_url: + :return: + """ if clone: assert remote_url, 'If clone is True then remote must be passed' temp_dir = tempfile.mkdtemp() logger.info('Clone is set to true, so ignoring dolt_dir') - repo = Dolt(temp_dir) if clone: logger.info('Clone set to True, cloning remote {}'.format(remote_url)) - repo.clone(remote_url) + clone_repo(remote_url, temp_dir) + repo = Dolt(temp_dir) else: assert os.path.exists(os.path.join(dolt_dir, '.dolt')), 'Repo must exist locally if not cloned' repo = Dolt(dolt_dir) @@ -49,6 +61,10 @@ def loader(loader_builder: DoltLoaderBuilder, def main(): + """ + Used as a function backing shim for surfacing command line tool. 
+ :return: + """ config_cli_logger() parser = argparse.ArgumentParser() parser.add_argument('dolt_load_module', help='Fully qualified path to a module providing a set of loaders') diff --git a/doltpy/etl/loaders.py b/doltpy/etl/loaders.py index 6c702e1..40f6ed8 100644 --- a/doltpy/etl/loaders.py +++ b/doltpy/etl/loaders.py @@ -1,6 +1,6 @@ from typing import Callable, List import io -from doltpy.core.dolt import UPDATE, Dolt, DoltException +from doltpy.core.dolt import UPDATE, Dolt import pandas as pd import hashlib import importlib @@ -17,7 +17,9 @@ def resolve_function(module_path: str): """ - Takes a string of the form you.module.member_containing_loaders and returns a list of loaders. + Takes a string of the form you.module.member_containing_loaders and returns a list of loaders. It exists to allow + commands to be called with arguments that are strings that can be resolved to functions. This is used when + specifying function parameters via the command line. :param module_path: :return: """ @@ -48,6 +50,13 @@ def resolve_branch(branch: str, module_generator_path: str, default: str): def insert_unique_key(df: pd.DataFrame) -> pd.DataFrame: + """ + This function takes Pandas `DataFrame` and inserts a unique hash to each row created from the row itself, along + with a count of how many rows produce the same hash. The idea is to provide some rudimentary tools for writing data + with unique keys. + :param df: + :return: + """ assert 'hash_id' not in df.columns and 'count' not in df.columns, 'Require hash_id and count not in df' ids = df.apply(lambda r: hashlib.md5(','.join([str(el) for el in r]).encode('utf-8')).hexdigest(), axis=1) with_id = df.assign(hash_id=ids).set_index('hash_id') @@ -82,7 +91,10 @@ def get_bulk_table_writer(table: str, import_mode: str = None, transformers: List[FileTransformer] = None) -> DoltTableWriter: """ - Returns a loader function that writes a file-like object to Dolt, the file must be a valid CSV file. 
+ Returns a function that takes a Dolt repository object and writes the contents of the file like object returned by + the function parameter `get_data` to the table specified using the primary keys passed. Optionally toggle the import + mode and apply a list of transformers to do some data cleaning operations. For example, we might apply a transformer + that converts some date strings to proper datetime objects. :param table: :param get_data: :param pk_cols: @@ -105,7 +117,10 @@ def get_df_table_writer(table: str, import_mode: str = None, transformers: List[DataframeTransformer] = None) -> DoltTableWriter: """ - Returns a loader that writes the `pd.DataFrame` produced by get_data to Dolt. + Returns a function that takes a Dolt repository object and writes the Pandas DataFrame returned by the function + parameter `get_data` to the table specified using the primary keys passed. Optionally toggle the import mode and + apply a list of transformers to do some data cleaning operations. For example, we might apply a transformer that + converts some date strings to proper datetime objects. :param table: :param get_data: :param pk_cols: @@ -127,6 +142,16 @@ def get_table_transfomer(get_data: Callable[[Dolt], pd.DataFrame], target_pk_cols: List[str], transformer: DataframeTransformer, import_mode: str = UPDATE) -> DoltTableWriter: + """ + A version of get_df_table writer where the input is a Dolt repository. This is used for transforming raw data into + derived tables. + :param get_data: + :param target_table: + :param target_pk_cols: + :param transformer: + :param import_mode: + :return: + """ def inner(repo: Dolt): input_data = get_data(repo) transformed_data = transformer(input_data) @@ -192,6 +217,12 @@ def inner(repo: Dolt): def get_branch_creator(new_branch_name: str, refspec: str = None): + """ + Returns a function that creates a branch at the specified refspec. 
+ :param new_branch_name: + :param refspec: + :return: + """ def inner(repo: Dolt): assert new_branch_name not in repo.get_branch_list(), 'Branch {} already exists'.format(new_branch_name) logger.info('Creating new branch on repo in {} named {} at refspec {}'.format(repo.repo_dir, diff --git a/setup.py b/setup.py index 08faa5b..05d5e1f 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='doltpy', - version='0.0.6', + version='0.0.7', packages=find_packages(), install_requires=['pandas>=0.25.0', 'sqlalchemy>=1.3.8', 'mysql-connector-python==8.0.17', 'retry>=0.9.2'], author='Liquidata',