forked from securefederatedai/openfl
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Files added required to test experimental aggregator based workflow b…
…y GitHUB actions Signed-off-by: Parth Mandaliya <parthx.mandaliya@intel.com>
- Loading branch information
1 parent
4545663
commit 2f4c26e
Showing
4 changed files
with
237 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
tests/github/experimental/workspace/test_experimental_agg_based_workflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Copyright (C) 2020-2023 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import os | ||
import time | ||
import socket | ||
import argparse | ||
from pathlib import Path | ||
from subprocess import check_call | ||
from concurrent.futures import ProcessPoolExecutor | ||
from sys import executable | ||
import shutil | ||
from openfl.utilities.utils import rmtree | ||
from tests.github.experimental.workspace.utils import create_collaborator | ||
from tests.github.experimental.workspace.utils import create_certified_workspace | ||
from tests.github.experimental.workspace.utils import certify_aggregator | ||
|
||
|
||
if __name__ == '__main__': | ||
# Test the pipeline | ||
parser = argparse.ArgumentParser() | ||
workspace_choice = [] | ||
with os.scandir('tests/github/experimental/workspace') as iterator: | ||
for entry in iterator: | ||
if entry.name not in ['__init__.py', 'workspace', 'default']: | ||
workspace_choice.append(entry.name) | ||
parser.add_argument('--template', default='testcase_include_exclude', choices=workspace_choice) | ||
parser.add_argument('--fed_workspace', default='fed_work12345alpha81671') | ||
parser.add_argument('--col', action='append', default=[]) | ||
parser.add_argument('--rounds-to-train') | ||
|
||
origin_dir = Path.cwd().resolve() | ||
args = parser.parse_args() | ||
fed_workspace = args.fed_workspace | ||
archive_name = f'{fed_workspace}.zip' | ||
fqdn = socket.getfqdn() | ||
template = args.template | ||
rounds_to_train = args.rounds_to_train | ||
collaborators = args.col | ||
# START | ||
# ===== | ||
# Make sure you are in a Python virtual environment with the FL package installed. | ||
|
||
source_directory = origin_dir / 'tests'/'github'/'experimental'/'workspace' / template | ||
destination_directory = origin_dir / 'openfl-workspace' / 'experimental' / template | ||
if os.path.exists(destination_directory): | ||
shutil.rmtree(destination_directory) | ||
|
||
# Copy template to the destination directory | ||
shutil.copytree(src=source_directory, dst=destination_directory) | ||
|
||
check_call([executable, '-m', 'pip', 'install', '.']) | ||
|
||
# Activate experimental | ||
check_call(['fx', 'experimental', 'activate']) | ||
|
||
create_certified_workspace(fed_workspace, template, fqdn, rounds_to_train) | ||
certify_aggregator(fqdn) | ||
|
||
# Get the absolute directory path for the workspace | ||
workspace_root = Path().resolve() | ||
|
||
# Create Collaborators | ||
for collab in collaborators: | ||
create_collaborator( | ||
collab, workspace_root, archive_name, fed_workspace | ||
) | ||
|
||
# Run the federation | ||
with ProcessPoolExecutor(max_workers=len(collaborators) + 1) as executor: | ||
executor.submit( | ||
check_call, ['fx', 'aggregator', 'start'], cwd=workspace_root | ||
) | ||
time.sleep(5) | ||
|
||
for collab in collaborators: | ||
col_dir = workspace_root / collab / fed_workspace | ||
executor.submit( | ||
check_call, ['fx', 'collaborator', 'start', '-n', collab], | ||
cwd=col_dir | ||
) | ||
|
||
os.chdir(origin_dir) | ||
rmtree(workspace_root) | ||
|
||
# Remove template to the destination directory | ||
shutil.rmtree(destination_directory) | ||
|
||
# Deactivate experimental | ||
check_call(['fx', 'experimental', 'deactivate']) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
# Copyright (C) 2020-2023 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import shutil | ||
from subprocess import check_call | ||
import os | ||
from pathlib import Path | ||
import re | ||
import tarfile | ||
|
||
|
||
def create_collaborator(col, workspace_root, archive_name, fed_workspace): | ||
# Copy workspace to collaborator directories (these can be on different machines) | ||
col_path = workspace_root / col | ||
shutil.rmtree(col_path, ignore_errors=True) # Remove any existing directory | ||
col_path.mkdir() # Create a new directory for the collaborator | ||
|
||
# Import the workspace to this collaborator | ||
check_call( | ||
['fx', 'workspace', 'import', '--archive', workspace_root / archive_name], | ||
cwd=col_path | ||
) | ||
|
||
# Create collaborator certificate request | ||
check_call( | ||
['fx', 'collaborator', 'create', '-n', col, '--silent'], | ||
cwd=col_path / fed_workspace | ||
) | ||
# Remove '--silent' if you run this manually | ||
check_call( | ||
['fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent'], | ||
cwd=col_path / fed_workspace | ||
) | ||
|
||
# Sign collaborator certificate | ||
# Remove '--silent' if you run this manually | ||
request_pkg = col_path / fed_workspace / f'col_{col}_to_agg_cert_request.zip' | ||
check_call( | ||
['fx', 'collaborator', 'certify', '--request-pkg', str(request_pkg), '--silent'], | ||
cwd=workspace_root) | ||
|
||
# Import the signed certificate from the aggregator | ||
import_path = workspace_root / f'agg_to_col_{col}_signed_cert.zip' | ||
check_call( | ||
['fx', 'collaborator', 'certify', '--import', import_path], | ||
cwd=col_path / fed_workspace | ||
) | ||
|
||
|
||
def create_certified_workspace(path, template, fqdn, rounds_to_train): | ||
shutil.rmtree(path, ignore_errors=True) | ||
check_call(['fx', 'workspace', 'create', '--prefix', path, '--template', template]) | ||
os.chdir(path) | ||
|
||
# Initialize FL plan | ||
check_call(['fx', 'plan', 'initialize', '-a', fqdn]) | ||
plan_path = Path('plan/plan.yaml') | ||
try: | ||
rounds_to_train = int(rounds_to_train) | ||
with open(plan_path, "r", encoding='utf-8') as sources: | ||
lines = sources.readlines() | ||
with open(plan_path, "w", encoding='utf-8') as sources: | ||
for line in lines: | ||
sources.write( | ||
re.sub(r'rounds_to_train.*', f'rounds_to_train: {rounds_to_train}', line) | ||
) | ||
except (ValueError, TypeError): | ||
pass | ||
# Create certificate authority for workspace | ||
check_call(['fx', 'workspace', 'certify']) | ||
|
||
# Export FL workspace | ||
check_call(['fx', 'workspace', 'export']) | ||
|
||
|
||
def certify_aggregator(fqdn): | ||
# Create aggregator certificate | ||
check_call(['fx', 'aggregator', 'generate-cert-request', '--fqdn', fqdn]) | ||
|
||
# Sign aggregator certificate | ||
check_call(['fx', 'aggregator', 'certify', '--fqdn', fqdn, '--silent']) | ||
|
||
|
||
def create_signed_cert_for_collaborator(col, data_path): | ||
''' | ||
We do certs exchage for all participants in a single workspace to speed up this test run. | ||
Do not do this in real experiments in untrusted environments | ||
''' | ||
print(f'Certifying collaborator {col} with data path {data_path}...') | ||
# Create collaborator certificate request | ||
check_call([ | ||
'fx', 'collaborator', 'create', '-d', data_path, '-n', col, '--silent' | ||
]) | ||
check_call([ | ||
'fx', 'collaborator', 'generate-cert-request', '-n', col, '--silent' | ||
]) | ||
# Sign collaborator certificate | ||
check_call([ | ||
'fx', | ||
'collaborator', | ||
'certify', | ||
'--request-pkg', | ||
f'col_{col}_to_agg_cert_request.zip', | ||
'--silent' | ||
]) | ||
|
||
# Pack the collaborators private key and the signed cert | ||
# as well as it's data.yaml to a tarball | ||
tarfiles = ['plan/data.yaml', f'agg_to_col_{col}_signed_cert.zip'] | ||
with os.scandir('cert/client') as iterator: | ||
for entry in iterator: | ||
if entry.name.endswith('key'): | ||
tarfiles.append(entry.path) | ||
with tarfile.open(f'cert_col_{col}.tar', 'w') as t: | ||
for f in tarfiles: | ||
t.add(f) | ||
for f in tarfiles: | ||
os.remove(f) | ||
# Remove request archive | ||
os.remove(f'col_{col}_to_agg_cert_request.zip') | ||
|
||
|
||
def start_aggregator_container(workspace_image_name, aggregator_required_files): | ||
check_call( | ||
'docker run --rm ' | ||
'--network host ' | ||
f'-v {Path.cwd().resolve()}/{aggregator_required_files}:/certs.tar ' | ||
'-e \"CONTAINER_TYPE=aggregator\" ' | ||
f'{workspace_image_name} ' | ||
'bash /openfl/openfl-docker/start_actor_in_container.sh', | ||
shell=True) | ||
|
||
|
||
def start_collaborator_container(workspace_image_name, col_name): | ||
check_call( | ||
'docker run --rm ' | ||
'--network host ' | ||
f'-v {Path.cwd()}/cert_col_{col_name}.tar:/certs.tar ' | ||
'-e \"CONTAINER_TYPE=collaborator\" ' | ||
f'-e \"COL={col_name}\" ' | ||
f'{workspace_image_name} ' | ||
'bash /openfl/openfl-docker/start_actor_in_container.sh', | ||
shell=True) |