aether_api/
lib.rs

1use aether_ast::{
2    policy_allows, Datom, DerivationTrace, ElementId, ExplainSpec, ExplainTarget, ExtensionalFact,
3    NamedExplainSpec, NamedQuerySpec, PhaseGraph, PlanExplanation, PolicyContext, QueryResult,
4    QuerySpec, RuleProgram, TemporalView, Term, TupleId,
5};
6use aether_explain::{ExplainError, Explainer, InMemoryExplainer};
7use aether_plan::CompiledProgram;
8use aether_resolver::{MaterializedResolver, ResolveError, ResolvedState, ResolvedValue, Resolver};
9use aether_rules::{DefaultDslParser, DefaultRuleCompiler, DslParser, ParseError, RuleCompiler};
10use aether_runtime::{execute_query, DerivedSet, RuleRuntime, RuntimeError, SemiNaiveRuntime};
11use aether_schema::Schema;
12use aether_storage::{InMemoryJournal, Journal, JournalError, SqliteJournal};
13use serde::{Deserialize, Serialize};
14use std::path::Path;
15use thiserror::Error;
16
17pub mod deployment;
18pub mod http;
19pub mod partitioned;
20#[doc(hidden)]
21pub mod perf;
22pub mod pilot;
23pub mod report;
24pub mod sidecar;
25pub mod status;
26
27pub use deployment::{
28    default_audit_log_path, serve_pilot_http_service, DeploymentError, PilotAuthConfig,
29    PilotServiceConfig, PilotTokenConfig, ResolvedPilotServiceConfig, ResolvedPilotTokenSummary,
30};
31pub use http::{
32    http_router, http_router_with_options, http_router_with_partitioned_options, AuditContext,
33    AuditEntry, AuditLogResponse, AuthScope, HealthResponse, HttpAccessToken, HttpAuthConfig,
34    HttpKernelOptions, HttpKernelState,
35};
36pub use partitioned::{
37    render_federated_explain_report_markdown, AuthorityPartitionConfig, FederatedExplainReport,
38    FederatedHistoryRequest, FederatedHistoryResponse, FederatedImportedSourceSummary,
39    FederatedNamedQuerySummary, FederatedReportRow, FederatedRunDocumentRequest,
40    FederatedRunDocumentResponse, FederatedTraceSummary, FederatedTraceTupleSummary,
41    ImportedFactQueryRequest, ImportedFactQueryResponse, LeaderEpoch, PartitionAppendRequest,
42    PartitionAppendResponse, PartitionHistoryRequest, PartitionHistoryResponse,
43    PartitionStateRequest, PartitionStateResponse, PartitionStatus, PartitionStatusResponse,
44    PartitionedInMemoryKernelService, PromoteReplicaRequest, PromoteReplicaResponse, ReplicaConfig,
45    ReplicaRole, ReplicaStatus, ReplicatedAuthorityPartitionService,
46    SqlitePartitionedKernelService,
47};
48pub use pilot::{
49    coordination_pilot_dsl, coordination_pilot_seed_history,
50    COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT, COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
51};
52pub use report::{
53    build_coordination_delta_report, build_coordination_pilot_report,
54    build_coordination_pilot_report_with_policy, render_coordination_delta_report_markdown,
55    render_coordination_pilot_report_markdown, CoordinationCut, CoordinationDeltaReport,
56    CoordinationDeltaReportRequest, CoordinationPilotReport, CoordinationTraceHandle, ReportRow,
57    ReportRowChange, ReportRowDiff, ReportSectionDelta, TraceSummary, TraceTupleSummary,
58};
59pub use sidecar::{
60    ArtifactReference, GetArtifactReferenceRequest, GetArtifactReferenceResponse,
61    InMemorySidecarFederation, JournalCatalog, RegisterArtifactReferenceRequest,
62    RegisterArtifactReferenceResponse, RegisterVectorRecordRequest, RegisterVectorRecordResponse,
63    SearchVectorsRequest, SearchVectorsResponse, SidecarError, SidecarFederation,
64    SqliteSidecarFederation, VectorFactProjection, VectorMetric, VectorRecordMetadata,
65    VectorSearchMatch,
66};
67pub use status::{
68    AuthReloadResponse, PrincipalStatusSummary, ReplicaStatusSummary, ServiceMode,
69    ServiceStatusResponse, ServiceStatusStorage,
70};
71
72pub trait KernelService {
73    fn append(&mut self, request: AppendRequest) -> Result<AppendResponse, ApiError>;
74    fn history(&self, request: HistoryRequest) -> Result<HistoryResponse, ApiError>;
75    fn current_state(&self, request: CurrentStateRequest)
76        -> Result<CurrentStateResponse, ApiError>;
77    fn as_of(&self, request: AsOfRequest) -> Result<AsOfResponse, ApiError>;
78    fn compile_program(
79        &self,
80        request: CompileProgramRequest,
81    ) -> Result<CompileProgramResponse, ApiError>;
82    fn evaluate_program(
83        &mut self,
84        request: EvaluateProgramRequest,
85    ) -> Result<EvaluateProgramResponse, ApiError>;
86    fn explain_tuple(&self, request: ExplainTupleRequest)
87        -> Result<ExplainTupleResponse, ApiError>;
88    fn explain_plan(&self, request: ExplainPlanRequest) -> Result<ExplainPlanResponse, ApiError>;
89    fn parse_document(
90        &self,
91        request: ParseDocumentRequest,
92    ) -> Result<ParseDocumentResponse, ApiError>;
93    fn run_document(
94        &mut self,
95        request: RunDocumentRequest,
96    ) -> Result<RunDocumentResponse, ApiError>;
97    fn coordination_pilot_report(
98        &mut self,
99        request: CoordinationPilotReportRequest,
100    ) -> Result<CoordinationPilotReport, ApiError>;
101    fn coordination_delta_report(
102        &mut self,
103        request: CoordinationDeltaReportRequest,
104    ) -> Result<CoordinationDeltaReport, ApiError>;
105    fn register_artifact_reference(
106        &mut self,
107        request: RegisterArtifactReferenceRequest,
108    ) -> Result<RegisterArtifactReferenceResponse, ApiError>;
109    fn get_artifact_reference(
110        &self,
111        request: GetArtifactReferenceRequest,
112    ) -> Result<GetArtifactReferenceResponse, ApiError>;
113    fn register_vector_record(
114        &mut self,
115        request: RegisterVectorRecordRequest,
116    ) -> Result<RegisterVectorRecordResponse, ApiError>;
117    fn search_vectors(
118        &self,
119        request: SearchVectorsRequest,
120    ) -> Result<SearchVectorsResponse, ApiError>;
121}
122
123pub type InMemoryKernelService = KernelServiceCore<InMemoryJournal, InMemorySidecarFederation>;
124pub type SqliteKernelService = KernelServiceCore<SqliteJournal, SqliteSidecarFederation>;
125
126#[derive(Debug)]
127pub struct KernelServiceCore<J: Journal, S: SidecarFederation = InMemorySidecarFederation> {
128    journal: J,
129    sidecars: S,
130    last_derived: Option<CachedDerivedSet>,
131}
132
133impl KernelServiceCore<InMemoryJournal, InMemorySidecarFederation> {
134    pub fn new() -> Self {
135        Self::from_journal(InMemoryJournal::new())
136    }
137}
138
139impl Default for KernelServiceCore<InMemoryJournal, InMemorySidecarFederation> {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145impl KernelServiceCore<SqliteJournal, SqliteSidecarFederation> {
146    pub fn open(path: impl AsRef<Path>) -> Result<Self, ApiError> {
147        let path = path.as_ref();
148        Ok(Self::from_parts(
149            SqliteJournal::open(path)?,
150            SqliteSidecarFederation::open(sidecar::sidecar_catalog_path_for_journal(path))?,
151        ))
152    }
153}
154
155impl<J: Journal, S: SidecarFederation> KernelServiceCore<J, S> {
156    pub fn from_parts(journal: J, sidecars: S) -> Self {
157        Self {
158            journal,
159            sidecars,
160            last_derived: None,
161        }
162    }
163
164    fn datoms_or_history(&self, datoms: &[Datom]) -> Result<Vec<Datom>, ApiError> {
165        if datoms.is_empty() {
166            Ok(self.journal.history()?)
167        } else {
168            Ok(datoms.to_vec())
169        }
170    }
171
172    fn visible_history(
173        &self,
174        datoms: &[Datom],
175        policy_context: Option<&PolicyContext>,
176    ) -> Result<Vec<Datom>, ApiError> {
177        Ok(filter_datoms(
178            self.datoms_or_history(datoms)?,
179            policy_context,
180        ))
181    }
182
183    fn cache_derived(&mut self, derived: DerivedSet) {
184        self.last_derived = Some(CachedDerivedSet { derived });
185    }
186
187    fn sidecar_journal_catalog(&self) -> Result<JournalCatalog, ApiError> {
188        Ok(JournalCatalog::from_history(&self.journal.history()?))
189    }
190
191    fn document_evaluation<'a>(
192        &self,
193        cache: &'a mut Vec<DocumentEvaluation>,
194        schema: &Schema,
195        datoms: &[Datom],
196        program: &CompiledProgram,
197        view: &TemporalView,
198    ) -> Result<&'a DocumentEvaluation, ApiError> {
199        if let Some(index) = cache.iter().position(|evaluation| &evaluation.view == view) {
200            return Ok(&cache[index]);
201        }
202
203        let state = match view {
204            TemporalView::AsOf(element) => MaterializedResolver.as_of(schema, datoms, element)?,
205            TemporalView::Current => MaterializedResolver.current(schema, datoms)?,
206        };
207        let derived = SemiNaiveRuntime.evaluate(&state, program)?;
208        cache.push(DocumentEvaluation {
209            view: view.clone(),
210            state,
211            derived,
212        });
213        Ok(cache
214            .last()
215            .expect("evaluation cache contains the inserted view"))
216    }
217}
218
219fn filter_datoms(datoms: Vec<Datom>, policy_context: Option<&PolicyContext>) -> Vec<Datom> {
220    datoms
221        .into_iter()
222        .filter(|datom| policy_allows(policy_context, datom.policy.as_ref()))
223        .collect()
224}
225
226fn filter_extensional_facts(
227    facts: Vec<ExtensionalFact>,
228    policy_context: Option<&PolicyContext>,
229) -> Vec<ExtensionalFact> {
230    facts
231        .into_iter()
232        .filter(|fact| policy_allows(policy_context, fact.policy.as_ref()))
233        .collect()
234}
235
236fn filter_compiled_program(
237    program: &CompiledProgram,
238    policy_context: Option<&PolicyContext>,
239) -> CompiledProgram {
240    let mut filtered = program.clone();
241    filtered.facts = filter_extensional_facts(filtered.facts, policy_context);
242    filtered
243}
244
245fn filter_resolved_state(
246    state: &ResolvedState,
247    policy_context: Option<&PolicyContext>,
248) -> ResolvedState {
249    let mut filtered = ResolvedState {
250        entities: indexmap::IndexMap::new(),
251        as_of: state.as_of,
252    };
253
254    for (entity_id, entity_state) in &state.entities {
255        let mut visible_entity = aether_resolver::EntityState::default();
256        for (attribute_id, value) in &entity_state.attributes {
257            let visible_facts = entity_state
258                .facts(attribute_id)
259                .iter()
260                .filter(|fact| policy_allows(policy_context, fact.policy.as_ref()))
261                .cloned()
262                .collect::<Vec<_>>();
263            if visible_facts.is_empty() {
264                continue;
265            }
266            let visible_value = match value {
267                ResolvedValue::Scalar(_) => {
268                    ResolvedValue::Scalar(visible_facts.last().map(|fact| fact.value.clone()))
269                }
270                ResolvedValue::Set(_) => ResolvedValue::Set(
271                    visible_facts
272                        .iter()
273                        .map(|fact| fact.value.clone())
274                        .collect::<Vec<_>>(),
275                ),
276                ResolvedValue::Sequence(_) => ResolvedValue::Sequence(
277                    visible_facts
278                        .iter()
279                        .map(|fact| fact.value.clone())
280                        .collect::<Vec<_>>(),
281                ),
282            };
283            visible_entity
284                .attributes
285                .insert(*attribute_id, visible_value);
286            visible_entity.facts.insert(*attribute_id, visible_facts);
287        }
288        if !visible_entity.attributes.is_empty() {
289            filtered.entities.insert(*entity_id, visible_entity);
290        }
291    }
292
293    filtered
294}
295
296fn filter_derived_set(derived: &DerivedSet, policy_context: Option<&PolicyContext>) -> DerivedSet {
297    let tuples = derived
298        .tuples
299        .iter()
300        .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
301        .cloned()
302        .collect::<Vec<_>>();
303    let visible_ids = tuples
304        .iter()
305        .map(|tuple| tuple.tuple.id)
306        .collect::<std::collections::BTreeSet<_>>();
307    let predicate_index = derived
308        .predicate_index
309        .iter()
310        .map(|(predicate, tuple_ids)| {
311            (
312                *predicate,
313                tuple_ids
314                    .iter()
315                    .copied()
316                    .filter(|tuple_id| visible_ids.contains(tuple_id))
317                    .collect::<Vec<_>>(),
318            )
319        })
320        .collect();
321
322    DerivedSet {
323        tuples,
324        iterations: derived.iterations.clone(),
325        predicate_index,
326    }
327}
328
329fn filter_trace(
330    trace: DerivationTrace,
331    policy_context: Option<&PolicyContext>,
332) -> Result<DerivationTrace, ApiError> {
333    let tuples = trace
334        .tuples
335        .into_iter()
336        .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
337        .collect::<Vec<_>>();
338    if tuples.iter().all(|tuple| tuple.tuple.id != trace.root) {
339        return Err(ApiError::Validation(
340            "requested tuple is not visible under the current policy".into(),
341        ));
342    }
343    Ok(DerivationTrace {
344        root: trace.root,
345        tuples,
346    })
347}
348
349fn ensure_visible_element(
350    datoms: &[Datom],
351    at: ElementId,
352    policy_context: Option<&PolicyContext>,
353) -> Result<(), ApiError> {
354    if datoms
355        .iter()
356        .any(|datom| datom.element == at && policy_allows(policy_context, datom.policy.as_ref()))
357    {
358        Ok(())
359    } else {
360        Err(ApiError::Validation(format!("unknown element {}", at.0)))
361    }
362}
363
364impl<J, S> KernelServiceCore<J, S>
365where
366    J: Journal,
367    S: SidecarFederation + Default,
368{
369    pub fn from_journal(journal: J) -> Self {
370        Self::from_parts(journal, S::default())
371    }
372}
373
374#[derive(Clone, Debug)]
375struct DocumentEvaluation {
376    view: TemporalView,
377    state: ResolvedState,
378    derived: DerivedSet,
379}
380
381#[derive(Clone, Debug)]
382struct CachedDerivedSet {
383    derived: DerivedSet,
384}
385
386impl<J: Journal, S: SidecarFederation> KernelService for KernelServiceCore<J, S> {
387    fn append(&mut self, request: AppendRequest) -> Result<AppendResponse, ApiError> {
388        self.journal.append(&request.datoms)?;
389        Ok(AppendResponse {
390            appended: request.datoms.len(),
391        })
392    }
393
394    fn history(&self, request: HistoryRequest) -> Result<HistoryResponse, ApiError> {
395        Ok(HistoryResponse {
396            datoms: self.visible_history(&[], request.policy_context.as_ref())?,
397        })
398    }
399
400    fn current_state(
401        &self,
402        request: CurrentStateRequest,
403    ) -> Result<CurrentStateResponse, ApiError> {
404        let datoms = self.datoms_or_history(&request.datoms)?;
405        Ok(CurrentStateResponse {
406            state: filter_resolved_state(
407                &MaterializedResolver.current(&request.schema, &datoms)?,
408                request.policy_context.as_ref(),
409            ),
410        })
411    }
412
413    fn as_of(&self, request: AsOfRequest) -> Result<AsOfResponse, ApiError> {
414        let datoms = self.datoms_or_history(&request.datoms)?;
415        ensure_visible_element(&datoms, request.at, request.policy_context.as_ref())?;
416        Ok(AsOfResponse {
417            state: filter_resolved_state(
418                &MaterializedResolver.as_of(&request.schema, &datoms, &request.at)?,
419                request.policy_context.as_ref(),
420            ),
421        })
422    }
423
424    fn compile_program(
425        &self,
426        request: CompileProgramRequest,
427    ) -> Result<CompileProgramResponse, ApiError> {
428        Ok(CompileProgramResponse {
429            program: DefaultRuleCompiler.compile(&request.schema, &request.program)?,
430        })
431    }
432
433    fn evaluate_program(
434        &mut self,
435        request: EvaluateProgramRequest,
436    ) -> Result<EvaluateProgramResponse, ApiError> {
437        let derived = SemiNaiveRuntime.evaluate(&request.state, &request.program)?;
438        self.cache_derived(derived.clone());
439        Ok(EvaluateProgramResponse {
440            derived: filter_derived_set(&derived, request.policy_context.as_ref()),
441        })
442    }
443
444    fn explain_tuple(
445        &self,
446        request: ExplainTupleRequest,
447    ) -> Result<ExplainTupleResponse, ApiError> {
448        let cached = self
449            .last_derived
450            .as_ref()
451            .ok_or_else(|| ApiError::Validation("no derived tuples are cached".into()))?;
452        let trace = InMemoryExplainer::from_derived_set(&cached.derived)
453            .explain_tuple(&request.tuple_id)?;
454        Ok(ExplainTupleResponse {
455            trace: filter_trace(trace, request.policy_context.as_ref())?,
456        })
457    }
458
459    fn explain_plan(&self, request: ExplainPlanRequest) -> Result<ExplainPlanResponse, ApiError> {
460        let explanation = InMemoryExplainer::default().explain_plan(&request.plan)?;
461        Ok(ExplainPlanResponse { explanation })
462    }
463
464    fn parse_document(
465        &self,
466        request: ParseDocumentRequest,
467    ) -> Result<ParseDocumentResponse, ApiError> {
468        let document = DefaultDslParser.parse_document(&request.dsl)?;
469        Ok(ParseDocumentResponse {
470            schema: document.schema,
471            program: document.program,
472            query: document.query,
473            queries: document.queries,
474            explains: document.explains,
475        })
476    }
477
478    fn run_document(
479        &mut self,
480        request: RunDocumentRequest,
481    ) -> Result<RunDocumentResponse, ApiError> {
482        let document = DefaultDslParser.parse_document(&request.dsl)?;
483        let datoms = self.datoms_or_history(&[])?;
484        let program = DefaultRuleCompiler.compile(&document.schema, &document.program)?;
485        let mut evaluations = Vec::new();
486        let primary_view = document
487            .query
488            .as_ref()
489            .map(|query| query.view.clone())
490            .or_else(|| {
491                document
492                    .queries
493                    .first()
494                    .map(|query| query.spec.view.clone())
495            })
496            .or_else(|| {
497                document
498                    .explains
499                    .first()
500                    .map(|explain| explain.spec.view.clone())
501            })
502            .unwrap_or(TemporalView::Current);
503        let primary = self.document_evaluation(
504            &mut evaluations,
505            &document.schema,
506            &datoms,
507            &program,
508            &primary_view,
509        )?;
510        let primary_state = primary.state.clone();
511        let primary_derived = primary.derived.clone();
512        let query = match &document.query {
513            Some(query) => Some(execute_query(
514                &primary_state,
515                &program,
516                &primary_derived,
517                &query.query,
518                request.policy_context.as_ref(),
519            )?),
520            None => None,
521        };
522        let queries = document
523            .queries
524            .iter()
525            .map(|named_query| {
526                let evaluation = self.document_evaluation(
527                    &mut evaluations,
528                    &document.schema,
529                    &datoms,
530                    &program,
531                    &named_query.spec.view,
532                )?;
533                Ok(NamedQueryResult {
534                    name: named_query.name.clone(),
535                    spec: named_query.spec.clone(),
536                    result: execute_query(
537                        &evaluation.state,
538                        &program,
539                        &evaluation.derived,
540                        &named_query.spec.query,
541                        request.policy_context.as_ref(),
542                    )?,
543                })
544            })
545            .collect::<Result<Vec<_>, ApiError>>()?;
546        let explains = document
547            .explains
548            .iter()
549            .map(|named_explain| {
550                let evaluation = self.document_evaluation(
551                    &mut evaluations,
552                    &document.schema,
553                    &datoms,
554                    &program,
555                    &named_explain.spec.view,
556                )?;
557                Ok(NamedExplainResult {
558                    name: named_explain.name.clone(),
559                    spec: named_explain.spec.clone(),
560                    result: execute_explain_spec(
561                        &program,
562                        evaluation,
563                        &named_explain.spec,
564                        request.policy_context.as_ref(),
565                    )?,
566                })
567            })
568            .collect::<Result<Vec<_>, ApiError>>()?;
569        self.cache_derived(primary_derived.clone());
570        let derived = filter_derived_set(&primary_derived, request.policy_context.as_ref());
571
572        Ok(RunDocumentResponse {
573            state: filter_resolved_state(&primary_state, request.policy_context.as_ref()),
574            program: filter_compiled_program(&program, request.policy_context.as_ref()),
575            derived,
576            query,
577            queries,
578            explains,
579        })
580    }
581
582    fn coordination_pilot_report(
583        &mut self,
584        request: CoordinationPilotReportRequest,
585    ) -> Result<CoordinationPilotReport, ApiError> {
586        build_coordination_pilot_report_with_policy(self, request.policy_context)
587    }
588
589    fn coordination_delta_report(
590        &mut self,
591        request: CoordinationDeltaReportRequest,
592    ) -> Result<CoordinationDeltaReport, ApiError> {
593        report::build_coordination_delta_report(self, request)
594    }
595
596    fn register_artifact_reference(
597        &mut self,
598        request: RegisterArtifactReferenceRequest,
599    ) -> Result<RegisterArtifactReferenceResponse, ApiError> {
600        let journal = self.sidecar_journal_catalog()?;
601        Ok(self
602            .sidecars
603            .register_artifact_reference(request, &journal)?)
604    }
605
606    fn get_artifact_reference(
607        &self,
608        request: GetArtifactReferenceRequest,
609    ) -> Result<GetArtifactReferenceResponse, ApiError> {
610        Ok(self.sidecars.get_artifact_reference(request)?)
611    }
612
613    fn register_vector_record(
614        &mut self,
615        request: RegisterVectorRecordRequest,
616    ) -> Result<RegisterVectorRecordResponse, ApiError> {
617        let journal = self.sidecar_journal_catalog()?;
618        Ok(self.sidecars.register_vector_record(request, &journal)?)
619    }
620
621    fn search_vectors(
622        &self,
623        request: SearchVectorsRequest,
624    ) -> Result<SearchVectorsResponse, ApiError> {
625        let journal = self.sidecar_journal_catalog()?;
626        Ok(self.sidecars.search_vectors(request, &journal)?)
627    }
628}
629
630#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
631pub struct AppendRequest {
632    pub datoms: Vec<Datom>,
633}
634
635#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
636pub struct AppendResponse {
637    pub appended: usize,
638}
639
640#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
641pub struct HistoryRequest {
642    #[serde(default, skip_serializing_if = "Option::is_none")]
643    pub policy_context: Option<PolicyContext>,
644}
645
646#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
647pub struct HistoryResponse {
648    pub datoms: Vec<Datom>,
649}
650
651#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
652pub struct CurrentStateRequest {
653    pub schema: Schema,
654    pub datoms: Vec<Datom>,
655    #[serde(default, skip_serializing_if = "Option::is_none")]
656    pub policy_context: Option<PolicyContext>,
657}
658
659#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
660pub struct CurrentStateResponse {
661    pub state: ResolvedState,
662}
663
664#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
665pub struct AsOfRequest {
666    pub schema: Schema,
667    pub datoms: Vec<Datom>,
668    pub at: ElementId,
669    #[serde(default, skip_serializing_if = "Option::is_none")]
670    pub policy_context: Option<PolicyContext>,
671}
672
673#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
674pub struct AsOfResponse {
675    pub state: ResolvedState,
676}
677
678#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
679pub struct CompileProgramRequest {
680    pub schema: Schema,
681    pub program: RuleProgram,
682}
683
684#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
685pub struct CompileProgramResponse {
686    pub program: CompiledProgram,
687}
688
689#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
690pub struct EvaluateProgramRequest {
691    pub state: ResolvedState,
692    pub program: CompiledProgram,
693    #[serde(default, skip_serializing_if = "Option::is_none")]
694    pub policy_context: Option<PolicyContext>,
695}
696
697#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
698pub struct EvaluateProgramResponse {
699    pub derived: DerivedSet,
700}
701
702#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
703pub struct ExplainTupleRequest {
704    pub tuple_id: TupleId,
705    #[serde(default, skip_serializing_if = "Option::is_none")]
706    pub policy_context: Option<PolicyContext>,
707}
708
709#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
710pub struct ExplainTupleResponse {
711    pub trace: DerivationTrace,
712}
713
714#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
715pub struct ExplainPlanRequest {
716    pub plan: PhaseGraph,
717}
718
719#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
720pub struct ExplainPlanResponse {
721    pub explanation: PlanExplanation,
722}
723
724#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
725pub struct ParseDocumentRequest {
726    pub dsl: String,
727}
728
729#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
730pub struct ParseDocumentResponse {
731    pub schema: Schema,
732    pub program: RuleProgram,
733    pub query: Option<QuerySpec>,
734    pub queries: Vec<NamedQuerySpec>,
735    pub explains: Vec<NamedExplainSpec>,
736}
737
738#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
739pub struct RunDocumentRequest {
740    pub dsl: String,
741    #[serde(default, skip_serializing_if = "Option::is_none")]
742    pub policy_context: Option<PolicyContext>,
743}
744
745#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
746pub struct CoordinationPilotReportRequest {
747    #[serde(default, skip_serializing_if = "Option::is_none")]
748    pub policy_context: Option<PolicyContext>,
749}
750
751#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
752pub struct RunDocumentResponse {
753    pub state: ResolvedState,
754    pub program: CompiledProgram,
755    pub derived: DerivedSet,
756    pub query: Option<QueryResult>,
757    pub queries: Vec<NamedQueryResult>,
758    pub explains: Vec<NamedExplainResult>,
759}
760
761#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
762pub struct NamedQueryResult {
763    pub name: Option<String>,
764    pub spec: QuerySpec,
765    pub result: QueryResult,
766}
767
768#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
769pub enum ExplainArtifact {
770    Plan(PlanExplanation),
771    Tuple(DerivationTrace),
772}
773
774impl Default for ExplainArtifact {
775    fn default() -> Self {
776        Self::Plan(PlanExplanation::default())
777    }
778}
779
780#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
781pub struct NamedExplainResult {
782    pub name: Option<String>,
783    pub spec: ExplainSpec,
784    pub result: ExplainArtifact,
785}
786
787fn execute_explain_spec(
788    program: &CompiledProgram,
789    evaluation: &DocumentEvaluation,
790    spec: &ExplainSpec,
791    policy_context: Option<&PolicyContext>,
792) -> Result<ExplainArtifact, ApiError> {
793    match &spec.target {
794        ExplainTarget::Plan => Ok(ExplainArtifact::Plan(
795            InMemoryExplainer::default().explain_plan(&program.phase_graph)?,
796        )),
797        ExplainTarget::Tuple(atom) => {
798            let visible = filter_derived_set(&evaluation.derived, policy_context);
799            let tuple_id = find_matching_derived_tuple(&visible, atom).ok_or_else(|| {
800                ApiError::Validation(format!(
801                    "no derived tuple matched explain target {}",
802                    atom.predicate.name
803                ))
804            })?;
805            Ok(ExplainArtifact::Tuple(filter_trace(
806                InMemoryExplainer::from_derived_set(&evaluation.derived)
807                    .explain_tuple(&tuple_id)?,
808                policy_context,
809            )?))
810        }
811    }
812}
813
814fn find_matching_derived_tuple(derived: &DerivedSet, atom: &aether_ast::Atom) -> Option<TupleId> {
815    derived.tuples.iter().find_map(|tuple| {
816        if tuple.tuple.predicate != atom.predicate.id
817            || tuple.tuple.values.len() != atom.terms.len()
818        {
819            return None;
820        }
821        let matches = atom
822            .terms
823            .iter()
824            .zip(&tuple.tuple.values)
825            .all(|(term, value)| match term {
826                Term::Value(expected) => expected == value,
827                Term::Variable(_) | Term::Aggregate(_) => false,
828            });
829        matches.then_some(tuple.tuple.id)
830    })
831}
832
833#[derive(Debug, Error)]
834pub enum ApiError {
835    #[error("validation error: {0}")]
836    Validation(String),
837    #[error(transparent)]
838    Journal(#[from] JournalError),
839    #[error(transparent)]
840    Sidecar(#[from] SidecarError),
841    #[error(transparent)]
842    Resolve(#[from] ResolveError),
843    #[error(transparent)]
844    Parse(#[from] ParseError),
845    #[error(transparent)]
846    Compile(#[from] aether_rules::CompileError),
847    #[error(transparent)]
848    Runtime(#[from] RuntimeError),
849    #[error(transparent)]
850    Explain(#[from] ExplainError),
851}
852
853#[cfg(test)]
854mod tests {
855    use super::{
856        coordination_pilot_dsl, coordination_pilot_seed_history, ApiError, AppendRequest,
857        AsOfRequest, CurrentStateRequest, ExplainArtifact, ExplainTupleRequest,
858        InMemoryKernelService, KernelService, ParseDocumentRequest, RunDocumentRequest,
859        COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT, COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
860    };
861    use aether_ast::{ElementId, EntityId, PolicyContext, PolicyEnvelope, Value};
862
863    #[test]
864    fn service_models_multi_worker_lease_handoff_and_fencing() {
865        let mut service = InMemoryKernelService::new();
866        service
867            .append(AppendRequest {
868                datoms: coordination_pilot_seed_history(),
869            })
870            .expect("append journal");
871
872        let parsed = service
873            .parse_document(ParseDocumentRequest {
874                dsl: coordination_pilot_dsl(
875                    &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
876                    "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
877                ),
878            })
879            .expect("parse coordination document");
880        assert_eq!(parsed.program.facts.len(), 7);
881
882        let pre_heartbeat_authorized = service
883            .run_document(RunDocumentRequest {
884                dsl: coordination_pilot_dsl(
885                    &format!("as_of e{}", COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT),
886                    "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
887                ),
888                policy_context: None,
889            })
890            .expect("run pre-heartbeat authorization document");
891        assert_eq!(
892            pre_heartbeat_authorized.state.as_of,
893            Some(ElementId::new(COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT))
894        );
895        assert!(pre_heartbeat_authorized
896            .query
897            .as_ref()
898            .expect("query result should exist")
899            .rows
900            .is_empty());
901
902        let as_of_authorized = service
903            .run_document(RunDocumentRequest {
904                dsl: coordination_pilot_dsl(
905                    &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
906                    "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
907                ),
908                policy_context: None,
909            })
910            .expect("run as_of authorization document");
911        assert_eq!(
912            as_of_authorized.state.as_of,
913            Some(ElementId::new(COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT))
914        );
915        let as_of_authorized_rows = &as_of_authorized
916            .query
917            .as_ref()
918            .expect("query result should exist")
919            .rows;
920        assert_eq!(as_of_authorized_rows.len(), 1);
921        assert_eq!(
922            as_of_authorized_rows[0].values,
923            vec![
924                Value::Entity(EntityId::new(1)),
925                Value::String("worker-a".into()),
926                Value::U64(1),
927            ]
928        );
929
930        let current_authorized = service
931            .run_document(RunDocumentRequest {
932                dsl: coordination_pilot_dsl(
933                    "current",
934                    "goal execution_authorized(t, worker, epoch)\n  keep t, worker, epoch",
935                ),
936                policy_context: None,
937            })
938            .expect("run current authorization document");
939        let authorized_rows = &current_authorized
940            .query
941            .as_ref()
942            .expect("query result should exist")
943            .rows;
944        assert_eq!(authorized_rows.len(), 1);
945        assert_eq!(
946            authorized_rows[0].values,
947            vec![
948                Value::Entity(EntityId::new(1)),
949                Value::String("worker-b".into()),
950                Value::U64(2),
951            ]
952        );
953        let trace = service
954            .explain_tuple(ExplainTupleRequest {
955                tuple_id: authorized_rows[0]
956                    .tuple_id
957                    .expect("execution_authorized tuple id"),
958                policy_context: None,
959            })
960            .expect("explain authorization tuple")
961            .trace;
962        assert!(!trace.tuples.is_empty());
963
964        let claimable = service
965            .run_document(RunDocumentRequest {
966                dsl: coordination_pilot_dsl(
967                    "current",
968                    "goal worker_can_claim(t, worker)\n  keep t, worker",
969                ),
970                policy_context: None,
971            })
972            .expect("run claimability document");
973        let claimable_rows = &claimable
974            .query
975            .as_ref()
976            .expect("query result should exist")
977            .rows;
978        assert_eq!(claimable_rows.len(), 2);
979        assert_eq!(
980            claimable_rows
981                .iter()
982                .map(|row| row.values.clone())
983                .collect::<Vec<_>>(),
984            vec![
985                vec![
986                    Value::Entity(EntityId::new(3)),
987                    Value::String("worker-a".into()),
988                ],
989                vec![
990                    Value::Entity(EntityId::new(3)),
991                    Value::String("worker-b".into()),
992                ],
993            ]
994        );
995
996        let accepted_outcomes = service
997            .run_document(RunDocumentRequest {
998                dsl: coordination_pilot_dsl(
999                    "current",
1000                    "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
1001                ),
1002                policy_context: None,
1003            })
1004            .expect("run accepted-outcome document");
1005        let accepted_rows = &accepted_outcomes
1006            .query
1007            .as_ref()
1008            .expect("query result should exist")
1009            .rows;
1010        assert_eq!(
1011            accepted_rows[0].values,
1012            vec![
1013                Value::Entity(EntityId::new(1)),
1014                Value::String("worker-b".into()),
1015                Value::U64(2),
1016                Value::String("completed".into()),
1017                Value::String("current-worker-b".into()),
1018            ]
1019        );
1020
1021        let rejected_outcomes = service
1022            .run_document(RunDocumentRequest {
1023                dsl: coordination_pilot_dsl(
1024                    "current",
1025                    "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n  keep t, worker, epoch, status, detail",
1026                ),
1027                policy_context: None,
1028            })
1029            .expect("run rejected-outcome document");
1030        let rejected_rows = &rejected_outcomes
1031            .query
1032            .as_ref()
1033            .expect("query result should exist")
1034            .rows;
1035        assert_eq!(
1036            rejected_rows[0].values,
1037            vec![
1038                Value::Entity(EntityId::new(1)),
1039                Value::String("worker-a".into()),
1040                Value::U64(1),
1041                Value::String("completed".into()),
1042                Value::String("stale-worker-a".into()),
1043            ]
1044        );
1045    }
1046
1047    #[test]
1048    fn service_parses_and_runs_named_queries_and_explain_directives() {
1049        let mut service = InMemoryKernelService::new();
1050        service
1051            .append(AppendRequest {
1052                datoms: vec![dependency_datom(1, 2, 1), dependency_datom(2, 3, 2)],
1053            })
1054            .expect("append transitive chain");
1055
1056        let parsed = service
1057            .parse_document(ParseDocumentRequest {
1058                dsl: transitive_document_dsl(),
1059            })
1060            .expect("parse transitive document");
1061        assert_eq!(parsed.query, Some(parsed.queries[0].spec.clone()));
1062        assert_eq!(parsed.queries.len(), 2);
1063        assert_eq!(parsed.explains.len(), 2);
1064
1065        let response = service
1066            .run_document(RunDocumentRequest {
1067                dsl: transitive_document_dsl(),
1068                policy_context: None,
1069            })
1070            .expect("run named-query document");
1071        assert_eq!(response.query, Some(response.queries[0].result.clone()));
1072        assert_eq!(response.queries.len(), 2);
1073        assert_eq!(response.explains.len(), 2);
1074        assert_eq!(
1075            response.queries[0].result.rows[0].values,
1076            vec![Value::Entity(EntityId::new(2))]
1077        );
1078        assert_eq!(
1079            response.queries[1]
1080                .result
1081                .rows
1082                .iter()
1083                .map(|row| row.values.clone())
1084                .collect::<Vec<_>>(),
1085            vec![
1086                vec![Value::Entity(EntityId::new(2))],
1087                vec![Value::Entity(EntityId::new(3))],
1088            ]
1089        );
1090        assert!(matches!(
1091            &response.explains[0].result,
1092            ExplainArtifact::Tuple(trace) if !trace.tuples.is_empty()
1093        ));
1094        assert!(matches!(
1095            &response.explains[1].result,
1096            ExplainArtifact::Plan(explanation) if !explanation.phase_graph.nodes.is_empty()
1097        ));
1098    }
1099
1100    #[test]
1101    fn service_filters_state_and_derivation_by_policy_context() {
1102        let mut service = InMemoryKernelService::new();
1103        let dsl = r#"
1104schema {
1105  attr task.status: ScalarLWW<String>
1106}
1107
1108predicates {
1109  task_status(Entity, String)
1110  protected_fact(Entity)
1111  visible_task(Entity)
1112}
1113
1114rules {
1115  visible_task(t) <- task_status(t, "ready")
1116  visible_task(t) <- protected_fact(t)
1117}
1118
1119materialize {
1120  visible_task
1121}
1122
1123facts {
1124  protected_fact(entity(1))
1125  protected_fact(entity(2)) @capability("executor")
1126}
1127
1128query current_cut {
1129  current
1130  goal visible_task(t)
1131  keep t
1132}
1133"#;
1134
1135        let parsed = service
1136            .parse_document(ParseDocumentRequest { dsl: dsl.into() })
1137            .expect("parse policy document");
1138        service
1139            .append(AppendRequest {
1140                datoms: vec![
1141                    status_datom(1, "ready", 1, None),
1142                    status_datom(
1143                        3,
1144                        "ready",
1145                        2,
1146                        Some(PolicyEnvelope {
1147                            capabilities: vec!["executor".into()],
1148                            visibilities: Vec::new(),
1149                        }),
1150                    ),
1151                ],
1152            })
1153            .expect("append policy datoms");
1154
1155        let default_state = service
1156            .current_state(CurrentStateRequest {
1157                schema: parsed.schema.clone(),
1158                datoms: Vec::new(),
1159                policy_context: None,
1160            })
1161            .expect("resolve default state");
1162        assert_eq!(default_state.state.entities.len(), 1);
1163
1164        let executor_state = service
1165            .current_state(CurrentStateRequest {
1166                schema: parsed.schema.clone(),
1167                datoms: Vec::new(),
1168                policy_context: Some(PolicyContext {
1169                    capabilities: vec!["executor".into()],
1170                    visibilities: Vec::new(),
1171                }),
1172            })
1173            .expect("resolve executor state");
1174        assert_eq!(executor_state.state.entities.len(), 2);
1175
1176        let default_result = service
1177            .run_document(RunDocumentRequest {
1178                dsl: dsl.into(),
1179                policy_context: None,
1180            })
1181            .expect("run default policy document");
1182        assert_eq!(
1183            default_result
1184                .query
1185                .expect("default query result")
1186                .rows
1187                .into_iter()
1188                .map(|row| row.values)
1189                .collect::<Vec<_>>(),
1190            vec![vec![Value::Entity(EntityId::new(1))]]
1191        );
1192
1193        let executor_result = service
1194            .run_document(RunDocumentRequest {
1195                dsl: dsl.into(),
1196                policy_context: Some(PolicyContext {
1197                    capabilities: vec!["executor".into()],
1198                    visibilities: Vec::new(),
1199                }),
1200            })
1201            .expect("run executor policy document");
1202        let executor_rows = executor_result
1203            .query
1204            .as_ref()
1205            .expect("executor query result")
1206            .rows
1207            .clone();
1208        assert_eq!(
1209            executor_rows
1210                .into_iter()
1211                .map(|row| row.values)
1212                .collect::<Vec<_>>(),
1213            vec![
1214                vec![Value::Entity(EntityId::new(1))],
1215                vec![Value::Entity(EntityId::new(2))],
1216                vec![Value::Entity(EntityId::new(3))],
1217            ]
1218        );
1219
1220        let protected_tuple = executor_result
1221            .query
1222            .as_ref()
1223            .expect("executor query result")
1224            .rows
1225            .iter()
1226            .find(|row| row.values == vec![Value::Entity(EntityId::new(3))])
1227            .and_then(|row| row.tuple_id)
1228            .expect("protected tuple id");
1229        let mismatch = service
1230            .explain_tuple(ExplainTupleRequest {
1231                tuple_id: protected_tuple,
1232                policy_context: None,
1233            })
1234            .expect_err("explain should reject mismatched policy context");
1235        assert!(matches!(
1236            mismatch,
1237            ApiError::Validation(message)
1238                if message == "requested tuple is not visible under the current policy"
1239        ));
1240        let executor_trace = service
1241            .explain_tuple(ExplainTupleRequest {
1242                tuple_id: protected_tuple,
1243                policy_context: Some(PolicyContext {
1244                    capabilities: vec!["executor".into()],
1245                    visibilities: Vec::new(),
1246                }),
1247            })
1248            .expect("explain protected tuple with matching policy")
1249            .trace;
1250        assert!(!executor_trace.tuples.is_empty());
1251    }
1252
1253    #[test]
1254    fn service_rejects_hidden_as_of_cuts_under_policy() {
1255        let mut service = InMemoryKernelService::new();
1256        let parsed = service
1257            .parse_document(ParseDocumentRequest {
1258                dsl: transitive_document_dsl(),
1259            })
1260            .expect("parse transitive document");
1261        service
1262            .append(AppendRequest {
1263                datoms: vec![dependency_datom(1, 2, 1), {
1264                    let mut datom = dependency_datom(2, 3, 2);
1265                    datom.policy = Some(PolicyEnvelope {
1266                        capabilities: vec!["executor".into()],
1267                        visibilities: Vec::new(),
1268                    });
1269                    datom
1270                }],
1271            })
1272            .expect("append mixed-visibility chain");
1273
1274        let hidden_as_of = service.as_of(AsOfRequest {
1275            schema: parsed.schema.clone(),
1276            datoms: Vec::new(),
1277            at: ElementId::new(2),
1278            policy_context: None,
1279        });
1280        assert!(matches!(
1281            hidden_as_of,
1282            Err(ApiError::Validation(message)) if message == "unknown element 2"
1283        ));
1284
1285        let visible_as_of = service
1286            .as_of(AsOfRequest {
1287                schema: parsed.schema,
1288                datoms: Vec::new(),
1289                at: ElementId::new(2),
1290                policy_context: Some(PolicyContext {
1291                    capabilities: vec!["executor".into()],
1292                    visibilities: Vec::new(),
1293                }),
1294            })
1295            .expect("authorized as_of should succeed");
1296        assert_eq!(visible_as_of.state.as_of, Some(ElementId::new(2)));
1297    }
1298
1299    fn transitive_document_dsl() -> String {
1300        r#"
1301schema {
1302  attr task.depends_on: RefSet<Entity>
1303}
1304
1305predicates {
1306  task_depends_on(Entity, Entity)
1307  depends_transitive(Entity, Entity)
1308}
1309
1310rules {
1311  depends_transitive(x, y) <- task_depends_on(x, y)
1312  depends_transitive(x, z) <- depends_transitive(x, y), task_depends_on(y, z)
1313}
1314
1315materialize {
1316  depends_transitive
1317}
1318
1319query first_cut {
1320  as_of e1
1321  goal depends_transitive(entity(1), y)
1322  keep y
1323}
1324
1325query current_cut {
1326  current
1327  goal depends_transitive(entity(1), y)
1328  keep y
1329}
1330
1331explain current_path {
1332  tuple depends_transitive(entity(1), entity(3))
1333}
1334
1335explain plan_shape {
1336  plan
1337}
1338"#
1339        .into()
1340    }
1341
1342    fn dependency_datom(entity: u64, value: u64, element: u64) -> aether_ast::Datom {
1343        aether_ast::Datom {
1344            entity: EntityId::new(entity),
1345            attribute: aether_ast::AttributeId::new(1),
1346            value: Value::Entity(EntityId::new(value)),
1347            op: aether_ast::OperationKind::Add,
1348            element: ElementId::new(element),
1349            replica: aether_ast::ReplicaId::new(1),
1350            causal_context: Default::default(),
1351            provenance: aether_ast::DatomProvenance::default(),
1352            policy: None,
1353        }
1354    }
1355
1356    fn status_datom(
1357        entity: u64,
1358        status: &str,
1359        element: u64,
1360        policy: Option<PolicyEnvelope>,
1361    ) -> aether_ast::Datom {
1362        aether_ast::Datom {
1363            entity: EntityId::new(entity),
1364            attribute: aether_ast::AttributeId::new(1),
1365            value: Value::String(status.into()),
1366            op: aether_ast::OperationKind::Assert,
1367            element: ElementId::new(element),
1368            replica: aether_ast::ReplicaId::new(1),
1369            causal_context: Default::default(),
1370            provenance: aether_ast::DatomProvenance::default(),
1371            policy,
1372        }
1373    }
1374}