WIP
parent 60baf02299
commit 14c99df447
tools/pdf_import/jmueller_parser.py (new file, 101 lines)
@@ -0,0 +1,101 @@
# parser.py
# Utilities to extract text from the PDF and parse rows into records.
# Assumes two tables: first is Harbor A, second is Harbor B.

from __future__ import annotations

import re
from typing import List, Dict, Any, Tuple


# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.
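# A minimal sketch of that OCR fallback, assuming pdf2image + pytesseract are
# installed (plus the poppler and tesseract system packages); the helper name
# and the "deu" language pack are our assumptions, the latter based on the
# German column headers below. Not wired into extract_text_lines yet.
def _ocr_extract_text(pdf_path: str) -> str:
    from pdf2image import convert_from_path  # renders each PDF page to a PIL image
    import pytesseract

    pages = convert_from_path(pdf_path)
    return "\n".join(pytesseract.image_to_string(img, lang="deu") for img in pages)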


def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract text lines from a PDF using pdfplumber (preferred) with a
    light fallback to PyPDF2. Returns a list of raw lines."""
    text = ""

    try:
        import pdfplumber

        with pdfplumber.open(pdf_path) as pdf:
            pages_text = []
            for p in pdf.pages:
                t = p.extract_text() or ""
                pages_text.append(t)
            text = "\n".join(pages_text)
    except Exception:
        try:
            from PyPDF2 import PdfReader

            reader = PdfReader(pdf_path)
            pages_text = []
            for page in reader.pages:
                pages_text.append(page.extract_text() or "")
            text = "\n".join(pages_text)
        except Exception:
            text = ""

    if not text.strip():
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, add OCR fallback (pytesseract)."
        )

    # Normalize to individual lines
    lines = [ln.strip() for ln in text.splitlines()]
    return lines


HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]

DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)
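# Worked examples for the tokens above (the sample dates are invented):
#   DATE_TOKEN matches "24.12." and "24.12.2024";
#   DT_TOKEN_WITH_TIME additionally accepts an optional time suffix, e.g.
#   "24.12.2024 / 14.30 Uhr" or "24.12. / 8.00 Uhr *".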


def cleanse_lines(lines: List[str]) -> List[str]:
    """Remove known header lines and keep data/blank lines."""
    out: List[str] = []
    for ln in lines:
        if not ln:
            out.append("")
            continue
        if any(p.search(ln) for p in HEADER_PATTERNS):
            continue
        out.append(ln)
    return out
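# For example, cleanse_lines(["Schiff ... ETA ... ETS", "", "MS Beispiel 24.12."])
# drops the header row and returns ["", "MS Beispiel 24.12."]
# (the ship name is invented).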


def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Find candidate data lines (those containing a date token) and split them
    into up to two blocks separated by at least one blank line. Returns a list
    of at most two blocks (empty if no data lines are found)."""
    candidate = [ln for ln in lines if (ln == "" or DATE_TOKEN.search(ln))]

    blocks: List[List[str]] = []
    current: List[str] = []
    seen_data = False
    for ln in candidate:
        if ln == "":
            if seen_data and current:
                blocks.append(current)
                current = []
                seen_data = False
            continue
        current.append(ln)
        seen_data = True
    if current:
        blocks.append(current)

    if len(blocks) > 2:
        # Merge any extra blocks into the second
        blocks = [blocks[0], sum(blocks[1:], [])]
    return blocks
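

# A minimal end-to-end sketch of how these helpers compose; "arrivals.pdf" is
# a placeholder path, and the Harbor A/B labels follow the module comment at
# the top (first block = Harbor A, second = Harbor B).
if __name__ == "__main__":
    raw = extract_text_lines("arrivals.pdf")
    data = cleanse_lines(raw)
    tables = split_into_tables(data)
    for name, block in zip(("Harbor A", "Harbor B"), tables):
        print(f"{name}: {len(block)} rows")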