Skip to content

Implementing WorldStateStoreService

The WorldStateStoreService streams geometries and frames to clients as *commonpb.Transform protos. The draw package handles the tedious parts — converting a spatialmath.Geometry into a commonpb.Geometry, deriving stable UUIDs, and packing color/visibility metadata.

If you only need to render snapshots in a browser, see Embedding <Visualizer /> instead — that path doesn’t require implementing a service.

A WorldStateStoreService exposes three RPCs:

  • ListUUIDs — every transform you currently track.
  • GetTransform(uuid) — full data for one transform.
  • StreamTransformChanges — live ADDED / UPDATED / REMOVED events.

The <Visualizer /> subscribes and relies on the WorldStateStoreService to emit deltas. That means your service has to maintain internal state, diff it against new data, and broadcast the diff.

This guide walks through that loop with one concrete example: an obstacle store that polls a Viam sensor named obstacle-sensor, reconciles the readings against its internal state, and streams the deltas. The sensor’s Readings() returns a list of obstacles, each with a stable id, a position, dimensions, and an ending_soon boolean. The store keys off the id: same id, different position or dimensions ⇒ UPDATED; previously-seen id missing from a tick ⇒ REMOVED. Newly-added obstacles render blue at 50% opacity, the final frame before removal renders red at 50% opacity, and steady-state obstacles render orange — all driven by the diff plus the ending_soon signal.

The complete runnable module lives at docs/examples/worldstatestore/. Use store.go as the reference implementation while reading the rest of this guide.

obstacle-store/
├── go.mod
├── main.go         # module bootstrap
└── store.go        # WorldStateStoreService implementation
go get github.com/viam-labs/motion-tools/draw
go get go.viam.com/rdk
go get go.viam.com/api
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/golang/geo/r3"
    "github.com/viam-labs/motion-tools/draw"
    commonpb "go.viam.com/api/common/v1"
    pb "go.viam.com/api/service/worldstatestore/v1"
    "go.viam.com/rdk/components/sensor"
    "go.viam.com/rdk/logging"
    "go.viam.com/rdk/resource"
    "go.viam.com/rdk/services/worldstatestore"
    "go.viam.com/rdk/spatialmath"
)

const pollInterval = time.Second

type obstacleStore struct {
    resource.Named
    resource.TriviallyReconfigurable
    resource.TriviallyCloseable

    logger logging.Logger
    sensor sensor.Sensor

    mu         sync.RWMutex
    transforms map[string]*commonpb.Transform // keyed by obstacle id
    last       map[string]obstacle            // last-seen reading per id, for diffing

    streamCtx context.Context
    cancel    context.CancelFunc

    subsMu sync.Mutex
    subs   map[chan worldstatestore.TransformChange]struct{}
}

func newObstacleStore(
    ctx context.Context,
    deps resource.Dependencies,
    conf resource.Config,
    logger logging.Logger,
) (worldstatestore.Service, error) {
    obstacleSensor, err := sensor.FromProvider(deps, "obstacle-sensor")
    if err != nil {
        return nil, fmt.Errorf("getting obstacle-sensor: %w", err)
    }

    // The poll loop must outlive the constructor's ctx (which is cancelled
    // once construction returns), so derive from context.Background() and
    // tie the lifetime to Close() via the cancel func instead.
    streamCtx, cancel := context.WithCancel(context.Background())
    s := &obstacleStore{
        Named:      conf.ResourceName().AsNamed(),
        logger:     logger,
        sensor:     obstacleSensor,
        transforms: make(map[string]*commonpb.Transform),
        last:       make(map[string]obstacle),
        streamCtx:  streamCtx,
        cancel:     cancel,
        subs:       make(map[chan worldstatestore.TransformChange]struct{}),
    }

    go s.pollLoop()
    return s, nil
}

func (s *obstacleStore) Close(ctx context.Context) error {
    s.cancel()
    s.subsMu.Lock()
    for ch := range s.subs {
        delete(s.subs, ch)
        close(ch)
    }
    s.subsMu.Unlock()
    return nil
}

subs is the fan-out for StreamTransformChanges. Every active subscription owns one channel; broadcast writes to all of them. Concurrent clients don’t starve each other.

func (s *obstacleStore) ListUUIDs(ctx context.Context, extra map[string]any) ([][]byte, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    uuids := make([][]byte, 0, len(s.transforms))
    for _, t := range s.transforms {
        uuids = append(uuids, t.Uuid)
    }
    return uuids, nil
}

func (s *obstacleStore) GetTransform(ctx context.Context, uuid []byte, extra map[string]any) (*commonpb.Transform, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    // O(n) scan — fine for tens of obstacles. If you expect hundreds or
    // thousands, maintain a parallel uuid->transform map instead.
    for _, t := range s.transforms {
        if string(t.Uuid) == string(uuid) {
            return t, nil
        }
    }
    return nil, resource.NewNotFoundError(resource.NewName(worldstatestore.API, string(uuid)))
}

func (s *obstacleStore) StreamTransformChanges(
    ctx context.Context,
    extra map[string]any,
) (*worldstatestore.TransformChangeStream, error) {
    ch := make(chan worldstatestore.TransformChange, 100)

    s.subsMu.Lock()
    s.subs[ch] = struct{}{}
    s.subsMu.Unlock()

    // Tear down the subscription when the client disconnects.
    go func() {
        <-ctx.Done()
        s.subsMu.Lock()
        if _, ok := s.subs[ch]; ok {
            delete(s.subs, ch)
            close(ch)
        }
        s.subsMu.Unlock()
    }()

    return worldstatestore.NewTransformChangeStreamFromChannel(ctx, ch), nil
}
type obstacle struct {
    ID         string
    X, Y, Z    float64
    SX, SY, SZ float64
    EndingSoon bool   // sensor's lookahead: this obstacle will be gone next poll
    Lifecycle  string // derived in reconcile: "appearing" | "alive" | "disappearing"
}

const (
    lifecycleAppearing    = "appearing"
    lifecycleAlive        = "alive"
    lifecycleDisappearing = "disappearing"
)

func (s *obstacleStore) pollLoop() {
    ticker := time.NewTicker(pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-s.streamCtx.Done():
            return
        case <-ticker.C:
            obstacles, err := s.readObstacles()
            if err != nil {
                s.logger.Warnw("failed to read obstacles", "err", err)
                continue
            }
            s.reconcile(obstacles)
        }
    }
}

func (s *obstacleStore) readObstacles() ([]obstacle, error) {
    readings, err := s.sensor.Readings(s.streamCtx, nil)
    if err != nil {
        return nil, err
    }

    raw, ok := readings["obstacles"].([]any)
    if !ok {
        return nil, fmt.Errorf("sensor returned no obstacles slice")
    }

    out := make([]obstacle, 0, len(raw))
    for _, r := range raw {
        m, _ := r.(map[string]any)
        if m == nil {
            continue
        }
        id, _ := m["id"].(string)
        if id == "" {
            continue
        }
        out = append(out, obstacle{
            ID: id,
            X:  floatField(m, "x"), Y: floatField(m, "y"), Z: floatField(m, "z"),
            SX: floatField(m, "sx"), SY: floatField(m, "sy"), SZ: floatField(m, "sz"),
            EndingSoon: boolField(m, "ending_soon"),
        })
    }
    return out, nil
}

reconcile is where the diff happens. We index the next tick by id, then walk both maps to figure out adds, updates, and removals. Because we keep the previous reading around in last, we can also build a precise field mask for each UPDATED event — clients can apply the partial transform without re-fetching the whole thing.

The same loop derives each obstacle’s Lifecycle. The split is symmetric: the store knows additions (a new id ⇒ “appearing”), the sensor knows removals (it tagged the obstacle with EndingSoon based on its lookahead ⇒ “disappearing”), and everything else is “alive”. A change in lifecycle adds metadata to the field mask, since color and opacity live in Transform.metadata:

func (s *obstacleStore) reconcile(obstacles []obstacle) {
    next := make(map[string]obstacle, len(obstacles))
    for _, o := range obstacles {
        next[o.ID] = o
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    // ADDED + UPDATED
    for id, o := range next {
        prev, seen := s.last[id]
        // Lifecycle: appearing on the first frame we ever see this id; the
        // sensor's lookahead tags the final frame as ending-soon; everything
        // else is alive.
        switch {
        case !seen:
            o.Lifecycle = lifecycleAppearing
        case o.EndingSoon:
            o.Lifecycle = lifecycleDisappearing
        default:
            o.Lifecycle = lifecycleAlive
        }

        if seen && prev == o {
            continue // unchanged — skip the rebuild and the broadcast
        }
        transform, err := buildTransform(o)
        if err != nil {
            s.logger.Warnw("failed to build transform", "err", err, "id", id)
            continue
        }
        s.transforms[id] = transform
        s.last[id] = o

        if !seen {
            s.emit(transform, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_ADDED, nil)
            continue
        }
        // Surgical field mask: only list what actually changed. Path strings
        // are proto field names (snake_case), per the worldstatestore spec.
        var fields []string
        if prev.X != o.X || prev.Y != o.Y || prev.Z != o.Z {
            fields = append(fields, "pose_in_observer_frame")
        }
        if prev.SX != o.SX || prev.SY != o.SY || prev.SZ != o.SZ {
            fields = append(fields, "physical_object")
        }
        if prev.Lifecycle != o.Lifecycle {
            fields = append(fields, "metadata")
        }
        s.emit(transform, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_UPDATED, fields)
    }

    // REMOVED — anything in the previous tick that's missing now.
    // The Go spec explicitly allows deleting from a map during a range loop;
    // the deleted entry just won't be visited again. No copy needed.
    for id, t := range s.transforms {
        if _, present := next[id]; present {
            continue
        }
        delete(s.transforms, id)
        delete(s.last, id)
        s.emit(&commonpb.Transform{Uuid: t.Uuid}, pb.TransformChangeType_TRANSFORM_CHANGE_TYPE_REMOVED, nil)
    }
}
  • WithID(o.ID) makes the UUID deterministic from the obstacle’s id, so the same obstacle on every poll produces the same UUID — a requirement for UPDATED events to match on the client.
  • colorFor(o.Lifecycle) switches color and opacity per lifecycle, so the visual encoding lines up with the change events: blue/50% on first appearance, red/50% on the final frame, orange while alive.
func buildTransform(o obstacle) (*commonpb.Transform, error) {
    box, err := spatialmath.NewBox(
        spatialmath.NewZeroPose(),
        r3.Vector{X: o.SX, Y: o.SY, Z: o.SZ},
        o.ID,
    )
    if err != nil {
        return nil, err
    }

    drawn, err := draw.NewDrawnGeometry(box, draw.WithGeometryColor(colorFor(o.Lifecycle)))
    if err != nil {
        return nil, err
    }

    return drawn.Draw(o.ID,
        draw.WithID(o.ID),
        draw.WithPose(spatialmath.NewPoseFromPoint(r3.Vector{X: o.X, Y: o.Y, Z: o.Z})),
    )
}

// colorFor maps lifecycle → color. Opacity rides on the alpha channel:
// 127/255 ≈ 50%. Picked via colour name + SetAlpha rather than raw RGBA so
// the intent stays readable.
func colorFor(lifecycle string) draw.Color {
    switch lifecycle {
    case lifecycleAppearing:
        return draw.ColorFromName("blue").SetAlpha(127)
    case lifecycleDisappearing:
        return draw.ColorFromName("red").SetAlpha(127)
    default:
        return draw.ColorFromName("orange")
    }
}

For other primitive shapes (sphere, capsule, mesh), substitute the corresponding spatialmath.New* constructor — draw.NewDrawnGeometry takes any spatialmath.Geometry. For point clouds, see draw.NewDrawnPointCloud.

func (s *obstacleStore) emit(
    transform *commonpb.Transform,
    changeType pb.TransformChangeType,
    updatedFields []string,
) {
    change := worldstatestore.TransformChange{
        ChangeType:    changeType,
        Transform:     transform,
        UpdatedFields: updatedFields,
    }

    s.subsMu.Lock()
    subs := make([]chan worldstatestore.TransformChange, 0, len(s.subs))
    for ch := range s.subs {
        subs = append(subs, ch)
    }
    s.subsMu.Unlock()

    for _, ch := range subs {
        select {
        case ch <- change:
        case <-s.streamCtx.Done():
            return
        default:
            // Subscriber is full; drop this change rather than blocking the
            // poll loop. In practice, give them a deeper buffer or a slow-
            // consumer policy that suits your latency requirements.
        }
    }
}

The pattern generalizes:

  1. Maintain an internal map keyed by the identity your source provides (sensor-emitted id, source UUID, etc.).
  2. On each event, build the next set, diff against current, and emit ADDED / UPDATED / REMOVED.
  3. Use draw API tools like draw.NewDrawn* + Draw(name, draw.WithID(id)) to produce *commonpb.Transforms with deterministic UUIDs so updates land on the same entity client-side.
  4. Fan out broadcasts so multiple subscribers don’t block each other.

For the draw API surface — colors, metadata, point clouds — see the draw reference. Note the world state store service only works with Transform objects from the Viam API. It is not able to render Drawings.