Geospatiale Datenpipelines für ganz Deutschland

Ein modulares Framework für Gebäude-Big-Data auf Spark und Container Apps

  • azure
  • data-engineering
  • geospatial
  • python
  • spark

Bei credium verarbeiten wir geospatiale Gebäudedaten für ganz Deutschland — Millionen von Datensätzen pro Bundesland, aus amtlichen Katasterdaten (ALKIS), 3D-Stadtmodellen (CityGML) und eigenen ML-Modellen. Das Ergebnis: angereicherte Gebäudeattribute wie Typ, Wandflächen, Geschosszahlen und dutzende weitere Dimensionen. Hier beschreibe ich die Infrastruktur dahinter.

Das JobBuilder-Framework

Das Kernstück ist ein abstraktes Framework mit zwei konkreten Laufzeitumgebungen.

PythonJobBuilder läuft in Azure Container Apps — für Datensätze unter ~1 GB. Nutzt Pandas, psycopg2 und DuckDB. Leichtgewichtig, schnell gestartet, günstig.

SparkJobBuilder läuft auf Azure Synapse — für alles darüber. Nutzt PySpark mit Apache Sedona für räumliche SQL-Operationen (ST_Intersects, ST_Buffer, ST_Transform). Skaliert horizontal über mehrere Worker.

Beide Builder teilen dieselbe Basis:

  • YAML-Config mit Auto-Detection — der Builder inspiziert den Call-Stack und findet automatisch die config.yml im Verzeichnis des aufrufenden Skripts. Jobs übergeben nur ihren Namen.
  • Connector-Registry — Plugin-System für Datenquellen. job.register_postgres("pg", config, "postgres") macht job.pg als Attribut verfügbar. Sieben Connector-Typen: PostgreSQL, Blob, HTTP, DuckDB, Serverless SQL Pool, Dedicated SQL Pool, Teams.
  • Einheitliche Data-Quality-Checks — identische Validierungslogik auf beiden Runtimes.
  • Lifecycle-Managementstart() → Verarbeitung → end()/fail() mit automatischem Log-Upload nach Azure Blob Storage.
# Container-Version: Pandas + DuckDB
job = PythonJobBuilder("family_home")
job.register_duckdb("duckdb", config, "duckdb")
df = job.duckdb.select("SELECT * FROM read_parquet('az://...')")

# Spark-Version: PySpark + Sedona
job = SparkJobBuilder("family_home")
df = job.spark.read.parquet("abfss://...@storage.dfs.core.windows.net/...")
df = df.filter(F.col("building_function").isin(residential_functions))

Derselbe konzeptionelle Job kann auf unterschiedlicher Infrastruktur laufen — je nach Datenmenge. Der Family-Home-Job existiert z.B. als Spark-Version (In-Cluster-Verarbeitung) und als Container-Version, die Azure Synapse CETAS nutzt und die Daten komplett serverseitig verarbeitet, ohne sie in den Container zu laden.

Connector-System

Jeder Connector implementiert ein einheitliches Interface für select, insert, upsert und execute. Die interessantesten:

PostgreSQL — psycopg2 mit Azure AD Token-Auth, Key-Vault-Integration für Secrets, und COPY-basierte Bulk-Inserts für Performance.

DuckDB — Generiert SAS-Tokens für Azure Blob Storage und lässt DuckDB direkt auf Produktions-Parquets arbeiten. Damit können Entwickler Spark-Jobs lokal gegen echte Daten testen, ohne Cluster:

job.register_duckdb("duckdb", config, "duckdb")
df = job.duckdb.select("""
    SELECT gmlid, ST_AsText(geom) as geom_wkt, height
    FROM read_parquet('az://container/buildings/*.parquet')
    WHERE height > 10
""")

Dedicated SQL Pool — Server-seitige Verarbeitung über CETAS (Create External Table As Select) für 100GB+ Datensätze, die nicht in den Arbeitsspeicher passen.

Data Quality als First-Class Citizen

Kein Job schreibt Output ohne Qualitätsprüfung. Der BaseValidator bietet:

  • Coverage — Anteil nicht-null pro Spalte
  • Uniqueness — Primärschlüssel-Eindeutigkeit
  • Range — Wertgrenzen (Höhe 0-1000m, Azimut 0-360°, Confidence 0-1)
  • Pattern — Regex-Validierung für Identifier (z.B. deutsche GMLIDs)
  • Count Stability — Abweichung zum Vorgängerlauf (Schwelle: 5%)
  • Distribution — Erwartete Verteilung pro Kategorie über ganz Deutschland
  • Column-by-Column Diff — Detaillierter Vergleich mit Backup-Daten inkl. Change- und Null-Schwellwerten

Schlägt ein Check fehl, stoppt die Pipeline. Das Team bekommt eine Teams-Nachricht als Adaptive Card mit Zusammenfassung aller Checks.

Notifications

Das BaseNotifications-Modul sendet Microsoft Teams Webhooks mit Rich Adaptive Cards — inklusive Data-Quality-Summaries, Pipeline-Status und Fehlerdetails. Nicht nur „Job failed”, sondern genau welcher Check warum fehlgeschlagen ist.

Beispiel: Räumliche Analysen mit Sedona

Die Spark-Jobs nutzen Apache Sedona für geospatiale Operationen im großen Maßstab. Ein typisches Muster: räumliche Joins über Millionen von Geometrien, z.B. Berührungsanalysen zwischen Gebäuden (ST_Intersects(ST_Buffer(...))), Azimut-Berechnungen für Wandorientierungen (ST_Azimuth, ST_DumpPoints), oder CRS-Transformationen (ST_Transform(..., 'EPSG:25832')).

Besonders komplex: die Wandflächenberechnung, die 3D-Gebäudeflächen aus CityGML in 2D projiziert, Schnittflächen zwischen angrenzenden Wänden berechnet, und daraus Anbautypen ableitet (freistehend, einseitig, zweiseitig, …).

Self-Monitoring

Eine Azure Function pollt alle 60 Sekunden Synapse nach Pipeline- und Spark-Runs, reichert Spark-Runs mit Kostenabschätzungen basierend auf Pool-Pricing an, und upserted alles nach PostgreSQL. Bei Statuswechseln (running → succeeded/failed) gehen automatisch Teams-Notifications raus.

Infrastruktur

  • Azure Synapse für Spark (3.4 und 3.5)
  • Azure Container Apps für Python-Jobs
  • Azure Functions für Event-getriebene Tasks
  • Azure Blob Storage (Parquet) als zentraler Data Lake
  • PostgreSQL + PostGIS für die Produktionsdatenbank
  • Docker Compose für lokale Spark-Cluster (Master + 2 Worker)
  • uv als Python-Paketmanager
  • Azure DevOps Pipelines für CI/CD
  • Log-Archivierung — Job-Logs werden gebuffert und bei Abschluss nach Blob Storage geschrieben, mit RedactingFormatter der Secrets aus den Logs maskiert