Add event-taxonomy package with canonical schema, adapters, and CLI
Canonical NormalizedFinding schema with Severity enum (CRITICAL/HIGH/MEDIUM/LOW/INFO). Per-project adapters for 9 tools with severity mapping for string labels, int 1-10, float 0-1, Python Enum, and computed properties. CLI pipe interface and registry. Nightshift-Task: event-taxonomy Nightshift-Ref: https://github.com/marcus/nightshift
This commit is contained in:
68
src/event_taxonomy/schema.py
Normal file
68
src/event_taxonomy/schema.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Severity(enum.IntEnum):
|
||||
"""Canonical severity levels, ordered from most to least severe."""
|
||||
|
||||
CRITICAL = 5
|
||||
HIGH = 4
|
||||
MEDIUM = 3
|
||||
LOW = 2
|
||||
INFO = 1
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
@dataclass
|
||||
class NormalizedFinding:
|
||||
"""Unified finding representation across all analysis tools."""
|
||||
|
||||
tool: str
|
||||
category: str
|
||||
severity: Severity
|
||||
message: str
|
||||
file: str | None = None
|
||||
line: int | None = None
|
||||
recommendation: str | None = None
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
d = asdict(self)
|
||||
d["severity"] = self.severity.name
|
||||
return d
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolEvent:
|
||||
"""Envelope wrapping a tool's normalized output."""
|
||||
|
||||
tool_name: str
|
||||
tool_version: str
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
|
||||
findings: list[NormalizedFinding] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def summary(self) -> dict[str, int]:
|
||||
counts: dict[str, int] = {s.name: 0 for s in Severity}
|
||||
for f in self.findings:
|
||||
counts[f.severity.name] += 1
|
||||
return {"total": len(self.findings), **counts}
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"tool_name": self.tool_name,
|
||||
"tool_version": self.tool_version,
|
||||
"timestamp": self.timestamp,
|
||||
"findings": [f.to_dict() for f in self.findings],
|
||||
"summary": self.summary,
|
||||
}
|
||||
|
||||
def to_json(self, **kwargs: Any) -> str:
|
||||
return json.dumps(self.to_dict(), **kwargs)
|
||||
Reference in New Issue
Block a user