Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Maximilian Huber <maximilian.huber@tngtech.com>
  • Loading branch information
maxhbr committed Dec 2, 2023
1 parent 911d91d commit 8a5c5e2
Showing 1 changed file with 123 additions and 222 deletions.
345 changes: 123 additions & 222 deletions src/spdx_tools/spdx3/parser/json_ld/json_ld_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# SPDX-License-Identifier: Apache-2.0
import json
import os
from rdflib import Graph, IdentifiedNode, Literal, IdentifiedNode, URIRef, BNode
from rdflib import Graph, IdentifiedNode, Literal, IdentifiedNode, URIRef, BNode, Variable
from rdflib.namespace import DC, DCTERMS, DOAP, FOAF, SKOS, OWL, RDF, RDFS, VOID, XMLNS, XSD
from rdflib.term import bind
from rdflib.term import Node, Identifier, bind
from semantic_version import Version

from spdx_tools.spdx3.payload import Payload
Expand All @@ -27,6 +27,8 @@
from spdx_tools.spdx3.model.software.file import File
from spdx_tools.spdx3.model.software.package import Package

def camel_to_snake(s):
    """Convert a camelCase or PascalCase name to snake_case.

    Every uppercase character is replaced by an underscore followed by its
    lowercase form; underscores at the very start of the result are then
    stripped.
    """
    pieces = []
    for char in s:
        if char.isupper():
            pieces.append("_" + char.lower())
        else:
            pieces.append(char)
    return "".join(pieces).lstrip("_")

def parse_from_file(file_name: str, encoding: str = "utf-8") -> Payload:
    """Parse *file_name* by delegating to ``JsonLDParser.parseFile``.

    Args:
        file_name: Path handed through to the parser.
        encoding: Text encoding handed through to the parser.

    Returns:
        Whatever ``JsonLDParser.parseFile`` returns (annotated as Payload).
    """
    parser = JsonLDParser()
    return parser.parseFile(file_name, encoding)
Expand All @@ -35,6 +37,29 @@ def parse_from_string(file_name: str) -> Payload:
return JsonLDParser().parseString(file_name)

class GraphToElementConverter:
# Maps an RDF type IRI to the model class that getSubjectAsPythonObject()
# instantiates for subjects of that type.
type_to_constructor = {
"https://spdx.org/rdf/v3/Core/CreationInfo": CreationInfo,
"https://spdx.org/rdf/v3/Core/Hash": Hash,
"https://spdx.org/rdf/v3/Core/ExternalIdentifier": ExternalIdentifier,
"https://spdx.org/rdf/v3/Core/ExternalReference": ExternalReference,
"https://spdx.org/rdf/v3/Core/ExternalMap": ExternalMap,
"https://spdx.org/rdf/v3/Core/SpdxDocument": SpdxDocument,
"https://spdx.org/rdf/v3/Software/Package": Package,
"https://spdx.org/rdf/v3/Software/File": File,
"https://spdx.org/rdf/v3/Core/Relationship": Relationship,
# NOTE(review): Person is mapped to the Relationship class; this looks like a
# copy-paste slip -- confirm which constructor is intended.
"https://spdx.org/rdf/v3/Core/Person": Relationship,
}
# Type IRIs that get_element_types() filters out of type_to_constructor;
# getSubjectAsPythonObject() does not pass an spdx_id for these types.
non_element_types = [
"https://spdx.org/rdf/v3/Core/CreationInfo",
"https://spdx.org/rdf/v3/Core/Hash",
"https://spdx.org/rdf/v3/Core/ExternalIdentifier",
"https://spdx.org/rdf/v3/Core/ExternalReference",
"https://spdx.org/rdf/v3/Core/ExternalMap",
]

def get_element_types(self) -> list[str]:
    """Return the type IRIs of ``type_to_constructor`` that are elements.

    A type IRI is kept when it is not listed in ``non_element_types``; the
    order of ``type_to_constructor`` is preserved.
    """
    excluded = self.non_element_types
    element_types = []
    for type_uri in self.type_to_constructor:
        if type_uri not in excluded:
            element_types.append(type_uri)
    return element_types

def __init__(self, graph: Graph, debug: bool = True):
self.graph = graph
self.namespace_manager = graph.namespace_manager
Expand Down Expand Up @@ -71,7 +96,7 @@ def __debug_log_subject__(self, subject: IdentifiedNode):
####################################################################################################
# low level functions

def genSpdxURIRef(self, name: str, namespace: str = "Core"):
def genSpdxURIRef(self, name: str, namespace: str = "Core") -> URIRef:
    """Build the URIRef ``https://spdx.org/rdf/v3/<namespace>/<name>``."""
    uri = f"https://spdx.org/rdf/v3/{namespace}/{name}"
    return URIRef(uri)

def getGraphValueRaw(self, subject: IdentifiedNode, predicate: IdentifiedNode, isMandatory: bool = False) -> str:
Expand Down Expand Up @@ -113,237 +138,113 @@ def getTypeOfSubject(self, subject: IdentifiedNode) -> str:
predicate_for_type = RDF.type
return self.getGraphValue(subject, predicate_for_type)

####################################################################################################
# higher level functions

def getCreationInfo(self, subject: IdentifiedNode) -> CreationInfo:
    """Build a CreationInfo from the Core properties attached to *subject*.

    Reads ``specVersion``, ``created``, ``createdBy`` and ``dataLicense``;
    ``profile`` is always passed as an empty list.
    """
    self.__debug_log_subject__(subject)
    spec_version = self.getGraphSpdxValueAsVersion(subject, "Core", "specVersion")
    created = self.getGraphSpdxValueAsDatetime(subject, "Core", "created")
    created_by = self.getGraphSpdxValues(subject, "Core", "createdBy")
    data_license = self.getGraphSpdxValue(subject, "Core", "dataLicense")
    # TODO: profile is not read from the graph yet; created_using and
    # comment are not populated at all.
    return CreationInfo(
        spec_version=spec_version,
        created=created,
        created_by=created_by,
        profile=[],
        data_license=data_license,
    )

def getCreationInfoOfSubject(self, subject: IdentifiedNode) -> CreationInfo:
    """Resolve the ``creationInfo`` property of *subject* into a CreationInfo.

    The raw value of the property is wrapped in a BNode and handed to
    getCreationInfo().
    """
    creation_info_predicate = self.genSpdxURIRef(namespace="Core", name="creationInfo")
    creation_info_id = self.getGraphValueRaw(subject, creation_info_predicate)
    return self.getCreationInfo(BNode(creation_info_id))
def predicateToPythonArgsKey(self, predicate: IdentifiedNode) -> str:
    """Derive a snake_case keyword-argument name from a predicate IRI.

    Takes the text after the last ``/`` of *predicate* and converts it with
    camel_to_snake().
    """
    local_name = predicate.split("/")[-1]
    return camel_to_snake(local_name)

def getHash(self, subject: IdentifiedNode) -> Hash:
    """Build a Hash from the Core properties attached to *subject*.

    ``algorithm`` and ``hashValue`` are read as mandatory values; ``comment``
    is read without the mandatory flag.
    """
    algorithm = HashAlgorithm(self.getGraphSpdxValue(subject, "Core", "algorithm", isMandatory=True))
    hash_value = self.getGraphSpdxValue(subject, "Core", "hashValue", isMandatory=True)
    # The original called the misspelled ``getGrahSpdxValue`` here, which
    # raised AttributeError on every call.
    comment = self.getGraphSpdxValue(subject, "Core", "comment")
    return Hash(algorithm=algorithm, hash_value=hash_value, comment=comment)

def getIntegrityMethod(self, subject: IdentifiedNode) -> IntegrityMethod:
    """Convert *subject* into the integrity method matching its RDF type.

    Subjects typed as Core/Hash go through getHash(); subjects typed as
    Core/ExternalIdentifier go through getExternalIdentifier().

    Raises:
        Exception: if the type of *subject* is neither of the two.
    """
    type_of_subject = self.getTypeOfSubject(subject)
    if type_of_subject == "https://spdx.org/rdf/v3/Core/Hash":
        return self.getHash(subject)
    if type_of_subject == "https://spdx.org/rdf/v3/Core/ExternalIdentifier":
        return self.getExternalIdentifier(subject)
    # Fail loudly instead of silently returning None for an unknown type.
    raise Exception(f"unsupported type_of_subject={type_of_subject}")
def literalToPython(self, literal: Literal) -> any:
    """Convert an RDF literal into the matching Python value.

    Supported datatypes:
      * ``XSD.string``: the result of ``literal.toPython()``
      * SPDX Core ``SemVer``: wrapped in ``Version``
      * SPDX Core ``DateTime``: parsed with ``datetime_from_str``

    Raises:
        Exception: if the literal carries any other datatype.
    """
    # NOTE(review): the ``any`` return annotation is the builtin function,
    # not ``typing.Any``; left unchanged to keep the signature as-is.
    datatype = literal.datatype
    if datatype == XSD.string:
        return literal.toPython()
    if datatype == self.genSpdxURIRef("SemVer"):
        return Version(literal.toPython())
    if datatype == self.genSpdxURIRef("DateTime"):
        return datetime_from_str(literal.toPython())
    # The fallback previously referenced a name that does not exist in this
    # method; report the literal and its datatype instead.
    raise Exception(f"{self.n3(literal)} has unsupported datatype={literal.datatype}")

def getVerifiedUsings(self, subject: IdentifiedNode) -> list[IntegrityMethod]:
    """Return the integrity methods referenced by ``verifiedUsing`` on *subject*.

    Each value of the Core ``verifiedUsing`` property is converted with
    getIntegrityMethod(); the result keeps the order of the values.
    """
    return [
        self.getIntegrityMethod(node)
        for node in self.getGraphSpdxValues(subject, "Core", "verifiedUsing")
    ]

def getExternalReference(self, subject: IdentifiedNode) -> ExternalReference:
    """Build an ExternalReference from the Core properties of *subject*.

    Reads ``externalReferenceType`` (wrapped in ExternalReferenceType),
    ``locator`` (all values), ``contentType`` and ``comment``.
    """
    reference_type = ExternalReferenceType(
        self.getGraphSpdxValue(subject, "Core", "externalReferenceType")
    )
    locators = self.getGraphSpdxValues(subject, "Core", "locator")
    content_type = self.getGraphSpdxValue(subject, "Core", "contentType")
    comment = self.getGraphSpdxValue(subject, "Core", "comment")
    return ExternalReference(
        external_reference_type=reference_type,
        locator=locators,
        content_type=content_type,
        comment=comment,
    )

def getExternalReferences(self, subject: IdentifiedNode) -> list[ExternalReference]:
    """Return the external references attached to *subject*.

    Each value of the Core ``externalReference`` property is converted with
    getExternalReference(); the result keeps the order of the values.
    """
    return [
        self.getExternalReference(node)
        for node in self.getGraphSpdxValues(subject, "Core", "externalReference")
    ]

def getExternalIdentifier(self, subject: IdentifiedNode) -> ExternalIdentifier:
    """Build an ExternalIdentifier from the Core properties of *subject*.

    ``externalIdentifierType`` and ``identifier`` are read as mandatory
    values; ``comment``, ``identifierLocator`` and ``issuingAuthority`` are
    read without the mandatory flag.
    """
    identifier_type = ExternalIdentifierType(
        self.getGraphSpdxValue(subject, "Core", "externalIdentifierType", isMandatory=True)
    )
    identifier = self.getGraphSpdxValue(subject, "Core", "identifier", isMandatory=True)
    comment = self.getGraphSpdxValue(subject, "Core", "comment")
    identifier_locator = self.getGraphSpdxValue(subject, "Core", "identifierLocator")
    issuing_authority = self.getGraphSpdxValue(subject, "Core", "issuingAuthority")
    return ExternalIdentifier(
        external_identifier_type=identifier_type,
        identifier=identifier,
        comment=comment,
        identifier_locator=identifier_locator,
        issuing_authority=issuing_authority,
    )

def getExternalIdentifiers(self, subject: IdentifiedNode) -> list[ExternalIdentifier]:
    """Return the external identifiers attached to *subject*.

    Each value of the Core ``externalIdentifier`` property is converted with
    getExternalIdentifier(); the result keeps the order of the values.
    """
    return [
        self.getExternalIdentifier(node)
        for node in self.getGraphSpdxValues(subject, "Core", "externalIdentifier")
    ]

def getExternalMap(self, subject: IdentifiedNode) -> ExternalMap:
    """Build an ExternalMap from the Core properties attached to *subject*.

    ``externalId`` is read as a mandatory value; ``locationHint`` and
    ``definingDocument`` are read without the mandatory flag, and the
    integrity methods come from getVerifiedUsings().
    """
    # The original called ``getVerifiedUsing`` (singular); the helper used by
    # the sibling methods is ``getVerifiedUsings``.
    verified_using = self.getVerifiedUsings(subject)
    return ExternalMap(
        external_id=self.getGraphSpdxValue(subject, "Core", "externalId", isMandatory=True),
        verified_using=verified_using,
        location_hint=self.getGraphSpdxValue(subject, "Core", "locationHint"),
        defining_document=self.getGraphSpdxValue(subject, "Core", "definingDocument"),
    )

def getImports(self, subject: IdentifiedNode) -> list[ExternalMap]:
    """Return the external maps referenced by ``import`` on *subject*.

    Each value of the Core ``import`` property is converted with
    getExternalMap(); the result keeps the order of the values.
    """
    return [
        self.getExternalMap(node)
        for node in self.getGraphSpdxValues(subject, "Core", "import")
    ]
self.__debug_log_subject__(subject)
raise Exception(f"{self.n3(literal)} has unsupported datatype={literal.datatype}")

####################################################################################################
# element handlers

def getElementArgs(self, subject: IdentifiedNode) -> dict:
    """Collect the keyword arguments shared by element constructors.

    Returns a dict with ``spdx_id`` (from ``subject.toPython()``), the Core
    values ``name``, ``element``, ``root_element``, ``summary``,
    ``description`` and ``comment``, the creation info of *subject*, and its
    verified-using, external-reference and external-identifier lists.
    ``extension`` is not populated.
    """
    integrity_methods = self.getVerifiedUsings(subject)
    references = self.getExternalReferences(subject)
    identifiers = self.getExternalIdentifiers(subject)
    return {
        "spdx_id": subject.toPython(),
        "name": self.getGraphSpdxValue(subject, "Core", "name"),
        "element": self.getGraphSpdxValues(subject, "Core", "element"),
        "root_element": self.getGraphSpdxValues(subject, "Core", "rootElement"),
        "creation_info": self.getCreationInfoOfSubject(subject),
        "summary": self.getGraphSpdxValue(subject, "Core", "summary"),
        "description": self.getGraphSpdxValue(subject, "Core", "description"),
        "comment": self.getGraphSpdxValue(subject, "Core", "comment"),
        "verified_using": integrity_methods,
        "external_reference": references,
        "external_identifier": identifiers,
    }

def getCollectionArgs(self, subject: IdentifiedNode) -> dict:
    """Return the element arguments of *subject* plus an ``imports`` entry.

    Starts from getElementArgs() and adds the list produced by getImports()
    under the key ``imports``.
    """
    collection_args = self.getElementArgs(subject)
    collection_args.update(imports=self.getImports(subject))
    return collection_args


def handleSubjectOfTypeSpdxDocument(self, subject: IdentifiedNode) -> Element:
    """Build an SpdxDocument from *subject*.

    Uses the collection arguments of *subject* and adds the Core ``context``
    value under the key ``context``.
    """
    document_args = self.getCollectionArgs(subject)
    document_args.update(context=self.getGraphSpdxValue(subject, "Core", "context"))
    return SpdxDocument(**document_args)

def handleSubjectOfTypePackage(self, subject: IdentifiedNode) -> Element:
    """Build a Package from *subject*.

    Only ``spdx_id``, ``name`` and ``creation_info`` are populated.
    """
    # Not populated yet: summary, description, comment, verified_using,
    # external_reference, external_identifier, extension, originated_by,
    # supplied_by, built_time, release_time, valid_until_time, standard,
    # content_identifier, primary_purpose, additional_purpose,
    # concluded_license, declared_license, copyright_text, attribution_text,
    # package_version, download_location, package_url, homepage, source_info.
    package_args = {
        "spdx_id": subject.toPython(),
        "name": self.getGraphSpdxValue(subject, "Core", "name"),
        "creation_info": self.getCreationInfoOfSubject(subject),
    }
    return Package(**package_args)

def handleSubjectOfTypeFile(self, subject: IdentifiedNode) -> Element:
    """Build a File from *subject*.

    Only ``spdx_id``, ``name`` and ``creation_info`` are populated.
    """
    # Not populated yet: summary, description, comment, verified_using,
    # external_reference, external_identifier, extension, originated_by,
    # supplied_by, built_time, release_time, valid_until_time, standard,
    # content_identifier, primary_purpose, additional_purpose,
    # concluded_license, declared_license, copyright_text, attribution_text,
    # content_type.
    file_args = {
        "spdx_id": subject.toPython(),
        "name": self.getGraphSpdxValue(subject, "Core", "name"),
        "creation_info": self.getCreationInfoOfSubject(subject),
    }
    return File(**file_args)

def handleSubjectOfTypeRelationship(self, subject: IdentifiedNode) -> Element:
    """Build a Relationship from *subject*.

    The Core ``relationshipType`` value is reduced to the text after its last
    ``/``, upper-cased, and looked up by name in RelationshipType.
    ``from_element`` is the Core ``from`` value and ``to`` holds all Core
    ``to`` values.
    """
    type_uri = self.getGraphSpdxValue(subject, "Core", "relationshipType")
    type_name = type_uri.split("/")[-1].upper()
    # Resolve the enum member first so an unknown name fails before any
    # further graph reads.
    relationship_type = RelationshipType[type_name]
    spdx_id = subject.toPython()
    from_element = self.getGraphSpdxValue(subject, "Core", "from")
    to_elements = self.getGraphSpdxValues(subject, "Core", "to")
    return Relationship(
        spdx_id=spdx_id,
        from_element=from_element,
        relationship_type=relationship_type,
        to=to_elements,
    )

def getSubjectAsElement(self, subject: IdentifiedNode) -> Element:
def objectToPython(self, object: Identifier) -> any:
    """Convert an RDF object term into a Python value.

    * ``None`` is returned unchanged.
    * A Literal is converted with literalToPython().
    * A URIRef, BNode or other IdentifiedNode is resolved with
      getSubjectAsPythonObject(), after printing a debug line.

    Raises:
        Exception: for Variable, other Identifier or Node terms, and for
            anything that is not an rdflib term.
    """
    # NOTE(review): the ``any`` return annotation is the builtin function,
    # not ``typing.Any``; left unchanged to keep the signature as-is.
    # Identity check: ``== None`` would go through the term's own __eq__.
    if object is None:
        return None
    if isinstance(object, Literal):
        return self.literalToPython(object)
    if isinstance(object, URIRef):
        print("DEBUG: URIRef:", object)
        return self.getSubjectAsPythonObject(object)
    if isinstance(object, BNode):
        print("DEBUG: BNode:", object)
        return self.getSubjectAsPythonObject(object)
    if isinstance(object, IdentifiedNode):
        print("DEBUG: IdentifiedNode:", object)
        return self.getSubjectAsPythonObject(object)
    # Everything below is unsupported; checks run in the original order so
    # the reported type label stays the same.
    rejected_kinds = (("Variable", Variable), ("Identifier", Identifier), ("Node", Node))
    for label, term_class in rejected_kinds:
        if isinstance(object, term_class):
            raise Exception(f"{self.n3(object)} of has unsupported type={label}")
    raise Exception(f"{self.n3(object)} of has unsupported type={type(object)}")

def getArgsForSubject(self, subject: IdentifiedNode) -> dict:
    """Collect constructor keyword arguments from the triples of *subject*.

    Every (predicate, object) pair of *subject* except ``rdf:type`` becomes
    one entry: the key comes from predicateToPythonArgsKey() and the value
    from objectToPython(). When several pairs produce the same key, the pair
    seen last overwrites the earlier ones.
    """
    collected = {}
    pairs = self.graph.predicate_objects(subject=subject, unique=False)
    for predicate, obj in pairs:
        if predicate != RDF.type:
            print("DEBUG: parse:", predicate, obj)

            arg_name = self.predicateToPythonArgsKey(predicate)
            arg_value = self.objectToPython(obj)
            print(f"DEBUG: {predicate} -> {arg_name} && {self.n3(obj)} -> {arg_value}")
            collected[arg_name] = arg_value

    return collected

def applyArgsToConstructor(self, subject: IdentifiedNode, constructor, hasSpdxId: bool = False):
    """Instantiate *constructor* with the arguments gathered for *subject*.

    Args:
        subject: Node whose triples supply the keyword arguments.
        constructor: Callable invoked as ``constructor(**args)``.
        hasSpdxId: When true, ``spdx_id`` is set to ``subject.toPython()``.

    Returns:
        The value returned by *constructor*.
    """
    self.__debug_log_subject__(subject)
    kwargs = self.getArgsForSubject(subject)
    if hasSpdxId:
        kwargs.update(spdx_id=subject.toPython())
    print("DEBUG: args:", kwargs)
    return constructor(**kwargs)

def getSubjectAsPythonObject(self, subject: IdentifiedNode) -> any:
    """Instantiate the model object registered for the RDF type of *subject*.

    The constructor is looked up in ``type_to_constructor``; ``spdx_id`` is
    passed along unless the type is listed in ``non_element_types``.

    Raises:
        Exception: if *subject* has no type, or its type has no registered
            constructor.
    """
    type_of_subject = self.getTypeOfSubject(subject)
    if type_of_subject is None:
        raise Exception(f"{subject} is not present as an object in the graph")
    if type_of_subject not in self.type_to_constructor:
        self.__debug_log_subject__(subject)
        raise Exception(f"{subject} has unsupported constructor type={type_of_subject}")

    is_element = type_of_subject not in self.non_element_types
    constructor = self.type_to_constructor[type_of_subject]
    return self.applyArgsToConstructor(subject, constructor, hasSpdxId=is_element)

non_element_types = [
"https://spdx.org/rdf/v3/Core/CreationInfo",
"https://spdx.org/rdf/v3/Core/Hash",
"https://spdx.org/rdf/v3/Core/ExternalIdentifier",
"https://spdx.org/rdf/v3/Core/ExternalReference",
"https://spdx.org/rdf/v3/Core/ExternalMap",
]
if type_of_subject in non_element_types:
return None
def get_subjects(self, only_elements: bool = True) -> list[IdentifiedNode]:
    """Return the distinct subjects of the graph.

    Args:
        only_elements: When true (the default), keep only subjects whose RDF
            type is one of get_element_types(); when false, return every
            subject.

    Returns:
        The subjects in the order the graph yields them.
    """
    subjects = list(self.graph.subjects(unique=True))
    if not only_elements:
        return subjects
    # The original called the non-existent ``self.element_types()`` and
    # filtered with ``not in``, which would have selected the non-element
    # subjects. The type set is built once, outside the filter.
    element_types = set(self.get_element_types())
    return [subject for subject in subjects if self.getTypeOfSubject(subject) in element_types]

match type_of_subject:
case "https://spdx.org/rdf/v3/Core/SpdxDocument":
return self.handleSubjectOfTypeSpdxDocument(subject)
case "https://spdx.org/rdf/v3/Software/Package":
return self.handleSubjectOfTypePackage(subject)
case "https://spdx.org/rdf/v3/Software/File":
return self.handleSubjectOfTypeFile(subject)
case "https://spdx.org/rdf/v3/Core/Relationship":
return self.handleSubjectOfTypeRelationship(subject)
case _:
self.__debug_log_subject__(subject)
raise Exception(f"{subject} has unsupported type={type_of_subject}")

def get_subjects(self) -> list[IdentifiedNode]:
return list(self.graph.subjects(unique=True))
def get_elements(self) -> list[Element]:
    """Convert every subject from ``get_subjects(only_elements=True)``.

    Each subject is turned into a Python object with
    getSubjectAsPythonObject(); the result keeps the order of the subjects.
    """
    elements = []
    for subject in self.get_subjects(only_elements=True):
        elements.append(self.getSubjectAsPythonObject(subject))
    return elements

class JsonLDParser:
def graphToPayload(self, graph: Graph) -> Payload:
converter = GraphToElementConverter(graph)
payload = Payload()

for subject in converter.get_subjects():
element = converter.getSubjectAsElement(subject)
converter = GraphToElementConverter(graph, True)
for element in converter.get_elements():
if element is not None:
payload.add_element(element)

Expand Down

0 comments on commit 8a5c5e2

Please sign in to comment.