Skip to content

Commit

Permalink
feat(utils): implements concurrent executor helper functions
Browse files Browse the repository at this point in the history
We require concurrent execution of threads at many places in the CLI for
example deleting multiple deployments, devices, etc.

This commit adds the helper functions that can be used at all these
places without repeating code.
  • Loading branch information
pallabpain committed Jan 25, 2024
1 parent f2e7afe commit 7a67be3
Showing 1 changed file with 78 additions and 1 deletion.
79 changes: 78 additions & 1 deletion riocli/utils/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import json
import typing
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

import click
from rapyuta_io import Command
from rapyuta_io.utils import RestClient
from rapyuta_io.utils.rest_client import HttpMethod
Expand Down Expand Up @@ -65,3 +67,78 @@ def run_on_device(
device = client.get_device(device_id=device_guid)
cmd = ' '.join(command)
return device.execute_command(Command(cmd, shell=shell, bg=background, runas=user))


def apply_func(
f: typing.Callable,
items: typing.List[typing.Any],
workers: int = 5
) -> None:
"""Apply a function to a list of items in parallel
Parameters
----------
f : typing.Callable
The function to apply
items : typing.List
The list of items to apply the function to
workers : int
The number of workers to use
"""
with ThreadPoolExecutor(
max_workers=workers,
thread_name_prefix='exec'
) as e:
e.map(f, items)


def apply_func_with_result(
f: typing.Callable,
items: typing.List[typing.Any],
workers: int = 5,
key: typing.Callable = None
) -> typing.List[typing.Any]:
"""Apply a function to a list of items in parallel and return the result
The function to apply must use the queue to return the result. For example,
def _apply_delete(result: Queue, deployment: Deployment) -> None:
try:
deployment.deprovision()
result.put((deployment.name, True))
except Exception:
result.put((deployment.name, False))
Here's another example,
def _apply_update(client: Client, result: Queue, deployment: Deployment) -> None
try:
client.update_deployment(deployment)
result.put((deployment.name, True))
except Exception:
result.put((deployment.name, False))
Note that the second last argument of the function must be the queue and
the last must be the item. This requirement must be adhered to for this
function to work correctly.
Parameters
----------
f : typing.Callable
The function to apply
items : typing.List
The list of items to apply the function to
workers : int
The number of workers to use
key : typing.Callable
The function to use to sort the result
"""
r = Queue()
f = functools.partial(f, r)

apply_func(f, items, workers)

if key:
return sorted(list(r.queue), key=key)

return list(r.queue)

0 comments on commit 7a67be3

Please sign in to comment.