
Pushing to the Content API

This page covers the write side of a transformer that produces structured content (announcements, points of interest, events, and similar), using the clib package of the opendatahub-go-sdk. For the shared transformer skeleton (the SDK listener, environment variables, containerization, and local run), see Developing a Data Transformer from Scratch.

This guide uses the traffic-event-a22-opendata transformer (which turns A22 motorway events into Content API Announcement entities) as the reference implementation.

1. The clib package

github.com/noi-techpark/opendatahub-go-sdk/clib is the SDK's Content API client. Its ContentAPI interface exposes four operations:

| Method | Purpose |
| --- | --- |
| Get(ctx, apiPath, queryParams, out) | Read entities (used by the cache loader). |
| Post(ctx, apiPath, queryParams, payload) | Create an entity (used by tag sync). |
| Put(ctx, apiPath, id, payload) | Update a single entity by its ID. |
| PutMultiple(ctx, apiPath, payload) | Upsert a list of entities in one request. |
Warning: PutMultiple (bulk upsert) is supported only by some entity endpoints. Where it is not available, write entities individually with Put (update by ID) or Post (create). This guide uses PutMultiple because the Announcement endpoint supports it.

Create the client with OAuth2 client credentials against the Core:

contentClient, err := clib.NewContentClient(clib.Config{
	BaseURL:      env.ODH_CORE_URL,       // e.g. https://api.tourism.testingmachine.eu/v1
	TokenURL:     env.ODH_CORE_TOKEN_URL, // Keycloak token endpoint
	ClientID:     env.ODH_CORE_TOKEN_CLIENT_ID,
	ClientSecret: env.ODH_CORE_TOKEN_CLIENT_SECRET,
	DisableOAuth: env.ODH_CORE_TOKEN_URL == "", // read-only access can skip auth
})
ms.FailOnError(context.Background(), err, "failed to create content client")

NewContentClient fetches an initial token up front (so bad credentials fail fast), retries failed requests, and emits OpenTelemetry spans. The returned ContentClient is safe for concurrent use.

Note: Read access to the Content API is open, so for a read-only run you can set DisableOAuth: true and omit the token settings. Creating or updating content requires client credentials that are authorized to write to the Content API; request them from the Open Data Hub team.

Environment variables

In addition to the standard transformer variables (MQ_*, LOG_LEVEL, and telemetry, documented in Developing a Data Transformer from Scratch), a Content API transformer reads:

| Variable | Description |
| --- | --- |
| ODH_CORE_URL | Content API base URL, including the /v1 path. |
| ODH_CORE_TOKEN_URL | OAuth2 token endpoint (Keycloak). Leave empty for a read-only run. |
| ODH_CORE_TOKEN_CLIENT_ID | Client ID authorized to write content. |
| ODH_CORE_TOKEN_CLIENT_SECRET | Client secret. |
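The following sketch shows how these variables relate to each other without depending on the SDK. The real transformer fills its env struct through ms.InitWithEnv. coreEnv, loadCoreEnv, readOnly, and validate are hypothetical helpers that read the same names through an os.Getenv-style lookup. readOnly mirrors the DisableOAuth expression used when the client is created.

```go
package main

import (
	"errors"
	"os"
)

// coreEnv holds the Content API variables from the table above (hypothetical type).
type coreEnv struct {
	URL          string
	TokenURL     string
	ClientID     string
	ClientSecret string
}

// loadCoreEnv reads the variables through the given lookup function.
func loadCoreEnv(getenv func(string) string) coreEnv {
	return coreEnv{
		URL:          getenv("ODH_CORE_URL"),
		TokenURL:     getenv("ODH_CORE_TOKEN_URL"),
		ClientID:     getenv("ODH_CORE_TOKEN_CLIENT_ID"),
		ClientSecret: getenv("ODH_CORE_TOKEN_CLIENT_SECRET"),
	}
}

// readOnly reports whether OAuth can be disabled: an empty token URL means a
// read-only run, matching DisableOAuth: env.ODH_CORE_TOKEN_URL == "".
func (e coreEnv) readOnly() bool { return e.TokenURL == "" }

// validate checks the combinations the table implies: a base URL is always
// needed, and write access needs both client ID and secret.
func (e coreEnv) validate() error {
	if e.URL == "" {
		return errors.New("ODH_CORE_URL is required")
	}
	if !e.readOnly() && (e.ClientID == "" || e.ClientSecret == "") {
		return errors.New("client ID and secret are required when ODH_CORE_TOKEN_URL is set")
	}
	return nil
}

// defaultCoreEnv loads the variables from the process environment.
func defaultCoreEnv() coreEnv { return loadCoreEnv(os.Getenv) }
```

Passing a lookup function, instead of calling os.Getenv directly, keeps the check testable with a plain map.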

2. Stateful memory with the cache (optional)

Not every transformer needs this. If yours simply creates or overwrites entities, call Post or Put directly and skip to the next section. The cache is for transformers that must detect changes and keep a set of entities in sync: create new ones, update only the ones that changed, and end ones that disappeared from the source. The Content API itself is the state store, and clib rebuilds the cache from it on startup, so there is no separate database to manage.

On startup, load the entities you already own into a cache with LoadExisting:

annCache, err = clib.LoadExisting(context.Background(), contentClient, clib.LoadConfig[Announcement]{
	EntityType: "Announcement",
	QueryParams: map[string]string{
		"active":    "true",
		"source":    "a22",
		"rawfilter": "isnotnull(Mapping.ProviderA22Open.Id)",
	},
	IDFunc: func(a Announcement) string { return *a.ID },
})
ms.FailOnError(context.Background(), err, "failed to load announcements")

LoadExisting pages through the matching entities (using the Content API's Items / TotalPages envelope), hashes each one, and returns a Cache[T] keyed by your IDFunc. The query parameters restrict the load to the entities this transformer owns: here, active A22 announcements that carry an A22 mapping ID.

During transformation, use the cache to detect changes and push only what actually changed:

hash, changed, err := annCache.HasChanged(id, ann)
if err != nil {
	return err // the entity could not be hashed
}
if changed {
	annCache.Set(id, ann, hash)
	list = append(list, ann) // only new or changed entities
}

HasChanged hashes the entity and compares it with the cached hash. Control which fields participate in the hash with struct tags:

  • hash:"ignore" on volatile or server-managed fields (timestamps, _Meta, IDs) so they do not trigger spurious updates.
  • hash:"set" on slices whose order is not significant (for example TagIds, HasLanguage).
type Generic struct {
	ID     *string        `json:"Id,omitempty" hash:"ignore"`
	Meta   *clib.Metadata `json:"_Meta,omitempty" hash:"ignore"`
	Active bool           `json:"Active"`
	TagIds []string       `json:"TagIds,omitempty" hash:"set"`
	// ...
}
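The effect of the two tags can be illustrated without clib. The sketch below is not clib's hashing code. contentHash is a hypothetical function that builds a reduced view of the entity: it drops the fields that would carry hash:"ignore" and sorts a copy of the hash:"set" slice. It then hashes the view's JSON encoding with SHA-256. hasChanged mirrors only the shape of Cache.HasChanged and uses a plain map from ID to hash.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"sort"
)

// announcement is a cut-down, hypothetical entity: ID and UpdatedAt play the
// role of hash:"ignore" fields, TagIds the role of a hash:"set" field.
type announcement struct {
	ID        string
	UpdatedAt string
	Active    bool
	TagIds    []string
}

// contentHash hashes only the fields that should trigger an update. The tag
// slice is sorted on a copy, so element order does not matter and the
// caller's slice is left untouched.
func contentHash(a announcement) string {
	tags := append([]string(nil), a.TagIds...)
	sort.Strings(tags)
	view := struct {
		Active bool
		TagIds []string
	}{a.Active, tags}
	b, err := json.Marshal(view)
	if err != nil {
		panic(err) // cannot happen for these field types
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

// hasChanged returns the new hash and whether it differs from the cached
// one. A missing cache entry counts as changed.
func hasChanged(cache map[string]string, id string, a announcement) (string, bool) {
	h := contentHash(a)
	old, ok := cache[id]
	return h, !ok || old != h
}
```

Two announcements that differ only in ID, UpdatedAt, and the order of TagIds produce the same hash, so the second one is reported as unchanged. Flipping Active changes the hash.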
Warning: Fields that participate in the hash must be concretely typed. A map[string]any does not hash consistently, so an entity that uses one would always look changed and would be re-pushed on every run. The reference implementation gives the Mapping field an explicit struct type for exactly this reason.

To detect entities that have ended (present in the cache but absent from the latest batch), record the ID of every entity produced from the batch in a set (seen below). Then, after processing the batch, iterate over the cache:

for id, entry := range annCache.Entries() {
	if _, stillPresent := seen[id]; stillPresent {
		continue
	}
	ann := entry.Entity
	ann.EndTime = &sourceTime // mark it ended
	list = append(list, ann)
	annCache.Delete(id)
}
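Taken together, change detection and end detection amount to a diff between the cached state and the current batch. The sketch below reduces both sides to maps from ID to hash. This is a simplification, because the real cache also stores the entities. The function returns the IDs to upsert and the IDs that have ended, and it updates the map the way Set and Delete would. reconcile is a hypothetical helper and is not part of clib.

```go
package main

import "sort"

// reconcile compares cached (ID -> stored hash) with batch (ID -> hash of
// the freshly transformed entity). It returns the sorted IDs to upsert (new
// or changed) and the sorted IDs that ended (cached but absent from the
// batch), and it updates cached in place.
func reconcile(cached, batch map[string]string) (upsert, ended []string) {
	for id, h := range batch {
		if old, ok := cached[id]; !ok || old != h {
			upsert = append(upsert, id)
			cached[id] = h // like Cache.Set
		}
	}
	for id := range cached {
		if _, stillPresent := batch[id]; !stillPresent {
			ended = append(ended, id)
			delete(cached, id) // like Cache.Delete; safe during range in Go
		}
	}
	sort.Strings(upsert)
	sort.Strings(ended)
	return upsert, ended
}
```

Running reconcile twice on the same batch returns nothing the second time. This property keeps repeated runs from re-pushing unchanged content.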

3. Deterministic IDs

Give every entity a stable ID derived from a unique source key, so that re-processing the same input updates the same entity instead of creating a duplicate:

const ID_TEMPLATE = "urn:announcements:a22"

func generateOpendataID(event dto.A22OpendataEvent) string {
	return clib.GenerateID(ID_TEMPLATE, event.IDNotizia) // "urn:announcements:a22:{uuid5}"
}

GenerateID(prefix, input) returns {prefix}:{uuid5(input)}. The UUID is a name-based version 5 UUID (derived with SHA-1), so the same input always yields the same ID.
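For reference, RFC 4122 defines a version 5 UUID as follows: compute SHA-1 over the namespace bytes followed by the name, keep the first 16 bytes, and overwrite the version and variant bits. The sketch below implements this with the Go standard library. This page does not state which namespace clib uses, so the hypothetical generateID takes the namespace as a parameter. IDs from this sketch match clib's only if you supply the same namespace.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"strings"
)

// parseUUID turns the canonical 8-4-4-4-12 form into 16 bytes.
func parseUUID(s string) ([16]byte, error) {
	var u [16]byte
	b, err := hex.DecodeString(strings.ReplaceAll(s, "-", ""))
	if err != nil {
		return u, err
	}
	if len(b) != 16 {
		return u, fmt.Errorf("uuid %q: want 16 bytes, got %d", s, len(b))
	}
	copy(u[:], b)
	return u, nil
}

// uuid5 computes an RFC 4122 name-based SHA-1 UUID.
func uuid5(ns [16]byte, name string) string {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))
	var u [16]byte
	copy(u[:], h.Sum(nil)) // keep the first 16 of the 20 digest bytes
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant (10xx)
	x := hex.EncodeToString(u[:])
	return x[0:8] + "-" + x[8:12] + "-" + x[12:16] + "-" + x[16:20] + "-" + x[20:32]
}

// generateID reproduces the documented shape {prefix}:{uuid5(input)}.
// ASSUMPTION: the namespace is a parameter because clib's choice is not documented here.
func generateID(prefix string, ns [16]byte, input string) string {
	return prefix + ":" + uuid5(ns, input)
}
```

As a check, calling uuid5 with the standard DNS namespace (6ba7b810-9dad-11d1-80b4-00c04fd430c8) and the name "python.org" yields 886313e1-3b8a-5372-9b90-0c9aee199e5d. This is the example value given in Python's uuid documentation.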

4. Tag sync

Tags are defined declaratively in a JSON file and synced to the Content API on startup. Each definition carries its multilingual names and the entity types it is valid for:

resources/tags.json
[
  {
    "id": "announcement:traffic-event",
    "name-it": "Evento di Traffico",
    "name-de": "Verkehrsereignis",
    "name-en": "Traffic Event",
    "types": ["announcement"]
  },
  {
    "id": "traffic-event:road-work",
    "name-it": "Cantiere Stradale",
    "name-de": "Baustelle",
    "name-en": "Road Work",
    "types": ["announcement", "traffic-event"]
  }
]
tags, err := clib.ReadTagDefs("../resources/tags.json")
ms.FailOnError(context.Background(), err, "failed to read tags")

err = clib.SyncTags(context.Background(), contentClient, tags, clib.SyncTagsConfig{
	Source: "announcement",
})
ms.FailOnError(context.Background(), err, "failed to sync tags")

SyncTags creates each tag with a POST, using the id from the definition as the tag's ID. Tags that already exist are ignored (the SDK returns ErrAlreadyExists, which SyncTags treats as success), so it is safe to run on every startup. Without an explicit LicenseInfo, tags default to CC0.

5. Putting it together

The main function wires the standard ingestion listener to the clib client, the cache, and tag sync:

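func main() {
	ms.InitWithEnv(context.Background(), "", &env)
	slog.Info("Starting Content API transformer...")
	defer tel.FlushOnPanic()

	var err error
	contentClient, err = clib.NewContentClient(clib.Config{ /* see section 1 */ })
	ms.FailOnError(context.Background(), err, "failed to create content client")

	annCache, err = clib.LoadExisting(/* see section 2 */) // stateful memory
	ms.FailOnError(context.Background(), err, "failed to load announcements")

	tags, err := clib.ReadTagDefs("../resources/tags.json")
	ms.FailOnError(context.Background(), err, "failed to read tags")
	err = clib.SyncTags(context.Background(), contentClient, tags, clib.SyncTagsConfig{Source: "announcement"})
	ms.FailOnError(context.Background(), err, "failed to sync tags")

	listener := tr.NewTr[string](context.Background(), env.Env)
	listener.Start(context.Background(), tr.RawString2JsonMiddleware(Transform))
}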

The body of Transform maps each raw event to an entity, skips unchanged entities via the cache, marks ended ones, and upserts the result:

func Transform(ctx context.Context, r *rdb.Raw[dto.Root]) error {
	var list []Announcement // entities to push in this run

	// 1. map each raw event to an Announcement
	// 2. skip unchanged entities with annCache.HasChanged
	// 3. mark ended entities (in the cache but not in this batch)
	// 4. push the changed and ended entities
	if len(list) == 0 {
		return nil // nothing changed: no request to the Content API
	}
	return contentClient.PutMultiple(ctx, "Announcement", list)
}

The listener is the same one every transformer uses. How you consume the raw data depends on how the collector stored it: use tr.NewTr[string] with RawString2JsonMiddleware for a serialized JSON string, or tr.NewTr[YourDTO] directly for structured data (see Developing a Data Transformer from Scratch). Only the body of Transform and the clib write calls are specific to the Content API.

6. Testing

clib ships a mock of the ContentAPI interface in its clibmock subpackage, so you can test transformation logic without a live Core. Inject clibmock.ContentMock in place of the real client and assert on the calls it records. Import it with:

import "github.com/noi-techpark/opendatahub-go-sdk/clib/clibmock"

This follows the same input/output, golden-file approach used with bdpmock for time series transformers: the example ships testdata/in*.json inputs and the matching out*.json expected outputs.

7. Reference implementation

The complete, runnable example is transformers/traffic-event-a22-opendata in the opendatahub-collectors monorepo. It shows mapping, hashing, ended-event detection, geo enrichment, the clib content model (built on clib.Metadata, clib.LicenseInfo, and clib.GpsInfo), and the full .env, Dockerfile, and Helm configuration.