# parser.py
# Utilities to extract text lines from a PDF and parse table rows into records.
# Assumes the document contains two tables: the first is Harbor A, the second is Harbor B.
|
|
|
|
|
|
from __future__ import annotations
|
|
from csv import reader
|
|
import re
|
|
from typing import List, Dict, Any, Tuple
|
|
|
|
|
|
# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.
|
|
|
|
|
|
def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract the text of a PDF and return it as a list of stripped lines.

    pdfplumber is tried first. PyPDF2 is used as a fallback when pdfplumber
    cannot be imported, raises, or yields only whitespace. Page texts are
    joined with newlines before being split into lines, and every returned
    line has its leading and trailing whitespace removed (blank lines are
    kept as empty strings).

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The stripped text lines of all pages, in page order.

    Raises:
        RuntimeError: If neither backend produced any non-whitespace text.
            The last backend failure (missing library, unreadable file, ...)
            is attached as ``__cause__`` so the real reason is not lost.
    """
    text = ""
    last_error = None
    for backend in (_extract_with_pdfplumber, _extract_with_pypdf2):
        try:
            text = backend(pdf_path)
        except Exception as exc:
            # Deliberately broad: both libraries are optional (ImportError)
            # and raise their own exception types on malformed files. The
            # error is remembered rather than discarded.
            last_error = exc
            text = ""
            continue
        if text.strip():
            break

    if not text.strip():
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, add OCR fallback (pytesseract)."
        ) from last_error

    # Normalize to individual lines
    return [ln.strip() for ln in text.splitlines()]


def _extract_with_pdfplumber(pdf_path: str) -> str:
    """Return the newline-joined text of all pages as read by pdfplumber."""
    import pdfplumber  # optional dependency, imported lazily

    with pdfplumber.open(pdf_path) as pdf:
        # A falsy extract_text() result contributes an empty string.
        return "\n".join(page.extract_text() or "" for page in pdf.pages)


def _extract_with_pypdf2(pdf_path: str) -> str:
    """Return the newline-joined text of all pages as read by PyPDF2."""
    from PyPDF2 import PdfReader  # optional dependency, imported lazily

    pdf_reader = PdfReader(pdf_path)
    return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)
|
|
|
|
|
|
# Case-insensitive patterns identifying table header lines; cleanse_lines()
# drops every non-blank line that matches at least one of them.
#   1. the whole words "Schiff", "ETA" and "ETS" appearing in that order
#   2. any of the listed labels appearing anywhere in the line
HEADER_PATTERNS = [
    re.compile(expression, re.IGNORECASE)
    for expression in (
        r"\bSchiff\b.*\bETA\b.*\bETS\b",
        r"Nächster Hafen|Liegeplatz|Ladung|Lotse",
    )
]
|
|
|
|
|
|
# Day.month. with an optional four-digit year, e.g. "3.11." or "3.11.2024".
_DATE_CORE = r"\d{1,2}\.\d{1,2}\.(?:\d{4})?"

# A date that starts on a word boundary.
DATE_TOKEN = re.compile(r"\b" + _DATE_CORE)

# Optional "/ H.MM Uhr" suffix, optionally followed by an asterisk.
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"

# A date (no leading word-boundary requirement) plus the optional time suffix.
DT_TOKEN_WITH_TIME = re.compile(_DATE_CORE + TIME_FRAGMENT)
|
|
|
|
|
|
|
|
|
|
def cleanse_lines(lines: List[str]) -> List[str]:
    """Drop header lines while preserving data lines and blank lines.

    Blank entries are always kept (as ``""``). A non-blank entry is dropped
    when any pattern in ``HEADER_PATTERNS`` matches it; all other entries are
    kept unchanged and in their original order.
    """

    def is_header(text: str) -> bool:
        return any(pattern.search(text) for pattern in HEADER_PATTERNS)

    return [
        entry if entry else ""
        for entry in lines
        if not entry or not is_header(entry)
    ]
|
|
|
|
def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Group date-bearing lines into at most two blocks split on blank lines.

    Only lines containing a ``DATE_TOKEN`` match are treated as data; other
    non-blank lines are ignored and do not end a block. An empty-string line
    closes the block being collected, if any. When more than two blocks are
    found, every block after the first is merged, in order, into the second.

    Args:
        lines: Text lines, with blank lines given as empty strings.

    Returns:
        A list of 0, 1 or 2 blocks, each a list of data lines. The list is
        empty when no line contains a date token.
    """
    blocks: List[List[str]] = []
    current: List[str] = []
    for ln in lines:
        if ln == "":
            # A blank line ends the current block; repeated blanks are no-ops.
            if current:
                blocks.append(current)
                current = []
        elif DATE_TOKEN.search(ln):
            current.append(ln)
    if current:
        blocks.append(current)

    if len(blocks) > 2:
        # Merge any extra blocks into the second (linear-time flatten).
        merged_tail = [row for block in blocks[1:] for row in block]
        blocks = [blocks[0], merged_tail]
    return blocks