
Developing a Data Transformer from Scratch

A data transformer is the bridge between the raw data a collector produces and the standardized entities the Open Data Hub serves. It consumes raw data events from the message queue, transforms them, and pushes the result to the Open Data Hub. Where it pushes depends on the kind of data:

  • Time series (measurements): the Timeseries Writer.
  • Content: the Content API.

This page covers the parts every transformer shares: the role, the core principles, the SDK listener, configuration, containerization, and the local development workflow. The two target pages cover the write side specific to each target.

1. Understanding the Role of a Data Transformer

A data transformer acts as a processing unit within the data integration pipeline. Its primary responsibilities include:

  • Raw Data Consumption: Listening to a message queue for events indicating new raw data.
  • Raw Data Retrieval: Fetching the actual raw data from the raw data storage (the SDK handles this when only an event notification was received).
  • Data Transformation: Converting the raw, source-specific data into the standardized data model of the Open Data Hub. This often involves:
    • Parsing: Deserializing the raw data (e.g., JSON, XML).
    • Enrichment: Adding derived information or linking to external datasets (e.g., geocoding, unit conversions).
    • Mapping: Translating source-specific identifiers or schemas to Open Data Hub standards.
  • Publication: Pushing the transformed data to its target (the Timeseries Writer for measurements, or the Content API for content).

The transformer is where the "intelligence" of data standardization resides, making raw data consumable for various applications.

[Figure: data transformer flow]

2. Core Principles of Data Transformer Development

Many principles from data collector development apply, with some specific nuances for transformers:

2.1. Idempotency is Key

Transformers must be idempotent. Processing the same raw data event multiple times should produce the same result without creating duplicates or conflicting states. Your transformation logic should avoid side effects from re-processing.
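One way to get idempotency is to write every record under a natural key (station, data type, timestamp) with upsert semantics instead of appending. The sketch below uses a hypothetical in-memory store and does not describe how the Timeseries Writer or the Content API deduplicate internally. A redelivered event overwrites the same entry rather than creating a second one.

```go
package main

// key identifies one measurement; re-processing the same event yields the same key.
// Timestamps are stored as Unix milliseconds so the key compares reliably.
type key struct {
	StationID   string
	DataType    string
	TimestampMs int64
}

// store is a hypothetical in-memory target that upserts instead of appending.
type store struct {
	values map[key]float64
}

func newStore() *store { return &store{values: make(map[key]float64)} }

// upsert writes the value under its natural key, so applying the same
// record twice leaves exactly one entry with the same value.
func (s *store) upsert(k key, v float64) {
	s.values[k] = v
}
```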

2.2. Robust Data Validation and Error Handling

Data from collectors can be malformed or incomplete. Your transformer needs to:

  • Validate Input: Check if the raw data conforms to expected schemas.
  • Selective Processing: If a single record within a batch fails, decide whether to skip only that record or the entire batch.
  • Dead-Letter Queues (DLQ): Failed messages should typically be moved to a DLQ for later inspection, preventing them from blocking the main queue. The SDK's consumer often handles this.
  • Fail Early: Let the application crash when something goes wrong; the test and production environments handle restarts.
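Selective processing can be sketched as splitting a batch into valid records and collected errors. The record type and its validation rules below are hypothetical; a transformer that prefers to reject the whole batch could instead return errors.Join(errs...) as soon as errs is non-empty and let the message go to the DLQ.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a hypothetical parsed raw record.
type record struct {
	ID    string
	Value *float64 // nil when the source omitted the value
}

// validate checks the hypothetical schema rules for one record.
func validate(r record) error {
	if r.ID == "" {
		return errors.New("missing id")
	}
	if r.Value == nil {
		return fmt.Errorf("record %s: missing value", r.ID)
	}
	return nil
}

// splitValid keeps records that pass validation and collects the errors for
// the rest, so a single bad record does not discard the whole batch.
func splitValid(batch []record) (valid []record, errs []error) {
	for _, r := range batch {
		if err := validate(r); err != nil {
			errs = append(errs, err)
			continue
		}
		valid = append(valid, r)
	}
	return valid, errs
}
```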

2.3. Data Model Consistency

Strictly follow the Open Data Hub data model for your target: stations, data types, and measurements for time series; entities and tags for content. Assign stable, unique IDs and populate metadata richly (for example names in multiple languages) to improve discoverability.
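A stable ID must come out the same on every run for the same source object. A minimal sketch, assuming a hypothetical scheme that hashes the provider and the source's own identifier (prefixing the source ID directly is an equally valid choice), together with a simplified entity carrying names in several languages:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// stableID derives a deterministic ID from the provider and the source's own
// identifier, so re-running the transformer never mints a new ID.
// The hashing scheme is a hypothetical choice for this sketch.
func stableID(provider, sourceID string) string {
	sum := sha256.Sum256([]byte(provider + "\x00" + sourceID))
	return hex.EncodeToString(sum[:8])
}

// entity is a simplified, hypothetical shape with names in several languages.
type entity struct {
	ID   string
	Name map[string]string // language code -> name
}
```

The separator byte keeps pairs such as ("ab", "c") and ("a", "bc") from hashing to the same input.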

2.4. Performance and Batching

Transformations can be CPU- or I/O-intensive. Consider batch processing (both writer APIs support pushing data in batches), concurrency, and efficient in-memory lookups for static data instead of repeated file reads.
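Both ideas fit in a short sketch: a generic helper that splits records into fixed-size batches before pushing, and a static lookup table that is loaded once and then served from memory. loadStationNames is a hypothetical stand-in for reading a file from the resources directory, and suitable batch sizes depend on the target API.

```go
package main

import "sync"

// chunk splits items into batches of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		panic("batch size must be positive")
	}
	var batches [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

var (
	stationNamesOnce sync.Once
	stationNames     map[string]string
)

// lookupStationName loads the static table on first use and then answers
// every later lookup from memory.
func lookupStationName(code string) (string, bool) {
	stationNamesOnce.Do(func() {
		stationNames = loadStationNames()
	})
	name, ok := stationNames[code]
	return name, ok
}

// loadStationNames is hypothetical; a real transformer would parse a resource file.
func loadStationNames() map[string]string {
	return map[string]string{"S1": "Example Station"}
}
```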

2.5. Observability

As with collectors, comprehensive logging, tracing, and metrics are crucial for monitoring the transformation process, identifying data quality issues, and debugging performance problems.

3. The transformer skeleton

3.1. Project structure

A typical transformer project looks like this (static data files are added only when the transformation logic needs them):

```
.
├── docker-compose.yml        # Local development setup
├── infrastructure
│   ├── docker
│   │   └── Dockerfile        # Containerization instructions
│   └── helm                  # Helm charts for Kubernetes deployment
├── resources                 # Static data files required by the transformer (optional)
├── src                       # Go source code
│   ├── dto.go                # Data Transfer Objects (raw data schema)
│   ├── go.mod
│   ├── main.go               # Main application logic
│   └── main_test.go          # Unit tests
└── testdata                  # Test input and expected output files
    ├── input
    └── output
```
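The testdata/input and testdata/output pair suits golden-file tests. This sketch shows one helper such a test in main_test.go could use: it compares two JSON documents semantically, ignoring whitespace and key order. The helper name is hypothetical and not part of the SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// jsonEqual reports whether two JSON documents are semantically equal,
// ignoring whitespace and object key order.
func jsonEqual(got, want []byte) (bool, error) {
	var g, w any
	if err := json.Unmarshal(got, &g); err != nil {
		return false, fmt.Errorf("decode actual output: %w", err)
	}
	if err := json.Unmarshal(want, &w); err != nil {
		return false, fmt.Errorf("decode expected output: %w", err)
	}
	return reflect.DeepEqual(g, w), nil
}
```

In a test, read a file from testdata/input with os.ReadFile, run your transformation, marshal the result, and compare it against the matching file in testdata/output, calling t.Fatalf on a mismatch.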

3.2. Configuration via environment variables

By design, all configuration is done via environment variables. This simplifies the development cycle and deployment, letting you use a .env file for local testing and Helm values.yaml for deployment.

3.3. Consuming raw data with the SDK

Every transformer uses the opendatahub-go-sdk to initialize common services and to consume raw data from the message queue. The skeleton is the same regardless of the write target:

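```go
package main

import (
	"context"
	"github.com/noi-techpark/opendatahub-go-sdk/ingest/ms"
	"github.com/noi-techpark/opendatahub-go-sdk/ingest/rdb"
	"github.com/noi-techpark/opendatahub-go-sdk/ingest/tr"
	"github.com/noi-techpark/opendatahub-go-sdk/tel"
)

// env holds the SDK settings (tr.Env) plus any transformer-specific fields.
var env struct {
	tr.Env
}

func main() {
	ms.InitWithEnv(context.Background(), "", &env) // logging, messaging, telemetry
	defer tel.FlushOnPanic()

	// ... set up your write client (BDP or Content API) ...

	listener := tr.NewTr[YourDTO](context.Background(), env.Env) // YourDTO lives in dto.go
	err := listener.Start(context.Background(), Transform)
	ms.FailOnError(context.Background(), err, "error while listening to queue")
}

func Transform(ctx context.Context, payload *rdb.Raw[YourDTO]) error {
	// map payload.Rawdata, then push to your target
	return nil
}
```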
  • ms.InitWithEnv sets up logging, the messaging consumer, and telemetry based on tr.Env.
  • tr.NewTr[YourDTO] creates the listener; the type parameter is the Go struct the raw data is unmarshaled into (received as rdb.Raw[YourDTO]).
  • listener.Start(ctx, handler) consumes messages from the configured queue and calls your handler for each one.
Warning: The type passed to tr.NewTr[T] is used to deserialize the raw data directly. If the collector stored the raw payload as a serialized JSON string (very common), construct the listener with string as the type parameter and wrap your handler with RawString2JsonMiddleware, which deserializes the string into YourDTO for you:

```go
listener := tr.NewTr[string](context.Background(), env.Env)
err := listener.Start(context.Background(), tr.RawString2JsonMiddleware[YourDTO](Transform))
```

SDK environment variables

When using the official SDK, these environment variables configure the transformer:

| Variable | Description | Allowed Values | Default |
|---|---|---|---|
| PROVIDER | String identifying the data provider | [path1]/[path2]/... | (No default set) |
| MQ_URI | Connection URI to connect to RabbitMQ | - | (No default set) |
| MQ_CLIENT | Client name to identify the connection in RabbitMQ | - | (No default set) |
| MQ_EXCHANGE | The exchange notifying the new data | - | routed |
| MQ_QUEUE | RabbitMQ queue this transformer pulls messages from | - | (No default set) |
| MQ_KEY | Routing key used to route messages from MQ_EXCHANGE to MQ_QUEUE | path1.path2... | (No default set) |
| RAW_DATA_BRIDGE_ENDPOINT | Endpoint of the Raw Data Bridge, which retrieves raw data | - | (No default set) |
| LOG_LEVEL | Sets the logging severity level | DEBUG, INFO, WARN, ERROR | INFO |
| SERVICE_NAME | The name of the service | - | gotel |
| SERVICE_VERSION | The version of the service | - | 0.0.1 |
| TELEMETRY_ENABLED | Enables or disables telemetry | true, false | true |
| TELEMETRY_TRACE_ENABLED | Enables or disables trace telemetry | true, false | true |
| TELEMETRY_TRACE_GRPC_ENDPOINT | The gRPC endpoint for trace export | - | localhost:4317 |
| TELEMETRY_METRICS_ENABLED | Enables or disables metrics telemetry | true, false | false |
| TELEMETRY_METRICS_GRPC_ENDPOINT | The gRPC endpoint for metrics export | - | localhost:4317 |

The target-specific variables (the BDP_* settings for the Timeseries Writer, or the ODH_CORE_* settings for the Content API) are documented on the two target pages.

4. Containerization with Docker

The Dockerfile uses a multi-stage build. Copy any static resource files into both the build and final stages.

```dockerfile
# SPDX-FileCopyrightText: 2024 NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: CC0-1.0

FROM golang:1.25-bookworm AS base

FROM base AS build-env
WORKDIR /app
# Copy source code
COPY src/. .
# Copy static resource files (if any; remove this line and the matching
# COPY in the final stage when the transformer has no resources directory)
COPY resources/. ./resources
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o main

# BUILD published image (minimal, production-ready)
FROM alpine:latest AS build
WORKDIR /app
COPY --from=build-env /app/main .
COPY --from=build-env /app/resources /resources
ENTRYPOINT [ "./main"]

# LOCAL DEVELOPMENT (for hot-reloading/easier debugging)
FROM base AS dev
WORKDIR /code
# Build the whole package so that dto.go is compiled together with main.go
CMD ["go", "run", "."]
```
Warning: Keep the go directive in your go.mod no higher than the Go version of this Dockerfile's base image (this guide uses golang:1.25); otherwise the container build fails with go.mod requires go >= ... (running go ...).

5. Local Orchestration with Docker Compose

The transformer's docker-compose.yml needs a message queue (RabbitMQ) to consume from. The dev profile brings up its own RabbitMQ; the bdp profile runs against the shared infrastructure-v2 stack on the external ingestion network.

```yaml
x-app-common: &app-common
  build:
    dockerfile: infrastructure/docker/Dockerfile
    context: .
    target: dev
  env_file:
    - .env
  volumes:
    - ./src:/code
    - ./infrastructure:/code/infrastructure
    - pkg:/go/pkg/mod
  working_dir: /code

services:
  app-bdp:
    <<: *app-common
    profiles:
      - bdp
    networks:
      - ingestion

  app:
    <<: *app-common
    depends_on:
      rabbitmq:
        condition: service_healthy
    profiles:
      - dev

  rabbitmq:
    extends:
      file: ../lib/docker-compose/docker-compose.rabbitmq.yml
      service: rabbitmq
    attach: false
    profiles:
      - dev

volumes:
  pkg:

networks:
  ingestion:
    external: true
```
Warning: When developing standalone, start the transformer with its own rabbitmq service:

```shell
docker compose --profile dev up
```
Warning: When testing the complete pipeline (together with the collector and the full Open Data Hub Core), start the transformer without its own rabbitmq service:

```shell
docker compose --profile bdp up
```

6. Local Development Workflow

  1. Clone repositories: opendatahub-collectors (your transformer's source) and infrastructure-v2 (the shared compose files).
  2. Navigate to your transformer's directory: cd opendatahub-collectors/transformers/foo-bar.
  3. Create .env: copy the provided .env content and set the MQ_* queue settings, RAW_DATA_BRIDGE_ENDPOINT, and the target-specific credentials (see the Timeseries or Content page).
  4. Start the infrastructure: from the infrastructure-v2 directory:
    docker compose -f docker-compose.yml up -d
    docker compose -f docker-compose.timeseries.yml up -d
  5. Start the transformer: from your transformer's directory:
    docker compose --profile bdp up --build

Testing the data flow

  • Transformer logs: observe the logs of your app (or app-bdp) container.
  • RabbitMQ Management: http://localhost:15673 (guest/guest). The infrastructure-v2 stack maps the management UI to host port 15673; a transformer or collector running with --profile dev would take 15672.
  • MongoDB: mongodb://localhost:27017/?directConnection=true. Inspect the raw data stored by the SDK before your transformer picks it up.

7. Choosing your write target

The skeleton above is shared. The write side depends on the data: