1use aether_ast::{
2 policy_allows, Datom, DerivationTrace, ElementId, ExplainSpec, ExplainTarget, ExtensionalFact,
3 NamedExplainSpec, NamedQuerySpec, PhaseGraph, PlanExplanation, PolicyContext, QueryResult,
4 QuerySpec, RuleProgram, TemporalView, Term, TupleId,
5};
6use aether_explain::{ExplainError, Explainer, InMemoryExplainer};
7use aether_plan::CompiledProgram;
8use aether_resolver::{MaterializedResolver, ResolveError, ResolvedState, ResolvedValue, Resolver};
9use aether_rules::{DefaultDslParser, DefaultRuleCompiler, DslParser, ParseError, RuleCompiler};
10use aether_runtime::{execute_query, DerivedSet, RuleRuntime, RuntimeError, SemiNaiveRuntime};
11use aether_schema::Schema;
12use aether_storage::{InMemoryJournal, Journal, JournalError, SqliteJournal};
13use serde::{Deserialize, Serialize};
14use std::path::Path;
15use thiserror::Error;
16
17pub mod deployment;
18pub mod http;
19pub mod partitioned;
20#[doc(hidden)]
21pub mod perf;
22pub mod pilot;
23pub mod report;
24pub mod sidecar;
25pub mod status;
26
27pub use deployment::{
28 default_audit_log_path, serve_pilot_http_service, DeploymentError, PilotAuthConfig,
29 PilotServiceConfig, PilotTokenConfig, ResolvedPilotServiceConfig, ResolvedPilotTokenSummary,
30};
31pub use http::{
32 http_router, http_router_with_options, http_router_with_partitioned_options, AuditContext,
33 AuditEntry, AuditLogResponse, AuthScope, HealthResponse, HttpAccessToken, HttpAuthConfig,
34 HttpKernelOptions, HttpKernelState,
35};
36pub use partitioned::{
37 render_federated_explain_report_markdown, AuthorityPartitionConfig, FederatedExplainReport,
38 FederatedHistoryRequest, FederatedHistoryResponse, FederatedImportedSourceSummary,
39 FederatedNamedQuerySummary, FederatedReportRow, FederatedRunDocumentRequest,
40 FederatedRunDocumentResponse, FederatedTraceSummary, FederatedTraceTupleSummary,
41 ImportedFactQueryRequest, ImportedFactQueryResponse, LeaderEpoch, PartitionAppendRequest,
42 PartitionAppendResponse, PartitionHistoryRequest, PartitionHistoryResponse,
43 PartitionStateRequest, PartitionStateResponse, PartitionStatus, PartitionStatusResponse,
44 PartitionedInMemoryKernelService, PromoteReplicaRequest, PromoteReplicaResponse, ReplicaConfig,
45 ReplicaRole, ReplicaStatus, ReplicatedAuthorityPartitionService,
46 SqlitePartitionedKernelService,
47};
48pub use pilot::{
49 coordination_pilot_dsl, coordination_pilot_seed_history,
50 COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT, COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
51};
52pub use report::{
53 build_coordination_delta_report, build_coordination_pilot_report,
54 build_coordination_pilot_report_with_policy, render_coordination_delta_report_markdown,
55 render_coordination_pilot_report_markdown, CoordinationCut, CoordinationDeltaReport,
56 CoordinationDeltaReportRequest, CoordinationPilotReport, CoordinationTraceHandle, ReportRow,
57 ReportRowChange, ReportRowDiff, ReportSectionDelta, TraceSummary, TraceTupleSummary,
58};
59pub use sidecar::{
60 ArtifactReference, GetArtifactReferenceRequest, GetArtifactReferenceResponse,
61 InMemorySidecarFederation, JournalCatalog, RegisterArtifactReferenceRequest,
62 RegisterArtifactReferenceResponse, RegisterVectorRecordRequest, RegisterVectorRecordResponse,
63 SearchVectorsRequest, SearchVectorsResponse, SidecarError, SidecarFederation,
64 SqliteSidecarFederation, VectorFactProjection, VectorMetric, VectorRecordMetadata,
65 VectorSearchMatch,
66};
67pub use status::{
68 AuthReloadResponse, PrincipalStatusSummary, ReplicaStatusSummary, ServiceMode,
69 ServiceStatusResponse, ServiceStatusStorage,
70};
71
/// Request/response surface of the kernel: journal access, state resolution, rule
/// compilation and evaluation, explanation, DSL documents, coordination reports and
/// sidecar (artifact / vector) registration and lookup.
///
/// Every operation takes an owned request value and returns either its response type or
/// an [`ApiError`].
pub trait KernelService {
    /// Appends the datoms carried by the request to the journal.
    fn append(&mut self, request: AppendRequest) -> Result<AppendResponse, ApiError>;
    /// Returns the journal history, restricted by the request's optional policy context.
    fn history(&self, request: HistoryRequest) -> Result<HistoryResponse, ApiError>;
    /// Resolves the current state for the request's schema and datoms.
    fn current_state(&self, request: CurrentStateRequest)
        -> Result<CurrentStateResponse, ApiError>;
    /// Resolves the state as of the element named in the request.
    fn as_of(&self, request: AsOfRequest) -> Result<AsOfResponse, ApiError>;
    /// Compiles a rule program against a schema.
    fn compile_program(
        &self,
        request: CompileProgramRequest,
    ) -> Result<CompileProgramResponse, ApiError>;
    /// Evaluates a compiled program over a resolved state and returns the derived set.
    fn evaluate_program(
        &mut self,
        request: EvaluateProgramRequest,
    ) -> Result<EvaluateProgramResponse, ApiError>;
    /// Returns the derivation trace of a previously derived tuple.
    fn explain_tuple(&self, request: ExplainTupleRequest)
        -> Result<ExplainTupleResponse, ApiError>;
    /// Produces an explanation of the phase graph carried by the request.
    fn explain_plan(&self, request: ExplainPlanRequest) -> Result<ExplainPlanResponse, ApiError>;
    /// Parses a DSL document into its schema, program, queries and explain directives.
    fn parse_document(
        &self,
        request: ParseDocumentRequest,
    ) -> Result<ParseDocumentResponse, ApiError>;
    /// Parses, compiles and evaluates a DSL document, returning state, derived tuples,
    /// query results and explain artifacts.
    fn run_document(
        &mut self,
        request: RunDocumentRequest,
    ) -> Result<RunDocumentResponse, ApiError>;
    /// Builds the coordination pilot report under the request's optional policy context.
    fn coordination_pilot_report(
        &mut self,
        request: CoordinationPilotReportRequest,
    ) -> Result<CoordinationPilotReport, ApiError>;
    /// Builds the coordination delta report described by the request.
    fn coordination_delta_report(
        &mut self,
        request: CoordinationDeltaReportRequest,
    ) -> Result<CoordinationDeltaReport, ApiError>;
    /// Registers an artifact reference with the sidecar federation.
    fn register_artifact_reference(
        &mut self,
        request: RegisterArtifactReferenceRequest,
    ) -> Result<RegisterArtifactReferenceResponse, ApiError>;
    /// Looks up a previously registered artifact reference.
    fn get_artifact_reference(
        &self,
        request: GetArtifactReferenceRequest,
    ) -> Result<GetArtifactReferenceResponse, ApiError>;
    /// Registers a vector record with the sidecar federation.
    fn register_vector_record(
        &mut self,
        request: RegisterVectorRecordRequest,
    ) -> Result<RegisterVectorRecordResponse, ApiError>;
    /// Searches registered vector records.
    fn search_vectors(
        &self,
        request: SearchVectorsRequest,
    ) -> Result<SearchVectorsResponse, ApiError>;
}
122
/// Kernel service backed by an in-memory journal and an in-memory sidecar federation.
pub type InMemoryKernelService = KernelServiceCore<InMemoryJournal, InMemorySidecarFederation>;
/// Kernel service backed by a SQLite journal and a SQLite sidecar federation.
pub type SqliteKernelService = KernelServiceCore<SqliteJournal, SqliteSidecarFederation>;
125
/// Generic [`KernelService`] implementation over a journal `J` and a sidecar federation `S`.
#[derive(Debug)]
pub struct KernelServiceCore<J: Journal, S: SidecarFederation = InMemorySidecarFederation> {
    /// Source of datom history and target of appends.
    journal: J,
    /// Store for artifact references and vector records.
    sidecars: S,
    /// Derived set from the most recent evaluation; `explain_tuple` reads from it.
    last_derived: Option<CachedDerivedSet>,
}
132
133impl KernelServiceCore<InMemoryJournal, InMemorySidecarFederation> {
134 pub fn new() -> Self {
135 Self::from_journal(InMemoryJournal::new())
136 }
137}
138
impl Default for KernelServiceCore<InMemoryJournal, InMemorySidecarFederation> {
    /// Equivalent to [`KernelServiceCore::new`].
    fn default() -> Self {
        Self::new()
    }
}
144
145impl KernelServiceCore<SqliteJournal, SqliteSidecarFederation> {
146 pub fn open(path: impl AsRef<Path>) -> Result<Self, ApiError> {
147 let path = path.as_ref();
148 Ok(Self::from_parts(
149 SqliteJournal::open(path)?,
150 SqliteSidecarFederation::open(sidecar::sidecar_catalog_path_for_journal(path))?,
151 ))
152 }
153}
154
155impl<J: Journal, S: SidecarFederation> KernelServiceCore<J, S> {
156 pub fn from_parts(journal: J, sidecars: S) -> Self {
157 Self {
158 journal,
159 sidecars,
160 last_derived: None,
161 }
162 }
163
164 fn datoms_or_history(&self, datoms: &[Datom]) -> Result<Vec<Datom>, ApiError> {
165 if datoms.is_empty() {
166 Ok(self.journal.history()?)
167 } else {
168 Ok(datoms.to_vec())
169 }
170 }
171
172 fn visible_history(
173 &self,
174 datoms: &[Datom],
175 policy_context: Option<&PolicyContext>,
176 ) -> Result<Vec<Datom>, ApiError> {
177 Ok(filter_datoms(
178 self.datoms_or_history(datoms)?,
179 policy_context,
180 ))
181 }
182
183 fn cache_derived(&mut self, derived: DerivedSet) {
184 self.last_derived = Some(CachedDerivedSet { derived });
185 }
186
187 fn sidecar_journal_catalog(&self) -> Result<JournalCatalog, ApiError> {
188 Ok(JournalCatalog::from_history(&self.journal.history()?))
189 }
190
191 fn document_evaluation<'a>(
192 &self,
193 cache: &'a mut Vec<DocumentEvaluation>,
194 schema: &Schema,
195 datoms: &[Datom],
196 program: &CompiledProgram,
197 view: &TemporalView,
198 ) -> Result<&'a DocumentEvaluation, ApiError> {
199 if let Some(index) = cache.iter().position(|evaluation| &evaluation.view == view) {
200 return Ok(&cache[index]);
201 }
202
203 let state = match view {
204 TemporalView::AsOf(element) => MaterializedResolver.as_of(schema, datoms, element)?,
205 TemporalView::Current => MaterializedResolver.current(schema, datoms)?,
206 };
207 let derived = SemiNaiveRuntime.evaluate(&state, program)?;
208 cache.push(DocumentEvaluation {
209 view: view.clone(),
210 state,
211 derived,
212 });
213 Ok(cache
214 .last()
215 .expect("evaluation cache contains the inserted view"))
216 }
217}
218
219fn filter_datoms(datoms: Vec<Datom>, policy_context: Option<&PolicyContext>) -> Vec<Datom> {
220 datoms
221 .into_iter()
222 .filter(|datom| policy_allows(policy_context, datom.policy.as_ref()))
223 .collect()
224}
225
226fn filter_extensional_facts(
227 facts: Vec<ExtensionalFact>,
228 policy_context: Option<&PolicyContext>,
229) -> Vec<ExtensionalFact> {
230 facts
231 .into_iter()
232 .filter(|fact| policy_allows(policy_context, fact.policy.as_ref()))
233 .collect()
234}
235
236fn filter_compiled_program(
237 program: &CompiledProgram,
238 policy_context: Option<&PolicyContext>,
239) -> CompiledProgram {
240 let mut filtered = program.clone();
241 filtered.facts = filter_extensional_facts(filtered.facts, policy_context);
242 filtered
243}
244
245fn filter_resolved_state(
246 state: &ResolvedState,
247 policy_context: Option<&PolicyContext>,
248) -> ResolvedState {
249 let mut filtered = ResolvedState {
250 entities: indexmap::IndexMap::new(),
251 as_of: state.as_of,
252 };
253
254 for (entity_id, entity_state) in &state.entities {
255 let mut visible_entity = aether_resolver::EntityState::default();
256 for (attribute_id, value) in &entity_state.attributes {
257 let visible_facts = entity_state
258 .facts(attribute_id)
259 .iter()
260 .filter(|fact| policy_allows(policy_context, fact.policy.as_ref()))
261 .cloned()
262 .collect::<Vec<_>>();
263 if visible_facts.is_empty() {
264 continue;
265 }
266 let visible_value = match value {
267 ResolvedValue::Scalar(_) => {
268 ResolvedValue::Scalar(visible_facts.last().map(|fact| fact.value.clone()))
269 }
270 ResolvedValue::Set(_) => ResolvedValue::Set(
271 visible_facts
272 .iter()
273 .map(|fact| fact.value.clone())
274 .collect::<Vec<_>>(),
275 ),
276 ResolvedValue::Sequence(_) => ResolvedValue::Sequence(
277 visible_facts
278 .iter()
279 .map(|fact| fact.value.clone())
280 .collect::<Vec<_>>(),
281 ),
282 };
283 visible_entity
284 .attributes
285 .insert(*attribute_id, visible_value);
286 visible_entity.facts.insert(*attribute_id, visible_facts);
287 }
288 if !visible_entity.attributes.is_empty() {
289 filtered.entities.insert(*entity_id, visible_entity);
290 }
291 }
292
293 filtered
294}
295
296fn filter_derived_set(derived: &DerivedSet, policy_context: Option<&PolicyContext>) -> DerivedSet {
297 let tuples = derived
298 .tuples
299 .iter()
300 .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
301 .cloned()
302 .collect::<Vec<_>>();
303 let visible_ids = tuples
304 .iter()
305 .map(|tuple| tuple.tuple.id)
306 .collect::<std::collections::BTreeSet<_>>();
307 let predicate_index = derived
308 .predicate_index
309 .iter()
310 .map(|(predicate, tuple_ids)| {
311 (
312 *predicate,
313 tuple_ids
314 .iter()
315 .copied()
316 .filter(|tuple_id| visible_ids.contains(tuple_id))
317 .collect::<Vec<_>>(),
318 )
319 })
320 .collect();
321
322 DerivedSet {
323 tuples,
324 iterations: derived.iterations.clone(),
325 predicate_index,
326 }
327}
328
329fn filter_trace(
330 trace: DerivationTrace,
331 policy_context: Option<&PolicyContext>,
332) -> Result<DerivationTrace, ApiError> {
333 let tuples = trace
334 .tuples
335 .into_iter()
336 .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
337 .collect::<Vec<_>>();
338 if tuples.iter().all(|tuple| tuple.tuple.id != trace.root) {
339 return Err(ApiError::Validation(
340 "requested tuple is not visible under the current policy".into(),
341 ));
342 }
343 Ok(DerivationTrace {
344 root: trace.root,
345 tuples,
346 })
347}
348
349fn ensure_visible_element(
350 datoms: &[Datom],
351 at: ElementId,
352 policy_context: Option<&PolicyContext>,
353) -> Result<(), ApiError> {
354 if datoms
355 .iter()
356 .any(|datom| datom.element == at && policy_allows(policy_context, datom.policy.as_ref()))
357 {
358 Ok(())
359 } else {
360 Err(ApiError::Validation(format!("unknown element {}", at.0)))
361 }
362}
363
impl<J, S> KernelServiceCore<J, S>
where
    J: Journal,
    S: SidecarFederation + Default,
{
    /// Builds a service around `journal`, using the sidecar federation's `Default` value.
    pub fn from_journal(journal: J) -> Self {
        Self::from_parts(journal, S::default())
    }
}
373
/// Result of evaluating a document's program under one temporal view; cached per view
/// while a document is being run.
#[derive(Clone, Debug)]
struct DocumentEvaluation {
    /// View this evaluation was computed for; used as the cache key.
    view: TemporalView,
    /// State resolved for `view`.
    state: ResolvedState,
    /// Tuples derived by evaluating the program over `state`.
    derived: DerivedSet,
}
380
/// Wrapper around the derived set retained by the service after its latest evaluation.
#[derive(Clone, Debug)]
struct CachedDerivedSet {
    /// Unfiltered derived set; policy filtering is applied when it is read.
    derived: DerivedSet,
}
385
386impl<J: Journal, S: SidecarFederation> KernelService for KernelServiceCore<J, S> {
387 fn append(&mut self, request: AppendRequest) -> Result<AppendResponse, ApiError> {
388 self.journal.append(&request.datoms)?;
389 Ok(AppendResponse {
390 appended: request.datoms.len(),
391 })
392 }
393
394 fn history(&self, request: HistoryRequest) -> Result<HistoryResponse, ApiError> {
395 Ok(HistoryResponse {
396 datoms: self.visible_history(&[], request.policy_context.as_ref())?,
397 })
398 }
399
400 fn current_state(
401 &self,
402 request: CurrentStateRequest,
403 ) -> Result<CurrentStateResponse, ApiError> {
404 let datoms = self.datoms_or_history(&request.datoms)?;
405 Ok(CurrentStateResponse {
406 state: filter_resolved_state(
407 &MaterializedResolver.current(&request.schema, &datoms)?,
408 request.policy_context.as_ref(),
409 ),
410 })
411 }
412
413 fn as_of(&self, request: AsOfRequest) -> Result<AsOfResponse, ApiError> {
414 let datoms = self.datoms_or_history(&request.datoms)?;
415 ensure_visible_element(&datoms, request.at, request.policy_context.as_ref())?;
416 Ok(AsOfResponse {
417 state: filter_resolved_state(
418 &MaterializedResolver.as_of(&request.schema, &datoms, &request.at)?,
419 request.policy_context.as_ref(),
420 ),
421 })
422 }
423
424 fn compile_program(
425 &self,
426 request: CompileProgramRequest,
427 ) -> Result<CompileProgramResponse, ApiError> {
428 Ok(CompileProgramResponse {
429 program: DefaultRuleCompiler.compile(&request.schema, &request.program)?,
430 })
431 }
432
433 fn evaluate_program(
434 &mut self,
435 request: EvaluateProgramRequest,
436 ) -> Result<EvaluateProgramResponse, ApiError> {
437 let derived = SemiNaiveRuntime.evaluate(&request.state, &request.program)?;
438 self.cache_derived(derived.clone());
439 Ok(EvaluateProgramResponse {
440 derived: filter_derived_set(&derived, request.policy_context.as_ref()),
441 })
442 }
443
444 fn explain_tuple(
445 &self,
446 request: ExplainTupleRequest,
447 ) -> Result<ExplainTupleResponse, ApiError> {
448 let cached = self
449 .last_derived
450 .as_ref()
451 .ok_or_else(|| ApiError::Validation("no derived tuples are cached".into()))?;
452 let trace = InMemoryExplainer::from_derived_set(&cached.derived)
453 .explain_tuple(&request.tuple_id)?;
454 Ok(ExplainTupleResponse {
455 trace: filter_trace(trace, request.policy_context.as_ref())?,
456 })
457 }
458
459 fn explain_plan(&self, request: ExplainPlanRequest) -> Result<ExplainPlanResponse, ApiError> {
460 let explanation = InMemoryExplainer::default().explain_plan(&request.plan)?;
461 Ok(ExplainPlanResponse { explanation })
462 }
463
464 fn parse_document(
465 &self,
466 request: ParseDocumentRequest,
467 ) -> Result<ParseDocumentResponse, ApiError> {
468 let document = DefaultDslParser.parse_document(&request.dsl)?;
469 Ok(ParseDocumentResponse {
470 schema: document.schema,
471 program: document.program,
472 query: document.query,
473 queries: document.queries,
474 explains: document.explains,
475 })
476 }
477
478 fn run_document(
479 &mut self,
480 request: RunDocumentRequest,
481 ) -> Result<RunDocumentResponse, ApiError> {
482 let document = DefaultDslParser.parse_document(&request.dsl)?;
483 let datoms = self.datoms_or_history(&[])?;
484 let program = DefaultRuleCompiler.compile(&document.schema, &document.program)?;
485 let mut evaluations = Vec::new();
486 let primary_view = document
487 .query
488 .as_ref()
489 .map(|query| query.view.clone())
490 .or_else(|| {
491 document
492 .queries
493 .first()
494 .map(|query| query.spec.view.clone())
495 })
496 .or_else(|| {
497 document
498 .explains
499 .first()
500 .map(|explain| explain.spec.view.clone())
501 })
502 .unwrap_or(TemporalView::Current);
503 let primary = self.document_evaluation(
504 &mut evaluations,
505 &document.schema,
506 &datoms,
507 &program,
508 &primary_view,
509 )?;
510 let primary_state = primary.state.clone();
511 let primary_derived = primary.derived.clone();
512 let query = match &document.query {
513 Some(query) => Some(execute_query(
514 &primary_state,
515 &program,
516 &primary_derived,
517 &query.query,
518 request.policy_context.as_ref(),
519 )?),
520 None => None,
521 };
522 let queries = document
523 .queries
524 .iter()
525 .map(|named_query| {
526 let evaluation = self.document_evaluation(
527 &mut evaluations,
528 &document.schema,
529 &datoms,
530 &program,
531 &named_query.spec.view,
532 )?;
533 Ok(NamedQueryResult {
534 name: named_query.name.clone(),
535 spec: named_query.spec.clone(),
536 result: execute_query(
537 &evaluation.state,
538 &program,
539 &evaluation.derived,
540 &named_query.spec.query,
541 request.policy_context.as_ref(),
542 )?,
543 })
544 })
545 .collect::<Result<Vec<_>, ApiError>>()?;
546 let explains = document
547 .explains
548 .iter()
549 .map(|named_explain| {
550 let evaluation = self.document_evaluation(
551 &mut evaluations,
552 &document.schema,
553 &datoms,
554 &program,
555 &named_explain.spec.view,
556 )?;
557 Ok(NamedExplainResult {
558 name: named_explain.name.clone(),
559 spec: named_explain.spec.clone(),
560 result: execute_explain_spec(
561 &program,
562 evaluation,
563 &named_explain.spec,
564 request.policy_context.as_ref(),
565 )?,
566 })
567 })
568 .collect::<Result<Vec<_>, ApiError>>()?;
569 self.cache_derived(primary_derived.clone());
570 let derived = filter_derived_set(&primary_derived, request.policy_context.as_ref());
571
572 Ok(RunDocumentResponse {
573 state: filter_resolved_state(&primary_state, request.policy_context.as_ref()),
574 program: filter_compiled_program(&program, request.policy_context.as_ref()),
575 derived,
576 query,
577 queries,
578 explains,
579 })
580 }
581
582 fn coordination_pilot_report(
583 &mut self,
584 request: CoordinationPilotReportRequest,
585 ) -> Result<CoordinationPilotReport, ApiError> {
586 build_coordination_pilot_report_with_policy(self, request.policy_context)
587 }
588
589 fn coordination_delta_report(
590 &mut self,
591 request: CoordinationDeltaReportRequest,
592 ) -> Result<CoordinationDeltaReport, ApiError> {
593 report::build_coordination_delta_report(self, request)
594 }
595
596 fn register_artifact_reference(
597 &mut self,
598 request: RegisterArtifactReferenceRequest,
599 ) -> Result<RegisterArtifactReferenceResponse, ApiError> {
600 let journal = self.sidecar_journal_catalog()?;
601 Ok(self
602 .sidecars
603 .register_artifact_reference(request, &journal)?)
604 }
605
606 fn get_artifact_reference(
607 &self,
608 request: GetArtifactReferenceRequest,
609 ) -> Result<GetArtifactReferenceResponse, ApiError> {
610 Ok(self.sidecars.get_artifact_reference(request)?)
611 }
612
613 fn register_vector_record(
614 &mut self,
615 request: RegisterVectorRecordRequest,
616 ) -> Result<RegisterVectorRecordResponse, ApiError> {
617 let journal = self.sidecar_journal_catalog()?;
618 Ok(self.sidecars.register_vector_record(request, &journal)?)
619 }
620
621 fn search_vectors(
622 &self,
623 request: SearchVectorsRequest,
624 ) -> Result<SearchVectorsResponse, ApiError> {
625 let journal = self.sidecar_journal_catalog()?;
626 Ok(self.sidecars.search_vectors(request, &journal)?)
627 }
628}
629
/// Request to append datoms to the journal.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AppendRequest {
    /// Datoms to append.
    pub datoms: Vec<Datom>,
}
634
/// Outcome of an append.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AppendResponse {
    /// Number of datoms the request contained.
    pub appended: usize,
}
639
/// Request for the journal history.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct HistoryRequest {
    /// Optional policy context used to filter the returned datoms; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
645
/// Journal history visible to the caller.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct HistoryResponse {
    /// Datoms that passed the policy filter.
    pub datoms: Vec<Datom>,
}
650
/// Request to resolve the current state.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CurrentStateRequest {
    /// Schema used for resolution.
    pub schema: Schema,
    /// Datoms to resolve; when empty, `KernelServiceCore` uses the journal history.
    pub datoms: Vec<Datom>,
    /// Optional policy context used to filter the resolved state; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
658
/// Resolved current state.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CurrentStateResponse {
    /// Policy-filtered resolved state.
    pub state: ResolvedState,
}
663
/// Request to resolve the state as of a specific element.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AsOfRequest {
    /// Schema used for resolution.
    pub schema: Schema,
    /// Datoms to resolve; when empty, `KernelServiceCore` uses the journal history.
    pub datoms: Vec<Datom>,
    /// Element to resolve the state at.
    pub at: ElementId,
    /// Optional policy context used to check element visibility and filter the resolved
    /// state; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
672
/// State resolved as of the requested element.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AsOfResponse {
    /// Policy-filtered resolved state.
    pub state: ResolvedState,
}
677
/// Request to compile a rule program.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CompileProgramRequest {
    /// Schema the program is compiled against.
    pub schema: Schema,
    /// Rule program to compile.
    pub program: RuleProgram,
}
683
/// Result of compiling a rule program.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CompileProgramResponse {
    /// The compiled program.
    pub program: CompiledProgram,
}
688
/// Request to evaluate a compiled program over a resolved state.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct EvaluateProgramRequest {
    /// State the program is evaluated over.
    pub state: ResolvedState,
    /// Compiled program to evaluate.
    pub program: CompiledProgram,
    /// Optional policy context used to filter the returned derived set; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
696
/// Result of evaluating a program.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct EvaluateProgramResponse {
    /// Policy-filtered derived set.
    pub derived: DerivedSet,
}
701
/// Request for the derivation trace of a tuple.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ExplainTupleRequest {
    /// Identifier of the tuple to explain.
    pub tuple_id: TupleId,
    /// Optional policy context used to filter the trace; omitted from the serialized
    /// form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
708
/// Derivation trace of a tuple.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ExplainTupleResponse {
    /// Policy-filtered derivation trace.
    pub trace: DerivationTrace,
}
713
/// Request to explain a plan.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ExplainPlanRequest {
    /// Phase graph to explain.
    pub plan: PhaseGraph,
}
718
/// Explanation of a plan.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ExplainPlanResponse {
    /// Explanation produced for the requested phase graph.
    pub explanation: PlanExplanation,
}
723
/// Request to parse a DSL document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ParseDocumentRequest {
    /// DSL source text.
    pub dsl: String,
}
728
/// Parts of a parsed DSL document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ParseDocumentResponse {
    /// Schema declared by the document.
    pub schema: Schema,
    /// Rule program declared by the document.
    pub program: RuleProgram,
    /// The document's `query`, if the parser produced one.
    pub query: Option<QuerySpec>,
    /// Named queries in the document.
    pub queries: Vec<NamedQuerySpec>,
    /// Explain directives in the document.
    pub explains: Vec<NamedExplainSpec>,
}
737
/// Request to parse, compile and evaluate a DSL document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RunDocumentRequest {
    /// DSL source text.
    pub dsl: String,
    /// Optional policy context applied to the returned state, program facts, derived
    /// set, query results and explain traces; omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
744
/// Request for the coordination pilot report.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CoordinationPilotReportRequest {
    /// Optional policy context passed to the report builder; omitted from the serialized
    /// form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
750
/// Result of running a DSL document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RunDocumentResponse {
    /// Policy-filtered state resolved for the document's primary view.
    pub state: ResolvedState,
    /// Compiled program with its facts filtered by policy.
    pub program: CompiledProgram,
    /// Policy-filtered derived set for the primary view.
    pub derived: DerivedSet,
    /// Result of the document's `query`, when it has one.
    pub query: Option<QueryResult>,
    /// Results of the document's named queries, in document order.
    pub queries: Vec<NamedQueryResult>,
    /// Results of the document's explain directives, in document order.
    pub explains: Vec<NamedExplainResult>,
}
760
/// Result of one named query from a document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct NamedQueryResult {
    /// Name copied from the query's declaration, if it has one.
    pub name: Option<String>,
    /// The query specification that was executed.
    pub spec: QuerySpec,
    /// Rows produced by the query.
    pub result: QueryResult,
}
767
/// Output of an explain directive: either a plan explanation or a tuple derivation trace.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ExplainArtifact {
    /// Explanation of the compiled program's phase graph.
    Plan(PlanExplanation),
    /// Derivation trace of a matched derived tuple.
    Tuple(DerivationTrace),
}
773
impl Default for ExplainArtifact {
    /// Defaults to a `Plan` artifact holding a default `PlanExplanation`.
    fn default() -> Self {
        Self::Plan(PlanExplanation::default())
    }
}
779
/// Result of one explain directive from a document.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct NamedExplainResult {
    /// Name copied from the directive's declaration, if it has one.
    pub name: Option<String>,
    /// The explain specification that was executed.
    pub spec: ExplainSpec,
    /// Artifact produced for the directive.
    pub result: ExplainArtifact,
}
786
787fn execute_explain_spec(
788 program: &CompiledProgram,
789 evaluation: &DocumentEvaluation,
790 spec: &ExplainSpec,
791 policy_context: Option<&PolicyContext>,
792) -> Result<ExplainArtifact, ApiError> {
793 match &spec.target {
794 ExplainTarget::Plan => Ok(ExplainArtifact::Plan(
795 InMemoryExplainer::default().explain_plan(&program.phase_graph)?,
796 )),
797 ExplainTarget::Tuple(atom) => {
798 let visible = filter_derived_set(&evaluation.derived, policy_context);
799 let tuple_id = find_matching_derived_tuple(&visible, atom).ok_or_else(|| {
800 ApiError::Validation(format!(
801 "no derived tuple matched explain target {}",
802 atom.predicate.name
803 ))
804 })?;
805 Ok(ExplainArtifact::Tuple(filter_trace(
806 InMemoryExplainer::from_derived_set(&evaluation.derived)
807 .explain_tuple(&tuple_id)?,
808 policy_context,
809 )?))
810 }
811 }
812}
813
814fn find_matching_derived_tuple(derived: &DerivedSet, atom: &aether_ast::Atom) -> Option<TupleId> {
815 derived.tuples.iter().find_map(|tuple| {
816 if tuple.tuple.predicate != atom.predicate.id
817 || tuple.tuple.values.len() != atom.terms.len()
818 {
819 return None;
820 }
821 let matches = atom
822 .terms
823 .iter()
824 .zip(&tuple.tuple.values)
825 .all(|(term, value)| match term {
826 Term::Value(expected) => expected == value,
827 Term::Variable(_) | Term::Aggregate(_) => false,
828 });
829 matches.then_some(tuple.tuple.id)
830 })
831}
832
/// Error returned by every [`KernelService`] operation.
///
/// `Validation` carries a message produced by this module; every other variant wraps an
/// error from an underlying component and displays it transparently.
#[derive(Debug, Error)]
pub enum ApiError {
    /// The request was rejected by a check in the service layer.
    #[error("validation error: {0}")]
    Validation(String),
    /// Error from the journal.
    #[error(transparent)]
    Journal(#[from] JournalError),
    /// Error from the sidecar federation.
    #[error(transparent)]
    Sidecar(#[from] SidecarError),
    /// Error from state resolution.
    #[error(transparent)]
    Resolve(#[from] ResolveError),
    /// Error from DSL parsing.
    #[error(transparent)]
    Parse(#[from] ParseError),
    /// Error from rule compilation.
    #[error(transparent)]
    Compile(#[from] aether_rules::CompileError),
    /// Error from rule evaluation or query execution.
    #[error(transparent)]
    Runtime(#[from] RuntimeError),
    /// Error from the explainer.
    #[error(transparent)]
    Explain(#[from] ExplainError),
}
852
853#[cfg(test)]
854mod tests {
855 use super::{
856 coordination_pilot_dsl, coordination_pilot_seed_history, ApiError, AppendRequest,
857 AsOfRequest, CurrentStateRequest, ExplainArtifact, ExplainTupleRequest,
858 InMemoryKernelService, KernelService, ParseDocumentRequest, RunDocumentRequest,
859 COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT, COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT,
860 };
861 use aether_ast::{ElementId, EntityId, PolicyContext, PolicyEnvelope, Value};
862
863 #[test]
864 fn service_models_multi_worker_lease_handoff_and_fencing() {
865 let mut service = InMemoryKernelService::new();
866 service
867 .append(AppendRequest {
868 datoms: coordination_pilot_seed_history(),
869 })
870 .expect("append journal");
871
872 let parsed = service
873 .parse_document(ParseDocumentRequest {
874 dsl: coordination_pilot_dsl(
875 &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
876 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
877 ),
878 })
879 .expect("parse coordination document");
880 assert_eq!(parsed.program.facts.len(), 7);
881
882 let pre_heartbeat_authorized = service
883 .run_document(RunDocumentRequest {
884 dsl: coordination_pilot_dsl(
885 &format!("as_of e{}", COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT),
886 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
887 ),
888 policy_context: None,
889 })
890 .expect("run pre-heartbeat authorization document");
891 assert_eq!(
892 pre_heartbeat_authorized.state.as_of,
893 Some(ElementId::new(COORDINATION_PILOT_PRE_HEARTBEAT_ELEMENT))
894 );
895 assert!(pre_heartbeat_authorized
896 .query
897 .as_ref()
898 .expect("query result should exist")
899 .rows
900 .is_empty());
901
902 let as_of_authorized = service
903 .run_document(RunDocumentRequest {
904 dsl: coordination_pilot_dsl(
905 &format!("as_of e{}", COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT),
906 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
907 ),
908 policy_context: None,
909 })
910 .expect("run as_of authorization document");
911 assert_eq!(
912 as_of_authorized.state.as_of,
913 Some(ElementId::new(COORDINATION_PILOT_AUTHORIZED_AS_OF_ELEMENT))
914 );
915 let as_of_authorized_rows = &as_of_authorized
916 .query
917 .as_ref()
918 .expect("query result should exist")
919 .rows;
920 assert_eq!(as_of_authorized_rows.len(), 1);
921 assert_eq!(
922 as_of_authorized_rows[0].values,
923 vec![
924 Value::Entity(EntityId::new(1)),
925 Value::String("worker-a".into()),
926 Value::U64(1),
927 ]
928 );
929
930 let current_authorized = service
931 .run_document(RunDocumentRequest {
932 dsl: coordination_pilot_dsl(
933 "current",
934 "goal execution_authorized(t, worker, epoch)\n keep t, worker, epoch",
935 ),
936 policy_context: None,
937 })
938 .expect("run current authorization document");
939 let authorized_rows = ¤t_authorized
940 .query
941 .as_ref()
942 .expect("query result should exist")
943 .rows;
944 assert_eq!(authorized_rows.len(), 1);
945 assert_eq!(
946 authorized_rows[0].values,
947 vec![
948 Value::Entity(EntityId::new(1)),
949 Value::String("worker-b".into()),
950 Value::U64(2),
951 ]
952 );
953 let trace = service
954 .explain_tuple(ExplainTupleRequest {
955 tuple_id: authorized_rows[0]
956 .tuple_id
957 .expect("execution_authorized tuple id"),
958 policy_context: None,
959 })
960 .expect("explain authorization tuple")
961 .trace;
962 assert!(!trace.tuples.is_empty());
963
964 let claimable = service
965 .run_document(RunDocumentRequest {
966 dsl: coordination_pilot_dsl(
967 "current",
968 "goal worker_can_claim(t, worker)\n keep t, worker",
969 ),
970 policy_context: None,
971 })
972 .expect("run claimability document");
973 let claimable_rows = &claimable
974 .query
975 .as_ref()
976 .expect("query result should exist")
977 .rows;
978 assert_eq!(claimable_rows.len(), 2);
979 assert_eq!(
980 claimable_rows
981 .iter()
982 .map(|row| row.values.clone())
983 .collect::<Vec<_>>(),
984 vec![
985 vec![
986 Value::Entity(EntityId::new(3)),
987 Value::String("worker-a".into()),
988 ],
989 vec![
990 Value::Entity(EntityId::new(3)),
991 Value::String("worker-b".into()),
992 ],
993 ]
994 );
995
996 let accepted_outcomes = service
997 .run_document(RunDocumentRequest {
998 dsl: coordination_pilot_dsl(
999 "current",
1000 "goal execution_outcome_accepted(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
1001 ),
1002 policy_context: None,
1003 })
1004 .expect("run accepted-outcome document");
1005 let accepted_rows = &accepted_outcomes
1006 .query
1007 .as_ref()
1008 .expect("query result should exist")
1009 .rows;
1010 assert_eq!(
1011 accepted_rows[0].values,
1012 vec![
1013 Value::Entity(EntityId::new(1)),
1014 Value::String("worker-b".into()),
1015 Value::U64(2),
1016 Value::String("completed".into()),
1017 Value::String("current-worker-b".into()),
1018 ]
1019 );
1020
1021 let rejected_outcomes = service
1022 .run_document(RunDocumentRequest {
1023 dsl: coordination_pilot_dsl(
1024 "current",
1025 "goal execution_outcome_rejected_stale(t, worker, epoch, status, detail)\n keep t, worker, epoch, status, detail",
1026 ),
1027 policy_context: None,
1028 })
1029 .expect("run rejected-outcome document");
1030 let rejected_rows = &rejected_outcomes
1031 .query
1032 .as_ref()
1033 .expect("query result should exist")
1034 .rows;
1035 assert_eq!(
1036 rejected_rows[0].values,
1037 vec![
1038 Value::Entity(EntityId::new(1)),
1039 Value::String("worker-a".into()),
1040 Value::U64(1),
1041 Value::String("completed".into()),
1042 Value::String("stale-worker-a".into()),
1043 ]
1044 );
1045 }
1046
1047 #[test]
1048 fn service_parses_and_runs_named_queries_and_explain_directives() {
1049 let mut service = InMemoryKernelService::new();
1050 service
1051 .append(AppendRequest {
1052 datoms: vec![dependency_datom(1, 2, 1), dependency_datom(2, 3, 2)],
1053 })
1054 .expect("append transitive chain");
1055
1056 let parsed = service
1057 .parse_document(ParseDocumentRequest {
1058 dsl: transitive_document_dsl(),
1059 })
1060 .expect("parse transitive document");
1061 assert_eq!(parsed.query, Some(parsed.queries[0].spec.clone()));
1062 assert_eq!(parsed.queries.len(), 2);
1063 assert_eq!(parsed.explains.len(), 2);
1064
1065 let response = service
1066 .run_document(RunDocumentRequest {
1067 dsl: transitive_document_dsl(),
1068 policy_context: None,
1069 })
1070 .expect("run named-query document");
1071 assert_eq!(response.query, Some(response.queries[0].result.clone()));
1072 assert_eq!(response.queries.len(), 2);
1073 assert_eq!(response.explains.len(), 2);
1074 assert_eq!(
1075 response.queries[0].result.rows[0].values,
1076 vec![Value::Entity(EntityId::new(2))]
1077 );
1078 assert_eq!(
1079 response.queries[1]
1080 .result
1081 .rows
1082 .iter()
1083 .map(|row| row.values.clone())
1084 .collect::<Vec<_>>(),
1085 vec![
1086 vec![Value::Entity(EntityId::new(2))],
1087 vec![Value::Entity(EntityId::new(3))],
1088 ]
1089 );
1090 assert!(matches!(
1091 &response.explains[0].result,
1092 ExplainArtifact::Tuple(trace) if !trace.tuples.is_empty()
1093 ));
1094 assert!(matches!(
1095 &response.explains[1].result,
1096 ExplainArtifact::Plan(explanation) if !explanation.phase_graph.nodes.is_empty()
1097 ));
1098 }
1099
1100 #[test]
1101 fn service_filters_state_and_derivation_by_policy_context() {
1102 let mut service = InMemoryKernelService::new();
1103 let dsl = r#"
1104schema {
1105 attr task.status: ScalarLWW<String>
1106}
1107
1108predicates {
1109 task_status(Entity, String)
1110 protected_fact(Entity)
1111 visible_task(Entity)
1112}
1113
1114rules {
1115 visible_task(t) <- task_status(t, "ready")
1116 visible_task(t) <- protected_fact(t)
1117}
1118
1119materialize {
1120 visible_task
1121}
1122
1123facts {
1124 protected_fact(entity(1))
1125 protected_fact(entity(2)) @capability("executor")
1126}
1127
1128query current_cut {
1129 current
1130 goal visible_task(t)
1131 keep t
1132}
1133"#;
1134
1135 let parsed = service
1136 .parse_document(ParseDocumentRequest { dsl: dsl.into() })
1137 .expect("parse policy document");
1138 service
1139 .append(AppendRequest {
1140 datoms: vec![
1141 status_datom(1, "ready", 1, None),
1142 status_datom(
1143 3,
1144 "ready",
1145 2,
1146 Some(PolicyEnvelope {
1147 capabilities: vec!["executor".into()],
1148 visibilities: Vec::new(),
1149 }),
1150 ),
1151 ],
1152 })
1153 .expect("append policy datoms");
1154
1155 let default_state = service
1156 .current_state(CurrentStateRequest {
1157 schema: parsed.schema.clone(),
1158 datoms: Vec::new(),
1159 policy_context: None,
1160 })
1161 .expect("resolve default state");
1162 assert_eq!(default_state.state.entities.len(), 1);
1163
1164 let executor_state = service
1165 .current_state(CurrentStateRequest {
1166 schema: parsed.schema.clone(),
1167 datoms: Vec::new(),
1168 policy_context: Some(PolicyContext {
1169 capabilities: vec!["executor".into()],
1170 visibilities: Vec::new(),
1171 }),
1172 })
1173 .expect("resolve executor state");
1174 assert_eq!(executor_state.state.entities.len(), 2);
1175
1176 let default_result = service
1177 .run_document(RunDocumentRequest {
1178 dsl: dsl.into(),
1179 policy_context: None,
1180 })
1181 .expect("run default policy document");
1182 assert_eq!(
1183 default_result
1184 .query
1185 .expect("default query result")
1186 .rows
1187 .into_iter()
1188 .map(|row| row.values)
1189 .collect::<Vec<_>>(),
1190 vec![vec![Value::Entity(EntityId::new(1))]]
1191 );
1192
1193 let executor_result = service
1194 .run_document(RunDocumentRequest {
1195 dsl: dsl.into(),
1196 policy_context: Some(PolicyContext {
1197 capabilities: vec!["executor".into()],
1198 visibilities: Vec::new(),
1199 }),
1200 })
1201 .expect("run executor policy document");
1202 let executor_rows = executor_result
1203 .query
1204 .as_ref()
1205 .expect("executor query result")
1206 .rows
1207 .clone();
1208 assert_eq!(
1209 executor_rows
1210 .into_iter()
1211 .map(|row| row.values)
1212 .collect::<Vec<_>>(),
1213 vec![
1214 vec![Value::Entity(EntityId::new(1))],
1215 vec![Value::Entity(EntityId::new(2))],
1216 vec![Value::Entity(EntityId::new(3))],
1217 ]
1218 );
1219
1220 let protected_tuple = executor_result
1221 .query
1222 .as_ref()
1223 .expect("executor query result")
1224 .rows
1225 .iter()
1226 .find(|row| row.values == vec![Value::Entity(EntityId::new(3))])
1227 .and_then(|row| row.tuple_id)
1228 .expect("protected tuple id");
1229 let mismatch = service
1230 .explain_tuple(ExplainTupleRequest {
1231 tuple_id: protected_tuple,
1232 policy_context: None,
1233 })
1234 .expect_err("explain should reject mismatched policy context");
1235 assert!(matches!(
1236 mismatch,
1237 ApiError::Validation(message)
1238 if message == "requested tuple is not visible under the current policy"
1239 ));
1240 let executor_trace = service
1241 .explain_tuple(ExplainTupleRequest {
1242 tuple_id: protected_tuple,
1243 policy_context: Some(PolicyContext {
1244 capabilities: vec!["executor".into()],
1245 visibilities: Vec::new(),
1246 }),
1247 })
1248 .expect("explain protected tuple with matching policy")
1249 .trace;
1250 assert!(!executor_trace.tuples.is_empty());
1251 }
1252
1253 #[test]
1254 fn service_rejects_hidden_as_of_cuts_under_policy() {
1255 let mut service = InMemoryKernelService::new();
1256 let parsed = service
1257 .parse_document(ParseDocumentRequest {
1258 dsl: transitive_document_dsl(),
1259 })
1260 .expect("parse transitive document");
1261 service
1262 .append(AppendRequest {
1263 datoms: vec![dependency_datom(1, 2, 1), {
1264 let mut datom = dependency_datom(2, 3, 2);
1265 datom.policy = Some(PolicyEnvelope {
1266 capabilities: vec!["executor".into()],
1267 visibilities: Vec::new(),
1268 });
1269 datom
1270 }],
1271 })
1272 .expect("append mixed-visibility chain");
1273
1274 let hidden_as_of = service.as_of(AsOfRequest {
1275 schema: parsed.schema.clone(),
1276 datoms: Vec::new(),
1277 at: ElementId::new(2),
1278 policy_context: None,
1279 });
1280 assert!(matches!(
1281 hidden_as_of,
1282 Err(ApiError::Validation(message)) if message == "unknown element 2"
1283 ));
1284
1285 let visible_as_of = service
1286 .as_of(AsOfRequest {
1287 schema: parsed.schema,
1288 datoms: Vec::new(),
1289 at: ElementId::new(2),
1290 policy_context: Some(PolicyContext {
1291 capabilities: vec!["executor".into()],
1292 visibilities: Vec::new(),
1293 }),
1294 })
1295 .expect("authorized as_of should succeed");
1296 assert_eq!(visible_as_of.state.as_of, Some(ElementId::new(2)));
1297 }
1298
1299 fn transitive_document_dsl() -> String {
1300 r#"
1301schema {
1302 attr task.depends_on: RefSet<Entity>
1303}
1304
1305predicates {
1306 task_depends_on(Entity, Entity)
1307 depends_transitive(Entity, Entity)
1308}
1309
1310rules {
1311 depends_transitive(x, y) <- task_depends_on(x, y)
1312 depends_transitive(x, z) <- depends_transitive(x, y), task_depends_on(y, z)
1313}
1314
1315materialize {
1316 depends_transitive
1317}
1318
1319query first_cut {
1320 as_of e1
1321 goal depends_transitive(entity(1), y)
1322 keep y
1323}
1324
1325query current_cut {
1326 current
1327 goal depends_transitive(entity(1), y)
1328 keep y
1329}
1330
1331explain current_path {
1332 tuple depends_transitive(entity(1), entity(3))
1333}
1334
1335explain plan_shape {
1336 plan
1337}
1338"#
1339 .into()
1340 }
1341
1342 fn dependency_datom(entity: u64, value: u64, element: u64) -> aether_ast::Datom {
1343 aether_ast::Datom {
1344 entity: EntityId::new(entity),
1345 attribute: aether_ast::AttributeId::new(1),
1346 value: Value::Entity(EntityId::new(value)),
1347 op: aether_ast::OperationKind::Add,
1348 element: ElementId::new(element),
1349 replica: aether_ast::ReplicaId::new(1),
1350 causal_context: Default::default(),
1351 provenance: aether_ast::DatomProvenance::default(),
1352 policy: None,
1353 }
1354 }
1355
1356 fn status_datom(
1357 entity: u64,
1358 status: &str,
1359 element: u64,
1360 policy: Option<PolicyEnvelope>,
1361 ) -> aether_ast::Datom {
1362 aether_ast::Datom {
1363 entity: EntityId::new(entity),
1364 attribute: aether_ast::AttributeId::new(1),
1365 value: Value::String(status.into()),
1366 op: aether_ast::OperationKind::Assert,
1367 element: ElementId::new(element),
1368 replica: aether_ast::ReplicaId::new(1),
1369 causal_context: Default::default(),
1370 provenance: aether_ast::DatomProvenance::default(),
1371 policy,
1372 }
1373 }
1374}