Coordinator
1. Coordination lifecycle (events)
| Event | Initiator | When |
|---|---|---|
| ALL_SERVICES_READY | services-manager | Once at bootstrap, after all services attach (services.js) |
| BEFORE/AFTER_COMPONENTS_* | dataspace | Per app load / unload |
| START/FINISH_BATCH_UNREGISTER | dataspace | Inside unload() during app load swap and full teardown |
| COMPONENT_REGISTER | Each component with WithCoordinationHelper | One emit per instance on loadComponents |
| COMPONENT_UNREGISTER | Each component | unregisterStreams / before('teardown') |
| ALL_COMPONENTS_STREAMS_PLUGGED | Coordinator | After component plug task opens the activation gate |
sequenceDiagram
participant SM as services-manager
participant DS as dataspace
participant CO as Coordinator
participant S as Service
participant C as Component
Note over SM,S: Services graph (once, before any app load)
SM->>CO: ALL_SERVICES_READY
CO->>CO: startBatchOperation
CO->>CO: servicesReady plug task
CO->>CO: setup globalActivationIngress (syncProp true)
Note over CO,S: plug task — per target, per batch
CO->>S: RUNTIME_CHANNELS_INFO
opt removed edges only
CO->>S: UNBIND_CHANNEL / UNBIND_GROUP_CHANNEL
end
Note over CO: commit unplug, create edge links (events enqueue globally)
CO->>S: BIND_CHANNEL / BIND_GROUP_CHANNEL
Note over S: bindChannel (handler)
CO->>S: ALL_SERVICES_STREAMS_CONNECTED
Note over S: servicesStreamsConnected
CO->>S: ALL_SERVICES_STREAMS_ACTIVATED
Note over S: servicesStreamsActivated
CO->>S: OLD_LIFECYCLE
Note over S: disconnectStreams → streamsConnected → streamsActivated
Note over CO: syncProp false → global activation flush
CO->>CO: completeBatchOperation
Note over DS,C: Component graph (per app load)
DS->>CO: BEFORE_COMPONENTS_LOAD
DS->>CO: BEFORE_COMPONENTS_UNLOAD
DS->>CO: START_BATCH_UNREGISTER
CO->>CO: startBatchOperation
par each torn-down instance
C->>CO: COMPONENT_UNREGISTER
CO->>C: UNBIND_CHANNEL (edges for that node)
end
DS->>CO: FINISH_BATCH_UNREGISTER
Note over CO: completeBatchOperation → physical unplug
CO->>CO: completeBatchOperation
DS->>CO: AFTER_COMPONENTS_UNLOAD
Note over DS,C: loadComponents — each new instance
par each new instance
C->>C: initialize
C->>C: registerStreams (+ after registerStreams)
C->>CO: COMPONENT_REGISTER
end
DS->>CO: AFTER_COMPONENTS_LOAD
Note over CO: loadingTracker = 0
CO->>CO: startBatchOperation
CO->>CO: componentsReady plug task
CO->>CO: setup globalActivationIngress (syncProp true)
Note over CO,C: plug task — per target, per batch
CO->>C: RUNTIME_CHANNELS_INFO
opt removed edges only (map diff)
CO->>C: UNBIND_CHANNEL / UNBIND_GROUP_CHANNEL
end
Note over CO: commit unplug (diff), create edge links (events enqueue globally)
CO->>C: BIND_CHANNEL / BIND_GROUP_CHANNEL
Note over C: bindChannel (handler)
CO->>C: OLD_LIFECYCLE
Note over C: disconnectStreams → streamsConnected → streamsActivated
Note over CO: syncProp false → global activation flush
CO->>CO: ALL_COMPONENTS_STREAMS_PLUGGED
CO->>CO: completeBatchOperation
Note over DS,C: App unload
DS->>CO: BEFORE_COMPONENTS_UNLOAD
DS->>CO: START_BATCH_UNREGISTER
par each instance
C->>CO: COMPONENT_UNREGISTER
CO->>C: UNBIND_CHANNEL (edges for that node)
end
DS->>CO: FINISH_BATCH_UNREGISTER
Note over CO: completeBatchOperation → physical unplug
DS->>CO: AFTER_COMPONENTS_UNLOAD
Wire buffering: events produced during BIND / activation hooks are enqueued into one plug-task activation buffer and released only after syncProp becomes false (see Section 5).
2. Streaming lifecycles (component API)
Coordinator actions (BIND, OLD_LIFECYCLE, …) are distinct from Dive hooks (registerStreams, streamsConnected, …). Hooks run inside the component when coordination messages arrive.
Component Sequence (one instance)
| Phase | Where | What runs |
|---|---|---|
| Initialize | loadComponents | initialize → registerStreams() (declares ports / channel IDs; after('registerStreams') on the component) → COMPONENT_REGISTER |
| Unregister | COMPONENT_UNREGISTER | unregisterStreams → COMPONENT_UNREGISTER; coordinator UNBIND_* |
| Plug batch | Coordinator → Component | runtimeChannelsInfo → unbindChannel (if any) → wire commit → bindChannel → v1: OLD_LIFECYCLE hooks or v2-only logic in bindChannel |
| Services | Services graph plug | Same through bindChannel, then servicesStreamsConnected ← ALL_SERVICES_STREAMS_CONNECTED, servicesStreamsActivated ← ALL_SERVICES_STREAMS_ACTIVATED, then OLD_LIFECYCLE v1 chain |
| Data emit | After syncProp = false | Buffered wire events drain globally in FIFO order (see Section 5); on v1 it is safe to emit downstream after streamsActivated |
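NOTE: The OLD_LIFECYCLE handler (with_coordination_helper.js) always calls the three V1 methods (disconnectStreams, streamsConnected, streamsActivated) in that order within one synchronous step, skipping any method the component does not define.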
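The dispatch described in the note can be pictured with a minimal sketch. It is not the code from with_coordination_helper.js: the function name `handleOldLifecycle` and the plain-object components are assumptions made for illustration, and only the three hook names and their order come from the lifecycle diagram.

```javascript
// Hypothetical sketch, not the real with_coordination_helper.js handler.
// Hook names and order are taken from the lifecycle diagram above.
const V1_HOOKS = ['disconnectStreams', 'streamsConnected', 'streamsActivated'];

function handleOldLifecycle(component) {
  const called = [];
  for (const hook of V1_HOOKS) {
    // Skip hooks the component does not define (typical for V2-only components).
    if (typeof component[hook] === 'function') {
      component[hook]();
      called.push(hook);
    }
  }
  return called;
}

// A v1-style component that defines two of the three hooks.
const hookLog = [];
const legacyComponent = {
  streamsConnected() { hookLog.push('connected'); },
  streamsActivated() { hookLog.push('activated'); },
};

const calledOnLegacy = handleOldLifecycle(legacyComponent);
const calledOnV2Only = handleOldLifecycle({ bindChannel() {} });

console.log(calledOnLegacy); // [ 'streamsConnected', 'streamsActivated' ]
console.log(calledOnV2Only); // []
```

Because the loop is synchronous, no other coordination message can interleave between the three hooks in this sketch.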
Old vs New API
| | Old API (WithStreams) | New API (WithStreamsV2 only) |
|---|---|---|
| Mixins | WithStreamsV2 + WithStreamsV2toV1 | WithStreamsV2 only |
| registerStreams | Base map + v2toV1 before creates local bus per port | Registers channel IDs only |
| Wire attach | Coordinator bindChannel → plug link into local bus | bindChannel plugs into link directly |
| Activation | streamsConnected / streamsActivated via OLD_LIFECYCLE | Often only bindChannel / runtimeChannelsInfo |
| Typical emit | after('streamsActivated') or handler body | bindChannel / constant on link |
sequenceDiagram
participant CO as Coordinator
participant C as Component
Note over C: instance init (before plug)
C->>C: initialize
C->>C: registerStreams
Note over C: registerInputStream / registerOutputStream
Note over CO,C: plug batch (See 1)
CO->>C: RUNTIME_CHANNELS_INFO
Note over C: runtimeChannelsInfo
CO->>C: BIND_CHANNEL
Note over C: bindChannel
CO->>C: OLD_LIFECYCLE
Note over C: disconnectStreams
Note over C: streamsConnected
Note over C: streamsActivated
Note over CO: syncProp false — global FIFO drain
flowchart LR
subgraph init["Instance init"]
I[initialize]
RS[registerStreams]
I --> RS
end
subgraph plug["Plug batch (v1 WithStreams)"]
RT[runtimeChannelsInfo]
BC[bindChannel]
SC[streamsConnected]
SA[streamsActivated]
RT --> BC --> SC --> SA
end
init --> plug
NOTE: Graph delivery is gated by coordinator buffering (see Section 5), not by blocking component handlers.
3. Mixins
WithStreamsV2 — Channel API: registerInputStream / registerOutputStream, bindChannel / unbindChannel, runtimeChannelsInfo. Coordinator owns wire streams (aka links).
WithStreamsV2toV1 — Bridge for legacy components: creates a local bus per port, maps V1-style getInputStream / getOutputStream onto those buses, and plugs the coordinator link into the local bus in bindChannel. It is not a separate lifecycle but an adapter on top of V2.
WithStreams — WithStreamsV2 + WithStreamsV2toV1. Default for most data components.
WithCoordinationHelper — Subscribes to coordination stream; dispatches BIND_CHANNEL, OLD_LIFECYCLE (→ v1 hooks if defined), ALL_SERVICES_STREAMS_*, etc.
WithStreamStatus — Wire error/status types on streams.
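To make the relationship between WithStreamsV2 and WithStreamsV2toV1 concrete, here is a small sketch of the bridge idea. Everything in it is an assumption for illustration: `TinyBus` stands in for a Kefir bus, and the two classes only imitate the mixin method names listed above; the real mixins are not shown in this document.

```javascript
// Minimal stand-in for a stream bus (the real code uses Kefir, not used here).
class TinyBus {
  constructor() { this.listeners = []; }
  onValue(fn) { this.listeners.push(fn); }
  emit(value) { this.listeners.forEach((fn) => fn(value)); }
  // Forward every value from another bus into this one.
  plug(source) { source.onValue((value) => this.emit(value)); }
}

// Hypothetical V2 base: registration records channel IDs only.
class V2Component {
  constructor() { this.inputChannels = new Set(); }
  registerInputStream(channelId) { this.inputChannels.add(channelId); }
  bindChannel(channelId, link) { /* V2-only components use the link directly */ }
}

// Hypothetical bridge: one local bus per port, a V1 accessor on top of it,
// and the coordinator link plugged into the local bus on bind.
class V2toV1Component extends V2Component {
  constructor() { super(); this.localBuses = new Map(); }
  registerInputStream(channelId) {
    super.registerInputStream(channelId);
    this.localBuses.set(channelId, new TinyBus());
  }
  getInputStream(channelId) { return this.localBuses.get(channelId); }
  bindChannel(channelId, link) { this.localBuses.get(channelId).plug(link); }
}

const bridged = new V2toV1Component();
bridged.registerInputStream('in');

// V1-style code can subscribe before any wire exists.
const received = [];
bridged.getInputStream('in').onValue((value) => received.push(value));

// Later the coordinator binds its link, and values start to flow.
const coordinatorLink = new TinyBus();
bridged.bindChannel('in', coordinatorLink);
coordinatorLink.emit(42);

console.log(received); // [ 42 ]
```

The point of the local bus in this sketch is that legacy subscriptions stay valid across rebinds, since they attach to the bus rather than to the coordinator link.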
4. Component examples
| Component | API | Lifecycle events used | Ports / emit |
|---|---|---|---|
| constant (array) | New (WithStreamsV2) | runtimeChannelsInfo, bindChannel | On bindChannel output: Kefir.constant([1,2]) if no trigger; ignores streamsActivated |
| combine-by | Old (WithStreams) | streamsActivated arms combiner | Inputs via graph after flush; peek reads live p1 buffer |
| crossfilter-aggregation | Mixed (V2 + V2toV1) | streamsActivated / streamsConnected emit | V2 channels + v1 buses; emit after activation hooks |
5. Stream plug sequence and global event ordering
The coordinator has two ordering concerns during componentsReady / servicesReady:
- Coordination actions must give components a consistent view of the topology: runtime info and removed-edge unbinds come first, then the physical unplug, then new-edge binds, and finally activation hooks.
- Data events produced by those binds and hooks must not reach downstream components until every updated component has received its activation actions.
Plug task order
Each plug task creates a fresh syncProp initialized to true, then calls _setupGlobalActivationBuffer(syncProp) before any edge creates a buffered stream. The task then:
- Builds runtime-channel info and diffs the previous coordination map against the new map.
- Sends runtime info plus target-side unbinds, then source-side unbinds, then other unbind actions.
- Calls _commitUnplug() so old physical Kefir plugs are removed after components were told to detach.
- Creates new edge links with _plugSimpleStream / _plugGroupStream; each edge gets a source-side target bus, a target-side source pool, a bufferedTarget, optional transformations, and BIND_* actions.
- Waits for transformation attachment promises, appends service activation actions or component OLD_LIFECYCLE, updates component status, and sends the bind/activation batch.
- Opens the activation gate with sync.plug(Kefir.constant(false)). Component plug tasks then broadcast ALL_COMPONENTS_STREAMS_PLUGGED.
orderedEdgeIds still controls edge creation order for component graphs, but the activation buffer means startup data delivery no longer depends on that order.
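The diff-then-plan shape of the plug task can be sketched as follows. This is a simplified illustration under stated assumptions: the coordination map is modelled as a `Map` from edge ID to `{ source, target }`, the function names `diffCoordinationMaps` and `planPlugTask` are hypothetical, and the target-side / source-side split of unbinds, group channels, and transformation promises are left out.

```javascript
// Hypothetical model: a coordination map is Map<edgeId, { source, target }>.
function sameEdge(a, b) {
  return Boolean(a && b) && a.source === b.source && a.target === b.target;
}

// Edges present only in the previous map are removed; edges present only in
// the next map (or changed) are added.
function diffCoordinationMaps(previous, next) {
  const removed = [];
  const added = [];
  for (const [edgeId, edge] of previous) {
    if (!sameEdge(edge, next.get(edgeId))) removed.push(edgeId);
  }
  for (const [edgeId, edge] of next) {
    if (!sameEdge(edge, previous.get(edgeId))) added.push(edgeId);
  }
  return { removed, added };
}

// Orders the actions as in the list above: info, unbinds, physical unplug,
// binds, activation, gate opening.
function planPlugTask({ removed, added }, targets) {
  return [
    ...targets.map((target) => `RUNTIME_CHANNELS_INFO:${target}`),
    ...removed.map((edgeId) => `UNBIND_CHANNEL:${edgeId}`),
    'commitUnplug',
    ...added.map((edgeId) => `BIND_CHANNEL:${edgeId}`),
    'OLD_LIFECYCLE',
    'openActivationGate',
  ];
}

const previousMap = new Map([
  ['e1', { source: 'a', target: 'b' }],
  ['e2', { source: 'b', target: 'c' }],
]);
const nextMap = new Map([
  ['e2', { source: 'b', target: 'c' }],
  ['e3', { source: 'a', target: 'c' }],
]);

const edgeDiff = diffCoordinationMaps(previousMap, nextMap);
const plan = planPlugTask(edgeDiff, ['b', 'c']);

console.log(edgeDiff); // { removed: [ 'e1' ], added: [ 'e3' ] }
console.log(plan);
```

The unchanged edge `e2` produces neither an unbind nor a bind, which is the reason for diffing instead of tearing down the whole map.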
Event pipeline
_getBufferedStream now splits an edge ingress into three stages:
flowchart LR
OUT[component/service output link] --> TI[targetIngress]
TI -->|assign global index| G[globalActivationIngress]
G --> BUF[bufferWhileBy syncProp]
BUF -->|FIFO drain| PA[edge postActivation bus]
PA --> BS[bufferedTarget status + metadata]
BS --> TR[wire transformations]
TR --> IN[target input link]
While syncProp is true, every value/error/end event from every newly plugged edge is wrapped as { deliveryStream, event } and pushed into the single globalActivationIngress. GLOBAL_EVENT_NUMBER is assigned before enqueue, so debugger/common-bus metadata reflects the order events were produced, not the order per-edge buffers later drain.
When syncProp changes to false, the one global bufferWhileBy flushes all queued envelopes as a single FIFO. Each envelope is emitted into its own edge postActivation bus, then continues through the existing status/error handling, common-bus metadata preservation, transformations, and target input link.
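The envelope mechanism can be illustrated without Kefir. The sketch below is an assumption-laden model, not the coordinator's code: `GlobalActivationBuffer` is a hypothetical class, `gateClosed` plays the role of syncProp being true, `nextEventNumber` stands in for GLOBAL_EVENT_NUMBER, and each `deliveryStream` is reduced to a plain callback.

```javascript
// Hypothetical model of a single global activation buffer (no Kefir).
class GlobalActivationBuffer {
  constructor() {
    this.gateClosed = true;   // plays the role of syncProp === true
    this.queue = [];          // envelopes of the form { deliveryStream, event }
    this.nextEventNumber = 0; // stand-in for GLOBAL_EVENT_NUMBER
  }

  // Called by every edge ingress. The index is assigned before enqueue,
  // so it reflects production order.
  push(deliveryStream, value) {
    const event = { value, globalEventNumber: this.nextEventNumber++ };
    if (this.gateClosed) {
      this.queue.push({ deliveryStream, event });
    } else {
      deliveryStream(event);
    }
  }

  // Equivalent of syncProp switching to false: drain everything as one FIFO.
  // Events produced while draining are appended and drained in the same pass.
  open() {
    while (this.queue.length > 0) {
      const { deliveryStream, event } = this.queue.shift();
      deliveryStream(event);
    }
    this.gateClosed = false;
  }
}

const delivered = [];
const edgeA = (event) => delivered.push(`A:${event.value}#${event.globalEventNumber}`);
const edgeB = (event) => delivered.push(`B:${event.value}#${event.globalEventNumber}`);

const activationBuffer = new GlobalActivationBuffer();
activationBuffer.push(edgeA, 'a1');
activationBuffer.push(edgeB, 'b1');
activationBuffer.push(edgeA, 'a2');
const deliveredBeforeOpen = delivered.length; // nothing is delivered while the gate is closed

activationBuffer.open();
console.log(delivered); // [ 'A:a1#0', 'B:b1#1', 'A:a2#2' ]
```

Keeping the gate closed until the queue is empty is a choice made for this sketch so that re-entrant events cannot overtake queued ones; the document does not state how the real pipeline handles that case.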
Legacy note: in the original version of activation, each edge had its own bufferWhileBy(syncProp). Opening the gate flushed each edge independently, so cross-edge ordering could become depth-first by wire rather than FIFO by production time. The integration spec covers the reproduced array → flatten → fork → combine-by graph in both exported edge order and combine-edge-first order; both must produce combine-by log values [1, 2].
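The difference between the two flush strategies shows up even in a toy model. The sketch below is a deliberate simplification with hypothetical names (`flushPerEdge`, `flushGlobal`): it ignores downstream re-emission and only compares the order in which already-buffered values leave the buffers.

```javascript
// Values in the order they were produced while the gate was closed.
const producedEvents = [
  { edge: 'e1', value: 1 },
  { edge: 'e2', value: 2 },
  { edge: 'e1', value: 3 },
];

// Legacy model: one buffer per edge, flushed edge by edge when the gate opens.
function flushPerEdge(events, edgeOrder) {
  const perEdge = new Map(edgeOrder.map((edge) => [edge, []]));
  for (const event of events) perEdge.get(event.edge).push(event.value);
  return edgeOrder.flatMap((edge) => perEdge.get(edge));
}

// Current model: one shared queue, flushed in production order.
function flushGlobal(events) {
  return events.map((event) => event.value);
}

const legacyOrderA = flushPerEdge(producedEvents, ['e1', 'e2']);
const legacyOrderB = flushPerEdge(producedEvents, ['e2', 'e1']);
const globalOrder = flushGlobal(producedEvents);

console.log(legacyOrderA); // [ 1, 3, 2 ]
console.log(legacyOrderB); // [ 2, 1, 3 ]
console.log(globalOrder);  // [ 1, 2, 3 ]
```

In the per-edge model the result changes with edge order, while the global queue gives the same result for any edge order, which is the property the integration spec checks by running the graph in two edge orders.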