1use crate::{
2 pilot::{
3 coordination_pilot_dsl, COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT,
4 COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
5 },
6 ApiError, ExplainTupleRequest, HistoryRequest, KernelService, RunDocumentRequest,
7};
8use aether_ast::{ElementId, PolicyContext, QueryRow, TupleId, Value};
9use aether_resolver::ResolveError;
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, VecDeque};
12use std::fmt::Write as _;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
16pub struct CoordinationPilotReport {
17 pub generated_at_ms: u64,
18 pub policy_context: Option<PolicyContext>,
19 pub history_len: usize,
20 pub pre_heartbeat_authorized: Vec<ReportRow>,
21 pub as_of_authorized: Vec<ReportRow>,
22 pub live_heartbeats: Vec<ReportRow>,
23 pub current_authorized: Vec<ReportRow>,
24 pub claimable: Vec<ReportRow>,
25 pub accepted_outcomes: Vec<ReportRow>,
26 pub rejected_outcomes: Vec<ReportRow>,
27 pub trace: Option<TraceSummary>,
28}
29
30#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
31pub struct ReportRow {
32 pub tuple_id: Option<TupleId>,
33 pub values: Vec<Value>,
34}
35
36#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
37pub struct TraceSummary {
38 pub root: TupleId,
39 pub tuple_count: usize,
40 pub tuples: Vec<TraceTupleSummary>,
41}
42
43#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
44pub struct TraceTupleSummary {
45 pub tuple_id: TupleId,
46 pub values: Vec<Value>,
47 pub iteration: usize,
48 pub source_datom_ids: Vec<ElementId>,
49 pub parent_tuple_ids: Vec<TupleId>,
50}
51
52#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
53#[serde(tag = "kind", rename_all = "snake_case")]
54pub enum CoordinationCut {
55 #[default]
56 Current,
57 AsOf {
58 element: ElementId,
59 },
60}
61
62impl CoordinationCut {
63 fn view_label(&self) -> String {
64 match self {
65 Self::Current => "current".into(),
66 Self::AsOf { element } => format!("as_of e{}", element.0),
67 }
68 }
69
70 fn human_label(&self) -> String {
71 match self {
72 Self::Current => "Current".into(),
73 Self::AsOf { element } => format!("AsOf(e{})", element.0),
74 }
75 }
76}
77
78#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
79pub struct CoordinationDeltaReportRequest {
80 #[serde(default)]
81 pub left: CoordinationCut,
82 #[serde(default)]
83 pub right: CoordinationCut,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
85 pub policy_context: Option<PolicyContext>,
86}
87
88#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
89pub struct CoordinationTraceHandle {
90 pub tuple_id: TupleId,
91 pub tuple_count: usize,
92 pub source_datom_ids: Vec<ElementId>,
93 pub parent_tuple_ids: Vec<TupleId>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct ReportRowDiff {
98 pub row: ReportRow,
99 #[serde(default, skip_serializing_if = "Option::is_none")]
100 pub trace: Option<CoordinationTraceHandle>,
101}
102
103#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
104pub struct ReportRowChange {
105 pub before: ReportRow,
106 pub after: ReportRow,
107 #[serde(default, skip_serializing_if = "Option::is_none")]
108 pub before_trace: Option<CoordinationTraceHandle>,
109 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub after_trace: Option<CoordinationTraceHandle>,
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
114pub struct ReportSectionDelta {
115 pub added: Vec<ReportRowDiff>,
116 pub removed: Vec<ReportRowDiff>,
117 pub changed: Vec<ReportRowChange>,
118}
119
120#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
121pub struct CoordinationDeltaReport {
122 pub generated_at_ms: u64,
123 pub left: CoordinationCut,
124 pub right: CoordinationCut,
125 #[serde(default, skip_serializing_if = "Option::is_none")]
126 pub policy_context: Option<PolicyContext>,
127 pub left_history_len: usize,
128 pub right_history_len: usize,
129 pub current_authorized: ReportSectionDelta,
130 pub claimable: ReportSectionDelta,
131 pub live_heartbeats: ReportSectionDelta,
132 pub accepted_outcomes: ReportSectionDelta,
133 pub rejected_outcomes: ReportSectionDelta,
134}
135
136pub fn build_coordination_pilot_report(
137 service: &mut impl KernelService,
138) -> Result<CoordinationPilotReport, ApiError> {
139 build_coordination_pilot_report_with_policy(service, None)
140}
141
142pub fn build_coordination_pilot_report_with_policy(
143 service: &mut impl KernelService,
144 policy_context: Option<PolicyContext>,
145) -> Result<CoordinationPilotReport, ApiError> {
146 let history_len = service
147 .history(HistoryRequest {
148 policy_context: policy_context.clone(),
149 })?
150 .datoms
151 .len();
152 let pre_heartbeat_authorized = run_report_query(
153 service,
154 RunDocumentRequest {
155 dsl: coordination_pilot_dsl(
156 &format!("as_of e{}", COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT),
157 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
158 ),
159 policy_context: policy_context.clone(),
160 },
161 )?;
162 let as_of_authorized = run_report_query(
163 service,
164 RunDocumentRequest {
165 dsl: coordination_pilot_dsl(
166 &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
167 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
168 ),
169 policy_context: policy_context.clone(),
170 },
171 )?;
172 let live_heartbeats = run_report_query(
173 service,
174 RunDocumentRequest {
175 dsl: coordination_pilot_dsl(
176 "current",
177 "goal live_authority(t, worker, epoch, beat)\n keep t, worker, epoch, beat",
178 ),
179 policy_context: policy_context.clone(),
180 },
181 )?;
182 let current_authorized = run_report_query(
183 service,
184 RunDocumentRequest {
185 dsl: coordination_pilot_dsl(
186 "current",
187 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
188 ),
189 policy_context: policy_context.clone(),
190 },
191 )?;
192 let claimable = run_report_query(
193 service,
194 RunDocumentRequest {
195 dsl: coordination_pilot_dsl(
196 "current",
197 "goal worker_can_claim(t, worker)\n keep t, worker",
198 ),
199 policy_context: policy_context.clone(),
200 },
201 )?;
202 let accepted_outcomes = run_report_query(
203 service,
204 RunDocumentRequest {
205 dsl: coordination_pilot_dsl(
206 "current",
207 "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
208 ),
209 policy_context: policy_context.clone(),
210 },
211 )?;
212 let rejected_outcomes = run_report_query(
213 service,
214 RunDocumentRequest {
215 dsl: coordination_pilot_dsl(
216 "current",
217 "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
218 ),
219 policy_context: policy_context.clone(),
220 },
221 )?;
222
223 let trace = current_authorized
224 .first()
225 .and_then(|row| row.tuple_id)
226 .map(|tuple_id| -> Result<TraceSummary, ApiError> {
227 let trace = service
228 .explain_tuple(ExplainTupleRequest {
229 tuple_id,
230 policy_context: policy_context.clone(),
231 })?
232 .trace;
233 Ok(TraceSummary {
234 root: trace.root,
235 tuple_count: trace.tuples.len(),
236 tuples: trace
237 .tuples
238 .into_iter()
239 .map(|tuple| TraceTupleSummary {
240 tuple_id: tuple.tuple.id,
241 values: tuple.tuple.values,
242 iteration: tuple.metadata.iteration,
243 source_datom_ids: tuple.metadata.source_datom_ids,
244 parent_tuple_ids: tuple.metadata.parent_tuple_ids,
245 })
246 .collect(),
247 })
248 })
249 .transpose()?;
250
251 Ok(CoordinationPilotReport {
252 generated_at_ms: now_millis(),
253 policy_context,
254 history_len,
255 pre_heartbeat_authorized: into_report_rows(pre_heartbeat_authorized),
256 as_of_authorized: into_report_rows(as_of_authorized),
257 live_heartbeats: into_report_rows(live_heartbeats),
258 current_authorized: into_report_rows(current_authorized),
259 claimable: into_report_rows(claimable),
260 accepted_outcomes: into_report_rows(accepted_outcomes),
261 rejected_outcomes: into_report_rows(rejected_outcomes),
262 trace,
263 })
264}
265
266pub fn build_coordination_delta_report(
267 service: &mut impl KernelService,
268 request: CoordinationDeltaReportRequest,
269) -> Result<CoordinationDeltaReport, ApiError> {
270 let policy_context = request.policy_context.clone();
271 let left = build_coordination_snapshot(service, &request.left, request.policy_context.clone())?;
272 let right =
273 build_coordination_snapshot(service, &request.right, request.policy_context.clone())?;
274
275 Ok(CoordinationDeltaReport {
276 generated_at_ms: now_millis(),
277 left: request.left,
278 right: request.right,
279 policy_context: policy_context.clone(),
280 left_history_len: left.history_len,
281 right_history_len: right.history_len,
282 current_authorized: diff_report_rows(
283 service,
284 left.current_authorized,
285 right.current_authorized,
286 policy_context.as_ref(),
287 )?,
288 claimable: diff_report_rows(
289 service,
290 left.claimable,
291 right.claimable,
292 policy_context.as_ref(),
293 )?,
294 live_heartbeats: diff_report_rows(
295 service,
296 left.live_heartbeats,
297 right.live_heartbeats,
298 policy_context.as_ref(),
299 )?,
300 accepted_outcomes: diff_report_rows(
301 service,
302 left.accepted_outcomes,
303 right.accepted_outcomes,
304 policy_context.as_ref(),
305 )?,
306 rejected_outcomes: diff_report_rows(
307 service,
308 left.rejected_outcomes,
309 right.rejected_outcomes,
310 policy_context.as_ref(),
311 )?,
312 })
313}
314
315pub fn render_coordination_pilot_report_markdown(report: &CoordinationPilotReport) -> String {
316 let mut output = String::new();
317 let _ = writeln!(output, "# AETHER Coordination Pilot Report");
318 let _ = writeln!(output);
319 let _ = writeln!(output, "- Generated at: `{}`", report.generated_at_ms);
320 let _ = writeln!(
321 output,
322 "- Effective policy: `{}`",
323 format_policy_context(report.policy_context.as_ref())
324 );
325 let _ = writeln!(output, "- Journal entries: `{}`", report.history_len);
326 let _ = writeln!(output);
327
328 render_row_section(
329 &mut output,
330 &format!(
331 "Authorization Before Heartbeat At AsOf(e{})",
332 COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT
333 ),
334 &report.pre_heartbeat_authorized,
335 );
336 render_row_section(
337 &mut output,
338 &format!(
339 "Authorization At AsOf(e{})",
340 COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT
341 ),
342 &report.as_of_authorized,
343 );
344 render_row_section(
345 &mut output,
346 "Current Live Heartbeats",
347 &report.live_heartbeats,
348 );
349 render_row_section(
350 &mut output,
351 "Authorization At Current",
352 &report.current_authorized,
353 );
354 render_row_section(&mut output, "Current Claimable Work", &report.claimable);
355 render_row_section(
356 &mut output,
357 "Current Accepted Outcomes",
358 &report.accepted_outcomes,
359 );
360 render_row_section(
361 &mut output,
362 "Current Rejected Outcomes",
363 &report.rejected_outcomes,
364 );
365
366 let _ = writeln!(output, "## Proof Trace");
367 let _ = writeln!(output);
368 match &report.trace {
369 Some(trace) => {
370 let _ = writeln!(output, "- Root tuple: `{}`", trace.root.0);
371 let _ = writeln!(output, "- Tuples in trace: `{}`", trace.tuple_count);
372 let _ = writeln!(output);
373 for tuple in &trace.tuples {
374 let _ = writeln!(
375 output,
376 "- `t{}` | values `{}` | iteration `{}` | sources `{}` | parents `{}`",
377 tuple.tuple_id.0,
378 format_values(&tuple.values),
379 tuple.iteration,
380 format_element_ids(&tuple.source_datom_ids),
381 format_tuple_ids(&tuple.parent_tuple_ids),
382 );
383 }
384 }
385 None => {
386 let _ = writeln!(
387 output,
388 "No current authorization tuple was available to explain."
389 );
390 }
391 }
392
393 output
394}
395
396pub fn render_coordination_delta_report_markdown(report: &CoordinationDeltaReport) -> String {
397 let mut output = String::new();
398 let _ = writeln!(output, "# AETHER Coordination Delta Report");
399 let _ = writeln!(output);
400 let _ = writeln!(output, "- Generated at: `{}`", report.generated_at_ms);
401 let _ = writeln!(output, "- Left cut: `{}`", report.left.human_label());
402 let _ = writeln!(output, "- Right cut: `{}`", report.right.human_label());
403 let _ = writeln!(
404 output,
405 "- Effective policy: `{}`",
406 format_policy_context(report.policy_context.as_ref())
407 );
408 let _ = writeln!(
409 output,
410 "- Left journal entries: `{}`",
411 report.left_history_len
412 );
413 let _ = writeln!(
414 output,
415 "- Right journal entries: `{}`",
416 report.right_history_len
417 );
418 let _ = writeln!(output);
419
420 render_delta_section(
421 &mut output,
422 "Authorization Delta",
423 &report.current_authorized,
424 );
425 render_delta_section(&mut output, "Claimable Work Delta", &report.claimable);
426 render_delta_section(
427 &mut output,
428 "Live Heartbeats Delta",
429 &report.live_heartbeats,
430 );
431 render_delta_section(
432 &mut output,
433 "Accepted Outcomes Delta",
434 &report.accepted_outcomes,
435 );
436 render_delta_section(
437 &mut output,
438 "Rejected Outcomes Delta",
439 &report.rejected_outcomes,
440 );
441
442 output
443}
444
445fn format_policy_context(policy_context: Option<&PolicyContext>) -> String {
446 match policy_context {
447 None => "public".into(),
448 Some(policy_context) => {
449 let capabilities = if policy_context.capabilities.is_empty() {
450 "-".into()
451 } else {
452 policy_context.capabilities.join(", ")
453 };
454 let visibilities = if policy_context.visibilities.is_empty() {
455 "-".into()
456 } else {
457 policy_context.visibilities.join(", ")
458 };
459 format!("capabilities=[{capabilities}] visibilities=[{visibilities}]")
460 }
461 }
462}
463
464fn into_report_rows(rows: Vec<QueryRow>) -> Vec<ReportRow> {
465 rows.into_iter()
466 .map(|row| ReportRow {
467 tuple_id: row.tuple_id,
468 values: row.values,
469 })
470 .collect()
471}
472
473fn render_row_section(output: &mut String, title: &str, rows: &[ReportRow]) {
474 let _ = writeln!(output, "## {title}");
475 let _ = writeln!(output);
476 if rows.is_empty() {
477 let _ = writeln!(output, "No rows.");
478 let _ = writeln!(output);
479 return;
480 }
481
482 for row in rows {
483 let tuple_id = row
484 .tuple_id
485 .map(|tuple_id| format!("t{}", tuple_id.0))
486 .unwrap_or_else(|| "-".into());
487 let _ = writeln!(
488 output,
489 "- `{}` | `{}`",
490 tuple_id,
491 format_values(&row.values)
492 );
493 }
494 let _ = writeln!(output);
495}
496
497fn render_delta_section(output: &mut String, title: &str, delta: &ReportSectionDelta) {
498 let _ = writeln!(output, "## {title}");
499 let _ = writeln!(output);
500 if delta.added.is_empty() && delta.removed.is_empty() && delta.changed.is_empty() {
501 let _ = writeln!(output, "No changes.");
502 let _ = writeln!(output);
503 return;
504 }
505
506 render_diff_rows(output, "Added", &delta.added);
507 render_diff_rows(output, "Removed", &delta.removed);
508 render_changed_rows(output, &delta.changed);
509}
510
511fn render_diff_rows(output: &mut String, title: &str, rows: &[ReportRowDiff]) {
512 if rows.is_empty() {
513 return;
514 }
515 let _ = writeln!(output, "### {title}");
516 let _ = writeln!(output);
517 for row in rows {
518 let tuple = row
519 .row
520 .tuple_id
521 .map(|tuple_id| format!("t{}", tuple_id.0))
522 .unwrap_or_else(|| "-".into());
523 let _ = writeln!(
524 output,
525 "- `{}` | `{}`",
526 tuple,
527 format_values(&row.row.values)
528 );
529 if let Some(trace) = &row.trace {
530 let _ = writeln!(
531 output,
532 " trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
533 trace.tuple_id.0,
534 trace.tuple_count,
535 format_element_ids(&trace.source_datom_ids),
536 format_tuple_ids(&trace.parent_tuple_ids),
537 );
538 }
539 }
540 let _ = writeln!(output);
541}
542
543fn render_changed_rows(output: &mut String, rows: &[ReportRowChange]) {
544 if rows.is_empty() {
545 return;
546 }
547 let _ = writeln!(output, "### Changed");
548 let _ = writeln!(output);
549 for row in rows {
550 let _ = writeln!(
551 output,
552 "- before `{}` | after `{}`",
553 format_values(&row.before.values),
554 format_values(&row.after.values),
555 );
556 if let Some(trace) = &row.before_trace {
557 let _ = writeln!(
558 output,
559 " before trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
560 trace.tuple_id.0,
561 trace.tuple_count,
562 format_element_ids(&trace.source_datom_ids),
563 format_tuple_ids(&trace.parent_tuple_ids),
564 );
565 }
566 if let Some(trace) = &row.after_trace {
567 let _ = writeln!(
568 output,
569 " after trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
570 trace.tuple_id.0,
571 trace.tuple_count,
572 format_element_ids(&trace.source_datom_ids),
573 format_tuple_ids(&trace.parent_tuple_ids),
574 );
575 }
576 }
577 let _ = writeln!(output);
578}
579
580fn run_report_query(
581 service: &mut impl KernelService,
582 request: RunDocumentRequest,
583) -> Result<Vec<QueryRow>, ApiError> {
584 match service.run_document(request) {
585 Ok(response) => Ok(response.query.unwrap_or_default().rows),
586 Err(ApiError::Resolve(ResolveError::UnknownElementId(_))) => Ok(Vec::new()),
587 Err(error) => Err(error),
588 }
589}
590
591#[derive(Clone, Debug, Default)]
592struct CoordinationSnapshot {
593 history_len: usize,
594 current_authorized: Vec<ReportRow>,
595 claimable: Vec<ReportRow>,
596 live_heartbeats: Vec<ReportRow>,
597 accepted_outcomes: Vec<ReportRow>,
598 rejected_outcomes: Vec<ReportRow>,
599}
600
601fn build_coordination_snapshot(
602 service: &mut impl KernelService,
603 cut: &CoordinationCut,
604 policy_context: Option<PolicyContext>,
605) -> Result<CoordinationSnapshot, ApiError> {
606 let history_len = run_report_history_len(service, cut, policy_context.clone())?;
607 Ok(CoordinationSnapshot {
608 history_len,
609 current_authorized: into_report_rows(run_report_query_for_cut(
610 service,
611 cut,
612 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
613 policy_context.clone(),
614 )?),
615 claimable: into_report_rows(run_report_query_for_cut(
616 service,
617 cut,
618 "goal worker_can_claim(t, worker)\n keep t, worker",
619 policy_context.clone(),
620 )?),
621 live_heartbeats: into_report_rows(run_report_query_for_cut(
622 service,
623 cut,
624 "goal live_authority(t, worker, epoch, beat)\n keep t, worker, epoch, beat",
625 policy_context.clone(),
626 )?),
627 accepted_outcomes: into_report_rows(run_report_query_for_cut(
628 service,
629 cut,
630 "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
631 policy_context.clone(),
632 )?),
633 rejected_outcomes: into_report_rows(run_report_query_for_cut(
634 service,
635 cut,
636 "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
637 policy_context,
638 )?),
639 })
640}
641
642fn run_report_history_len(
643 service: &mut impl KernelService,
644 cut: &CoordinationCut,
645 policy_context: Option<PolicyContext>,
646) -> Result<usize, ApiError> {
647 let history = service.history(HistoryRequest {
648 policy_context: policy_context.clone(),
649 })?;
650 match cut {
651 CoordinationCut::Current => Ok(history.datoms.len()),
652 CoordinationCut::AsOf { element } => history
653 .datoms
654 .iter()
655 .position(|datom| datom.element == *element)
656 .map(|index| index + 1)
657 .ok_or_else(|| ApiError::Validation(format!("unknown element {}", element.0))),
658 }
659}
660
661fn run_report_query_for_cut(
662 service: &mut impl KernelService,
663 cut: &CoordinationCut,
664 query_body: &str,
665 policy_context: Option<PolicyContext>,
666) -> Result<Vec<QueryRow>, ApiError> {
667 run_report_query(
668 service,
669 RunDocumentRequest {
670 dsl: coordination_pilot_dsl(&cut.view_label(), query_body),
671 policy_context,
672 },
673 )
674}
675
676fn diff_report_rows(
677 service: &mut impl KernelService,
678 left: Vec<ReportRow>,
679 right: Vec<ReportRow>,
680 policy_context: Option<&PolicyContext>,
681) -> Result<ReportSectionDelta, ApiError> {
682 let mut left_exact = rows_by_signature(left);
683 let mut right_exact = rows_by_signature(right);
684 let exact_keys = left_exact
685 .keys()
686 .filter(|signature| right_exact.contains_key(*signature))
687 .cloned()
688 .collect::<Vec<_>>();
689 for signature in exact_keys {
690 let left_rows = left_exact
691 .get_mut(&signature)
692 .expect("left signature exists");
693 let right_rows = right_exact
694 .get_mut(&signature)
695 .expect("right signature exists");
696 let pair_count = left_rows.len().min(right_rows.len());
697 for _ in 0..pair_count {
698 left_rows.pop_front();
699 right_rows.pop_front();
700 }
701 if left_rows.is_empty() {
702 left_exact.remove(&signature);
703 }
704 if right_rows.is_empty() {
705 right_exact.remove(&signature);
706 }
707 }
708
709 let mut left_grouped = rows_by_primary_key(left_exact);
710 let mut right_grouped = rows_by_primary_key(right_exact);
711 let group_keys = left_grouped
712 .keys()
713 .chain(right_grouped.keys())
714 .cloned()
715 .collect::<std::collections::BTreeSet<_>>();
716
717 let mut added = Vec::new();
718 let mut removed = Vec::new();
719 let mut changed = Vec::new();
720
721 for key in group_keys {
722 let mut left_rows = left_grouped.remove(&key).unwrap_or_default();
723 let mut right_rows = right_grouped.remove(&key).unwrap_or_default();
724 while let (Some(before), Some(after)) = (left_rows.pop_front(), right_rows.pop_front()) {
725 changed.push(ReportRowChange {
726 before_trace: trace_handle_for_row(service, &before, policy_context)?,
727 after_trace: trace_handle_for_row(service, &after, policy_context)?,
728 before,
729 after,
730 });
731 }
732 for row in left_rows {
733 removed.push(ReportRowDiff {
734 trace: trace_handle_for_row(service, &row, policy_context)?,
735 row,
736 });
737 }
738 for row in right_rows {
739 added.push(ReportRowDiff {
740 trace: trace_handle_for_row(service, &row, policy_context)?,
741 row,
742 });
743 }
744 }
745
746 Ok(ReportSectionDelta {
747 added,
748 removed,
749 changed,
750 })
751}
752
753fn rows_by_signature(rows: Vec<ReportRow>) -> BTreeMap<String, VecDeque<ReportRow>> {
754 let mut grouped = BTreeMap::new();
755 for row in rows {
756 grouped
757 .entry(row_signature(&row))
758 .or_insert_with(VecDeque::new)
759 .push_back(row);
760 }
761 grouped
762}
763
764fn rows_by_primary_key(
765 rows: BTreeMap<String, VecDeque<ReportRow>>,
766) -> BTreeMap<String, VecDeque<ReportRow>> {
767 let mut grouped = BTreeMap::new();
768 for (_, mut bucket) in rows {
769 while let Some(row) = bucket.pop_front() {
770 grouped
771 .entry(row_primary_key(&row))
772 .or_insert_with(VecDeque::new)
773 .push_back(row);
774 }
775 }
776 grouped
777}
778
779fn row_signature(row: &ReportRow) -> String {
780 format!(
781 "{}|{}",
782 row.tuple_id
783 .map(|tuple_id| tuple_id.0.to_string())
784 .unwrap_or_else(|| "-".into()),
785 format_values(&row.values)
786 )
787}
788
789fn row_primary_key(row: &ReportRow) -> String {
790 row.values
791 .first()
792 .map(format_value)
793 .or_else(|| row.tuple_id.map(|tuple_id| format!("t{}", tuple_id.0)))
794 .unwrap_or_else(|| "-".into())
795}
796
797fn trace_handle_for_row(
798 service: &mut impl KernelService,
799 row: &ReportRow,
800 policy_context: Option<&PolicyContext>,
801) -> Result<Option<CoordinationTraceHandle>, ApiError> {
802 let Some(tuple_id) = row.tuple_id else {
803 return Ok(None);
804 };
805 let trace = match service.explain_tuple(ExplainTupleRequest {
806 tuple_id,
807 policy_context: policy_context.cloned(),
808 }) {
809 Ok(response) => response.trace,
810 Err(ApiError::Validation(message))
811 if message == "requested tuple is not visible under the current policy" =>
812 {
813 return Ok(None);
814 }
815 Err(error) => return Err(error),
816 };
817 let root = trace
818 .tuples
819 .iter()
820 .find(|tuple| tuple.tuple.id == trace.root)
821 .or_else(|| trace.tuples.first())
822 .ok_or_else(|| ApiError::Validation("empty explain trace".into()))?;
823 Ok(Some(CoordinationTraceHandle {
824 tuple_id: trace.root,
825 tuple_count: trace.tuples.len(),
826 source_datom_ids: root.metadata.source_datom_ids.clone(),
827 parent_tuple_ids: root.metadata.parent_tuple_ids.clone(),
828 }))
829}
830
831fn format_values(values: &[Value]) -> String {
832 values
833 .iter()
834 .map(format_value)
835 .collect::<Vec<_>>()
836 .join(", ")
837}
838
839fn format_value(value: &Value) -> String {
840 match value {
841 Value::Null => "null".into(),
842 Value::Bool(value) => value.to_string(),
843 Value::I64(value) => value.to_string(),
844 Value::U64(value) => value.to_string(),
845 Value::F64(value) => value.to_string(),
846 Value::String(value) => format!("\"{value}\""),
847 Value::Bytes(bytes) => format!("{bytes:?}"),
848 Value::Entity(entity) => format!("entity({})", entity.0),
849 Value::List(values) => format!("[{}]", format_values(values)),
850 }
851}
852
853fn format_element_ids(ids: &[ElementId]) -> String {
854 if ids.is_empty() {
855 return "-".into();
856 }
857 ids.iter()
858 .map(|id| format!("e{}", id.0))
859 .collect::<Vec<_>>()
860 .join(", ")
861}
862
863fn format_tuple_ids(ids: &[TupleId]) -> String {
864 if ids.is_empty() {
865 return "-".into();
866 }
867 ids.iter()
868 .map(|id| format!("t{}", id.0))
869 .collect::<Vec<_>>()
870 .join(", ")
871}
872
873fn now_millis() -> u64 {
874 let duration = SystemTime::now()
875 .duration_since(UNIX_EPOCH)
876 .unwrap_or_default();
877 duration.as_millis().min(u128::from(u64::MAX)) as u64
878}
879
880#[cfg(test)]
881mod tests {
882 use super::{build_coordination_pilot_report, build_coordination_pilot_report_with_policy};
883 use crate::{
884 coordination_pilot_seed_history, AppendRequest, InMemoryKernelService, KernelService,
885 };
886 use aether_ast::{EntityId, PolicyContext, PolicyEnvelope, Value};
887
888 #[test]
889 fn coordination_pilot_report_captures_expected_answers() {
890 let mut service = InMemoryKernelService::new();
891 service
892 .append(AppendRequest {
893 datoms: coordination_pilot_seed_history(),
894 })
895 .expect("append seed history");
896
897 let report =
898 build_coordination_pilot_report(&mut service).expect("build coordination report");
899
900 assert_eq!(report.history_len, 25);
901 assert!(report.pre_heartbeat_authorized.is_empty());
902 assert_eq!(
903 report.as_of_authorized[0].values,
904 vec![
905 Value::Entity(EntityId::new(1)),
906 Value::String("worker-a".into()),
907 Value::U64(1),
908 ]
909 );
910 assert_eq!(
911 report.live_heartbeats[0].values,
912 vec![
913 Value::Entity(EntityId::new(1)),
914 Value::String("worker-b".into()),
915 Value::U64(2),
916 Value::U64(200),
917 ]
918 );
919 assert_eq!(
920 report.current_authorized[0].values,
921 vec![
922 Value::Entity(EntityId::new(1)),
923 Value::String("worker-b".into()),
924 Value::U64(2),
925 ]
926 );
927 assert_eq!(report.claimable.len(), 2);
928 assert_eq!(
929 report.accepted_outcomes[0].values,
930 vec![
931 Value::Entity(EntityId::new(1)),
932 Value::String("worker-b".into()),
933 Value::U64(2),
934 Value::String("completed".into()),
935 Value::String("current-worker-b".into()),
936 ]
937 );
938 assert_eq!(
939 report.rejected_outcomes[0].values,
940 vec![
941 Value::Entity(EntityId::new(1)),
942 Value::String("worker-a".into()),
943 Value::U64(1),
944 Value::String("completed".into()),
945 Value::String("stale-worker-a".into()),
946 ]
947 );
948 assert!(
949 report
950 .trace
951 .as_ref()
952 .map(|trace| trace.tuple_count)
953 .unwrap_or(0)
954 > 0
955 );
956 }
957
958 #[test]
959 fn coordination_pilot_report_respects_policy_context() {
960 let mut service = InMemoryKernelService::new();
961 let mut datoms = coordination_pilot_seed_history();
962 for datom in &mut datoms {
963 if datom.element.0 >= 6 {
964 datom.policy = Some(PolicyEnvelope {
965 capabilities: vec!["executor".into()],
966 visibilities: Vec::new(),
967 });
968 }
969 }
970 service
971 .append(AppendRequest { datoms })
972 .expect("append policy-filtered seed history");
973
974 let public_report = build_coordination_pilot_report_with_policy(&mut service, None)
975 .expect("build public coordination report");
976 assert_eq!(public_report.policy_context, None);
977 assert_eq!(public_report.history_len, 5);
978 assert!(public_report.as_of_authorized.is_empty());
979 assert!(public_report.current_authorized.is_empty());
980 assert!(public_report.accepted_outcomes.is_empty());
981 assert!(public_report.trace.is_none());
982
983 let executor_report = build_coordination_pilot_report_with_policy(
984 &mut service,
985 Some(PolicyContext {
986 capabilities: vec!["executor".into()],
987 visibilities: Vec::new(),
988 }),
989 )
990 .expect("build executor coordination report");
991 assert_eq!(executor_report.history_len, 25);
992 assert_eq!(
993 executor_report.current_authorized[0].values,
994 vec![
995 Value::Entity(EntityId::new(1)),
996 Value::String("worker-b".into()),
997 Value::U64(2),
998 ]
999 );
1000 assert!(executor_report.trace.is_some());
1001 }
1002}