This commit is contained in:
Daniel Schick 2025-11-13 12:59:23 +01:00
parent 354e65fb71
commit 913fd6ca22
2 changed files with 238 additions and 1 deletions

View File

@@ -98,4 +98,69 @@ def split_into_tables(lines: List[str]) -> List[List[str]]:
if len(blocks) > 2:
# Merge any extra blocks into the second
blocks = [blocks[0], sum(blocks[1:], [])]
return blocks
return blocks
def parse_line_to_record(ln: str) -> Dict[str, Any]:
    """Parse a single table line into a minimal record.

    Output fields:
    - ship: text before the first date token
    - eta_raw: 1st date(+optional time) token as raw string
    - ets_raw: 2nd date(+optional time) token as raw string (if present)
    - notes: remainder of the line after the last extracted date token
    - raw_line: the full original line
    """
    # Ship name up to the first plain date token. DATE_TOKEN (with \b) is kept
    # here deliberately: DT_TOKEN_WITH_TIME has no leading \b and could anchor
    # the ship cut-off differently in edge cases.
    first = DATE_TOKEN.search(ln)
    ship = ln[: first.start()].strip() if first else ln.strip()

    # One regex pass instead of three: the original called findall() and then
    # re-ran finditer() in a pass-loop just to locate the last match.
    matches = list(DT_TOKEN_WITH_TIME.finditer(ln))
    eta_raw = matches[0].group().strip() if matches else None
    ets_raw = matches[1].group().strip() if len(matches) >= 2 else None

    # Notes: everything after the last date token we captured.
    notes = ln[matches[-1].end():].strip() if matches else ""

    return {
        "ship": ship,
        "eta_raw": eta_raw,
        "ets_raw": ets_raw,
        "notes": notes,
        "raw_line": ln,
    }
def parse_pdf_to_records(pdf_path: str) -> List[Dict[str, Any]]:
    """High-level parser: extract text lines, strip headers, split into the
    per-harbor tables, and parse every non-blank row into a record."""
    tables = split_into_tables(cleanse_lines(extract_text_lines(pdf_path)))
    records: List[Dict[str, Any]] = []
    for idx, table in enumerate(tables):
        # The first table belongs to harbor A; everything after it to harbor B.
        harbor_tag = "B" if idx else "A"
        for row in table:
            if not row.strip():
                continue
            record = parse_line_to_record(row)
            record["harbor"] = harbor_tag
            records.append(record)
    return records

View File

@@ -0,0 +1,172 @@
# pdf_to_records.py
# CLI: parse a PDF and write JSONL (default) or CSV with one record per row.
from __future__ import annotations
import argparse, json, csv, re
from pathlib import Path
from typing import List, Dict, Any
# -----------------------------
# PDF text extraction helpers
# -----------------------------
# Lines matching any of these are table headers / legend rows, not ship data.
HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]
# German-style date "d.m." with optional 4-digit year, e.g. "3.11." or "3.11.2025".
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix like " / 14.30 Uhr" (occasionally starred: "... Uhr*").
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# Date token optionally followed by the time fragment.
# NOTE(review): unlike DATE_TOKEN this has no leading \b — presumably
# intentional, but the two patterns can anchor differently; confirm.
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)
def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract raw text lines from the PDF.

    Tries pdfplumber first; on any failure (missing package, parse error)
    falls back to PyPDF2. Raises RuntimeError when neither yields text.
    """
    text = ""
    try:
        import pdfplumber

        with pdfplumber.open(pdf_path) as pdf:
            text = "\n".join((page.extract_text() or "") for page in pdf.pages)
    except Exception:
        # pdfplumber unavailable or failed -> second chance with PyPDF2.
        try:
            from PyPDF2 import PdfReader

            text = "\n".join(
                (page.extract_text() or "") for page in PdfReader(pdf_path).pages
            )
        except Exception:
            text = ""
    if not text.strip():
        # Both backends came up empty: likely a scanned / image-only PDF.
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, consider adding OCR fallback."
        )
    return [line.strip() for line in text.splitlines()]
def cleanse_lines(lines: List[str]) -> List[str]:
    """Drop header/legend lines; keep data lines and blanks.

    Blank lines are preserved on purpose: they mark table boundaries
    for split_into_tables().
    """
    kept: List[str] = []
    for line in lines:
        if line and any(pat.search(line) for pat in HEADER_PATTERNS):
            continue  # header row carries no ship data
        kept.append(line)
    return kept
def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Split lines into (at most) two tables, separated by blank lines."""
    # Only blank separators and lines containing a date token are relevant.
    relevant = [ln for ln in lines if ln == "" or DATE_TOKEN.search(ln)]
    tables: List[List[str]] = []
    bucket: List[str] = []
    for ln in relevant:
        if ln:
            bucket.append(ln)
        elif bucket:
            # A blank line after data closes the current table; consecutive
            # blanks (empty bucket) are ignored.
            tables.append(bucket)
            bucket = []
    if bucket:
        tables.append(bucket)
    if len(tables) > 2:
        # Fold any surplus tables into the second one.
        tables = [tables[0], sum(tables[1:], [])]
    return tables
def parse_line_to_record(ln: str) -> Dict[str, Any]:
    """Parse a table line into structured fields.

    Returns a dict with: ship (text before the first date token), eta_raw /
    ets_raw (first/second date+time tokens, or None), notes (text after the
    last date token), and raw_line (the unmodified input).
    """
    # Ship name up to the first plain date token. DATE_TOKEN (with \b) is kept
    # here deliberately: DT_TOKEN_WITH_TIME has no leading \b and could anchor
    # the ship cut-off differently in edge cases.
    first = DATE_TOKEN.search(ln)
    ship = ln[: first.start()].strip() if first else ln.strip()

    # One regex pass instead of three: the original called findall() and then
    # re-ran finditer() in a pass-loop just to locate the last match.
    matches = list(DT_TOKEN_WITH_TIME.finditer(ln))
    eta_raw = matches[0].group().strip() if matches else None
    ets_raw = matches[1].group().strip() if len(matches) >= 2 else None

    # Notes: everything after the last captured date token.
    notes = ln[matches[-1].end():].strip() if matches else ""

    return {
        "ship": ship,
        "eta_raw": eta_raw,
        "ets_raw": ets_raw,
        "notes": notes,
        "raw_line": ln,
    }
def parse_pdf_to_records(pdf_path: str) -> List[Dict[str, Any]]:
    """High-level parser: extract text, sanitize, split per harbor, parse rows."""
    tables = split_into_tables(cleanse_lines(extract_text_lines(pdf_path)))
    out: List[Dict[str, Any]] = []
    for idx, table in enumerate(tables):
        # First table is harbor A; any later table belongs to harbor B.
        harbor_tag = "B" if idx else "A"
        for row in table:
            if not row.strip():
                continue
            record = parse_line_to_record(row)
            record["harbor"] = harbor_tag
            out.append(record)
    return out
def write_jsonl(path: Path, rows: List[Dict[str, Any]]):
    """Write one JSON object per line, UTF-8, non-ASCII characters kept verbatim."""
    with path.open("w", encoding="utf-8") as fh:
        fh.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)
def write_csv(path: Path, rows: List[Dict[str, Any]]):
    """Write rows as CSV with a fixed column set (raw_line is intentionally omitted)."""
    if not rows:
        # No records: emit an empty file rather than a lonely header row.
        path.write_text("", encoding="utf-8")
        return
    columns = ["harbor", "ship", "eta_raw", "ets_raw", "notes"]
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=columns)
        writer.writeheader()
        writer.writerows({col: row.get(col) for col in columns} for row in rows)
def main():
    """CLI entry point: parse the PDF and write JSONL (default) or CSV."""
    ap = argparse.ArgumentParser(description="Parse ship tables PDF → records (A/B)")
    ap.add_argument("pdf", help="Path to partner PDF")
    ap.add_argument("--out", help="Output file path (default: <pdf>.<format>)")
    ap.add_argument("--format", choices=["jsonl", "csv"], default="jsonl")
    args = ap.parse_args()

    rows = parse_pdf_to_records(args.pdf)
    # Default output suffix follows the chosen format; previously a CSV run
    # without --out silently produced a file named "<pdf>.jsonl".
    out = Path(args.out) if args.out else Path(args.pdf).with_suffix(f".{args.format}")
    if args.format == "jsonl":
        write_jsonl(out, rows)
    else:
        write_csv(out, rows)
    print(f"Wrote {len(rows)} records -> {out}")
if __name__ == "__main__":
main()