1use aether_ast::{
2 AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, ReplicaId, Value,
3};
4
/// Element id `5`: in `coordination_pilot_seed_history` this is the last datom
/// written before the first heartbeat datoms (elements 6 through 9).
///
/// NOTE(review): presumably used by callers as an "as of" cut point that
/// excludes every heartbeat — confirm against the call sites.
pub const COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT: u64 = 5;

/// Element id `9`: in `coordination_pilot_seed_history` this is the final datom
/// of the first heartbeat entity (1001), i.e. its `heartbeat.at` value.
///
/// NOTE(review): presumably the earliest cut point at which the first lease
/// holder has a complete heartbeat — confirm against the call sites.
pub const COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT: u64 = 9;
7
/// Renders the coordination-pilot program text.
///
/// The schema, predicate declarations, facts, rules and materialization list
/// are fixed; only the trailing `query` section varies. `view` and
/// `query_body` are inserted verbatim (no escaping or validation) as the two
/// lines of that section.
///
/// The returned text starts with a newline and ends with `"}\n"`.
pub fn coordination_pilot_dsl(view: &str, query_body: &str) -> String {
    // Everything up to (and including) the blank line before `query {`.
    // This is a plain raw string, so braces are written unescaped.
    const STATIC_SECTIONS: &str = r#"
schema v1 {
 attr task.depends_on: RefSet<Entity>
 attr task.status: ScalarLWW<String>
 attr task.claimed_by: ScalarLWW<String>
 attr task.lease_epoch: ScalarLWW<U64>
 attr task.lease_state: ScalarLWW<String>
 attr heartbeat.task: RefScalar<Entity>
 attr heartbeat.worker: ScalarLWW<String>
 attr heartbeat.epoch: ScalarLWW<U64>
 attr heartbeat.at: ScalarLWW<U64>
 attr outcome.task: RefScalar<Entity>
 attr outcome.worker: ScalarLWW<String>
 attr outcome.epoch: ScalarLWW<U64>
 attr outcome.status: ScalarLWW<String>
 attr outcome.detail: ScalarLWW<String>
}

predicates {
 task(Entity)
 worker(String)
 worker_capability(String, String)
 task_depends_on(Entity, Entity)
 task_status(Entity, String)
 task_claimed_by(Entity, String)
 task_lease_epoch(Entity, U64)
 task_lease_state(Entity, String)
 heartbeat_task(Entity, Entity)
 heartbeat_worker(Entity, String)
 heartbeat_epoch(Entity, U64)
 heartbeat_at(Entity, U64)
 outcome_task(Entity, Entity)
 outcome_worker(Entity, String)
 outcome_epoch(Entity, U64)
 outcome_status(Entity, String)
 outcome_detail(Entity, String)
 task_complete(Entity)
 dependency_blocked(Entity)
 lease_active(Entity, String, U64)
 lease_heartbeat(Entity, String, U64, U64)
 live_authority(Entity, String, U64, U64)
 active_claim(Entity)
 task_ready(Entity)
 worker_can_claim(Entity, String)
 execution_authorized(Entity, String, U64)
 execution_outcome_recorded(Entity, String, U64, String, String)
 execution_outcome_accepted(Entity, String, U64, String, String)
 execution_outcome_rejected_stale(Entity, String, U64, String, String)
}

facts {
 task(entity(1))
 task(entity(2))
 task(entity(3))
 worker("worker-a")
 worker("worker-b")
 worker_capability("worker-a", "executor")
 worker_capability("worker-b", "executor")
}

rules {
 task_complete(t) <- task_status(t, "done")
 dependency_blocked(t) <- task_depends_on(t, dep), not task_complete(dep)
 lease_active(t, w, epoch) <- task_claimed_by(t, w), task_lease_epoch(t, epoch), task_lease_state(t, "active")
 lease_heartbeat(t, w, epoch, beat) <- heartbeat_task(h, t), heartbeat_worker(h, w), heartbeat_epoch(h, epoch), heartbeat_at(h, beat)
 live_authority(t, w, epoch, beat) <- lease_active(t, w, epoch), lease_heartbeat(t, w, epoch, beat)
 active_claim(t) <- live_authority(t, w, epoch, beat)
 task_ready(t) <- task(t), not task_complete(t), not dependency_blocked(t), not active_claim(t)
 worker_can_claim(t, w) <- task_ready(t), worker(w), worker_capability(w, "executor")
 execution_authorized(t, w, epoch) <- live_authority(t, w, epoch, beat)
 execution_outcome_recorded(t, w, epoch, status, detail) <- outcome_task(o, t), outcome_worker(o, w), outcome_epoch(o, epoch), outcome_status(o, status), outcome_detail(o, detail)
 execution_outcome_accepted(t, w, epoch, status, detail) <- execution_outcome_recorded(t, w, epoch, status, detail), execution_authorized(t, w, epoch)
 execution_outcome_rejected_stale(t, w, epoch, status, detail) <- execution_outcome_recorded(t, w, epoch, status, detail), not execution_authorized(t, w, epoch)
 task_complete(t) <- execution_outcome_accepted(t, w, epoch, "completed", detail)
}

materialize {
 lease_heartbeat
 live_authority
 task_ready
 worker_can_claim
 execution_authorized
 execution_outcome_recorded
 execution_outcome_accepted
 execution_outcome_rejected_stale
}

"#;

    // The query section: header, the two caller-supplied lines, closing brace.
    [
        STATIC_SECTIONS,
        "query {\n ",
        view,
        "\n ",
        query_body,
        "\n}\n",
    ]
    .concat()
}
104
105pub fn coordination_pilot_seed_history() -> Vec<Datom> {
106 vec![
107 dependency_datom(1, 2, 1),
108 datom(
109 EntityId::new(2),
110 AttributeId::new(2),
111 Value::String("done".into()),
112 OperationKind::Assert,
113 2,
114 ),
115 datom(
116 EntityId::new(1),
117 AttributeId::new(3),
118 Value::String("worker-a".into()),
119 OperationKind::Claim,
120 3,
121 ),
122 datom(
123 EntityId::new(1),
124 AttributeId::new(4),
125 Value::U64(1),
126 OperationKind::LeaseOpen,
127 4,
128 ),
129 datom(
130 EntityId::new(1),
131 AttributeId::new(5),
132 Value::String("active".into()),
133 OperationKind::LeaseOpen,
134 5,
135 ),
136 heartbeat_entity_datom(1001, 6, 1, 6),
137 heartbeat_string_datom(1001, 7, "worker-a", 7),
138 heartbeat_datum_u64(1001, 8, 1, 8),
139 heartbeat_datum_u64(1001, 9, 100, 9),
140 datom(
141 EntityId::new(1),
142 AttributeId::new(3),
143 Value::String("worker-b".into()),
144 OperationKind::Claim,
145 10,
146 ),
147 datom(
148 EntityId::new(1),
149 AttributeId::new(4),
150 Value::U64(2),
151 OperationKind::LeaseRenew,
152 11,
153 ),
154 heartbeat_entity_datom(1002, 6, 1, 12),
155 heartbeat_string_datom(1002, 7, "worker-b", 13),
156 heartbeat_datum_u64(1002, 8, 2, 14),
157 heartbeat_datum_u64(1002, 9, 200, 15),
158 outcome_entity_datom(2001, 10, 1, 16),
159 outcome_string_datom(2001, 11, "worker-a", 17),
160 outcome_datum_u64(2001, 12, 1, 18),
161 outcome_string_datom(2001, 13, "completed", 19),
162 outcome_string_datom(2001, 14, "stale-worker-a", 20),
163 outcome_entity_datom(2002, 10, 1, 21),
164 outcome_string_datom(2002, 11, "worker-b", 22),
165 outcome_datum_u64(2002, 12, 2, 23),
166 outcome_string_datom(2002, 13, "completed", 24),
167 outcome_string_datom(2002, 14, "current-worker-b", 25),
168 ]
169}
170
171fn dependency_datom(entity: u64, value: u64, element: u64) -> Datom {
172 datom(
173 EntityId::new(entity),
174 AttributeId::new(1),
175 Value::Entity(EntityId::new(value)),
176 OperationKind::Add,
177 element,
178 )
179}
180
181fn heartbeat_entity_datom(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
182 datom(
183 EntityId::new(entity),
184 AttributeId::new(attribute),
185 Value::Entity(EntityId::new(value)),
186 OperationKind::LeaseRenew,
187 element,
188 )
189}
190
191fn heartbeat_string_datom(entity: u64, attribute: u64, value: &str, element: u64) -> Datom {
192 datom(
193 EntityId::new(entity),
194 AttributeId::new(attribute),
195 Value::String(value.into()),
196 OperationKind::LeaseRenew,
197 element,
198 )
199}
200
201fn heartbeat_datum_u64(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
202 datom(
203 EntityId::new(entity),
204 AttributeId::new(attribute),
205 Value::U64(value),
206 OperationKind::LeaseRenew,
207 element,
208 )
209}
210
211fn outcome_entity_datom(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
212 datom(
213 EntityId::new(entity),
214 AttributeId::new(attribute),
215 Value::Entity(EntityId::new(value)),
216 OperationKind::Annotate,
217 element,
218 )
219}
220
221fn outcome_string_datom(entity: u64, attribute: u64, value: &str, element: u64) -> Datom {
222 datom(
223 EntityId::new(entity),
224 AttributeId::new(attribute),
225 Value::String(value.into()),
226 OperationKind::Annotate,
227 element,
228 )
229}
230
231fn outcome_datum_u64(entity: u64, attribute: u64, value: u64, element: u64) -> Datom {
232 datom(
233 EntityId::new(entity),
234 AttributeId::new(attribute),
235 Value::U64(value),
236 OperationKind::Annotate,
237 element,
238 )
239}
240
241fn datom(
242 entity: EntityId,
243 attribute: AttributeId,
244 value: Value,
245 op: OperationKind,
246 element: u64,
247) -> Datom {
248 Datom {
249 entity,
250 attribute,
251 value,
252 op,
253 element: ElementId::new(element),
254 replica: ReplicaId::new(1),
255 causal_context: Default::default(),
256 provenance: DatomProvenance::default(),
257 policy: None,
258 }
259}