FastAPI app that parses CMM inspection reports (PDF/Excel/CSV), computes SPC metrics (Cp/Cpk/Pp/Ppk, control limits, Shapiro-Wilk), generates interactive Plotly charts, and provides AI-powered quality summaries via Azure OpenAI with graceful fallback. Includes 21 passing tests covering parsers, SPC calculations, and API endpoints.
162 lines
5.9 KiB
Python
162 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pdfplumber
|
|
|
|
from app.parsers.base import CMMParser, match_column
|
|
from app.parsers.models import MeasurementRecord, ParsedReport
|
|
|
|
|
|
class PDFParser(CMMParser):
    """CMM report parser for PDF input, built on ``pdfplumber`` extraction.

    Table rows from every page are collected into dictionaries keyed by the
    header row of the first usable table, then converted into
    ``MeasurementRecord`` objects. Page text is scanned separately for
    report metadata (part number, serial number, date, program, operator).
    """

    def parse(self, path: Path) -> ParsedReport:
        """Parse the PDF at *path* into a ``ParsedReport``.

        Args:
            path: Location of the PDF file to read.

        Returns:
            A ``ParsedReport`` carrying the file name, the extracted
            measurements, the metadata found in the page text (plus
            ``"source": "pdf"``) and the first 10,000 characters of text.
        """
        text_parts: list[str] = []
        all_rows: list[dict[str, str | None]] = []
        headers: list[str] = []

        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                text_parts.append(page.extract_text() or "")

                for table in page.extract_tables():
                    if not table or not table[0]:
                        continue

                    data_rows = table
                    if not headers:
                        # The first row with any non-empty cell becomes the
                        # header. A blank leading row must not be adopted:
                        # it would key every later cell under "".
                        for index, candidate in enumerate(table):
                            cleaned = [
                                str(c or "").strip() for c in candidate or []
                            ]
                            if any(cleaned):
                                headers = cleaned
                                data_rows = table[index + 1:]
                                break
                        else:
                            continue

                    for row in data_rows:
                        if not row or not any(row):
                            continue
                        cells = [
                            str(cell).strip() if cell else None for cell in row
                        ]
                        # Tables continued on later pages usually repeat the
                        # header row; it is never measurement data.
                        if [cell or "" for cell in cells] == headers:
                            continue
                        # zip() drops cells beyond the known header count.
                        all_rows.append(dict(zip(headers, cells)))

        raw_text = "\n".join(text_parts)

        # Map each recognised field to the header that carries it. When two
        # headers resolve to the same field, the later header wins.
        col_map: dict[str | None, str] = {}
        for header in headers:
            field = match_column(header)
            if field:
                col_map[field] = header

        measurements = self._extract(all_rows, col_map)
        metadata = self._extract_metadata(raw_text)
        metadata["source"] = "pdf"

        return ParsedReport(
            filename=path.name,
            measurements=measurements,
            metadata=metadata,
            raw_text=raw_text[:10_000],
        )

    def _extract(
        self,
        rows: list[dict[str, str | None]],
        col_map: dict[str | None, str],
    ) -> list[MeasurementRecord]:
        """Build measurement records from *rows* using the mapped columns.

        Falls back to ``_fallback_extract`` when *col_map* lacks any of the
        ``feature_name``, ``nominal`` or ``actual`` fields.

        Args:
            rows: Table rows keyed by header text.
            col_map: Field name -> header text for the recognised columns.

        Returns:
            One record per row that has a name and numeric nominal/actual
            values. Rows that cannot be turned into a record are skipped.
        """
        required = {"feature_name", "nominal", "actual"}
        if not required.issubset(col_map):
            return self._fallback_extract(rows)

        name_col = col_map["feature_name"]
        nominal_col = col_map["nominal"]
        actual_col = col_map["actual"]

        def optional(row: dict[str, str | None], field: str) -> float | None:
            # Only read a column that was actually mapped. Looking up a
            # placeholder key such as "" could hit a blank-header column.
            column = col_map.get(field)
            if column is None:
                return None
            return _to_float(row.get(column))

        records: list[MeasurementRecord] = []
        for row in rows:
            name = row.get(name_col) or ""
            nominal = _to_float(row.get(nominal_col))
            actual = _to_float(row.get(actual_col))
            if nominal is None or actual is None or not name:
                continue

            tol_plus = optional(row, "tolerance_plus") or 0.0
            tol_minus = optional(row, "tolerance_minus") or 0.0
            # A reported deviation of exactly 0.0 is a real value, so test
            # for None rather than truthiness before recomputing it.
            deviation = optional(row, "deviation")
            if deviation is None:
                deviation = actual - nominal

            try:
                record = MeasurementRecord(
                    feature_name=name,
                    nominal=nominal,
                    tolerance_plus=abs(tol_plus),
                    tolerance_minus=-abs(tol_minus),
                    actual=actual,
                    deviation=deviation,
                )
            except (ValueError, TypeError):
                # Best effort: a row the model rejects is dropped.
                continue
            records.append(record)
        return records

    def _fallback_extract(
        self, rows: list[dict[str, str | None]]
    ) -> list[MeasurementRecord]:
        """Try to extract from rows even without full column mapping.

        Heuristic: the first column whose sampled values are not all numeric
        is the feature name. The numeric columns, in order, are read as
        nominal, actual and (if present) a symmetric tolerance.

        Returns:
            The records that could be built, or an empty list when no name
            column or fewer than two numeric columns are found.
        """
        if not rows:
            return []

        # Collect columns across all rows (first-seen order) so a short
        # first row does not hide columns present in later rows.
        headers = list(dict.fromkeys(key for row in rows for key in row))

        numeric_cols: list[str] = []
        name_col: str | None = None
        for header in headers:
            sample_vals = [r.get(header) for r in rows[:5] if r.get(header)]
            if not sample_vals:
                continue
            if all(_to_float(v) is not None for v in sample_vals):
                numeric_cols.append(header)
            elif name_col is None:
                name_col = header

        if not name_col or len(numeric_cols) < 2:
            return []

        nominal_col, actual_col = numeric_cols[0], numeric_cols[1]
        tol_col = numeric_cols[2] if len(numeric_cols) > 2 else None

        records: list[MeasurementRecord] = []
        for row in rows:
            name = row.get(name_col) or ""
            nominal = _to_float(row.get(nominal_col))
            actual = _to_float(row.get(actual_col))
            if nominal is None or actual is None or not name:
                continue

            tol = _to_float(row.get(tol_col)) if tol_col is not None else None
            tol = tol or 0.0

            try:
                record = MeasurementRecord(
                    feature_name=name,
                    nominal=nominal,
                    tolerance_plus=abs(tol),
                    tolerance_minus=-abs(tol),
                    actual=actual,
                    deviation=actual - nominal,
                )
            except (ValueError, TypeError):
                # Best effort: a row the model rejects is dropped.
                continue
            records.append(record)
        return records

    def _extract_metadata(self, text: str) -> dict[str, str]:
        """Pull report metadata out of the page text with regex searches.

        Args:
            text: Concatenated text of all pages.

        Returns:
            A dict holding whichever of ``part_number``, ``serial_number``,
            ``inspection_date``, ``program`` and ``operator`` were found.
            Only the first match of each pattern is used.
        """
        import re

        # ``no\.?`` accepts the abbreviated "No." form; without it a label
        # such as "Part No.: X" would capture ".:" as the value.
        patterns = [
            (r"(?i)part\s*(?:no\.?|number|#|:)\s*[:\s]*(\S+)", "part_number"),
            (r"(?i)serial\s*(?:no\.?|number|#|:)\s*[:\s]*(\S+)", "serial_number"),
            (r"(?i)date\s*[:\s]+(\d[\d/\-\.]+\d)", "inspection_date"),
            (r"(?i)program\s*[:\s]+(.+?)(?:\n|$)", "program"),
            (r"(?i)operator\s*[:\s]+(.+?)(?:\n|$)", "operator"),
        ]

        metadata: dict[str, str] = {}
        for pattern, key in patterns:
            match = re.search(pattern, text)
            if match:
                metadata[key] = match.group(1).strip()
        return metadata
|
|
|
|
|
|
def _to_float(val: str | None) -> float | None:
|
|
if val is None:
|
|
return None
|
|
val = val.strip().replace(",", "")
|
|
try:
|
|
return float(val)
|
|
except ValueError:
|
|
return None
|