aether_api/
report.rs

1use crate::{
2    pilot::{
3        coordination_pilot_dsl, COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT,
4        COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
5    },
6    ApiError, ExplainTupleRequest, HistoryRequest, KernelService, RunDocumentRequest,
7};
8use aether_ast::{ElementId, PolicyContext, QueryRow, TupleId, Value};
9use aether_resolver::ResolveError;
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, VecDeque};
12use std::fmt::Write as _;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
16pub struct CoordinationPilotReport {
17    pub generated_at_ms: u64,
18    pub policy_context: Option<PolicyContext>,
19    pub history_len: usize,
20    pub pre_heartbeat_authorized: Vec<ReportRow>,
21    pub as_of_authorized: Vec<ReportRow>,
22    pub live_heartbeats: Vec<ReportRow>,
23    pub current_authorized: Vec<ReportRow>,
24    pub claimable: Vec<ReportRow>,
25    pub accepted_outcomes: Vec<ReportRow>,
26    pub rejected_outcomes: Vec<ReportRow>,
27    pub trace: Option<TraceSummary>,
28}
29
30#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
31pub struct ReportRow {
32    pub tuple_id: Option<TupleId>,
33    pub values: Vec<Value>,
34}
35
36#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
37pub struct TraceSummary {
38    pub root: TupleId,
39    pub tuple_count: usize,
40    pub tuples: Vec<TraceTupleSummary>,
41}
42
43#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
44pub struct TraceTupleSummary {
45    pub tuple_id: TupleId,
46    pub values: Vec<Value>,
47    pub iteration: usize,
48    pub source_datom_ids: Vec<ElementId>,
49    pub parent_tuple_ids: Vec<TupleId>,
50}
51
52#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
53#[serde(tag = "kind", rename_all = "snake_case")]
54pub enum CoordinationCut {
55    #[default]
56    Current,
57    AsOf {
58        element: ElementId,
59    },
60}
61
62impl CoordinationCut {
63    fn view_label(&self) -> String {
64        match self {
65            Self::Current => "current".into(),
66            Self::AsOf { element } => format!("as_of e{}", element.0),
67        }
68    }
69
70    fn human_label(&self) -> String {
71        match self {
72            Self::Current => "Current".into(),
73            Self::AsOf { element } => format!("AsOf(e{})", element.0),
74        }
75    }
76}
77
78#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
79pub struct CoordinationDeltaReportRequest {
80    #[serde(default)]
81    pub left: CoordinationCut,
82    #[serde(default)]
83    pub right: CoordinationCut,
84    #[serde(default, skip_serializing_if = "Option::is_none")]
85    pub policy_context: Option<PolicyContext>,
86}
87
88#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
89pub struct CoordinationTraceHandle {
90    pub tuple_id: TupleId,
91    pub tuple_count: usize,
92    pub source_datom_ids: Vec<ElementId>,
93    pub parent_tuple_ids: Vec<TupleId>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct ReportRowDiff {
98    pub row: ReportRow,
99    #[serde(default, skip_serializing_if = "Option::is_none")]
100    pub trace: Option<CoordinationTraceHandle>,
101}
102
103#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
104pub struct ReportRowChange {
105    pub before: ReportRow,
106    pub after: ReportRow,
107    #[serde(default, skip_serializing_if = "Option::is_none")]
108    pub before_trace: Option<CoordinationTraceHandle>,
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub after_trace: Option<CoordinationTraceHandle>,
111}
112
113#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
114pub struct ReportSectionDelta {
115    pub added: Vec<ReportRowDiff>,
116    pub removed: Vec<ReportRowDiff>,
117    pub changed: Vec<ReportRowChange>,
118}
119
120#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
121pub struct CoordinationDeltaReport {
122    pub generated_at_ms: u64,
123    pub left: CoordinationCut,
124    pub right: CoordinationCut,
125    #[serde(default, skip_serializing_if = "Option::is_none")]
126    pub policy_context: Option<PolicyContext>,
127    pub left_history_len: usize,
128    pub right_history_len: usize,
129    pub current_authorized: ReportSectionDelta,
130    pub claimable: ReportSectionDelta,
131    pub live_heartbeats: ReportSectionDelta,
132    pub accepted_outcomes: ReportSectionDelta,
133    pub rejected_outcomes: ReportSectionDelta,
134}
135
136pub fn build_coordination_pilot_report(
137    service: &mut impl KernelService,
138) -> Result<CoordinationPilotReport, ApiError> {
139    build_coordination_pilot_report_with_policy(service, None)
140}
141
142pub fn build_coordination_pilot_report_with_policy(
143    service: &mut impl KernelService,
144    policy_context: Option<PolicyContext>,
145) -> Result<CoordinationPilotReport, ApiError> {
146    let history_len = service
147        .history(HistoryRequest {
148            policy_context: policy_context.clone(),
149        })?
150        .datoms
151        .len();
152    let pre_heartbeat_authorized = run_report_query(
153        service,
154        RunDocumentRequest {
155            dsl: coordination_pilot_dsl(
156                &format!("as_of e{}", COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT),
157                "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
158            ),
159            policy_context: policy_context.clone(),
160        },
161    )?;
162    let as_of_authorized = run_report_query(
163        service,
164        RunDocumentRequest {
165            dsl: coordination_pilot_dsl(
166                &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
167                "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
168            ),
169            policy_context: policy_context.clone(),
170        },
171    )?;
172    let live_heartbeats = run_report_query(
173        service,
174        RunDocumentRequest {
175            dsl: coordination_pilot_dsl(
176                "current",
177                "goal live_authority(t, worker, epoch, beat)\n  keep t, worker, epoch, beat",
178            ),
179            policy_context: policy_context.clone(),
180        },
181    )?;
182    let current_authorized = run_report_query(
183        service,
184        RunDocumentRequest {
185            dsl: coordination_pilot_dsl(
186                "current",
187                "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
188            ),
189            policy_context: policy_context.clone(),
190        },
191    )?;
192    let claimable = run_report_query(
193        service,
194        RunDocumentRequest {
195            dsl: coordination_pilot_dsl(
196                "current",
197                "goal worker_can_claim(t, worker)\n  keep t, worker",
198            ),
199            policy_context: policy_context.clone(),
200        },
201    )?;
202    let accepted_outcomes = run_report_query(
203        service,
204        RunDocumentRequest {
205            dsl: coordination_pilot_dsl(
206                "current",
207                "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
208            ),
209            policy_context: policy_context.clone(),
210        },
211    )?;
212    let rejected_outcomes = run_report_query(
213        service,
214        RunDocumentRequest {
215            dsl: coordination_pilot_dsl(
216                "current",
217                "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
218            ),
219            policy_context: policy_context.clone(),
220        },
221    )?;
222
223    let trace = current_authorized
224        .first()
225        .and_then(|row| row.tuple_id)
226        .map(|tuple_id| -> Result<TraceSummary, ApiError> {
227            let trace = service
228                .explain_tuple(ExplainTupleRequest {
229                    tuple_id,
230                    policy_context: policy_context.clone(),
231                })?
232                .trace;
233            Ok(TraceSummary {
234                root: trace.root,
235                tuple_count: trace.tuples.len(),
236                tuples: trace
237                    .tuples
238                    .into_iter()
239                    .map(|tuple| TraceTupleSummary {
240                        tuple_id: tuple.tuple.id,
241                        values: tuple.tuple.values,
242                        iteration: tuple.metadata.iteration,
243                        source_datom_ids: tuple.metadata.source_datom_ids,
244                        parent_tuple_ids: tuple.metadata.parent_tuple_ids,
245                    })
246                    .collect(),
247            })
248        })
249        .transpose()?;
250
251    Ok(CoordinationPilotReport {
252        generated_at_ms: now_millis(),
253        policy_context,
254        history_len,
255        pre_heartbeat_authorized: into_report_rows(pre_heartbeat_authorized),
256        as_of_authorized: into_report_rows(as_of_authorized),
257        live_heartbeats: into_report_rows(live_heartbeats),
258        current_authorized: into_report_rows(current_authorized),
259        claimable: into_report_rows(claimable),
260        accepted_outcomes: into_report_rows(accepted_outcomes),
261        rejected_outcomes: into_report_rows(rejected_outcomes),
262        trace,
263    })
264}
265
266pub fn build_coordination_delta_report(
267    service: &mut impl KernelService,
268    request: CoordinationDeltaReportRequest,
269) -> Result<CoordinationDeltaReport, ApiError> {
270    let policy_context = request.policy_context.clone();
271    let left = build_coordination_snapshot(service, &request.left, request.policy_context.clone())?;
272    let right =
273        build_coordination_snapshot(service, &request.right, request.policy_context.clone())?;
274
275    Ok(CoordinationDeltaReport {
276        generated_at_ms: now_millis(),
277        left: request.left,
278        right: request.right,
279        policy_context: policy_context.clone(),
280        left_history_len: left.history_len,
281        right_history_len: right.history_len,
282        current_authorized: diff_report_rows(
283            service,
284            left.current_authorized,
285            right.current_authorized,
286            policy_context.as_ref(),
287        )?,
288        claimable: diff_report_rows(
289            service,
290            left.claimable,
291            right.claimable,
292            policy_context.as_ref(),
293        )?,
294        live_heartbeats: diff_report_rows(
295            service,
296            left.live_heartbeats,
297            right.live_heartbeats,
298            policy_context.as_ref(),
299        )?,
300        accepted_outcomes: diff_report_rows(
301            service,
302            left.accepted_outcomes,
303            right.accepted_outcomes,
304            policy_context.as_ref(),
305        )?,
306        rejected_outcomes: diff_report_rows(
307            service,
308            left.rejected_outcomes,
309            right.rejected_outcomes,
310            policy_context.as_ref(),
311        )?,
312    })
313}
314
315pub fn render_coordination_pilot_report_markdown(report: &CoordinationPilotReport) -> String {
316    let mut output = String::new();
317    let _ = writeln!(output, "# AETHER Coordination Pilot Report");
318    let _ = writeln!(output);
319    let _ = writeln!(output, "- Generated at: `{}`", report.generated_at_ms);
320    let _ = writeln!(
321        output,
322        "- Effective policy: `{}`",
323        format_policy_context(report.policy_context.as_ref())
324    );
325    let _ = writeln!(output, "- Journal entries: `{}`", report.history_len);
326    let _ = writeln!(output);
327
328    render_row_section(
329        &mut output,
330        &format!(
331            "Authorization Before Heartbeat At AsOf(e{})",
332            COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT
333        ),
334        &report.pre_heartbeat_authorized,
335    );
336    render_row_section(
337        &mut output,
338        &format!(
339            "Authorization At AsOf(e{})",
340            COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT
341        ),
342        &report.as_of_authorized,
343    );
344    render_row_section(
345        &mut output,
346        "Current Live Heartbeats",
347        &report.live_heartbeats,
348    );
349    render_row_section(
350        &mut output,
351        "Authorization At Current",
352        &report.current_authorized,
353    );
354    render_row_section(&mut output, "Current Claimable Work", &report.claimable);
355    render_row_section(
356        &mut output,
357        "Current Accepted Outcomes",
358        &report.accepted_outcomes,
359    );
360    render_row_section(
361        &mut output,
362        "Current Rejected Outcomes",
363        &report.rejected_outcomes,
364    );
365
366    let _ = writeln!(output, "## Proof Trace");
367    let _ = writeln!(output);
368    match &report.trace {
369        Some(trace) => {
370            let _ = writeln!(output, "- Root tuple: `{}`", trace.root.0);
371            let _ = writeln!(output, "- Tuples in trace: `{}`", trace.tuple_count);
372            let _ = writeln!(output);
373            for tuple in &trace.tuples {
374                let _ = writeln!(
375                    output,
376                    "- `t{}` | values `{}` | iteration `{}` | sources `{}` | parents `{}`",
377                    tuple.tuple_id.0,
378                    format_values(&tuple.values),
379                    tuple.iteration,
380                    format_element_ids(&tuple.source_datom_ids),
381                    format_tuple_ids(&tuple.parent_tuple_ids),
382                );
383            }
384        }
385        None => {
386            let _ = writeln!(
387                output,
388                "No current authorization tuple was available to explain."
389            );
390        }
391    }
392
393    output
394}
395
396pub fn render_coordination_delta_report_markdown(report: &CoordinationDeltaReport) -> String {
397    let mut output = String::new();
398    let _ = writeln!(output, "# AETHER Coordination Delta Report");
399    let _ = writeln!(output);
400    let _ = writeln!(output, "- Generated at: `{}`", report.generated_at_ms);
401    let _ = writeln!(output, "- Left cut: `{}`", report.left.human_label());
402    let _ = writeln!(output, "- Right cut: `{}`", report.right.human_label());
403    let _ = writeln!(
404        output,
405        "- Effective policy: `{}`",
406        format_policy_context(report.policy_context.as_ref())
407    );
408    let _ = writeln!(
409        output,
410        "- Left journal entries: `{}`",
411        report.left_history_len
412    );
413    let _ = writeln!(
414        output,
415        "- Right journal entries: `{}`",
416        report.right_history_len
417    );
418    let _ = writeln!(output);
419
420    render_delta_section(
421        &mut output,
422        "Authorization Delta",
423        &report.current_authorized,
424    );
425    render_delta_section(&mut output, "Claimable Work Delta", &report.claimable);
426    render_delta_section(
427        &mut output,
428        "Live Heartbeats Delta",
429        &report.live_heartbeats,
430    );
431    render_delta_section(
432        &mut output,
433        "Accepted Outcomes Delta",
434        &report.accepted_outcomes,
435    );
436    render_delta_section(
437        &mut output,
438        "Rejected Outcomes Delta",
439        &report.rejected_outcomes,
440    );
441
442    output
443}
444
445fn format_policy_context(policy_context: Option<&PolicyContext>) -> String {
446    match policy_context {
447        None => "public".into(),
448        Some(policy_context) => {
449            let capabilities = if policy_context.capabilities.is_empty() {
450                "-".into()
451            } else {
452                policy_context.capabilities.join(", ")
453            };
454            let visibilities = if policy_context.visibilities.is_empty() {
455                "-".into()
456            } else {
457                policy_context.visibilities.join(", ")
458            };
459            format!("capabilities=[{capabilities}] visibilities=[{visibilities}]")
460        }
461    }
462}
463
464fn into_report_rows(rows: Vec<QueryRow>) -> Vec<ReportRow> {
465    rows.into_iter()
466        .map(|row| ReportRow {
467            tuple_id: row.tuple_id,
468            values: row.values,
469        })
470        .collect()
471}
472
473fn render_row_section(output: &mut String, title: &str, rows: &[ReportRow]) {
474    let _ = writeln!(output, "## {title}");
475    let _ = writeln!(output);
476    if rows.is_empty() {
477        let _ = writeln!(output, "No rows.");
478        let _ = writeln!(output);
479        return;
480    }
481
482    for row in rows {
483        let tuple_id = row
484            .tuple_id
485            .map(|tuple_id| format!("t{}", tuple_id.0))
486            .unwrap_or_else(|| "-".into());
487        let _ = writeln!(
488            output,
489            "- `{}` | `{}`",
490            tuple_id,
491            format_values(&row.values)
492        );
493    }
494    let _ = writeln!(output);
495}
496
497fn render_delta_section(output: &mut String, title: &str, delta: &ReportSectionDelta) {
498    let _ = writeln!(output, "## {title}");
499    let _ = writeln!(output);
500    if delta.added.is_empty() && delta.removed.is_empty() && delta.changed.is_empty() {
501        let _ = writeln!(output, "No changes.");
502        let _ = writeln!(output);
503        return;
504    }
505
506    render_diff_rows(output, "Added", &delta.added);
507    render_diff_rows(output, "Removed", &delta.removed);
508    render_changed_rows(output, &delta.changed);
509}
510
511fn render_diff_rows(output: &mut String, title: &str, rows: &[ReportRowDiff]) {
512    if rows.is_empty() {
513        return;
514    }
515    let _ = writeln!(output, "### {title}");
516    let _ = writeln!(output);
517    for row in rows {
518        let tuple = row
519            .row
520            .tuple_id
521            .map(|tuple_id| format!("t{}", tuple_id.0))
522            .unwrap_or_else(|| "-".into());
523        let _ = writeln!(
524            output,
525            "- `{}` | `{}`",
526            tuple,
527            format_values(&row.row.values)
528        );
529        if let Some(trace) = &row.trace {
530            let _ = writeln!(
531                output,
532                "  trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
533                trace.tuple_id.0,
534                trace.tuple_count,
535                format_element_ids(&trace.source_datom_ids),
536                format_tuple_ids(&trace.parent_tuple_ids),
537            );
538        }
539    }
540    let _ = writeln!(output);
541}
542
543fn render_changed_rows(output: &mut String, rows: &[ReportRowChange]) {
544    if rows.is_empty() {
545        return;
546    }
547    let _ = writeln!(output, "### Changed");
548    let _ = writeln!(output);
549    for row in rows {
550        let _ = writeln!(
551            output,
552            "- before `{}` | after `{}`",
553            format_values(&row.before.values),
554            format_values(&row.after.values),
555        );
556        if let Some(trace) = &row.before_trace {
557            let _ = writeln!(
558                output,
559                "  before trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
560                trace.tuple_id.0,
561                trace.tuple_count,
562                format_element_ids(&trace.source_datom_ids),
563                format_tuple_ids(&trace.parent_tuple_ids),
564            );
565        }
566        if let Some(trace) = &row.after_trace {
567            let _ = writeln!(
568                output,
569                "  after trace `t{}` | tuples `{}` | sources `{}` | parents `{}`",
570                trace.tuple_id.0,
571                trace.tuple_count,
572                format_element_ids(&trace.source_datom_ids),
573                format_tuple_ids(&trace.parent_tuple_ids),
574            );
575        }
576    }
577    let _ = writeln!(output);
578}
579
580fn run_report_query(
581    service: &mut impl KernelService,
582    request: RunDocumentRequest,
583) -> Result<Vec<QueryRow>, ApiError> {
584    match service.run_document(request) {
585        Ok(response) => Ok(response.query.unwrap_or_default().rows),
586        Err(ApiError::Resolve(ResolveError::UnknownElementId(_))) => Ok(Vec::new()),
587        Err(error) => Err(error),
588    }
589}
590
591#[derive(Clone, Debug, Default)]
592struct CoordinationSnapshot {
593    history_len: usize,
594    current_authorized: Vec<ReportRow>,
595    claimable: Vec<ReportRow>,
596    live_heartbeats: Vec<ReportRow>,
597    accepted_outcomes: Vec<ReportRow>,
598    rejected_outcomes: Vec<ReportRow>,
599}
600
601fn build_coordination_snapshot(
602    service: &mut impl KernelService,
603    cut: &CoordinationCut,
604    policy_context: Option<PolicyContext>,
605) -> Result<CoordinationSnapshot, ApiError> {
606    let history_len = run_report_history_len(service, cut, policy_context.clone())?;
607    Ok(CoordinationSnapshot {
608        history_len,
609        current_authorized: into_report_rows(run_report_query_for_cut(
610            service,
611            cut,
612            "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
613            policy_context.clone(),
614        )?),
615        claimable: into_report_rows(run_report_query_for_cut(
616            service,
617            cut,
618            "goal worker_can_claim(t, worker)\n  keep t, worker",
619            policy_context.clone(),
620        )?),
621        live_heartbeats: into_report_rows(run_report_query_for_cut(
622            service,
623            cut,
624            "goal live_authority(t, worker, epoch, beat)\n  keep t, worker, epoch, beat",
625            policy_context.clone(),
626        )?),
627        accepted_outcomes: into_report_rows(run_report_query_for_cut(
628            service,
629            cut,
630            "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
631            policy_context.clone(),
632        )?),
633        rejected_outcomes: into_report_rows(run_report_query_for_cut(
634            service,
635            cut,
636            "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
637            policy_context,
638        )?),
639    })
640}
641
642fn run_report_history_len(
643    service: &mut impl KernelService,
644    cut: &CoordinationCut,
645    policy_context: Option<PolicyContext>,
646) -> Result<usize, ApiError> {
647    let history = service.history(HistoryRequest {
648        policy_context: policy_context.clone(),
649    })?;
650    match cut {
651        CoordinationCut::Current => Ok(history.datoms.len()),
652        CoordinationCut::AsOf { element } => history
653            .datoms
654            .iter()
655            .position(|datom| datom.element == *element)
656            .map(|index| index + 1)
657            .ok_or_else(|| ApiError::Validation(format!("unknown element {}", element.0))),
658    }
659}
660
661fn run_report_query_for_cut(
662    service: &mut impl KernelService,
663    cut: &CoordinationCut,
664    query_body: &str,
665    policy_context: Option<PolicyContext>,
666) -> Result<Vec<QueryRow>, ApiError> {
667    run_report_query(
668        service,
669        RunDocumentRequest {
670            dsl: coordination_pilot_dsl(&cut.view_label(), query_body),
671            policy_context,
672        },
673    )
674}
675
676fn diff_report_rows(
677    service: &mut impl KernelService,
678    left: Vec<ReportRow>,
679    right: Vec<ReportRow>,
680    policy_context: Option<&PolicyContext>,
681) -> Result<ReportSectionDelta, ApiError> {
682    let mut left_exact = rows_by_signature(left);
683    let mut right_exact = rows_by_signature(right);
684    let exact_keys = left_exact
685        .keys()
686        .filter(|signature| right_exact.contains_key(*signature))
687        .cloned()
688        .collect::<Vec<_>>();
689    for signature in exact_keys {
690        let left_rows = left_exact
691            .get_mut(&signature)
692            .expect("left signature exists");
693        let right_rows = right_exact
694            .get_mut(&signature)
695            .expect("right signature exists");
696        let pair_count = left_rows.len().min(right_rows.len());
697        for _ in 0..pair_count {
698            left_rows.pop_front();
699            right_rows.pop_front();
700        }
701        if left_rows.is_empty() {
702            left_exact.remove(&signature);
703        }
704        if right_rows.is_empty() {
705            right_exact.remove(&signature);
706        }
707    }
708
709    let mut left_grouped = rows_by_primary_key(left_exact);
710    let mut right_grouped = rows_by_primary_key(right_exact);
711    let group_keys = left_grouped
712        .keys()
713        .chain(right_grouped.keys())
714        .cloned()
715        .collect::<std::collections::BTreeSet<_>>();
716
717    let mut added = Vec::new();
718    let mut removed = Vec::new();
719    let mut changed = Vec::new();
720
721    for key in group_keys {
722        let mut left_rows = left_grouped.remove(&key).unwrap_or_default();
723        let mut right_rows = right_grouped.remove(&key).unwrap_or_default();
724        while let (Some(before), Some(after)) = (left_rows.pop_front(), right_rows.pop_front()) {
725            changed.push(ReportRowChange {
726                before_trace: trace_handle_for_row(service, &before, policy_context)?,
727                after_trace: trace_handle_for_row(service, &after, policy_context)?,
728                before,
729                after,
730            });
731        }
732        for row in left_rows {
733            removed.push(ReportRowDiff {
734                trace: trace_handle_for_row(service, &row, policy_context)?,
735                row,
736            });
737        }
738        for row in right_rows {
739            added.push(ReportRowDiff {
740                trace: trace_handle_for_row(service, &row, policy_context)?,
741                row,
742            });
743        }
744    }
745
746    Ok(ReportSectionDelta {
747        added,
748        removed,
749        changed,
750    })
751}
752
753fn rows_by_signature(rows: Vec<ReportRow>) -> BTreeMap<String, VecDeque<ReportRow>> {
754    let mut grouped = BTreeMap::new();
755    for row in rows {
756        grouped
757            .entry(row_signature(&row))
758            .or_insert_with(VecDeque::new)
759            .push_back(row);
760    }
761    grouped
762}
763
764fn rows_by_primary_key(
765    rows: BTreeMap<String, VecDeque<ReportRow>>,
766) -> BTreeMap<String, VecDeque<ReportRow>> {
767    let mut grouped = BTreeMap::new();
768    for (_, mut bucket) in rows {
769        while let Some(row) = bucket.pop_front() {
770            grouped
771                .entry(row_primary_key(&row))
772                .or_insert_with(VecDeque::new)
773                .push_back(row);
774        }
775    }
776    grouped
777}
778
779fn row_signature(row: &ReportRow) -> String {
780    format!(
781        "{}|{}",
782        row.tuple_id
783            .map(|tuple_id| tuple_id.0.to_string())
784            .unwrap_or_else(|| "-".into()),
785        format_values(&row.values)
786    )
787}
788
789fn row_primary_key(row: &ReportRow) -> String {
790    row.values
791        .first()
792        .map(format_value)
793        .or_else(|| row.tuple_id.map(|tuple_id| format!("t{}", tuple_id.0)))
794        .unwrap_or_else(|| "-".into())
795}
796
797fn trace_handle_for_row(
798    service: &mut impl KernelService,
799    row: &ReportRow,
800    policy_context: Option<&PolicyContext>,
801) -> Result<Option<CoordinationTraceHandle>, ApiError> {
802    let Some(tuple_id) = row.tuple_id else {
803        return Ok(None);
804    };
805    let trace = match service.explain_tuple(ExplainTupleRequest {
806        tuple_id,
807        policy_context: policy_context.cloned(),
808    }) {
809        Ok(response) => response.trace,
810        Err(ApiError::Validation(message))
811            if message == "requested tuple is not visible under the current policy" =>
812        {
813            return Ok(None);
814        }
815        Err(error) => return Err(error),
816    };
817    let root = trace
818        .tuples
819        .iter()
820        .find(|tuple| tuple.tuple.id == trace.root)
821        .or_else(|| trace.tuples.first())
822        .ok_or_else(|| ApiError::Validation("empty explain trace".into()))?;
823    Ok(Some(CoordinationTraceHandle {
824        tuple_id: trace.root,
825        tuple_count: trace.tuples.len(),
826        source_datom_ids: root.metadata.source_datom_ids.clone(),
827        parent_tuple_ids: root.metadata.parent_tuple_ids.clone(),
828    }))
829}
830
831fn format_values(values: &[Value]) -> String {
832    values
833        .iter()
834        .map(format_value)
835        .collect::<Vec<_>>()
836        .join(", ")
837}
838
839fn format_value(value: &Value) -> String {
840    match value {
841        Value::Null => "null".into(),
842        Value::Bool(value) => value.to_string(),
843        Value::I64(value) => value.to_string(),
844        Value::U64(value) => value.to_string(),
845        Value::F64(value) => value.to_string(),
846        Value::String(value) => format!("\"{value}\""),
847        Value::Bytes(bytes) => format!("{bytes:?}"),
848        Value::Entity(entity) => format!("entity({})", entity.0),
849        Value::List(values) => format!("[{}]", format_values(values)),
850    }
851}
852
853fn format_element_ids(ids: &[ElementId]) -> String {
854    if ids.is_empty() {
855        return "-".into();
856    }
857    ids.iter()
858        .map(|id| format!("e{}", id.0))
859        .collect::<Vec<_>>()
860        .join(", ")
861}
862
863fn format_tuple_ids(ids: &[TupleId]) -> String {
864    if ids.is_empty() {
865        return "-".into();
866    }
867    ids.iter()
868        .map(|id| format!("t{}", id.0))
869        .collect::<Vec<_>>()
870        .join(", ")
871}
872
873fn now_millis() -> u64 {
874    let duration = SystemTime::now()
875        .duration_since(UNIX_EPOCH)
876        .unwrap_or_default();
877    duration.as_millis().min(u128::from(u64::MAX)) as u64
878}
879
880#[cfg(test)]
881mod tests {
882    use super::{build_coordination_pilot_report, build_coordination_pilot_report_with_policy};
883    use crate::{
884        coordination_pilot_seed_history, AppendRequest, InMemoryKernelService, KernelService,
885    };
886    use aether_ast::{EntityId, PolicyContext, PolicyEnvelope, Value};
887
888    #[test]
889    fn coordination_pilot_report_captures_expected_answers() {
890        let mut service = InMemoryKernelService::new();
891        service
892            .append(AppendRequest {
893                datoms: coordination_pilot_seed_history(),
894            })
895            .expect("append seed history");
896
897        let report =
898            build_coordination_pilot_report(&mut service).expect("build coordination report");
899
900        assert_eq!(report.history_len, 25);
901        assert!(report.pre_heartbeat_authorized.is_empty());
902        assert_eq!(
903            report.as_of_authorized[0].values,
904            vec![
905                Value::Entity(EntityId::new(1)),
906                Value::String("worker-a".into()),
907                Value::U64(1),
908            ]
909        );
910        assert_eq!(
911            report.live_heartbeats[0].values,
912            vec![
913                Value::Entity(EntityId::new(1)),
914                Value::String("worker-b".into()),
915                Value::U64(2),
916                Value::U64(200),
917            ]
918        );
919        assert_eq!(
920            report.current_authorized[0].values,
921            vec![
922                Value::Entity(EntityId::new(1)),
923                Value::String("worker-b".into()),
924                Value::U64(2),
925            ]
926        );
927        assert_eq!(report.claimable.len(), 2);
928        assert_eq!(
929            report.accepted_outcomes[0].values,
930            vec![
931                Value::Entity(EntityId::new(1)),
932                Value::String("worker-b".into()),
933                Value::U64(2),
934                Value::String("completed".into()),
935                Value::String("current-worker-b".into()),
936            ]
937        );
938        assert_eq!(
939            report.rejected_outcomes[0].values,
940            vec![
941                Value::Entity(EntityId::new(1)),
942                Value::String("worker-a".into()),
943                Value::U64(1),
944                Value::String("completed".into()),
945                Value::String("stale-worker-a".into()),
946            ]
947        );
948        assert!(
949            report
950                .trace
951                .as_ref()
952                .map(|trace| trace.tuple_count)
953                .unwrap_or(0)
954                > 0
955        );
956    }
957
958    #[test]
959    fn coordination_pilot_report_respects_policy_context() {
960        let mut service = InMemoryKernelService::new();
961        let mut datoms = coordination_pilot_seed_history();
962        for datom in &mut datoms {
963            if datom.element.0 >= 6 {
964                datom.policy = Some(PolicyEnvelope {
965                    capabilities: vec!["executor".into()],
966                    visibilities: Vec::new(),
967                });
968            }
969        }
970        service
971            .append(AppendRequest { datoms })
972            .expect("append policy-filtered seed history");
973
974        let public_report = build_coordination_pilot_report_with_policy(&mut service, None)
975            .expect("build public coordination report");
976        assert_eq!(public_report.policy_context, None);
977        assert_eq!(public_report.history_len, 5);
978        assert!(public_report.as_of_authorized.is_empty());
979        assert!(public_report.current_authorized.is_empty());
980        assert!(public_report.accepted_outcomes.is_empty());
981        assert!(public_report.trace.is_none());
982
983        let executor_report = build_coordination_pilot_report_with_policy(
984            &mut service,
985            Some(PolicyContext {
986                capabilities: vec!["executor".into()],
987                visibilities: Vec::new(),
988            }),
989        )
990        .expect("build executor coordination report");
991        assert_eq!(executor_report.history_len, 25);
992        assert_eq!(
993            executor_report.current_authorized[0].values,
994            vec![
995                Value::Entity(EntityId::new(1)),
996                Value::String("worker-b".into()),
997                Value::U64(2),
998            ]
999        );
1000        assert!(executor_report.trace.is_some());
1001    }
1002}