git_brcal/tools/pdf_import/jmueller_parser.py
2025-11-13 13:04:23 +01:00

166 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# parser.py
# Utilities to extract text from the PDF and parse rows into records.
# Assumes two tables: first is Harbor A, second is Harbor B.
from __future__ import annotations
from csv import reader
import re
from typing import List, Dict, Any, Tuple
# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.
def extract_text_lines(pdf_path: str) -> List[str]:
    """Return the raw text lines of *pdf_path*.

    Tries pdfplumber first; falls back to PyPDF2 if pdfplumber is missing
    or fails. Raises RuntimeError when no text can be extracted (e.g. a
    scanned PDF with no text layer).
    """
    text = ""
    try:
        import pdfplumber
        with pdfplumber.open(pdf_path) as pdf:
            # One string per page; empty string for pages with no text layer.
            text = "\n".join((page.extract_text() or "") for page in pdf.pages)
    except Exception:
        # pdfplumber unavailable or failed — try the lighter PyPDF2 reader.
        try:
            from PyPDF2 import PdfReader
            doc = PdfReader(pdf_path)
            text = "\n".join((page.extract_text() or "") for page in doc.pages)
        except Exception:
            text = ""
    if not text.strip():
        raise RuntimeError("No text extracted. If the PDF is scanned, add OCR fallback (pytesseract).")
    # Normalize to individual stripped lines.
    return [raw.strip() for raw in text.splitlines()]
# Lines matching any of these patterns are treated as table headers and removed.
HEADER_PATTERNS = [
    # Column-header row containing "Schiff ... ETA ... ETS" in that order.
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    # Other German header labels: next port / berth / cargo / pilot.
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]
# German-style date token, e.g. "3.11." or "03.11.2025" (year optional).
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix after a date, e.g. " / 14.30 Uhr" (optionally starred).
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# Date token plus optional time fragment, e.g. "3.11. / 14.30 Uhr".
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)
def cleanse_lines(lines: List[str]) -> List[str]:
    """Drop known header lines; keep data lines and blank lines unchanged."""
    kept: List[str] = []
    for line in lines:
        # Blank lines pass through; non-blank header lines are skipped.
        if line and any(pattern.search(line) for pattern in HEADER_PATTERNS):
            continue
        kept.append(line)
    return kept
def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Split date-bearing lines into up to two table blocks.

    Candidates are blank lines plus any line containing a date token.
    Blocks are separated by one or more blank lines; if more than two
    blocks emerge, everything after the first is merged into block two.
    Returns a list of 1 or 2 blocks.
    """
    relevant = [line for line in lines if line == "" or DATE_TOKEN.search(line)]
    tables: List[List[str]] = []
    buffer: List[str] = []
    for line in relevant:
        if line:
            buffer.append(line)
        elif buffer:
            # Blank line terminates the current (non-empty) block.
            tables.append(buffer)
            buffer = []
    if buffer:
        tables.append(buffer)
    if len(tables) > 2:
        # Fold any extra blocks into the second one.
        merged = [row for extra in tables[1:] for row in extra]
        tables = [tables[0], merged]
    return tables
def parse_line_to_record(ln: str) -> Dict[str, Any]:
    """Parse one table line into a minimal record dict.

    Fields:
      - ship: text preceding the first date token
      - eta_raw: first date(+optional time) token, raw string (or None)
      - ets_raw: second date(+optional time) token, raw string (or None)
      - notes: text after the last date(+time) token (or "")
      - raw_line: the unmodified input line
    """
    # Ship name is everything before the first date token.
    first_date = DATE_TOKEN.search(ln)
    ship = (ln[: first_date.start()] if first_date else ln).strip()
    # Collect every date(+optional time) token on the line.
    matches = list(DT_TOKEN_WITH_TIME.finditer(ln))
    eta_raw = matches[0].group().strip() if matches else None
    ets_raw = matches[1].group().strip() if len(matches) > 1 else None
    # Notes are whatever trails the last token we captured.
    notes = ln[matches[-1].end():].strip() if matches else ""
    return {
        "ship": ship,
        "eta_raw": eta_raw,
        "ets_raw": ets_raw,
        "notes": notes,
        "raw_line": ln,
    }
def parse_pdf_to_records(pdf_path: str) -> List[Dict[str, Any]]:
    """High-level pipeline: extract lines, drop headers, split into the
    (up to two) tables, tag rows as harbor A/B by table order, and parse
    every row into a record."""
    raw_lines = extract_text_lines(pdf_path)
    table_blocks = split_into_tables(cleanse_lines(raw_lines))
    records: List[Dict[str, Any]] = []
    for index, block in enumerate(table_blocks):
        # First table is Harbor A; any subsequent table is Harbor B.
        harbor_tag = "A" if index == 0 else "B"
        for row in block:
            if not row.strip():
                continue
            record = parse_line_to_record(row)
            record["harbor"] = harbor_tag
            records.append(record)
    return records