Skip to content
This repository has been archived by the owner on Jul 14, 2023. It is now read-only.

Commit

Permalink
[DoNotMerge] Add coverage tool (#472)
Browse files Browse the repository at this point in the history
* add coverage function

* fix test

* add coverage script.

* fix test

* fix system exit exception

* fix test

* fix test
  • Loading branch information
00Kai0 authored Aug 10, 2020
1 parent 8631578 commit f2708d2
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 20 deletions.
216 changes: 216 additions & 0 deletions .scripts/calc_test_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#!/bin/python
import os
import time
import subprocess
import datetime as dt
from urllib import parse

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base


# get db info
HostName = os.environ.get("DB_HOST_NAME")
LoginName = os.environ.get("DB_LOGIN_NAME")
Password = parse.quote_plus(os.environ.get("DB_PASSWORD")) # encode special chars
DBName = os.environ.get("DB_NAME")
# DEV_PREFIX = "D:/dev/"
DEV_PREFIX = os.environ.get("DEV_PREFIX")
AZ_PREFIX = DEV_PREFIX + "autorest.az"
AZ_CLI_EXT_PREFIX = DEV_PREFIX + "azure-cli-extensions"
AWAGGER_PREFIX = DEV_PREFIX + "azure-rest-api-specs"


Base = declarative_base()

# Models
class TblAztestBatch(Base):
__tablename__ = "tbl_aztest_batch"

id = Column(Integer, primary_key=True)
batch_name = Column(String, nullable=True)
start_dt = Column(DateTime, default=dt.datetime.utcnow)
end_dt = Column(DateTime, default=dt.datetime.utcnow)

class TblRp(Base):
__tablename__ = "tbl_rp"

id = Column(Integer, primary_key=True)
rp_name = Column(String)

class TblAztestStep(Base):
__tablename__ = "tbl_aztest_step"

id = Column(Integer, primary_key=True)
aztest_batch_id = Column(Integer, ForeignKey(TblAztestBatch.id))
rp_id = Column(Integer, ForeignKey(TblRp.id))
testcase_name = Column(String)
step_name = Column(String)
result = Column(Integer, default=1)
error_message = Column(String, default="")
error_stack = Column(String, default="")
error_normalized = Column(String, default="")
start_dt = Column(DateTime, default=dt.datetime.utcnow)
end_dt = Column(DateTime, default=dt.datetime.utcnow)

# Client
class DBClient:

def __init__(self):
# pyodbc
odbc_connect = parse.quote_plus('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+HostName+';DATABASE='+DBName+';UID='+LoginName+';PWD='+ Password)
self._engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(odbc_connect))
self._session_maker = sessionmaker(bind=self._engine)

def __enter__(self):
self._session = self._session_maker()
return self._session

def __exit__(self, *args, **kwargs):
self._session.close()

def test(client):
rs = client.execute("select * from tbl_aztest_batch")
print(rs.fetchone())

def calcCoverage(client, testcase_name, test_path, repo, tbl_batch, debug=False):
if not os.path.exists(test_path):
return

if not debug:
tbl_rp = client.query(TblRp).filter(TblRp.rp_name==repo).first()
if not tbl_rp:
# create rp
tbl_rp = TblRp(rp_name=repo)
client.add(tbl_rp)
tbl_rp = client.query(TblRp).filter(TblRp.rp_name==repo).first()

steps = None
coverage = None
with open(test_path, 'r') as f:
text = f.readlines()
steps = text[1:-1]
coverage = text[-1]

for step in steps:
scenario = step.split('|')[1:-1]
if len(scenario) != 7:
continue
step_name, result, error_message, error_stack, error_normalized, start_dt, end_dt = scenario
start_dt = dt.datetime.strptime(start_dt, "%Y-%m-%d %H:%M:%S.%f")
end_dt = dt.datetime.strptime(end_dt, "%Y-%m-%d %H:%M:%S.%f")
result = True if result == "successed" else False

if debug:
# debug
# print(step_name, result)
print(scenario)
else:
# add step
tbl_az_step = TblAztestStep(
aztest_batch_id=tbl_batch.id,
rp_id=tbl_rp.id,
testcase_name=testcase_name,
step_name=step_name,
result=result,
error_message=error_message,
error_stack=error_stack,
error_normalized=error_normalized,
start_dt=start_dt,
end_dt=end_dt
)
client.add(tbl_az_step)

# commit
client.commit()

def runTask(args):
try:
if os.name == 'nt':
output = subprocess.check_output(args, stderr=subprocess.STDOUT, shell=True, universal_newlines=True)
else:
output = subprocess.check_output(args, stderr=subprocess.STDOUT, universal_newlines=True)
except subprocess.CalledProcessError as exc:
print("Status : FAIL", exc.returncode)
raise AssertionError(exc.output)
else:
print("Output: \n{}\n".format(output))

# Collection
def repoColletor(debug=False, enable_batch=False, run_codegen=False, is_live=False):
az_cli_ext_prefix = AZ_CLI_EXT_PREFIX + "/src"
use_az = "--use=" + AZ_PREFIX
use_az_cli_folder = "--azure-cli-extension-folder=" + AZ_CLI_EXT_PREFIX

repos = list()

# run codegen (the rp names in specification are different from cli_extension )
if run_codegen:
for rp in os.listdir(AWAGGER_PREFIX + '/specification/'):
if rp == 'testserver':
swagger_arg_str = AZ_PREFIX + '/src/test/scenarios/testserver/configuration/readme.md'
else:
swagger_arg_str = AWAGGER_PREFIX + '/specification/'+ rp + '/resource-manager/readme.md'

cmd_codegen = ['autorest', '--version=3.0.6271', '--az', use_az, use_az_cli_folder, swagger_arg_str]
print(" ".join(cmd_codegen))
try:
runTask(cmd_codegen)
except:
continue

for rp in os.listdir(az_cli_ext_prefix):
if os.path.isdir(os.path.join(az_cli_ext_prefix, rp)):
repos.append(rp)

# run codegen (only available for kusto now.)
if run_codegen:
# add extension
cmd_add_ext = ['azdev', 'extension', 'add', rp]
print(" ".join(cmd_add_ext))
try:
runTask(" ".join(cmd_add_ext))
except:
continue

# az test
cmd_az_test = ['azdev', 'test', rp, "--discover"]
if is_live:
cmd_az_test += ['--live']
print(" ".join(cmd_az_test))
try:
runTask(cmd_az_test)
except:
continue

with DBClient() as client:
print("Uploading data..")
if enable_batch:
# unable now
pass

if not debug and repos:
# create batch
tbl_batch = TblAztestBatch(batch_name="")
client.add(tbl_batch)
client.commit()
tbl_batch = client.query(TblAztestBatch).order_by(TblAztestBatch.start_dt.desc()).first()
else:
tbl_batch = None

for repo in repos:
c_name = "test_{}_scenario_coverage.md".format(repo)
c_path = os.path.join(az_cli_ext_prefix, repo, "azext_" + repo, "tests", "latest", c_name)

calcCoverage(client, c_name, c_path, repo, tbl_batch, debug=debug)
print("Completed.")

def main():
# When debug is True, it won't upload test data to database.
debug = os.environ.get("CALC_COVERAGE_DEBUG", True)
repoColletor(debug=debug, run_codegen=False)


if __name__ == "__main__":
main()
40 changes: 37 additions & 3 deletions src/plugins/azgenerator/TemplateAzureCliTestInit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ export function GenerateAzureCliTestInit(model: CodeModelAz): string[] {
output.push("import os");
output.push("import sys");
output.push("import traceback");
output.push("import datetime as dt");
output.push("");
output.push("from azure.core.exceptions import AzureError");
output.push("from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError");
output.push("");
output.push("");
output.push("__path__ = __import__('pkgutil').extend_path(__path__, __name__)");
output.push("exceptions = []");
output.push('');
output.push('test_map = dict()');
output.push('SUCCESSED = "successed"');
output.push('FAILED = "failed"');
output.push('');
output.push('def try_manual(func):');
output.push(' def import_manual_function(origin_func):');
Expand Down Expand Up @@ -58,19 +62,49 @@ export function GenerateAzureCliTestInit(model: CodeModelAz): string[] {
output.push(' func_to_call = get_func_to_call()');
output.push(' print("running {}()...".format(func.__name__))');
output.push(' try:');
output.push(' return func_to_call(*args, **kwargs)');
output.push(' except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:');
output.push(' test_map[func.__name__] = dict()');
output.push(' test_map[func.__name__]["result"] = SUCCESSED');
output.push(' test_map[func.__name__]["error_message"] = ""');
output.push(' test_map[func.__name__]["error_stack"] = ""');
output.push(' test_map[func.__name__]["error_normalized"] = ""');
output.push(' test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()');
output.push(' ret = func_to_call(*args, **kwargs)');
output.push(' except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:');
output.push(' test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()');
output.push(' test_map[func.__name__]["result"] = FAILED');
output.push(' test_map[func.__name__]["error_message"] = str(e).replace("\\r\\n", " ").replace("\\n", " ")[:500]');
output.push(' test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\\r\\n", " ").replace("\\n", " ")[:500]');
output.push(' print("--------------------------------------")');
output.push(' print("step exception: ", e)');
output.push(' print("--------------------------------------", file=sys.stderr)');
output.push(' print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)');
output.push(' traceback.print_exc()');
output.push(' exceptions.append((func.__name__, sys.exc_info()))');
output.push(' else:');
output.push(' test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()');
output.push(' return ret');
output.push('');
output.push(' if inspect.isclass(func):');
output.push(' return get_func_to_call()');
output.push(' return wrapper');
output.push('');
output.push('def calc_coverage(filename):');
output.push(' filename = filename.split(".")[0]');
output.push(' coverage_name = filename + "_coverage.md"');
output.push(' with open(coverage_name, "w") as f:');
output.push(' f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\\n")');
output.push(' failed = 0');
output.push(' total = len(test_map)');
output.push(' covered = 0');
output.push(' for k, v in test_map.items():');
output.push(' if not k.startswith("step_"):');
output.push(' total -= 1');
output.push(' continue');
output.push(' if v["result"] == SUCCESSED:');
output.push(' covered += 1');
output.push(' f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\\n".format(step_name=k, **v))');
output.push(' f.write("Coverage: {}/{}\\n".format(covered, total))');
output.push(' print("Create coverage\\n", file=sys.stderr)');
output.push('');
output.push('def raise_if():');
output.push(' if exceptions:');
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/azgenerator/TemplateAzureCliTestScenario.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function GenerateAzureCliTestScenario(model: CodeModelAz): string[] {
head.push("");
head.push("import os");
head.push("from azure.cli.testsdk import ScenarioTest");
head.push("from .. import try_manual, raise_if");
head.push("from .. import try_manual, raise_if, calc_coverage");
//head.push("from .preparers import (VirtualNetworkPreparer, VnetSubnetPreparer)");
steps.push("");
steps.push("");
Expand Down Expand Up @@ -140,6 +140,7 @@ export function GenerateAzureCliTestScenario(model: CodeModelAz): string[] {
funcScenario.push("");
funcScenario.push("");
body.push(` call_scenario(self${parameterLine()})`);
body.push(` calc_coverage(__file__)`);
body.push(` raise_if()`);
body.push("");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
import os
import sys
import traceback
import datetime as dt

from azure.core.exceptions import AzureError
from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError


__path__ = __import__('pkgutil').extend_path(__path__, __name__)
exceptions = []

test_map = dict()
SUCCESSED = "successed"
FAILED = "failed"

def try_manual(func):
def import_manual_function(origin_func):
Expand Down Expand Up @@ -48,19 +52,49 @@ def wrapper(*args, **kwargs):
func_to_call = get_func_to_call()
print("running {}()...".format(func.__name__))
try:
return func_to_call(*args, **kwargs)
except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:
test_map[func.__name__] = dict()
test_map[func.__name__]["result"] = SUCCESSED
test_map[func.__name__]["error_message"] = ""
test_map[func.__name__]["error_stack"] = ""
test_map[func.__name__]["error_normalized"] = ""
test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()
ret = func_to_call(*args, **kwargs)
except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
test_map[func.__name__]["result"] = FAILED
test_map[func.__name__]["error_message"] = str(e).replace("\r\n", " ").replace("\n", " ")[:500]
test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\r\n", " ").replace("\n", " ")[:500]
print("--------------------------------------")
print("step exception: ", e)
print("--------------------------------------", file=sys.stderr)
print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)
traceback.print_exc()
exceptions.append((func.__name__, sys.exc_info()))
else:
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
return ret

if inspect.isclass(func):
return get_func_to_call()
return wrapper

def calc_coverage(filename):
filename = filename.split(".")[0]
coverage_name = filename + "_coverage.md"
with open(coverage_name, "w") as f:
f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\n")
failed = 0
total = len(test_map)
covered = 0
for k, v in test_map.items():
if not k.startswith("step_"):
total -= 1
continue
if v["result"] == SUCCESSED:
covered += 1
f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\n".format(step_name=k, **v))
f.write("Coverage: {}/{}\n".format(covered, total))
print("Create coverage\n", file=sys.stderr)

def raise_if():
if exceptions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import os
from azure.cli.testsdk import ScenarioTest
from .. import try_manual, raise_if
from .. import try_manual, raise_if, calc_coverage
from azure.cli.testsdk import ResourceGroupPreparer


Expand Down Expand Up @@ -105,4 +105,5 @@ class AttestationManagementClientScenarioTest(ScenarioTest):
def test_attestation(self, rg, rg_2, rg_3):

call_scenario(self, rg, rg_2, rg_3)
calc_coverage(__file__)
raise_if()
Loading

0 comments on commit f2708d2

Please sign in to comment.