-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Include disk-uploader pipeline in tekton tests
Signed-off-by: Geetika Kapoor <gkapoor@redhat.com>
- Loading branch information
1 parent
c51774a
commit aac3b92
Showing
4 changed files
with
220 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
168 changes: 168 additions & 0 deletions
168
tests/infrastructure/tekton/test_tekton_pipeline_disk_uploader.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
import os | ||
|
||
import pytest | ||
from kubernetes.dynamic.exceptions import ResourceNotFoundError | ||
from ocp_resources.data_source import DataSource | ||
from ocp_resources.pipeline import Pipeline | ||
from ocp_resources.pipelineruns import PipelineRun | ||
from ocp_resources.secret import Secret | ||
from ocp_resources.virtual_machine import VirtualMachine | ||
from ocp_resources.virtual_machine_cluster_instancetype import VirtualMachineClusterInstancetype | ||
from ocp_resources.virtual_machine_cluster_preference import VirtualMachineClusterPreference | ||
from pytest_testconfig import py_config | ||
|
||
from tests.infrastructure.tekton.utils import assert_pipelinerun_succeeded, check_pipelinerun_status | ||
from utilities.constants import OS_FLAVOR_FEDORA, PVC, TIMEOUT_5MIN, TIMEOUT_10MIN, TIMEOUT_30SEC | ||
from utilities.infra import base64_encode_str | ||
from utilities.storage import create_dv | ||
from utilities.virt import VirtualMachineForTests, wait_for_running_vm | ||
|
||
pytestmark = pytest.mark.tier3 | ||
|
||
DISK_UPLOADER_TASK = "disk-uploader" | ||
|
||
EXPORT_SOURCE_KIND = "EXPORT_SOURCE_KIND" | ||
EXPORT_SOURCE_NAME = "EXPORT_SOURCE_NAME" | ||
VOLUME_NAME = "VOLUME_NAME" | ||
IMAGE_DESTINATION = "IMAGE_DESTINATION" | ||
PUSH_TIMEOUT = "PUSH_TIMEOUT" | ||
SECRET_NAME = "SECRET_NAME" | ||
|
||
pipeline_params = [ | ||
{"name": EXPORT_SOURCE_KIND, "type": "string"}, | ||
{"name": EXPORT_SOURCE_NAME, "type": "string"}, | ||
{"name": VOLUME_NAME, "type": "string"}, | ||
{"name": IMAGE_DESTINATION, "type": "string"}, | ||
{"name": PUSH_TIMEOUT, "type": "string"}, | ||
{"name": SECRET_NAME, "type": "string"}, | ||
] | ||
|
||
pipeline_tasks = [ | ||
{ | ||
"name": DISK_UPLOADER_TASK, | ||
"params": [ | ||
{"name": EXPORT_SOURCE_KIND, "value": f"$(params.{EXPORT_SOURCE_KIND})"}, | ||
{"name": EXPORT_SOURCE_NAME, "value": f"$(params.{EXPORT_SOURCE_NAME})"}, | ||
{"name": VOLUME_NAME, "value": f"$(params.{VOLUME_NAME})"}, | ||
{"name": IMAGE_DESTINATION, "value": f"$(params.{IMAGE_DESTINATION})"}, | ||
{"name": PUSH_TIMEOUT, "value": f"$(params.{PUSH_TIMEOUT})"}, | ||
{"name": SECRET_NAME, "value": f"$(params.{SECRET_NAME})"}, | ||
], | ||
"taskRef": {"kind": "Task", "name": DISK_UPLOADER_TASK}, | ||
} | ||
] | ||
|
||
|
||
@pytest.fixture() | ||
def quay_disk_uploader_secret(custom_pipeline_namespace): | ||
with Secret( | ||
name="quay-disk-uploader-secret", | ||
namespace=custom_pipeline_namespace.name, | ||
accesskeyid=base64_encode_str(os.environ["QUAY_ACCESS_KEY_TEKTON_TASKS"]), | ||
secretkey=base64_encode_str(os.environ["QUAY_SECRET_KEY_TEKTON_TASKS"]), | ||
) as quay_disk_uploader_secret: | ||
yield quay_disk_uploader_secret | ||
|
||
|
||
@pytest.fixture() | ||
def created_fedora_dv(admin_client, golden_images_namespace, custom_pipeline_namespace): | ||
with create_dv( | ||
dv_name="fedora-dv-disk-uploader", | ||
namespace=custom_pipeline_namespace.name, | ||
source=PVC, | ||
source_pvc=DataSource( | ||
name=OS_FLAVOR_FEDORA, namespace=golden_images_namespace.name | ||
).instance.spec.source.pvc.name, | ||
source_namespace=golden_images_namespace.name, | ||
size="35Gi", | ||
client=admin_client, | ||
storage_class=py_config["default_storage_class"], | ||
) as dv: | ||
dv.wait_for_dv_success() | ||
yield dv | ||
|
||
|
||
@pytest.fixture() | ||
def vm_for_disk_uploader(custom_pipeline_namespace, admin_client, created_fedora_dv): | ||
with VirtualMachineForTests( | ||
name="fedora-vm-diskuploader", | ||
namespace=custom_pipeline_namespace.name, | ||
client=admin_client, | ||
data_volume=created_fedora_dv, | ||
vm_instance_type=VirtualMachineClusterInstancetype(name="u1.small"), | ||
vm_preference=VirtualMachineClusterPreference(name=OS_FLAVOR_FEDORA), | ||
run_strategy=VirtualMachine.RunStrategy.ALWAYS, | ||
) as vm: | ||
wait_for_running_vm(vm=vm, wait_for_interfaces=False, check_ssh_connectivity=False) | ||
yield vm | ||
|
||
|
||
@pytest.fixture() | ||
def stopped_vm_for_disk_uploader(vm_for_disk_uploader): | ||
vm_for_disk_uploader.stop(wait=True) | ||
yield | ||
|
||
|
||
@pytest.fixture() | ||
def pipeline_disk_uploader( | ||
admin_client, | ||
custom_pipeline_namespace, | ||
): | ||
with Pipeline( | ||
name="pipeline-disk-uploader", | ||
namespace=custom_pipeline_namespace.name, | ||
client=admin_client, | ||
tasks=pipeline_tasks, | ||
params=pipeline_params, | ||
) as pipeline: | ||
if pipeline.exists: | ||
yield pipeline | ||
raise ResourceNotFoundError(f"Pipeline {pipeline.name} not found") | ||
|
||
|
||
@pytest.fixture() | ||
def pipelinerun_for_disk_uploader( | ||
admin_client, | ||
custom_pipeline_namespace, | ||
pipeline_disk_uploader, | ||
vm_for_disk_uploader, | ||
created_fedora_dv, | ||
quay_disk_uploader_secret, | ||
): | ||
pipeline_run_params = { | ||
"EXPORT_SOURCE_KIND": "vm", | ||
"EXPORT_SOURCE_NAME": vm_for_disk_uploader.name, | ||
"VOLUME_NAME": created_fedora_dv.name, | ||
"IMAGE_DESTINATION": "quay.io/openshift-cnv/tekton-tasks", | ||
"PUSH_TIMEOUT": TIMEOUT_5MIN, | ||
"SECRET_NAME": quay_disk_uploader_secret.name, | ||
} | ||
|
||
with PipelineRun( | ||
name="pipelinerun-disk-uploader", | ||
namespace=custom_pipeline_namespace.name, | ||
client=admin_client, | ||
params=pipeline_run_params, | ||
pipelineref=pipeline_disk_uploader.name, | ||
) as pipelinerun: | ||
pipelinerun.wait_for_conditions() | ||
yield pipelinerun | ||
|
||
|
||
@pytest.mark.usefixtures("extracted_kubevirt_tekton_resources", "processed_yaml_files") | ||
@pytest.mark.polarion("CNV-11721") | ||
def test_disk_uploader_pipelinerun( | ||
quay_disk_uploader_secret, | ||
vm_for_disk_uploader, | ||
stopped_vm_for_disk_uploader, | ||
pipeline_disk_uploader, | ||
pipelinerun_for_disk_uploader, | ||
): | ||
assert_pipelinerun_succeeded( | ||
pipelinerun=pipelinerun_for_disk_uploader, | ||
final_status=check_pipelinerun_status( | ||
pipelinerun=pipelinerun_for_disk_uploader, | ||
wait_timeout=TIMEOUT_10MIN, | ||
sleep_interval=TIMEOUT_30SEC, | ||
), | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters