Coordinator

1. Coordination lifecycle (events)

| Event | Initiator | When |
| --- | --- | --- |
| ALL_SERVICES_READY | services-manager | Once at bootstrap, after all services attach (services.js) |
| BEFORE/AFTER_COMPONENTS_* | dataspace | Per app load / unload |
| START/FINISH_BATCH_UNREGISTER | dataspace | Inside unload() during app load swap and full teardown |
| COMPONENT_REGISTER | Each component with WithCoordinationHelper | One emit per instance on loadComponents |
| COMPONENT_UNREGISTER | Each component | unregisterStreams / before('teardown') |
| ALL_COMPONENTS_STREAMS_PLUGGED | Coordinator | After component plug task opens the activation gate |

```mermaid
sequenceDiagram
participant SM as services-manager
participant DS as dataspace
participant CO as Coordinator
participant S as Service
participant C as Component

Note over SM,S: Services graph (once, before any app load)
SM->>CO: ALL_SERVICES_READY
CO->>CO: startBatchOperation
CO->>CO: servicesReady plug task
CO->>CO: setup globalActivationIngress (syncProp true)
Note over CO,S: plug task — per target, per batch
CO->>S: RUNTIME_CHANNELS_INFO
opt removed edges only
CO->>S: UNBIND_CHANNEL / UNBIND_GROUP_CHANNEL
end
Note over CO: commit unplug, create edge links (events enqueue globally)
CO->>S: BIND_CHANNEL / BIND_GROUP_CHANNEL
Note over S: bindChannel (handler)
CO->>S: ALL_SERVICES_STREAMS_CONNECTED
Note over S: servicesStreamsConnected
CO->>S: ALL_SERVICES_STREAMS_ACTIVATED
Note over S: servicesStreamsActivated
CO->>S: OLD_LIFECYCLE
Note over S: disconnectStreams → streamsConnected → streamsActivated
Note over CO: syncProp false → global activation flush
CO->>CO: completeBatchOperation

Note over DS,C: Component graph (per app load)
DS->>CO: BEFORE_COMPONENTS_LOAD
DS->>CO: BEFORE_COMPONENTS_UNLOAD
DS->>CO: START_BATCH_UNREGISTER
CO->>CO: startBatchOperation
par each torn-down instance
C->>CO: COMPONENT_UNREGISTER
CO->>C: UNBIND_CHANNEL (edges for that node)
end
DS->>CO: FINISH_BATCH_UNREGISTER
Note over CO: completeBatchOperation → physical unplug
CO->>CO: completeBatchOperation
DS->>CO: AFTER_COMPONENTS_UNLOAD
Note over DS,C: loadComponents — each new instance
par each new instance
C->>C: initialize
C->>C: registerStreams (+ after registerStreams)
C->>CO: COMPONENT_REGISTER
end
DS->>CO: AFTER_COMPONENTS_LOAD
Note over CO: loadingTracker = 0
CO->>CO: startBatchOperation
CO->>CO: componentsReady plug task
CO->>CO: setup globalActivationIngress (syncProp true)
Note over CO,C: plug task — per target, per batch
CO->>C: RUNTIME_CHANNELS_INFO
opt removed edges only (map diff)
CO->>C: UNBIND_CHANNEL / UNBIND_GROUP_CHANNEL
end
Note over CO: commit unplug (diff), create edge links (events enqueue globally)
CO->>C: BIND_CHANNEL / BIND_GROUP_CHANNEL
Note over C: bindChannel (handler)
CO->>C: OLD_LIFECYCLE
Note over C: disconnectStreams → streamsConnected → streamsActivated
Note over CO: syncProp false → global activation flush
CO->>CO: ALL_COMPONENTS_STREAMS_PLUGGED
CO->>CO: completeBatchOperation

Note over DS,C: App unload
DS->>CO: BEFORE_COMPONENTS_UNLOAD
DS->>CO: START_BATCH_UNREGISTER
par each instance
C->>CO: COMPONENT_UNREGISTER
CO->>C: UNBIND_CHANNEL (edges for that node)
end
DS->>CO: FINISH_BATCH_UNREGISTER
Note over CO: completeBatchOperation → physical unplug
DS->>CO: AFTER_COMPONENTS_UNLOAD
```
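
The unload part of the diagram defers the physical unplug until completeBatchOperation. A minimal sketch of that deferral follows. Only startBatchOperation and completeBatchOperation are names from the diagram; the class, its fields, componentUnregister, commitUnplug, and the "commit immediately outside a batch" behavior are assumptions made for illustration.

```javascript
// Hypothetical model of batch unregistration, not the coordinator's real code.
class BatchUnregisterModel {
  constructor() {
    this.inBatch = false;
    this.pendingEdges = [];   // edges whose components were already told to unbind
    this.unpluggedEdges = []; // edges whose physical plug has been removed
  }

  startBatchOperation() {
    this.inBatch = true;
  }

  // COMPONENT_UNREGISTER: record the edges now, defer the physical unplug.
  componentUnregister(edgeIds) {
    this.pendingEdges.push(...edgeIds);
    // Assumption: outside a batch the unplug happens right away.
    if (!this.inBatch) this.commitUnplug();
  }

  completeBatchOperation() {
    this.inBatch = false;
    this.commitUnplug();
  }

  commitUnplug() {
    this.unpluggedEdges.push(...this.pendingEdges);
    this.pendingEdges = [];
  }
}

const model = new BatchUnregisterModel();
model.startBatchOperation();               // START_BATCH_UNREGISTER
model.componentUnregister(['a->b']);       // first torn-down instance
model.componentUnregister(['b->c']);       // second torn-down instance
const unpluggedDuringBatch = [...model.unpluggedEdges];
model.completeBatchOperation();            // FINISH_BATCH_UNREGISTER
console.log(unpluggedDuringBatch);         // []
console.log(model.unpluggedEdges);         // [ 'a->b', 'b->c' ]
```

In this model every instance is notified before any wire is physically removed, which matches the order shown in the diagram.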

Wire buffering: events produced during BIND / activation hooks are enqueued into a single plug-task activation buffer and released only after syncProp becomes false (see Section 5).

2. Streaming lifecycles (component API)

Coordinator actions (BIND, OLD_LIFECYCLE, …) are distinct from Dive hooks (registerStreams, streamsConnected, …). Hooks run inside the component when coordination messages arrive.

Component Sequence (one instance)

| Phase | Where | What runs |
| --- | --- | --- |
| Initialize | loadComponents | initialize → registerStreams() (declares ports / channel IDs; after('registerStreams') on the component) → COMPONENT_REGISTER |
| Unregister | COMPONENT_UNREGISTER | unregisterStreams → COMPONENT_UNREGISTER; coordinator UNBIND_* |
| Plug batch | Coordinator → Component | runtimeChannelsInfo → unbindChannel (if any) → wire commit → bindChannel → v1: OLD_LIFECYCLE hooks or v2-only logic in bindChannel |
| Services | Services graph plug | Same through bindChannel, then servicesStreamsConnected (ALL_SERVICES_STREAMS_CONNECTED), servicesStreamsActivated (ALL_SERVICES_STREAMS_ACTIVATED), then OLD_LIFECYCLE v1 chain |
| Data emit | After syncProp = false | Buffered wire events drain globally in FIFO order (see Section 5); safe to emit downstream on v1 after streamsActivated |

NOTE: The OLD_LIFECYCLE handler (with_coordination_helper.js) always calls the three V1 methods (disconnectStreams, streamsConnected, streamsActivated) in one synchronous step, invoking each one only when the component defines it.
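
A minimal sketch of that dispatch, assuming the three V1 methods are the ones shown in the diagrams (disconnectStreams, streamsConnected, streamsActivated). The function name handleOldLifecycle and the plain-object component are hypothetical; this is not the code from with_coordination_helper.js.

```javascript
// Hook names follow the sequence diagrams; everything else is illustrative.
const V1_HOOKS = ['disconnectStreams', 'streamsConnected', 'streamsActivated'];

function handleOldLifecycle(component) {
  const called = [];
  for (const hook of V1_HOOKS) {
    // Skip hooks the component does not define (for example V2-only components).
    if (typeof component[hook] === 'function') {
      component[hook]();
      called.push(hook);
    }
  }
  return called; // all calls happened synchronously, in V1 order
}

// Usage: a component that defines only two of the three hooks.
const hookLog = [];
const legacyComponent = {
  streamsConnected() { hookLog.push('connected'); },
  streamsActivated() { hookLog.push('activated'); },
};
const calledHooks = handleOldLifecycle(legacyComponent);
console.log(calledHooks); // [ 'streamsConnected', 'streamsActivated' ]
console.log(hookLog);     // [ 'connected', 'activated' ]
```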

Old vs New API

| | Old API (WithStreams) | New API (WithStreamsV2 only) |
| --- | --- | --- |
| Mixins | WithStreamsV2 + WithStreamsV2toV1 | WithStreamsV2 only |
| registerStreams | Base map + v2toV1 before creates local bus per port | Registers channel IDs only |
| Wire attach | Coordinator bindChannel → plug link into local bus | bindChannel plugs into link directly |
| Activation | streamsConnected / streamsActivated via OLD_LIFECYCLE | Often only bindChannel / runtimeChannelsInfo |
| Typical emit | after('streamsActivated') or handler body | bindChannel / constant on link |

```mermaid
sequenceDiagram
participant CO as Coordinator
participant C as Component

Note over C: instance init (before plug)
C->>C: initialize
C->>C: registerStreams
Note over C: registerInputStream / registerOutputStream

Note over CO,C: plug batch (See 1)
CO->>C: RUNTIME_CHANNELS_INFO
Note over C: runtimeChannelsInfo
CO->>C: BIND_CHANNEL
Note over C: bindChannel
CO->>C: OLD_LIFECYCLE
Note over C: disconnectStreams
Note over C: streamsConnected
Note over C: streamsActivated
Note over CO: syncProp false — global FIFO drain
```

```mermaid
flowchart LR
subgraph init["Instance init"]
I[initialize]
RS[registerStreams]
I --> RS
end

subgraph plug["Plug batch (v1 WithStreams)"]
RT[runtimeChannelsInfo]
BC[bindChannel]
SC[streamsConnected]
SA[streamsActivated]
RT --> BC --> SC --> SA
end

init --> plug
```

NOTE: Graph delivery is gated by coordinator buffering (See 5), not by blocking component handlers.

3. Mixins

WithStreamsV2 — Channel API: registerInputStream / registerOutputStream, bindChannel / unbindChannel, runtimeChannelsInfo. Coordinator owns wire streams (aka links).

WithStreamsV2toV1 — Bridge for legacy components: creates a local bus per port, maps V1-style getInputStream / getOutputStream onto those buses, and plugs the coordinator link into the bus in bindChannel. It is not a separate lifecycle, but an adapter on top of V2.
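
The bridge idea can be sketched roughly as follows, for the input side only. The Bus class is a tiny stand-in written for this example and is not the Kefir API. LegacyBridge and the method signatures (port name plus link) are assumptions, since the real mixin's signatures are not shown here.

```javascript
// Minimal stand-in for a bus; NOT the Kefir API.
class Bus {
  constructor() { this.listeners = []; }
  onValue(fn) { this.listeners.push(fn); }
  emit(value) { this.listeners.forEach((fn) => fn(value)); }
  // Forward every value from another bus into this one.
  plug(source) { source.onValue((value) => this.emit(value)); }
}

// Hypothetical adapter: one local bus per registered input port.
class LegacyBridge {
  constructor() { this.inputBuses = new Map(); }
  registerInputStream(port) { this.inputBuses.set(port, new Bus()); }
  // V1-style accessor: legacy code subscribes to the local bus, not to the link.
  getInputStream(port) { return this.inputBuses.get(port); }
  // V2 hook: plug the coordinator-owned link into the local bus.
  bindChannel(port, link) { this.inputBuses.get(port).plug(link); }
}

const bridge = new LegacyBridge();
bridge.registerInputStream('p1');
const received = [];
bridge.getInputStream('p1').onValue((v) => received.push(v)); // subscribe before any wire exists
const link = new Bus();          // stands in for the coordinator link
bridge.bindChannel('p1', link);  // wire arrives later
link.emit(1);
link.emit(2);
console.log(received); // [ 1, 2 ]
```

Because legacy code holds a reference to the local bus, the coordinator can attach or replace the link without the V1 subscriber having to resubscribe.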

WithStreams = WithStreamsV2 + WithStreamsV2toV1. Default for most data components.

WithCoordinationHelper — Subscribes to coordination stream; dispatches BIND_CHANNEL, OLD_LIFECYCLE (→ v1 hooks if defined), ALL_SERVICES_STREAMS_*, etc.

WithStreamStatus — Wire error/status types on streams.

4. Component examples

| Component | API | Lifecycle events used | Ports / emit |
| --- | --- | --- | --- |
| constant (array) | New (WithStreamsV2) | runtimeChannelsInfo, bindChannel | On bindChannel output: Kefir.constant([1,2]) if no trigger; ignores streamsActivated |
| combine-by | Old (WithStreams) | streamsActivated arms combiner | Inputs via graph after flush; peek reads live p1 buffer |
| crossfilter-aggregation | Mixed (V2 + V2toV1) | streamsActivated / streamsConnected emit | V2 channels + v1 buses; emit after activation hooks |

5. Stream plug sequence and global event ordering

The coordinator has two ordering concerns during componentsReady / servicesReady:

  1. Coordination actions must show components a consistent topology, in this order: runtime info and removed-edge unbinds, then the physical unplug, then new-edge binds, then activation hooks.
  2. Data events produced by those binds and hooks must not reach downstream components until every updated component has received its activation actions.

Plug task order

Each plug task creates a fresh syncProp initialized to true, then calls _setupGlobalActivationBuffer(syncProp) before any edge creates a buffered stream. The task then:

  1. Builds runtime-channel info and diffs the previous coordination map against the new map.
  2. Sends runtime info plus target-side unbinds, then source-side unbinds, then other unbind actions.
  3. Calls _commitUnplug() so the old physical Kefir plugs are removed only after components have been told to detach.
  4. Creates new edge links with _plugSimpleStream / _plugGroupStream; each edge gets a source-side target bus, a target-side source pool, a bufferedTarget, optional transformations, and BIND_* actions.
  5. Waits for transformation attachment promises, appends service activation actions or component OLD_LIFECYCLE, updates component status, and sends the bind/activation batch.
  6. Opens the activation gate with sync.plug(Kefir.constant(false)). Component plug tasks then broadcast ALL_COMPONENTS_STREAMS_PLUGGED.
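
The ordering above can be sketched as a small planner. This is a simplified illustration under stated assumptions: coordination maps are modeled as Map objects keyed by edge ID, unchanged edges are left alone (the text does not say whether they are rebound), and diffEdges and planPlugTask are hypothetical names. Group channels, transformations, and service activation actions are left out.

```javascript
// Diff two coordination maps (edgeId -> edge description).
function diffEdges(previousMap, nextMap) {
  const removed = [...previousMap.keys()].filter((id) => !nextMap.has(id));
  const added = [...nextMap.keys()].filter((id) => !previousMap.has(id));
  return { removed, added };
}

// Produce the ordered list of steps for one component plug task.
function planPlugTask(previousMap, nextMap) {
  const { removed, added } = diffEdges(previousMap, nextMap);
  const steps = ['RUNTIME_CHANNELS_INFO'];
  removed.forEach((id) => steps.push(`UNBIND_CHANNEL ${id}`)); // tell components first
  steps.push('commitUnplug');                                  // then remove physical plugs
  added.forEach((id) => steps.push(`BIND_CHANNEL ${id}`));     // then create and bind new edges
  steps.push('OLD_LIFECYCLE');                                 // activation hooks
  steps.push('open activation gate');                          // syncProp becomes false
  return steps;
}

const previousMap = new Map([['e1', {}], ['e2', {}]]);
const nextMap = new Map([['e2', {}], ['e3', {}]]);
const plan = planPlugTask(previousMap, nextMap);
console.log(plan);
// [ 'RUNTIME_CHANNELS_INFO', 'UNBIND_CHANNEL e1', 'commitUnplug',
//   'BIND_CHANNEL e3', 'OLD_LIFECYCLE', 'open activation gate' ]
```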

orderedEdgeIds still controls edge creation order for component graphs, but the activation buffer means startup data delivery no longer depends on that order.

Event pipeline

_getBufferedStream now splits an edge ingress into three stages:

```mermaid
flowchart LR
OUT[component/service output link] --> TI[targetIngress]
TI -->|assign global index| G[globalActivationIngress]
G --> BUF[bufferWhileBy syncProp]
BUF -->|FIFO drain| PA[edge postActivation bus]
PA --> BS[bufferedTarget status + metadata]
BS --> TR[wire transformations]
TR --> IN[target input link]
```

While syncProp is true, every value/error/end event from every newly plugged edge is wrapped as { deliveryStream, event } and pushed into the single globalActivationIngress. GLOBAL_EVENT_NUMBER is assigned before enqueue, so debugger/common-bus metadata reflects the order events were produced, not the order per-edge buffers later drain.

When syncProp changes to false, the one global bufferWhileBy flushes all queued envelopes as a single FIFO. Each envelope is emitted into its own edge postActivation bus, then continues through the existing status/error handling, common-bus metadata preservation, transformations, and target input link.
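
A minimal sketch of the single global buffer, written without Kefir so it runs standalone. The envelope shape { deliveryStream, event } follows the text; here deliveryStream is modeled as a plain callback. The class name, the number field (a stand-in for GLOBAL_EVENT_NUMBER), and the choice to append events produced during the drain to the tail of the queue are assumptions of this sketch.

```javascript
class GlobalActivationBuffer {
  constructor() {
    this.gateClosed = true;   // plays the role of syncProp === true
    this.draining = false;
    this.queue = [];
    this.nextEventNumber = 0; // stand-in for GLOBAL_EVENT_NUMBER
  }

  // Every edge ingress calls this for each value/error/end event.
  push(deliveryStream, event) {
    // The number is assigned at production time, before enqueueing.
    const envelope = { deliveryStream, event, number: this.nextEventNumber++ };
    if (this.gateClosed || this.draining) {
      this.queue.push(envelope);
    } else {
      deliveryStream(envelope.event, envelope.number);
    }
  }

  // Equivalent of syncProp switching to false: one FIFO drain for all edges.
  open() {
    this.gateClosed = false;
    this.draining = true;
    while (this.queue.length > 0) {
      const { deliveryStream, event, number } = this.queue.shift();
      deliveryStream(event, number);
    }
    this.draining = false;
  }
}

const delivered = [];
const edgeA = (event, n) => delivered.push(`A:${event}#${n}`);
const edgeB = (event, n) => delivered.push(`B:${event}#${n}`);

const activationBuffer = new GlobalActivationBuffer();
activationBuffer.push(edgeA, 1);
activationBuffer.push(edgeB, 'x');
activationBuffer.push(edgeA, 2);
const deliveredBeforeOpen = delivered.length; // 0: nothing passes while the gate is closed
activationBuffer.open();
console.log(delivered); // [ 'A:1#0', 'B:x#1', 'A:2#2' ]
```

Delivery order follows production order across both edges, regardless of which edge was created first.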

Legacy note: in the original version of activation, each edge had its own bufferWhileBy(syncProp). Opening the gate flushed each edge independently, so cross-edge ordering could become depth-first by wire rather than FIFO by production time. The integration spec covers the reproduced array → flatten → fork → combine-by graph in both the exported edge order and the combine-edge-first order; both must produce the combine-by log values [1, 2].
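
The difference between the two strategies can be illustrated with a toy model. It is not the integration spec and does not model the real graph: edges are plain IDs, events are numbers, and flushPerEdge and flushGlobal are hypothetical helpers.

```javascript
// Events listed in the order they were produced, across two edges.
const produced = [
  { edge: 'a', value: 1 },
  { edge: 'b', value: 2 },
  { edge: 'a', value: 3 },
];

// Legacy model: one buffer per edge, each flushed completely in edge order.
function flushPerEdge(events, edgeOrder) {
  const buffers = new Map(edgeOrder.map((id) => [id, []]));
  events.forEach((e) => buffers.get(e.edge).push(e.value));
  return edgeOrder.flatMap((id) => buffers.get(id));
}

// Current model: one global queue, flushed in production order.
function flushGlobal(events) {
  return events.map((e) => e.value);
}

console.log(flushPerEdge(produced, ['a', 'b'])); // [ 1, 3, 2 ]
console.log(flushPerEdge(produced, ['b', 'a'])); // [ 2, 1, 3 ]
console.log(flushGlobal(produced));              // [ 1, 2, 3 ]
```

In the per-edge model the delivered order changes with the edge order. The global queue gives the same result for any edge order, which is the property the spec checks with its two edge orderings.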