Compare commits
3 Commits
913fd6ca22
...
24e7f0f6f4
| Author | SHA1 | Date | |
|---|---|---|---|
| 24e7f0f6f4 | |||
| 5b61102356 | |||
| e4d82835da |
51
src/brecal_api_client/README.md
Normal file
51
src/brecal_api_client/README.md
Normal file
@ -0,0 +1,51 @@
|
||||
# BreCal API Client
|
||||
|
||||
Minimal Python helper for `misc/BreCalApi.yaml`. It focuses on the login, shipcall, and times endpoints needed by CLI tools, but the helper method `BreCalClient.raw_request` makes it straightforward to call any other endpoint defined in the OpenAPI specification.
|
||||
|
||||
Dependencies: only the `requests` package in addition to the standard library.
|
||||
|
||||
## Endpoint selection
|
||||
|
||||
`BreCalClient` reads its default `base_url` from `~/.config/brecal/client.json`. The file lets you define multiple deployments and switch between them without modifying code:
|
||||
|
||||
```json
|
||||
{
|
||||
"environment": "devel",
|
||||
"endpoints": {
|
||||
"local": "http://localhost:5000",
|
||||
"devel": "https://brecaldevel.bsmd-emswe.eu",
|
||||
"test": "https://brecaltest.example.net",
|
||||
"prod": "https://brecal.example.com"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Override the selection at runtime via `BreCalClient(base_url="...")` or the environment variable `BRECAL_BASE_URL`. If no config is present the client falls back to the development server URL.
|
||||
|
||||
## Credentials
|
||||
|
||||
Store credentials in `~/.config/brecal/credentials.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"username": "alfred",
|
||||
"password": "123456"
|
||||
}
|
||||
```
|
||||
|
||||
You can override the location when calling `Credentials.load("/path/to/file.json")` or provide credentials from environment variables via `Credentials.from_env()`.
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from brecal_api_client import BreCalClient, Credentials
|
||||
|
||||
creds = Credentials.load()
|
||||
with BreCalClient(credentials=creds) as client:
|
||||
# list ship calls from the last week
|
||||
shipcalls = client.get_shipcalls(past_days=7)
|
||||
|
||||
# create/update ship calls or times
|
||||
shipcall_id = client.create_shipcall({...})
|
||||
times = client.get_times(shipcall_id=shipcall_id)
|
||||
```
|
||||
25
src/brecal_api_client/__init__.py
Normal file
25
src/brecal_api_client/__init__.py
Normal file
@ -0,0 +1,25 @@
|
||||
"""Simple Python client for the BreCal REST API."""
|
||||
|
||||
from .client import BreCalClient, DEFAULT_BASE_URL
|
||||
from .config import ClientConfig, get_default_base_url
|
||||
from .credentials import Credentials
|
||||
from .exceptions import (
|
||||
AuthenticationError,
|
||||
AuthorizationError,
|
||||
BreCalApiError,
|
||||
ClientConfigurationError,
|
||||
)
|
||||
from .types import LoginResult
|
||||
|
||||
__all__ = [
|
||||
"BreCalClient",
|
||||
"Credentials",
|
||||
"ClientConfig",
|
||||
"get_default_base_url",
|
||||
"LoginResult",
|
||||
"DEFAULT_BASE_URL",
|
||||
"BreCalApiError",
|
||||
"AuthenticationError",
|
||||
"AuthorizationError",
|
||||
"ClientConfigurationError",
|
||||
]
|
||||
248
src/brecal_api_client/client.py
Normal file
248
src/brecal_api_client/client.py
Normal file
@ -0,0 +1,248 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Iterable, Mapping, Optional, Sequence
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
import time
|
||||
|
||||
from .config import get_default_base_url
|
||||
from .credentials import Credentials
|
||||
from .exceptions import (
|
||||
AuthenticationError,
|
||||
AuthorizationError,
|
||||
BreCalApiError,
|
||||
ClientConfigurationError,
|
||||
)
|
||||
from .types import JsonDict, LoginResult, MutableJsonDict
|
||||
|
||||
DEFAULT_BASE_URL = get_default_base_url()
|
||||
|
||||
|
||||
@dataclass
class _RequestContext:
    """Bundles the per-request parameters handed to BreCalClient._request_json."""

    method: str  # HTTP verb, upper-case ("GET", "POST", ...)
    path: str  # endpoint path relative to base_url, e.g. "/shipcalls"
    expected: Sequence[int]  # status codes treated as success
    auth: bool  # attach the Bearer token (and ensure login) before sending
|
||||
|
||||
|
||||
class BreCalClient:
    """Thin convenience wrapper around the BreCal REST API.

    Wraps the login, shipcall, and times endpoints; ``raw_request`` exposes
    the low-level helper for any other endpoint from the OpenAPI spec.
    Usable as a context manager (closes the HTTP session on exit).
    """

    def __init__(
        self,
        base_url: Optional[str] = None,
        *,
        credentials: Optional[Credentials] = None,
        timeout: float = 30.0,
        session: Optional[requests.Session] = None,
        auto_login: bool = True,
    ) -> None:
        """Create a client.

        Args:
            base_url: API root; when omitted it is resolved via
                ``get_default_base_url()`` (env var / config file / fallback).
            credentials: Optional credentials, stored for (re-)login.
            timeout: Per-request timeout in seconds.
            session: Optional pre-configured ``requests.Session``.
            auto_login: When True and credentials are given, log in eagerly.

        Raises:
            ClientConfigurationError: If no base URL can be resolved.
        """
        resolved_base_url = base_url or get_default_base_url()
        if not resolved_base_url:
            raise ClientConfigurationError("base_url must be provided.")
        self.base_url = resolved_base_url.rstrip("/")
        self._timeout = timeout
        self._session = session or requests.Session()
        self._credentials = credentials
        self._login: Optional[LoginResult] = None
        if auto_login and credentials is not None:
            self.login(credentials)

    # -----------------------------------------------------
    # lifecycle helpers
    # -----------------------------------------------------
    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()

    def __enter__(self) -> "BreCalClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # -----------------------------------------------------
    # authentication
    # -----------------------------------------------------
    @property
    def token(self) -> Optional[str]:
        """Bearer token from the last successful login, or None."""
        return self._login.token if self._login else None

    @property
    def login_info(self) -> Optional[LoginResult]:
        """Full payload of the last successful login, or None."""
        return self._login

    def ensure_authenticated(self) -> None:
        """Log in (again) unless the current token is valid for >30 more seconds.

        Raises:
            AuthenticationError: If no credentials are stored on the client.
        """
        # 30-second safety margin so the token does not expire mid-request.
        if self._login and self._login.expires_at.timestamp() > _epoch_seconds() + 30:
            return
        if not self._credentials:
            raise AuthenticationError(
                "Client has no stored credentials. Call login() with credentials first."
            )
        self.login(self._credentials)

    def login(self, credentials: Credentials) -> LoginResult:
        """POST /login and remember both the result and the credentials.

        Raises:
            AuthenticationError: On malformed login responses (non-mapping
                payload or a missing token).
        """
        payload = {"username": credentials.username, "password": credentials.password}
        data = self._request_json(
            _RequestContext("POST", "/login", expected=(200,), auth=False),
            json=payload,
        )
        if not isinstance(data, Mapping):
            raise AuthenticationError("Login returned unexpected payload.")
        result = LoginResult.from_api(data)
        if not result.token:
            raise AuthenticationError("Login response did not include a token.")
        self._login = result
        # Keep the credentials so ensure_authenticated() can re-login later.
        self._credentials = credentials
        return result

    # -----------------------------------------------------
    # shipcalls
    # -----------------------------------------------------
    def get_shipcalls(self, *, past_days: Optional[int] = None) -> Sequence[JsonDict]:
        """GET /shipcalls, optionally limited to the last *past_days* days."""
        params: Dict[str, Any] = {}
        if past_days is not None:
            params["past_days"] = int(past_days)
        data = self._request_json(
            _RequestContext("GET", "/shipcalls", expected=(200,), auth=True),
            params=params or None,
        )
        return _as_sequence_of_dicts(data)

    def create_shipcall(self, shipcall: Mapping[str, Any]) -> int:
        """POST /shipcalls; any 'id' in the input is dropped. Returns the new id."""
        payload = _copy_without_keys(shipcall, drop_keys=("id",))
        data = self._request_json(
            _RequestContext("POST", "/shipcalls", expected=(201,), auth=True),
            json=payload,
        )
        return _extract_id(data)

    def update_shipcall(self, shipcall: Mapping[str, Any]) -> int:
        """PUT /shipcalls; requires an 'id' field. Returns the updated id.

        Raises:
            ValueError: If *shipcall* has no 'id' key.
        """
        if "id" not in shipcall:
            raise ValueError("Shipcall update requires an 'id' field.")
        data = self._request_json(
            _RequestContext("PUT", "/shipcalls", expected=(200,), auth=True),
            json=dict(shipcall),
        )
        return _extract_id(data)

    # -----------------------------------------------------
    # times
    # -----------------------------------------------------
    def get_times(self, *, shipcall_id: Optional[int] = None) -> Sequence[JsonDict]:
        """GET /times, optionally filtered by *shipcall_id*."""
        params = {"shipcall_id": int(shipcall_id)} if shipcall_id is not None else None
        data = self._request_json(
            _RequestContext("GET", "/times", expected=(200,), auth=True),
            params=params,
        )
        return _as_sequence_of_dicts(data)

    def create_times(self, entry: Mapping[str, Any]) -> int:
        """POST /times; any 'id' in the input is dropped. Returns the new id."""
        payload = _copy_without_keys(entry, drop_keys=("id",))
        data = self._request_json(
            _RequestContext("POST", "/times", expected=(201,), auth=True),
            json=payload,
        )
        return _extract_id(data)

    def update_times(self, entry: Mapping[str, Any]) -> int:
        """PUT /times; requires an 'id' field. Returns the updated id.

        Raises:
            ValueError: If *entry* has no 'id' key.
        """
        if "id" not in entry:
            raise ValueError("Times update requires an 'id' field.")
        data = self._request_json(
            _RequestContext("PUT", "/times", expected=(200,), auth=True),
            json=dict(entry),
        )
        return _extract_id(data)

    def delete_times(self, times_id: int) -> int:
        """DELETE /times?id=<times_id>.

        NOTE(review): assumes the API echoes the deleted record's id in the
        response body — confirm against the OpenAPI spec.
        """
        data = self._request_json(
            _RequestContext("DELETE", "/times", expected=(200,), auth=True),
            params={"id": int(times_id)},
        )
        return _extract_id(data)

    # -----------------------------------------------------
    # generic helpers
    # -----------------------------------------------------
    def raw_request(
        self,
        method: str,
        path: str,
        *,
        expected: Sequence[int] = (200,),
        auth: bool = True,
        **kwargs: Any,
    ) -> Any:
        """Expose the low-level request helper for endpoints not wrapped yet."""
        ctx = _RequestContext(method.upper(), path, expected, auth)
        return self._request_json(ctx, **kwargs)

    def _request_json(self, ctx: _RequestContext, **kwargs: Any) -> Any:
        """Send one HTTP request and decode the response.

        Adds Accept (and Content-Type for JSON bodies) headers, attaches the
        Bearer token after ensure_authenticated() when ctx.auth is set, and
        maps status codes: 401/403 -> AuthorizationError, anything not in
        ctx.expected -> BreCalApiError.  Returns the decoded JSON (or raw
        text), or None for an empty body.
        """
        url = urljoin(f"{self.base_url}/", ctx.path.lstrip("/"))
        headers: Dict[str, str] = kwargs.pop("headers", {})
        headers.setdefault("Accept", "application/json")
        if "json" in kwargs:
            headers.setdefault("Content-Type", "application/json")
        if ctx.auth:
            self.ensure_authenticated()
            headers.setdefault("Authorization", f"Bearer {self.token}")

        response = self._session.request(
            ctx.method,
            url,
            timeout=self._timeout,
            headers=headers,
            **kwargs,
        )
        # NOTE(review): 401/403 raise AuthorizationError even for /login
        # itself (auth=False), so a wrong password surfaces as
        # AuthorizationError rather than AuthenticationError — confirm
        # that this is intended.
        if response.status_code == 401 or response.status_code == 403:
            raise AuthorizationError(
                f"{ctx.method} {ctx.path} returned {response.status_code}",
                status_code=response.status_code,
                payload=_safe_json(response),
            )
        if response.status_code not in ctx.expected:
            raise BreCalApiError(
                f"{ctx.method} {ctx.path} returned {response.status_code}",
                status_code=response.status_code,
                payload=_safe_json(response),
            )
        if response.content:
            return _safe_json(response)
        return None
|
||||
|
||||
|
||||
def _copy_without_keys(
|
||||
data: Mapping[str, Any], *, drop_keys: Iterable[str]
|
||||
) -> MutableJsonDict:
|
||||
payload: MutableJsonDict = dict(data)
|
||||
for key in drop_keys:
|
||||
payload.pop(key, None)
|
||||
return payload
|
||||
|
||||
|
||||
def _extract_id(payload: Any) -> int:
|
||||
if isinstance(payload, Mapping) and "id" in payload:
|
||||
return int(payload["id"])
|
||||
raise BreCalApiError("API response did not include an 'id' field.", payload=payload)
|
||||
|
||||
|
||||
def _as_sequence_of_dicts(data: Any) -> Sequence[JsonDict]:
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
raise BreCalApiError("Expected list response from API.", payload=data)
|
||||
|
||||
|
||||
def _safe_json(response: requests.Response) -> Any:
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
if "application/json" in content_type:
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError:
|
||||
pass
|
||||
return response.text
|
||||
|
||||
|
||||
def _epoch_seconds() -> int:
|
||||
return int(time.time())
|
||||
74
src/brecal_api_client/config.py
Normal file
74
src/brecal_api_client/config.py
Normal file
@ -0,0 +1,74 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Mapping, Optional, Union
|
||||
|
||||
from .exceptions import ClientConfigurationError
|
||||
|
||||
ConfigPath = Union[str, Path]
|
||||
|
||||
DEFAULT_BASE_URL_FALLBACK = "https://brecaldevel.bsmd-emswe.eu"
|
||||
CONFIG_FILENAME = "client.json"
|
||||
|
||||
|
||||
def _default_config_path() -> Path:
|
||||
xdg = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config"))
|
||||
return (xdg / "brecal" / CONFIG_FILENAME).expanduser()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ClientConfig:
    """Resolved client configuration: a base URL plus an optional environment name."""

    base_url: str
    environment: Optional[str] = None

    @classmethod
    def from_mapping(cls, data: Mapping[str, Any]) -> "ClientConfig":
        """Build a config from a parsed JSON mapping.

        Resolution order: ``endpoints[environment]`` when both are present,
        otherwise an explicit ``base_url``, otherwise the first entry of
        ``endpoints``.  Raises ClientConfigurationError when nothing resolves.
        """
        env = data.get("environment")
        url = data.get("base_url")
        endpoints = data.get("endpoints")

        if isinstance(endpoints, Mapping):
            if env and env in endpoints:
                url = endpoints[env]
            elif endpoints and not url:
                # Last resort: take whichever endpoint comes first.
                url = next(iter(endpoints.values()))

        if not url:
            raise ClientConfigurationError(
                "Client configuration requires either 'base_url' or an "
                "'endpoints' mapping."
            )

        return cls(
            base_url=str(url).rstrip("/"),
            environment=str(env) if env else None,
        )

    @classmethod
    def load(cls, path: "Optional[ConfigPath]" = None) -> "ClientConfig":
        """Read and parse the JSON config file (default location when *path* is falsy)."""
        file_path = Path(path) if path else _default_config_path()
        raw = json.loads(file_path.read_text(encoding="utf-8"))
        return cls.from_mapping(raw)
|
||||
|
||||
|
||||
def get_default_base_url(path: "Optional[ConfigPath]" = None) -> str:
    """Resolve the default base URL using env vars or ~/.config/brecal/client.json.

    Order: ``BRECAL_BASE_URL`` env var, then the config file, then the
    built-in development fallback when no config file exists.
    """
    override = os.getenv("BRECAL_BASE_URL")
    if override:
        return override.rstrip("/")

    try:
        return ClientConfig.load(path=path).base_url
    except FileNotFoundError:
        # No config file at all: use the built-in development server URL.
        return DEFAULT_BASE_URL_FALLBACK
    except ClientConfigurationError:
        # Already descriptive — do not wrap it a second time.
        raise
    except Exception as exc:
        raise ClientConfigurationError(
            f"Failed to load BreCal client configuration: {exc}"
        ) from exc
|
||||
68
src/brecal_api_client/credentials.py
Normal file
68
src/brecal_api_client/credentials.py
Normal file
@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Mapping, Optional, Union
|
||||
|
||||
ConfigPath = Union[str, Path]
|
||||
|
||||
|
||||
def _default_credentials_path() -> Path:
|
||||
"""Return the default path for the credential file."""
|
||||
xdg = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config"))
|
||||
return (xdg / "brecal" / "credentials.json").expanduser()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class Credentials:
    """Holds username/password pairs for the BreCal API."""

    username: str
    password: str

    @classmethod
    def from_mapping(cls, data: Mapping[str, Any]) -> "Credentials":
        """Create credentials from a mapping (dict, TOML config, etc.).

        Accepts the aliases username/user_name/user and password/pass/secret.
        Raises ValueError for missing or empty values.
        """
        username = _coalesce_key(
            data, ("username", "user_name", "user"), required="username"
        )
        password = _coalesce_key(
            data, ("password", "pass", "secret"), required="password"
        )
        if not (isinstance(username, str) and username.strip()):
            raise ValueError("BreCal credentials require a non-empty username.")
        if not (isinstance(password, str) and password):
            raise ValueError("BreCal credentials require a non-empty password.")
        # Usernames tolerate surrounding whitespace; passwords are kept verbatim.
        return cls(username=username.strip(), password=password)

    @classmethod
    def load(cls, path: "Optional[ConfigPath]" = None) -> "Credentials":
        """Load credentials from a JSON file (default: ~/.config/brecal/credentials.json)."""
        source = Path(path) if path else _default_credentials_path()
        return cls.from_mapping(json.loads(source.read_text(encoding="utf-8")))

    @classmethod
    def from_env(
        cls, username_var: str = "BRECAL_USERNAME", password_var: str = "BRECAL_PASSWORD"
    ) -> "Credentials":
        """Load credentials from environment variables."""
        user = os.getenv(username_var)
        secret = os.getenv(password_var)
        if user and secret:
            return cls(username=user, password=secret)
        raise EnvironmentError(
            f"Missing credentials in env vars {username_var}/{password_var}"
        )
|
||||
|
||||
|
||||
def _coalesce_key(
|
||||
data: Mapping[str, Any], keys: tuple[str, ...], *, required: str
|
||||
) -> Any:
|
||||
for key in keys:
|
||||
if key in data:
|
||||
return data[key]
|
||||
raise KeyError(f"Missing '{required}' in credentials mapping.")
|
||||
30
src/brecal_api_client/exceptions.py
Normal file
30
src/brecal_api_client/exceptions.py
Normal file
@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
class BreCalApiError(RuntimeError):
    """Base exception for API client failures."""

    def __init__(
        self,
        message: str,
        *,
        status_code: Optional[int] = None,
        payload: Optional[Any] = None,
    ) -> None:
        """Store HTTP context alongside the message.

        Args:
            message: Human-readable error description.
            status_code: HTTP status of the failing response, if any.
            payload: Decoded JSON body or raw text of the failing response.
        """
        super().__init__(message)
        self.status_code = status_code
        self.payload = payload
|
||||
|
||||
|
||||
class AuthenticationError(BreCalApiError):
    """Raised when login fails."""


class AuthorizationError(BreCalApiError):
    """Raised for 401/403 responses after authentication."""


class ClientConfigurationError(ValueError):
    """Raised for invalid client configuration or missing dependencies.

    NOTE: subclasses ValueError, not BreCalApiError, so handlers that catch
    API errors will not swallow configuration problems.
    """
|
||||
58
src/brecal_api_client/types.py
Normal file
58
src/brecal_api_client/types.py
Normal file
@ -0,0 +1,58 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Mapping, MutableMapping, Optional
|
||||
|
||||
JsonDict = Dict[str, Any]
|
||||
MutableJsonDict = MutableMapping[str, Any]
|
||||
|
||||
|
||||
@dataclass
class LoginResult:
    """Represents the payload returned by /login."""

    # Account record id of the logged-in user.
    id: int
    participant_id: Optional[int]
    first_name: str
    last_name: str
    user_name: str
    user_email: Optional[str]
    user_phone: Optional[str]
    # Token sent by the client as "Authorization: Bearer <token>".
    token: str
    # Token expiry as Unix epoch seconds (see expires_at below).
    exp: int

    @classmethod
    def from_api(cls, data: Mapping[str, Any]) -> "LoginResult":
        """Build a LoginResult from the raw /login JSON mapping.

        Missing name fields default to ""; optional fields become None.
        Raises ValueError when 'id' or 'exp' is absent (via _coerce_int).
        """
        return cls(
            id=_coerce_int(data.get("id")),
            participant_id=_coerce_optional_int(data.get("participant_id")),
            first_name=str(data.get("first_name") or ""),
            last_name=str(data.get("last_name") or ""),
            user_name=str(data.get("user_name") or ""),
            user_email=_coerce_optional_str(data.get("user_email")),
            user_phone=_coerce_optional_str(data.get("user_phone")),
            token=str(data.get("token") or ""),
            exp=_coerce_int(data.get("exp")),
        )

    @property
    def expires_at(self) -> datetime:
        """Token expiry as a timezone-aware UTC datetime."""
        return datetime.fromtimestamp(self.exp, tz=timezone.utc)
|
||||
|
||||
|
||||
def _coerce_int(value: Any) -> int:
|
||||
if value is None:
|
||||
raise ValueError("Expected integer value, got None")
|
||||
return int(value)
|
||||
|
||||
|
||||
def _coerce_optional_int(value: Any) -> Optional[int]:
|
||||
return None if value is None else int(value)
|
||||
|
||||
|
||||
def _coerce_optional_str(value: Any) -> Optional[str]:
|
||||
if value is None:
|
||||
return None
|
||||
text = str(value)
|
||||
return text if text else None
|
||||
166
tools/pdf_import/jmueller_parser.py
Normal file
166
tools/pdf_import/jmueller_parser.py
Normal file
@ -0,0 +1,166 @@
|
||||
# parser.py
|
||||
# Utilities to extract text from the PDF and parse rows into records.
|
||||
# Assumes two tables: first is Harbor A, second is Harbor B.
|
||||
|
||||
|
||||
from __future__ import annotations
|
||||
from csv import reader
|
||||
import re
|
||||
from typing import List, Dict, Any, Tuple
|
||||
|
||||
|
||||
# Optional: If you want OCR fallback later, wire in pdf2image + pytesseract here.
|
||||
|
||||
|
||||
def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract text lines from a PDF using pdfplumber (preferred) with a
    light fallback to PyPDF2. Returns a list of raw lines.

    Raises RuntimeError when neither backend yields any text (e.g. a
    scanned PDF with no text layer).
    """
    text = ""

    try:
        import pdfplumber
        with pdfplumber.open(pdf_path) as pdf:
            pages_text = []
            for p in pdf.pages:
                # extract_text() may return None for empty/graphic pages.
                t = p.extract_text() or ""
                pages_text.append(t)
            text = "\n".join(pages_text)
    except Exception:
        # pdfplumber missing or failed — try PyPDF2 before giving up.
        try:
            from PyPDF2 import PdfReader
            # NOTE(review): `reader` shadows the module-level
            # `from csv import reader` import, which appears unused —
            # consider removing that import.
            reader = PdfReader(pdf_path)
            pages_text = []
            for page in reader.pages:
                pages_text.append(page.extract_text() or "")
            text = "\n".join(pages_text)
        except Exception:
            text = ""

    if not text.strip():
        raise RuntimeError("No text extracted. If the PDF is scanned, add OCR fallback (pytesseract).")

    # Normalize to individual lines
    lines = [ln.strip() for ln in text.splitlines()]
    return lines
|
||||
|
||||
|
||||
# Lines that belong to the table header rather than the data rows
# (German column captions: "Schiff", "ETA", "ETS", "Liegeplatz", ...).
HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]


# German-style date "d.m." with an optional 4-digit year, e.g. "1.2." or "01.02.2024".
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix such as " / 10.30 Uhr" (optionally starred) after a date.
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# Date token plus its optional time fragment, as found in table rows.
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)
|
||||
|
||||
|
||||
|
||||
|
||||
def cleanse_lines(lines: List[str]) -> List[str]:
    """Drop known header lines; keep data lines and blanks (table separators)."""
    return [
        line
        for line in lines
        if not (line and any(p.search(line) for p in HEADER_PATTERNS))
    ]
|
||||
|
||||
def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Group date-bearing lines into blocks separated by blank lines.

    Only lines containing a date token (or blanks, which act as
    separators) are considered.  At most two blocks are returned; any
    additional blocks are folded into the second one, matching the
    assumption "first table = Harbor A, second = Harbor B".
    """
    relevant = [ln for ln in lines if ln == "" or DATE_TOKEN.search(ln)]

    blocks: List[List[str]] = []
    block: List[str] = []
    for ln in relevant:
        if ln:
            block.append(ln)
        elif block:
            # A blank line closes the current (non-empty) block.
            blocks.append(block)
            block = []
    if block:
        blocks.append(block)

    if len(blocks) > 2:
        merged: List[str] = []
        for extra in blocks[1:]:
            merged.extend(extra)
        blocks = [blocks[0], merged]
    return blocks
|
||||
|
||||
def parse_line_to_record(ln: str) -> Dict[str, Any]:
    """Parse a single table line into a minimal record.

    Output fields:
    - ship: text before the first date token
    - eta_raw: 1st date(+optional time) token as raw string (or None)
    - ets_raw: 2nd date(+optional time) token as raw string (or None)
    - notes: remainder of the line after the last date token
    - raw_line: the full original line
    """
    # Every date(+time) occurrence, in order of appearance.
    occurrences = list(DT_TOKEN_WITH_TIME.finditer(ln))

    # Ship name is whatever precedes the first bare date token.
    first_date = DATE_TOKEN.search(ln)
    ship = ln[: first_date.start()].strip() if first_date else ln.strip()

    eta_raw = occurrences[0].group(0).strip() if occurrences else None
    ets_raw = occurrences[1].group(0).strip() if len(occurrences) > 1 else None

    # Notes: everything after the last captured date token.
    notes = ln[occurrences[-1].end():].strip() if occurrences else ""

    return {
        "ship": ship,
        "eta_raw": eta_raw,
        "ets_raw": ets_raw,
        "notes": notes,
        "raw_line": ln,
    }
|
||||
|
||||
|
||||
|
||||
|
||||
def parse_pdf_to_records(pdf_path: str) -> List[Dict[str, Any]]:
    """High-level: extract lines, cleanse headers, split into 1–2 tables,
    tag as harbor A/B by order, parse rows → records."""

    lines = extract_text_lines(pdf_path)
    clean = cleanse_lines(lines)
    blocks = split_into_tables(clean)

    records: List[Dict[str, Any]] = []
    for i, block in enumerate(blocks):
        # First table is Harbor A; any later table counts as Harbor B.
        harbor = "A" if i == 0 else "B"
        for ln in block:
            if not ln.strip():
                # Defensive: blocks should not contain blank lines.
                continue
            rec = parse_line_to_record(ln)
            rec["harbor"] = harbor
            records.append(rec)
    return records
|
||||
172
tools/pdf_import/pdf_to_records.py
Normal file
172
tools/pdf_import/pdf_to_records.py
Normal file
@ -0,0 +1,172 @@
|
||||
# pdf_to_records.py
|
||||
# CLI: parse a PDF and write JSONL (default) or CSV with one record per row.
|
||||
|
||||
|
||||
from __future__ import annotations
|
||||
import argparse, json, csv, re
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
|
||||
|
||||
# -----------------------------
# PDF text extraction helpers
# -----------------------------
# Lines that belong to the table header rather than the data rows
# (German column captions: "Schiff", "ETA", "ETS", "Liegeplatz", ...).
HEADER_PATTERNS = [
    re.compile(r"\bSchiff\b.*\bETA\b.*\bETS\b", re.IGNORECASE),
    re.compile(r"Nächster Hafen|Liegeplatz|Ladung|Lotse", re.IGNORECASE),
]
# German-style date "d.m." with an optional 4-digit year, e.g. "1.2." or "01.02.2024".
DATE_TOKEN = re.compile(r"\b\d{1,2}\.\d{1,2}\.(?:\d{4})?")
# Optional time suffix such as " / 10.30 Uhr" (optionally starred) after a date.
TIME_FRAGMENT = r"(?:\s*/\s*\d{1,2}\.\d{2}\s*Uhr\s*\*?)?"
# Date token plus its optional time fragment, as found in table rows.
DT_TOKEN_WITH_TIME = re.compile(r"\d{1,2}\.\d{1,2}\.(?:\d{4})?" + TIME_FRAGMENT)
|
||||
|
||||
|
||||
def extract_text_lines(pdf_path: str) -> List[str]:
    """Extract raw text lines from the PDF. Prefers pdfplumber with PyPDF2 fallback.

    Raises RuntimeError when neither backend yields any text (e.g. a
    scanned PDF with no text layer).
    """
    text = ""
    try:
        import pdfplumber

        with pdfplumber.open(pdf_path) as pdf:
            # extract_text() may return None for empty/graphic pages.
            pages_text = [(p.extract_text() or "") for p in pdf.pages]
            text = "\n".join(pages_text)
    except Exception:
        # pdfplumber missing or failed — try PyPDF2 before giving up.
        try:
            from PyPDF2 import PdfReader

            reader = PdfReader(pdf_path)
            pages_text = [(page.extract_text() or "") for page in reader.pages]
            text = "\n".join(pages_text)
        except Exception:
            text = ""

    if not text.strip():
        raise RuntimeError(
            "No text extracted. If the PDF is scanned, consider adding OCR fallback."
        )

    return [ln.strip() for ln in text.splitlines()]
|
||||
|
||||
|
||||
def cleanse_lines(lines: List[str]) -> List[str]:
    """Remove headers, keep data lines and blanks for table boundaries."""
    kept: List[str] = []
    for line in lines:
        # Blank lines are significant: they separate the two harbor tables.
        if not line:
            kept.append("")
            continue
        is_header = any(rx.search(line) for rx in HEADER_PATTERNS)
        if not is_header:
            kept.append(line)
    return kept
|
||||
|
||||
|
||||
def split_into_tables(lines: List[str]) -> List[List[str]]:
    """Split lines into up to two tables, separated by blank lines.

    Only lines containing a date token (plus blanks as separators) are
    kept; any blocks beyond the second are merged into the second one.
    """
    data_or_blank = [ln for ln in lines if ln == "" or DATE_TOKEN.search(ln)]

    tables: List[List[str]] = []
    bucket: List[str] = []
    for entry in data_or_blank:
        if entry == "":
            if bucket:
                tables.append(bucket)
                bucket = []
            continue
        bucket.append(entry)
    if bucket:
        tables.append(bucket)

    if len(tables) > 2:
        head, *rest = tables
        tail: List[str] = []
        for part in rest:
            tail += part
        tables = [head, tail]
    return tables
|
||||
|
||||
|
||||
def parse_line_to_record(ln: str) -> Dict[str, Any]:
    """Parse a table line into structured fields.

    Fields: ship (text before first date), eta_raw/ets_raw (first/second
    date+optional-time token, None if absent), notes (text after the last
    date token), raw_line (the unmodified input).
    """
    hits = list(DT_TOKEN_WITH_TIME.finditer(ln))
    leading_date = DATE_TOKEN.search(ln)

    if leading_date is None:
        ship = ln.strip()
    else:
        ship = ln[: leading_date.start()].strip()

    eta_raw = hits[0].group(0).strip() if hits else None
    ets_raw = hits[1].group(0).strip() if len(hits) > 1 else None
    notes = ln[hits[-1].end():].strip() if hits else ""

    return {
        "ship": ship,
        "eta_raw": eta_raw,
        "ets_raw": ets_raw,
        "notes": notes,
        "raw_line": ln,
    }
|
||||
|
||||
|
||||
def parse_pdf_to_records(pdf_path: str) -> List[Dict[str, Any]]:
    """High-level parser: extract text, sanitize, split per harbor, parse rows."""
    lines = extract_text_lines(pdf_path)
    clean = cleanse_lines(lines)
    blocks = split_into_tables(clean)

    records: List[Dict[str, Any]] = []
    for i, block in enumerate(blocks):
        # First table is Harbor A; any later table counts as Harbor B.
        harbor = "A" if i == 0 else "B"
        for ln in block:
            if not ln.strip():
                # Defensive: blocks should not contain blank lines.
                continue
            rec = parse_line_to_record(ln)
            rec["harbor"] = harbor
            records.append(rec)

    return records
|
||||
|
||||
|
||||
def write_jsonl(path: Path, rows: List[Dict[str, Any]]):
    """Write one JSON object per line (UTF-8, non-ASCII preserved)."""
    serialized = [json.dumps(row, ensure_ascii=False) for row in rows]
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(line + "\n" for line in serialized)
|
||||
|
||||
|
||||
def write_csv(path: Path, rows: List[Dict[str, Any]]):
    """Write rows as CSV with a fixed column set; empty input yields an empty file."""
    if not rows:
        path.write_text("", encoding="utf-8")
        return
    columns = ["harbor", "ship", "eta_raw", "ets_raw", "notes"]
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows({col: row.get(col) for col in columns} for row in rows)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse a partner PDF and write records as JSONL or CSV.

    Usage: pdf_to_records.py <pdf> [--out PATH] [--format jsonl|csv]
    """
    ap = argparse.ArgumentParser(description="Parse ship tables PDF → records (A/B)")
    ap.add_argument("pdf", help="Path to partner PDF")
    ap.add_argument("--out", help="Output file path (default: <pdf>.<format>)")
    ap.add_argument("--format", choices=["jsonl", "csv"], default="jsonl")
    args = ap.parse_args()

    rows = parse_pdf_to_records(args.pdf)

    # Fix: derive the default suffix from --format. Previously the default
    # was always ".jsonl", so "--format csv" without --out produced a CSV
    # file misleadingly named *.jsonl.
    out = Path(args.out) if args.out else Path(args.pdf).with_suffix(f".{args.format}")
    if args.format == "jsonl":
        write_jsonl(out, rows)
    else:
        write_csv(out, rows)

    print(f"Wrote {len(rows)} records -> {out}")


if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in New Issue
Block a user