Skip to content

Commit

Permalink
Updating -e option by adding new dump feature (#47)
Browse files Browse the repository at this point in the history
* update -e option function

* adding dump build configuration files by adding new bbclass

* update bom.bbclass

* fix error bom.json

* fix error in bom_for_e.bbclass

* Adapting review comments and fix build error

* restore bom.bbclass

* fix reviewed item and build error

* fix build error in log statement

* adding 'get' to set default value in pf
  • Loading branch information
heedu authored Feb 26, 2025
1 parent b262c36 commit 169e1f2
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 9 deletions.
149 changes: 149 additions & 0 deletions files_for_preparation/bom_for_e.bbclass
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright (c) 2020 LG Electronics, Inc.
# SPDX-License-Identifier: Apache-2.0
#
# This class adds write_bom_info and write_abi_xml_data,
# Each of them can be run by bitake --runall option.
# They are useful to verify build output specification.

do_write_bom_info[nostamp] = "1"
addtask write_bom_info
python do_write_bom_info() {
import json
import time
# We want one recipe per line, starting with arch and recipe keys,
# so that it's easy to sort and compare them
class BomJSONEncoder(json.JSONEncoder):
def iterencode(self, obj, _one_shot=True):
if isinstance(obj, dict):
output = []
if "arch" in obj.keys() and "recipe" in obj.keys():
output.append(json.dumps("arch") + ": " + self.encode(obj["arch"]))
output.append(json.dumps("recipe") + ": " + self.encode(obj["recipe"]))
for key, value in sorted(obj.items()):
if key == "arch" or key == "recipe":
continue
output.append(json.dumps(key) + ": " + self.encode(value))
return "{" + ",".join(output) + "}"
else:
return json.JSONEncoder().iterencode(obj, _one_shot)


jsondata = {}
jsondata["src_path"] = d.getVar("S", True)
jsondata["src_uri"] = d.getVar("SRC_URI", True)
jsondata["srcrev"] = "".join(d.getVar("SRCREV", True).split())
jsondata["recipe"] = d.getVar("PN", True)
jsondata["file"] = d.getVar("FILE", True)[len(d.getVar("TOPDIR", True)):]
jsondata["arch"] = d.getVar("PACKAGE_ARCH", True)
jsondata["author"] = d.getVar("AUTHOR", True)
license = d.getVar("LICENSE", True)
license_flags = d.getVar("LICENSE_FLAGS", True)
packages = d.getVar("PACKAGES", True)
jsondata["license"] = license
jsondata["license_flags"] = license_flags
jsondata["complete"] = int(time.time())
jsondata["packages"] = packages
pkg_lic = {}
if packages:
for pkg in packages.split():
lic = d.getVar("LICENSE_%s" % pkg, True)
if lic and lic != license:
pkg_lic[pkg] = lic
jsondata["pkg_lic"] = pkg_lic
jsondata["pe"] = d.getVar("PE", True)
jsondata["pv"] = d.getVar("PV", True)
jsondata["pr"] = d.getVar("PR", True)
jsondata["pf"] = d.getVar("PF", True)
jsondata["extendprauto"] = d.getVar("EXTENDPRAUTO", True)
jsondata["extendpkgv"] = d.getVar("EXTENDPKGV", True)
jsondata["description"] = d.getVar("DESCRIPTION", True)
jsondata["summary"] = d.getVar("SUMMARY", True)
jsondata["cve_check_whitelist "] = d.getVar("CVE_CHECK_WHITELIST", True)

cpe_ids = get_cpe_ids(d.getVar("CVE_VENDOR",""), d.getVar("CVE_PRODUCT",""), d.getVar("CVE_VERSION",""), jsondata["recipe"], jsondata["pv"])
jsondata["source_info"] = cpe_ids

datafile = os.path.join(d.getVar("TOPDIR", True), "bom.json")
lock = bb.utils.lockfile(datafile + '.lock')
with open(datafile, "a") as f:
json.dump(jsondata, f, sort_keys=True, cls=BomJSONEncoder)
f.write(',\n')
bb.utils.unlockfile(lock)
}


python do_dumptasks() {

#Dump BitBake tasks to ${TOPDIR}/dumped_tasks/${PF}.task_name.

import os
import bb

ar_outdir = os.path.join(d.getVar('TOPDIR', True), "dumped_tasks") # 기본값 설정
ar_dumptasks = ["do_configure", "do_compile"] # 기본값 설정
pf = d.getVar('PF', True)

bb.utils.mkdirhier(ar_outdir)

for task in ar_dumptasks:
# Do not export tasks that are set to do not run
if d.getVarFlag(task, 'noexec') == '1':
bb.warn('%s: skipping task %s: [noexec]' % (pf, task))
continue

dumpfile = os.path.join(ar_outdir, '%s.%s' % (pf, task))
bb.note('Dumping task %s into %s' % (task, dumpfile))

# We assume the task as a shell script and then check if it is
# actually a Python script.
emit_func = bb.data.emit_func
if d.getVarFlag(task, 'python') == '1':
emit_func = bb.data.emit_func_python

try:
with open(dumpfile, 'w') as f:
emit_func(task, f, d)
except Exception as e:
bb.fatal('%s: Cannot export %s: %s' % (pf, task, e))
}

# do_dumptasks 작업을 빌드 순서에 포함시키기
addtask do_dumptasks after do_configure before do_compile


def get_cpe_ids(cve_vendor, cve_product, cve_version, pn, pv):

#Get list of CPE identifiers for the given product and version

if cve_vendor is None:
cve_vendor = ""
if cve_product is None:
cve_product = ""
if cve_version is None:
cve_version = ""

version = cve_version.split("+git")[0]

if cve_version.startswith("$"):
version = pv

cpe_ids = []
for product in cve_product.split():
# CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
# use wildcard for vendor.
if ":" in product:
cve_vendor, product = product.split(":", 1)

if product.startswith("$"):
product = pn

if cve_vendor is None:
cve_vendor = ""

cpe_id = f'cpe:2.3:a:{cve_vendor}:{product}:{version}:*:*:*:*:*:*:*'
cpe_ids.append(cpe_id)

return cpe_ids



2 changes: 1 addition & 1 deletion src/fosslight_yocto/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-n\t\t\t\t Print result in BIN(Yocto) format
-s\t\t\t\t Analyze source code for unconfirmed Open Source
-c\t\t\t\t Analyze all the source code
-e\t\t\t\t Compress all the source code
-e <path>\t\t\t Top build output path with bom.json to compress all the source code
-o <path>\t\t\t Output Path
-f <format>\t\t\t Output file format (excel, csv, opossum)
-pr\t\t\t\t Print all data of bom.json"""
Expand Down
9 changes: 9 additions & 0 deletions src/fosslight_yocto/_package_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def __init__(self):
self.pr = ""
self._yocto_recipe = []
self._yocto_package = []
self.source_done = "" # Save timestamp after source code fetch : Only for -e option
self.full_src_uri = "" # List all src uri links : Only for -e option
self.pf = "" # Package name + version value : Only for -e option

def __eq__(self, value):
return self.spdx_id == value
Expand Down Expand Up @@ -297,6 +300,12 @@ def set_value_switch(oss, key, value, nested_pkg_name):
oss.yocto_recipe = value
elif key == 'additional_data':
oss.additional_data = value
elif key == 'source_done':
oss.source_done = value
elif key == 'full_src_uri':
oss.full_src_uri = value
elif key == 'package_format':
oss.pf = value


def update_package_name(oss, value, nested_pkg_name):
Expand Down
125 changes: 121 additions & 4 deletions src/fosslight_yocto/_zip_source_works.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
ZIP_FILE_EXTENSION = ".zip"
EXCLUDE_FILE_EXTENSION = ['socket']

DUMP_DIR_PATH = "dumped_tasks"


def is_exclude_file(file_abs_path):
excluded = False
Expand All @@ -39,16 +41,119 @@ def is_exclude_file(file_abs_path):
return excluded


def zip_module(orig_path, desc_name):
def join_source_path(build_output_path, bom_src_path):
if bom_src_path == '':
return ''
leaf_folder = os.path.basename(os.path.normpath(build_output_path))
split_path = bom_src_path.split(leaf_folder)
if len(split_path) == 1:
return bom_src_path
join_path = os.path.join(build_output_path, split_path[1][1:])
# join_path = join_path.replace('\\', '/')
return join_path


def check_valid_file_type(file_path, timestamp):
validation = True
if not os.path.isfile(file_path) or \
os.path.islink(file_path) or \
os.path.getsize(file_path) > 1024 * 1024 or \
file_path.endswith('.cmd') or \
file_path.endswith('.o'):
validation = False

if validation:
creation_time = os.path.getmtime(file_path)
if creation_time > timestamp:
validation = False

return validation


def get_dump_files(oss_key, dump_dir):
dump_file_list = os.listdir(dump_dir)
found_list = []

logger.debug(f'Check dump oss : {oss_key}')

if oss_key == "":
return found_list

if dump_file_list is None:
print("no dump info")
return found_list

for dump in dump_file_list:
if dump.startswith(oss_key):
print("found dump file")
print(dump)
found_list.append(dump)

return found_list


def zip_module(orig_path, desc_name, build_output_dir, timestamp, full_src_uri, pf):
FAILED_MSG_PREFIX = "Failed: " + desc_name + " " + orig_path
success = True
failed_msg = [FAILED_MSG_PREFIX]
desc_name = desc_name.strip()
zip_name = desc_name + ZIP_FILE_EXTENSION
uri_path_list = []
dumptasks_dir = os.path.join(build_output_dir, DUMP_DIR_PATH)
oss_dump_list = get_dump_files(pf, dumptasks_dir)

uris = full_src_uri.split()

for uri in uris:
if uri.startswith("file://"):
src_uri_file = uri.split("file://")[1]
uri_path = os.path.join(orig_path, src_uri_file)
uri_path = join_source_path(build_output_dir, uri_path)
logger.debug(f'uri full path : {uri_path}')
uri_path_list.append(uri_path)

if len(uri_path_list) > 0:
uri_path = uri_path_list[0]
else:
uri_path = None

orig_path = join_source_path(build_output_dir, orig_path)

if os.path.islink(orig_path):
orig_path = os.path.realpath(orig_path)
orig_path = join_source_path(build_output_dir, orig_path)

if desc_name == "":
logger.debug("Recipe name is missing")
elif uri_path is not None and os.path.exists(uri_path) and os.path.isfile(uri_path):

zip_object = zipfile.ZipFile(zip_name, "w", zipfile.ZIP_DEFLATED)
for uri_path in uri_path_list:

try:
abs_src = os.path.abspath(orig_path)
abs_name = os.path.abspath(uri_path)
des_path = os.path.join(source_desc_folder, zip_name)

relpath = os.path.relpath(abs_name, abs_src)
zip_object.write(abs_name, relpath)
except Exception as ex:
success = False
failed_msg.append(f'|--- {ex}')

try:
for dump in oss_dump_list:
dump_orig_path = os.path.join(dumptasks_dir, dump)
zip_object.write(dump_orig_path, os.path.basename(dump_orig_path))

zip_object.close()
shutil.move(zip_name, des_path)
except Exception as ex:
success = False
failed_msg.append(f'|--- {ex}')

elif orig_path != "" and os.path.exists(orig_path):

abs_src = os.path.abspath(orig_path)
des_path = os.path.join(source_desc_folder, zip_name)
compress_file = []
Expand All @@ -59,6 +164,8 @@ def zip_module(orig_path, desc_name):
abs_name = os.path.abspath(os.path.join(dir_name, filename))
if is_exclude_file(abs_name):
continue
if not check_valid_file_type(abs_name, timestamp):
continue
if os.path.islink(abs_name):
abs_name = os.readlink(abs_name)
if not os.path.isfile(abs_name):
Expand All @@ -71,11 +178,16 @@ def zip_module(orig_path, desc_name):
success = False
failed_msg.append(f'|--- {ex}')
try:
for dump in oss_dump_list:
dump_orig_path = os.path.join(dumptasks_dir, dump)
zip_object.write(dump_orig_path, os.path.basename(dump_orig_path))

zip_object.close()
shutil.move(zip_name, des_path)
except Exception as ex:
success = False
failed_msg.append(f'|--- {ex}')

else:
success = False
failed_msg.append(f"|--- Can't find source path: {orig_path}")
Expand Down Expand Up @@ -114,7 +226,7 @@ def zip_compressed_source(output_dir="", total_list=[]):
logger.info(f"\n* Final compressed file: {final_zip_file}")


def collect_source(pkg_list: List[PackageItem], output_dir: str):
def collect_source(pkg_list: List[PackageItem], output_dir: str, build_output_dir: str):
global source_desc_folder
if output_dir == "":
output_dir = os.getcwd()
Expand All @@ -141,9 +253,14 @@ def collect_source(pkg_list: List[PackageItem], output_dir: str):
src_uri = recipe_item.download_location
base_path = recipe_item.file_path

full_uri = recipe_item.full_src_uri
pf = recipe_item.pf
# zip downloaded source codes and located to package_zip folders
total_list.append(recipe_name + ZIP_FILE_EXTENSION)
success, failed_msg = zip_module(recipe_item.src_path, recipe_name)
source_timestamp = recipe_item.source_done
zip_file_name = recipe_name + "_" + recipe_item.version

success, failed_msg = zip_module(recipe_item.src_path, zip_file_name, build_output_dir, source_timestamp, full_uri, pf)
if success:
success_list.append(recipe_name)
else:
Expand All @@ -168,4 +285,4 @@ def collect_source(pkg_list: List[PackageItem], output_dir: str):
write_txt_file(output_failed_txt, "\n".join(failed_list))

# zip package source codes
zip_compressed_source(output_dir, total_list)
# zip_compressed_source(output_dir, total_list)
Loading

0 comments on commit 169e1f2

Please sign in to comment.