Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] | Added util for concurrency control #1064

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.12.12 (2024-10-06)

### Improvements

- Added a util `semaphore_async_iterator` to enable seamless control over concurrent executions.


## 0.12.1 (2024-10-02)

### Bug Fixes
Expand Down
45 changes: 45 additions & 0 deletions port_ocean/tests/utils/test_async_iterators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Any, AsyncGenerator
import asyncio
from port_ocean.utils.async_iterators import semaphore_async_iterator
import pytest


@pytest.mark.asyncio
async def test_semaphore_async_iterator() -> None:
max_concurrency = 5
semaphore = asyncio.BoundedSemaphore(max_concurrency)

concurrent_tasks = 0
max_concurrent_tasks = 0
lock = asyncio.Lock() # Protect shared variables

num_tasks = 20

async def mock_function() -> AsyncGenerator[str, None]:
nonlocal concurrent_tasks, max_concurrent_tasks

async with lock:
concurrent_tasks += 1
if concurrent_tasks > max_concurrent_tasks:
max_concurrent_tasks = concurrent_tasks

await asyncio.sleep(0.1)
yield "result"

async with lock:
concurrent_tasks -= 1

async def consume_iterator(async_iterator: Any) -> None:
async for _ in async_iterator:
pass

tasks = [
consume_iterator(semaphore_async_iterator(semaphore, mock_function))
for _ in range(num_tasks)
]
await asyncio.gather(*tasks)

assert (
max_concurrent_tasks <= max_concurrency
), f"Max concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}"
assert concurrent_tasks == 0, "Not all tasks have completed"
60 changes: 60 additions & 0 deletions port_ocean/utils/async_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import aiostream

if typing.TYPE_CHECKING:
from asyncio import Semaphore


async def stream_async_iterators_tasks(
*tasks: typing.AsyncIterable[typing.Any],
Expand Down Expand Up @@ -47,3 +50,60 @@ async def main():
async with combine.stream() as streamer:
async for batch_items in streamer:
yield batch_items


async def semaphore_async_iterator(
semaphore: "Semaphore",
function: typing.Callable[[], typing.AsyncIterator[typing.Any]],
) -> typing.AsyncIterator[typing.Any]:
"""
Executes an asynchronous iterator function under a semaphore to limit concurrency.
This function ensures that the provided asynchronous iterator function is executed
while respecting the concurrency limit imposed by the semaphore. It acquires the
semaphore before executing the function and releases it after the function completes,
thus controlling the number of concurrent executions.
Parameters:
semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency.
function (Callable[[], AsyncIterator[Any]]): A nullary asynchronous function, - apply arguments with `functools.partial` or an anonymous function (lambda)
that returns an asynchronous iterator. This function is executed under the semaphore.
Yields:
Any: The items yielded by the asynchronous iterator function.
Usage:
```python
import asyncio
async def async_iterator_function(param1, param2):
# Your async code here
yield ...
async def async_generator_function():
# Your async code to retrieve items
param1 = "your_param1"
yield param1
async def main():
semaphore = asyncio.BoundedSemaphore(50)
param2 = "your_param2"
tasks = [
semaphore_async_iterator(
semaphore,
lambda: async_iterator_function(param1, param2) # functools.partial(async_iterator_function, param1, param2)
)
async for param1 in async_generator_function()
]
async for batch in stream_async_iterators_tasks(*tasks):
# Process each batch
pass
asyncio.run(main())
```
"""
async with semaphore:
async for result in function():
yield result
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.1"
version = "0.12.2"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down