Skip to content

Commit

Permalink
Fix remaining UCO validation issues: Remove invalid facets, fix hash properties, ensure proper object types
Browse files Browse the repository at this point in the history
  • Loading branch information
vulnmaster committed Jan 7, 2025
1 parent 3e4d666 commit cc83bc2
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 306 deletions.
281 changes: 112 additions & 169 deletions nsrl_to_uco.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,126 +353,70 @@ def process_file(self, input_file: Path) -> Optional[Dict]:
return None

current_time = get_current_time()
objects = [] # List to store all objects

# Create identifiers with UUIDs
# Create bundle
bundle_id = self.create_identifier("bundle", "nsrl-caid")
tool_id = self.create_identifier("tool", "nsrl-to-uco")
nist_id = self.create_identifier("org", "nist")
source_id = self.create_identifier("source", "nsrl-caid")
action_id = self.create_identifier("action", "conversion")

# Create the base objects
objects = [
{
"@id": bundle_id,
"@type": "uco-core:Bundle",
"uco-core:description": "NSRL CAID media file reference data converted to UCO format",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-core:object": [] # Will be populated with all other objects
},
{
"@id": nist_id,
"@type": "uco-identity:Organization",
"uco-core:name": "National Institute of Standards and Technology",
"uco-core:description": "NIST maintains the NSRL CAID repository"
},
{
"@id": source_id,
"@type": "uco-observable:URL",
"uco-core:name": "NSRL CAID Repository",
"uco-core:description": "National Software Reference Library - Comprehensive Application Identifier",
"uco-observable:value": "https://s3.amazonaws.com/rds.nsrl.nist.gov/RDS/CAID/current/NSRL-CAID-JSONs.zip"
bundle = {
"@id": bundle_id,
"@type": "uco-core:Bundle",
"uco-core:description": "NSRL CAID media file reference data converted to UCO format",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
{
"@id": tool_id,
"@type": "uco-tool:ConfiguredTool",
"uco-core:name": "nsrl_to_uco.py",
"uco-core:description": "Tool to convert NSRL CAID JSON to UCO format",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
}
"uco-core:object": [] # Will be populated later
}
objects.append(bundle)

# Create tool object
tool_id = self.create_identifier("tool", "nsrl-to-uco")
tool = {
"@id": tool_id,
"@type": "uco-tool:ConfiguredTool",
"uco-core:name": "nsrl_to_uco.py",
"uco-core:description": "Tool to convert NSRL CAID JSON to UCO format",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
}
]
}
objects.append(tool)

# Create environment object with proper IRI
env_id = self.create_identifier("environment", "python")
env_facet_id = self.create_identifier("facet", "python-env")

# Create environment facet
env_facet = {
"@id": env_facet_id,
"@type": "uco-observable:EnvironmentFacet",
"uco-core:name": "Python Environment",
"uco-core:description": f"Python {sys.version}"
# Create organization object
org_id = self.create_identifier("org", "nist")
org = {
"@id": org_id,
"@type": "uco-identity:Organization",
"uco-core:name": "National Institute of Standards and Technology",
"uco-core:description": "NIST maintains the NSRL CAID repository"
}

objects.append(org)

# Create source object
source_id = self.create_identifier("source", "nsrl-caid")
source = {
"@id": source_id,
"@type": "uco-observable:URL",
"uco-core:name": "NSRL CAID Repository",
"uco-core:description": "National Software Reference Library - Comprehensive Application Identifier",
"uco-observable:value": "https://s3.amazonaws.com/rds.nsrl.nist.gov/RDS/CAID/current/NSRL-CAID-JSONs.zip"
}
objects.append(source)

# Create environment object
env_obj = {
env_id = self.create_identifier("environment", "python")
env = {
"@id": env_id,
"@type": "uco-observable:ObservableObject",
"uco-core:name": "Python Environment",
"uco-core:description": f"Python {sys.version}",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-core:hasFacet": [{"@id": env_facet_id}]
}
objects.extend([env_facet, env_obj])

# Create action object with proper environment reference
action_obj = {
"@id": action_id,
"@type": "uco-action:Action",
"uco-core:name": "NSRL CAID to UCO Conversion",
"uco-core:description": "Conversion of NSRL CAID data to UCO format",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-action:startTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-action:endTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-action:performer": {"@id": tool_id},
"uco-action:environment": {"@id": env_id}
}
objects.append(action_obj)

# Add relationships
objects.extend([
{
"@id": self.create_identifier("relationship", "source-maintainer"),
"@type": "uco-core:Relationship",
"uco-core:source": {"@id": source_id},
"uco-core:target": {"@id": nist_id},
"uco-core:kindOfRelationship": "maintainedBy",
"uco-core:isDirectional": True,
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
}
},
{
"@id": self.create_identifier("relationship", "action-source"),
"@type": "uco-core:Relationship",
"uco-core:source": {"@id": action_id},
"uco-core:target": {"@id": source_id},
"uco-core:kindOfRelationship": "inputSource",
"uco-core:isDirectional": True,
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
}
}
])
}
objects.append(env)

# Process each media item
for media in media_list:
Expand All @@ -486,84 +430,83 @@ def process_file(self, input_file: Path) -> Optional[Dict]:
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-core:hasFacet": []
}
}

# Add file facet for each media file
# Process each media file
for media_file in media.get("MediaFiles", []):
facet_id = self.create_identifier("facet", f"{media_id}-{media_file.get('MD5', 'unknown')}")
hash_id = self.create_identifier("hash", media_file.get("MD5", "unknown"))
hash_facet_id = self.create_identifier("facet", f"hash-{media_file.get('MD5', 'unknown')}")

# Create hash object
hash_obj = {
"@id": hash_id,
"@type": "uco-types:Hash",
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
},
"uco-core:hasFacet": [{
"@id": hash_facet_id
}]
}
# Create hash objects
hash_objects = []

# Create hash facet
hash_facet = {
"@id": hash_facet_id,
"@type": "uco-types:HashFacet",
"uco-types:hashMethod": {
"@type": "uco-vocabulary:HashNameVocab",
"@value": "MD5"
},
"uco-types:hashValue": {
"@type": "xsd:hexBinary",
"@value": media_file.get("MD5", "").upper()
if "MD5" in media_file:
hash_id = self.create_identifier("hash", media_file["MD5"])
hash_obj = {
"@id": hash_id,
"@type": "uco-types:Hash",
"uco-types:hashMethod": {
"@type": "uco-vocabulary:HashNameVocab",
"@value": "MD5"
},
"uco-types:hashValue": {
"@type": "xsd:hexBinary",
"@value": media_file["MD5"].upper()
}
}
}

hash_objects.append(hash_obj)
objects.append(hash_obj)

if "SHA1" in media:
hash_id = self.create_identifier("hash", media["SHA1"])
hash_obj = {
"@id": hash_id,
"@type": "uco-types:Hash",
"uco-types:hashMethod": {
"@type": "uco-vocabulary:HashNameVocab",
"@value": "SHA1"
},
"uco-types:hashValue": {
"@type": "xsd:hexBinary",
"@value": media["SHA1"].upper()
}
}
hash_objects.append(hash_obj)
objects.append(hash_obj)

# Create file facet
facet_id = self.create_identifier("facet", f"{media_id}-{media_file.get('MD5', 'unknown')}")
file_facet = {
"@id": facet_id,
"@type": "uco-observable:FileFacet",
"uco-observable:fileName": media_file.get("FileName", ""),
"uco-observable:filePath": media_file.get("FilePath", ""),
"uco-observable:hash": [{"@id": hash_id}]
"uco-observable:hash": [{"@id": h["@id"]} for h in hash_objects],
"uco-observable:isDirectory": False
}

# Add reference to facet in file object
file_obj["uco-core:hasFacet"].append({"@id": facet_id})

# Add facet and hash objects to graph
objects.extend([file_facet, hash_facet, hash_obj])

# Add relationship between action and file
objects.append({
"@id": self.create_identifier("relationship", f"action-file-{media_id}"),
"@type": "uco-core:Relationship",
"uco-core:source": {"@id": action_id},
"uco-core:target": {"@id": file_id},
"uco-core:kindOfRelationship": "outputFile",
"uco-core:isDirectional": True,
"uco-core:objectCreatedTime": {
"@type": "xsd:dateTime",
"@value": current_time
}
})

# Add size if available
if "MediaSize" in media:
try:
file_facet["uco-observable:sizeInBytes"] = int(media["MediaSize"])
except ValueError:
self.logger.warning(f"Invalid MediaSize value: {media['MediaSize']}")

file_obj["uco-core:hasFacet"] = [{"@id": facet_id}]
objects.append(file_facet)

objects.append(file_obj)

# Add only UcoObjects to the bundle's object list
objects[0]["uco-core:object"] = [{"@id": obj["@id"]} for obj in objects[1:]
if obj["@type"] in ["uco-identity:Organization",
"uco-observable:URL",
"uco-tool:ConfiguredTool",
"uco-observable:ObservableObject",
"uco-action:Action",
"uco-core:Relationship",
"uco-observable:File",
"uco-types:Hash"]]
# Add all UcoObjects to the bundle's object list
bundle["uco-core:object"] = [
{"@id": obj["@id"]} for obj in objects
if obj["@type"] in [
"uco-tool:ConfiguredTool",
"uco-identity:Organization",
"uco-observable:URL",
"uco-observable:ObservableObject",
"uco-observable:File",
"uco-types:Hash"
]
]

# Create the UCO object
uco_object = {
Expand Down
Loading

0 comments on commit cc83bc2

Please sign in to comment.