from __future__ import annotations

import logging
import math
import re
from pathlib import Path

import pdfplumber

from app.parsers.base import CMMParser, match_column
from app.parsers.models import MeasurementRecord, ParsedReport

logger = logging.getLogger(__name__)

# Number of leading rows sampled when guessing column roles without a mapping.
_SAMPLE_ROWS = 5

# (compiled pattern, metadata key) pairs applied to the report's raw text.
# "no\.?" lets "Part No.: X" capture "X" instead of the stray ".:".
_METADATA_PATTERNS = (
    (
        re.compile(r"(?i)part\s*(?:no\.?|number|#|:)\s*[:\s]*(\S+)"),
        "part_number",
    ),
    (
        re.compile(r"(?i)serial\s*(?:no\.?|number|#|:)\s*[:\s]*(\S+)"),
        "serial_number",
    ),
    (re.compile(r"(?i)date\s*[:\s]+(\d[\d/\-\.]+\d)"), "inspection_date"),
    (re.compile(r"(?i)program\s*[:\s]+(.+?)(?:\n|$)"), "program"),
    (re.compile(r"(?i)operator\s*[:\s]+(.+?)(?:\n|$)"), "operator"),
)


class PDFParser(CMMParser):
    """Parse a PDF report into measurement records and header metadata."""

    def parse(self, path: Path) -> ParsedReport:
        """Read every page of ``path`` and build a ``ParsedReport``.

        The first non-empty table row found in the document is taken as the
        header row; all later table rows (on any page) are read as data
        against those headers. The concatenated page text is scanned for
        metadata, and only its first 10,000 characters are kept as
        ``raw_text``.
        """
        text_parts: list[str] = []
        all_rows: list[dict[str, str | None]] = []
        headers: list[str] = []

        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                text_parts.append(page.extract_text() or "")
                for table in page.extract_tables():
                    if not table or not table[0]:
                        continue
                    if headers:
                        data_rows = table
                    else:
                        headers = [str(c or "").strip() for c in table[0]]
                        data_rows = table[1:]
                    for row in data_rows:
                        if not row or not any(row):
                            continue
                        # Later tables are read without stripping their first
                        # row, so a header repeated per page/table would
                        # otherwise be ingested as a data row.
                        if _is_repeated_header(row, headers):
                            continue
                        # zip() drops cells beyond the known header count.
                        all_rows.append(
                            {
                                header: (str(cell).strip() if cell else None)
                                for header, cell in zip(headers, row)
                            }
                        )

        raw_text = "\n".join(text_parts)

        # Canonical field name -> header text; a later header that maps to the
        # same canonical name replaces an earlier one.
        col_map: dict[str | None, str] = {}
        for header in headers:
            canonical = match_column(header)
            if canonical:
                col_map[canonical] = header

        measurements = self._extract(all_rows, col_map)
        metadata = self._extract_metadata(raw_text)
        metadata["source"] = "pdf"
        return ParsedReport(
            filename=path.name,
            measurements=measurements,
            metadata=metadata,
            raw_text=raw_text[:10_000],
        )

    def _extract(
        self,
        rows: list[dict[str, str | None]],
        col_map: dict[str | None, str],
    ) -> list[MeasurementRecord]:
        """Build records from ``rows`` using the canonical column mapping.

        Falls back to ``_fallback_extract`` when any of ``feature_name``,
        ``nominal`` or ``actual`` is unmapped. Rows without a name or without
        a numeric nominal/actual are skipped. Missing tolerances default to
        0.0; a missing deviation is computed as ``actual - nominal``.
        """
        required = {"feature_name", "nominal", "actual"}
        if not required.issubset(col_map):
            return self._fallback_extract(rows)

        name_col = col_map["feature_name"]
        nominal_col = col_map["nominal"]
        actual_col = col_map["actual"]

        records: list[MeasurementRecord] = []
        for row in rows:
            name = row.get(name_col) or ""
            nominal = _to_float(row.get(nominal_col))
            actual = _to_float(row.get(actual_col))
            if nominal is None or actual is None or not name:
                continue

            tol_plus = _mapped_float(row, col_map, "tolerance_plus") or 0.0
            tol_minus = _mapped_float(row, col_map, "tolerance_minus") or 0.0
            # Explicit None check: a reported deviation of exactly 0.0 is a
            # real value and must not be replaced by the computed difference.
            deviation = _mapped_float(row, col_map, "deviation")
            if deviation is None:
                deviation = actual - nominal

            try:
                record = MeasurementRecord(
                    feature_name=name,
                    nominal=nominal,
                    tolerance_plus=abs(tol_plus),
                    tolerance_minus=-abs(tol_minus),
                    actual=actual,
                    deviation=deviation,
                )
            except (ValueError, TypeError) as exc:
                # Best effort: one bad row must not abort the whole report.
                logger.debug("Skipping row for feature %r: %s", name, exc)
                continue
            records.append(record)
        return records

    def _fallback_extract(
        self, rows: list[dict[str, str | None]]
    ) -> list[MeasurementRecord]:
        """Try to extract from rows even without full column mapping.

        Heuristic based on the first few rows: a column whose sampled
        non-empty values all parse as numbers is numeric; the first other
        column with any sampled value is the feature name. The first two
        numeric columns are read as nominal and actual, and a third (if
        present) as a symmetric tolerance. Returns an empty list when no name
        column or fewer than two numeric columns are found.
        """
        if not rows:
            return []

        sample = rows[:_SAMPLE_ROWS]
        numeric_cols: list[str] = []
        name_col: str | None = None
        for header in rows[0]:
            sample_vals = [r.get(header) for r in sample if r.get(header)]
            if not sample_vals:
                continue
            if all(_to_float(v) is not None for v in sample_vals):
                numeric_cols.append(header)
            elif name_col is None:
                name_col = header

        if not name_col or len(numeric_cols) < 2:
            return []

        nominal_col, actual_col = numeric_cols[0], numeric_cols[1]
        tol_col = numeric_cols[2] if len(numeric_cols) > 2 else None

        records: list[MeasurementRecord] = []
        for row in rows:
            name = row.get(name_col) or ""
            nominal = _to_float(row.get(nominal_col))
            actual = _to_float(row.get(actual_col))
            if nominal is None or actual is None or not name:
                continue

            tol = 0.0
            if tol_col is not None:
                tol = _to_float(row.get(tol_col)) or 0.0

            try:
                record = MeasurementRecord(
                    feature_name=name,
                    nominal=nominal,
                    tolerance_plus=abs(tol),
                    tolerance_minus=-abs(tol),
                    actual=actual,
                    deviation=actual - nominal,
                )
            except (ValueError, TypeError) as exc:
                # Best effort: one bad row must not abort the whole report.
                logger.debug("Skipping row for feature %r: %s", name, exc)
                continue
            records.append(record)
        return records

    def _extract_metadata(self, text: str) -> dict[str, str]:
        """Return metadata fields found in ``text``.

        Each pattern in ``_METADATA_PATTERNS`` is searched once; the first
        match's captured group (stripped) is stored under the pattern's key.
        Keys with no match are omitted.
        """
        metadata: dict[str, str] = {}
        for pattern, key in _METADATA_PATTERNS:
            found = pattern.search(text)
            if found:
                metadata[key] = found.group(1).strip()
        return metadata


def _is_repeated_header(row: list[str | None], headers: list[str]) -> bool:
    """Return True when ``row``'s stripped cell texts equal ``headers``."""
    return [str(cell or "").strip() for cell in row] == headers


def _mapped_float(
    row: dict[str, str | None],
    col_map: dict[str | None, str],
    key: str,
) -> float | None:
    """Return the numeric value of the column mapped to ``key``, else None.

    An unmapped key yields None outright; it must never fall through to a
    lookup of the "" key, which is what a blank header cell is stored under.
    """
    column = col_map.get(key)
    if column is None:
        return None
    return _to_float(row.get(column))


def _to_float(val: str | None) -> float | None:
    """Parse ``val`` as a finite float, ignoring commas; None on failure.

    ``float()`` also accepts "nan", "inf" and "infinity"; those are treated
    as unparseable so they cannot flow into a measurement.
    """
    if val is None:
        return None
    try:
        number = float(val.strip().replace(",", ""))
    except ValueError:
        return None
    return number if math.isfinite(number) else None