diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index ba41d0906..8df56b874 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -146,6 +146,31 @@ class AdvisoryV2Serializer(serializers.ModelSerializer): references = AdvisoryReferenceSerializer(many=True) severities = AdvisorySeveritySerializer(many=True) advisory_id = serializers.CharField(source="avid", read_only=True) + related_ssvc_trees = serializers.SerializerMethodField() + + def get_related_ssvc_trees(self, obj): + related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") + source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") + + seen = set() + result = [] + + for ssvc in list(related_ssvcs) + list(source_ssvcs): + key = (ssvc.vector, ssvc.source_advisory_id) + if key in seen: + continue + seen.add(key) + + result.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) + + return result class Meta: model = AdvisoryV2 @@ -160,6 +185,7 @@ class Meta: "exploitability", "weighted_severity", "risk_score", + "related_ssvc_trees", ] def get_aliases(self, obj): diff --git a/vulnerabilities/improvers/__init__.py b/vulnerabilities/improvers/__init__.py index aa9312ec1..7735ad816 100644 --- a/vulnerabilities/improvers/__init__.py +++ b/vulnerabilities/improvers/__init__.py @@ -19,6 +19,7 @@ from vulnerabilities.pipelines import flag_ghost_packages from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline from vulnerabilities.pipelines import remove_duplicate_advisories +from vulnerabilities.pipelines.v2_improvers import collect_ssvc_trees from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2 from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2 from vulnerabilities.pipelines.v2_improvers import ( @@ -70,5 +71,6 @@ compute_advisory_todo_v2.ComputeToDo, unfurl_version_range_v2.UnfurlVersionRangePipeline, compute_advisory_todo.ComputeToDo, + collect_ssvc_trees.CollectSSVCPipeline, ] ) diff --git a/vulnerabilities/migrations/0104_ssvc.py b/vulnerabilities/migrations/0104_ssvc.py new file mode 100644 index 000000000..1e9100eae --- /dev/null +++ b/vulnerabilities/migrations/0104_ssvc.py @@ -0,0 +1,59 @@ +# Generated by Django 4.2.25 on 2025-12-15 15:15 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="SSVC", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ( + "vector", + models.CharField( + help_text="The vector string representing the SSVC.", max_length=255 + ), + ), + ( + "options", + models.JSONField(help_text="A JSON object containing the SSVC options."), + ), + ( + "decision", + models.CharField(help_text="The decision string for the SSVC.", max_length=255), + ), + ( + "related_advisories", + models.ManyToManyField( + help_text="Advisories associated with this SSVC.", + related_name="related_ssvcs", + to="vulnerabilities.advisoryv2", + ), + ), + ( + "source_advisory", + models.ForeignKey( + help_text="The advisory that was used to generate this SSVC decision.", + on_delete=django.db.models.deletion.CASCADE, + related_name="source_ssvcs", + to="vulnerabilities.advisoryv2", + ), + ), + ], + options={ + "unique_together": {("vector", "source_advisory")}, + }, + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index e1c4ddc6b..9a0d05c8f 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -3414,3 +3414,26 @@ class CodeCommit(models.Model): class Meta: unique_together = ("commit_hash", "vcs_url") + + +class SSVC(models.Model): + vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.") + options = models.JSONField(help_text="A JSON object containing the SSVC options.") + decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.") + related_advisories = models.ManyToManyField( + AdvisoryV2, + related_name="related_ssvcs", + help_text="Advisories associated with this SSVC.", + ) + source_advisory = models.ForeignKey( + AdvisoryV2, + on_delete=models.CASCADE, + related_name="source_ssvcs", + help_text="The advisory that was used to generate this SSVC decision.", + ) + + def __str__(self): + return f"SSVC Decision: {self.vector} -> {self.decision}" + + class Meta: + unique_together = ("vector", "source_advisory") diff --git a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py index a596d8d65..7de7ff7d7 100644 --- a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py +++ b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py @@ -16,6 +16,7 @@ from vulnerabilities.utils import get_advisory_url from vulnerabilities.utils import get_cwe_id from vulnerabilities.utils import get_reference_id +from vulnerabilities.utils import ssvc_calculator logger = logging.getLogger(__name__) @@ -210,117 +211,3 @@ def clean_downloads(self): def on_failure(self): self.clean_downloads() - - -def ssvc_calculator(ssvc_data): - """ - Return the ssvc vector and the decision value - """ - options = ssvc_data.get("options", []) - timestamp = ssvc_data.get("timestamp") - - # Extract the options into a dictionary - options_dict = {k: v.lower() for option in options for k, v in option.items()} - - # We copied the table value from this link. - # https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf - - # Determining Mission and Well-Being Impact Value - mission_well_being_table = { - # (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being" - ("minimal", "minimal"): "low", - ("minimal", "material"): "medium", - ("minimal", "irreversible"): "high", - ("support", "minimal"): "medium", - ("support", "material"): "medium", - ("support", "irreversible"): "high", - ("essential", "minimal"): "high", - ("essential", "material"): "high", - ("essential", "irreversible"): "high", - } - - if "Mission Prevalence" not in options_dict: - options_dict["Mission Prevalence"] = "minimal" - - if "Public Well-being Impact" not in options_dict: - options_dict["Public Well-being Impact"] = "material" - - options_dict["Mission & Well-being"] = mission_well_being_table[ - (options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"]) - ] - - decision_key = ( - options_dict.get("Exploitation"), - options_dict.get("Automatable"), - options_dict.get("Technical Impact"), - options_dict.get("Mission & Well-being"), - ) - - decision_points = { - "Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}}, - "Automatable": {"A": {"no": "N", "yes": "Y"}}, - "Technical Impact": {"T": {"partial": "P", "total": "T"}}, - "Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}}, - "Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}}, - "Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}}, - } - - # Create the SSVC vector - ssvc_vector = "SSVCv2/" - for key, value_map in options_dict.items(): - options_key = decision_points.get(key) - for lhs, rhs_map in options_key.items(): - ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/" - - # "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}}, - decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"} - - decision_lookup = { - ("none", "no", "partial", "low"): "Track", - ("none", "no", "partial", "medium"): "Track", - ("none", "no", "partial", "high"): "Track", - ("none", "no", "total", "low"): "Track", - ("none", "no", "total", "medium"): "Track", - ("none", "no", "total", "high"): "Track*", - ("none", "yes", "partial", "low"): "Track", - ("none", "yes", "partial", "medium"): "Track", - ("none", "yes", "partial", "high"): "Attend", - ("none", "yes", "total", "low"): "Track", - ("none", "yes", "total", "medium"): "Track", - ("none", "yes", "total", "high"): "Attend", - ("poc", "no", "partial", "low"): "Track", - ("poc", "no", "partial", "medium"): "Track", - ("poc", "no", "partial", "high"): "Track*", - ("poc", "no", "total", "low"): "Track", - ("poc", "no", "total", "medium"): "Track*", - ("poc", "no", "total", "high"): "Attend", - ("poc", "yes", "partial", "low"): "Track", - ("poc", "yes", "partial", "medium"): "Track", - ("poc", "yes", "partial", "high"): "Attend", - ("poc", "yes", "total", "low"): "Track", - ("poc", "yes", "total", "medium"): "Track*", - ("poc", "yes", "total", "high"): "Attend", - ("active", "no", "partial", "low"): "Track", - ("active", "no", "partial", "medium"): "Track", - ("active", "no", "partial", "high"): "Attend", - ("active", "no", "total", "low"): "Track", - ("active", "no", "total", "medium"): "Attend", - ("active", "no", "total", "high"): "Act", - ("active", "yes", "partial", "low"): "Attend", - ("active", "yes", "partial", "medium"): "Attend", - ("active", "yes", "partial", "high"): "Act", - ("active", "yes", "total", "low"): "Attend", - ("active", "yes", "total", "medium"): "Act", - ("active", "yes", "total", "high"): "Act", - } - - decision = decision_lookup.get(decision_key, "") - - if decision: - ssvc_vector += f"D:{decision_values.get(decision)}/" - - if timestamp: - timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ") - - ssvc_vector += f"{timestamp_formatted}/" - return ssvc_vector, decision diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py new file mode 100644 index 000000000..4afe8b2a1 --- /dev/null +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -0,0 +1,146 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import logging + +from django.db.models import Prefetch +from django.db.models import Q + +from vulnerabilities.models import SSVC +from vulnerabilities.models import AdvisorySeverity +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.pipelines import VulnerableCodePipeline +from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import VulnrichImporterPipeline +from vulnerabilities.severity_systems import SCORING_SYSTEMS + +logger = logging.getLogger(__name__) + + +class CollectSSVCPipeline(VulnerableCodePipeline): + """ + Collect SSVC Pipeline + + This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories. + """ + + pipeline_id = "collect_ssvc_trees" + + @classmethod + def steps(cls): + return (cls.collect_ssvc_data,) + + def collect_ssvc_data(self): + vulnrichment_advisories = ( + AdvisoryV2.objects.filter( + datasource_id=VulnrichImporterPipeline.pipeline_id, + severities__scoring_system=SCORING_SYSTEMS["ssvc"], + ) + .distinct() + .prefetch_related( + Prefetch( + "severities", + queryset=AdvisorySeverity.objects.filter( + scoring_system=SCORING_SYSTEMS["ssvc"] + ), + ) + ) + ) + + self.log( + f"Found {vulnrichment_advisories.count()} advisories from Vulnrichment with SSVC severities." + ) + for advisory in vulnrichment_advisories: + self.log(f"Processing advisory: {advisory.advisory_id}") + for severity in advisory.severities.all(): + ssvc_vector = severity.scoring_elements + try: + ssvc_tree, decision = convert_vector_to_tree_and_decision(ssvc_vector) + self.log( + f"Advisory: {advisory.advisory_id}, SSVC Tree: {ssvc_tree}, Decision: {decision}, vector: {ssvc_vector}" + ) + ssvc_obj, _ = SSVC.objects.get_or_create( + source_advisory=advisory, + defaults={ + "options": ssvc_tree, + "decision": decision, + "vector": ssvc_vector, + }, + ) + # All advisories that have advisory.advisory_id in their aliases or advisory_id same as advisory.advisory_id + related_advisories = AdvisoryV2.objects.filter( + Q(advisory_id=advisory.advisory_id) | Q(aliases__alias=advisory.advisory_id) + ).distinct() + related_advisories = related_advisories.exclude(id=advisory.id) + ssvc_obj.related_advisories.set(related_advisories) + except ValueError as e: + logger.error( + f"Failed to parse SSVC vector '{ssvc_vector}' for advisory '{advisory}': {e}" + ) + + +REVERSE_POINTS = { + "E": ("Exploitation", {"N": "none", "P": "poc", "A": "active"}), + "A": ("Automatable", {"N": "no", "Y": "yes"}), + "T": ("Technical Impact", {"P": "partial", "T": "total"}), + "P": ("Mission Prevalence", {"M": "minimal", "S": "support", "E": "essential"}), + "B": ("Public Well-being Impact", {"M": "minimal", "A": "material", "I": "irreversible"}), + "M": ("Mission & Well-being", {"L": "low", "M": "medium", "H": "high"}), +} + +REVERSE_DECISION = { + "T": "Track", + "R": "Track*", + "A": "Attend", + "C": "Act", +} + +VECTOR_ORDER = ["E", "A", "T", "P", "B", "M"] + + +def convert_vector_to_tree_and_decision(vector: str): + """ + Convert a given SSVC vector string into a structured tree and decision. + + Args: + vector (str): The SSVC vector string. + + Returns: + tuple: A tuple containing the SSVC tree (dict) and decision (str). + """ + if not vector.startswith("SSVCv2/"): + raise ValueError("Invalid SSVC vector") + + parts = [p for p in vector.replace("SSVCv2/", "").split("/") if p] + + options = [] + decision = None + + for part in parts: + if ":" not in part: + continue + + key, value = part.split(":", 1) + + if key == "D": + decision = REVERSE_DECISION.get(value) + continue + + if key in REVERSE_POINTS: + name, mapping = REVERSE_POINTS[key] + options.append({name: mapping[value]}) + + options.sort( + key=lambda o: VECTOR_ORDER.index( + next(k for k, _ in REVERSE_POINTS.values() if k == next(iter(o))) + ) + if False + else 0 + ) + + return options, decision diff --git a/vulnerabilities/templates/advisory_detail.html b/vulnerabilities/templates/advisory_detail.html index c3d93619a..976ae80de 100644 --- a/vulnerabilities/templates/advisory_detail.html +++ b/vulnerabilities/templates/advisory_detail.html @@ -44,13 +44,6 @@ -
  • - - - Severity details ({{ severity_vectors|length }}) - - -
  • {% if advisory.exploits %}
  • @@ -70,6 +63,16 @@
  • + {% if ssvcs %} +
  • + + + Related SSVCS ({{ ssvcs|length }}) + + +
  • + {% endif %} +