Developing a Data Transformer from Scratch
A data transformer is the bridge between the raw data a collector produces and the standardized entities the Open Data Hub serves. It consumes raw data events from the message queue, transforms them, and pushes the result to the Open Data Hub. Where it pushes depends on the kind of data:
- Time series measurements go to the Timeseries Writer (BDP). See Pushing to the Timeseries Writer.
- Structured content (announcements, points of interest, events, and similar) goes to the Content API. See Pushing to the Content API.
This page covers the parts every transformer shares: the role, the core principles, the SDK listener, configuration, containerization, and the local development workflow. The two pages above cover the write side specific to each target.
1. Understanding the Role of a Data Transformer
A data transformer acts as a processing unit within the data integration pipeline. Its primary responsibilities include:
- Raw Data Consumption: Listening to a message queue for events indicating new raw data.
- Raw Data Retrieval: Fetching the actual raw data from the raw data storage (the SDK handles this when only an event notification was received).
- Data Transformation: Converting the raw, source-specific data into the standardized data model of the Open Data Hub. This often involves:
  - Parsing: Deserializing the raw data (e.g., JSON, XML).
  - Enrichment: Adding derived information or linking to external datasets (e.g., geocoding, unit conversions).
  - Mapping: Translating source-specific identifiers or schemas to Open Data Hub standards.
- Publication: Pushing the transformed data to its target (the Timeseries Writer for measurements, or the Content API for content).
The transformer is where the "intelligence" of data standardization resides, making raw data consumable for various applications.
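The parse, enrich, and map steps can be sketched in plain Go. This is a minimal illustration, not Open Data Hub code: the raw schema (`rawReading`), the target struct (`Measurement`), the `example:` ID prefix, and the Fahrenheit-to-Celsius conversion are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawReading is a hypothetical source-specific schema.
type rawReading struct {
	SensorID string  `json:"sensor_id"`
	TempF    float64 `json:"temp_f"`
}

// Measurement is a hypothetical standardized record.
type Measurement struct {
	StationCode string
	DataType    string
	Value       float64
}

// transform parses the raw JSON, converts the unit, and maps the fields.
func transform(raw []byte) (Measurement, error) {
	var r rawReading
	if err := json.Unmarshal(raw, &r); err != nil { // parsing
		return Measurement{}, fmt.Errorf("parse raw reading: %w", err)
	}
	celsius := (r.TempF - 32) * 5 / 9 // enrichment: unit conversion
	return Measurement{ // mapping: source fields to the target schema
		StationCode: "example:" + r.SensorID,
		DataType:    "air-temperature",
		Value:       celsius,
	}, nil
}

func main() {
	m, err := transform([]byte(`{"sensor_id":"s1","temp_f":212}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m)
}
```

Publication is left out here because it depends on the write target.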
2. Core Principles of Data Transformer Development
Many principles from data collector development apply, with some specific nuances for transformers:
2.1. Idempotency is Key
Transformers must be idempotent. Processing the same raw data event multiple times should produce the same result without creating duplicates or conflicting states. Your transformation logic should avoid side effects from re-processing.
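One common way to get this property is to derive record IDs deterministically from the source data and to write with upsert semantics. The sketch below assumes that approach; `stableID` and the in-memory `store` are hypothetical stand-ins for your ID scheme and write target.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// stableID derives the same ID for the same provider and source key on every run.
func stableID(provider, sourceKey string) string {
	sum := sha256.Sum256([]byte(provider + "\x00" + sourceKey))
	return hex.EncodeToString(sum[:8])
}

// store is a stand-in for the write target; writes are upserts keyed by ID.
type store map[string]string

// upsert overwrites any existing record with the same derived ID.
func (s store) upsert(provider, sourceKey, value string) {
	s[stableID(provider, sourceKey)] = value
}

func main() {
	s := store{}
	// Processing the same event twice leaves exactly one record.
	s.upsert("example/provider", "sensor-1", "21.5")
	s.upsert("example/provider", "sensor-1", "21.5")
	fmt.Println(len(s))
}
```

Random IDs or timestamps taken at processing time would break this: every re-delivery would then create a new record.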
2.2. Robust Data Validation and Error Handling
Data from collectors can be malformed or incomplete. Your transformer needs to:
- Validate Input: Check if the raw data conforms to expected schemas.
- Selective Processing: If a single record within a batch fails, decide whether to skip only that record or the entire batch.
- Dead-Letter Queues (DLQ): Failed messages should typically be moved to a DLQ for later inspection, preventing them from blocking the main queue. The SDK's consumer often handles this.
- Fail Early: Let the application crash when it hits an unrecoverable error; the test and production environments handle restarts.
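The validation and selective-processing points above can be sketched as follows. The `record` type and its rules are hypothetical; the pattern is to validate each record, keep the valid ones, and collect the errors for the rest.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a hypothetical raw record.
type record struct {
	ID    string
	Value *float64 // nil when the source omitted the value
}

// validate reports why a record cannot be transformed.
func validate(r record) error {
	if r.ID == "" {
		return errors.New("missing id")
	}
	if r.Value == nil {
		return fmt.Errorf("record %s: missing value", r.ID)
	}
	return nil
}

// filterValid keeps valid records and returns the joined errors for the rest.
// The returned error is nil when every record is valid.
func filterValid(batch []record) ([]record, error) {
	var valid []record
	var errs []error
	for _, r := range batch {
		if err := validate(r); err != nil {
			errs = append(errs, err)
			continue
		}
		valid = append(valid, r)
	}
	return valid, errors.Join(errs...)
}

func main() {
	v := 1.5
	valid, err := filterValid([]record{{ID: "a", Value: &v}, {ID: "b"}, {}})
	fmt.Println(len(valid), err)
}
```

Whether the caller logs the joined error and continues with the valid records, or returns it to fail the whole message, is the per-batch decision described above.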
2.3. Data Model Consistency
Strictly follow the Open Data Hub data model for your target: stations, data types, and measurements for time series; entities and tags for content. Assign stable, unique IDs and populate metadata richly (for example names in multiple languages) to improve discoverability.
2.4. Performance and Batching
Transformations can be CPU or I/O intensive. Consider batch processing (both writer APIs support pushing data in batches), concurrency, and efficient in-memory lookups for static data instead of repeated file reads.
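Two of these techniques are easy to sketch with the standard library (Go 1.21 or newer): splitting records into fixed-size batches before pushing, and loading static lookup data once instead of on every message. The helper names and the lookup content below are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// chunk splits items into batches of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := min(start+size, len(items))
		batches = append(batches, items[start:end])
	}
	return batches
}

// loadLookup builds the lookup table on first use and caches it afterwards.
// A real transformer would parse a file under resources/ here.
var loadLookup = sync.OnceValue(func() map[string]string {
	return map[string]string{"s1": "Station One"}
})

func main() {
	for _, b := range chunk([]int{1, 2, 3, 4, 5}, 2) {
		fmt.Println(b) // each batch would be one push to the writer
	}
	fmt.Println(loadLookup()["s1"])
}
```

A suitable batch size depends on the target API and the payload size, so treat it as a tunable setting rather than a constant.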
2.5. Observability
As with collectors, comprehensive logging, tracing, and metrics are crucial for monitoring the transformation process, identifying data quality issues, and debugging performance problems.
3. The transformer skeleton
3.1. Project structure
A typical transformer project looks like this (static data files are added only when the transformation logic needs them):
.
├── docker-compose.yml          # Local development setup
├── infrastructure
│   ├── docker
│   │   └── Dockerfile          # Containerization instructions
│   └── helm                    # Helm charts for Kubernetes deployment
├── resources                   # Static data files required by the transformer (optional)
├── src                         # Go source code
│   ├── dto.go                  # Data Transfer Objects (raw data schema)
│   ├── go.mod
│   ├── main.go                 # Main application logic
│   └── main_test.go            # Unit tests
└── testdata                    # Test input and expected output files
    ├── input
    └── output
3.2. Configuration via environment variables
By design, all configuration is done via environment variables. This simplifies the development cycle and deployment, letting you use a .env file for local testing and Helm values.yaml for deployment.
3.3. Consuming raw data with the SDK
Every transformer uses the opendatahub-go-sdk to initialize common services and to consume raw data from the message queue. The skeleton is the same regardless of the write target:
// Assumed declaration: the snippet passes &env and env.Env, so env is taken to
// embed the SDK's tr.Env. Add your target-specific settings next to it.
var env struct {
	tr.Env
}

func main() {
	ms.InitWithEnv(context.Background(), "", &env) // logging, messaging, telemetry
	defer tel.FlushOnPanic()

	// ... set up your write client (BDP or Content API) ...

	listener := tr.NewTr[YourDTO](context.Background(), env.Env)
	err := listener.Start(context.Background(), Transform)
	ms.FailOnError(context.Background(), err, "error while listening to queue")
}

// Transform is the handler called for each raw data message.
func Transform(ctx context.Context, payload *rdb.Raw[YourDTO]) error {
	// map payload.Rawdata, then push to your target
	return nil
}
- ms.InitWithEnv sets up logging, the messaging consumer, and telemetry based on tr.Env.
- tr.NewTr[YourDTO] creates the listener; the type parameter is the Go struct the raw data is unmarshaled into (received as rdb.Raw[YourDTO]).
- listener.Start(ctx, handler) consumes messages from the configured queue and calls your handler for each one.
The type passed to tr.NewTr[T] is used to deserialize the raw data directly. If the collector stored the raw payload as a serialized JSON string (very common), construct the listener with string and wrap your handler with RawString2JsonMiddleware, which deserializes the string for you:
listener := tr.NewTr[string](context.Background(), env.Env)
err := listener.Start(context.Background(), tr.RawString2JsonMiddleware[YourDTO](Transform))
SDK environment variables
When using the official SDK, these environment variables configure the transformer:
| Variable | Description | Allowed Values | Default |
|---|---|---|---|
| PROVIDER | String identifying the data provider | [path1]/[path2]/... | (No default set) |
| MQ_URI | Connection URI to connect to RabbitMQ | - | (No default set) |
| MQ_CLIENT | Client name to identify the connection in RabbitMQ | - | (No default set) |
| MQ_EXCHANGE | The exchange notifying the new data | - | routed |
| MQ_QUEUE | RabbitMQ queue this transformer pulls messages from | - | (No default set) |
| MQ_KEY | Routing key used to route messages from MQ_EXCHANGE to MQ_QUEUE | path1.path2... | (No default set) |
| RAW_DATA_BRIDGE_ENDPOINT | Endpoint of the Raw Data Bridge, which retrieves raw data | - | (No default set) |
| LOG_LEVEL | Sets the logging severity level | DEBUG, INFO, WARN, ERROR | INFO |
| SERVICE_NAME | The name of the service | - | gotel |
| SERVICE_VERSION | The version of the service | - | 0.0.1 |
| TELEMETRY_ENABLED | Enables or disables telemetry | true, false | true |
| TELEMETRY_TRACE_ENABLED | Enables or disables trace telemetry | true, false | true |
| TELEMETRY_TRACE_GRPC_ENDPOINT | The gRPC endpoint for trace export | - | localhost:4317 |
| TELEMETRY_METRICS_ENABLED | Enables or disables metrics telemetry | true, false | false |
| TELEMETRY_METRICS_GRPC_ENDPOINT | The gRPC endpoint for metrics export | - | localhost:4317 |
The target-specific variables (the BDP_* settings for the Timeseries Writer, or the ODH_CORE_* settings for the Content API) are documented on the two target pages.
4. Containerization with Docker
The Dockerfile uses a multi-stage build. Copy any static resource files into both the build and final stages.
- Dockerfile
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0
FROM golang:1.25-bookworm as base
FROM base as build-env
WORKDIR /app
# Copy source code
COPY src/. .
# Copy static resource files (if any)
COPY resources/. ./resources
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o main
# BUILD published image (minimal, production-ready)
FROM alpine:latest as build
WORKDIR /app
COPY --from=build-env /app/main .
COPY --from=build-env /app/resources /resources
ENTRYPOINT [ "./main"]
# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base as dev
WORKDIR /code
CMD ["go", "run", "main.go"]
Keep the go directive in your go.mod no higher than the Go version of this Dockerfile's base image (this guide uses golang:1.25), or the container build fails with go.mod requires go >= ... (running go ...).
5. Local Orchestration with Docker Compose
The transformer's docker-compose.yml needs a message queue (RabbitMQ) to consume from. The dev profile brings up its own RabbitMQ; the bdp profile runs against the shared infrastructure-v2 stack on the external ingestion network.
- docker-compose.yml
x-app-common:
  &app-common
  build:
    dockerfile: infrastructure/docker/Dockerfile
    context: .
    target: dev
  env_file:
    - .env
  volumes:
    - ./src:/code
    - ./infrastructure:/code/infrastructure
    - pkg:/go/pkg/mod
  working_dir: /code

services:
  app-bdp:
    <<: *app-common
    profiles:
      - bdp
    networks:
      - ingestion
  app:
    <<: *app-common
    depends_on:
      rabbitmq:
        condition: service_healthy
    profiles:
      - dev
  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false
    profiles:
      - dev

volumes:
  pkg:

networks:
  ingestion:
    external: true
When developing standalone, start the transformer with its own rabbitmq service:
docker compose --profile dev up
When testing the complete pipeline (together with the collector and the full Open Data Hub Core), start the transformer without its own rabbitmq service:
docker compose --profile bdp up
6. Local Development Workflow
- Clone repositories: opendatahub-collectors (your transformer's source) and infrastructure-v2 (the shared compose files).
- Navigate to your transformer's directory: cd opendatahub-collectors/transformers/foo-bar.
- Create .env: copy the provided .env content and set the MQ_* queue settings, RAW_DATA_BRIDGE_ENDPOINT, and the target-specific credentials (see the Timeseries or Content page).
- Start the infrastructure: from the infrastructure-v2 directory:
  docker compose -f docker-compose.yml up -d
  docker compose -f docker-compose.timeseries.yml up -d
- Start the transformer: from your transformer's directory:
  docker compose --profile bdp up --build
Testing the data flow
- Transformer logs: observe the logs of your app (or app-bdp) container.
- RabbitMQ Management: http://localhost:15673 (guest/guest). The infrastructure-v2 stack maps the management UI to host port 15673; a transformer or collector running with --profile dev would take 15672.
- MongoDB: mongodb://localhost:27017/?directConnection=true. Inspect the raw data stored by the SDK before it is picked up.
7. Choosing your write target
The skeleton above is shared. The write side depends on the data:
- Pushing to the Timeseries Writer (BDP) for time series measurements from stations and sensors, using go-bdp-client.
- Pushing to the Content API for structured content (announcements, points of interest, events), using the SDK's clib package.