Skip to content
This repository has been archived by the owner on Nov 11, 2019. It is now read-only.

please/cli: limiting async with semaphore #2015

Merged
merged 1 commit into from
Apr 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions lib/please_cli/please_cli/decision_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datetime
import functools
import json
import multiprocessing
import os
import typing

Expand Down Expand Up @@ -371,6 +372,7 @@ async def read_stream(stream,


async def run(_command: typing.Union[str, typing.List[str]],
semaphore: typing.Optional[asyncio.Semaphore] = None,
stream: bool = False,
handle_stream_line: typing.Optional[typing.Callable[[str], None]] = None,
log_command: bool = True,
Expand Down Expand Up @@ -400,7 +402,12 @@ async def run(_command: typing.Union[str, typing.List[str]],
if log_command:
log.debug('Running command', command=hide_secrets(command, secrets), kwargs=_kwargs)

process = await asyncio.create_subprocess_shell(command, **_kwargs) # noqa
if semaphore is None:
process = await asyncio.create_subprocess_shell(command, **_kwargs) # noqa
else:
async with semaphore:
process = await asyncio.create_subprocess_shell(command, **_kwargs) # noqa

if stream:
_log_output: typing.Optional[typing.Callable[[str], None]] = None
if log_output:
Expand All @@ -420,17 +427,19 @@ def _log_output(line):
return process.returncode, output, error


async def check_in_nix_cache(session: aiohttp.ClientSession,
async def check_in_nix_cache(semaphore: asyncio.Semaphore,
session: aiohttp.ClientSession,
nix_path: NixPath,
nix_hash: NixHash,
) -> typing.Tuple[NixPath, bool]:
exists = False
for cache_url in please_cli.config.CACHE_URLS:
try:
url = f'{cache_url}/{nix_hash}.narinfo'
async with session.get(url) as resp:
exists = resp.status == 200
break
async with semaphore:
async with session.get(url) as resp:
exists = resp.status == 200
break
except Exception:
exists = False

Expand Down Expand Up @@ -469,10 +478,14 @@ def nix_hash(self):
return self._drv[0][0][1][11:43]


async def get_projects_hash(nix_instantiate: str,
async def get_projects_hash(semaphore: asyncio.Semaphore,
nix_instantiate: str,
nix_path: NixPath):
default_nix = os.path.join(please_cli.config.ROOT_DIR, 'nix/default.nix')
code, output, error = await run([nix_instantiate, default_nix, '-A', nix_path], stream=True)
code, output, error = await run([nix_instantiate, default_nix, '-A', nix_path],
stream=True,
semaphore=semaphore,
)
try:
drv_path = output.split('\n')[-1].strip()
with open(drv_path) as f:
Expand All @@ -488,9 +501,12 @@ async def get_projects_hash(nix_instantiate: str,
async def get_projects_hashes(nix_instantiate: str,
projects: Projects,
) -> NixHashes:
# limit the number of concurrent nix calls made at the same time
semaphore = asyncio.Semaphore(value=multiprocessing.cpu_count())

nix_hashes = []
tasks = [
get_projects_hash(nix_instantiate, nix_path)
get_projects_hash(semaphore, nix_instantiate, nix_path)
for nix_path in get_nix_paths(projects)
]
for task in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
Expand All @@ -507,9 +523,12 @@ async def get_projects_to_build(session: aiohttp.ClientSession,
nix_hashes: NixHashes
) -> typing.List[str]:

# how many concurrent requests to send
semaphore = asyncio.Semaphore(value=100)

project_to_build: typing.Dict[str, bool] = dict()
tasks = [
check_in_nix_cache(session, nix_path, nix_hash)
check_in_nix_cache(semaphore, session, nix_path, nix_hash)
for nix_path, nix_hash in nix_hashes
]
for task in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks)):
Expand Down