# parser.py
# Utilities to extract text from the PDF and parse rows into records.
# Assumes two tables: first is Harbor A, second is Harbor B.
from __future__ import annotations

from csv import reader
import re
from typing import List, Dict, Any, Tuple

# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.


def _extract_with_pdfplumber(pdf_path: str) -> str:
    """Return the text of every page, joined by newlines, using pdfplumber."""
    import pdfplumber  # imported lazily so the module loads without it

    with pdfplumber.open(pdf_path) as pdf:
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


def _extract_with_pypdf2(pdf_path: str) -> str:
    """Return the text of every page, joined by newlines, using PyPDF2."""
    from PyPDF2 import PdfReader  # imported lazily so the module loads without it

    # Named ``pdf_reader`` so it does not shadow the module-level csv ``reader``.
    pdf_reader = PdfReader(pdf_path)
    return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)


def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract text lines from a PDF.

    pdfplumber is tried first. PyPDF2 is used as a fallback when pdfplumber
    is unavailable, raises, or yields only whitespace.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The extracted text split into lines, each stripped of leading and
        trailing whitespace. Blank lines are kept as empty strings.

    Raises:
        RuntimeError: If neither backend produced any non-whitespace text.
            The last backend error, if any, is attached as ``__cause__``.
    """
    text = ""
    last_error = None
    for backend in (_extract_with_pdfplumber, _extract_with_pypdf2):
        try:
            text = backend(pdf_path)
        except Exception as exc:
            # Deliberately broad: a missing library or any parser error
            # should simply move on to the next backend.
            last_error = exc
            continue
        if text.strip():
            break

    if not text.strip():
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, add OCR fallback (pytesseract)."
        ) from last_error

    # Normalize to individual lines
    return [ln.strip() for ln in text.splitlines()]


# Lines matching any of these patterns are treated as table headers.
HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]

# A date such as "1.2." or "01.02.2024"; the four-digit year is optional.
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix such as " / 14.30 Uhr *".
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# A date optionally followed by the time suffix above.
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)


def cleanse_lines(lines: List[str]) -> List[str]:
    """Remove known header lines and keep data/blank lines.

    Args:
        lines: Raw text lines.

    Returns:
        A new list without the lines matching ``HEADER_PATTERNS``. Empty
        lines are passed through as ``""``.
    """
    out: List[str] = []
    for ln in lines:
        if not ln:
            out.append("")
            continue
        if any(p.search(ln) for p in HEADER_PATTERNS):
            continue
        out.append(ln)
    return out


def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Split candidate data lines into at most two blocks.

    A candidate data line is one containing a ``DATE_TOKEN`` match. Blocks
    are separated by one or more empty lines; non-empty lines without a date
    token are ignored and do not end a block. If more than two blocks are
    found, every block after the first is merged into the second.

    Args:
        lines: Text lines; empty strings act as block separators.

    Returns:
        A list of zero, one or two blocks, each a non-empty list of lines.
    """
    blocks: List[List[str]] = []
    current: List[str] = []
    for ln in lines:
        if ln == "":
            # A blank line closes the running block, if it has any data.
            if current:
                blocks.append(current)
                current = []
        elif DATE_TOKEN.search(ln):
            current.append(ln)
    if current:
        blocks.append(current)

    if len(blocks) > 2:
        # Merge any extra blocks into the second
        merged = [ln for extra in blocks[1:] for ln in extra]
        blocks = [blocks[0], merged]
    return blocks