aether_api/
pilot.rs

1use aether_ast::{
2    AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, ReplicaId, Value,
3};
4
/// Element id `5`.
///
/// In [`coordination_pilot_seed_history`] this is the element of the
/// `"active"` lease-state datom on task 1, i.e. the last datom emitted before
/// the first heartbeat datoms (elements 6 through 9).
///
/// NOTE(review): presumably used as an "as of" cut where the lease is open but
/// no heartbeat has been recorded yet — confirm against callers.
pub const COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT: u64 = 5;
/// Element id `9`.
///
/// In [`coordination_pilot_seed_history`] this is the element of the final
/// datom of heartbeat entity 1001 (attribute 9, value 100), so every datom of
/// the first heartbeat is present at or before this element.
///
/// NOTE(review): presumably the "as of" cut at which the first claimant is
/// expected to hold execution authority — confirm against callers.
pub const COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT: u64 = 9;
7
/// Renders the coordination-pilot program text.
///
/// The output is a fixed program (schema, predicates, facts, rules and
/// materialize sections) followed by a `query { … }` section that contains
/// `view` and `query_body`, each on its own line with a two-space indent.
///
/// Both arguments are inserted verbatim; no escaping or validation is done.
/// The returned text starts with a newline and ends with `"}\n"`.
pub fn coordination_pilot_dsl(view: &str, query_body: &str) -> String {
    // Everything that does not depend on the arguments. Kept as a plain raw
    // string so the braces of the program text need no format-escaping.
    const STATIC_SECTIONS: &str = r#"
schema v1 {
  attr task.depends_on: RefSet<Entity>
  attr task.status: ScalarLWW<String>
  attr task.claimed_by: ScalarLWW<String>
  attr task.lease_epoch: ScalarLWW<U64>
  attr task.lease_state: ScalarLWW<String>
  attr heartbeat.task: RefScalar<Entity>
  attr heartbeat.worker: ScalarLWW<String>
  attr heartbeat.epoch: ScalarLWW<U64>
  attr heartbeat.at: ScalarLWW<U64>
  attr outcome.task: RefScalar<Entity>
  attr outcome.worker: ScalarLWW<String>
  attr outcome.epoch: ScalarLWW<U64>
  attr outcome.status: ScalarLWW<String>
  attr outcome.detail: ScalarLWW<String>
}

predicates {
  task(Entity)
  worker(String)
  worker_capability(String, String)
  task_depends_on(Entity, Entity)
  task_status(Entity, String)
  task_claimed_by(Entity, String)
  task_lease_epoch(Entity, U64)
  task_lease_state(Entity, String)
  heartbeat_task(Entity, Entity)
  heartbeat_worker(Entity, String)
  heartbeat_epoch(Entity, U64)
  heartbeat_at(Entity, U64)
  outcome_task(Entity, Entity)
  outcome_worker(Entity, String)
  outcome_epoch(Entity, U64)
  outcome_status(Entity, String)
  outcome_detail(Entity, String)
  task_complete(Entity)
  dependency_blocked(Entity)
  lease_active(Entity, String, U64)
  lease_heartbeat(Entity, String, U64, U64)
  live_authority(Entity, String, U64, U64)
  active_claim(Entity)
  task_ready(Entity)
  worker_can_claim(Entity, String)
  execution_authorized(Entity, String, U64)
  execution_outcome_recorded(Entity, String, U64, String, String)
  execution_outcome_accepted(Entity, String, U64, String, String)
  execution_outcome_rejected_stale(Entity, String, U64, String, String)
}

facts {
  task(entity(1))
  task(entity(2))
  task(entity(3))
  worker("worker-a")
  worker("worker-b")
  worker_capability("worker-a", "executor")
  worker_capability("worker-b", "executor")
}

rules {
  task_complete(t) <- task_status(t, "done")
  dependency_blocked(t) <- task_depends_on(t, dep), not task_complete(dep)
  lease_active(t, w, epoch) <- task_claimed_by(t, w), task_lease_epoch(t, epoch), task_lease_state(t, "active")
  lease_heartbeat(t, w, epoch, beat) <- heartbeat_task(h, t), heartbeat_worker(h, w), heartbeat_epoch(h, epoch), heartbeat_at(h, beat)
  live_authority(t, w, epoch, beat) <- lease_active(t, w, epoch), lease_heartbeat(t, w, epoch, beat)
  active_claim(t) <- live_authority(t, w, epoch, beat)
  task_ready(t) <- task(t), not task_complete(t), not dependency_blocked(t), not active_claim(t)
  worker_can_claim(t, w) <- task_ready(t), worker(w), worker_capability(w, "executor")
  execution_authorized(t, w, epoch) <- live_authority(t, w, epoch, beat)
  execution_outcome_recorded(t, w, epoch, status, detail) <- outcome_task(o, t), outcome_worker(o, w), outcome_epoch(o, epoch), outcome_status(o, status), outcome_detail(o, detail)
  execution_outcome_accepted(t, w, epoch, status, detail) <- execution_outcome_recorded(t, w, epoch, status, detail), execution_authorized(t, w, epoch)
  execution_outcome_rejected_stale(t, w, epoch, status, detail) <- execution_outcome_recorded(t, w, epoch, status, detail), not execution_authorized(t, w, epoch)
  task_complete(t) <- execution_outcome_accepted(t, w, epoch, "completed", detail)
}

materialize {
  lease_heartbeat
  live_authority
  task_ready
  worker_can_claim
  execution_authorized
  execution_outcome_recorded
  execution_outcome_accepted
  execution_outcome_rejected_stale
}

"#;

    // Caller-supplied query section: two indented lines wrapped in `query { }`.
    [
        STATIC_SECTIONS,
        "query {\n  ",
        view,
        "\n  ",
        query_body,
        "\n}\n",
    ]
    .concat()
}
104
105pub fn coordination_pilot_seed_history() -> Vec<Datom> {
106    vec![
107        dependency_datom(1, 2, 1),
108        datom(
109            EntityId::new(2),
110            AttributeId::new(2),
111            Value::String("done".into()),
112            OperationKind::Assert,
113            2,
114        ),
115        datom(
116            EntityId::new(1),
117            AttributeId::new(3),
118            Value::String("worker-a".into()),
119            OperationKind::Claim,
120            3,
121        ),
122        datom(
123            EntityId::new(1),
124            AttributeId::new(4),
125            Value::U64(1),
126            OperationKind::LeaseOpen,
127            4,
128        ),
129        datom(
130            EntityId::new(1),
131            AttributeId::new(5),
132            Value::String("active".into()),
133            OperationKind::LeaseOpen,
134            5,
135        ),
136        heartbeat_entity_datom(1001, 6, 1, 6),
137        heartbeat_string_datom(1001, 7, "worker-a", 7),
138        heartbeat_datum_u64(1001, 8, 1, 8),
139        heartbeat_datum_u64(1001, 9, 100, 9),
140        datom(
141            EntityId::new(1),
142            AttributeId::new(3),
143            Value::String("worker-b".into()),
144            OperationKind::Claim,
145            10,
146        ),
147        datom(
148            EntityId::new(1),
149            AttributeId::new(4),
150            Value::U64(2),
151            OperationKind::LeaseRenew,
152            11,
153        ),
154        heartbeat_entity_datom(1002, 6, 1, 12),
155        heartbeat_string_datom(1002, 7, "worker-b", 13),
156        heartbeat_datum_u64(1002, 8, 2, 14),
157        heartbeat_datum_u64(1002, 9, 200, 15),
158        outcome_entity_datom(2001, 10, 1, 16),
159        outcome_string_datom(2001, 11, "worker-a", 17),
160        outcome_datum_u64(2001, 12, 1, 18),
161        outcome_string_datom(2001, 13, "completed", 19),
162        outcome_string_datom(2001, 14, "stale-worker-a", 20),
163        outcome_entity_datom(2002, 10, 1, 21),
164        outcome_string_datom(2002, 11, "worker-b", 22),
165        outcome_datum_u64(2002, 12, 2, 23),
166        outcome_string_datom(2002, 13, "completed", 24),
167        outcome_string_datom(2002, 14, "current-worker-b", 25),
168    ]
169}
170
171fn dependency_datom(entity: u64, value: u64, element: u64) -> Datom {
172    datom(
173        EntityId::new(entity),
174        AttributeId::new(1),
175        Value::Entity(EntityId::new(value)),
176        OperationKind::Add,
177        element,
178    )
179}
180
181fn heartbeat_entity_datom(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
182    datom(
183        EntityId::new(entity),
184        AttributeId::new(attribute),
185        Value::Entity(EntityId::new(value)),
186        OperationKind::LeaseRenew,
187        element,
188    )
189}
190
191fn heartbeat_string_datom(entity: u64, attribute: u64, value: &str, element: u64) -> Datom {
192    datom(
193        EntityId::new(entity),
194        AttributeId::new(attribute),
195        Value::String(value.into()),
196        OperationKind::LeaseRenew,
197        element,
198    )
199}
200
201fn heartbeat_datum_u64(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
202    datom(
203        EntityId::new(entity),
204        AttributeId::new(attribute),
205        Value::U64(value),
206        OperationKind::LeaseRenew,
207        element,
208    )
209}
210
211fn outcome_entity_datom(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
212    datom(
213        EntityId::new(entity),
214        AttributeId::new(attribute),
215        Value::Entity(EntityId::new(value)),
216        OperationKind::Annotate,
217        element,
218    )
219}
220
221fn outcome_string_datom(entity: u64, attribute: u64, value: &str, element: u64) -> Datom {
222    datom(
223        EntityId::new(entity),
224        AttributeId::new(attribute),
225        Value::String(value.into()),
226        OperationKind::Annotate,
227        element,
228    )
229}
230
231fn outcome_datum_u64(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
232    datom(
233        EntityId::new(entity),
234        AttributeId::new(attribute),
235        Value::U64(value),
236        OperationKind::Annotate,
237        element,
238    )
239}
240
241fn datom(
242    entity: EntityId,
243    attribute: AttributeId,
244    value: Value,
245    op: OperationKind,
246    element: u64,
247) -> Datom {
248    Datom {
249        entity,
250        attribute,
251        value,
252        op,
253        element: ElementId::new(element),
254        replica: ReplicaId::new(1),
255        causal_context: Default::default(),
256        provenance: DatomProvenance::default(),
257        policy: None,
258    }
259}