3 Commits

Author SHA1 Message Date
Claude
07db03ecb6 Remove __pycache__ from tracking
Nightshift-Task: event-taxonomy
Nightshift-Ref: https://github.com/marcus/nightshift
2026-03-09 21:09:54 +00:00
Claude
feb49b5841 Add .gitignore and remove __pycache__ from tracking
Nightshift-Task: event-taxonomy
Nightshift-Ref: https://github.com/marcus/nightshift
2026-03-09 21:09:33 +00:00
Claude
a31093822c Add event-taxonomy package with canonical schema, adapters, and CLI
Canonical NormalizedFinding schema with Severity enum (CRITICAL/HIGH/MEDIUM/LOW/INFO).
Per-project adapters for 9 tools with severity mapping for string labels, int 1-10,
float 0-1, Python Enum, and computed properties. CLI pipe interface and registry.

Nightshift-Task: event-taxonomy
Nightshift-Ref: https://github.com/marcus/nightshift
2026-03-09 21:09:13 +00:00
20 changed files with 715 additions and 0 deletions
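
The commit message above names a CLI pipe interface; a minimal end-to-end sketch of driving it, using a finding shaped for the dep-risk adapter below (the payload values are illustrative):

echo '[{"name": "requests", "ecosystem": "pypi", "version": "2.28.0", "risk_label": "HIGH", "vuln_count": 2, "months_stale": 6}]' \
  | event-taxonomy --tool dep-risk-scanner --version 0.1.0

The output should be a single ToolEvent envelope: tool_name, tool_version, timestamp, the normalized findings, and a summary block with per-severity counts (see schema.py below).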

6
.gitignore vendored Normal file

@@ -0,0 +1,6 @@
__pycache__/
*.pyc
.pytest_cache/
*.egg-info/
dist/
build/

19
pyproject.toml Normal file

@@ -0,0 +1,19 @@
[project]
name = "event-taxonomy"
version = "0.1.0"
description = "Normalize event naming and structure across analysis tools"
requires-python = ">=3.13"
dependencies = []

[project.scripts]
event-taxonomy = "event_taxonomy.__main__:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/event_taxonomy"]

[tool.pytest.ini_options]
testpaths = ["tests"]
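
With the hatchling backend and the [project.scripts] entry above, an editable install should expose the event-taxonomy console script; a sketch assuming a standard pip toolchain (findings.json is a hypothetical input file):

pip install -e .
event-taxonomy --tool doc-drift-detector < findings.json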

3
src/event_taxonomy/__init__.py Normal file

@@ -0,0 +1,3 @@
from event_taxonomy.schema import NormalizedFinding, Severity, ToolEvent

__all__ = ["NormalizedFinding", "Severity", "ToolEvent"]

32
src/event_taxonomy/__main__.py Normal file

@@ -0,0 +1,32 @@
"""CLI: reads JSON from stdin, outputs normalized JSON.
Usage:
echo '[{...}]' | python -m event_taxonomy --tool bus-factor-analyzer
echo '[{...}]' | event-taxonomy --tool dep-risk-scanner
"""
import argparse
import json
import sys
from event_taxonomy.registry import list_tools, normalize_output
def main() -> None:
parser = argparse.ArgumentParser(description="Normalize tool findings to canonical schema")
parser.add_argument("--tool", required=True, help=f"Tool name. Options: {', '.join(list_tools())}")
parser.add_argument("--version", default="unknown", help="Tool version string")
parser.add_argument("--indent", type=int, default=2, help="JSON indent (0 for compact)")
args = parser.parse_args()
raw = json.load(sys.stdin)
if isinstance(raw, dict):
raw = [raw]
event = normalize_output(args.tool, raw, tool_version=args.version)
indent = args.indent if args.indent > 0 else None
print(event.to_json(indent=indent))
if __name__ == "__main__":
main()

0
src/event_taxonomy/adapters/__init__.py Normal file

85
src/event_taxonomy/adapters/_severity.py Normal file

@@ -0,0 +1,85 @@
"""Centralized severity mapping logic for all tool representations."""
from event_taxonomy.schema import Severity
# bus-factor / dep-risk / knowledge-silo style: CRITICAL/HIGH/MEDIUM/LOW(/OK)
_LABEL_MAP: dict[str, Severity] = {
"CRITICAL": Severity.CRITICAL,
"HIGH": Severity.HIGH,
"MEDIUM": Severity.MEDIUM,
"LOW": Severity.LOW,
"OK": Severity.INFO,
}
def map_risk_label(label: str) -> Severity:
"""Map CRITICAL/HIGH/MEDIUM/LOW/OK string labels."""
return _LABEL_MAP.get(label.upper(), Severity.INFO)
# doc-drift style: error/warning/info
_DOC_MAP: dict[str, Severity] = {
"error": Severity.HIGH,
"warning": Severity.MEDIUM,
"info": Severity.INFO,
}
def map_doc_severity(sev: str) -> Severity:
"""Map error/warning/info string labels."""
return _DOC_MAP.get(sev.lower(), Severity.INFO)
# perf-regression style: integer 1-10
def map_int_severity(score: int) -> Severity:
"""Map integer severity (1-10 scale)."""
if score >= 8:
return Severity.CRITICAL
if score >= 6:
return Severity.HIGH
if score >= 4:
return Severity.MEDIUM
if score >= 2:
return Severity.LOW
return Severity.INFO
# schema-evolution style: safe/cautious/dangerous
_SCHEMA_MAP: dict[str, Severity] = {
"dangerous": Severity.CRITICAL,
"cautious": Severity.MEDIUM,
"safe": Severity.LOW,
}
def map_schema_risk(level: str) -> Severity:
"""Map safe/cautious/dangerous string labels."""
return _SCHEMA_MAP.get(level.lower(), Severity.INFO)
# roadmap-entropy style: critical/high/moderate/low
_ENTROPY_MAP: dict[str, Severity] = {
"critical": Severity.CRITICAL,
"high": Severity.HIGH,
"moderate": Severity.MEDIUM,
"low": Severity.LOW,
}
def map_entropy_risk(label: str) -> Severity:
"""Map critical/high/moderate/low string labels."""
return _ENTROPY_MAP.get(label.lower(), Severity.INFO)
# test-flakiness style: float 0.0-1.0
def map_flakiness_rate(rate: float) -> Severity:
"""Map flakiness rate (0.0-1.0) to severity."""
if rate >= 0.5:
return Severity.CRITICAL
if rate >= 0.3:
return Severity.HIGH
if rate >= 0.15:
return Severity.MEDIUM
if rate > 0.0:
return Severity.LOW
return Severity.INFO

20
src/event_taxonomy/adapters/bus_factor.py Normal file

@@ -0,0 +1,20 @@
from typing import Any

from event_taxonomy.adapters._severity import map_risk_label
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    return NormalizedFinding(
        tool="bus-factor-analyzer",
        category="bus-factor",
        severity=map_risk_label(finding["risk_label"]),
        message=f"Bus factor {finding['bus_factor']}: {finding['top_author']} owns {finding['top_author_pct']:.0f}%",
        file=finding.get("file"),
        metadata={
            "top_author": finding["top_author"],
            "top_author_pct": finding["top_author_pct"],
            "num_contributors": finding["num_contributors"],
            "bus_factor": finding["bus_factor"],
        },
    )

22
src/event_taxonomy/adapters/dep_risk.py Normal file

@@ -0,0 +1,22 @@
from typing import Any

from event_taxonomy.adapters._severity import map_risk_label
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    label = finding.get("risk_label", "OK")
    return NormalizedFinding(
        tool="dep-risk-scanner",
        category="dependency-risk",
        severity=map_risk_label(label),
        message=f"{finding['name']}@{finding['version']}: {label}",
        recommendation=f"Vulnerability count: {finding.get('vuln_count', 0)}, months stale: {finding.get('months_stale', 0)}"
        if finding.get("vuln_count") or finding.get("months_stale")
        else None,
        metadata={
            k: finding[k]
            for k in ("ecosystem", "vuln_score", "maintenance_score", "risk_score", "vulns")
            if k in finding
        },
    )

16
src/event_taxonomy/adapters/doc_drift.py Normal file

@@ -0,0 +1,16 @@
from typing import Any

from event_taxonomy.adapters._severity import map_doc_severity
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    return NormalizedFinding(
        tool="doc-drift-detector",
        category=finding.get("kind", "doc-drift"),
        severity=map_doc_severity(finding["severity"]),
        message=finding["message"],
        file=finding.get("file"),
        line=finding.get("lineno"),
        metadata={"symbol": finding["symbol"]} if finding.get("symbol") else {},
    )

24
src/event_taxonomy/adapters/knowledge_silo.py Normal file

@@ -0,0 +1,24 @@
from typing import Any

from event_taxonomy.adapters._severity import map_risk_label
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    risk = finding.get("risk", "LOW")
    if hasattr(risk, "value"):
        risk = risk.value
    return NormalizedFinding(
        tool="knowledge-silo-detector",
        category="knowledge-silo",
        severity=map_risk_label(str(risk)),
        message=f"{finding['filepath']} dominated by {finding['dominant_author']} ({finding['dominant_commits']}/{finding['total_commits']} commits)",
        file=finding.get("filepath"),
        metadata={
            "dominant_author": finding["dominant_author"],
            "dominance_ratio": finding["dominant_commits"] / finding["total_commits"]
            if finding["total_commits"]
            else 0,
            "other_authors": finding.get("other_authors", {}),
        },
    )

16
src/event_taxonomy/adapters/perf_regression.py Normal file

@@ -0,0 +1,16 @@
from typing import Any

from event_taxonomy.adapters._severity import map_int_severity
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    return NormalizedFinding(
        tool="perf-regression-spotter",
        category=finding.get("pattern", "performance"),
        severity=map_int_severity(finding["severity"]),
        message=finding["message"],
        file=finding.get("file"),
        line=finding.get("line"),
        metadata={"snippet": finding["snippet"]} if finding.get("snippet") else {},
    )

26
src/event_taxonomy/adapters/roadmap_entropy.py Normal file

@@ -0,0 +1,26 @@
from typing import Any

from event_taxonomy.adapters._severity import map_entropy_risk
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    label = finding.get("risk_label", "low")
    return NormalizedFinding(
        tool="roadmap-entropy",
        category="roadmap-entropy",
        severity=map_entropy_risk(label),
        message=f"Entropy score {finding['entropy_score']:.2f}: {label}",
        metadata={
            k: finding[k]
            for k in (
                "base_count",
                "head_count",
                "item_count_delta",
                "description_churn",
                "priority_shuffles",
                "entropy_score",
            )
            if k in finding
        },
    )

22
src/event_taxonomy/adapters/schema_evolution.py Normal file

@@ -0,0 +1,22 @@
from typing import Any

from event_taxonomy.adapters._severity import map_schema_risk
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    op = finding.get("operation", {})
    if hasattr(op, "op_type"):
        op_type = op.op_type
        op_args = getattr(op, "args", [])
    else:
        op_type = op.get("op_type", "unknown")
        op_args = op.get("args", [])
    return NormalizedFinding(
        tool="schema-evolution-advisor",
        category=op_type,
        severity=map_schema_risk(finding["risk_level"]),
        message=finding["rationale"],
        recommendation=finding.get("recommendation"),
        metadata={"op_type": op_type, "args": op_args},
    )

28
src/event_taxonomy/adapters/semantic_diff.py Normal file

@@ -0,0 +1,28 @@
from typing import Any

from event_taxonomy.schema import NormalizedFinding, Severity


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    cats = finding.get("categories", [])
    added = finding.get("added", 0)
    removed = finding.get("removed", 0)
    churn = added + removed
    if "DELETION" in cats or "DELETED_FILE" in cats:
        severity = Severity.MEDIUM
    elif churn > 100:
        severity = Severity.HIGH
    elif churn > 30:
        severity = Severity.MEDIUM
    elif churn > 0:
        severity = Severity.LOW
    else:
        severity = Severity.INFO
    return NormalizedFinding(
        tool="semantic-diff",
        category=",".join(cats) if cats else "change",
        severity=severity,
        message=finding.get("summary", f"Changed {finding['path']}"),
        file=finding.get("path"),
        metadata={"added": added, "removed": removed, "categories": cats},
    )

23
src/event_taxonomy/adapters/test_flakiness.py Normal file

@@ -0,0 +1,23 @@
from typing import Any

from event_taxonomy.adapters._severity import map_flakiness_rate
from event_taxonomy.schema import NormalizedFinding


def normalize(finding: dict[str, Any]) -> NormalizedFinding:
    rate = finding["flakiness_rate"]
    return NormalizedFinding(
        tool="test-flakiness-analyzer",
        category="flaky-test",
        severity=map_flakiness_rate(rate),
        message=f"{finding['test_id']} flaky at {rate:.0%} ({finding['fail_count']}/{finding['total_runs']} failures)",
        metadata={
            "test_id": finding["test_id"],
            "classname": finding.get("classname", ""),
            "total_runs": finding["total_runs"],
            "pass_count": finding.get("pass_count", 0),
            "fail_count": finding["fail_count"],
            "flakiness_rate": rate,
            "avg_duration": finding.get("avg_duration", 0),
        },
    )

48
src/event_taxonomy/registry.py Normal file

@@ -0,0 +1,48 @@
"""Adapter registry — maps tool names to normalize functions."""
from typing import Any, Callable
from event_taxonomy.adapters import (
bus_factor,
dep_risk,
doc_drift,
knowledge_silo,
perf_regression,
roadmap_entropy,
schema_evolution,
semantic_diff,
test_flakiness,
)
from event_taxonomy.schema import NormalizedFinding, ToolEvent
_ADAPTERS: dict[str, Callable[[dict[str, Any]], NormalizedFinding]] = {
"bus-factor-analyzer": bus_factor.normalize,
"dep-risk-scanner": dep_risk.normalize,
"doc-drift-detector": doc_drift.normalize,
"knowledge-silo-detector": knowledge_silo.normalize,
"perf-regression-spotter": perf_regression.normalize,
"roadmap-entropy": roadmap_entropy.normalize,
"schema-evolution-advisor": schema_evolution.normalize,
"semantic-diff": semantic_diff.normalize,
"test-flakiness-analyzer": test_flakiness.normalize,
}
def list_tools() -> list[str]:
return sorted(_ADAPTERS.keys())
def get_adapter(tool_name: str) -> Callable[[dict[str, Any]], NormalizedFinding]:
if tool_name not in _ADAPTERS:
raise KeyError(f"Unknown tool: {tool_name}. Available: {', '.join(sorted(_ADAPTERS))}")
return _ADAPTERS[tool_name]
def normalize_output(
tool_name: str,
raw_findings: list[dict[str, Any]],
tool_version: str = "unknown",
) -> ToolEvent:
adapter = get_adapter(tool_name)
findings = [adapter(f) for f in raw_findings]
return ToolEvent(tool_name=tool_name, tool_version=tool_version, findings=findings)
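
For callers that embed the package rather than shelling out to the CLI, a small usage sketch of the registry (the raw finding mirrors the bus-factor fixture in tests/test_adapters.py; summary and to_json come from schema.py):

from event_taxonomy.registry import normalize_output

# One raw bus-factor finding, shaped the way the adapter expects.
raw = [{
    "file": "core.py",
    "top_author": "alice",
    "top_author_pct": 85.0,
    "num_contributors": 2,
    "bus_factor": 1,
    "risk_label": "CRITICAL",
}]

event = normalize_output("bus-factor-analyzer", raw, tool_version="0.1.0")
print(event.summary)            # {'total': 1, 'CRITICAL': 1, 'HIGH': 0, ...}
print(event.to_json(indent=2))  # full envelope, same output as the CLI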

68
src/event_taxonomy/schema.py Normal file

@@ -0,0 +1,68 @@
from __future__ import annotations

import enum
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any


class Severity(enum.IntEnum):
    """Canonical severity levels, ordered from most to least severe."""

    CRITICAL = 5
    HIGH = 4
    MEDIUM = 3
    LOW = 2
    INFO = 1

    def __str__(self) -> str:
        return self.name


@dataclass
class NormalizedFinding:
    """Unified finding representation across all analysis tools."""

    tool: str
    category: str
    severity: Severity
    message: str
    file: str | None = None
    line: int | None = None
    recommendation: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        d = asdict(self)
        d["severity"] = self.severity.name
        return d


@dataclass
class ToolEvent:
    """Envelope wrapping a tool's normalized output."""

    tool_name: str
    tool_version: str
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    findings: list[NormalizedFinding] = field(default_factory=list)

    @property
    def summary(self) -> dict[str, int]:
        counts: dict[str, int] = {s.name: 0 for s in Severity}
        for f in self.findings:
            counts[f.severity.name] += 1
        return {"total": len(self.findings), **counts}

    def to_dict(self) -> dict[str, Any]:
        return {
            "tool_name": self.tool_name,
            "tool_version": self.tool_version,
            "timestamp": self.timestamp,
            "findings": [f.to_dict() for f in self.findings],
            "summary": self.summary,
        }

    def to_json(self, **kwargs: Any) -> str:
        return json.dumps(self.to_dict(), **kwargs)
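
Because Severity is an IntEnum, downstream consumers can threshold findings with plain comparisons; a brief self-contained sketch (the "demo" tool name is hypothetical):

from event_taxonomy.schema import NormalizedFinding, Severity, ToolEvent

event = ToolEvent(
    tool_name="demo",
    tool_version="0.0",
    findings=[
        NormalizedFinding(tool="demo", category="c", severity=Severity.HIGH, message="high"),
        NormalizedFinding(tool="demo", category="c", severity=Severity.INFO, message="info"),
    ],
)

# IntEnum ordering (CRITICAL=5 ... INFO=1) makes thresholding an integer compare.
actionable = [f for f in event.findings if f.severity >= Severity.HIGH]
assert [f.message for f in actionable] == ["high"]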

145
tests/test_adapters.py Normal file

@@ -0,0 +1,145 @@
from event_taxonomy.adapters import (
    bus_factor,
    dep_risk,
    doc_drift,
    knowledge_silo,
    perf_regression,
    roadmap_entropy,
    schema_evolution,
    semantic_diff,
    test_flakiness,
)
from event_taxonomy.schema import Severity


def test_bus_factor():
    f = bus_factor.normalize({
        "file": "core.py",
        "top_author": "alice",
        "top_author_pct": 85.0,
        "num_contributors": 2,
        "bus_factor": 1,
        "risk_label": "CRITICAL",
    })
    assert f.severity == Severity.CRITICAL
    assert f.file == "core.py"
    assert "alice" in f.message


def test_dep_risk():
    f = dep_risk.normalize({
        "name": "requests",
        "ecosystem": "pypi",
        "version": "2.28.0",
        "risk_label": "HIGH",
        "vuln_count": 2,
        "months_stale": 6,
    })
    assert f.severity == Severity.HIGH
    assert "requests" in f.message


def test_doc_drift():
    f = doc_drift.normalize({
        "kind": "docstring_param",
        "severity": "error",
        "message": "Param x not in signature",
        "file": "model.py",
        "lineno": 42,
        "symbol": "process",
    })
    assert f.severity == Severity.HIGH
    assert f.line == 42
    assert f.file == "model.py"


def test_knowledge_silo():
    f = knowledge_silo.normalize({
        "filepath": "auth.py",
        "total_commits": 50,
        "dominant_author": "bob",
        "dominant_commits": 45,
        "other_authors": {"alice": 5},
        "risk": "HIGH",
    })
    assert f.severity == Severity.HIGH
    assert "bob" in f.message


def test_perf_regression():
    f = perf_regression.normalize({
        "pattern": "n-plus-one-query",
        "severity": 8,
        "file": "api.py",
        "line": 123,
        "message": "DB call inside loop",
        "snippet": "for x in items: db.query(x)",
    })
    assert f.severity == Severity.CRITICAL
    assert f.file == "api.py"
    assert f.line == 123


def test_roadmap_entropy():
    f = roadmap_entropy.normalize({
        "entropy_score": 0.65,
        "risk_label": "high",
        "base_count": 10,
        "head_count": 15,
        "item_count_delta": 5,
        "description_churn": 0.25,
        "priority_shuffles": 3,
    })
    assert f.severity == Severity.HIGH
    assert "0.65" in f.message


def test_schema_evolution():
    f = schema_evolution.normalize({
        "operation": {"op_type": "drop_column", "args": ["users", "password"]},
        "risk_level": "dangerous",
        "rationale": "Dropping column causes data loss",
        "recommendation": "Back up first",
    })
    assert f.severity == Severity.CRITICAL
    assert f.recommendation == "Back up first"


def test_semantic_diff():
    f = semantic_diff.normalize({
        "path": "api.py",
        "categories": ["REFACTOR", "FEATURE"],
        "summary": "Modified api.py",
        "added": 45,
        "removed": 23,
    })
    assert f.severity == Severity.MEDIUM
    assert f.file == "api.py"


def test_semantic_diff_large_churn():
    f = semantic_diff.normalize({
        "path": "big.py",
        "categories": ["FEATURE"],
        "summary": "Rewrote big.py",
        "added": 200,
        "removed": 50,
    })
    assert f.severity == Severity.HIGH


def test_test_flakiness():
    f = test_flakiness.normalize({
        "test_id": "tests.integration::test_auth",
        "classname": "tests.integration",
        "total_runs": 20,
        "pass_count": 12,
        "fail_count": 8,
        "error_count": 0,
        "skip_count": 0,
        "flakiness_rate": 0.4,
        "avg_duration": 1.25,
        "duration_stddev": 0.35,
    })
    assert f.severity == Severity.HIGH
    assert "40%" in f.message

52
tests/test_schema.py Normal file

@@ -0,0 +1,52 @@
import json

from event_taxonomy.schema import NormalizedFinding, Severity, ToolEvent


def test_severity_ordering():
    assert Severity.CRITICAL > Severity.HIGH > Severity.MEDIUM > Severity.LOW > Severity.INFO


def test_severity_str():
    assert str(Severity.CRITICAL) == "CRITICAL"
    assert str(Severity.INFO) == "INFO"


def test_finding_construction():
    f = NormalizedFinding(
        tool="test-tool",
        category="test",
        severity=Severity.HIGH,
        message="something broke",
        file="foo.py",
        line=42,
    )
    assert f.tool == "test-tool"
    assert f.severity == Severity.HIGH
    assert f.file == "foo.py"
    assert f.line == 42
    assert f.metadata == {}
    assert f.recommendation is None


def test_finding_to_dict():
    f = NormalizedFinding(tool="t", category="c", severity=Severity.LOW, message="m")
    d = f.to_dict()
    assert d["severity"] == "LOW"
    assert d["tool"] == "t"


def test_tool_event_serialization():
    findings = [
        NormalizedFinding(tool="t", category="c", severity=Severity.HIGH, message="a"),
        NormalizedFinding(tool="t", category="c", severity=Severity.LOW, message="b"),
    ]
    event = ToolEvent(tool_name="t", tool_version="1.0", findings=findings)
    d = event.to_dict()
    assert d["summary"]["total"] == 2
    assert d["summary"]["HIGH"] == 1
    assert d["summary"]["LOW"] == 1
    j = event.to_json()
    parsed = json.loads(j)
    assert parsed["tool_name"] == "t"
    assert len(parsed["findings"]) == 2

60
tests/test_severity.py Normal file

@@ -0,0 +1,60 @@
from event_taxonomy.adapters._severity import (
    map_doc_severity,
    map_entropy_risk,
    map_flakiness_rate,
    map_int_severity,
    map_risk_label,
    map_schema_risk,
)
from event_taxonomy.schema import Severity


def test_risk_label_mapping():
    assert map_risk_label("CRITICAL") == Severity.CRITICAL
    assert map_risk_label("high") == Severity.HIGH
    assert map_risk_label("Medium") == Severity.MEDIUM
    assert map_risk_label("LOW") == Severity.LOW
    assert map_risk_label("OK") == Severity.INFO
    assert map_risk_label("unknown") == Severity.INFO


def test_doc_severity_mapping():
    assert map_doc_severity("error") == Severity.HIGH
    assert map_doc_severity("warning") == Severity.MEDIUM
    assert map_doc_severity("info") == Severity.INFO
    assert map_doc_severity("ERROR") == Severity.HIGH


def test_int_severity_boundaries():
    assert map_int_severity(10) == Severity.CRITICAL
    assert map_int_severity(8) == Severity.CRITICAL
    assert map_int_severity(7) == Severity.HIGH
    assert map_int_severity(6) == Severity.HIGH
    assert map_int_severity(5) == Severity.MEDIUM
    assert map_int_severity(4) == Severity.MEDIUM
    assert map_int_severity(3) == Severity.LOW
    assert map_int_severity(2) == Severity.LOW
    assert map_int_severity(1) == Severity.INFO


def test_schema_risk_mapping():
    assert map_schema_risk("dangerous") == Severity.CRITICAL
    assert map_schema_risk("cautious") == Severity.MEDIUM
    assert map_schema_risk("safe") == Severity.LOW
    assert map_schema_risk("Dangerous") == Severity.CRITICAL


def test_entropy_risk_mapping():
    assert map_entropy_risk("critical") == Severity.CRITICAL
    assert map_entropy_risk("high") == Severity.HIGH
    assert map_entropy_risk("moderate") == Severity.MEDIUM
    assert map_entropy_risk("low") == Severity.LOW


def test_flakiness_rate_boundaries():
    assert map_flakiness_rate(0.0) == Severity.INFO
    assert map_flakiness_rate(0.05) == Severity.LOW
    assert map_flakiness_rate(0.15) == Severity.MEDIUM
    assert map_flakiness_rate(0.3) == Severity.HIGH
    assert map_flakiness_rate(0.5) == Severity.CRITICAL
    assert map_flakiness_rate(1.0) == Severity.CRITICAL