"""Utilities to extract text from the PDF and parse rows into records.

Module path: ``tools/pdf_import/jmueller_parser.py`` (``parser.py``).

Assumes two tables: first is Harbor A, second is Harbor B.
"""

from __future__ import annotations

from csv import reader
import re
from typing import List, Dict, Any, Tuple


# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.


def _extract_with_pdfplumber(pdf_path: str) -> str:
    """Return the text of all pages joined by newlines, using pdfplumber."""
    # Imported lazily so the module can be imported without pdfplumber installed.
    import pdfplumber

    with pdfplumber.open(pdf_path) as pdf:
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


def _extract_with_pypdf2(pdf_path: str) -> str:
    """Return the text of all pages joined by newlines, using PyPDF2."""
    # Imported lazily so the module can be imported without PyPDF2 installed.
    from PyPDF2 import PdfReader

    # Named ``pdf_reader`` so it does not shadow the module-level csv ``reader``.
    pdf_reader = PdfReader(pdf_path)
    return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)


def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract text lines from a PDF.

    pdfplumber is tried first; PyPDF2 is used as a fallback when pdfplumber
    either fails (including not being installed) or yields only whitespace.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The extracted text split into lines, each stripped of leading and
        trailing whitespace. Blank lines are kept as empty strings.

    Raises:
        RuntimeError: If neither backend produced any non-whitespace text.
            The last backend exception (if any) is attached as ``__cause__``.
    """
    text = ""
    last_error = None

    for extract in (_extract_with_pdfplumber, _extract_with_pypdf2):
        try:
            text = extract(pdf_path)
        except Exception as exc:
            # Best-effort by design: a missing or failing backend must not
            # prevent the next one from being tried. Keep the error so the
            # final failure can report it.
            last_error = exc
            continue
        if text.strip():
            break

    if not text.strip():
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, add OCR fallback (pytesseract)."
        ) from last_error

    # Normalize to individual lines
    return [ln.strip() for ln in text.splitlines()]


# Lines matching any of these patterns are treated as table headers.
HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]


# A date such as "1.2." or "01.02.2024" (the four-digit year is optional).
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix such as " / 14.30 Uhr *".
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# A date optionally followed by the time suffix above.
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)


def cleanse_lines(lines: List[str]) -> List[str]:
    """Remove known header lines and keep data/blank lines.

    Args:
        lines: Raw text lines.

    Returns:
        A new list in the original order, without any line that matches one
        of ``HEADER_PATTERNS``. Empty lines are preserved as ``""``.
    """
    out: List[str] = []
    for ln in lines:
        if not ln:
            # Blank lines are kept; split_into_tables uses them as separators.
            out.append("")
        elif not any(p.search(ln) for p in HEADER_PATTERNS):
            out.append(ln)
    return out


def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Split data lines into at most two blocks.

    Only empty lines and lines containing a ``DATE_TOKEN`` match are
    considered; every other line is ignored. Consecutive data lines form a
    block and one or more empty lines end the current block. If more than two
    blocks are found, all blocks after the first are merged, in order, into
    the second.

    Args:
        lines: Text lines, with empty strings marking blank lines.

    Returns:
        A list of 0, 1 or 2 blocks, each a non-empty list of data lines. The
        list is empty when no line contains a date token.
    """
    blocks: List[List[str]] = []
    current: List[str] = []

    for ln in lines:
        if ln == "":
            # A blank line closes the block being collected, if there is one.
            if current:
                blocks.append(current)
                current = []
        elif DATE_TOKEN.search(ln):
            current.append(ln)
    if current:
        blocks.append(current)

    if len(blocks) > 2:
        # Merge any extra blocks into the second (linear, unlike sum(..., [])).
        merged: List[str] = []
        for extra in blocks[1:]:
            merged.extend(extra)
        blocks = [blocks[0], merged]
    return blocks