Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07db03ecb6 | ||
|
|
feb49b5841 | ||
|
|
a31093822c |
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.pytest_cache/
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
19
pyproject.toml
Normal file
19
pyproject.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
[project]
|
||||
name = "event-taxonomy"
|
||||
version = "0.1.0"
|
||||
description = "Normalize event naming and structure across analysis tools"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = []
|
||||
|
||||
[project.scripts]
|
||||
event-taxonomy = "event_taxonomy.__main__:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/event_taxonomy"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
3
src/event_taxonomy/__init__.py
Normal file
3
src/event_taxonomy/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from event_taxonomy.schema import NormalizedFinding, Severity, ToolEvent
|
||||
|
||||
__all__ = ["NormalizedFinding", "Severity", "ToolEvent"]
|
||||
32
src/event_taxonomy/__main__.py
Normal file
32
src/event_taxonomy/__main__.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""CLI: reads JSON from stdin, outputs normalized JSON.
|
||||
|
||||
Usage:
|
||||
echo '[{...}]' | python -m event_taxonomy --tool bus-factor-analyzer
|
||||
echo '[{...}]' | event-taxonomy --tool dep-risk-scanner
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
|
||||
from event_taxonomy.registry import list_tools, normalize_output
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Normalize tool findings to canonical schema")
|
||||
parser.add_argument("--tool", required=True, help=f"Tool name. Options: {', '.join(list_tools())}")
|
||||
parser.add_argument("--version", default="unknown", help="Tool version string")
|
||||
parser.add_argument("--indent", type=int, default=2, help="JSON indent (0 for compact)")
|
||||
args = parser.parse_args()
|
||||
|
||||
raw = json.load(sys.stdin)
|
||||
if isinstance(raw, dict):
|
||||
raw = [raw]
|
||||
|
||||
event = normalize_output(args.tool, raw, tool_version=args.version)
|
||||
indent = args.indent if args.indent > 0 else None
|
||||
print(event.to_json(indent=indent))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
src/event_taxonomy/adapters/__init__.py
Normal file
0
src/event_taxonomy/adapters/__init__.py
Normal file
85
src/event_taxonomy/adapters/_severity.py
Normal file
85
src/event_taxonomy/adapters/_severity.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Centralized severity mapping logic for all tool representations."""
|
||||
|
||||
from event_taxonomy.schema import Severity
|
||||
|
||||
# bus-factor / dep-risk / knowledge-silo style: CRITICAL/HIGH/MEDIUM/LOW(/OK)
|
||||
_LABEL_MAP: dict[str, Severity] = {
|
||||
"CRITICAL": Severity.CRITICAL,
|
||||
"HIGH": Severity.HIGH,
|
||||
"MEDIUM": Severity.MEDIUM,
|
||||
"LOW": Severity.LOW,
|
||||
"OK": Severity.INFO,
|
||||
}
|
||||
|
||||
|
||||
def map_risk_label(label: str) -> Severity:
|
||||
"""Map CRITICAL/HIGH/MEDIUM/LOW/OK string labels."""
|
||||
return _LABEL_MAP.get(label.upper(), Severity.INFO)
|
||||
|
||||
|
||||
# doc-drift style: error/warning/info
|
||||
_DOC_MAP: dict[str, Severity] = {
|
||||
"error": Severity.HIGH,
|
||||
"warning": Severity.MEDIUM,
|
||||
"info": Severity.INFO,
|
||||
}
|
||||
|
||||
|
||||
def map_doc_severity(sev: str) -> Severity:
|
||||
"""Map error/warning/info string labels."""
|
||||
return _DOC_MAP.get(sev.lower(), Severity.INFO)
|
||||
|
||||
|
||||
# perf-regression style: integer 1-10
|
||||
def map_int_severity(score: int) -> Severity:
|
||||
"""Map integer severity (1-10 scale)."""
|
||||
if score >= 8:
|
||||
return Severity.CRITICAL
|
||||
if score >= 6:
|
||||
return Severity.HIGH
|
||||
if score >= 4:
|
||||
return Severity.MEDIUM
|
||||
if score >= 2:
|
||||
return Severity.LOW
|
||||
return Severity.INFO
|
||||
|
||||
|
||||
# schema-evolution style: safe/cautious/dangerous
|
||||
_SCHEMA_MAP: dict[str, Severity] = {
|
||||
"dangerous": Severity.CRITICAL,
|
||||
"cautious": Severity.MEDIUM,
|
||||
"safe": Severity.LOW,
|
||||
}
|
||||
|
||||
|
||||
def map_schema_risk(level: str) -> Severity:
|
||||
"""Map safe/cautious/dangerous string labels."""
|
||||
return _SCHEMA_MAP.get(level.lower(), Severity.INFO)
|
||||
|
||||
|
||||
# roadmap-entropy style: critical/high/moderate/low
|
||||
_ENTROPY_MAP: dict[str, Severity] = {
|
||||
"critical": Severity.CRITICAL,
|
||||
"high": Severity.HIGH,
|
||||
"moderate": Severity.MEDIUM,
|
||||
"low": Severity.LOW,
|
||||
}
|
||||
|
||||
|
||||
def map_entropy_risk(label: str) -> Severity:
|
||||
"""Map critical/high/moderate/low string labels."""
|
||||
return _ENTROPY_MAP.get(label.lower(), Severity.INFO)
|
||||
|
||||
|
||||
# test-flakiness style: float 0.0-1.0
|
||||
def map_flakiness_rate(rate: float) -> Severity:
|
||||
"""Map flakiness rate (0.0-1.0) to severity."""
|
||||
if rate >= 0.5:
|
||||
return Severity.CRITICAL
|
||||
if rate >= 0.3:
|
||||
return Severity.HIGH
|
||||
if rate >= 0.15:
|
||||
return Severity.MEDIUM
|
||||
if rate > 0.0:
|
||||
return Severity.LOW
|
||||
return Severity.INFO
|
||||
20
src/event_taxonomy/adapters/bus_factor.py
Normal file
20
src/event_taxonomy/adapters/bus_factor.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_risk_label
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
return NormalizedFinding(
|
||||
tool="bus-factor-analyzer",
|
||||
category="bus-factor",
|
||||
severity=map_risk_label(finding["risk_label"]),
|
||||
message=f"Bus factor {finding['bus_factor']} — {finding['top_author']} owns {finding['top_author_pct']:.0f}%",
|
||||
file=finding.get("file"),
|
||||
metadata={
|
||||
"top_author": finding["top_author"],
|
||||
"top_author_pct": finding["top_author_pct"],
|
||||
"num_contributors": finding["num_contributors"],
|
||||
"bus_factor": finding["bus_factor"],
|
||||
},
|
||||
)
|
||||
22
src/event_taxonomy/adapters/dep_risk.py
Normal file
22
src/event_taxonomy/adapters/dep_risk.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_risk_label
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
label = finding.get("risk_label", "OK")
|
||||
return NormalizedFinding(
|
||||
tool="dep-risk-scanner",
|
||||
category="dependency-risk",
|
||||
severity=map_risk_label(label),
|
||||
message=f"{finding['name']}@{finding['version']} — {label}",
|
||||
recommendation=f"Vulnerability count: {finding.get('vuln_count', 0)}, months stale: {finding.get('months_stale', 0)}"
|
||||
if finding.get("vuln_count") or finding.get("months_stale")
|
||||
else None,
|
||||
metadata={
|
||||
k: finding[k]
|
||||
for k in ("ecosystem", "vuln_score", "maintenance_score", "risk_score", "vulns")
|
||||
if k in finding
|
||||
},
|
||||
)
|
||||
16
src/event_taxonomy/adapters/doc_drift.py
Normal file
16
src/event_taxonomy/adapters/doc_drift.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_doc_severity
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
return NormalizedFinding(
|
||||
tool="doc-drift-detector",
|
||||
category=finding.get("kind", "doc-drift"),
|
||||
severity=map_doc_severity(finding["severity"]),
|
||||
message=finding["message"],
|
||||
file=finding.get("file"),
|
||||
line=finding.get("lineno"),
|
||||
metadata={"symbol": finding["symbol"]} if finding.get("symbol") else {},
|
||||
)
|
||||
24
src/event_taxonomy/adapters/knowledge_silo.py
Normal file
24
src/event_taxonomy/adapters/knowledge_silo.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_risk_label
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
risk = finding.get("risk", "LOW")
|
||||
if hasattr(risk, "value"):
|
||||
risk = risk.value
|
||||
return NormalizedFinding(
|
||||
tool="knowledge-silo-detector",
|
||||
category="knowledge-silo",
|
||||
severity=map_risk_label(str(risk)),
|
||||
message=f"{finding['filepath']} dominated by {finding['dominant_author']} ({finding['dominant_commits']}/{finding['total_commits']} commits)",
|
||||
file=finding.get("filepath"),
|
||||
metadata={
|
||||
"dominant_author": finding["dominant_author"],
|
||||
"dominance_ratio": finding["dominant_commits"] / finding["total_commits"]
|
||||
if finding["total_commits"]
|
||||
else 0,
|
||||
"other_authors": finding.get("other_authors", {}),
|
||||
},
|
||||
)
|
||||
16
src/event_taxonomy/adapters/perf_regression.py
Normal file
16
src/event_taxonomy/adapters/perf_regression.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_int_severity
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
return NormalizedFinding(
|
||||
tool="perf-regression-spotter",
|
||||
category=finding.get("pattern", "performance"),
|
||||
severity=map_int_severity(finding["severity"]),
|
||||
message=finding["message"],
|
||||
file=finding.get("file"),
|
||||
line=finding.get("line"),
|
||||
metadata={"snippet": finding["snippet"]} if finding.get("snippet") else {},
|
||||
)
|
||||
26
src/event_taxonomy/adapters/roadmap_entropy.py
Normal file
26
src/event_taxonomy/adapters/roadmap_entropy.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_entropy_risk
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
label = finding.get("risk_label", "low")
|
||||
return NormalizedFinding(
|
||||
tool="roadmap-entropy",
|
||||
category="roadmap-entropy",
|
||||
severity=map_entropy_risk(label),
|
||||
message=f"Entropy score {finding['entropy_score']:.2f} — {label}",
|
||||
metadata={
|
||||
k: finding[k]
|
||||
for k in (
|
||||
"base_count",
|
||||
"head_count",
|
||||
"item_count_delta",
|
||||
"description_churn",
|
||||
"priority_shuffles",
|
||||
"entropy_score",
|
||||
)
|
||||
if k in finding
|
||||
},
|
||||
)
|
||||
22
src/event_taxonomy/adapters/schema_evolution.py
Normal file
22
src/event_taxonomy/adapters/schema_evolution.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_schema_risk
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
op = finding.get("operation", {})
|
||||
if hasattr(op, "op_type"):
|
||||
op_type = op.op_type
|
||||
op_args = getattr(op, "args", [])
|
||||
else:
|
||||
op_type = op.get("op_type", "unknown")
|
||||
op_args = op.get("args", [])
|
||||
return NormalizedFinding(
|
||||
tool="schema-evolution-advisor",
|
||||
category=op_type,
|
||||
severity=map_schema_risk(finding["risk_level"]),
|
||||
message=finding["rationale"],
|
||||
recommendation=finding.get("recommendation"),
|
||||
metadata={"op_type": op_type, "args": op_args},
|
||||
)
|
||||
28
src/event_taxonomy/adapters/semantic_diff.py
Normal file
28
src/event_taxonomy/adapters/semantic_diff.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.schema import NormalizedFinding, Severity
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
cats = finding.get("categories", [])
|
||||
added = finding.get("added", 0)
|
||||
removed = finding.get("removed", 0)
|
||||
churn = added + removed
|
||||
if "DELETION" in cats or "DELETED_FILE" in cats:
|
||||
severity = Severity.MEDIUM
|
||||
elif churn > 100:
|
||||
severity = Severity.HIGH
|
||||
elif churn > 30:
|
||||
severity = Severity.MEDIUM
|
||||
elif churn > 0:
|
||||
severity = Severity.LOW
|
||||
else:
|
||||
severity = Severity.INFO
|
||||
return NormalizedFinding(
|
||||
tool="semantic-diff",
|
||||
category=",".join(cats) if cats else "change",
|
||||
severity=severity,
|
||||
message=finding.get("summary", f"Changed {finding['path']}"),
|
||||
file=finding.get("path"),
|
||||
metadata={"added": added, "removed": removed, "categories": cats},
|
||||
)
|
||||
23
src/event_taxonomy/adapters/test_flakiness.py
Normal file
23
src/event_taxonomy/adapters/test_flakiness.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from typing import Any
|
||||
|
||||
from event_taxonomy.adapters._severity import map_flakiness_rate
|
||||
from event_taxonomy.schema import NormalizedFinding
|
||||
|
||||
|
||||
def normalize(finding: dict[str, Any]) -> NormalizedFinding:
|
||||
rate = finding["flakiness_rate"]
|
||||
return NormalizedFinding(
|
||||
tool="test-flakiness-analyzer",
|
||||
category="flaky-test",
|
||||
severity=map_flakiness_rate(rate),
|
||||
message=f"{finding['test_id']} flaky at {rate:.0%} ({finding['fail_count']}/{finding['total_runs']} failures)",
|
||||
metadata={
|
||||
"test_id": finding["test_id"],
|
||||
"classname": finding.get("classname", ""),
|
||||
"total_runs": finding["total_runs"],
|
||||
"pass_count": finding.get("pass_count", 0),
|
||||
"fail_count": finding["fail_count"],
|
||||
"flakiness_rate": rate,
|
||||
"avg_duration": finding.get("avg_duration", 0),
|
||||
},
|
||||
)
|
||||
48
src/event_taxonomy/registry.py
Normal file
48
src/event_taxonomy/registry.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Adapter registry — maps tool names to normalize functions."""
|
||||
|
||||
from typing import Any, Callable
|
||||
|
||||
from event_taxonomy.adapters import (
|
||||
bus_factor,
|
||||
dep_risk,
|
||||
doc_drift,
|
||||
knowledge_silo,
|
||||
perf_regression,
|
||||
roadmap_entropy,
|
||||
schema_evolution,
|
||||
semantic_diff,
|
||||
test_flakiness,
|
||||
)
|
||||
from event_taxonomy.schema import NormalizedFinding, ToolEvent
|
||||
|
||||
_ADAPTERS: dict[str, Callable[[dict[str, Any]], NormalizedFinding]] = {
|
||||
"bus-factor-analyzer": bus_factor.normalize,
|
||||
"dep-risk-scanner": dep_risk.normalize,
|
||||
"doc-drift-detector": doc_drift.normalize,
|
||||
"knowledge-silo-detector": knowledge_silo.normalize,
|
||||
"perf-regression-spotter": perf_regression.normalize,
|
||||
"roadmap-entropy": roadmap_entropy.normalize,
|
||||
"schema-evolution-advisor": schema_evolution.normalize,
|
||||
"semantic-diff": semantic_diff.normalize,
|
||||
"test-flakiness-analyzer": test_flakiness.normalize,
|
||||
}
|
||||
|
||||
|
||||
def list_tools() -> list[str]:
|
||||
return sorted(_ADAPTERS.keys())
|
||||
|
||||
|
||||
def get_adapter(tool_name: str) -> Callable[[dict[str, Any]], NormalizedFinding]:
|
||||
if tool_name not in _ADAPTERS:
|
||||
raise KeyError(f"Unknown tool: {tool_name}. Available: {', '.join(sorted(_ADAPTERS))}")
|
||||
return _ADAPTERS[tool_name]
|
||||
|
||||
|
||||
def normalize_output(
|
||||
tool_name: str,
|
||||
raw_findings: list[dict[str, Any]],
|
||||
tool_version: str = "unknown",
|
||||
) -> ToolEvent:
|
||||
adapter = get_adapter(tool_name)
|
||||
findings = [adapter(f) for f in raw_findings]
|
||||
return ToolEvent(tool_name=tool_name, tool_version=tool_version, findings=findings)
|
||||
68
src/event_taxonomy/schema.py
Normal file
68
src/event_taxonomy/schema.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Severity(enum.IntEnum):
|
||||
"""Canonical severity levels, ordered from most to least severe."""
|
||||
|
||||
CRITICAL = 5
|
||||
HIGH = 4
|
||||
MEDIUM = 3
|
||||
LOW = 2
|
||||
INFO = 1
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
@dataclass
|
||||
class NormalizedFinding:
|
||||
"""Unified finding representation across all analysis tools."""
|
||||
|
||||
tool: str
|
||||
category: str
|
||||
severity: Severity
|
||||
message: str
|
||||
file: str | None = None
|
||||
line: int | None = None
|
||||
recommendation: str | None = None
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
d = asdict(self)
|
||||
d["severity"] = self.severity.name
|
||||
return d
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolEvent:
|
||||
"""Envelope wrapping a tool's normalized output."""
|
||||
|
||||
tool_name: str
|
||||
tool_version: str
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
||||
findings: list[NormalizedFinding] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def summary(self) -> dict[str, int]:
|
||||
counts: dict[str, int] = {s.name: 0 for s in Severity}
|
||||
for f in self.findings:
|
||||
counts[f.severity.name] += 1
|
||||
return {"total": len(self.findings), **counts}
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"tool_name": self.tool_name,
|
||||
"tool_version": self.tool_version,
|
||||
"timestamp": self.timestamp,
|
||||
"findings": [f.to_dict() for f in self.findings],
|
||||
"summary": self.summary,
|
||||
}
|
||||
|
||||
def to_json(self, **kwargs: Any) -> str:
|
||||
return json.dumps(self.to_dict(), **kwargs)
|
||||
145
tests/test_adapters.py
Normal file
145
tests/test_adapters.py
Normal file
@@ -0,0 +1,145 @@
|
||||
from event_taxonomy.adapters import (
|
||||
bus_factor,
|
||||
dep_risk,
|
||||
doc_drift,
|
||||
knowledge_silo,
|
||||
perf_regression,
|
||||
roadmap_entropy,
|
||||
schema_evolution,
|
||||
semantic_diff,
|
||||
test_flakiness,
|
||||
)
|
||||
from event_taxonomy.schema import Severity
|
||||
|
||||
|
||||
def test_bus_factor():
|
||||
f = bus_factor.normalize({
|
||||
"file": "core.py",
|
||||
"top_author": "alice",
|
||||
"top_author_pct": 85.0,
|
||||
"num_contributors": 2,
|
||||
"bus_factor": 1,
|
||||
"risk_label": "CRITICAL",
|
||||
})
|
||||
assert f.severity == Severity.CRITICAL
|
||||
assert f.file == "core.py"
|
||||
assert "alice" in f.message
|
||||
|
||||
|
||||
def test_dep_risk():
|
||||
f = dep_risk.normalize({
|
||||
"name": "requests",
|
||||
"ecosystem": "pypi",
|
||||
"version": "2.28.0",
|
||||
"risk_label": "HIGH",
|
||||
"vuln_count": 2,
|
||||
"months_stale": 6,
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
assert "requests" in f.message
|
||||
|
||||
|
||||
def test_doc_drift():
|
||||
f = doc_drift.normalize({
|
||||
"kind": "docstring_param",
|
||||
"severity": "error",
|
||||
"message": "Param x not in signature",
|
||||
"file": "model.py",
|
||||
"lineno": 42,
|
||||
"symbol": "process",
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
assert f.line == 42
|
||||
assert f.file == "model.py"
|
||||
|
||||
|
||||
def test_knowledge_silo():
|
||||
f = knowledge_silo.normalize({
|
||||
"filepath": "auth.py",
|
||||
"total_commits": 50,
|
||||
"dominant_author": "bob",
|
||||
"dominant_commits": 45,
|
||||
"other_authors": {"alice": 5},
|
||||
"risk": "HIGH",
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
assert "bob" in f.message
|
||||
|
||||
|
||||
def test_perf_regression():
|
||||
f = perf_regression.normalize({
|
||||
"pattern": "n-plus-one-query",
|
||||
"severity": 8,
|
||||
"file": "api.py",
|
||||
"line": 123,
|
||||
"message": "DB call inside loop",
|
||||
"snippet": "for x in items: db.query(x)",
|
||||
})
|
||||
assert f.severity == Severity.CRITICAL
|
||||
assert f.file == "api.py"
|
||||
assert f.line == 123
|
||||
|
||||
|
||||
def test_roadmap_entropy():
|
||||
f = roadmap_entropy.normalize({
|
||||
"entropy_score": 0.65,
|
||||
"risk_label": "high",
|
||||
"base_count": 10,
|
||||
"head_count": 15,
|
||||
"item_count_delta": 5,
|
||||
"description_churn": 0.25,
|
||||
"priority_shuffles": 3,
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
assert "0.65" in f.message
|
||||
|
||||
|
||||
def test_schema_evolution():
|
||||
f = schema_evolution.normalize({
|
||||
"operation": {"op_type": "drop_column", "args": ["users", "password"]},
|
||||
"risk_level": "dangerous",
|
||||
"rationale": "Dropping column causes data loss",
|
||||
"recommendation": "Back up first",
|
||||
})
|
||||
assert f.severity == Severity.CRITICAL
|
||||
assert f.recommendation == "Back up first"
|
||||
|
||||
|
||||
def test_semantic_diff():
|
||||
f = semantic_diff.normalize({
|
||||
"path": "api.py",
|
||||
"categories": ["REFACTOR", "FEATURE"],
|
||||
"summary": "Modified api.py",
|
||||
"added": 45,
|
||||
"removed": 23,
|
||||
})
|
||||
assert f.severity == Severity.MEDIUM
|
||||
assert f.file == "api.py"
|
||||
|
||||
|
||||
def test_semantic_diff_large_churn():
|
||||
f = semantic_diff.normalize({
|
||||
"path": "big.py",
|
||||
"categories": ["FEATURE"],
|
||||
"summary": "Rewrote big.py",
|
||||
"added": 200,
|
||||
"removed": 50,
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
|
||||
|
||||
def test_test_flakiness():
|
||||
f = test_flakiness.normalize({
|
||||
"test_id": "tests.integration::test_auth",
|
||||
"classname": "tests.integration",
|
||||
"total_runs": 20,
|
||||
"pass_count": 12,
|
||||
"fail_count": 8,
|
||||
"error_count": 0,
|
||||
"skip_count": 0,
|
||||
"flakiness_rate": 0.4,
|
||||
"avg_duration": 1.25,
|
||||
"duration_stddev": 0.35,
|
||||
})
|
||||
assert f.severity == Severity.HIGH
|
||||
assert "40%" in f.message
|
||||
52
tests/test_schema.py
Normal file
52
tests/test_schema.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import json
|
||||
|
||||
from event_taxonomy.schema import NormalizedFinding, Severity, ToolEvent
|
||||
|
||||
|
||||
def test_severity_ordering():
|
||||
assert Severity.CRITICAL > Severity.HIGH > Severity.MEDIUM > Severity.LOW > Severity.INFO
|
||||
|
||||
|
||||
def test_severity_str():
|
||||
assert str(Severity.CRITICAL) == "CRITICAL"
|
||||
assert str(Severity.INFO) == "INFO"
|
||||
|
||||
|
||||
def test_finding_construction():
|
||||
f = NormalizedFinding(
|
||||
tool="test-tool",
|
||||
category="test",
|
||||
severity=Severity.HIGH,
|
||||
message="something broke",
|
||||
file="foo.py",
|
||||
line=42,
|
||||
)
|
||||
assert f.tool == "test-tool"
|
||||
assert f.severity == Severity.HIGH
|
||||
assert f.file == "foo.py"
|
||||
assert f.line == 42
|
||||
assert f.metadata == {}
|
||||
assert f.recommendation is None
|
||||
|
||||
|
||||
def test_finding_to_dict():
|
||||
f = NormalizedFinding(tool="t", category="c", severity=Severity.LOW, message="m")
|
||||
d = f.to_dict()
|
||||
assert d["severity"] == "LOW"
|
||||
assert d["tool"] == "t"
|
||||
|
||||
|
||||
def test_tool_event_serialization():
|
||||
findings = [
|
||||
NormalizedFinding(tool="t", category="c", severity=Severity.HIGH, message="a"),
|
||||
NormalizedFinding(tool="t", category="c", severity=Severity.LOW, message="b"),
|
||||
]
|
||||
event = ToolEvent(tool_name="t", tool_version="1.0", findings=findings)
|
||||
d = event.to_dict()
|
||||
assert d["summary"]["total"] == 2
|
||||
assert d["summary"]["HIGH"] == 1
|
||||
assert d["summary"]["LOW"] == 1
|
||||
j = event.to_json()
|
||||
parsed = json.loads(j)
|
||||
assert parsed["tool_name"] == "t"
|
||||
assert len(parsed["findings"]) == 2
|
||||
60
tests/test_severity.py
Normal file
60
tests/test_severity.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from event_taxonomy.adapters._severity import (
|
||||
map_doc_severity,
|
||||
map_entropy_risk,
|
||||
map_flakiness_rate,
|
||||
map_int_severity,
|
||||
map_risk_label,
|
||||
map_schema_risk,
|
||||
)
|
||||
from event_taxonomy.schema import Severity
|
||||
|
||||
|
||||
def test_risk_label_mapping():
|
||||
assert map_risk_label("CRITICAL") == Severity.CRITICAL
|
||||
assert map_risk_label("high") == Severity.HIGH
|
||||
assert map_risk_label("Medium") == Severity.MEDIUM
|
||||
assert map_risk_label("LOW") == Severity.LOW
|
||||
assert map_risk_label("OK") == Severity.INFO
|
||||
assert map_risk_label("unknown") == Severity.INFO
|
||||
|
||||
|
||||
def test_doc_severity_mapping():
|
||||
assert map_doc_severity("error") == Severity.HIGH
|
||||
assert map_doc_severity("warning") == Severity.MEDIUM
|
||||
assert map_doc_severity("info") == Severity.INFO
|
||||
assert map_doc_severity("ERROR") == Severity.HIGH
|
||||
|
||||
|
||||
def test_int_severity_boundaries():
|
||||
assert map_int_severity(10) == Severity.CRITICAL
|
||||
assert map_int_severity(8) == Severity.CRITICAL
|
||||
assert map_int_severity(7) == Severity.HIGH
|
||||
assert map_int_severity(6) == Severity.HIGH
|
||||
assert map_int_severity(5) == Severity.MEDIUM
|
||||
assert map_int_severity(4) == Severity.MEDIUM
|
||||
assert map_int_severity(3) == Severity.LOW
|
||||
assert map_int_severity(2) == Severity.LOW
|
||||
assert map_int_severity(1) == Severity.INFO
|
||||
|
||||
|
||||
def test_schema_risk_mapping():
|
||||
assert map_schema_risk("dangerous") == Severity.CRITICAL
|
||||
assert map_schema_risk("cautious") == Severity.MEDIUM
|
||||
assert map_schema_risk("safe") == Severity.LOW
|
||||
assert map_schema_risk("Dangerous") == Severity.CRITICAL
|
||||
|
||||
|
||||
def test_entropy_risk_mapping():
|
||||
assert map_entropy_risk("critical") == Severity.CRITICAL
|
||||
assert map_entropy_risk("high") == Severity.HIGH
|
||||
assert map_entropy_risk("moderate") == Severity.MEDIUM
|
||||
assert map_entropy_risk("low") == Severity.LOW
|
||||
|
||||
|
||||
def test_flakiness_rate_boundaries():
|
||||
assert map_flakiness_rate(0.0) == Severity.INFO
|
||||
assert map_flakiness_rate(0.05) == Severity.LOW
|
||||
assert map_flakiness_rate(0.15) == Severity.MEDIUM
|
||||
assert map_flakiness_rate(0.3) == Severity.HIGH
|
||||
assert map_flakiness_rate(0.5) == Severity.CRITICAL
|
||||
assert map_flakiness_rate(1.0) == Severity.CRITICAL
|
||||
Reference in New Issue
Block a user