FastAPI app that parses CMM inspection reports (PDF/Excel/CSV), computes SPC metrics (Cp/Cpk/Pp/Ppk, control limits, Shapiro-Wilk), generates interactive Plotly charts, and provides AI-powered quality summaries via Azure OpenAI with graceful fallback. Includes 21 passing tests covering parsers, SPC calculations, and API endpoints.
166 lines
4.9 KiB
Python
166 lines
4.9 KiB
Python
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
|
||
import numpy as np
|
||
from scipy import stats
|
||
|
||
from app.parsers.models import MeasurementRecord
|
||
|
||
# Control-chart constant d2, keyed by subgroup size (2 through 10).
# Dividing the average subgroup range (R-bar) by d2 gives an estimate of the
# within-subgroup standard deviation.
_D2 = {
    2: 1.128,
    3: 1.693,
    4: 2.059,
    5: 2.326,
    6: 2.534,
    7: 2.704,
    8: 2.847,
    9: 2.970,
    10: 3.078,
}
|
||
|
||
|
||
@dataclass
class SPCResult:
    """SPC summary for one measured feature.

    Bundles descriptive statistics, specification limits, capability
    indices, control limits and the raw measurements. The capability indices
    and the Shapiro-Wilk p-value are ``None`` when they were not computed.
    """

    feature_name: str  # name the measurements were grouped under
    n: int  # number of measurements
    mean: float
    std: float
    min_val: float
    max_val: float
    usl: float  # upper specification limit
    lsl: float  # lower specification limit
    nominal: float
    cp: float | None
    cpk: float | None
    pp: float | None
    ppk: float | None
    ucl: float  # upper control limit
    lcl: float  # lower control limit
    out_of_spec_count: int
    shapiro_p: float | None
    values: list[float]

    def to_dict(self) -> dict:
        """Return the result as a plain dict with rounded numbers.

        Statistics, limits and raw values are rounded to 6 decimals; the
        capability indices and ``shapiro_p`` go through ``_r`` (4 decimals,
        ``None`` preserved). ``min_val`` / ``max_val`` are exported under the
        keys ``"min"`` / ``"max"``.
        """
        payload: dict = {"feature_name": self.feature_name, "n": self.n}

        # (output key, attribute name) pairs that are always plain floats.
        six_decimal_fields = (
            ("mean", "mean"),
            ("std", "std"),
            ("min", "min_val"),
            ("max", "max_val"),
            ("usl", "usl"),
            ("lsl", "lsl"),
            ("nominal", "nominal"),
        )
        for key, attr in six_decimal_fields:
            payload[key] = round(getattr(self, attr), 6)

        # Capability indices may be None, so they use the None-aware rounder.
        for index_name in ("cp", "cpk", "pp", "ppk"):
            payload[index_name] = _r(getattr(self, index_name))

        payload["ucl"] = round(self.ucl, 6)
        payload["lcl"] = round(self.lcl, 6)
        payload["out_of_spec_count"] = self.out_of_spec_count
        payload["shapiro_p"] = _r(self.shapiro_p)
        payload["values"] = [round(sample, 6) for sample in self.values]
        return payload
|
||
|
||
|
||
def _r(v: float | None) -> float | None:
    """Round *v* to four decimal places, passing ``None`` through unchanged."""
    if v is None:
        return None
    return round(v, 4)
|
||
|
||
|
||
def calculate_spc(
    records: list[MeasurementRecord], subgroup_size: int = 5
) -> list[SPCResult]:
    """Compute one ``SPCResult`` per distinct feature name in *records*.

    Records are grouped by ``feature_name`` in first-seen order. For each
    group the specification limits and nominal are taken from the group's
    first record.

    A group with a single measurement gets ``std=0.0``, control limits equal
    to the mean, and no capability indices or normality test. Otherwise:

    * Pp / Ppk use the overall sample standard deviation (``ddof=1``).
    * Cp / Cpk use the sigma returned by ``_within_subgroup_sigma``.
    * UCL / LCL are the mean plus/minus three overall standard deviations.
    * The Shapiro-Wilk p-value is computed only when ``3 <= n <= 5000``.

    An index is ``None`` whenever its sigma estimate is not strictly
    positive.

    Args:
        records: Measurements to analyse; each must expose ``feature_name``,
            ``actual``, ``usl``, ``lsl`` and ``nominal``.
        subgroup_size: Subgroup size forwarded to the within-subgroup sigma
            estimate.

    Returns:
        One result per feature, in order of first appearance.
    """

    def count_out_of_spec(samples, low, high):
        # Strict comparisons: values exactly on a limit are in spec.
        return sum(1 for s in samples if s < low or s > high)

    by_feature: dict[str, list[MeasurementRecord]] = {}
    for record in records:
        key = record.feature_name
        if key not in by_feature:
            by_feature[key] = []
        by_feature[key].append(record)

    summaries: list[SPCResult] = []
    for feature, members in by_feature.items():
        values = [m.actual for m in members]
        n = len(values)

        data = np.array(values)
        mean = float(np.mean(data))
        first = members[0]
        usl = first.usl
        lsl = first.lsl
        nominal = first.nominal

        if n < 2:
            # A single point has no spread: report it as-is without indices.
            only = values[0]
            summaries.append(
                SPCResult(
                    feature_name=feature,
                    n=n,
                    mean=mean,
                    std=0.0,
                    min_val=only,
                    max_val=only,
                    usl=usl,
                    lsl=lsl,
                    nominal=nominal,
                    cp=None,
                    cpk=None,
                    pp=None,
                    ppk=None,
                    ucl=mean,
                    lcl=mean,
                    out_of_spec_count=count_out_of_spec(values, lsl, usl),
                    shapiro_p=None,
                    values=values,
                )
            )
            continue

        sigma_overall = float(np.std(data, ddof=1))
        tolerance = usl - lsl

        # Overall performance indices (Pp, Ppk).
        if sigma_overall > 0:
            pp = tolerance / (6 * sigma_overall)
            ppk = min((usl - mean), (mean - lsl)) / (3 * sigma_overall)
        else:
            pp = ppk = None

        # Within-subgroup capability indices (Cp, Cpk).
        sigma_within = _within_subgroup_sigma(data, subgroup_size)
        if sigma_within is not None and sigma_within > 0:
            cp = tolerance / (6 * sigma_within)
            cpk = min((usl - mean), (mean - lsl)) / (3 * sigma_within)
        else:
            cp = cpk = None

        # Three-sigma control limits around the mean.
        ucl = mean + 3 * sigma_overall
        lcl = mean - 3 * sigma_overall

        # Shapiro-Wilk is only run for sample sizes between 3 and 5000.
        if 3 <= n <= 5000:
            shapiro_p = float(stats.shapiro(data)[1])
        else:
            shapiro_p = None

        summaries.append(
            SPCResult(
                feature_name=feature,
                n=n,
                mean=mean,
                std=sigma_overall,
                min_val=float(np.min(data)),
                max_val=float(np.max(data)),
                usl=usl,
                lsl=lsl,
                nominal=nominal,
                cp=cp,
                cpk=cpk,
                pp=pp,
                ppk=ppk,
                ucl=ucl,
                lcl=lcl,
                out_of_spec_count=count_out_of_spec(values, lsl, usl),
                shapiro_p=shapiro_p,
                values=values,
            )
        )
    return summaries
|
||
|
||
|
||
def _within_subgroup_sigma(arr: np.ndarray, subgroup_size: int) -> float | None:
    """Estimate within-subgroup sigma as R-bar / d2.

    The data is cut into consecutive, non-overlapping subgroups of
    ``min(subgroup_size, len(arr))`` points; a trailing partial subgroup is
    ignored. The mean of the subgroup ranges is divided by the d2 constant
    for that subgroup size.

    Args:
        arr: Measurements in acquisition order.
        subgroup_size: Requested subgroup size.

    Returns:
        ``None`` when fewer than two points are given. The overall sample
        standard deviation (``ddof=1``) when the effective subgroup size has
        no d2 entry or no subgroup range could be formed. Otherwise the
        R-bar / d2 estimate.
    """
    total = len(arr)
    if total < 2:
        return None

    # Never ask for a subgroup larger than the data set itself.
    width = min(subgroup_size, total)
    divisor = _D2.get(width)
    if divisor is None:
        # No tabulated d2 for this size: fall back to the overall std.
        return float(np.std(arr, ddof=1))

    chunks = (
        arr[start : start + width]
        for start in range(0, total - width + 1, width)
    )
    spans: list[float] = [
        float(np.max(chunk) - np.min(chunk))
        for chunk in chunks
        if len(chunk) >= 2
    ]

    if not spans:
        return float(np.std(arr, ddof=1))

    return float(np.mean(spans)) / divisor
|