1use crate::{
2 ApiError, AppendRequest, CurrentStateRequest, ExplainArtifact, HistoryRequest,
3 InMemoryKernelService, KernelService, NamedExplainResult, NamedQueryResult,
4 ParseDocumentRequest, RunDocumentRequest, RunDocumentResponse,
5};
6use aether_ast::{
7 merge_partition_cuts, policy_allows, Atom, Datom, ExplainSpec, ExplainTarget, ExtensionalFact,
8 FactProvenance, FederatedCut, PartitionCut, PartitionId, PolicyContext, PredicateRef,
9 QueryResult, QueryRow, ReplicaId, SourceRef, TemporalView, TupleId, Value,
10};
11use aether_explain::{Explainer, InMemoryExplainer};
12use aether_plan::CompiledProgram;
13use aether_resolver::ResolvedState;
14use aether_runtime::{execute_query, DerivedSet};
15use aether_schema::{PredicateSignature, Schema, ValueType};
16use indexmap::IndexMap;
17use serde::{Deserialize, Serialize};
18use std::{
19 cell::RefCell,
20 collections::BTreeSet,
21 fmt::Write as _,
22 fs,
23 path::{Path, PathBuf},
24 time::{SystemTime, UNIX_EPOCH},
25};
26
/// Partitioned kernel service that keeps one [`InMemoryKernelService`] per partition.
///
/// A partition comes into existence the first time something is appended to it; read
/// operations against a partition that was never appended to fail with an
/// "unknown partition" validation error.
#[derive(Debug, Default)]
pub struct PartitionedInMemoryKernelService {
    /// Per-partition services, in the order the partitions were first created.
    partitions: IndexMap<PartitionId, InMemoryKernelService>,
}
31
/// Partitioned kernel service backed by one SQLite database file per partition.
///
/// Each partition lives at `<root>/partition-<encoded id>.sqlite`. Partition databases are
/// opened lazily and then kept open. The open set is held in a [`RefCell`] so that read methods
/// taking `&self` can still open partitions on demand; as a consequence this type is not `Sync`.
#[derive(Debug)]
pub struct SqlitePartitionedKernelService {
    /// Directory holding all partition database files.
    root: PathBuf,
    /// Partitions opened so far, keyed by partition id.
    partitions: RefCell<IndexMap<PartitionId, crate::SqliteKernelService>>,
}
37
/// Leadership generation of a replicated partition.
///
/// A partition's epoch starts at 1 when its replication metadata is first created and is
/// incremented on every replica promotion. An append that carries an epoch is rejected unless
/// the epoch equals the partition's current one.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct LeaderEpoch(pub u64);

impl LeaderEpoch {
    /// Wraps a raw epoch number.
    pub const fn new(value: u64) -> Self {
        Self(value)
    }
}
46
/// Role of a replica within its partition.
///
/// Serialized in `snake_case` (`"leader"` / `"follower"`); defaults to `Leader`. Only the
/// leader accepts appends. Followers are read-only copies that are synchronised from it.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplicaRole {
    #[default]
    Leader,
    Follower,
}
54
/// Declares one replica of a replicated partition.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ReplicaConfig {
    /// Identifier of the replica; must be unique within its partition.
    pub replica_id: ReplicaId,
    /// SQLite database file for this replica. Relative paths are resolved against the
    /// replication root directory.
    pub database_path: PathBuf,
    /// Whether this replica is the partition leader or a follower.
    pub role: ReplicaRole,
}
61
/// Replica set for one replicated partition.
///
/// When the partition is opened, `replicas` must be non-empty and must contain exactly one
/// replica with role [`ReplicaRole::Leader`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AuthorityPartitionConfig {
    /// Partition this configuration applies to.
    pub partition: PartitionId,
    /// Replicas serving the partition.
    pub replicas: Vec<ReplicaConfig>,
}
67
/// Point-in-time health report for one replica.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ReplicaStatus {
    /// Partition the replica belongs to.
    pub partition: PartitionId,
    /// Replica being reported on.
    pub replica_id: ReplicaId,
    /// Current role of the replica.
    pub role: ReplicaRole,
    /// Leader epoch of the partition at the time of the report.
    pub leader_epoch: LeaderEpoch,
    /// Element id of the last datom in this replica's history; `None` when the history is empty.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub applied_element: Option<aether_ast::ElementId>,
    /// Leader's last element id minus this replica's last element id (saturating at 0).
    /// When this replica is empty, it is the leader's last element id (0 if the leader is empty too).
    pub replication_lag: u64,
    /// `false` when this replica's history is not a prefix of the leader's history.
    pub healthy: bool,
    /// Description of the prefix mismatch when `healthy` is `false`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
81
/// Health report for one replicated partition and all of its replicas.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionStatus {
    /// Partition being reported on.
    pub partition: PartitionId,
    /// Current leader epoch of the partition.
    pub leader_epoch: LeaderEpoch,
    /// One entry per replica, in the order recorded in the partition's metadata.
    pub replicas: Vec<ReplicaStatus>,
}
88
/// Health report covering every partition managed by a replicated service.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionStatusResponse {
    /// One entry per configured partition.
    pub partitions: Vec<PartitionStatus>,
}
93
/// Request to make `replica_id` the leader of `partition`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PromoteReplicaRequest {
    /// Partition whose leadership changes.
    pub partition: PartitionId,
    /// Replica to promote; it must already be part of the partition's replica set.
    pub replica_id: ReplicaId,
}
99
/// Result of a successful promotion.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PromoteReplicaResponse {
    /// Partition whose leadership changed.
    pub partition: PartitionId,
    /// Replica that is now the leader.
    pub replica_id: ReplicaId,
    /// Leader epoch after the promotion; writers should send it with subsequent appends.
    pub leader_epoch: LeaderEpoch,
}
106
/// Partitioned kernel service in which each partition is served by a replica set of one leader
/// plus zero or more followers, each backed by its own SQLite database.
///
/// Reads are served from the leader. Appends go to the leader and are then copied synchronously
/// to the followers. Federated run/explain results are memoised in `cache`, which is cleared on
/// every append and promotion made through this service.
#[derive(Debug)]
pub struct ReplicatedAuthorityPartitionService {
    /// Replication root: holds per-partition metadata files and relative replica databases.
    root: PathBuf,
    /// Opened partitions. Wrapped in a `RefCell` so `&self` read methods can reach the replicas
    /// mutably.
    partitions: RefCell<IndexMap<PartitionId, ReplicatedPartition>>,
    /// Single-entry memo of the most recent federated run and explain report.
    cache: ReplicatedReadCache,
}
113
/// Runtime state of one replicated partition.
#[derive(Debug)]
struct ReplicatedPartition {
    /// JSON file where `metadata` is persisted.
    metadata_path: PathBuf,
    /// Authoritative replica roles and leader epoch.
    metadata: ReplicatedPartitionMetadata,
    /// Open database handle for every replica listed in `metadata`.
    replicas: IndexMap<ReplicaId, crate::SqliteKernelService>,
}
120
/// Persisted replication state of one partition (stored as JSON).
///
/// Once written, this is what determines replica roles on later opens, so that promotions
/// survive restarts.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
struct ReplicatedPartitionMetadata {
    /// Partition the metadata describes.
    partition: PartitionId,
    /// Current leader epoch.
    leader_epoch: LeaderEpoch,
    /// Replica set with current roles; exactly one entry is expected to be the leader.
    replicas: Vec<ReplicaConfig>,
}
127
/// Single-entry memo for federated reads.
///
/// Each slot stores the cache key of the request that produced it alongside the result. The key
/// string is produced by `cache_key(&request)`, which is defined elsewhere (presumably it covers
/// the document and policy context — confirm).
#[derive(Debug, Default)]
struct ReplicatedReadCache {
    /// Most recent federated document run.
    federated_run: Option<(String, FederatedRunDocumentResponse)>,
    /// Most recent federated explain report.
    federated_report: Option<(String, FederatedExplainReport)>,
}
133
134impl ReplicatedReadCache {
135 fn clear(&mut self) {
136 self.federated_run = None;
137 self.federated_report = None;
138 }
139}
140
141impl PartitionedInMemoryKernelService {
142 pub fn new() -> Self {
143 Self::default()
144 }
145
146 pub fn append_partition(
147 &mut self,
148 request: PartitionAppendRequest,
149 ) -> Result<PartitionAppendResponse, ApiError> {
150 let PartitionAppendRequest {
151 partition, datoms, ..
152 } = request;
153 let response = self
154 .partitions
155 .entry(partition.clone())
156 .or_default()
157 .append(AppendRequest { datoms })?;
158 Ok(PartitionAppendResponse {
159 partition,
160 leader_epoch: None,
161 appended: response.appended,
162 })
163 }
164
165 pub fn partition_history(
166 &self,
167 request: PartitionHistoryRequest,
168 ) -> Result<PartitionHistoryResponse, ApiError> {
169 partition_history_for(self.partition_service(&request.cut.partition)?, request)
170 }
171
172 pub fn partition_state(
173 &self,
174 request: PartitionStateRequest,
175 ) -> Result<PartitionStateResponse, ApiError> {
176 partition_state_for(self.partition_service(&request.cut.partition)?, request)
177 }
178
179 pub fn federated_history(
180 &self,
181 request: FederatedHistoryRequest,
182 ) -> Result<FederatedHistoryResponse, ApiError> {
183 let cut = validate_federated_cut(request.cut)?;
184 let mut partitions = Vec::with_capacity(cut.cuts.len());
185 for partition_cut in cut.cuts {
186 partitions.push(self.partition_history(PartitionHistoryRequest {
187 cut: partition_cut,
188 policy_context: request.policy_context.clone(),
189 })?);
190 }
191 Ok(FederatedHistoryResponse { partitions })
192 }
193
194 pub fn import_partition_facts(
195 &mut self,
196 request: ImportedFactQueryRequest,
197 policy_context: Option<PolicyContext>,
198 ) -> Result<ImportedFactQueryResponse, ApiError> {
199 import_partition_facts_from_service(
200 self.partition_service_mut(&request.cut.partition)?,
201 request,
202 policy_context,
203 )
204 }
205
206 pub fn federated_run_document(
207 &mut self,
208 request: FederatedRunDocumentRequest,
209 ) -> Result<FederatedRunDocumentResponse, ApiError> {
210 let imports = request
211 .imports
212 .iter()
213 .cloned()
214 .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
215 .collect::<Result<Vec<_>, ApiError>>()?;
216 execute_federated_document_request(request, imports)
217 }
218
219 pub fn build_federated_explain_report(
220 &mut self,
221 request: FederatedRunDocumentRequest,
222 ) -> Result<FederatedExplainReport, ApiError> {
223 let policy_context = request.policy_context.clone();
224 let response = self.federated_run_document(request)?;
225 Ok(build_federated_explain_report_from_response(
226 response,
227 policy_context,
228 ))
229 }
230
231 fn partition_service(
232 &self,
233 partition: &PartitionId,
234 ) -> Result<&InMemoryKernelService, ApiError> {
235 self.partitions
236 .get(partition)
237 .ok_or_else(|| ApiError::Validation(format!("unknown partition {}", partition)))
238 }
239
240 fn partition_service_mut(
241 &mut self,
242 partition: &PartitionId,
243 ) -> Result<&mut InMemoryKernelService, ApiError> {
244 self.partitions
245 .get_mut(partition)
246 .ok_or_else(|| ApiError::Validation(format!("unknown partition {}", partition)))
247 }
248}
249
250impl SqlitePartitionedKernelService {
251 pub fn open(root: impl AsRef<Path>) -> Result<Self, ApiError> {
252 let root = root.as_ref().to_path_buf();
253 fs::create_dir_all(&root).map_err(|error| {
254 ApiError::Validation(format!(
255 "failed to create partition root {}: {}",
256 root.display(),
257 error
258 ))
259 })?;
260 Ok(Self {
261 root,
262 partitions: RefCell::new(IndexMap::new()),
263 })
264 }
265
266 pub fn root(&self) -> &Path {
267 &self.root
268 }
269
270 pub fn append_partition(
271 &mut self,
272 request: PartitionAppendRequest,
273 ) -> Result<PartitionAppendResponse, ApiError> {
274 let PartitionAppendRequest {
275 partition, datoms, ..
276 } = request;
277 self.ensure_partition_open(&partition, true)?;
278 let mut partitions = self.partitions.borrow_mut();
279 let response = partitions
280 .get_mut(&partition)
281 .expect("partition should be open")
282 .append(AppendRequest { datoms })?;
283 Ok(PartitionAppendResponse {
284 partition,
285 leader_epoch: None,
286 appended: response.appended,
287 })
288 }
289
290 pub fn partition_history(
291 &self,
292 request: PartitionHistoryRequest,
293 ) -> Result<PartitionHistoryResponse, ApiError> {
294 self.ensure_partition_open(&request.cut.partition, false)?;
295 let mut partitions = self.partitions.borrow_mut();
296 let service = partitions
297 .get_mut(&request.cut.partition)
298 .expect("partition should be open");
299 partition_history_for(service, request)
300 }
301
302 pub fn partition_state(
303 &self,
304 request: PartitionStateRequest,
305 ) -> Result<PartitionStateResponse, ApiError> {
306 self.ensure_partition_open(&request.cut.partition, false)?;
307 let mut partitions = self.partitions.borrow_mut();
308 let service = partitions
309 .get_mut(&request.cut.partition)
310 .expect("partition should be open");
311 partition_state_for(service, request)
312 }
313
314 pub fn federated_history(
315 &self,
316 request: FederatedHistoryRequest,
317 ) -> Result<FederatedHistoryResponse, ApiError> {
318 let cut = validate_federated_cut(request.cut)?;
319 let mut partitions = Vec::with_capacity(cut.cuts.len());
320 for partition_cut in cut.cuts {
321 partitions.push(self.partition_history(PartitionHistoryRequest {
322 cut: partition_cut,
323 policy_context: request.policy_context.clone(),
324 })?);
325 }
326 Ok(FederatedHistoryResponse { partitions })
327 }
328
329 pub fn import_partition_facts(
330 &mut self,
331 request: ImportedFactQueryRequest,
332 policy_context: Option<PolicyContext>,
333 ) -> Result<ImportedFactQueryResponse, ApiError> {
334 self.ensure_partition_open(&request.cut.partition, false)?;
335 let mut partitions = self.partitions.borrow_mut();
336 let service = partitions
337 .get_mut(&request.cut.partition)
338 .expect("partition should be open");
339 import_partition_facts_from_service(service, request, policy_context)
340 }
341
342 pub fn federated_run_document(
343 &mut self,
344 request: FederatedRunDocumentRequest,
345 ) -> Result<FederatedRunDocumentResponse, ApiError> {
346 let imports = request
347 .imports
348 .iter()
349 .cloned()
350 .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
351 .collect::<Result<Vec<_>, ApiError>>()?;
352 execute_federated_document_request(request, imports)
353 }
354
355 pub fn build_federated_explain_report(
356 &mut self,
357 request: FederatedRunDocumentRequest,
358 ) -> Result<FederatedExplainReport, ApiError> {
359 let policy_context = request.policy_context.clone();
360 let response = self.federated_run_document(request)?;
361 Ok(build_federated_explain_report_from_response(
362 response,
363 policy_context,
364 ))
365 }
366
367 fn ensure_partition_open(
368 &self,
369 partition: &PartitionId,
370 create_if_missing: bool,
371 ) -> Result<(), ApiError> {
372 if self.partitions.borrow().contains_key(partition) {
373 return Ok(());
374 }
375
376 let path = self.partition_path(partition);
377 if !create_if_missing && !path.exists() {
378 return Err(ApiError::Validation(format!(
379 "unknown partition {}",
380 partition
381 )));
382 }
383
384 let service = crate::SqliteKernelService::open(&path)?;
385 self.partitions
386 .borrow_mut()
387 .insert(partition.clone(), service);
388 Ok(())
389 }
390
391 fn partition_path(&self, partition: &PartitionId) -> PathBuf {
392 self.root.join(format!(
393 "partition-{}.sqlite",
394 encode_partition_id(partition)
395 ))
396 }
397}
398
399impl ReplicatedAuthorityPartitionService {
400 pub fn open(
401 root: impl AsRef<Path>,
402 configs: Vec<AuthorityPartitionConfig>,
403 ) -> Result<Self, ApiError> {
404 let root = root.as_ref().to_path_buf();
405 fs::create_dir_all(&root).map_err(|error| {
406 ApiError::Validation(format!(
407 "failed to create replication root {}: {}",
408 root.display(),
409 error
410 ))
411 })?;
412 let mut partitions = IndexMap::new();
413 for config in configs {
414 let partition = config.partition.clone();
415 let runtime = ReplicatedPartition::open(&root, config)?;
416 partitions.insert(partition, runtime);
417 }
418 Ok(Self {
419 root,
420 partitions: RefCell::new(partitions),
421 cache: ReplicatedReadCache::default(),
422 })
423 }
424
425 pub fn root(&self) -> &Path {
426 &self.root
427 }
428
429 pub fn append_partition(
430 &mut self,
431 request: PartitionAppendRequest,
432 ) -> Result<PartitionAppendResponse, ApiError> {
433 self.cache.clear();
434 let mut partitions = self.partitions.borrow_mut();
435 let partition = partitions.get_mut(&request.partition).ok_or_else(|| {
436 ApiError::Validation(format!("unknown partition {}", request.partition))
437 })?;
438 partition.append(request)
439 }
440
441 pub fn partition_history(
442 &self,
443 request: PartitionHistoryRequest,
444 ) -> Result<PartitionHistoryResponse, ApiError> {
445 let mut partitions = self.partitions.borrow_mut();
446 let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
447 ApiError::Validation(format!("unknown partition {}", request.cut.partition))
448 })?;
449 partition_history_for(partition.leader_service_mut()?, request)
450 }
451
452 pub fn partition_state(
453 &self,
454 request: PartitionStateRequest,
455 ) -> Result<PartitionStateResponse, ApiError> {
456 let mut partitions = self.partitions.borrow_mut();
457 let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
458 ApiError::Validation(format!("unknown partition {}", request.cut.partition))
459 })?;
460 partition_state_for(partition.leader_service_mut()?, request)
461 }
462
463 pub fn federated_history(
464 &self,
465 request: FederatedHistoryRequest,
466 ) -> Result<FederatedHistoryResponse, ApiError> {
467 let cut = validate_federated_cut(request.cut)?;
468 let mut partitions = Vec::with_capacity(cut.cuts.len());
469 for partition_cut in cut.cuts {
470 partitions.push(self.partition_history(PartitionHistoryRequest {
471 cut: partition_cut,
472 policy_context: request.policy_context.clone(),
473 })?);
474 }
475 Ok(FederatedHistoryResponse { partitions })
476 }
477
478 pub fn import_partition_facts(
479 &mut self,
480 request: ImportedFactQueryRequest,
481 policy_context: Option<PolicyContext>,
482 ) -> Result<ImportedFactQueryResponse, ApiError> {
483 let mut partitions = self.partitions.borrow_mut();
484 let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
485 ApiError::Validation(format!("unknown partition {}", request.cut.partition))
486 })?;
487 import_partition_facts_from_service(
488 partition.leader_service_mut()?,
489 request,
490 policy_context,
491 )
492 }
493
494 pub fn federated_run_document(
495 &mut self,
496 request: FederatedRunDocumentRequest,
497 ) -> Result<FederatedRunDocumentResponse, ApiError> {
498 let cache_key = cache_key(&request)?;
499 if let Some((key, response)) = &self.cache.federated_run {
500 if *key == cache_key {
501 return Ok(response.clone());
502 }
503 }
504 let imports = request
505 .imports
506 .iter()
507 .cloned()
508 .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
509 .collect::<Result<Vec<_>, ApiError>>()?;
510 let response = execute_federated_document_request(request, imports)?;
511 self.cache
512 .federated_run
513 .replace((cache_key, response.clone()));
514 Ok(response)
515 }
516
517 pub fn build_federated_explain_report(
518 &mut self,
519 request: FederatedRunDocumentRequest,
520 ) -> Result<FederatedExplainReport, ApiError> {
521 let cache_key = cache_key(&request)?;
522 if let Some((key, response)) = &self.cache.federated_report {
523 if *key == cache_key {
524 return Ok(response.clone());
525 }
526 }
527 let policy_context = request.policy_context.clone();
528 let response = self.federated_run_document(request)?;
529 let report = build_federated_explain_report_from_response(response, policy_context);
530 self.cache
531 .federated_report
532 .replace((cache_key, report.clone()));
533 Ok(report)
534 }
535
536 pub fn partition_status(&self) -> Result<PartitionStatusResponse, ApiError> {
537 let mut statuses = Vec::new();
538 for partition in self.partitions.borrow().values() {
539 statuses.push(partition.status()?);
540 }
541 Ok(PartitionStatusResponse {
542 partitions: statuses,
543 })
544 }
545
546 pub fn promote_replica(
547 &mut self,
548 request: PromoteReplicaRequest,
549 ) -> Result<PromoteReplicaResponse, ApiError> {
550 self.cache.clear();
551 let mut partitions = self.partitions.borrow_mut();
552 let partition = partitions.get_mut(&request.partition).ok_or_else(|| {
553 ApiError::Validation(format!("unknown partition {}", request.partition))
554 })?;
555 let leader_epoch = partition.promote(request.replica_id)?;
556 Ok(PromoteReplicaResponse {
557 partition: request.partition,
558 replica_id: request.replica_id,
559 leader_epoch,
560 })
561 }
562}
563
564impl ReplicatedPartition {
565 fn open(root: &Path, config: AuthorityPartitionConfig) -> Result<Self, ApiError> {
566 if config.replicas.is_empty() {
567 return Err(ApiError::Validation(format!(
568 "replicated partition {} must declare at least one replica",
569 config.partition
570 )));
571 }
572 let leader_count = config
573 .replicas
574 .iter()
575 .filter(|replica| replica.role == ReplicaRole::Leader)
576 .count();
577 if leader_count != 1 {
578 return Err(ApiError::Validation(format!(
579 "replicated partition {} must declare exactly one leader replica",
580 config.partition
581 )));
582 }
583 let metadata_path = root.join(format!(
584 "replication-{}.json",
585 encode_partition_id(&config.partition)
586 ));
587 let metadata = if metadata_path.exists() {
588 let contents = fs::read_to_string(metadata_path.clone()).map_err(|error| {
589 ApiError::Validation(format!(
590 "failed to read replication metadata {}: {}",
591 metadata_path.display(),
592 error
593 ))
594 })?;
595 serde_json::from_str::<ReplicatedPartitionMetadata>(&contents).map_err(|error| {
596 ApiError::Validation(format!(
597 "failed to parse replication metadata {}: {}",
598 metadata_path.display(),
599 error
600 ))
601 })?
602 } else {
603 let metadata = ReplicatedPartitionMetadata {
604 partition: config.partition.clone(),
605 leader_epoch: LeaderEpoch::new(1),
606 replicas: config.replicas,
607 };
608 write_partition_metadata(&metadata_path, &metadata)?;
609 metadata
610 };
611
612 let mut seen = BTreeSet::new();
613 let leader_count = metadata
614 .replicas
615 .iter()
616 .filter(|replica| replica.role == ReplicaRole::Leader)
617 .count();
618 if leader_count != 1 {
619 return Err(ApiError::Validation(format!(
620 "replicated partition {} metadata must contain exactly one leader",
621 metadata.partition
622 )));
623 }
624
625 let mut replicas = IndexMap::new();
626 for replica in &metadata.replicas {
627 if !seen.insert(replica.replica_id) {
628 return Err(ApiError::Validation(format!(
629 "replicated partition {} has duplicate replica {}",
630 metadata.partition, replica.replica_id.0
631 )));
632 }
633 let database_path = if replica.database_path.is_absolute() {
634 replica.database_path.clone()
635 } else {
636 root.join(&replica.database_path)
637 };
638 replicas.insert(
639 replica.replica_id,
640 crate::SqliteKernelService::open(database_path)?,
641 );
642 }
643
644 let mut partition = Self {
645 metadata_path,
646 metadata,
647 replicas,
648 };
649 partition.replicate_followers()?;
650 Ok(partition)
651 }
652
653 fn append(
654 &mut self,
655 request: PartitionAppendRequest,
656 ) -> Result<PartitionAppendResponse, ApiError> {
657 if let Some(leader_epoch) = request.leader_epoch.as_ref() {
658 if *leader_epoch != self.metadata.leader_epoch {
659 return Err(ApiError::Validation(format!(
660 "stale leader epoch {} for partition {}; current epoch is {}",
661 leader_epoch.0, request.partition, self.metadata.leader_epoch.0
662 )));
663 }
664 }
665 let leader_id = self.leader_id()?;
666 self.append_via_replica(leader_id, request)
667 }
668
669 fn append_via_replica(
670 &mut self,
671 replica_id: ReplicaId,
672 request: PartitionAppendRequest,
673 ) -> Result<PartitionAppendResponse, ApiError> {
674 let config = self.replica_config(replica_id)?;
675 if config.role != ReplicaRole::Leader {
676 return Err(ApiError::Validation(format!(
677 "replica {} for partition {} is read-only follower",
678 replica_id.0, request.partition
679 )));
680 }
681 let service = self
682 .replicas
683 .get_mut(&replica_id)
684 .ok_or_else(|| ApiError::Validation(format!("unknown replica {}", replica_id.0)))?;
685 let response = service.append(AppendRequest {
686 datoms: request.datoms,
687 })?;
688 self.replicate_followers()?;
689 Ok(PartitionAppendResponse {
690 partition: request.partition,
691 leader_epoch: Some(self.metadata.leader_epoch.clone()),
692 appended: response.appended,
693 })
694 }
695
696 fn leader_service_mut(&mut self) -> Result<&mut crate::SqliteKernelService, ApiError> {
697 let leader_id = self.leader_id()?;
698 self.replicas
699 .get_mut(&leader_id)
700 .ok_or_else(|| ApiError::Validation(format!("unknown leader replica {}", leader_id.0)))
701 }
702
703 fn leader_id(&self) -> Result<ReplicaId, ApiError> {
704 self.metadata
705 .replicas
706 .iter()
707 .find(|replica| replica.role == ReplicaRole::Leader)
708 .map(|replica| replica.replica_id)
709 .ok_or_else(|| {
710 ApiError::Validation(format!(
711 "partition {} has no leader replica",
712 self.metadata.partition
713 ))
714 })
715 }
716
717 fn promote(&mut self, replica_id: ReplicaId) -> Result<LeaderEpoch, ApiError> {
718 if !self.replicas.contains_key(&replica_id) {
719 return Err(ApiError::Validation(format!(
720 "unknown replica {} for partition {}",
721 replica_id.0, self.metadata.partition
722 )));
723 }
724 let leader_history = {
725 let leader = self.leader_service_mut()?;
726 leader
727 .history(HistoryRequest {
728 policy_context: None,
729 })?
730 .datoms
731 };
732 let replica = self
733 .replicas
734 .get_mut(&replica_id)
735 .ok_or_else(|| ApiError::Validation(format!("unknown replica {}", replica_id.0)))?;
736 sync_replica_history(
737 replica,
738 &leader_history,
739 &self.metadata.partition,
740 replica_id,
741 )?;
742
743 for config in &mut self.metadata.replicas {
744 config.role = if config.replica_id == replica_id {
745 ReplicaRole::Leader
746 } else {
747 ReplicaRole::Follower
748 };
749 }
750 self.metadata.leader_epoch.0 += 1;
751 write_partition_metadata(&self.metadata_path, &self.metadata)?;
752 self.replicate_followers()?;
753 Ok(self.metadata.leader_epoch.clone())
754 }
755
756 fn replicate_followers(&mut self) -> Result<(), ApiError> {
757 let leader_id = self.leader_id()?;
758 let leader_history = {
759 let leader = self
760 .replicas
761 .get_mut(&leader_id)
762 .ok_or_else(|| ApiError::Validation(format!("unknown leader {}", leader_id.0)))?;
763 leader
764 .history(HistoryRequest {
765 policy_context: None,
766 })?
767 .datoms
768 };
769 for replica in &self.metadata.replicas {
770 if replica.replica_id == leader_id {
771 continue;
772 }
773 let follower = self.replicas.get_mut(&replica.replica_id).ok_or_else(|| {
774 ApiError::Validation(format!("unknown replica {}", replica.replica_id.0))
775 })?;
776 sync_replica_history(
777 follower,
778 &leader_history,
779 &self.metadata.partition,
780 replica.replica_id,
781 )?;
782 }
783 Ok(())
784 }
785
786 fn status(&self) -> Result<PartitionStatus, ApiError> {
787 let leader_id = self.leader_id()?;
788 let leader_history = self
789 .replicas
790 .get(&leader_id)
791 .ok_or_else(|| ApiError::Validation(format!("unknown leader {}", leader_id.0)))?
792 .history(HistoryRequest {
793 policy_context: None,
794 })?
795 .datoms;
796 let leader_last = leader_history.last().map(|datom| datom.element);
797
798 let mut replicas = Vec::new();
799 for config in &self.metadata.replicas {
800 let service = self.replicas.get(&config.replica_id).ok_or_else(|| {
801 ApiError::Validation(format!("unknown replica {}", config.replica_id.0))
802 })?;
803 let history = service
804 .history(HistoryRequest {
805 policy_context: None,
806 })?
807 .datoms;
808 let last = history.last().map(|datom| datom.element);
809 let mismatch = validate_replica_prefix(&history, &leader_history).err();
810 replicas.push(ReplicaStatus {
811 partition: self.metadata.partition.clone(),
812 replica_id: config.replica_id,
813 role: config.role,
814 leader_epoch: self.metadata.leader_epoch.clone(),
815 applied_element: last,
816 replication_lag: leader_last
817 .zip(last)
818 .map(|(leader, replica)| leader.0.saturating_sub(replica.0))
819 .unwrap_or_else(|| leader_last.map(|leader| leader.0).unwrap_or(0)),
820 healthy: mismatch.is_none(),
821 detail: mismatch,
822 });
823 }
824
825 Ok(PartitionStatus {
826 partition: self.metadata.partition.clone(),
827 leader_epoch: self.metadata.leader_epoch.clone(),
828 replicas,
829 })
830 }
831
832 fn replica_config(&self, replica_id: ReplicaId) -> Result<&ReplicaConfig, ApiError> {
833 self.metadata
834 .replicas
835 .iter()
836 .find(|replica| replica.replica_id == replica_id)
837 .ok_or_else(|| {
838 ApiError::Validation(format!(
839 "unknown replica {} for partition {}",
840 replica_id.0, self.metadata.partition
841 ))
842 })
843 }
844}
845
846fn write_partition_metadata(
847 path: &Path,
848 metadata: &ReplicatedPartitionMetadata,
849) -> Result<(), ApiError> {
850 if let Some(parent) = path.parent() {
851 fs::create_dir_all(parent).map_err(|error| {
852 ApiError::Validation(format!(
853 "failed to create replication metadata directory {}: {}",
854 parent.display(),
855 error
856 ))
857 })?;
858 }
859 let encoded = serde_json::to_string_pretty(metadata).map_err(|error| {
860 ApiError::Validation(format!(
861 "failed to encode replication metadata {}: {}",
862 path.display(),
863 error
864 ))
865 })?;
866 fs::write(path, encoded).map_err(|error| {
867 ApiError::Validation(format!(
868 "failed to write replication metadata {}: {}",
869 path.display(),
870 error
871 ))
872 })
873}
874
875fn sync_replica_history(
876 replica: &mut crate::SqliteKernelService,
877 leader_history: &[Datom],
878 partition: &PartitionId,
879 replica_id: ReplicaId,
880) -> Result<(), ApiError> {
881 let follower_history = replica
882 .history(HistoryRequest {
883 policy_context: None,
884 })?
885 .datoms;
886 validate_replica_prefix(&follower_history, leader_history).map_err(|detail| {
887 ApiError::Validation(format!(
888 "replica {} for partition {} diverged from leader prefix: {}",
889 replica_id.0, partition, detail
890 ))
891 })?;
892 if follower_history.len() < leader_history.len() {
893 replica.append(AppendRequest {
894 datoms: leader_history[follower_history.len()..].to_vec(),
895 })?;
896 }
897 Ok(())
898}
899
900fn validate_replica_prefix(
901 follower_history: &[Datom],
902 leader_history: &[Datom],
903) -> Result<(), String> {
904 if follower_history.len() > leader_history.len() {
905 return Err("follower history exceeds leader history".into());
906 }
907 for (index, (follower, leader)) in follower_history
908 .iter()
909 .zip(leader_history.iter())
910 .enumerate()
911 {
912 if follower != leader {
913 return Err(format!("entry {} does not match leader", index));
914 }
915 }
916 Ok(())
917}
918
919fn validate_federated_cut(cut: FederatedCut) -> Result<FederatedCut, ApiError> {
920 let normalized = cut.normalized();
921 for pair in normalized.cuts.windows(2) {
922 if pair[0].partition == pair[1].partition {
923 return Err(ApiError::Validation(format!(
924 "federated cut contains duplicate partition {}",
925 pair[0].partition
926 )));
927 }
928 }
929 Ok(normalized)
930}
931
932fn partition_history_for(
933 service: &dyn KernelService,
934 request: PartitionHistoryRequest,
935) -> Result<PartitionHistoryResponse, ApiError> {
936 let datoms = match request.cut.as_of {
937 Some(element) => {
938 let visible_history = service
939 .history(HistoryRequest {
940 policy_context: request.policy_context.clone(),
941 })?
942 .datoms;
943 let end = visible_history
944 .iter()
945 .position(|datom| datom.element == element)
946 .ok_or_else(|| {
947 ApiError::Validation(format!(
948 "unknown element {} for partition {}",
949 element, request.cut.partition
950 ))
951 })?;
952 visible_history[..=end].to_vec()
953 }
954 None => {
955 service
956 .history(HistoryRequest {
957 policy_context: request.policy_context.clone(),
958 })?
959 .datoms
960 }
961 };
962
963 Ok(PartitionHistoryResponse {
964 cut: request.cut,
965 datoms,
966 })
967}
968
969fn partition_state_for(
970 service: &dyn KernelService,
971 request: PartitionStateRequest,
972) -> Result<PartitionStateResponse, ApiError> {
973 let state = match request.cut.as_of {
974 Some(element) => {
975 let visible_history = service
976 .history(HistoryRequest {
977 policy_context: request.policy_context.clone(),
978 })?
979 .datoms;
980 if !visible_history.iter().any(|datom| datom.element == element) {
981 return Err(ApiError::Validation(format!(
982 "unknown element {} for partition {}",
983 element, request.cut.partition
984 )));
985 }
986 service
987 .as_of(crate::AsOfRequest {
988 schema: request.schema,
989 datoms: Vec::new(),
990 at: element,
991 policy_context: request.policy_context,
992 })?
993 .state
994 }
995 None => {
996 service
997 .current_state(CurrentStateRequest {
998 schema: request.schema,
999 datoms: Vec::new(),
1000 policy_context: request.policy_context,
1001 })?
1002 .state
1003 }
1004 };
1005
1006 Ok(PartitionStateResponse {
1007 cut: request.cut,
1008 state,
1009 })
1010}
1011
1012fn import_partition_facts_from_service(
1013 service: &mut dyn KernelService,
1014 request: ImportedFactQueryRequest,
1015 policy_context: Option<PolicyContext>,
1016) -> Result<ImportedFactQueryResponse, ApiError> {
1017 let parsed = service.parse_document(ParseDocumentRequest {
1018 dsl: request.dsl.clone(),
1019 })?;
1020 let query_spec = select_query_spec(&parsed, request.query_name.as_deref())?;
1021 ensure_importable_query_shape(query_spec, request.query_name.as_deref())?;
1022 let response = service.run_document(RunDocumentRequest {
1023 dsl: request.dsl.clone(),
1024 policy_context,
1025 })?;
1026 let result = select_query_result(&response, request.query_name.as_deref())?;
1027 let tuple_index = response
1028 .derived
1029 .tuples
1030 .iter()
1031 .map(|tuple| (tuple.tuple.id, tuple))
1032 .collect::<IndexMap<_, _>>();
1033
1034 let facts = result
1035 .rows
1036 .iter()
1037 .enumerate()
1038 .map(|(index, row)| {
1039 build_imported_fact(&request, index, row, &tuple_index, &response.derived)
1040 })
1041 .collect::<Result<Vec<_>, ApiError>>()?;
1042
1043 Ok(ImportedFactQueryResponse {
1044 cut: request.cut,
1045 predicate: request.predicate,
1046 query_name: request.query_name,
1047 row_count: result.rows.len(),
1048 facts,
1049 })
1050}
1051
1052fn execute_federated_document_request(
1053 request: FederatedRunDocumentRequest,
1054 imports: Vec<ImportedFactQueryResponse>,
1055) -> Result<FederatedRunDocumentResponse, ApiError> {
1056 let cut = federated_cut_from_imports(&imports)?;
1057
1058 let mut local = InMemoryKernelService::new();
1059 let parsed = local.parse_document(ParseDocumentRequest {
1060 dsl: request.dsl.clone(),
1061 })?;
1062 ensure_federated_document_uses_current_views(&parsed)?;
1063
1064 let mut schema = parsed.schema.clone();
1065 let mut program = parsed.program.clone();
1066 let predicate_lookup = parsed
1067 .program
1068 .predicates
1069 .iter()
1070 .map(|predicate| (predicate.name.clone(), predicate.clone()))
1071 .collect::<IndexMap<_, _>>();
1072 for import in &imports {
1073 for fact in &import.facts {
1074 let mut fact = fact.clone();
1075 let predicate = predicate_lookup
1076 .get(&fact.predicate.name)
1077 .ok_or_else(|| {
1078 ApiError::Validation(format!(
1079 "federated document does not declare imported predicate {}",
1080 fact.predicate.name
1081 ))
1082 })?
1083 .clone();
1084 if predicate.arity != fact.values.len() {
1085 return Err(ApiError::Validation(format!(
1086 "federated document predicate {} expects arity {}, but imported fact supplied {} value(s)",
1087 predicate.name,
1088 predicate.arity,
1089 fact.values.len()
1090 )));
1091 }
1092 fact.predicate = predicate;
1093 program.facts.push(fact);
1094 }
1095 }
1096 ensure_schema_covers_fact_predicates(&mut schema, &program.facts)?;
1097
1098 let compiled = local.compile_program(crate::CompileProgramRequest {
1099 schema: schema.clone(),
1100 program,
1101 })?;
1102 let derived = local
1103 .evaluate_program(crate::EvaluateProgramRequest {
1104 state: ResolvedState::default(),
1105 program: compiled.program.clone(),
1106 policy_context: request.policy_context.clone(),
1107 })?
1108 .derived;
1109 let visible_program =
1110 filter_compiled_program_facts(&compiled.program, request.policy_context.as_ref());
1111 let visible_derived = filter_derived_set(&derived, request.policy_context.as_ref());
1112
1113 let query = match &parsed.query {
1114 Some(query) => Some(execute_query(
1115 &ResolvedState::default(),
1116 &visible_program,
1117 &visible_derived,
1118 &query.query,
1119 request.policy_context.as_ref(),
1120 )?),
1121 None => None,
1122 };
1123 let queries = parsed
1124 .queries
1125 .iter()
1126 .map(|named_query| {
1127 Ok(NamedQueryResult {
1128 name: named_query.name.clone(),
1129 spec: named_query.spec.clone(),
1130 result: execute_query(
1131 &ResolvedState::default(),
1132 &visible_program,
1133 &visible_derived,
1134 &named_query.spec.query,
1135 request.policy_context.as_ref(),
1136 )?,
1137 })
1138 })
1139 .collect::<Result<Vec<_>, ApiError>>()?;
1140 let explains = parsed
1141 .explains
1142 .iter()
1143 .map(|named_explain| {
1144 Ok(NamedExplainResult {
1145 name: named_explain.name.clone(),
1146 spec: named_explain.spec.clone(),
1147 result: execute_federated_explain_spec(
1148 &visible_program,
1149 &derived,
1150 &visible_derived,
1151 &named_explain.spec,
1152 request.policy_context.as_ref(),
1153 )?,
1154 })
1155 })
1156 .collect::<Result<Vec<_>, ApiError>>()?;
1157
1158 Ok(FederatedRunDocumentResponse {
1159 cut,
1160 imports,
1161 run: RunDocumentResponse {
1162 state: ResolvedState::default(),
1163 program: visible_program,
1164 derived: visible_derived,
1165 query,
1166 queries,
1167 explains,
1168 },
1169 })
1170}
1171
1172fn build_federated_explain_report_from_response(
1173 response: FederatedRunDocumentResponse,
1174 policy_context: Option<PolicyContext>,
1175) -> FederatedExplainReport {
1176 let primary_query = response
1177 .run
1178 .query
1179 .as_ref()
1180 .map(report_rows)
1181 .unwrap_or_default();
1182 let named_queries = response
1183 .run
1184 .queries
1185 .iter()
1186 .map(|query| FederatedNamedQuerySummary {
1187 name: query.name.clone(),
1188 rows: report_rows(&query.result),
1189 })
1190 .collect::<Vec<_>>();
1191 let traces = response
1192 .run
1193 .explains
1194 .iter()
1195 .filter_map(|explain| match &explain.result {
1196 ExplainArtifact::Tuple(trace) => Some(build_trace_summary(explain.name.clone(), trace)),
1197 ExplainArtifact::Plan(_) => None,
1198 })
1199 .collect::<Vec<_>>();
1200
1201 FederatedExplainReport {
1202 generated_at_ms: now_millis(),
1203 cut: response.cut,
1204 policy_context,
1205 imports: response
1206 .imports
1207 .iter()
1208 .map(|import| FederatedImportedSourceSummary {
1209 cut: import.cut.clone(),
1210 predicate: import.predicate.clone(),
1211 query_name: import.query_name.clone(),
1212 fact_count: import.facts.len(),
1213 })
1214 .collect(),
1215 primary_query,
1216 named_queries,
1217 traces,
1218 }
1219}
1220
1221fn encode_partition_id(partition: &PartitionId) -> String {
1222 let mut encoded = String::with_capacity(partition.as_str().len() * 2);
1223 for byte in partition.as_str().as_bytes() {
1224 let _ = write!(&mut encoded, "{byte:02x}");
1225 }
1226 encoded
1227}
1228
1229fn cache_key<T: Serialize>(value: &T) -> Result<String, ApiError> {
1230 serde_json::to_string(value)
1231 .map_err(|error| ApiError::Validation(format!("failed to encode cache key: {error}")))
1232}
1233
1234fn select_query_result<'a>(
1235 response: &'a RunDocumentResponse,
1236 query_name: Option<&str>,
1237) -> Result<&'a QueryResult, ApiError> {
1238 match query_name {
1239 Some(name) => response
1240 .queries
1241 .iter()
1242 .find(|query| query.name.as_deref() == Some(name))
1243 .map(|query| &query.result)
1244 .ok_or_else(|| ApiError::Validation(format!("unknown named query {}", name))),
1245 None => response
1246 .query
1247 .as_ref()
1248 .ok_or_else(|| ApiError::Validation("document did not produce a primary query".into())),
1249 }
1250}
1251
1252fn select_query_spec<'a>(
1253 response: &'a crate::ParseDocumentResponse,
1254 query_name: Option<&str>,
1255) -> Result<&'a aether_ast::QuerySpec, ApiError> {
1256 match query_name {
1257 Some(name) => response
1258 .queries
1259 .iter()
1260 .find(|query| query.name.as_deref() == Some(name))
1261 .map(|query| &query.spec)
1262 .ok_or_else(|| ApiError::Validation(format!("unknown named query {}", name))),
1263 None => response
1264 .query
1265 .as_ref()
1266 .ok_or_else(|| ApiError::Validation("document did not produce a primary query".into())),
1267 }
1268}
1269
1270fn ensure_importable_query_shape(
1271 spec: &aether_ast::QuerySpec,
1272 query_name: Option<&str>,
1273) -> Result<(), ApiError> {
1274 if spec.query.goals.len() == 1 {
1275 Ok(())
1276 } else {
1277 let label = query_name.unwrap_or("<primary>");
1278 Err(ApiError::Validation(format!(
1279 "imported fact query {label} must have exactly one goal so imported provenance maps to a single semantic row"
1280 )))
1281 }
1282}
1283
1284fn build_imported_fact(
1285 request: &ImportedFactQueryRequest,
1286 index: usize,
1287 row: &QueryRow,
1288 tuples: &IndexMap<TupleId, &aether_ast::DerivedTuple>,
1289 derived: &DerivedSet,
1290) -> Result<ExtensionalFact, ApiError> {
1291 if row.values.len() != request.predicate.arity {
1292 return Err(ApiError::Validation(format!(
1293 "imported fact row {} from {} produced {} value(s), but predicate {} expects arity {}",
1294 index,
1295 request.cut,
1296 row.values.len(),
1297 request.predicate.name,
1298 request.predicate.arity
1299 )));
1300 }
1301 let tuple_id = row.tuple_id.ok_or_else(|| {
1302 ApiError::Validation(format!(
1303 "imported fact row {} from {} was not backed by a derived tuple; import a tuple-producing query instead",
1304 index, request.cut
1305 ))
1306 })?;
1307 let tuple = tuples.get(&tuple_id).copied().ok_or_else(|| {
1308 ApiError::Validation(format!(
1309 "imported fact row {} from {} referenced missing tuple t{}",
1310 index, request.cut, tuple_id.0
1311 ))
1312 })?;
1313 let imported_cuts = merge_partition_cuts(
1314 std::iter::once(&request.cut).chain(tuple.metadata.imported_cuts.iter()),
1315 );
1316 let policy = tuple.policy.clone();
1317
1318 if !derived
1319 .tuples
1320 .iter()
1321 .any(|candidate| candidate.tuple.id == tuple_id)
1322 {
1323 return Err(ApiError::Validation(format!(
1324 "imported fact row {} from {} referenced tuple t{} outside the derived set",
1325 index, request.cut, tuple_id.0
1326 )));
1327 }
1328
1329 Ok(ExtensionalFact {
1330 predicate: request.predicate.clone(),
1331 values: row.values.clone(),
1332 policy,
1333 provenance: Some(FactProvenance {
1334 source_datom_ids: tuple.metadata.source_datom_ids.clone(),
1335 imported_cuts,
1336 sidecar_origin: None,
1337 source_ref: Some(SourceRef {
1338 uri: format!(
1339 "aether://partition/{}/tuple/t{}",
1340 request.cut.partition, tuple_id.0
1341 ),
1342 digest: None,
1343 }),
1344 }),
1345 })
1346}
1347
1348fn federated_cut_from_imports(
1349 imports: &[ImportedFactQueryResponse],
1350) -> Result<FederatedCut, ApiError> {
1351 let mut by_partition = IndexMap::<PartitionId, PartitionCut>::new();
1352 for import in imports {
1353 match by_partition.get(&import.cut.partition) {
1354 Some(existing) if existing != &import.cut => {
1355 return Err(ApiError::Validation(format!(
1356 "federated imports contain conflicting cuts for partition {}",
1357 import.cut.partition
1358 )));
1359 }
1360 Some(_) => {}
1361 None => {
1362 by_partition.insert(import.cut.partition.clone(), import.cut.clone());
1363 }
1364 }
1365 }
1366 validate_federated_cut(FederatedCut {
1367 cuts: by_partition.into_values().collect(),
1368 })
1369}
1370
1371fn ensure_federated_document_uses_current_views(
1372 parsed: &crate::ParseDocumentResponse,
1373) -> Result<(), ApiError> {
1374 if let Some(query) = &parsed.query {
1375 ensure_current_view("primary query", &query.view)?;
1376 }
1377 for named_query in &parsed.queries {
1378 ensure_current_view(
1379 named_query
1380 .name
1381 .as_deref()
1382 .map(|name| format!("query {}", name))
1383 .unwrap_or_else(|| "query".into())
1384 .as_str(),
1385 &named_query.spec.view,
1386 )?;
1387 }
1388 for named_explain in &parsed.explains {
1389 ensure_current_view(
1390 named_explain
1391 .name
1392 .as_deref()
1393 .map(|name| format!("explain {}", name))
1394 .unwrap_or_else(|| "explain".into())
1395 .as_str(),
1396 &named_explain.spec.view,
1397 )?;
1398 }
1399 Ok(())
1400}
1401
1402fn ensure_current_view(label: &str, view: &TemporalView) -> Result<(), ApiError> {
1403 match view {
1404 TemporalView::Current => Ok(()),
1405 TemporalView::AsOf(element) => Err(ApiError::Validation(format!(
1406 "{label} cannot use AsOf(e{}); federated time must be expressed through explicit partition cuts",
1407 element.0
1408 ))),
1409 }
1410}
1411
1412fn ensure_schema_covers_fact_predicates(
1413 schema: &mut Schema,
1414 facts: &[ExtensionalFact],
1415) -> Result<(), ApiError> {
1416 let mut signatures = IndexMap::<aether_ast::PredicateId, PredicateSignature>::new();
1417 for fact in facts {
1418 let fields = fact.values.iter().map(value_type_for).collect::<Vec<_>>();
1419 match signatures.get(&fact.predicate.id) {
1420 Some(existing) if existing.fields != fields => {
1421 return Err(ApiError::Validation(format!(
1422 "imported predicate {} has inconsistent field types across federated facts",
1423 fact.predicate.name
1424 )));
1425 }
1426 Some(_) => {}
1427 None => {
1428 signatures.insert(
1429 fact.predicate.id,
1430 PredicateSignature {
1431 id: fact.predicate.id,
1432 name: fact.predicate.name.clone(),
1433 fields,
1434 },
1435 );
1436 }
1437 }
1438 }
1439
1440 for signature in signatures.into_values() {
1441 if schema.predicate(&signature.id).is_none() {
1442 schema
1443 .register_predicate(signature)
1444 .map_err(|error| ApiError::Validation(error.to_string()))?;
1445 }
1446 }
1447 Ok(())
1448}
1449
1450fn value_type_for(value: &Value) -> ValueType {
1451 match value {
1452 Value::Null => ValueType::String,
1453 Value::Bool(_) => ValueType::Bool,
1454 Value::I64(_) => ValueType::I64,
1455 Value::U64(_) => ValueType::U64,
1456 Value::F64(_) => ValueType::F64,
1457 Value::String(_) => ValueType::String,
1458 Value::Bytes(_) => ValueType::Bytes,
1459 Value::Entity(_) => ValueType::Entity,
1460 Value::List(values) => ValueType::List(Box::new(
1461 values
1462 .first()
1463 .map(value_type_for)
1464 .unwrap_or(ValueType::String),
1465 )),
1466 }
1467}
1468
1469fn filter_compiled_program_facts(
1470 program: &CompiledProgram,
1471 policy_context: Option<&PolicyContext>,
1472) -> CompiledProgram {
1473 let mut filtered = program.clone();
1474 filtered
1475 .facts
1476 .retain(|fact| policy_allows(policy_context, fact.policy.as_ref()));
1477 filtered
1478}
1479
1480fn filter_derived_set(derived: &DerivedSet, policy_context: Option<&PolicyContext>) -> DerivedSet {
1481 let tuples = derived
1482 .tuples
1483 .iter()
1484 .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
1485 .cloned()
1486 .collect::<Vec<_>>();
1487 let visible_ids = tuples
1488 .iter()
1489 .map(|tuple| tuple.tuple.id)
1490 .collect::<std::collections::BTreeSet<_>>();
1491 let predicate_index = derived
1492 .predicate_index
1493 .iter()
1494 .map(|(predicate, tuple_ids)| {
1495 (
1496 *predicate,
1497 tuple_ids
1498 .iter()
1499 .copied()
1500 .filter(|tuple_id| visible_ids.contains(tuple_id))
1501 .collect::<Vec<_>>(),
1502 )
1503 })
1504 .collect::<IndexMap<_, _>>();
1505
1506 DerivedSet {
1507 tuples,
1508 iterations: derived.iterations.clone(),
1509 predicate_index,
1510 }
1511}
1512
1513fn execute_federated_explain_spec(
1514 program: &CompiledProgram,
1515 derived: &DerivedSet,
1516 visible_derived: &DerivedSet,
1517 spec: &ExplainSpec,
1518 policy_context: Option<&PolicyContext>,
1519) -> Result<ExplainArtifact, ApiError> {
1520 match &spec.target {
1521 ExplainTarget::Plan => Ok(ExplainArtifact::Plan(
1522 InMemoryExplainer::default().explain_plan(&program.phase_graph)?,
1523 )),
1524 ExplainTarget::Tuple(atom) => {
1525 let tuple_id = find_matching_derived_tuple(visible_derived, atom).ok_or_else(|| {
1526 ApiError::Validation(format!(
1527 "no derived tuple matched explain target {}",
1528 atom.predicate.name
1529 ))
1530 })?;
1531 let trace = InMemoryExplainer::from_derived_set(derived).explain_tuple(&tuple_id)?;
1532 Ok(ExplainArtifact::Tuple(filter_trace(trace, policy_context)?))
1533 }
1534 }
1535}
1536
1537fn find_matching_derived_tuple(derived: &DerivedSet, atom: &Atom) -> Option<TupleId> {
1538 derived.tuples.iter().find_map(|tuple| {
1539 if tuple.tuple.predicate != atom.predicate.id
1540 || tuple.tuple.values.len() != atom.terms.len()
1541 {
1542 return None;
1543 }
1544 let matches = atom
1545 .terms
1546 .iter()
1547 .zip(&tuple.tuple.values)
1548 .all(|(term, value)| matches_term(term, value));
1549 matches.then_some(tuple.tuple.id)
1550 })
1551}
1552
1553fn matches_term(term: &aether_ast::Term, value: &Value) -> bool {
1554 match term {
1555 aether_ast::Term::Value(expected) => expected == value,
1556 aether_ast::Term::Variable(_) => true,
1557 aether_ast::Term::Aggregate(_) => false,
1558 }
1559}
1560
1561fn filter_trace(
1562 trace: aether_ast::DerivationTrace,
1563 policy_context: Option<&PolicyContext>,
1564) -> Result<aether_ast::DerivationTrace, ApiError> {
1565 let tuples = trace
1566 .tuples
1567 .into_iter()
1568 .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
1569 .collect::<Vec<_>>();
1570 if tuples.iter().all(|tuple| tuple.tuple.id != trace.root) {
1571 return Err(ApiError::Validation(
1572 "requested tuple is not visible under the current policy".into(),
1573 ));
1574 }
1575 Ok(aether_ast::DerivationTrace {
1576 root: trace.root,
1577 tuples,
1578 })
1579}
1580
1581fn report_rows(result: &QueryResult) -> Vec<FederatedReportRow> {
1582 result
1583 .rows
1584 .iter()
1585 .map(|row| FederatedReportRow {
1586 tuple_id: row.tuple_id,
1587 values: row.values.clone(),
1588 })
1589 .collect()
1590}
1591
1592fn build_trace_summary(
1593 name: Option<String>,
1594 trace: &aether_ast::DerivationTrace,
1595) -> FederatedTraceSummary {
1596 let imported_cuts = merge_partition_cuts(
1597 trace
1598 .tuples
1599 .iter()
1600 .flat_map(|tuple| tuple.metadata.imported_cuts.iter()),
1601 );
1602 FederatedTraceSummary {
1603 name,
1604 root: trace.root,
1605 tuple_count: trace.tuples.len(),
1606 imported_cuts,
1607 tuples: trace
1608 .tuples
1609 .iter()
1610 .map(|tuple| FederatedTraceTupleSummary {
1611 tuple_id: tuple.tuple.id,
1612 values: tuple.tuple.values.clone(),
1613 iteration: tuple.metadata.iteration,
1614 source_datom_ids: tuple.metadata.source_datom_ids.clone(),
1615 imported_cuts: tuple.metadata.imported_cuts.clone(),
1616 parent_tuple_ids: tuple.metadata.parent_tuple_ids.clone(),
1617 })
1618 .collect(),
1619 }
1620}
1621
1622pub fn render_federated_explain_report_markdown(report: &FederatedExplainReport) -> String {
1623 let mut markdown = String::new();
1624 markdown.push_str("# Federated Explain Report\n\n");
1625 markdown.push_str(&format!(
1626 "- Generated at (ms): `{}`\n- Federated cut: `{}`\n- Effective policy: `{}`\n\n",
1627 report.generated_at_ms,
1628 format_federated_cut(&report.cut),
1629 format_policy_context(report.policy_context.as_ref())
1630 ));
1631
1632 markdown.push_str("## Imported Sources\n\n");
1633 if report.imports.is_empty() {
1634 markdown.push_str("_None._\n\n");
1635 } else {
1636 for import in &report.imports {
1637 markdown.push_str(&format!(
1638 "- `{}` -> `{}` | query `{}` | facts `{}`\n",
1639 import.cut,
1640 import.predicate.name,
1641 import.query_name.as_deref().unwrap_or("<primary>"),
1642 import.fact_count
1643 ));
1644 }
1645 markdown.push('\n');
1646 }
1647
1648 markdown.push_str("## Primary Query\n\n");
1649 if report.primary_query.is_empty() {
1650 markdown.push_str("_No rows._\n\n");
1651 } else {
1652 for row in &report.primary_query {
1653 markdown.push_str(&format!(
1654 "- `{}`{}\n",
1655 format_values(&row.values),
1656 row.tuple_id
1657 .map(|tuple_id| format!(" | tuple `t{}`", tuple_id.0))
1658 .unwrap_or_default()
1659 ));
1660 }
1661 markdown.push('\n');
1662 }
1663
1664 if !report.named_queries.is_empty() {
1665 markdown.push_str("## Named Queries\n\n");
1666 for query in &report.named_queries {
1667 markdown.push_str(&format!(
1668 "### {}\n\n",
1669 query.name.as_deref().unwrap_or("<unnamed>")
1670 ));
1671 if query.rows.is_empty() {
1672 markdown.push_str("_No rows._\n\n");
1673 } else {
1674 for row in &query.rows {
1675 markdown.push_str(&format!(
1676 "- `{}`{}\n",
1677 format_values(&row.values),
1678 row.tuple_id
1679 .map(|tuple_id| format!(" | tuple `t{}`", tuple_id.0))
1680 .unwrap_or_default()
1681 ));
1682 }
1683 markdown.push('\n');
1684 }
1685 }
1686 }
1687
1688 if !report.traces.is_empty() {
1689 markdown.push_str("## Federated Traces\n\n");
1690 for trace in &report.traces {
1691 markdown.push_str(&format!(
1692 "### {}\n\n- Root: `t{}`\n- Tuple count: `{}`\n- Imported cuts: `{}`\n\n",
1693 trace.name.as_deref().unwrap_or("<unnamed trace>"),
1694 trace.root.0,
1695 trace.tuple_count,
1696 format_partition_cuts(&trace.imported_cuts)
1697 ));
1698 for tuple in &trace.tuples {
1699 markdown.push_str(&format!(
1700 "- `t{}` via iteration `{}` -> `{}` | sources `{}` | imported `{}` | parents `{}`\n",
1701 tuple.tuple_id.0,
1702 tuple.iteration,
1703 format_values(&tuple.values),
1704 format_element_ids(&tuple.source_datom_ids),
1705 format_partition_cuts(&tuple.imported_cuts),
1706 format_tuple_ids(&tuple.parent_tuple_ids)
1707 ));
1708 }
1709 markdown.push('\n');
1710 }
1711 }
1712
1713 markdown
1714}
1715
1716fn format_federated_cut(cut: &FederatedCut) -> String {
1717 if cut.cuts.is_empty() {
1718 "<none>".into()
1719 } else {
1720 cut.cuts
1721 .iter()
1722 .map(ToString::to_string)
1723 .collect::<Vec<_>>()
1724 .join(", ")
1725 }
1726}
1727
1728fn format_partition_cuts(cuts: &[PartitionCut]) -> String {
1729 if cuts.is_empty() {
1730 "<none>".into()
1731 } else {
1732 cuts.iter()
1733 .map(ToString::to_string)
1734 .collect::<Vec<_>>()
1735 .join(", ")
1736 }
1737}
1738
1739fn format_values(values: &[Value]) -> String {
1740 values
1741 .iter()
1742 .map(format_value)
1743 .collect::<Vec<_>>()
1744 .join(", ")
1745}
1746
1747fn format_value(value: &Value) -> String {
1748 match value {
1749 Value::Null => "null".into(),
1750 Value::Bool(value) => value.to_string(),
1751 Value::I64(value) => value.to_string(),
1752 Value::U64(value) => value.to_string(),
1753 Value::F64(value) => format!("{value:.4}"),
1754 Value::String(value) => value.clone(),
1755 Value::Bytes(value) => format!("<{} bytes>", value.len()),
1756 Value::Entity(value) => format!("entity({})", value.0),
1757 Value::List(values) => format!("[{}]", format_values(values)),
1758 }
1759}
1760
1761fn format_element_ids(elements: &[aether_ast::ElementId]) -> String {
1762 if elements.is_empty() {
1763 "<none>".into()
1764 } else {
1765 elements
1766 .iter()
1767 .map(|element| format!("e{}", element.0))
1768 .collect::<Vec<_>>()
1769 .join(", ")
1770 }
1771}
1772
1773fn format_tuple_ids(tuple_ids: &[TupleId]) -> String {
1774 if tuple_ids.is_empty() {
1775 "<none>".into()
1776 } else {
1777 tuple_ids
1778 .iter()
1779 .map(|tuple_id| format!("t{}", tuple_id.0))
1780 .collect::<Vec<_>>()
1781 .join(", ")
1782 }
1783}
1784
1785fn format_policy_context(policy_context: Option<&PolicyContext>) -> String {
1786 match policy_context {
1787 None => "public".into(),
1788 Some(policy) if policy.is_empty() => "public".into(),
1789 Some(policy) => {
1790 let capabilities = if policy.capabilities.is_empty() {
1791 "capabilities=<none>".into()
1792 } else {
1793 format!("capabilities={}", policy.capabilities.join(","))
1794 };
1795 let visibilities = if policy.visibilities.is_empty() {
1796 "visibilities=<none>".into()
1797 } else {
1798 format!("visibilities={}", policy.visibilities.join(","))
1799 };
1800 format!("{capabilities}; {visibilities}")
1801 }
1802 }
1803}
1804
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// This only timestamps reports, so it never panics:
/// - a clock set before the epoch yields `0`;
/// - a (practically unreachable) value above `u64::MAX` milliseconds saturates.
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Clamp before narrowing so the cast from u128 can never truncate.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
1811
/// Request to append a batch of datoms to a single partition.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionAppendRequest {
    /// Partition that receives the datoms.
    pub partition: PartitionId,
    /// Optional leader epoch sent with the append; omitted when serializing if `None`.
    /// NOTE(review): presumably used to fence appends from a stale leader — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leader_epoch: Option<LeaderEpoch>,
    /// Datoms to append to `partition`.
    pub datoms: Vec<Datom>,
}
1819
/// Response to a partition append.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionAppendResponse {
    /// Partition that was appended to.
    pub partition: PartitionId,
    /// Leader epoch reported with the append, if any; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leader_epoch: Option<LeaderEpoch>,
    /// Count reported by the append (presumably the number of datoms written — confirm).
    pub appended: usize,
}
1827
/// Request for the datom history of one partition at a given cut.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionHistoryRequest {
    /// Partition to read and the temporal position (current or as-of an element).
    pub cut: PartitionCut,
    /// Optional policy context, presumably used to filter visible datoms;
    /// omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
1834
/// Datom history of one partition at the requested cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionHistoryResponse {
    /// Cut the history was read at.
    pub cut: PartitionCut,
    /// Datoms returned for that cut.
    pub datoms: Vec<Datom>,
}
1840
/// Request to resolve the state of one partition at a given cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionStateRequest {
    /// Partition to resolve and the temporal position (current or as-of an element).
    pub cut: PartitionCut,
    /// Schema used to resolve the partition's datoms.
    pub schema: Schema,
    /// Optional policy context; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
1848
/// Resolved state of one partition at the requested cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionStateResponse {
    /// Cut the state was resolved at.
    pub cut: PartitionCut,
    /// Resolved entity/attribute state at that cut.
    pub state: ResolvedState,
}
1854
/// Request for the histories of several partitions, each read at its own explicit cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedHistoryRequest {
    /// One cut per partition; a cut naming the same partition twice is rejected.
    pub cut: FederatedCut,
    /// Optional policy context; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
1861
/// Per-partition histories returned for a federated history request.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedHistoryResponse {
    /// One history per partition in the requested federated cut.
    pub partitions: Vec<PartitionHistoryResponse>,
}
1866
/// Describes facts to import from one partition.
///
/// `dsl` is run at `cut`, and each row of the selected query becomes a fact of
/// `predicate`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ImportedFactQueryRequest {
    /// Partition and temporal position the document is evaluated at.
    pub cut: PartitionCut,
    /// Document source to run against the partition.
    pub dsl: String,
    /// Predicate the imported rows are recorded under; row width must equal its arity.
    pub predicate: PredicateRef,
    /// Named query to import from; `None` selects the document's primary query.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
}
1875
/// Facts imported from one partition query, with the parameters that produced them.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ImportedFactQueryResponse {
    /// Partition cut the facts were imported at.
    pub cut: PartitionCut,
    /// Predicate the facts were recorded under.
    pub predicate: PredicateRef,
    /// Named query the rows came from; `None` means the primary query.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
    /// Number of rows the source query produced.
    pub row_count: usize,
    /// One extensional fact per imported row, carrying partition provenance.
    pub facts: Vec<ExtensionalFact>,
}
1885
/// Request to run a document over facts imported from one or more partitions.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedRunDocumentRequest {
    /// Federated document; it must declare every imported predicate by name and may only
    /// use current temporal views.
    pub dsl: String,
    /// Partition queries whose rows are imported as facts.
    pub imports: Vec<ImportedFactQueryRequest>,
    /// Optional policy context applied to evaluation and results; omitted when
    /// serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
1893
/// Result of a federated document run.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedRunDocumentResponse {
    /// Federated cut assembled from the imports' partition cuts.
    pub cut: FederatedCut,
    /// Imported facts, one entry per import.
    pub imports: Vec<ImportedFactQueryResponse>,
    /// Run output; its program facts and derived tuples are filtered by the policy context.
    pub run: RunDocumentResponse,
}
1900
/// Human-oriented summary of a federated run, renderable as Markdown.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedExplainReport {
    /// Wall-clock time the report was built, in milliseconds since the Unix epoch.
    pub generated_at_ms: u64,
    /// Federated cut the run was evaluated at.
    pub cut: FederatedCut,
    /// Policy context the report was produced under; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
    /// One summary per imported source.
    pub imports: Vec<FederatedImportedSourceSummary>,
    /// Rows of the primary query (empty when the document has none).
    pub primary_query: Vec<FederatedReportRow>,
    /// Rows of each named query.
    pub named_queries: Vec<FederatedNamedQuerySummary>,
    /// One summary per tuple explain; plan explains are not included.
    pub traces: Vec<FederatedTraceSummary>,
}
1912
/// Report entry describing one imported source.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedImportedSourceSummary {
    /// Partition cut the source was imported at.
    pub cut: PartitionCut,
    /// Predicate the imported facts were recorded under.
    pub predicate: PredicateRef,
    /// Named query the facts came from; `None` means the primary query.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
    /// Number of facts imported from this source.
    pub fact_count: usize,
}
1921
/// One query result row in a federated report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedReportRow {
    /// Derived tuple backing the row, if any; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tuple_id: Option<TupleId>,
    /// Row values in query output order.
    pub values: Vec<Value>,
}
1928
/// Rows produced by one named query in a federated report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedNamedQuerySummary {
    /// Query name, if the query was named; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Result rows of the query.
    pub rows: Vec<FederatedReportRow>,
}
1935
/// Summary of one tuple derivation trace in a federated report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedTraceSummary {
    /// Explain name, if the explain was named; omitted when serializing if `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Tuple the trace explains.
    pub root: TupleId,
    /// Number of tuples in the (policy-filtered) trace.
    pub tuple_count: usize,
    /// Imported partition cuts merged across all tuples in the trace.
    pub imported_cuts: Vec<PartitionCut>,
    /// Per-tuple details of the trace.
    pub tuples: Vec<FederatedTraceTupleSummary>,
}
1945
/// One tuple within a federated derivation trace.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedTraceTupleSummary {
    /// Id of the derived tuple.
    pub tuple_id: TupleId,
    /// Values of the derived tuple.
    pub values: Vec<Value>,
    /// Evaluation iteration recorded in the tuple's metadata.
    pub iteration: usize,
    /// Source datom element ids recorded in the tuple's metadata.
    pub source_datom_ids: Vec<aether_ast::ElementId>,
    /// Partition cuts this tuple imported from.
    pub imported_cuts: Vec<PartitionCut>,
    /// Tuples this tuple was derived from.
    pub parent_tuple_ids: Vec<TupleId>,
}
1955
1956#[cfg(test)]
1957mod tests {
1958 use super::{
1959 render_federated_explain_report_markdown, AuthorityPartitionConfig,
1960 FederatedHistoryRequest, FederatedRunDocumentRequest, ImportedFactQueryRequest,
1961 LeaderEpoch, PartitionAppendRequest, PartitionHistoryRequest, PartitionStateRequest,
1962 PartitionedInMemoryKernelService, PromoteReplicaRequest, ReplicaConfig, ReplicaRole,
1963 ReplicatedAuthorityPartitionService, SqlitePartitionedKernelService,
1964 };
1965 use aether_ast::{
1966 AttributeId, Datom, DatomProvenance, ElementId, EntityId, FederatedCut, OperationKind,
1967 PartitionCut, PartitionId, PolicyContext, PolicyEnvelope, PredicateId, PredicateRef,
1968 ReplicaId, Value,
1969 };
1970 use aether_resolver::ResolvedValue;
1971 use aether_schema::{AttributeClass, AttributeSchema, Schema, ValueType};
1972 use std::{
1973 path::{Path, PathBuf},
1974 sync::atomic::{AtomicU64, Ordering},
1975 time::{SystemTime, UNIX_EPOCH},
1976 };
1977
1978 static NEXT_TEST_ID: AtomicU64 = AtomicU64::new(1);
1979
1980 #[test]
1981 fn partitioned_service_keeps_local_truth_exact_per_partition() {
1982 let mut service = PartitionedInMemoryKernelService::new();
1983 service
1984 .append_partition(PartitionAppendRequest {
1985 partition: PartitionId::new("tenant-a"),
1986 leader_epoch: None,
1987 datoms: vec![
1988 sample_datom(1, 1, "tenant-a-open", 1, None),
1989 sample_datom(1, 1, "tenant-a-running", 2, None),
1990 ],
1991 })
1992 .expect("append tenant-a");
1993 service
1994 .append_partition(PartitionAppendRequest {
1995 partition: PartitionId::new("tenant-b"),
1996 leader_epoch: None,
1997 datoms: vec![sample_datom(1, 1, "tenant-b-ready", 1, None)],
1998 })
1999 .expect("append tenant-b");
2000
2001 let tenant_a_current = service
2002 .partition_state(PartitionStateRequest {
2003 cut: PartitionCut::current("tenant-a"),
2004 schema: schema(),
2005 policy_context: None,
2006 })
2007 .expect("tenant-a current state");
2008 let tenant_a_as_of = service
2009 .partition_state(PartitionStateRequest {
2010 cut: PartitionCut::as_of("tenant-a", ElementId::new(1)),
2011 schema: schema(),
2012 policy_context: None,
2013 })
2014 .expect("tenant-a as-of state");
2015 let tenant_b_history = service
2016 .partition_history(PartitionHistoryRequest {
2017 cut: PartitionCut::current("tenant-b"),
2018 policy_context: None,
2019 })
2020 .expect("tenant-b history");
2021
2022 assert_eq!(
2023 tenant_a_current
2024 .state
2025 .entity(&EntityId::new(1))
2026 .and_then(|entity| entity.attribute(&AttributeId::new(1))),
2027 Some(&ResolvedValue::Scalar(Some(Value::String(
2028 "tenant-a-running".into()
2029 ))))
2030 );
2031 assert_eq!(
2032 tenant_a_as_of
2033 .state
2034 .entity(&EntityId::new(1))
2035 .and_then(|entity| entity.attribute(&AttributeId::new(1))),
2036 Some(&ResolvedValue::Scalar(Some(Value::String(
2037 "tenant-a-open".into()
2038 ))))
2039 );
2040 assert_eq!(tenant_b_history.datoms.len(), 1);
2041 assert_eq!(
2042 tenant_b_history.datoms[0].value,
2043 Value::String("tenant-b-ready".into())
2044 );
2045 }
2046
2047 #[test]
2048 fn federated_history_requires_explicit_unique_partition_cuts() {
2049 let mut service = PartitionedInMemoryKernelService::new();
2050 service
2051 .append_partition(PartitionAppendRequest {
2052 partition: PartitionId::new("tenant-a"),
2053 leader_epoch: None,
2054 datoms: vec![sample_datom(1, 1, "alpha", 1, None)],
2055 })
2056 .expect("append tenant-a");
2057 service
2058 .append_partition(PartitionAppendRequest {
2059 partition: PartitionId::new("tenant-b"),
2060 leader_epoch: None,
2061 datoms: vec![sample_datom(
2062 1,
2063 1,
2064 "beta",
2065 1,
2066 Some(PolicyEnvelope {
2067 capabilities: vec!["ops".into()],
2068 visibilities: Vec::new(),
2069 }),
2070 )],
2071 })
2072 .expect("append tenant-b");
2073
2074 let federated = service
2075 .federated_history(FederatedHistoryRequest {
2076 cut: FederatedCut {
2077 cuts: vec![
2078 PartitionCut::current("tenant-b"),
2079 PartitionCut::current("tenant-a"),
2080 ],
2081 },
2082 policy_context: None,
2083 })
2084 .expect("federated history");
2085 assert_eq!(
2086 federated
2087 .partitions
2088 .iter()
2089 .map(|partition| partition.cut.to_string())
2090 .collect::<Vec<_>>(),
2091 vec![
2092 "tenant-a@current".to_string(),
2093 "tenant-b@current".to_string()
2094 ]
2095 );
2096 assert_eq!(federated.partitions[0].datoms.len(), 1);
2097 assert!(federated.partitions[1].datoms.is_empty());
2098
2099 let duplicate_partition = service.federated_history(FederatedHistoryRequest {
2100 cut: FederatedCut {
2101 cuts: vec![
2102 PartitionCut::current("tenant-a"),
2103 PartitionCut::as_of("tenant-a", ElementId::new(1)),
2104 ],
2105 },
2106 policy_context: None,
2107 });
2108 assert!(matches!(
2109 duplicate_partition,
2110 Err(crate::ApiError::Validation(message))
2111 if message == "federated cut contains duplicate partition tenant-a"
2112 ));
2113 }
2114
2115 #[test]
2116 fn unknown_partition_is_rejected_cleanly() {
2117 let service = PartitionedInMemoryKernelService::new();
2118 let error = service.partition_history(PartitionHistoryRequest {
2119 cut: PartitionCut::current("missing"),
2120 policy_context: None,
2121 });
2122 assert!(matches!(
2123 error,
2124 Err(crate::ApiError::Validation(message))
2125 if message == "unknown partition missing"
2126 ));
2127 }
2128
2129 #[test]
2130 fn imported_facts_and_federated_explain_reports_preserve_partition_cuts() {
2131 let mut service = PartitionedInMemoryKernelService::new();
2132 service
2133 .append_partition(PartitionAppendRequest {
2134 partition: PartitionId::new("readiness"),
2135 leader_epoch: None,
2136 datoms: vec![sample_datom(1, 1, "ready", 1, None)],
2137 })
2138 .expect("append readiness datoms");
2139 service
2140 .append_partition(PartitionAppendRequest {
2141 partition: PartitionId::new("authority"),
2142 leader_epoch: None,
2143 datoms: vec![sample_datom(1, 1, "worker-a", 3, None)],
2144 })
2145 .expect("append authority datoms");
2146
2147 let response = service
2148 .federated_run_document(FederatedRunDocumentRequest {
2149 dsl: federated_assignment_document(),
2150 imports: vec![
2151 ImportedFactQueryRequest {
2152 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2153 dsl: readiness_document(),
2154 predicate: PredicateRef {
2155 id: PredicateId::new(11),
2156 name: "imported_ready_task".into(),
2157 arity: 1,
2158 },
2159 query_name: Some("ready_now".into()),
2160 },
2161 ImportedFactQueryRequest {
2162 cut: PartitionCut::as_of("authority", ElementId::new(3)),
2163 dsl: authority_document(),
2164 predicate: PredicateRef {
2165 id: PredicateId::new(12),
2166 name: "imported_authorized_worker".into(),
2167 arity: 2,
2168 },
2169 query_name: Some("authorized_now".into()),
2170 },
2171 ],
2172 policy_context: None,
2173 })
2174 .expect("run federated document");
2175
2176 assert_eq!(
2177 response
2178 .cut
2179 .cuts
2180 .iter()
2181 .map(ToString::to_string)
2182 .collect::<Vec<_>>(),
2183 vec!["authority@e3".to_string(), "readiness@e1".to_string()]
2184 );
2185 assert_eq!(response.imports.len(), 2);
2186 assert_eq!(
2187 response
2188 .run
2189 .query
2190 .as_ref()
2191 .expect("primary query result")
2192 .rows[0]
2193 .values,
2194 vec![
2195 Value::Entity(EntityId::new(1)),
2196 Value::String("worker-a".into())
2197 ]
2198 );
2199
2200 let actionable_tuple_id = response
2201 .run
2202 .query
2203 .as_ref()
2204 .expect("primary query result")
2205 .rows[0]
2206 .tuple_id
2207 .expect("actionable tuple id");
2208 let actionable_tuple = response
2209 .run
2210 .derived
2211 .tuples
2212 .iter()
2213 .find(|tuple| tuple.tuple.id == actionable_tuple_id)
2214 .expect("actionable tuple");
2215 assert_eq!(
2216 actionable_tuple
2217 .metadata
2218 .imported_cuts
2219 .iter()
2220 .map(ToString::to_string)
2221 .collect::<Vec<_>>(),
2222 vec!["authority@e3".to_string(), "readiness@e1".to_string()]
2223 );
2224
2225 let trace = response
2226 .run
2227 .explains
2228 .iter()
2229 .find(|explain| explain.name.as_deref() == Some("actionable_trace"))
2230 .expect("actionable trace");
2231 let crate::ExplainArtifact::Tuple(trace) = &trace.result else {
2232 panic!("expected tuple trace");
2233 };
2234 assert_eq!(
2235 trace.tuples[0]
2236 .metadata
2237 .imported_cuts
2238 .iter()
2239 .map(ToString::to_string)
2240 .collect::<Vec<_>>(),
2241 vec!["authority@e3".to_string(), "readiness@e1".to_string()]
2242 );
2243
2244 let report = service
2245 .build_federated_explain_report(FederatedRunDocumentRequest {
2246 dsl: federated_assignment_document(),
2247 imports: vec![
2248 ImportedFactQueryRequest {
2249 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2250 dsl: readiness_document(),
2251 predicate: PredicateRef {
2252 id: PredicateId::new(11),
2253 name: "imported_ready_task".into(),
2254 arity: 1,
2255 },
2256 query_name: Some("ready_now".into()),
2257 },
2258 ImportedFactQueryRequest {
2259 cut: PartitionCut::as_of("authority", ElementId::new(3)),
2260 dsl: authority_document(),
2261 predicate: PredicateRef {
2262 id: PredicateId::new(12),
2263 name: "imported_authorized_worker".into(),
2264 arity: 2,
2265 },
2266 query_name: Some("authorized_now".into()),
2267 },
2268 ],
2269 policy_context: None,
2270 })
2271 .expect("build federated report");
2272 assert_eq!(report.traces.len(), 1);
2273 let markdown = render_federated_explain_report_markdown(&report);
2274 assert!(markdown.contains("authority@e3, readiness@e1"));
2275 assert!(markdown.contains("imported_authorized_worker"));
2276 assert!(markdown.contains("imported_ready_task"));
2277 }
2278
2279 #[test]
2280 fn imported_fact_queries_must_be_single_goal() {
2281 let mut service = PartitionedInMemoryKernelService::new();
2282 service
2283 .append_partition(PartitionAppendRequest {
2284 partition: PartitionId::new("joined"),
2285 leader_epoch: None,
2286 datoms: vec![
2287 sample_datom(1, 1, "ready", 1, None),
2288 Datom {
2289 entity: EntityId::new(1),
2290 attribute: AttributeId::new(2),
2291 value: Value::String("worker-a".into()),
2292 op: OperationKind::Assert,
2293 element: ElementId::new(2),
2294 replica: ReplicaId::new(1),
2295 causal_context: Default::default(),
2296 provenance: DatomProvenance::default(),
2297 policy: None,
2298 },
2299 ],
2300 })
2301 .expect("append joined datoms");
2302
2303 let error = service
2304 .import_partition_facts(
2305 ImportedFactQueryRequest {
2306 cut: PartitionCut::current("joined"),
2307 dsl: joined_import_document(),
2308 predicate: PredicateRef {
2309 id: PredicateId::new(21),
2310 name: "imported_assignment".into(),
2311 arity: 2,
2312 },
2313 query_name: Some("joined_now".into()),
2314 },
2315 None,
2316 )
2317 .expect_err("joined import should be rejected");
2318
2319 match error {
2320 crate::ApiError::Validation(message) => assert!(
2321 message
2322 .contains("must have exactly one goal so imported provenance maps to a single semantic row"),
2323 "{message}"
2324 ),
2325 other => panic!("unexpected error: {other:?}"),
2326 }
2327 }
2328
2329 #[test]
2330 fn hidden_partition_cut_is_rejected_under_policy() {
2331 let mut service = PartitionedInMemoryKernelService::new();
2332 service
2333 .append_partition(PartitionAppendRequest {
2334 partition: PartitionId::new("secure"),
2335 leader_epoch: None,
2336 datoms: vec![
2337 sample_datom(1, 1, "ready", 1, None),
2338 sample_datom(
2339 1,
2340 1,
2341 "running",
2342 2,
2343 Some(PolicyEnvelope {
2344 capabilities: vec!["ops".into()],
2345 visibilities: Vec::new(),
2346 }),
2347 ),
2348 ],
2349 })
2350 .expect("append secure datoms");
2351
2352 let history = service.partition_history(PartitionHistoryRequest {
2353 cut: PartitionCut::as_of("secure", ElementId::new(2)),
2354 policy_context: None,
2355 });
2356 assert!(matches!(
2357 history,
2358 Err(crate::ApiError::Validation(message))
2359 if message == "unknown element 2 for partition secure"
2360 ));
2361
2362 let state = service.partition_state(PartitionStateRequest {
2363 cut: PartitionCut::as_of("secure", ElementId::new(2)),
2364 schema: schema(),
2365 policy_context: None,
2366 });
2367 assert!(matches!(
2368 state,
2369 Err(crate::ApiError::Validation(message))
2370 if message == "unknown element 2 for partition secure"
2371 ));
2372
2373 let visible = service
2374 .partition_state(PartitionStateRequest {
2375 cut: PartitionCut::as_of("secure", ElementId::new(2)),
2376 schema: schema(),
2377 policy_context: Some(PolicyContext {
2378 capabilities: vec!["ops".into()],
2379 visibilities: Vec::new(),
2380 }),
2381 })
2382 .expect("authorized cut should resolve");
2383 assert_eq!(
2384 visible.cut,
2385 PartitionCut::as_of("secure", ElementId::new(2))
2386 );
2387 }
2388
2389 #[test]
2390 fn sqlite_partitioned_service_replays_federated_imports_after_restart() {
2391 let temp = TestPartitionDir::new("partitioned-sqlite");
2392 {
2393 let mut service =
2394 SqlitePartitionedKernelService::open(temp.path()).expect("open sqlite partitions");
2395 service
2396 .append_partition(PartitionAppendRequest {
2397 partition: PartitionId::new("readiness"),
2398 leader_epoch: None,
2399 datoms: vec![sample_datom(1, 1, "ready", 1, None)],
2400 })
2401 .expect("append readiness datoms");
2402 service
2403 .append_partition(PartitionAppendRequest {
2404 partition: PartitionId::new("authority"),
2405 leader_epoch: None,
2406 datoms: vec![sample_datom(1, 1, "worker-a", 3, None)],
2407 })
2408 .expect("append authority datoms");
2409 }
2410
2411 let mut service =
2412 SqlitePartitionedKernelService::open(temp.path()).expect("reopen sqlite partitions");
2413 let response = service
2414 .federated_run_document(FederatedRunDocumentRequest {
2415 dsl: federated_assignment_document(),
2416 imports: vec![
2417 ImportedFactQueryRequest {
2418 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2419 dsl: readiness_document(),
2420 predicate: PredicateRef {
2421 id: PredicateId::new(11),
2422 name: "imported_ready_task".into(),
2423 arity: 1,
2424 },
2425 query_name: Some("ready_now".into()),
2426 },
2427 ImportedFactQueryRequest {
2428 cut: PartitionCut::as_of("authority", ElementId::new(3)),
2429 dsl: authority_document(),
2430 predicate: PredicateRef {
2431 id: PredicateId::new(12),
2432 name: "imported_authorized_worker".into(),
2433 arity: 2,
2434 },
2435 query_name: Some("authorized_now".into()),
2436 },
2437 ],
2438 policy_context: None,
2439 })
2440 .expect("run federated document after restart");
2441
2442 assert_eq!(
2443 response.run.query.as_ref().expect("query result").rows[0].values,
2444 vec![
2445 Value::Entity(EntityId::new(1)),
2446 Value::String("worker-a".into())
2447 ]
2448 );
2449 let report = service
2450 .build_federated_explain_report(FederatedRunDocumentRequest {
2451 dsl: federated_assignment_document(),
2452 imports: vec![
2453 ImportedFactQueryRequest {
2454 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2455 dsl: readiness_document(),
2456 predicate: PredicateRef {
2457 id: PredicateId::new(11),
2458 name: "imported_ready_task".into(),
2459 arity: 1,
2460 },
2461 query_name: Some("ready_now".into()),
2462 },
2463 ImportedFactQueryRequest {
2464 cut: PartitionCut::as_of("authority", ElementId::new(3)),
2465 dsl: authority_document(),
2466 predicate: PredicateRef {
2467 id: PredicateId::new(12),
2468 name: "imported_authorized_worker".into(),
2469 arity: 2,
2470 },
2471 query_name: Some("authorized_now".into()),
2472 },
2473 ],
2474 policy_context: None,
2475 })
2476 .expect("build report after restart");
2477 assert!(render_federated_explain_report_markdown(&report)
2478 .contains("authority@e3, readiness@e1"));
2479 }
2480
2481 #[test]
2482 fn replicated_partition_service_replays_followers_and_fences_stale_epochs() {
2483 let temp = TestPartitionDir::new("replicated");
2484 let mut service = ReplicatedAuthorityPartitionService::open(
2485 temp.path(),
2486 vec![AuthorityPartitionConfig {
2487 partition: PartitionId::new("authority"),
2488 replicas: vec![
2489 ReplicaConfig {
2490 replica_id: ReplicaId::new(1),
2491 database_path: PathBuf::from("authority-leader.sqlite"),
2492 role: ReplicaRole::Leader,
2493 },
2494 ReplicaConfig {
2495 replica_id: ReplicaId::new(2),
2496 database_path: PathBuf::from("authority-follower.sqlite"),
2497 role: ReplicaRole::Follower,
2498 },
2499 ],
2500 }],
2501 )
2502 .expect("open replicated partitions");
2503
2504 let appended = service
2505 .append_partition(PartitionAppendRequest {
2506 partition: PartitionId::new("authority"),
2507 leader_epoch: None,
2508 datoms: vec![sample_datom(1, 1, "worker-a", 1, None)],
2509 })
2510 .expect("append through leader");
2511 assert_eq!(appended.leader_epoch.expect("leader epoch").0, 1);
2512
2513 let status = service.partition_status().expect("partition status");
2514 let authority = &status.partitions[0];
2515 assert_eq!(authority.replicas.len(), 2);
2516 assert!(authority.replicas.iter().all(|replica| replica.healthy));
2517 assert!(authority
2518 .replicas
2519 .iter()
2520 .all(|replica| replica.applied_element == Some(ElementId::new(1))));
2521
2522 let promoted = service
2523 .promote_replica(PromoteReplicaRequest {
2524 partition: PartitionId::new("authority"),
2525 replica_id: ReplicaId::new(2),
2526 })
2527 .expect("promote follower");
2528 assert_eq!(promoted.leader_epoch.0, 2);
2529
2530 let stale = service.append_partition(PartitionAppendRequest {
2531 partition: PartitionId::new("authority"),
2532 leader_epoch: Some(LeaderEpoch::new(1)),
2533 datoms: vec![sample_datom(1, 1, "worker-b", 2, None)],
2534 });
2535 assert!(matches!(
2536 stale,
2537 Err(crate::ApiError::Validation(message))
2538 if message.contains("stale leader epoch 1 for partition authority; current epoch is 2")
2539 ));
2540
2541 let appended = service
2542 .append_partition(PartitionAppendRequest {
2543 partition: PartitionId::new("authority"),
2544 leader_epoch: Some(LeaderEpoch::new(2)),
2545 datoms: vec![sample_datom(1, 1, "worker-b", 2, None)],
2546 })
2547 .expect("append after promotion");
2548 assert_eq!(appended.leader_epoch.expect("leader epoch").0, 2);
2549
2550 let history = service
2551 .partition_history(PartitionHistoryRequest {
2552 cut: PartitionCut::current("authority"),
2553 policy_context: None,
2554 })
2555 .expect("current history");
2556 assert_eq!(history.datoms.len(), 2);
2557
2558 let status = service
2559 .partition_status()
2560 .expect("partition status after promotion");
2561 let authority = &status.partitions[0];
2562 assert!(authority.replicas.iter().any(|replica| {
2563 replica.replica_id == ReplicaId::new(2)
2564 && replica.role == ReplicaRole::Leader
2565 && replica.leader_epoch.0 == 2
2566 }));
2567 assert!(authority
2568 .replicas
2569 .iter()
2570 .all(|replica| replica.applied_element == Some(ElementId::new(2))));
2571 }
2572
2573 #[test]
2574 fn replicated_partition_service_reuses_and_invalidates_federated_reads() {
2575 let temp = TestPartitionDir::new("replicated-cache");
2576 let mut service = ReplicatedAuthorityPartitionService::open(
2577 temp.path(),
2578 vec![
2579 AuthorityPartitionConfig {
2580 partition: PartitionId::new("readiness"),
2581 replicas: vec![ReplicaConfig {
2582 replica_id: ReplicaId::new(1),
2583 database_path: PathBuf::from("readiness.sqlite"),
2584 role: ReplicaRole::Leader,
2585 }],
2586 },
2587 AuthorityPartitionConfig {
2588 partition: PartitionId::new("authority"),
2589 replicas: vec![ReplicaConfig {
2590 replica_id: ReplicaId::new(1),
2591 database_path: PathBuf::from("authority.sqlite"),
2592 role: ReplicaRole::Leader,
2593 }],
2594 },
2595 ],
2596 )
2597 .expect("open replicated partitions");
2598 service
2599 .append_partition(PartitionAppendRequest {
2600 partition: PartitionId::new("readiness"),
2601 leader_epoch: None,
2602 datoms: vec![sample_datom(1, 1, "ready", 1, None)],
2603 })
2604 .expect("append readiness");
2605 service
2606 .append_partition(PartitionAppendRequest {
2607 partition: PartitionId::new("authority"),
2608 leader_epoch: None,
2609 datoms: vec![sample_datom(1, 1, "worker-a", 2, None)],
2610 })
2611 .expect("append authority");
2612
2613 let request = FederatedRunDocumentRequest {
2614 dsl: federated_assignment_query_document(),
2615 imports: vec![
2616 ImportedFactQueryRequest {
2617 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2618 dsl: readiness_document(),
2619 predicate: PredicateRef {
2620 id: PredicateId::new(11),
2621 name: "imported_ready_task".into(),
2622 arity: 1,
2623 },
2624 query_name: Some("ready_now".into()),
2625 },
2626 ImportedFactQueryRequest {
2627 cut: PartitionCut::as_of("authority", ElementId::new(2)),
2628 dsl: authority_document(),
2629 predicate: PredicateRef {
2630 id: PredicateId::new(12),
2631 name: "imported_authorized_worker".into(),
2632 arity: 2,
2633 },
2634 query_name: Some("authorized_now".into()),
2635 },
2636 ],
2637 policy_context: None,
2638 };
2639
2640 let first = service
2641 .federated_run_document(request.clone())
2642 .expect("first federated run");
2643 let second = service
2644 .federated_run_document(request.clone())
2645 .expect("second federated run");
2646 assert_eq!(first, second);
2647
2648 service
2649 .append_partition(PartitionAppendRequest {
2650 partition: PartitionId::new("authority"),
2651 leader_epoch: None,
2652 datoms: vec![sample_datom(1, 1, "worker-b", 3, None)],
2653 })
2654 .expect("append updated authority");
2655
2656 let updated = service
2657 .federated_run_document(FederatedRunDocumentRequest {
2658 dsl: federated_assignment_query_document(),
2659 imports: vec![
2660 ImportedFactQueryRequest {
2661 cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2662 dsl: readiness_document(),
2663 predicate: PredicateRef {
2664 id: PredicateId::new(11),
2665 name: "imported_ready_task".into(),
2666 arity: 1,
2667 },
2668 query_name: Some("ready_now".into()),
2669 },
2670 ImportedFactQueryRequest {
2671 cut: PartitionCut::as_of("authority", ElementId::new(3)),
2672 dsl: authority_document(),
2673 predicate: PredicateRef {
2674 id: PredicateId::new(12),
2675 name: "imported_authorized_worker".into(),
2676 arity: 2,
2677 },
2678 query_name: Some("authorized_now".into()),
2679 },
2680 ],
2681 policy_context: None,
2682 })
2683 .expect("updated federated run");
2684 assert_eq!(
2685 updated.run.query.as_ref().expect("query result").rows[0].values,
2686 vec![
2687 Value::Entity(EntityId::new(1)),
2688 Value::String("worker-b".into())
2689 ]
2690 );
2691 }
2692
2693 fn schema() -> Schema {
2694 let mut schema = Schema::new("partitioned-v1");
2695 schema
2696 .register_attribute(AttributeSchema {
2697 id: AttributeId::new(1),
2698 name: "task.status".into(),
2699 class: AttributeClass::ScalarLww,
2700 value_type: ValueType::String,
2701 })
2702 .expect("register status attribute");
2703 schema
2704 }
2705
2706 fn readiness_document() -> String {
2707 r#"
2708schema {
2709 attr task.status: ScalarLWW<String>
2710}
2711
2712predicates {
2713 task_status(Entity, String)
2714 ready_task(Entity)
2715}
2716
2717rules {
2718 ready_task(t) <- task_status(t, "ready")
2719}
2720
2721materialize {
2722 ready_task
2723}
2724
2725query ready_now {
2726 current
2727 goal ready_task(t)
2728 keep t
2729}
2730"#
2731 .into()
2732 }
2733
2734 fn authority_document() -> String {
2735 r#"
2736schema {
2737 attr task.owner: ScalarLWW<String>
2738}
2739
2740predicates {
2741 task_owner(Entity, String)
2742 authorized_worker(Entity, String)
2743}
2744
2745rules {
2746 authorized_worker(t, worker) <- task_owner(t, worker)
2747}
2748
2749materialize {
2750 authorized_worker
2751}
2752
2753query authorized_now {
2754 current
2755 goal authorized_worker(t, worker)
2756 keep t, worker
2757}
2758"#
2759 .into()
2760 }
2761
2762 fn federated_assignment_document() -> String {
2763 r#"
2764schema {
2765}
2766
2767predicates {
2768 imported_ready_task(Entity)
2769 imported_authorized_worker(Entity, String)
2770 actionable_assignment(Entity, String)
2771}
2772
2773rules {
2774 actionable_assignment(t, worker) <- imported_ready_task(t), imported_authorized_worker(t, worker)
2775}
2776
2777materialize {
2778 actionable_assignment
2779}
2780
2781query actionable_now {
2782 current
2783 goal actionable_assignment(t, worker)
2784 keep t, worker
2785}
2786
2787explain actionable_trace {
2788 tuple actionable_assignment(entity(1), "worker-a")
2789}
2790"#
2791 .into()
2792 }
2793
2794 fn federated_assignment_query_document() -> String {
2795 r#"
2796schema {
2797}
2798
2799predicates {
2800 imported_ready_task(Entity)
2801 imported_authorized_worker(Entity, String)
2802 actionable_assignment(Entity, String)
2803}
2804
2805rules {
2806 actionable_assignment(t, worker) <- imported_ready_task(t), imported_authorized_worker(t, worker)
2807}
2808
2809materialize {
2810 actionable_assignment
2811}
2812
2813query actionable_now {
2814 current
2815 goal actionable_assignment(t, worker)
2816 keep t, worker
2817}
2818"#
2819 .into()
2820 }
2821
2822 fn joined_import_document() -> String {
2823 r#"
2824schema {
2825 attr task.status: ScalarLWW<String>
2826 attr task.owner: ScalarLWW<String>
2827}
2828
2829predicates {
2830 task_status(Entity, String)
2831 task_owner(Entity, String)
2832}
2833
2834rules {
2835}
2836
2837query joined_now {
2838 current
2839 goal task_status(t, "ready")
2840 goal task_owner(t, worker)
2841 keep t, worker
2842}
2843"#
2844 .into()
2845 }
2846
2847 fn sample_datom(
2848 entity: u64,
2849 attribute: u64,
2850 value: &str,
2851 element: u64,
2852 policy: Option<PolicyEnvelope>,
2853 ) -> Datom {
2854 Datom {
2855 entity: EntityId::new(entity),
2856 attribute: AttributeId::new(attribute),
2857 value: Value::String(value.into()),
2858 op: OperationKind::Assert,
2859 element: ElementId::new(element),
2860 replica: ReplicaId::new(1),
2861 causal_context: Default::default(),
2862 provenance: DatomProvenance::default(),
2863 policy,
2864 }
2865 }
2866
2867 struct TestPartitionDir {
2868 path: PathBuf,
2869 }
2870
2871 impl TestPartitionDir {
2872 fn new(name: &str) -> Self {
2873 let unique = NEXT_TEST_ID.fetch_add(1, Ordering::Relaxed);
2874 let nanos = SystemTime::now()
2875 .duration_since(UNIX_EPOCH)
2876 .expect("system time")
2877 .as_nanos();
2878 let mut path = std::env::temp_dir();
2879 path.push(format!("aether-partitions-{name}-{nanos}-{unique}"));
2880 Self { path }
2881 }
2882
2883 fn path(&self) -> &Path {
2884 &self.path
2885 }
2886 }
2887
2888 impl Drop for TestPartitionDir {
2889 fn drop(&mut self) {
2890 let _ = std::fs::remove_dir_all(&self.path);
2891 }
2892 }
2893}