Skip to content

Commit

Permalink
Merge branch 'develop' into develop-fix-synapsecache-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
linglp committed Jul 26, 2023
2 parents a11eb37 + 5fb5743 commit c65f617
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 42 deletions.
42 changes: 29 additions & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Flask-Cors = "^3.0.10"
pdoc = "^12.2.0"
dateparser = "^1.1.4"
pandarallel = "^1.6.4"
schematic-db = {version = "^0.0.20", extras = ["synapse"]}
schematic-db = {version = "^0.0.29", extras = ["synapse"]}
pyopenssl = "^23.0.0"
typing-extensions = "<4.6.0"

Expand Down
46 changes: 38 additions & 8 deletions schematic/manifest/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,8 @@ def set_dataframe_by_url(
start_col = self._column_to_letter(len(manifest_df.columns) - num_out_of_schema_columns) # find start of out of schema columns
end_col = self._column_to_letter(len(manifest_df.columns) + 1) # find end of out of schema columns
wb.set_data_validation(start = start_col, end = end_col, condition_type = None)


# set permissions so that anyone with the link can edit
sh.share("", role="writer", type="anyone")

Expand Down Expand Up @@ -1463,7 +1465,7 @@ def _handle_output_format_logic(self, output_format: str = None, output_path: st
return output_file_path

# Return google sheet if sheet_url flag is raised.
elif sheet_url:
elif sheet_url:
manifest_sh = self.set_dataframe_by_url(manifest_url=empty_manifest_url, manifest_df=dataframe, out_of_schema_columns=out_of_schema_columns)
return manifest_sh.url

Expand Down Expand Up @@ -1521,7 +1523,6 @@ def get_manifest(
if manifest_record:
# TODO: Update or remove the warning in self.__init__() if
# you change the behavior here based on self.use_annotations

# Update df with existing manifest. Agnostic to output format
updated_df, out_of_schema_columns = self._update_dataframe_with_existing_df(empty_manifest_url=empty_manifest_url, existing_df=manifest_record[1])

Expand Down Expand Up @@ -1555,7 +1556,6 @@ def get_manifest(

# Update `additional_metadata` and generate manifest
manifest_url, manifest_df = self.get_manifest_with_annotations(annotations)

# Update df with existing manifest. Agnostic to output format
updated_df, out_of_schema_columns = self._update_dataframe_with_existing_df(empty_manifest_url=empty_manifest_url, existing_df=manifest_df)

Expand All @@ -1564,10 +1564,35 @@ def get_manifest(
output_path = output_path,
sheet_url = sheet_url,
empty_manifest_url=empty_manifest_url,
dataframe = manifest_df,
dataframe = updated_df,
out_of_schema_columns = out_of_schema_columns,
)
return result

def _get_end_columns(self, current_schema_headers, existing_manifest_headers, out_of_schema_columns):
"""
Gather columns to be added to the end of the manifest, and ensure entityId is at the end.
Args:
current_schema_headers: list, columns in the current manifest schema
existing_manifest_headers: list, columns in the existing manifest
out_of_schema_columns: set, columns that are in the existing manifest, but not the current schema
Returns:
end_columns: list of columns to be added to the end of the manifest.
"""
# Identify columns to add to the end of the manifest
end_columns = list(out_of_schema_columns)

# Make sure want Ids are placed at end of manifest, in given order.
for id_name in ['Uuid', 'Id', 'entityId']:
if id_name in end_columns:
end_columns.remove(id_name)
end_columns.append(id_name)

# Add entity_id to the end columns if it should be there but isn't
if 'entityId' in (current_schema_headers or existing_manfiest_headers) and 'entityId' not in end_columns:
end_columns.append('entityId')
return end_columns

def _update_dataframe_with_existing_df(self, empty_manifest_url: str, existing_df: pd.DataFrame) -> pd.DataFrame:
"""
Handle scenario when existing manifest does not match new manifest template due to changes in the data model:
Expand All @@ -1585,13 +1610,13 @@ def _update_dataframe_with_existing_df(self, empty_manifest_url: str, existing_d

# Get headers for the current schema and existing manifest df.
current_schema_headers = list(self.get_dataframe_by_url(empty_manifest_url).columns)
existing_manfiest_headers = list(existing_df.columns)
existing_manifest_headers = list(existing_df.columns)

# Find columns that exist in the current schema, but are not in the manifest being downloaded.
new_columns = self._get_missing_columns(current_schema_headers, existing_manfiest_headers)
new_columns = self._get_missing_columns(current_schema_headers, existing_manifest_headers)

# Find columns that exist in the manifest being downloaded, but not in the current schema.
out_of_schema_columns = self._get_missing_columns(existing_manfiest_headers, current_schema_headers)
out_of_schema_columns = self._get_missing_columns(existing_manifest_headers, current_schema_headers)

# clean empty columns if any are present (there should be none)
# TODO: Remove this line once we start preventing empty column names
Expand All @@ -1607,12 +1632,17 @@ def _update_dataframe_with_existing_df(self, empty_manifest_url: str, existing_d
**dict(zip(new_columns, len(new_columns) * [""]))
)

end_columns = self._get_end_columns(current_schema_headers=current_schema_headers,
existing_manifest_headers=existing_manifest_headers,
out_of_schema_columns=out_of_schema_columns)

# sort columns in the updated manifest:
# match latest schema order
# move obsolete columns at the end
updated_df = updated_df[self.sort_manifest_fields(updated_df.columns)]
updated_df = updated_df[[c for c in updated_df if c not in out_of_schema_columns] + list(out_of_schema_columns)]

# move obsolete columns at the end with entityId at the very end
updated_df = updated_df[[c for c in updated_df if c not in end_columns] + list(end_columns)]
return updated_df, out_of_schema_columns

def _format_new_excel_column(self, worksheet, new_column_index: int, col: str):
Expand Down
26 changes: 6 additions & 20 deletions schematic/store/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import uuid

from schematic_db.synapse.synapse import SynapseConfig
from schematic_db.rdb.synapse_database import SynapseDatabase


Expand Down Expand Up @@ -2107,29 +2106,25 @@ def replaceTable(self, specifySchema: bool = True, columnTypeDict: dict = None,)
return self.existingTableId


def _get_schematic_db_creds(self,):
username = None
def _get_auth_token(self,):
authtoken = None


# Get access token from environment variable if available
# Primarily useful for testing environments, with other possible usefulness for containers
env_access_token = os.getenv("SYNAPSE_ACCESS_TOKEN")
if env_access_token:
authtoken = env_access_token
return username, authtoken
return authtoken

# Get token from authorization header
# Primarily useful for API endpoint functionality
if 'Authorization' in self.synStore.syn.default_headers:
authtoken = self.synStore.syn.default_headers['Authorization'].split('Bearer ')[-1]
return username, authtoken
return authtoken

# retrive credentials from synapse object
# Primarily useful for local users, could only be stored here when a .synapseConfig file is used, but including to be safe
synapse_object_creds = self.synStore.syn.credentials
if hasattr(synapse_object_creds, 'username'):
username = synapse_object_creds.username
if hasattr(synapse_object_creds, '_token'):
authtoken = synapse_object_creds.secret

Expand All @@ -2139,24 +2134,16 @@ def _get_schematic_db_creds(self,):
config = self.synStore.syn.getConfigFile(CONFIG.synapse_configuration_path)

# check which credentials are provided in file
if config.has_option('authentication', 'username'):
username = config.get('authentication', 'username')
if config.has_option('authentication', 'authtoken'):
authtoken = config.get('authentication', 'authtoken')

# raise error if required credentials are not found
# providing an authtoken without a username did not prohibit upsert functionality,
# but including username gathering for completeness for schematic_db
if not username and not authtoken:
raise NameError(
"Username and authtoken credentials could not be found in the environment, synapse object, or the .synapseConfig file"
)
if not authtoken:
raise NameError(
"authtoken credentials could not be found in the environment, synapse object, or the .synapseConfig file"
)

return username, authtoken
return authtoken

def upsertTable(self, sg: SchemaGenerator,):
"""
Expand All @@ -2173,10 +2160,9 @@ def upsertTable(self, sg: SchemaGenerator,):
existingTableId: synID of the already existing table that had its metadata replaced
"""

username, authtoken = self._get_schematic_db_creds()
authtoken = self._get_auth_token()

synConfig = SynapseConfig(username, authtoken, self.synStore.getDatasetProject(self.datasetId))
synapseDB = SynapseDatabase(synConfig)
synapseDB = SynapseDatabase(auth_token=authtoken, project_id=self.synStore.getDatasetProject(self.datasetId))

try:
# Try performing upsert
Expand Down

0 comments on commit c65f617

Please sign in to comment.