diff --git a/files_for_preparation/bom_for_e.bbclass b/files_for_preparation/bom_for_e.bbclass
new file mode 100644
index 000000000..93006c099
--- /dev/null
+++ b/files_for_preparation/bom_for_e.bbclass
@@ -0,0 +1,149 @@
+# Copyright (c) 2020 LG Electronics, Inc.
+# SPDX-License-Identifier: Apache-2.0
+#
+# This class adds the write_bom_info and write_abi_xml_data tasks.
+# Each of them can be run with the bitbake --runall option.
+# They are useful for verifying the build output specification.
+
+do_write_bom_info[nostamp] = "1"
+addtask write_bom_info
+python do_write_bom_info() {
+    import json
+    import time
+
+    # We want one recipe per line, starting with the "arch" and "recipe" keys,
+    # so that the entries are easy to sort and compare.
+    class BomJSONEncoder(json.JSONEncoder):
+        def iterencode(self, obj, _one_shot=True):
+            if isinstance(obj, dict):
+                output = []
+                if "arch" in obj.keys() and "recipe" in obj.keys():
+                    output.append(json.dumps("arch") + ": " + self.encode(obj["arch"]))
+                    output.append(json.dumps("recipe") + ": " + self.encode(obj["recipe"]))
+                for key, value in sorted(obj.items()):
+                    if key == "arch" or key == "recipe":
+                        continue
+                    output.append(json.dumps(key) + ": " + self.encode(value))
+                return "{" + ",".join(output) + "}"
+            else:
+                return json.JSONEncoder().iterencode(obj, _one_shot)
+
+    jsondata = {}
+    jsondata["src_path"] = d.getVar("S", True)
+    jsondata["src_uri"] = d.getVar("SRC_URI", True)
+    jsondata["srcrev"] = "".join(d.getVar("SRCREV", True).split())
+    jsondata["recipe"] = d.getVar("PN", True)
+    jsondata["file"] = d.getVar("FILE", True)[len(d.getVar("TOPDIR", True)):]
+    jsondata["arch"] = d.getVar("PACKAGE_ARCH", True)
+    jsondata["author"] = d.getVar("AUTHOR", True)
+    license = d.getVar("LICENSE", True)
+    license_flags = d.getVar("LICENSE_FLAGS", True)
+    packages = d.getVar("PACKAGES", True)
+    jsondata["license"] = license
+    jsondata["license_flags"] = license_flags
+    jsondata["complete"] = int(time.time())
+    jsondata["packages"] = packages
+    pkg_lic = {}
+    if packages:
+        for pkg in packages.split():
+            lic = d.getVar("LICENSE_%s" % pkg, True)
+            if lic and lic != license:
+                pkg_lic[pkg] = lic
+    jsondata["pkg_lic"] = pkg_lic
+    jsondata["pe"] = d.getVar("PE", True)
+    jsondata["pv"] = d.getVar("PV", True)
+    jsondata["pr"] = d.getVar("PR", True)
+    jsondata["pf"] = d.getVar("PF", True)
+    jsondata["extendprauto"] = d.getVar("EXTENDPRAUTO", True)
+    jsondata["extendpkgv"] = d.getVar("EXTENDPKGV", True)
+    jsondata["description"] = d.getVar("DESCRIPTION", True)
+    jsondata["summary"] = d.getVar("SUMMARY", True)
+    jsondata["cve_check_whitelist"] = d.getVar("CVE_CHECK_WHITELIST", True)
+
+    cpe_ids = get_cpe_ids(d.getVar("CVE_VENDOR", True), d.getVar("CVE_PRODUCT", True),
+                          d.getVar("CVE_VERSION", True), jsondata["recipe"], jsondata["pv"])
+    jsondata["source_info"] = cpe_ids
+
+    datafile = os.path.join(d.getVar("TOPDIR", True), "bom.json")
+    lock = bb.utils.lockfile(datafile + '.lock')
+    with open(datafile, "a") as f:
+        json.dump(jsondata, f, sort_keys=True, cls=BomJSONEncoder)
+        f.write(',\n')
+    bb.utils.unlockfile(lock)
+}
+
+
+python do_dumptasks() {
+    # Dump BitBake tasks to ${TOPDIR}/dumped_tasks/${PF}.<task_name>.
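+    # The dumped ${PF}.<task_name> files are picked up later by
+    # get_dump_files() in _zip_source_works.py and bundled into each
+    # recipe's source archive when the -e option is used.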
+
+    import os
+    import bb
+
+    ar_outdir = os.path.join(d.getVar('TOPDIR', True), "dumped_tasks")  # default output directory
+    ar_dumptasks = ["do_configure", "do_compile"]  # default tasks to dump
+    pf = d.getVar('PF', True)
+
+    bb.utils.mkdirhier(ar_outdir)
+
+    for task in ar_dumptasks:
+        # Do not export tasks that are flagged not to run.
+        if d.getVarFlag(task, 'noexec') == '1':
+            bb.warn('%s: skipping task %s: [noexec]' % (pf, task))
+            continue
+
+        dumpfile = os.path.join(ar_outdir, '%s.%s' % (pf, task))
+        bb.note('Dumping task %s into %s' % (task, dumpfile))
+
+        # Assume the task is a shell script, then check whether it is
+        # actually a Python script.
+        emit_func = bb.data.emit_func
+        if d.getVarFlag(task, 'python') == '1':
+            emit_func = bb.data.emit_func_python
+
+        try:
+            with open(dumpfile, 'w') as f:
+                emit_func(task, f, d)
+        except Exception as e:
+            bb.fatal('%s: Cannot export %s: %s' % (pf, task, e))
+}
+
+# Include the do_dumptasks task in the build order.
+addtask do_dumptasks after do_configure before do_compile
+
+
+def get_cpe_ids(cve_vendor, cve_product, cve_version, pn, pv):
+    # Get the list of CPE identifiers for the given product and version.
+    if cve_vendor is None:
+        cve_vendor = ""
+    if cve_product is None:
+        cve_product = ""
+    if cve_version is None:
+        cve_version = ""
+
+    version = cve_version.split("+git")[0]
+    if cve_version.startswith("$"):
+        version = pv
+
+    cpe_ids = []
+    for product in cve_product.split():
+        # CVE_PRODUCT in recipes may include vendor information for CPE
+        # identifiers; if not, the vendor field is left empty.
+        if ":" in product:
+            cve_vendor, product = product.split(":", 1)
+
+        if product.startswith("$"):
+            product = pn
+
+        cpe_id = f'cpe:2.3:a:{cve_vendor}:{product}:{version}:*:*:*:*:*:*:*'
+        cpe_ids.append(cpe_id)
+
+    return cpe_ids
diff --git a/src/fosslight_yocto/_help.py b/src/fosslight_yocto/_help.py
index de48f1546..190d0ed03 100644
--- a/src/fosslight_yocto/_help.py
+++ b/src/fosslight_yocto/_help.py
@@ -24,7 +24,7 @@
     -n\t\t\t\t Print result in BIN(Yocto) format
     -s\t\t\t\t Analyze source code for unconfirmed Open Source
     -c\t\t\t\t Analyze all the source code
-    -e\t\t\t\t Compress all the source code
+    -e \t\t\t Top build output path containing bom.json, to compress all the source code
     -o \t\t\t Output Path
     -f \t\t\t Output file format (excel, csv, opossum)
     -pr\t\t\t\t Print all data of bom.json"""
diff --git a/src/fosslight_yocto/_package_item.py b/src/fosslight_yocto/_package_item.py
index d8180c164..a9ada8cf2 100755
--- a/src/fosslight_yocto/_package_item.py
+++ b/src/fosslight_yocto/_package_item.py
@@ -44,6 +44,9 @@ def __init__(self):
         self.pr = ""
         self._yocto_recipe = []
         self._yocto_package = []
+        self.source_done = ""  # Timestamp saved after the source code fetch: only for the -e option
+        self.full_src_uri = ""  # All SRC_URI links: only for the -e option
+        self.pf = ""  # Package name + version value: only for the -e option

     def __eq__(self, value):
         return self.spdx_id == value
@@ -297,6 +300,12 @@ def set_value_switch(oss, key, value, nested_pkg_name):
         oss.yocto_recipe = value
     elif key == 'additional_data':
         oss.additional_data = value
+    elif key == 'source_done':
+        oss.source_done = value
+    elif key == 'full_src_uri':
+        oss.full_src_uri = value
+    elif key == 'package_format':
+        oss.pf = value


 def update_package_name(oss, value, nested_pkg_name):
diff --git a/src/fosslight_yocto/_zip_source_works.py b/src/fosslight_yocto/_zip_source_works.py
index c9ccae96a..afe839054 100755
--- a/src/fosslight_yocto/_zip_source_works.py
+++ b/src/fosslight_yocto/_zip_source_works.py
@@ -25,6 +25,8 @@
 ZIP_FILE_EXTENSION = ".zip"
 EXCLUDE_FILE_EXTENSION = ['socket']

+DUMP_DIR_PATH = "dumped_tasks"
+

 def is_exclude_file(file_abs_path):
     excluded = False
@@ -39,16 +41,119 @@ def is_exclude_file(file_abs_path):
     return excluded


-def zip_module(orig_path, desc_name):
+def join_source_path(build_output_path, bom_src_path):
+    # Re-anchor a source path recorded in bom.json onto the local build
+    # output directory.
+    if bom_src_path == '':
+        return ''
+    leaf_folder = os.path.basename(os.path.normpath(build_output_path))
+    split_path = bom_src_path.split(leaf_folder)
+    if len(split_path) == 1:
+        return bom_src_path
+    join_path = os.path.join(build_output_path, split_path[1][1:])
+    return join_path
+
+
+def check_valid_file_type(file_path, timestamp):
+    # Compress only regular, non-link files of at most 1 MB that are not
+    # build artifacts (.cmd, .o).
+    validation = True
+    if not os.path.isfile(file_path) or \
+            os.path.islink(file_path) or \
+            os.path.getsize(file_path) > 1024 * 1024 or \
+            file_path.endswith('.cmd') or \
+            file_path.endswith('.o'):
+        validation = False
+
+    # Skip files modified after the fetch timestamp (build outputs).
+    if validation and timestamp:
+        mtime = os.path.getmtime(file_path)
+        if mtime > timestamp:
+            validation = False
+
+    return validation
+
+
+def get_dump_files(oss_key, dump_dir):
+    found_list = []
+
+    logger.debug(f'Check dump oss: {oss_key}')
+
+    if oss_key == "" or not os.path.isdir(dump_dir):
+        logger.debug('No dump info')
+        return found_list
+
+    for dump in os.listdir(dump_dir):
+        if dump.startswith(oss_key):
+            logger.debug(f'Found dump file: {dump}')
+            found_list.append(dump)
+
+    return found_list
+
+
+def zip_module(orig_path, desc_name, build_output_dir, timestamp, full_src_uri, pf):
     FAILED_MSG_PREFIX = "Failed: " + desc_name + " " + orig_path
     success = True
     failed_msg = [FAILED_MSG_PREFIX]
     desc_name = desc_name.strip()
     zip_name = desc_name + ZIP_FILE_EXTENSION
+    uri_path_list = []
+    dumptasks_dir = os.path.join(build_output_dir, DUMP_DIR_PATH)
+    oss_dump_list = get_dump_files(pf, dumptasks_dir)
+
+    for uri in full_src_uri.split():
+        if uri.startswith("file://"):
+            src_uri_file = uri.split("file://")[1]
+            uri_path = os.path.join(orig_path, src_uri_file)
+            uri_path = join_source_path(build_output_dir, uri_path)
+            logger.debug(f'URI full path: {uri_path}')
+            uri_path_list.append(uri_path)
+
+    if len(uri_path_list) > 0:
+        uri_path = uri_path_list[0]
+    else:
+        uri_path = None
+
+    orig_path = join_source_path(build_output_dir, orig_path)
+    if os.path.islink(orig_path):
+        orig_path = os.path.realpath(orig_path)
+        orig_path = join_source_path(build_output_dir, orig_path)

     if desc_name == "":
         logger.debug("Recipe name is missing")
+    elif uri_path is not None and os.path.isfile(uri_path):
+        abs_src = os.path.abspath(orig_path)
+        des_path = os.path.join(source_desc_folder, zip_name)
+        zip_object = zipfile.ZipFile(zip_name, "w", zipfile.ZIP_DEFLATED)
+        for uri_path in uri_path_list:
+            try:
+                abs_name = os.path.abspath(uri_path)
+                relpath = os.path.relpath(abs_name, abs_src)
+                zip_object.write(abs_name, relpath)
+            except Exception as ex:
+                success = False
+                failed_msg.append(f'|--- {ex}')
+
+        try:
+            for dump in oss_dump_list:
+                dump_orig_path = os.path.join(dumptasks_dir, dump)
+                zip_object.write(dump_orig_path, os.path.basename(dump_orig_path))
+
+            zip_object.close()
+            shutil.move(zip_name, des_path)
+        except Exception as ex:
+            success = False
+            failed_msg.append(f'|--- {ex}')
     elif orig_path != "" and os.path.exists(orig_path):
+        abs_src = os.path.abspath(orig_path)
         des_path = os.path.join(source_desc_folder, zip_name)
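+        # Walk orig_path below and archive only the files that pass the
+        # exclusion and fetch-timestamp checks (see check_valid_file_type).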
         compress_file = []
@@ -59,6 +164,8 @@ def zip_module(orig_path, desc_name):
             abs_name = os.path.abspath(os.path.join(dir_name, filename))
             if is_exclude_file(abs_name):
                 continue
+            if not check_valid_file_type(abs_name, timestamp):
+                continue
             if os.path.islink(abs_name):
                 abs_name = os.readlink(abs_name)
                 if not os.path.isfile(abs_name):
@@ -71,11 +178,16 @@ def zip_module(orig_path, desc_name):
                 success = False
                 failed_msg.append(f'|--- {ex}')
         try:
+            for dump in oss_dump_list:
+                dump_orig_path = os.path.join(dumptasks_dir, dump)
+                zip_object.write(dump_orig_path, os.path.basename(dump_orig_path))
+
             zip_object.close()
             shutil.move(zip_name, des_path)
         except Exception as ex:
             success = False
             failed_msg.append(f'|--- {ex}')
+
     else:
         success = False
         failed_msg.append(f"|--- Can't find source path: {orig_path}")
@@ -114,7 +226,7 @@ def zip_compressed_source(output_dir="", total_list=[]):
     logger.info(f"\n* Final compressed file: {final_zip_file}")


-def collect_source(pkg_list: List[PackageItem], output_dir: str):
+def collect_source(pkg_list: List[PackageItem], output_dir: str, build_output_dir: str):
     global source_desc_folder
     if output_dir == "":
         output_dir = os.getcwd()
@@ -141,9 +253,14 @@ def collect_source(pkg_list: List[PackageItem], output_dir: str):
         src_uri = recipe_item.download_location
         base_path = recipe_item.file_path
+        full_uri = recipe_item.full_src_uri
+        pf = recipe_item.pf

         # zip downloaded source codes and located to package_zip folders
         total_list.append(recipe_name + ZIP_FILE_EXTENSION)
-        success, failed_msg = zip_module(recipe_item.src_path, recipe_name)
+        source_timestamp = recipe_item.source_done
+        zip_file_name = recipe_name + "_" + recipe_item.version
+
+        success, failed_msg = zip_module(recipe_item.src_path, zip_file_name, build_output_dir, source_timestamp, full_uri, pf)
         if success:
             success_list.append(recipe_name)
         else:
@@ -168,4 +285,4 @@ def collect_source(pkg_list: List[PackageItem], output_dir: str):
         write_txt_file(output_failed_txt, "\n".join(failed_list))

     # zip package source codes
-    zip_compressed_source(output_dir, total_list)
+    # zip_compressed_source(output_dir, total_list)
diff --git a/src/fosslight_yocto/create_oss_report_for_yocto.py b/src/fosslight_yocto/create_oss_report_for_yocto.py
index 61bfcfbfd..69bea25ea 100755
--- a/src/fosslight_yocto/create_oss_report_for_yocto.py
+++ b/src/fosslight_yocto/create_oss_report_for_yocto.py
@@ -188,6 +188,12 @@ def read_bom_file(bom_file, buildhistory_latest_pkg_list):
         if len(path_list) > 0:
             oss_item['file_path'] = path_list[0]

+        # For the -e option: data needed to compress fetched files.
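+        # 'complete' is the timestamp written by do_write_bom_info; files
+        # modified after it are excluded from the source archive. 'pf' is
+        # the key used to match the dumped task files.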
+        oss_item['source_done'] = item.get('complete', "")
+        oss_item['full_src_uri'] = bom_src_uri
+
+        oss_item['package_format'] = item.get('pf', "")
+
         if bom_packages != "":
             packages = bom_packages.split()
             packages = list(set(packages))
@@ -983,7 +989,7 @@ def main():
     _print_bin_android = False
     _analyze_source = False
     _analyze_source_all = False
-    _compress_source_all = False
+    _compress_source_all = ""
     output_path = os.getcwd()
     output_src_analysis_file = "source_analysis_report"
     file_format = ""
@@ -1002,7 +1008,7 @@ def main():
     parser.add_argument('-n', '--another', action='store_true', required=False)
     parser.add_argument('-s', '--source', action='store_true', required=False)
     parser.add_argument('-c', '--complete', action='store_true', required=False)
-    parser.add_argument('-e', '--compress', action='store_true', required=False)
+    parser.add_argument('-e', '--compress', type=str, required=False)
     parser.add_argument('-pr', '--printall', action='store_true', required=False)

     args = parser.parse_args()
@@ -1035,7 +1041,7 @@ def main():
         _analyze_source = True
         _analyze_source_all = True
     if args.compress:
-        _compress_source_all = True
+        _compress_source_all = args.compress
     if args.printall:
         printall = True
@@ -1094,7 +1100,7 @@ def main():
     if _compress_source_all:
         try:
             logger.info("* Enable zip option")
-            collect_source(installed_packages_src, output_path)
+            collect_source(installed_packages_src, output_path, _compress_source_all)
         except Exception as ex:
             logger.error(f"Collecting source code: {ex}")
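Note: below is a minimal standalone sketch of how the new join_source_path
helper re-anchors a source path recorded in bom.json onto the local copy of
the build output tree. The helper mirrors the one added to
_zip_source_works.py above; the paths in the usage example are hypothetical.

    import os

    def join_source_path(build_output_path, bom_src_path):
        # Split the recorded path on the leaf folder name of the local build
        # output directory and re-join the tail onto the local path.
        if bom_src_path == '':
            return ''
        leaf_folder = os.path.basename(os.path.normpath(build_output_path))
        split_path = bom_src_path.split(leaf_folder)
        if len(split_path) == 1:
            # Leaf folder does not occur in the recorded path; keep it as-is.
            return bom_src_path
        return os.path.join(build_output_path, split_path[1][1:])

    # Hypothetical example: the build host wrote /work1/build/tmp/... into
    # bom.json, and the analysis host mounted the same output as /mnt/build.
    print(join_source_path('/mnt/build', '/work1/build/tmp/work/recipe/git'))
    # -> /mnt/build/tmp/work/recipe/git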