Geospatiale Datenpipelines für ganz Deutschland
Ein modulares Framework für Gebäude-Big-Data auf Spark und Container Apps
- azure
- data-engineering
- geospatial
- python
- spark
Bei credium verarbeiten wir geospatiale Gebäudedaten für ganz Deutschland — Millionen von Datensätzen pro Bundesland, aus amtlichen Katasterdaten (ALKIS), 3D-Stadtmodellen (CityGML) und eigenen ML-Modellen. Das Ergebnis: angereicherte Gebäudeattribute wie Typ, Wandflächen, Geschosszahlen und dutzende weitere Dimensionen. Hier beschreibe ich die Infrastruktur dahinter.
Das JobBuilder-Framework
Das Kernstück ist ein abstraktes Framework mit zwei konkreten Laufzeitumgebungen.
PythonJobBuilder läuft in Azure Container Apps — für Datensätze unter ~1 GB. Nutzt Pandas, psycopg2 und DuckDB. Leichtgewichtig, schnell gestartet, günstig.
SparkJobBuilder läuft auf Azure Synapse — für alles darüber. Nutzt PySpark mit Apache Sedona für räumliche SQL-Operationen (ST_Intersects, ST_Buffer, ST_Transform). Skaliert horizontal über mehrere Worker.
Beide Builder teilen dieselbe Basis:
- YAML-Config mit Auto-Detection — der Builder inspiziert den Call-Stack und findet automatisch die
config.ymlim Verzeichnis des aufrufenden Skripts. Jobs übergeben nur ihren Namen. - Connector-Registry — Plugin-System für Datenquellen.
job.register_postgres("pg", config, "postgres")machtjob.pgals Attribut verfügbar. Sieben Connector-Typen: PostgreSQL, Blob, HTTP, DuckDB, Serverless SQL Pool, Dedicated SQL Pool, Teams. - Einheitliche Data-Quality-Checks — identische Validierungslogik auf beiden Runtimes.
- Lifecycle-Management —
start()→ Verarbeitung →end()/fail()mit automatischem Log-Upload nach Azure Blob Storage.
# Container-Version: Pandas + DuckDB
job = PythonJobBuilder("family_home")
job.register_duckdb("duckdb", config, "duckdb")
df = job.duckdb.select("SELECT * FROM read_parquet('az://...')")
# Spark-Version: PySpark + Sedona
job = SparkJobBuilder("family_home")
df = job.spark.read.parquet("abfss://...@storage.dfs.core.windows.net/...")
df = df.filter(F.col("building_function").isin(residential_functions))
Derselbe konzeptionelle Job kann auf unterschiedlicher Infrastruktur laufen — je nach Datenmenge. Der Family-Home-Job existiert z.B. als Spark-Version (In-Cluster-Verarbeitung) und als Container-Version, die Azure Synapse CETAS nutzt und die Daten komplett serverseitig verarbeitet, ohne sie in den Container zu laden.
Connector-System
Jeder Connector implementiert ein einheitliches Interface für select, insert, upsert und execute. Die interessantesten:
PostgreSQL — psycopg2 mit Azure AD Token-Auth, Key-Vault-Integration für Secrets, und COPY-basierte Bulk-Inserts für Performance.
DuckDB — Generiert SAS-Tokens für Azure Blob Storage und lässt DuckDB direkt auf Produktions-Parquets arbeiten. Damit können Entwickler Spark-Jobs lokal gegen echte Daten testen, ohne Cluster:
job.register_duckdb("duckdb", config, "duckdb")
df = job.duckdb.select("""
SELECT gmlid, ST_AsText(geom) as geom_wkt, height
FROM read_parquet('az://container/buildings/*.parquet')
WHERE height > 10
""")
Dedicated SQL Pool — Server-seitige Verarbeitung über CETAS (Create External Table As Select) für 100GB+ Datensätze, die nicht in den Arbeitsspeicher passen.
Data Quality als First-Class Citizen
Kein Job schreibt Output ohne Qualitätsprüfung. Der BaseValidator bietet:
- Coverage — Anteil nicht-null pro Spalte
- Uniqueness — Primärschlüssel-Eindeutigkeit
- Range — Wertgrenzen (Höhe 0-1000m, Azimut 0-360°, Confidence 0-1)
- Pattern — Regex-Validierung für Identifier (z.B. deutsche GMLIDs)
- Count Stability — Abweichung zum Vorgängerlauf (Schwelle: 5%)
- Distribution — Erwartete Verteilung pro Kategorie über ganz Deutschland
- Column-by-Column Diff — Detaillierter Vergleich mit Backup-Daten inkl. Change- und Null-Schwellwerten
Schlägt ein Check fehl, stoppt die Pipeline. Das Team bekommt eine Teams-Nachricht als Adaptive Card mit Zusammenfassung aller Checks.
Notifications
Das BaseNotifications-Modul sendet Microsoft Teams Webhooks mit Rich Adaptive Cards — inklusive Data-Quality-Summaries, Pipeline-Status und Fehlerdetails. Nicht nur „Job failed”, sondern genau welcher Check warum fehlgeschlagen ist.
Beispiel: Räumliche Analysen mit Sedona
Die Spark-Jobs nutzen Apache Sedona für geospatiale Operationen im großen Maßstab. Ein typisches Muster: räumliche Joins über Millionen von Geometrien, z.B. Berührungsanalysen zwischen Gebäuden (ST_Intersects(ST_Buffer(...))), Azimut-Berechnungen für Wandorientierungen (ST_Azimuth, ST_DumpPoints), oder CRS-Transformationen (ST_Transform(..., 'EPSG:25832')).
Besonders komplex: die Wandflächenberechnung, die 3D-Gebäudeflächen aus CityGML in 2D projiziert, Schnittflächen zwischen angrenzenden Wänden berechnet, und daraus Anbautypen ableitet (freistehend, einseitig, zweiseitig, …).
Self-Monitoring
Eine Azure Function pollt alle 60 Sekunden Synapse nach Pipeline- und Spark-Runs, reichert Spark-Runs mit Kostenabschätzungen basierend auf Pool-Pricing an, und upserted alles nach PostgreSQL. Bei Statuswechseln (running → succeeded/failed) gehen automatisch Teams-Notifications raus.
Infrastruktur
- Azure Synapse für Spark (3.4 und 3.5)
- Azure Container Apps für Python-Jobs
- Azure Functions für Event-getriebene Tasks
- Azure Blob Storage (Parquet) als zentraler Data Lake
- PostgreSQL + PostGIS für die Produktionsdatenbank
- Docker Compose für lokale Spark-Cluster (Master + 2 Worker)
- uv als Python-Paketmanager
- Azure DevOps Pipelines für CI/CD
- Log-Archivierung — Job-Logs werden gebuffert und bei Abschluss nach Blob Storage geschrieben, mit
RedactingFormatterder Secrets aus den Logs maskiert