// aether_api/partitioned.rs

1use crate::{
2    ApiError, AppendRequest, CurrentStateRequest, ExplainArtifact, HistoryRequest,
3    InMemoryKernelService, KernelService, NamedExplainResult, NamedQueryResult,
4    ParseDocumentRequest, RunDocumentRequest, RunDocumentResponse,
5};
6use aether_ast::{
7    merge_partition_cuts, policy_allows, Atom, Datom, ExplainSpec, ExplainTarget, ExtensionalFact,
8    FactProvenance, FederatedCut, PartitionCut, PartitionId, PolicyContext, PredicateRef,
9    QueryResult, QueryRow, ReplicaId, SourceRef, TemporalView, TupleId, Value,
10};
11use aether_explain::{Explainer, InMemoryExplainer};
12use aether_plan::CompiledProgram;
13use aether_resolver::ResolvedState;
14use aether_runtime::{execute_query, DerivedSet};
15use aether_schema::{PredicateSignature, Schema, ValueType};
16use indexmap::IndexMap;
17use serde::{Deserialize, Serialize};
18use std::{
19    cell::RefCell,
20    collections::BTreeSet,
21    fmt::Write as _,
22    fs,
23    path::{Path, PathBuf},
24    time::{SystemTime, UNIX_EPOCH},
25};
26
/// In-memory implementation of a partitioned kernel service.
///
/// Each `PartitionId` maps to its own `InMemoryKernelService`; partitions are
/// created lazily on first append (see `append_partition`).
#[derive(Debug, Default)]
pub struct PartitionedInMemoryKernelService {
    // IndexMap keeps insertion order, so iteration over partitions is
    // deterministic.
    partitions: IndexMap<PartitionId, InMemoryKernelService>,
}

/// SQLite-backed partitioned kernel service.
///
/// Partition databases live under `root` as `partition-<id>.sqlite` files
/// (see `partition_path`) and are opened lazily. The `RefCell` lets `&self`
/// read paths (e.g. `partition_history`) open partitions on demand; this type
/// is consequently not `Sync`.
#[derive(Debug)]
pub struct SqlitePartitionedKernelService {
    root: PathBuf,
    partitions: RefCell<IndexMap<PartitionId, crate::SqliteKernelService>>,
}
37
38#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
39pub struct LeaderEpoch(pub u64);
40
41impl LeaderEpoch {
42    pub const fn new(value: u64) -> Self {
43        Self(value)
44    }
45}
46
/// Role a replica plays within a partition; serialized as
/// `"leader"`/`"follower"`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReplicaRole {
    /// Accepts writes; source of truth for the partition's history.
    #[default]
    Leader,
    /// Read-only copy kept in sync with the leader's history prefix.
    Follower,
}

/// Static description of one replica: identity, on-disk database location,
/// and current role.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ReplicaConfig {
    pub replica_id: ReplicaId,
    /// Relative paths are resolved against the replication root when the
    /// partition is opened.
    pub database_path: PathBuf,
    pub role: ReplicaRole,
}

/// Bootstrap configuration for a replicated partition. Must name at least
/// one replica and exactly one leader; validated in
/// `ReplicatedPartition::open`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AuthorityPartitionConfig {
    pub partition: PartitionId,
    pub replicas: Vec<ReplicaConfig>,
}
67
/// Health/progress snapshot for a single replica, as reported by
/// `partition_status`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ReplicaStatus {
    pub partition: PartitionId,
    pub replica_id: ReplicaId,
    pub role: ReplicaRole,
    pub leader_epoch: LeaderEpoch,
    /// Last element id in this replica's history, if any history exists.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub applied_element: Option<aether_ast::ElementId>,
    /// Distance (in element-id units) behind the leader's last element.
    pub replication_lag: u64,
    /// False when the replica's history is not a prefix of the leader's.
    pub healthy: bool,
    /// Divergence description when `healthy` is false.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}

/// Status of one partition: its current epoch plus every replica's status.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionStatus {
    pub partition: PartitionId,
    pub leader_epoch: LeaderEpoch,
    pub replicas: Vec<ReplicaStatus>,
}

/// Response for `ReplicatedAuthorityPartitionService::partition_status`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionStatusResponse {
    pub partitions: Vec<PartitionStatus>,
}

/// Request to promote `replica_id` to leader of `partition`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PromoteReplicaRequest {
    pub partition: PartitionId,
    pub replica_id: ReplicaId,
}

/// Result of a promotion: the new leader and the bumped fencing epoch.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PromoteReplicaResponse {
    pub partition: PartitionId,
    pub replica_id: ReplicaId,
    pub leader_epoch: LeaderEpoch,
}
106
/// Partitioned service in which each partition is a leader/follower replica
/// set backed by SQLite, with a single-entry read cache for federated
/// queries.
#[derive(Debug)]
pub struct ReplicatedAuthorityPartitionService {
    root: PathBuf,
    // RefCell: read paths take `&self` but need `&mut` access to the
    // underlying replica services.
    partitions: RefCell<IndexMap<PartitionId, ReplicatedPartition>>,
    cache: ReplicatedReadCache,
}

/// Runtime state for one replicated partition: persisted metadata plus an
/// open SQLite kernel service per replica.
#[derive(Debug)]
struct ReplicatedPartition {
    /// Where `metadata` is persisted (`replication-<partition>.json`).
    metadata_path: PathBuf,
    metadata: ReplicatedPartitionMetadata,
    replicas: IndexMap<ReplicaId, crate::SqliteKernelService>,
}

/// Durable replication metadata: current fencing epoch and the replica
/// roster with roles. Survives restarts so promotions stick.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
struct ReplicatedPartitionMetadata {
    partition: PartitionId,
    leader_epoch: LeaderEpoch,
    replicas: Vec<ReplicaConfig>,
}

/// Single-entry memo caches for federated reads, keyed by a string derived
/// from the request (`cache_key`; defined elsewhere in this file —
/// presumably a serialization of the request, confirm there). Cleared on any
/// write (`append_partition`, `promote_replica`).
#[derive(Debug, Default)]
struct ReplicatedReadCache {
    federated_run: Option<(String, FederatedRunDocumentResponse)>,
    federated_report: Option<(String, FederatedExplainReport)>,
}
133
134impl ReplicatedReadCache {
135    fn clear(&mut self) {
136        self.federated_run = None;
137        self.federated_report = None;
138    }
139}
140
141impl PartitionedInMemoryKernelService {
142    pub fn new() -> Self {
143        Self::default()
144    }
145
146    pub fn append_partition(
147        &mut self,
148        request: PartitionAppendRequest,
149    ) -> Result<PartitionAppendResponse, ApiError> {
150        let PartitionAppendRequest {
151            partition, datoms, ..
152        } = request;
153        let response = self
154            .partitions
155            .entry(partition.clone())
156            .or_default()
157            .append(AppendRequest { datoms })?;
158        Ok(PartitionAppendResponse {
159            partition,
160            leader_epoch: None,
161            appended: response.appended,
162        })
163    }
164
165    pub fn partition_history(
166        &self,
167        request: PartitionHistoryRequest,
168    ) -> Result<PartitionHistoryResponse, ApiError> {
169        partition_history_for(self.partition_service(&request.cut.partition)?, request)
170    }
171
172    pub fn partition_state(
173        &self,
174        request: PartitionStateRequest,
175    ) -> Result<PartitionStateResponse, ApiError> {
176        partition_state_for(self.partition_service(&request.cut.partition)?, request)
177    }
178
179    pub fn federated_history(
180        &self,
181        request: FederatedHistoryRequest,
182    ) -> Result<FederatedHistoryResponse, ApiError> {
183        let cut = validate_federated_cut(request.cut)?;
184        let mut partitions = Vec::with_capacity(cut.cuts.len());
185        for partition_cut in cut.cuts {
186            partitions.push(self.partition_history(PartitionHistoryRequest {
187                cut: partition_cut,
188                policy_context: request.policy_context.clone(),
189            })?);
190        }
191        Ok(FederatedHistoryResponse { partitions })
192    }
193
194    pub fn import_partition_facts(
195        &mut self,
196        request: ImportedFactQueryRequest,
197        policy_context: Option<PolicyContext>,
198    ) -> Result<ImportedFactQueryResponse, ApiError> {
199        import_partition_facts_from_service(
200            self.partition_service_mut(&request.cut.partition)?,
201            request,
202            policy_context,
203        )
204    }
205
206    pub fn federated_run_document(
207        &mut self,
208        request: FederatedRunDocumentRequest,
209    ) -> Result<FederatedRunDocumentResponse, ApiError> {
210        let imports = request
211            .imports
212            .iter()
213            .cloned()
214            .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
215            .collect::<Result<Vec<_>, ApiError>>()?;
216        execute_federated_document_request(request, imports)
217    }
218
219    pub fn build_federated_explain_report(
220        &mut self,
221        request: FederatedRunDocumentRequest,
222    ) -> Result<FederatedExplainReport, ApiError> {
223        let policy_context = request.policy_context.clone();
224        let response = self.federated_run_document(request)?;
225        Ok(build_federated_explain_report_from_response(
226            response,
227            policy_context,
228        ))
229    }
230
231    fn partition_service(
232        &self,
233        partition: &PartitionId,
234    ) -> Result<&InMemoryKernelService, ApiError> {
235        self.partitions
236            .get(partition)
237            .ok_or_else(|| ApiError::Validation(format!("unknown partition {}", partition)))
238    }
239
240    fn partition_service_mut(
241        &mut self,
242        partition: &PartitionId,
243    ) -> Result<&mut InMemoryKernelService, ApiError> {
244        self.partitions
245            .get_mut(partition)
246            .ok_or_else(|| ApiError::Validation(format!("unknown partition {}", partition)))
247    }
248}
249
impl SqlitePartitionedKernelService {
    /// Opens (or creates) the partition root directory. Individual partition
    /// databases under it are opened lazily via `ensure_partition_open`.
    pub fn open(root: impl AsRef<Path>) -> Result<Self, ApiError> {
        let root = root.as_ref().to_path_buf();
        fs::create_dir_all(&root).map_err(|error| {
            ApiError::Validation(format!(
                "failed to create partition root {}: {}",
                root.display(),
                error
            ))
        })?;
        Ok(Self {
            root,
            partitions: RefCell::new(IndexMap::new()),
        })
    }

    /// Directory holding the `partition-<id>.sqlite` database files.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Appends datoms to a partition, opening (and if needed creating) its
    /// database first. No leader epoch is reported: this backend is
    /// single-authority.
    pub fn append_partition(
        &mut self,
        request: PartitionAppendRequest,
    ) -> Result<PartitionAppendResponse, ApiError> {
        let PartitionAppendRequest {
            partition, datoms, ..
        } = request;
        // `true`: writes may create a brand-new partition.
        self.ensure_partition_open(&partition, true)?;
        let mut partitions = self.partitions.borrow_mut();
        let response = partitions
            .get_mut(&partition)
            .expect("partition should be open")
            .append(AppendRequest { datoms })?;
        Ok(PartitionAppendResponse {
            partition,
            leader_epoch: None,
            appended: response.appended,
        })
    }

    /// Returns the datom history of one existing partition.
    pub fn partition_history(
        &self,
        request: PartitionHistoryRequest,
    ) -> Result<PartitionHistoryResponse, ApiError> {
        // `false`: reads must not create partitions that do not exist.
        self.ensure_partition_open(&request.cut.partition, false)?;
        let mut partitions = self.partitions.borrow_mut();
        let service = partitions
            .get_mut(&request.cut.partition)
            .expect("partition should be open");
        partition_history_for(service, request)
    }

    /// Returns the current state of one existing partition.
    pub fn partition_state(
        &self,
        request: PartitionStateRequest,
    ) -> Result<PartitionStateResponse, ApiError> {
        self.ensure_partition_open(&request.cut.partition, false)?;
        let mut partitions = self.partitions.borrow_mut();
        let service = partitions
            .get_mut(&request.cut.partition)
            .expect("partition should be open");
        partition_state_for(service, request)
    }

    /// Collects per-partition histories for every cut in the (validated)
    /// federated cut, sharing one policy context; fails on the first error.
    pub fn federated_history(
        &self,
        request: FederatedHistoryRequest,
    ) -> Result<FederatedHistoryResponse, ApiError> {
        let cut = validate_federated_cut(request.cut)?;
        let mut partitions = Vec::with_capacity(cut.cuts.len());
        for partition_cut in cut.cuts {
            partitions.push(self.partition_history(PartitionHistoryRequest {
                cut: partition_cut,
                policy_context: request.policy_context.clone(),
            })?);
        }
        Ok(FederatedHistoryResponse { partitions })
    }

    /// Imports facts from one existing partition, subject to
    /// `policy_context`.
    pub fn import_partition_facts(
        &mut self,
        request: ImportedFactQueryRequest,
        policy_context: Option<PolicyContext>,
    ) -> Result<ImportedFactQueryResponse, ApiError> {
        self.ensure_partition_open(&request.cut.partition, false)?;
        let mut partitions = self.partitions.borrow_mut();
        let service = partitions
            .get_mut(&request.cut.partition)
            .expect("partition should be open");
        import_partition_facts_from_service(service, request, policy_context)
    }

    /// Imports facts from every partition named in `request.imports`
    /// (short-circuiting on the first error), then executes the federated
    /// document over the combined imports.
    pub fn federated_run_document(
        &mut self,
        request: FederatedRunDocumentRequest,
    ) -> Result<FederatedRunDocumentResponse, ApiError> {
        let imports = request
            .imports
            .iter()
            .cloned()
            .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
            .collect::<Result<Vec<_>, ApiError>>()?;
        execute_federated_document_request(request, imports)
    }

    /// Runs the federated document and renders its explain report.
    pub fn build_federated_explain_report(
        &mut self,
        request: FederatedRunDocumentRequest,
    ) -> Result<FederatedExplainReport, ApiError> {
        let policy_context = request.policy_context.clone();
        let response = self.federated_run_document(request)?;
        Ok(build_federated_explain_report_from_response(
            response,
            policy_context,
        ))
    }

    /// Ensures the partition's SQLite service is in the open map. When
    /// `create_if_missing` is false, a missing database file is reported as
    /// an unknown partition.
    fn ensure_partition_open(
        &self,
        partition: &PartitionId,
        create_if_missing: bool,
    ) -> Result<(), ApiError> {
        if self.partitions.borrow().contains_key(partition) {
            return Ok(());
        }

        let path = self.partition_path(partition);
        if !create_if_missing && !path.exists() {
            return Err(ApiError::Validation(format!(
                "unknown partition {}",
                partition
            )));
        }

        // NOTE(review): presumably SqliteKernelService::open creates the
        // database file when absent — confirm at its definition.
        let service = crate::SqliteKernelService::open(&path)?;
        self.partitions
            .borrow_mut()
            .insert(partition.clone(), service);
        Ok(())
    }

    /// Maps a partition id to its database file under `root`.
    fn partition_path(&self, partition: &PartitionId) -> PathBuf {
        self.root.join(format!(
            "partition-{}.sqlite",
            encode_partition_id(partition)
        ))
    }
}
398
impl ReplicatedAuthorityPartitionService {
    /// Creates the replication root directory, then opens and validates
    /// every configured partition (followers are synced during open).
    pub fn open(
        root: impl AsRef<Path>,
        configs: Vec<AuthorityPartitionConfig>,
    ) -> Result<Self, ApiError> {
        let root = root.as_ref().to_path_buf();
        fs::create_dir_all(&root).map_err(|error| {
            ApiError::Validation(format!(
                "failed to create replication root {}: {}",
                root.display(),
                error
            ))
        })?;
        let mut partitions = IndexMap::new();
        for config in configs {
            let partition = config.partition.clone();
            let runtime = ReplicatedPartition::open(&root, config)?;
            partitions.insert(partition, runtime);
        }
        Ok(Self {
            root,
            partitions: RefCell::new(partitions),
            cache: ReplicatedReadCache::default(),
        })
    }

    /// Root directory holding replica databases and replication metadata.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Appends through the partition's current leader. Cached federated
    /// results are invalidated first since they may reflect pre-append state.
    pub fn append_partition(
        &mut self,
        request: PartitionAppendRequest,
    ) -> Result<PartitionAppendResponse, ApiError> {
        self.cache.clear();
        let mut partitions = self.partitions.borrow_mut();
        let partition = partitions.get_mut(&request.partition).ok_or_else(|| {
            ApiError::Validation(format!("unknown partition {}", request.partition))
        })?;
        partition.append(request)
    }

    /// Serves history from the partition's leader replica.
    pub fn partition_history(
        &self,
        request: PartitionHistoryRequest,
    ) -> Result<PartitionHistoryResponse, ApiError> {
        let mut partitions = self.partitions.borrow_mut();
        let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
            ApiError::Validation(format!("unknown partition {}", request.cut.partition))
        })?;
        partition_history_for(partition.leader_service_mut()?, request)
    }

    /// Serves current state from the partition's leader replica.
    pub fn partition_state(
        &self,
        request: PartitionStateRequest,
    ) -> Result<PartitionStateResponse, ApiError> {
        let mut partitions = self.partitions.borrow_mut();
        let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
            ApiError::Validation(format!("unknown partition {}", request.cut.partition))
        })?;
        partition_state_for(partition.leader_service_mut()?, request)
    }

    /// Gathers leader-served histories for every cut in the (validated)
    /// federated cut, sharing one policy context.
    pub fn federated_history(
        &self,
        request: FederatedHistoryRequest,
    ) -> Result<FederatedHistoryResponse, ApiError> {
        let cut = validate_federated_cut(request.cut)?;
        let mut partitions = Vec::with_capacity(cut.cuts.len());
        for partition_cut in cut.cuts {
            partitions.push(self.partition_history(PartitionHistoryRequest {
                cut: partition_cut,
                policy_context: request.policy_context.clone(),
            })?);
        }
        Ok(FederatedHistoryResponse { partitions })
    }

    /// Imports facts from the partition's leader replica, subject to
    /// `policy_context`.
    pub fn import_partition_facts(
        &mut self,
        request: ImportedFactQueryRequest,
        policy_context: Option<PolicyContext>,
    ) -> Result<ImportedFactQueryResponse, ApiError> {
        let mut partitions = self.partitions.borrow_mut();
        let partition = partitions.get_mut(&request.cut.partition).ok_or_else(|| {
            ApiError::Validation(format!("unknown partition {}", request.cut.partition))
        })?;
        import_partition_facts_from_service(
            partition.leader_service_mut()?,
            request,
            policy_context,
        )
    }

    /// Runs a federated document, memoizing the response under a key derived
    /// from the request (`cache_key`; defined elsewhere in this file). The
    /// single-entry cache is cleared by `append_partition` and
    /// `promote_replica`.
    pub fn federated_run_document(
        &mut self,
        request: FederatedRunDocumentRequest,
    ) -> Result<FederatedRunDocumentResponse, ApiError> {
        // Shadowing the helper name with its result is intentional.
        let cache_key = cache_key(&request)?;
        if let Some((key, response)) = &self.cache.federated_run {
            if *key == cache_key {
                return Ok(response.clone());
            }
        }
        let imports = request
            .imports
            .iter()
            .cloned()
            .map(|import| self.import_partition_facts(import, request.policy_context.clone()))
            .collect::<Result<Vec<_>, ApiError>>()?;
        let response = execute_federated_document_request(request, imports)?;
        self.cache
            .federated_run
            .replace((cache_key, response.clone()));
        Ok(response)
    }

    /// Like `federated_run_document`, but renders and caches the explain
    /// report for the response.
    pub fn build_federated_explain_report(
        &mut self,
        request: FederatedRunDocumentRequest,
    ) -> Result<FederatedExplainReport, ApiError> {
        let cache_key = cache_key(&request)?;
        if let Some((key, response)) = &self.cache.federated_report {
            if *key == cache_key {
                return Ok(response.clone());
            }
        }
        let policy_context = request.policy_context.clone();
        let response = self.federated_run_document(request)?;
        let report = build_federated_explain_report_from_response(response, policy_context);
        self.cache
            .federated_report
            .replace((cache_key, report.clone()));
        Ok(report)
    }

    /// Reports epoch and per-replica lag/health for every partition.
    pub fn partition_status(&self) -> Result<PartitionStatusResponse, ApiError> {
        let mut statuses = Vec::new();
        for partition in self.partitions.borrow().values() {
            statuses.push(partition.status()?);
        }
        Ok(PartitionStatusResponse {
            partitions: statuses,
        })
    }

    /// Promotes a replica to leader of its partition, bumping the fencing
    /// epoch. Cached federated results are invalidated first.
    pub fn promote_replica(
        &mut self,
        request: PromoteReplicaRequest,
    ) -> Result<PromoteReplicaResponse, ApiError> {
        self.cache.clear();
        let mut partitions = self.partitions.borrow_mut();
        let partition = partitions.get_mut(&request.partition).ok_or_else(|| {
            ApiError::Validation(format!("unknown partition {}", request.partition))
        })?;
        let leader_epoch = partition.promote(request.replica_id)?;
        Ok(PromoteReplicaResponse {
            partition: request.partition,
            replica_id: request.replica_id,
            leader_epoch,
        })
    }
}
563
564impl ReplicatedPartition {
565    fn open(root: &Path, config: AuthorityPartitionConfig) -> Result<Self, ApiError> {
566        if config.replicas.is_empty() {
567            return Err(ApiError::Validation(format!(
568                "replicated partition {} must declare at least one replica",
569                config.partition
570            )));
571        }
572        let leader_count = config
573            .replicas
574            .iter()
575            .filter(|replica| replica.role == ReplicaRole::Leader)
576            .count();
577        if leader_count != 1 {
578            return Err(ApiError::Validation(format!(
579                "replicated partition {} must declare exactly one leader replica",
580                config.partition
581            )));
582        }
583        let metadata_path = root.join(format!(
584            "replication-{}.json",
585            encode_partition_id(&config.partition)
586        ));
587        let metadata = if metadata_path.exists() {
588            let contents = fs::read_to_string(metadata_path.clone()).map_err(|error| {
589                ApiError::Validation(format!(
590                    "failed to read replication metadata {}: {}",
591                    metadata_path.display(),
592                    error
593                ))
594            })?;
595            serde_json::from_str::<ReplicatedPartitionMetadata>(&contents).map_err(|error| {
596                ApiError::Validation(format!(
597                    "failed to parse replication metadata {}: {}",
598                    metadata_path.display(),
599                    error
600                ))
601            })?
602        } else {
603            let metadata = ReplicatedPartitionMetadata {
604                partition: config.partition.clone(),
605                leader_epoch: LeaderEpoch::new(1),
606                replicas: config.replicas,
607            };
608            write_partition_metadata(&metadata_path, &metadata)?;
609            metadata
610        };
611
612        let mut seen = BTreeSet::new();
613        let leader_count = metadata
614            .replicas
615            .iter()
616            .filter(|replica| replica.role == ReplicaRole::Leader)
617            .count();
618        if leader_count != 1 {
619            return Err(ApiError::Validation(format!(
620                "replicated partition {} metadata must contain exactly one leader",
621                metadata.partition
622            )));
623        }
624
625        let mut replicas = IndexMap::new();
626        for replica in &metadata.replicas {
627            if !seen.insert(replica.replica_id) {
628                return Err(ApiError::Validation(format!(
629                    "replicated partition {} has duplicate replica {}",
630                    metadata.partition, replica.replica_id.0
631                )));
632            }
633            let database_path = if replica.database_path.is_absolute() {
634                replica.database_path.clone()
635            } else {
636                root.join(&replica.database_path)
637            };
638            replicas.insert(
639                replica.replica_id,
640                crate::SqliteKernelService::open(database_path)?,
641            );
642        }
643
644        let mut partition = Self {
645            metadata_path,
646            metadata,
647            replicas,
648        };
649        partition.replicate_followers()?;
650        Ok(partition)
651    }
652
653    fn append(
654        &mut self,
655        request: PartitionAppendRequest,
656    ) -> Result<PartitionAppendResponse, ApiError> {
657        if let Some(leader_epoch) = request.leader_epoch.as_ref() {
658            if *leader_epoch != self.metadata.leader_epoch {
659                return Err(ApiError::Validation(format!(
660                    "stale leader epoch {} for partition {}; current epoch is {}",
661                    leader_epoch.0, request.partition, self.metadata.leader_epoch.0
662                )));
663            }
664        }
665        let leader_id = self.leader_id()?;
666        self.append_via_replica(leader_id, request)
667    }
668
669    fn append_via_replica(
670        &mut self,
671        replica_id: ReplicaId,
672        request: PartitionAppendRequest,
673    ) -> Result<PartitionAppendResponse, ApiError> {
674        let config = self.replica_config(replica_id)?;
675        if config.role != ReplicaRole::Leader {
676            return Err(ApiError::Validation(format!(
677                "replica {} for partition {} is read-only follower",
678                replica_id.0, request.partition
679            )));
680        }
681        let service = self
682            .replicas
683            .get_mut(&replica_id)
684            .ok_or_else(|| ApiError::Validation(format!("unknown replica {}", replica_id.0)))?;
685        let response = service.append(AppendRequest {
686            datoms: request.datoms,
687        })?;
688        self.replicate_followers()?;
689        Ok(PartitionAppendResponse {
690            partition: request.partition,
691            leader_epoch: Some(self.metadata.leader_epoch.clone()),
692            appended: response.appended,
693        })
694    }
695
696    fn leader_service_mut(&mut self) -> Result<&mut crate::SqliteKernelService, ApiError> {
697        let leader_id = self.leader_id()?;
698        self.replicas
699            .get_mut(&leader_id)
700            .ok_or_else(|| ApiError::Validation(format!("unknown leader replica {}", leader_id.0)))
701    }
702
703    fn leader_id(&self) -> Result<ReplicaId, ApiError> {
704        self.metadata
705            .replicas
706            .iter()
707            .find(|replica| replica.role == ReplicaRole::Leader)
708            .map(|replica| replica.replica_id)
709            .ok_or_else(|| {
710                ApiError::Validation(format!(
711                    "partition {} has no leader replica",
712                    self.metadata.partition
713                ))
714            })
715    }
716
717    fn promote(&mut self, replica_id: ReplicaId) -> Result<LeaderEpoch, ApiError> {
718        if !self.replicas.contains_key(&replica_id) {
719            return Err(ApiError::Validation(format!(
720                "unknown replica {} for partition {}",
721                replica_id.0, self.metadata.partition
722            )));
723        }
724        let leader_history = {
725            let leader = self.leader_service_mut()?;
726            leader
727                .history(HistoryRequest {
728                    policy_context: None,
729                })?
730                .datoms
731        };
732        let replica = self
733            .replicas
734            .get_mut(&replica_id)
735            .ok_or_else(|| ApiError::Validation(format!("unknown replica {}", replica_id.0)))?;
736        sync_replica_history(
737            replica,
738            &leader_history,
739            &self.metadata.partition,
740            replica_id,
741        )?;
742
743        for config in &mut self.metadata.replicas {
744            config.role = if config.replica_id == replica_id {
745                ReplicaRole::Leader
746            } else {
747                ReplicaRole::Follower
748            };
749        }
750        self.metadata.leader_epoch.0 += 1;
751        write_partition_metadata(&self.metadata_path, &self.metadata)?;
752        self.replicate_followers()?;
753        Ok(self.metadata.leader_epoch.clone())
754    }
755
756    fn replicate_followers(&mut self) -> Result<(), ApiError> {
757        let leader_id = self.leader_id()?;
758        let leader_history = {
759            let leader = self
760                .replicas
761                .get_mut(&leader_id)
762                .ok_or_else(|| ApiError::Validation(format!("unknown leader {}", leader_id.0)))?;
763            leader
764                .history(HistoryRequest {
765                    policy_context: None,
766                })?
767                .datoms
768        };
769        for replica in &self.metadata.replicas {
770            if replica.replica_id == leader_id {
771                continue;
772            }
773            let follower = self.replicas.get_mut(&replica.replica_id).ok_or_else(|| {
774                ApiError::Validation(format!("unknown replica {}", replica.replica_id.0))
775            })?;
776            sync_replica_history(
777                follower,
778                &leader_history,
779                &self.metadata.partition,
780                replica.replica_id,
781            )?;
782        }
783        Ok(())
784    }
785
786    fn status(&self) -> Result<PartitionStatus, ApiError> {
787        let leader_id = self.leader_id()?;
788        let leader_history = self
789            .replicas
790            .get(&leader_id)
791            .ok_or_else(|| ApiError::Validation(format!("unknown leader {}", leader_id.0)))?
792            .history(HistoryRequest {
793                policy_context: None,
794            })?
795            .datoms;
796        let leader_last = leader_history.last().map(|datom| datom.element);
797
798        let mut replicas = Vec::new();
799        for config in &self.metadata.replicas {
800            let service = self.replicas.get(&config.replica_id).ok_or_else(|| {
801                ApiError::Validation(format!("unknown replica {}", config.replica_id.0))
802            })?;
803            let history = service
804                .history(HistoryRequest {
805                    policy_context: None,
806                })?
807                .datoms;
808            let last = history.last().map(|datom| datom.element);
809            let mismatch = validate_replica_prefix(&history, &leader_history).err();
810            replicas.push(ReplicaStatus {
811                partition: self.metadata.partition.clone(),
812                replica_id: config.replica_id,
813                role: config.role,
814                leader_epoch: self.metadata.leader_epoch.clone(),
815                applied_element: last,
816                replication_lag: leader_last
817                    .zip(last)
818                    .map(|(leader, replica)| leader.0.saturating_sub(replica.0))
819                    .unwrap_or_else(|| leader_last.map(|leader| leader.0).unwrap_or(0)),
820                healthy: mismatch.is_none(),
821                detail: mismatch,
822            });
823        }
824
825        Ok(PartitionStatus {
826            partition: self.metadata.partition.clone(),
827            leader_epoch: self.metadata.leader_epoch.clone(),
828            replicas,
829        })
830    }
831
832    fn replica_config(&self, replica_id: ReplicaId) -> Result<&ReplicaConfig, ApiError> {
833        self.metadata
834            .replicas
835            .iter()
836            .find(|replica| replica.replica_id == replica_id)
837            .ok_or_else(|| {
838                ApiError::Validation(format!(
839                    "unknown replica {} for partition {}",
840                    replica_id.0, self.metadata.partition
841                ))
842            })
843    }
844}
845
846fn write_partition_metadata(
847    path: &Path,
848    metadata: &ReplicatedPartitionMetadata,
849) -> Result<(), ApiError> {
850    if let Some(parent) = path.parent() {
851        fs::create_dir_all(parent).map_err(|error| {
852            ApiError::Validation(format!(
853                "failed to create replication metadata directory {}: {}",
854                parent.display(),
855                error
856            ))
857        })?;
858    }
859    let encoded = serde_json::to_string_pretty(metadata).map_err(|error| {
860        ApiError::Validation(format!(
861            "failed to encode replication metadata {}: {}",
862            path.display(),
863            error
864        ))
865    })?;
866    fs::write(path, encoded).map_err(|error| {
867        ApiError::Validation(format!(
868            "failed to write replication metadata {}: {}",
869            path.display(),
870            error
871        ))
872    })
873}
874
875fn sync_replica_history(
876    replica: &mut crate::SqliteKernelService,
877    leader_history: &[Datom],
878    partition: &PartitionId,
879    replica_id: ReplicaId,
880) -> Result<(), ApiError> {
881    let follower_history = replica
882        .history(HistoryRequest {
883            policy_context: None,
884        })?
885        .datoms;
886    validate_replica_prefix(&follower_history, leader_history).map_err(|detail| {
887        ApiError::Validation(format!(
888            "replica {} for partition {} diverged from leader prefix: {}",
889            replica_id.0, partition, detail
890        ))
891    })?;
892    if follower_history.len() < leader_history.len() {
893        replica.append(AppendRequest {
894            datoms: leader_history[follower_history.len()..].to_vec(),
895        })?;
896    }
897    Ok(())
898}
899
900fn validate_replica_prefix(
901    follower_history: &[Datom],
902    leader_history: &[Datom],
903) -> Result<(), String> {
904    if follower_history.len() > leader_history.len() {
905        return Err("follower history exceeds leader history".into());
906    }
907    for (index, (follower, leader)) in follower_history
908        .iter()
909        .zip(leader_history.iter())
910        .enumerate()
911    {
912        if follower != leader {
913            return Err(format!("entry {} does not match leader", index));
914        }
915    }
916    Ok(())
917}
918
919fn validate_federated_cut(cut: FederatedCut) -> Result<FederatedCut, ApiError> {
920    let normalized = cut.normalized();
921    for pair in normalized.cuts.windows(2) {
922        if pair[0].partition == pair[1].partition {
923            return Err(ApiError::Validation(format!(
924                "federated cut contains duplicate partition {}",
925                pair[0].partition
926            )));
927        }
928    }
929    Ok(normalized)
930}
931
932fn partition_history_for(
933    service: &dyn KernelService,
934    request: PartitionHistoryRequest,
935) -> Result<PartitionHistoryResponse, ApiError> {
936    let datoms = match request.cut.as_of {
937        Some(element) => {
938            let visible_history = service
939                .history(HistoryRequest {
940                    policy_context: request.policy_context.clone(),
941                })?
942                .datoms;
943            let end = visible_history
944                .iter()
945                .position(|datom| datom.element == element)
946                .ok_or_else(|| {
947                    ApiError::Validation(format!(
948                        "unknown element {} for partition {}",
949                        element, request.cut.partition
950                    ))
951                })?;
952            visible_history[..=end].to_vec()
953        }
954        None => {
955            service
956                .history(HistoryRequest {
957                    policy_context: request.policy_context.clone(),
958                })?
959                .datoms
960        }
961    };
962
963    Ok(PartitionHistoryResponse {
964        cut: request.cut,
965        datoms,
966    })
967}
968
969fn partition_state_for(
970    service: &dyn KernelService,
971    request: PartitionStateRequest,
972) -> Result<PartitionStateResponse, ApiError> {
973    let state = match request.cut.as_of {
974        Some(element) => {
975            let visible_history = service
976                .history(HistoryRequest {
977                    policy_context: request.policy_context.clone(),
978                })?
979                .datoms;
980            if !visible_history.iter().any(|datom| datom.element == element) {
981                return Err(ApiError::Validation(format!(
982                    "unknown element {} for partition {}",
983                    element, request.cut.partition
984                )));
985            }
986            service
987                .as_of(crate::AsOfRequest {
988                    schema: request.schema,
989                    datoms: Vec::new(),
990                    at: element,
991                    policy_context: request.policy_context,
992                })?
993                .state
994        }
995        None => {
996            service
997                .current_state(CurrentStateRequest {
998                    schema: request.schema,
999                    datoms: Vec::new(),
1000                    policy_context: request.policy_context,
1001                })?
1002                .state
1003        }
1004    };
1005
1006    Ok(PartitionStateResponse {
1007        cut: request.cut,
1008        state,
1009    })
1010}
1011
1012fn import_partition_facts_from_service(
1013    service: &mut dyn KernelService,
1014    request: ImportedFactQueryRequest,
1015    policy_context: Option<PolicyContext>,
1016) -> Result<ImportedFactQueryResponse, ApiError> {
1017    let parsed = service.parse_document(ParseDocumentRequest {
1018        dsl: request.dsl.clone(),
1019    })?;
1020    let query_spec = select_query_spec(&parsed, request.query_name.as_deref())?;
1021    ensure_importable_query_shape(query_spec, request.query_name.as_deref())?;
1022    let response = service.run_document(RunDocumentRequest {
1023        dsl: request.dsl.clone(),
1024        policy_context,
1025    })?;
1026    let result = select_query_result(&response, request.query_name.as_deref())?;
1027    let tuple_index = response
1028        .derived
1029        .tuples
1030        .iter()
1031        .map(|tuple| (tuple.tuple.id, tuple))
1032        .collect::<IndexMap<_, _>>();
1033
1034    let facts = result
1035        .rows
1036        .iter()
1037        .enumerate()
1038        .map(|(index, row)| {
1039            build_imported_fact(&request, index, row, &tuple_index, &response.derived)
1040        })
1041        .collect::<Result<Vec<_>, ApiError>>()?;
1042
1043    Ok(ImportedFactQueryResponse {
1044        cut: request.cut,
1045        predicate: request.predicate,
1046        query_name: request.query_name,
1047        row_count: result.rows.len(),
1048        facts,
1049    })
1050}
1051
/// Evaluates a federated document locally against facts imported from other
/// partitions.
///
/// Pipeline: derive the combined cut from the imports, parse the document
/// in a scratch in-memory kernel, splice every imported fact into the
/// program (re-binding each fact to the document's own predicate
/// declaration and checking arity), extend the schema to cover the imported
/// predicates, compile and evaluate, then run the primary/named queries and
/// explains against the policy-filtered program and derived set.
///
/// # Errors
/// Validation errors for conflicting cuts, `AsOf` views, undeclared or
/// arity-mismatched imported predicates, and any parse/compile/evaluate
/// failure from the scratch kernel.
fn execute_federated_document_request(
    request: FederatedRunDocumentRequest,
    imports: Vec<ImportedFactQueryResponse>,
) -> Result<FederatedRunDocumentResponse, ApiError> {
    let cut = federated_cut_from_imports(&imports)?;

    // Scratch kernel: federated evaluation never touches persistent state.
    let mut local = InMemoryKernelService::new();
    let parsed = local.parse_document(ParseDocumentRequest {
        dsl: request.dsl.clone(),
    })?;
    ensure_federated_document_uses_current_views(&parsed)?;

    let mut schema = parsed.schema.clone();
    let mut program = parsed.program.clone();
    // Name -> predicate declaration, used to re-bind imported facts to the
    // document's own predicate objects.
    let predicate_lookup = parsed
        .program
        .predicates
        .iter()
        .map(|predicate| (predicate.name.clone(), predicate.clone()))
        .collect::<IndexMap<_, _>>();
    for import in &imports {
        for fact in &import.facts {
            let mut fact = fact.clone();
            // Imported facts must target a predicate the document declares.
            let predicate = predicate_lookup
                .get(&fact.predicate.name)
                .ok_or_else(|| {
                    ApiError::Validation(format!(
                        "federated document does not declare imported predicate {}",
                        fact.predicate.name
                    ))
                })?
                .clone();
            if predicate.arity != fact.values.len() {
                return Err(ApiError::Validation(format!(
                    "federated document predicate {} expects arity {}, but imported fact supplied {} value(s)",
                    predicate.name,
                    predicate.arity,
                    fact.values.len()
                )));
            }
            // Replace the remote predicate object with the local declaration
            // so ids/arity line up with the compiled program.
            fact.predicate = predicate;
            program.facts.push(fact);
        }
    }
    ensure_schema_covers_fact_predicates(&mut schema, &program.facts)?;

    let compiled = local.compile_program(crate::CompileProgramRequest {
        schema: schema.clone(),
        program,
    })?;
    let derived = local
        .evaluate_program(crate::EvaluateProgramRequest {
            state: ResolvedState::default(),
            program: compiled.program.clone(),
            policy_context: request.policy_context.clone(),
        })?
        .derived;
    // Queries run only over the policy-visible projection; the unfiltered
    // `derived` set is kept for explain-trace construction below.
    let visible_program =
        filter_compiled_program_facts(&compiled.program, request.policy_context.as_ref());
    let visible_derived = filter_derived_set(&derived, request.policy_context.as_ref());

    let query = match &parsed.query {
        Some(query) => Some(execute_query(
            &ResolvedState::default(),
            &visible_program,
            &visible_derived,
            &query.query,
            request.policy_context.as_ref(),
        )?),
        None => None,
    };
    let queries = parsed
        .queries
        .iter()
        .map(|named_query| {
            Ok(NamedQueryResult {
                name: named_query.name.clone(),
                spec: named_query.spec.clone(),
                result: execute_query(
                    &ResolvedState::default(),
                    &visible_program,
                    &visible_derived,
                    &named_query.spec.query,
                    request.policy_context.as_ref(),
                )?,
            })
        })
        .collect::<Result<Vec<_>, ApiError>>()?;
    let explains = parsed
        .explains
        .iter()
        .map(|named_explain| {
            Ok(NamedExplainResult {
                name: named_explain.name.clone(),
                spec: named_explain.spec.clone(),
                // Explains receive both the unfiltered and the visible
                // derived sets; see execute_federated_explain_spec.
                result: execute_federated_explain_spec(
                    &visible_program,
                    &derived,
                    &visible_derived,
                    &named_explain.spec,
                    request.policy_context.as_ref(),
                )?,
            })
        })
        .collect::<Result<Vec<_>, ApiError>>()?;

    Ok(FederatedRunDocumentResponse {
        cut,
        imports,
        run: RunDocumentResponse {
            state: ResolvedState::default(),
            program: visible_program,
            derived: visible_derived,
            query,
            queries,
            explains,
        },
    })
}
1171
1172fn build_federated_explain_report_from_response(
1173    response: FederatedRunDocumentResponse,
1174    policy_context: Option<PolicyContext>,
1175) -> FederatedExplainReport {
1176    let primary_query = response
1177        .run
1178        .query
1179        .as_ref()
1180        .map(report_rows)
1181        .unwrap_or_default();
1182    let named_queries = response
1183        .run
1184        .queries
1185        .iter()
1186        .map(|query| FederatedNamedQuerySummary {
1187            name: query.name.clone(),
1188            rows: report_rows(&query.result),
1189        })
1190        .collect::<Vec<_>>();
1191    let traces = response
1192        .run
1193        .explains
1194        .iter()
1195        .filter_map(|explain| match &explain.result {
1196            ExplainArtifact::Tuple(trace) => Some(build_trace_summary(explain.name.clone(), trace)),
1197            ExplainArtifact::Plan(_) => None,
1198        })
1199        .collect::<Vec<_>>();
1200
1201    FederatedExplainReport {
1202        generated_at_ms: now_millis(),
1203        cut: response.cut,
1204        policy_context,
1205        imports: response
1206            .imports
1207            .iter()
1208            .map(|import| FederatedImportedSourceSummary {
1209                cut: import.cut.clone(),
1210                predicate: import.predicate.clone(),
1211                query_name: import.query_name.clone(),
1212                fact_count: import.facts.len(),
1213            })
1214            .collect(),
1215        primary_query,
1216        named_queries,
1217        traces,
1218    }
1219}
1220
1221fn encode_partition_id(partition: &PartitionId) -> String {
1222    let mut encoded = String::with_capacity(partition.as_str().len() * 2);
1223    for byte in partition.as_str().as_bytes() {
1224        let _ = write!(&mut encoded, "{byte:02x}");
1225    }
1226    encoded
1227}
1228
1229fn cache_key<T: Serialize>(value: &T) -> Result<String, ApiError> {
1230    serde_json::to_string(value)
1231        .map_err(|error| ApiError::Validation(format!("failed to encode cache key: {error}")))
1232}
1233
1234fn select_query_result<'a>(
1235    response: &'a RunDocumentResponse,
1236    query_name: Option<&str>,
1237) -> Result<&'a QueryResult, ApiError> {
1238    match query_name {
1239        Some(name) => response
1240            .queries
1241            .iter()
1242            .find(|query| query.name.as_deref() == Some(name))
1243            .map(|query| &query.result)
1244            .ok_or_else(|| ApiError::Validation(format!("unknown named query {}", name))),
1245        None => response
1246            .query
1247            .as_ref()
1248            .ok_or_else(|| ApiError::Validation("document did not produce a primary query".into())),
1249    }
1250}
1251
1252fn select_query_spec<'a>(
1253    response: &'a crate::ParseDocumentResponse,
1254    query_name: Option<&str>,
1255) -> Result<&'a aether_ast::QuerySpec, ApiError> {
1256    match query_name {
1257        Some(name) => response
1258            .queries
1259            .iter()
1260            .find(|query| query.name.as_deref() == Some(name))
1261            .map(|query| &query.spec)
1262            .ok_or_else(|| ApiError::Validation(format!("unknown named query {}", name))),
1263        None => response
1264            .query
1265            .as_ref()
1266            .ok_or_else(|| ApiError::Validation("document did not produce a primary query".into())),
1267    }
1268}
1269
1270fn ensure_importable_query_shape(
1271    spec: &aether_ast::QuerySpec,
1272    query_name: Option<&str>,
1273) -> Result<(), ApiError> {
1274    if spec.query.goals.len() == 1 {
1275        Ok(())
1276    } else {
1277        let label = query_name.unwrap_or("<primary>");
1278        Err(ApiError::Validation(format!(
1279            "imported fact query {label} must have exactly one goal so imported provenance maps to a single semantic row"
1280        )))
1281    }
1282}
1283
/// Converts a single query row into an `ExtensionalFact` with provenance
/// pointing back to the source partition cut.
///
/// Validation order matters for which error the caller sees:
/// 1. row arity must match the target predicate,
/// 2. the row must be backed by a derived tuple id,
/// 3. that id must resolve in `tuples` (id -> tuple index from the caller),
/// 4. the tuple must also appear in `derived`.
///
/// NOTE(review): step 4 looks redundant when `tuples` is built from the
/// same `derived` set (as the current caller does), and it is an O(n) scan
/// per row — confirm whether any caller can pass diverging arguments.
fn build_imported_fact(
    request: &ImportedFactQueryRequest,
    index: usize,
    row: &QueryRow,
    tuples: &IndexMap<TupleId, &aether_ast::DerivedTuple>,
    derived: &DerivedSet,
) -> Result<ExtensionalFact, ApiError> {
    if row.values.len() != request.predicate.arity {
        return Err(ApiError::Validation(format!(
            "imported fact row {} from {} produced {} value(s), but predicate {} expects arity {}",
            index,
            request.cut,
            row.values.len(),
            request.predicate.name,
            request.predicate.arity
        )));
    }
    let tuple_id = row.tuple_id.ok_or_else(|| {
        ApiError::Validation(format!(
            "imported fact row {} from {} was not backed by a derived tuple; import a tuple-producing query instead",
            index, request.cut
        ))
    })?;
    let tuple = tuples.get(&tuple_id).copied().ok_or_else(|| {
        ApiError::Validation(format!(
            "imported fact row {} from {} referenced missing tuple t{}",
            index, request.cut, tuple_id.0
        ))
    })?;
    // The request's own cut is merged with any cuts already recorded on the
    // tuple (e.g. from earlier federation hops).
    let imported_cuts = merge_partition_cuts(
        std::iter::once(&request.cut).chain(tuple.metadata.imported_cuts.iter()),
    );
    let policy = tuple.policy.clone();

    if !derived
        .tuples
        .iter()
        .any(|candidate| candidate.tuple.id == tuple_id)
    {
        return Err(ApiError::Validation(format!(
            "imported fact row {} from {} referenced tuple t{} outside the derived set",
            index, request.cut, tuple_id.0
        )));
    }

    Ok(ExtensionalFact {
        predicate: request.predicate.clone(),
        values: row.values.clone(),
        policy,
        provenance: Some(FactProvenance {
            source_datom_ids: tuple.metadata.source_datom_ids.clone(),
            imported_cuts,
            sidecar_origin: None,
            // Synthetic URI locating the originating tuple in its partition.
            source_ref: Some(SourceRef {
                uri: format!(
                    "aether://partition/{}/tuple/t{}",
                    request.cut.partition, tuple_id.0
                ),
                digest: None,
            }),
        }),
    })
}
1347
1348fn federated_cut_from_imports(
1349    imports: &[ImportedFactQueryResponse],
1350) -> Result<FederatedCut, ApiError> {
1351    let mut by_partition = IndexMap::<PartitionId, PartitionCut>::new();
1352    for import in imports {
1353        match by_partition.get(&import.cut.partition) {
1354            Some(existing) if existing != &import.cut => {
1355                return Err(ApiError::Validation(format!(
1356                    "federated imports contain conflicting cuts for partition {}",
1357                    import.cut.partition
1358                )));
1359            }
1360            Some(_) => {}
1361            None => {
1362                by_partition.insert(import.cut.partition.clone(), import.cut.clone());
1363            }
1364        }
1365    }
1366    validate_federated_cut(FederatedCut {
1367        cuts: by_partition.into_values().collect(),
1368    })
1369}
1370
1371fn ensure_federated_document_uses_current_views(
1372    parsed: &crate::ParseDocumentResponse,
1373) -> Result<(), ApiError> {
1374    if let Some(query) = &parsed.query {
1375        ensure_current_view("primary query", &query.view)?;
1376    }
1377    for named_query in &parsed.queries {
1378        ensure_current_view(
1379            named_query
1380                .name
1381                .as_deref()
1382                .map(|name| format!("query {}", name))
1383                .unwrap_or_else(|| "query".into())
1384                .as_str(),
1385            &named_query.spec.view,
1386        )?;
1387    }
1388    for named_explain in &parsed.explains {
1389        ensure_current_view(
1390            named_explain
1391                .name
1392                .as_deref()
1393                .map(|name| format!("explain {}", name))
1394                .unwrap_or_else(|| "explain".into())
1395                .as_str(),
1396            &named_explain.spec.view,
1397        )?;
1398    }
1399    Ok(())
1400}
1401
1402fn ensure_current_view(label: &str, view: &TemporalView) -> Result<(), ApiError> {
1403    match view {
1404        TemporalView::Current => Ok(()),
1405        TemporalView::AsOf(element) => Err(ApiError::Validation(format!(
1406            "{label} cannot use AsOf(e{}); federated time must be expressed through explicit partition cuts",
1407            element.0
1408        ))),
1409    }
1410}
1411
1412fn ensure_schema_covers_fact_predicates(
1413    schema: &mut Schema,
1414    facts: &[ExtensionalFact],
1415) -> Result<(), ApiError> {
1416    let mut signatures = IndexMap::<aether_ast::PredicateId, PredicateSignature>::new();
1417    for fact in facts {
1418        let fields = fact.values.iter().map(value_type_for).collect::<Vec<_>>();
1419        match signatures.get(&fact.predicate.id) {
1420            Some(existing) if existing.fields != fields => {
1421                return Err(ApiError::Validation(format!(
1422                    "imported predicate {} has inconsistent field types across federated facts",
1423                    fact.predicate.name
1424                )));
1425            }
1426            Some(_) => {}
1427            None => {
1428                signatures.insert(
1429                    fact.predicate.id,
1430                    PredicateSignature {
1431                        id: fact.predicate.id,
1432                        name: fact.predicate.name.clone(),
1433                        fields,
1434                    },
1435                );
1436            }
1437        }
1438    }
1439
1440    for signature in signatures.into_values() {
1441        if schema.predicate(&signature.id).is_none() {
1442            schema
1443                .register_predicate(signature)
1444                .map_err(|error| ApiError::Validation(error.to_string()))?;
1445        }
1446    }
1447    Ok(())
1448}
1449
/// Maps a runtime `Value` onto the schema `ValueType` used when inferring
/// predicate signatures for imported facts.
fn value_type_for(value: &Value) -> ValueType {
    match value {
        // Null carries no type information; default to String.
        Value::Null => ValueType::String,
        Value::Bool(_) => ValueType::Bool,
        Value::I64(_) => ValueType::I64,
        Value::U64(_) => ValueType::U64,
        Value::F64(_) => ValueType::F64,
        Value::String(_) => ValueType::String,
        Value::Bytes(_) => ValueType::Bytes,
        Value::Entity(_) => ValueType::Entity,
        // Element type is inferred from the first item only; an empty list
        // defaults to String. Heterogeneous lists are not detected here.
        Value::List(values) => ValueType::List(Box::new(
            values
                .first()
                .map(value_type_for)
                .unwrap_or(ValueType::String),
        )),
    }
}
1468
1469fn filter_compiled_program_facts(
1470    program: &CompiledProgram,
1471    policy_context: Option<&PolicyContext>,
1472) -> CompiledProgram {
1473    let mut filtered = program.clone();
1474    filtered
1475        .facts
1476        .retain(|fact| policy_allows(policy_context, fact.policy.as_ref()));
1477    filtered
1478}
1479
1480fn filter_derived_set(derived: &DerivedSet, policy_context: Option<&PolicyContext>) -> DerivedSet {
1481    let tuples = derived
1482        .tuples
1483        .iter()
1484        .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
1485        .cloned()
1486        .collect::<Vec<_>>();
1487    let visible_ids = tuples
1488        .iter()
1489        .map(|tuple| tuple.tuple.id)
1490        .collect::<std::collections::BTreeSet<_>>();
1491    let predicate_index = derived
1492        .predicate_index
1493        .iter()
1494        .map(|(predicate, tuple_ids)| {
1495            (
1496                *predicate,
1497                tuple_ids
1498                    .iter()
1499                    .copied()
1500                    .filter(|tuple_id| visible_ids.contains(tuple_id))
1501                    .collect::<Vec<_>>(),
1502            )
1503        })
1504        .collect::<IndexMap<_, _>>();
1505
1506    DerivedSet {
1507        tuples,
1508        iterations: derived.iterations.clone(),
1509        predicate_index,
1510    }
1511}
1512
/// Executes one explain spec against the federated evaluation output.
///
/// Plan explains render the compiled phase graph. Tuple explains locate
/// the target atom in the policy-VISIBLE derived set (so hidden tuples
/// cannot be addressed), but build the trace from the UNFILTERED `derived`
/// set and then filter the finished trace — presumably so derivation links
/// through later-hidden tuples survive until `filter_trace` applies policy;
/// confirm against the explainer's expectations.
fn execute_federated_explain_spec(
    program: &CompiledProgram,
    derived: &DerivedSet,
    visible_derived: &DerivedSet,
    spec: &ExplainSpec,
    policy_context: Option<&PolicyContext>,
) -> Result<ExplainArtifact, ApiError> {
    match &spec.target {
        ExplainTarget::Plan => Ok(ExplainArtifact::Plan(
            InMemoryExplainer::default().explain_plan(&program.phase_graph)?,
        )),
        ExplainTarget::Tuple(atom) => {
            // Target resolution honors policy: only visible tuples match.
            let tuple_id = find_matching_derived_tuple(visible_derived, atom).ok_or_else(|| {
                ApiError::Validation(format!(
                    "no derived tuple matched explain target {}",
                    atom.predicate.name
                ))
            })?;
            let trace = InMemoryExplainer::from_derived_set(derived).explain_tuple(&tuple_id)?;
            Ok(ExplainArtifact::Tuple(filter_trace(trace, policy_context)?))
        }
    }
}
1536
1537fn find_matching_derived_tuple(derived: &DerivedSet, atom: &Atom) -> Option<TupleId> {
1538    derived.tuples.iter().find_map(|tuple| {
1539        if tuple.tuple.predicate != atom.predicate.id
1540            || tuple.tuple.values.len() != atom.terms.len()
1541        {
1542            return None;
1543        }
1544        let matches = atom
1545            .terms
1546            .iter()
1547            .zip(&tuple.tuple.values)
1548            .all(|(term, value)| matches_term(term, value));
1549        matches.then_some(tuple.tuple.id)
1550    })
1551}
1552
1553fn matches_term(term: &aether_ast::Term, value: &Value) -> bool {
1554    match term {
1555        aether_ast::Term::Value(expected) => expected == value,
1556        aether_ast::Term::Variable(_) => true,
1557        aether_ast::Term::Aggregate(_) => false,
1558    }
1559}
1560
1561fn filter_trace(
1562    trace: aether_ast::DerivationTrace,
1563    policy_context: Option<&PolicyContext>,
1564) -> Result<aether_ast::DerivationTrace, ApiError> {
1565    let tuples = trace
1566        .tuples
1567        .into_iter()
1568        .filter(|tuple| policy_allows(policy_context, tuple.policy.as_ref()))
1569        .collect::<Vec<_>>();
1570    if tuples.iter().all(|tuple| tuple.tuple.id != trace.root) {
1571        return Err(ApiError::Validation(
1572            "requested tuple is not visible under the current policy".into(),
1573        ));
1574    }
1575    Ok(aether_ast::DerivationTrace {
1576        root: trace.root,
1577        tuples,
1578    })
1579}
1580
1581fn report_rows(result: &QueryResult) -> Vec<FederatedReportRow> {
1582    result
1583        .rows
1584        .iter()
1585        .map(|row| FederatedReportRow {
1586            tuple_id: row.tuple_id,
1587            values: row.values.clone(),
1588        })
1589        .collect()
1590}
1591
1592fn build_trace_summary(
1593    name: Option<String>,
1594    trace: &aether_ast::DerivationTrace,
1595) -> FederatedTraceSummary {
1596    let imported_cuts = merge_partition_cuts(
1597        trace
1598            .tuples
1599            .iter()
1600            .flat_map(|tuple| tuple.metadata.imported_cuts.iter()),
1601    );
1602    FederatedTraceSummary {
1603        name,
1604        root: trace.root,
1605        tuple_count: trace.tuples.len(),
1606        imported_cuts,
1607        tuples: trace
1608            .tuples
1609            .iter()
1610            .map(|tuple| FederatedTraceTupleSummary {
1611                tuple_id: tuple.tuple.id,
1612                values: tuple.tuple.values.clone(),
1613                iteration: tuple.metadata.iteration,
1614                source_datom_ids: tuple.metadata.source_datom_ids.clone(),
1615                imported_cuts: tuple.metadata.imported_cuts.clone(),
1616                parent_tuple_ids: tuple.metadata.parent_tuple_ids.clone(),
1617            })
1618            .collect(),
1619    }
1620}
1621
1622pub fn render_federated_explain_report_markdown(report: &FederatedExplainReport) -> String {
1623    let mut markdown = String::new();
1624    markdown.push_str("# Federated Explain Report\n\n");
1625    markdown.push_str(&format!(
1626        "- Generated at (ms): `{}`\n- Federated cut: `{}`\n- Effective policy: `{}`\n\n",
1627        report.generated_at_ms,
1628        format_federated_cut(&report.cut),
1629        format_policy_context(report.policy_context.as_ref())
1630    ));
1631
1632    markdown.push_str("## Imported Sources\n\n");
1633    if report.imports.is_empty() {
1634        markdown.push_str("_None._\n\n");
1635    } else {
1636        for import in &report.imports {
1637            markdown.push_str(&format!(
1638                "- `{}` -> `{}` | query `{}` | facts `{}`\n",
1639                import.cut,
1640                import.predicate.name,
1641                import.query_name.as_deref().unwrap_or("<primary>"),
1642                import.fact_count
1643            ));
1644        }
1645        markdown.push('\n');
1646    }
1647
1648    markdown.push_str("## Primary Query\n\n");
1649    if report.primary_query.is_empty() {
1650        markdown.push_str("_No rows._\n\n");
1651    } else {
1652        for row in &report.primary_query {
1653            markdown.push_str(&format!(
1654                "- `{}`{}\n",
1655                format_values(&row.values),
1656                row.tuple_id
1657                    .map(|tuple_id| format!(" | tuple `t{}`", tuple_id.0))
1658                    .unwrap_or_default()
1659            ));
1660        }
1661        markdown.push('\n');
1662    }
1663
1664    if !report.named_queries.is_empty() {
1665        markdown.push_str("## Named Queries\n\n");
1666        for query in &report.named_queries {
1667            markdown.push_str(&format!(
1668                "### {}\n\n",
1669                query.name.as_deref().unwrap_or("<unnamed>")
1670            ));
1671            if query.rows.is_empty() {
1672                markdown.push_str("_No rows._\n\n");
1673            } else {
1674                for row in &query.rows {
1675                    markdown.push_str(&format!(
1676                        "- `{}`{}\n",
1677                        format_values(&row.values),
1678                        row.tuple_id
1679                            .map(|tuple_id| format!(" | tuple `t{}`", tuple_id.0))
1680                            .unwrap_or_default()
1681                    ));
1682                }
1683                markdown.push('\n');
1684            }
1685        }
1686    }
1687
1688    if !report.traces.is_empty() {
1689        markdown.push_str("## Federated Traces\n\n");
1690        for trace in &report.traces {
1691            markdown.push_str(&format!(
1692                "### {}\n\n- Root: `t{}`\n- Tuple count: `{}`\n- Imported cuts: `{}`\n\n",
1693                trace.name.as_deref().unwrap_or("<unnamed trace>"),
1694                trace.root.0,
1695                trace.tuple_count,
1696                format_partition_cuts(&trace.imported_cuts)
1697            ));
1698            for tuple in &trace.tuples {
1699                markdown.push_str(&format!(
1700                    "- `t{}` via iteration `{}` -> `{}` | sources `{}` | imported `{}` | parents `{}`\n",
1701                    tuple.tuple_id.0,
1702                    tuple.iteration,
1703                    format_values(&tuple.values),
1704                    format_element_ids(&tuple.source_datom_ids),
1705                    format_partition_cuts(&tuple.imported_cuts),
1706                    format_tuple_ids(&tuple.parent_tuple_ids)
1707                ));
1708            }
1709            markdown.push('\n');
1710        }
1711    }
1712
1713    markdown
1714}
1715
1716fn format_federated_cut(cut: &FederatedCut) -> String {
1717    if cut.cuts.is_empty() {
1718        "<none>".into()
1719    } else {
1720        cut.cuts
1721            .iter()
1722            .map(ToString::to_string)
1723            .collect::<Vec<_>>()
1724            .join(", ")
1725    }
1726}
1727
1728fn format_partition_cuts(cuts: &[PartitionCut]) -> String {
1729    if cuts.is_empty() {
1730        "<none>".into()
1731    } else {
1732        cuts.iter()
1733            .map(ToString::to_string)
1734            .collect::<Vec<_>>()
1735            .join(", ")
1736    }
1737}
1738
1739fn format_values(values: &[Value]) -> String {
1740    values
1741        .iter()
1742        .map(format_value)
1743        .collect::<Vec<_>>()
1744        .join(", ")
1745}
1746
1747fn format_value(value: &Value) -> String {
1748    match value {
1749        Value::Null => "null".into(),
1750        Value::Bool(value) => value.to_string(),
1751        Value::I64(value) => value.to_string(),
1752        Value::U64(value) => value.to_string(),
1753        Value::F64(value) => format!("{value:.4}"),
1754        Value::String(value) => value.clone(),
1755        Value::Bytes(value) => format!("<{} bytes>", value.len()),
1756        Value::Entity(value) => format!("entity({})", value.0),
1757        Value::List(values) => format!("[{}]", format_values(values)),
1758    }
1759}
1760
1761fn format_element_ids(elements: &[aether_ast::ElementId]) -> String {
1762    if elements.is_empty() {
1763        "<none>".into()
1764    } else {
1765        elements
1766            .iter()
1767            .map(|element| format!("e{}", element.0))
1768            .collect::<Vec<_>>()
1769            .join(", ")
1770    }
1771}
1772
1773fn format_tuple_ids(tuple_ids: &[TupleId]) -> String {
1774    if tuple_ids.is_empty() {
1775        "<none>".into()
1776    } else {
1777        tuple_ids
1778            .iter()
1779            .map(|tuple_id| format!("t{}", tuple_id.0))
1780            .collect::<Vec<_>>()
1781            .join(", ")
1782    }
1783}
1784
1785fn format_policy_context(policy_context: Option<&PolicyContext>) -> String {
1786    match policy_context {
1787        None => "public".into(),
1788        Some(policy) if policy.is_empty() => "public".into(),
1789        Some(policy) => {
1790            let capabilities = if policy.capabilities.is_empty() {
1791                "capabilities=<none>".into()
1792            } else {
1793                format!("capabilities={}", policy.capabilities.join(","))
1794            };
1795            let visibilities = if policy.visibilities.is_empty() {
1796                "visibilities=<none>".into()
1797            } else {
1798                format!("visibilities={}", policy.visibilities.join(","))
1799            };
1800            format!("{capabilities}; {visibilities}")
1801        }
1802    }
1803}
1804
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch");
    // u128 -> u64 truncation is safe for any realistic clock value.
    since_epoch.as_millis() as u64
}
1811
/// Request to append a batch of datoms to one partition's log.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionAppendRequest {
    /// Partition whose log receives the datoms.
    pub partition: PartitionId,
    /// Optional fencing token; appends carrying a stale epoch are rejected
    /// when replication is active.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leader_epoch: Option<LeaderEpoch>,
    /// Datoms to append, in submission order.
    pub datoms: Vec<Datom>,
}

/// Outcome of a partition append.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionAppendResponse {
    /// Partition that was appended to.
    pub partition: PartitionId,
    /// Epoch of the leader that accepted the append, when replication is active.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub leader_epoch: Option<LeaderEpoch>,
    /// Number of datoms accepted.
    pub appended: usize,
}

/// Request for the datom history visible at a single partition cut.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PartitionHistoryRequest {
    /// Partition and point-in-time (`current` or `as-of`) to read.
    pub cut: PartitionCut,
    /// Caller capabilities/visibilities; policy-protected datoms are
    /// filtered out when this is absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}

/// Datom history visible at the requested cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionHistoryResponse {
    /// Cut the datoms were read at.
    pub cut: PartitionCut,
    /// Datoms visible under the request's policy context.
    pub datoms: Vec<Datom>,
}

/// Request to resolve entity state at a single partition cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionStateRequest {
    /// Partition and point-in-time to resolve.
    pub cut: PartitionCut,
    /// Schema used to resolve attributes into typed state.
    pub schema: Schema,
    /// Caller capabilities/visibilities for policy filtering.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}

/// Resolved entity state at the requested cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PartitionStateResponse {
    /// Cut the state was resolved at.
    pub cut: PartitionCut,
    /// Fully resolved entity/attribute state.
    pub state: ResolvedState,
}
1854
/// Request for history across several partitions under one federated cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedHistoryRequest {
    /// One cut per partition; a partition may appear at most once.
    pub cut: FederatedCut,
    /// Caller capabilities/visibilities applied to every partition read.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}

/// Per-partition history slices produced for a federated cut.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedHistoryResponse {
    /// One response per partition, returned in partition order even when the
    /// request listed cuts in a different order.
    pub partitions: Vec<PartitionHistoryResponse>,
}

/// Evaluates a single-goal query at one partition cut and imports the rows
/// as extensional facts for a federated run.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ImportedFactQueryRequest {
    /// Partition cut the source query is evaluated at.
    pub cut: PartitionCut,
    /// Source document (DSL). Its query must have exactly one goal so that
    /// imported provenance maps to a single semantic row.
    pub dsl: String,
    /// Predicate the imported facts are published under.
    pub predicate: PredicateRef,
    /// Optional name of the query within the document to import from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
}

/// Facts produced by an imported-fact query.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ImportedFactQueryResponse {
    /// Cut the facts were sourced from.
    pub cut: PartitionCut,
    /// Predicate the facts are published under.
    pub predicate: PredicateRef,
    /// Name of the source query, when one was given.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
    /// Number of rows the source query produced.
    pub row_count: usize,
    /// The imported facts themselves.
    pub facts: Vec<ExtensionalFact>,
}

/// Runs a document whose extensional inputs are imported from other
/// partitions at explicit cuts.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedRunDocumentRequest {
    /// Document (DSL) to run.
    pub dsl: String,
    /// Fact imports feeding the run, one per source partition cut.
    pub imports: Vec<ImportedFactQueryRequest>,
    /// Caller capabilities/visibilities for policy filtering.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}

/// Result of a federated document run.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedRunDocumentResponse {
    /// Federated cut combining every imported partition cut.
    pub cut: FederatedCut,
    /// Per-import summaries of the sourced facts.
    pub imports: Vec<ImportedFactQueryResponse>,
    /// The underlying document-run output.
    pub run: RunDocumentResponse,
}
1900
/// Serializable snapshot of a federated run's explain output, consumed by
/// `render_federated_explain_report_markdown`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedExplainReport {
    /// Report creation time, in milliseconds since the Unix epoch.
    pub generated_at_ms: u64,
    /// Federated cut the run was evaluated against.
    pub cut: FederatedCut,
    /// Policy context the run was evaluated under, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
    /// Summaries of each imported fact source.
    pub imports: Vec<FederatedImportedSourceSummary>,
    /// Rows produced by the document's primary query.
    pub primary_query: Vec<FederatedReportRow>,
    /// Rows produced by each named query.
    pub named_queries: Vec<FederatedNamedQuerySummary>,
    /// Tuple-level derivation traces.
    pub traces: Vec<FederatedTraceSummary>,
}

/// One imported fact source within a federated explain report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedImportedSourceSummary {
    /// Partition cut the facts came from.
    pub cut: PartitionCut,
    /// Predicate the facts were published under.
    pub predicate: PredicateRef,
    /// Name of the source query, when one was given.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub query_name: Option<String>,
    /// Number of facts imported from this source.
    pub fact_count: usize,
}

/// A single result row in a federated explain report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedReportRow {
    /// Derived tuple backing the row, when one exists.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tuple_id: Option<TupleId>,
    /// The row's values.
    pub values: Vec<Value>,
}

/// Rows of one named query in a federated explain report.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedNamedQuerySummary {
    /// Query name, when one was given.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Rows the query produced.
    pub rows: Vec<FederatedReportRow>,
}

/// Derivation trace rooted at one tuple.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedTraceSummary {
    /// Trace name, when one was given.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Tuple the trace is rooted at.
    pub root: TupleId,
    /// Number of tuples in the trace.
    pub tuple_count: usize,
    /// Partition cuts whose imported facts contributed to the trace.
    pub imported_cuts: Vec<PartitionCut>,
    /// Per-tuple details.
    pub tuples: Vec<FederatedTraceTupleSummary>,
}

/// One tuple within a federated derivation trace.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FederatedTraceTupleSummary {
    /// Identifier of the derived tuple.
    pub tuple_id: TupleId,
    /// The tuple's values.
    pub values: Vec<Value>,
    /// Fixpoint iteration that produced the tuple.
    pub iteration: usize,
    /// Extensional datoms the tuple was derived from.
    pub source_datom_ids: Vec<aether_ast::ElementId>,
    /// Partition cuts whose imported facts fed this tuple.
    pub imported_cuts: Vec<PartitionCut>,
    /// Tuples this one was derived from.
    pub parent_tuple_ids: Vec<TupleId>,
}
1955
1956#[cfg(test)]
1957mod tests {
1958    use super::{
1959        render_federated_explain_report_markdown, AuthorityPartitionConfig,
1960        FederatedHistoryRequest, FederatedRunDocumentRequest, ImportedFactQueryRequest,
1961        LeaderEpoch, PartitionAppendRequest, PartitionHistoryRequest, PartitionStateRequest,
1962        PartitionedInMemoryKernelService, PromoteReplicaRequest, ReplicaConfig, ReplicaRole,
1963        ReplicatedAuthorityPartitionService, SqlitePartitionedKernelService,
1964    };
1965    use aether_ast::{
1966        AttributeId, Datom, DatomProvenance, ElementId, EntityId, FederatedCut, OperationKind,
1967        PartitionCut, PartitionId, PolicyContext, PolicyEnvelope, PredicateId, PredicateRef,
1968        ReplicaId, Value,
1969    };
1970    use aether_resolver::ResolvedValue;
1971    use aether_schema::{AttributeClass, AttributeSchema, Schema, ValueType};
1972    use std::{
1973        path::{Path, PathBuf},
1974        sync::atomic::{AtomicU64, Ordering},
1975        time::{SystemTime, UNIX_EPOCH},
1976    };
1977
    // Process-wide counter handing out unique ids per test run; presumably
    // consumed by TestPartitionDir to build distinct scratch paths — confirm
    // against its definition.
    static NEXT_TEST_ID: AtomicU64 = AtomicU64::new(1);
1979
    /// Two partitions receive independent appends; current and as-of reads on
    /// one partition must reflect only that partition's own datoms.
    #[test]
    fn partitioned_service_keeps_local_truth_exact_per_partition() {
        let mut service = PartitionedInMemoryKernelService::new();
        // tenant-a gets two writes to the same entity/attribute (elements 1, 2).
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("tenant-a"),
                leader_epoch: None,
                datoms: vec![
                    sample_datom(1, 1, "tenant-a-open", 1, None),
                    sample_datom(1, 1, "tenant-a-running", 2, None),
                ],
            })
            .expect("append tenant-a");
        // tenant-b gets a single unrelated write.
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("tenant-b"),
                leader_epoch: None,
                datoms: vec![sample_datom(1, 1, "tenant-b-ready", 1, None)],
            })
            .expect("append tenant-b");

        let tenant_a_current = service
            .partition_state(PartitionStateRequest {
                cut: PartitionCut::current("tenant-a"),
                schema: schema(),
                policy_context: None,
            })
            .expect("tenant-a current state");
        // Time-travel read pinned at element 1, before the second write.
        let tenant_a_as_of = service
            .partition_state(PartitionStateRequest {
                cut: PartitionCut::as_of("tenant-a", ElementId::new(1)),
                schema: schema(),
                policy_context: None,
            })
            .expect("tenant-a as-of state");
        let tenant_b_history = service
            .partition_history(PartitionHistoryRequest {
                cut: PartitionCut::current("tenant-b"),
                policy_context: None,
            })
            .expect("tenant-b history");

        // Current state resolves to the latest tenant-a value...
        assert_eq!(
            tenant_a_current
                .state
                .entity(&EntityId::new(1))
                .and_then(|entity| entity.attribute(&AttributeId::new(1))),
            Some(&ResolvedValue::Scalar(Some(Value::String(
                "tenant-a-running".into()
            ))))
        );
        // ...while the as-of cut still sees the first value.
        assert_eq!(
            tenant_a_as_of
                .state
                .entity(&EntityId::new(1))
                .and_then(|entity| entity.attribute(&AttributeId::new(1))),
            Some(&ResolvedValue::Scalar(Some(Value::String(
                "tenant-a-open".into()
            ))))
        );
        // tenant-b history contains exactly its own single datom.
        assert_eq!(tenant_b_history.datoms.len(), 1);
        assert_eq!(
            tenant_b_history.datoms[0].value,
            Value::String("tenant-b-ready".into())
        );
    }
2046
    /// Federated history allows each partition at most once in the cut,
    /// returns responses in partition order, and respects per-datom policy
    /// envelopes.
    #[test]
    fn federated_history_requires_explicit_unique_partition_cuts() {
        let mut service = PartitionedInMemoryKernelService::new();
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("tenant-a"),
                leader_epoch: None,
                datoms: vec![sample_datom(1, 1, "alpha", 1, None)],
            })
            .expect("append tenant-a");
        // tenant-b's only datom is gated behind the "ops" capability.
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("tenant-b"),
                leader_epoch: None,
                datoms: vec![sample_datom(
                    1,
                    1,
                    "beta",
                    1,
                    Some(PolicyEnvelope {
                        capabilities: vec!["ops".into()],
                        visibilities: Vec::new(),
                    }),
                )],
            })
            .expect("append tenant-b");

        // Cuts are deliberately listed out of order.
        let federated = service
            .federated_history(FederatedHistoryRequest {
                cut: FederatedCut {
                    cuts: vec![
                        PartitionCut::current("tenant-b"),
                        PartitionCut::current("tenant-a"),
                    ],
                },
                policy_context: None,
            })
            .expect("federated history");
        // Responses come back in partition order regardless of request order.
        assert_eq!(
            federated
                .partitions
                .iter()
                .map(|partition| partition.cut.to_string())
                .collect::<Vec<_>>(),
            vec![
                "tenant-a@current".to_string(),
                "tenant-b@current".to_string()
            ]
        );
        assert_eq!(federated.partitions[0].datoms.len(), 1);
        // Without the "ops" capability, tenant-b's protected datom is hidden.
        assert!(federated.partitions[1].datoms.is_empty());

        // Naming the same partition twice in one federated cut is rejected.
        let duplicate_partition = service.federated_history(FederatedHistoryRequest {
            cut: FederatedCut {
                cuts: vec![
                    PartitionCut::current("tenant-a"),
                    PartitionCut::as_of("tenant-a", ElementId::new(1)),
                ],
            },
            policy_context: None,
        });
        assert!(matches!(
            duplicate_partition,
            Err(crate::ApiError::Validation(message))
                if message == "federated cut contains duplicate partition tenant-a"
        ));
    }
2114
2115    #[test]
2116    fn unknown_partition_is_rejected_cleanly() {
2117        let service = PartitionedInMemoryKernelService::new();
2118        let error = service.partition_history(PartitionHistoryRequest {
2119            cut: PartitionCut::current("missing"),
2120            policy_context: None,
2121        });
2122        assert!(matches!(
2123            error,
2124            Err(crate::ApiError::Validation(message))
2125                if message == "unknown partition missing"
2126        ));
2127    }
2128
    /// End-to-end federated run: facts imported from two partitions must carry
    /// their originating partition cuts through the derived tuples, the
    /// explain traces, and the rendered markdown report.
    #[test]
    fn imported_facts_and_federated_explain_reports_preserve_partition_cuts() {
        let mut service = PartitionedInMemoryKernelService::new();
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("readiness"),
                leader_epoch: None,
                datoms: vec![sample_datom(1, 1, "ready", 1, None)],
            })
            .expect("append readiness datoms");
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("authority"),
                leader_epoch: None,
                datoms: vec![sample_datom(1, 1, "worker-a", 3, None)],
            })
            .expect("append authority datoms");

        // Run a document whose imports pin each source partition at an
        // explicit element (readiness@e1, authority@e3).
        let response = service
            .federated_run_document(FederatedRunDocumentRequest {
                dsl: federated_assignment_document(),
                imports: vec![
                    ImportedFactQueryRequest {
                        cut: PartitionCut::as_of("readiness", ElementId::new(1)),
                        dsl: readiness_document(),
                        predicate: PredicateRef {
                            id: PredicateId::new(11),
                            name: "imported_ready_task".into(),
                            arity: 1,
                        },
                        query_name: Some("ready_now".into()),
                    },
                    ImportedFactQueryRequest {
                        cut: PartitionCut::as_of("authority", ElementId::new(3)),
                        dsl: authority_document(),
                        predicate: PredicateRef {
                            id: PredicateId::new(12),
                            name: "imported_authorized_worker".into(),
                            arity: 2,
                        },
                        query_name: Some("authorized_now".into()),
                    },
                ],
                policy_context: None,
            })
            .expect("run federated document");

        // The response's federated cut lists both source cuts (in order).
        assert_eq!(
            response
                .cut
                .cuts
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            vec!["authority@e3".to_string(), "readiness@e1".to_string()]
        );
        assert_eq!(response.imports.len(), 2);
        assert_eq!(
            response
                .run
                .query
                .as_ref()
                .expect("primary query result")
                .rows[0]
                .values,
            vec![
                Value::Entity(EntityId::new(1)),
                Value::String("worker-a".into())
            ]
        );

        // The derived tuple backing the primary row records both imported cuts.
        let actionable_tuple_id = response
            .run
            .query
            .as_ref()
            .expect("primary query result")
            .rows[0]
            .tuple_id
            .expect("actionable tuple id");
        let actionable_tuple = response
            .run
            .derived
            .tuples
            .iter()
            .find(|tuple| tuple.tuple.id == actionable_tuple_id)
            .expect("actionable tuple");
        assert_eq!(
            actionable_tuple
                .metadata
                .imported_cuts
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            vec!["authority@e3".to_string(), "readiness@e1".to_string()]
        );

        // The named explain trace preserves the same cut metadata.
        let trace = response
            .run
            .explains
            .iter()
            .find(|explain| explain.name.as_deref() == Some("actionable_trace"))
            .expect("actionable trace");
        let crate::ExplainArtifact::Tuple(trace) = &trace.result else {
            panic!("expected tuple trace");
        };
        assert_eq!(
            trace.tuples[0]
                .metadata
                .imported_cuts
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            vec!["authority@e3".to_string(), "readiness@e1".to_string()]
        );

        // Rebuilding the same run as an explain report keeps the cuts and
        // the imported predicate names in the rendered markdown.
        let report = service
            .build_federated_explain_report(FederatedRunDocumentRequest {
                dsl: federated_assignment_document(),
                imports: vec![
                    ImportedFactQueryRequest {
                        cut: PartitionCut::as_of("readiness", ElementId::new(1)),
                        dsl: readiness_document(),
                        predicate: PredicateRef {
                            id: PredicateId::new(11),
                            name: "imported_ready_task".into(),
                            arity: 1,
                        },
                        query_name: Some("ready_now".into()),
                    },
                    ImportedFactQueryRequest {
                        cut: PartitionCut::as_of("authority", ElementId::new(3)),
                        dsl: authority_document(),
                        predicate: PredicateRef {
                            id: PredicateId::new(12),
                            name: "imported_authorized_worker".into(),
                            arity: 2,
                        },
                        query_name: Some("authorized_now".into()),
                    },
                ],
                policy_context: None,
            })
            .expect("build federated report");
        assert_eq!(report.traces.len(), 1);
        let markdown = render_federated_explain_report_markdown(&report);
        assert!(markdown.contains("authority@e3, readiness@e1"));
        assert!(markdown.contains("imported_authorized_worker"));
        assert!(markdown.contains("imported_ready_task"));
    }
2278
    /// An import document whose query has more than one goal is rejected:
    /// imported provenance must map each fact to a single semantic row.
    #[test]
    fn imported_fact_queries_must_be_single_goal() {
        let mut service = PartitionedInMemoryKernelService::new();
        // Two attributes on one entity — the import DSL presumably joins both,
        // producing the multi-goal query under test (see joined_import_document).
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("joined"),
                leader_epoch: None,
                datoms: vec![
                    sample_datom(1, 1, "ready", 1, None),
                    Datom {
                        entity: EntityId::new(1),
                        attribute: AttributeId::new(2),
                        value: Value::String("worker-a".into()),
                        op: OperationKind::Assert,
                        element: ElementId::new(2),
                        replica: ReplicaId::new(1),
                        causal_context: Default::default(),
                        provenance: DatomProvenance::default(),
                        policy: None,
                    },
                ],
            })
            .expect("append joined datoms");

        let error = service
            .import_partition_facts(
                ImportedFactQueryRequest {
                    cut: PartitionCut::current("joined"),
                    dsl: joined_import_document(),
                    predicate: PredicateRef {
                        id: PredicateId::new(21),
                        name: "imported_assignment".into(),
                        arity: 2,
                    },
                    query_name: Some("joined_now".into()),
                },
                None,
            )
            .expect_err("joined import should be rejected");

        // The validation message explains the single-goal requirement.
        match error {
            crate::ApiError::Validation(message) => assert!(
                message
                    .contains("must have exactly one goal so imported provenance maps to a single semantic row"),
                "{message}"
            ),
            other => panic!("unexpected error: {other:?}"),
        }
    }
2328
    /// An as-of cut pointing at a policy-protected element is reported as
    /// unknown (the same message as a truly missing element) for callers
    /// lacking the capability, and resolves once the capability is presented.
    #[test]
    fn hidden_partition_cut_is_rejected_under_policy() {
        let mut service = PartitionedInMemoryKernelService::new();
        service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("secure"),
                leader_epoch: None,
                datoms: vec![
                    sample_datom(1, 1, "ready", 1, None),
                    // Element 2 requires the "ops" capability.
                    sample_datom(
                        1,
                        1,
                        "running",
                        2,
                        Some(PolicyEnvelope {
                            capabilities: vec!["ops".into()],
                            visibilities: Vec::new(),
                        }),
                    ),
                ],
            })
            .expect("append secure datoms");

        // Without a policy context the protected element reads as unknown.
        let history = service.partition_history(PartitionHistoryRequest {
            cut: PartitionCut::as_of("secure", ElementId::new(2)),
            policy_context: None,
        });
        assert!(matches!(
            history,
            Err(crate::ApiError::Validation(message))
                if message == "unknown element 2 for partition secure"
        ));

        // State resolution rejects the same cut identically.
        let state = service.partition_state(PartitionStateRequest {
            cut: PartitionCut::as_of("secure", ElementId::new(2)),
            schema: schema(),
            policy_context: None,
        });
        assert!(matches!(
            state,
            Err(crate::ApiError::Validation(message))
                if message == "unknown element 2 for partition secure"
        ));

        // With the "ops" capability the same cut resolves.
        let visible = service
            .partition_state(PartitionStateRequest {
                cut: PartitionCut::as_of("secure", ElementId::new(2)),
                schema: schema(),
                policy_context: Some(PolicyContext {
                    capabilities: vec!["ops".into()],
                    visibilities: Vec::new(),
                }),
            })
            .expect("authorized cut should resolve");
        assert_eq!(
            visible.cut,
            PartitionCut::as_of("secure", ElementId::new(2))
        );
    }
2388
2389    #[test]
2390    fn sqlite_partitioned_service_replays_federated_imports_after_restart() {
2391        let temp = TestPartitionDir::new("partitioned-sqlite");
2392        {
2393            let mut service =
2394                SqlitePartitionedKernelService::open(temp.path()).expect("open sqlite partitions");
2395            service
2396                .append_partition(PartitionAppendRequest {
2397                    partition: PartitionId::new("readiness"),
2398                    leader_epoch: None,
2399                    datoms: vec![sample_datom(1, 1, "ready", 1, None)],
2400                })
2401                .expect("append readiness datoms");
2402            service
2403                .append_partition(PartitionAppendRequest {
2404                    partition: PartitionId::new("authority"),
2405                    leader_epoch: None,
2406                    datoms: vec![sample_datom(1, 1, "worker-a", 3, None)],
2407                })
2408                .expect("append authority datoms");
2409        }
2410
2411        let mut service =
2412            SqlitePartitionedKernelService::open(temp.path()).expect("reopen sqlite partitions");
2413        let response = service
2414            .federated_run_document(FederatedRunDocumentRequest {
2415                dsl: federated_assignment_document(),
2416                imports: vec![
2417                    ImportedFactQueryRequest {
2418                        cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2419                        dsl: readiness_document(),
2420                        predicate: PredicateRef {
2421                            id: PredicateId::new(11),
2422                            name: "imported_ready_task".into(),
2423                            arity: 1,
2424                        },
2425                        query_name: Some("ready_now".into()),
2426                    },
2427                    ImportedFactQueryRequest {
2428                        cut: PartitionCut::as_of("authority", ElementId::new(3)),
2429                        dsl: authority_document(),
2430                        predicate: PredicateRef {
2431                            id: PredicateId::new(12),
2432                            name: "imported_authorized_worker".into(),
2433                            arity: 2,
2434                        },
2435                        query_name: Some("authorized_now".into()),
2436                    },
2437                ],
2438                policy_context: None,
2439            })
2440            .expect("run federated document after restart");
2441
2442        assert_eq!(
2443            response.run.query.as_ref().expect("query result").rows[0].values,
2444            vec![
2445                Value::Entity(EntityId::new(1)),
2446                Value::String("worker-a".into())
2447            ]
2448        );
2449        let report = service
2450            .build_federated_explain_report(FederatedRunDocumentRequest {
2451                dsl: federated_assignment_document(),
2452                imports: vec![
2453                    ImportedFactQueryRequest {
2454                        cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2455                        dsl: readiness_document(),
2456                        predicate: PredicateRef {
2457                            id: PredicateId::new(11),
2458                            name: "imported_ready_task".into(),
2459                            arity: 1,
2460                        },
2461                        query_name: Some("ready_now".into()),
2462                    },
2463                    ImportedFactQueryRequest {
2464                        cut: PartitionCut::as_of("authority", ElementId::new(3)),
2465                        dsl: authority_document(),
2466                        predicate: PredicateRef {
2467                            id: PredicateId::new(12),
2468                            name: "imported_authorized_worker".into(),
2469                            arity: 2,
2470                        },
2471                        query_name: Some("authorized_now".into()),
2472                    },
2473                ],
2474                policy_context: None,
2475            })
2476            .expect("build report after restart");
2477        assert!(render_federated_explain_report_markdown(&report)
2478            .contains("authority@e3, readiness@e1"));
2479    }
2480
2481    #[test]
    fn replicated_partition_service_replays_followers_and_fences_stale_epochs() {
        let temp = TestPartitionDir::new("replicated");
        // One partition ("authority") with a leader (replica 1) and a
        // follower (replica 2), each backed by its own sqlite file.
        let mut service = ReplicatedAuthorityPartitionService::open(
            temp.path(),
            vec![AuthorityPartitionConfig {
                partition: PartitionId::new("authority"),
                replicas: vec![
                    ReplicaConfig {
                        replica_id: ReplicaId::new(1),
                        database_path: PathBuf::from("authority-leader.sqlite"),
                        role: ReplicaRole::Leader,
                    },
                    ReplicaConfig {
                        replica_id: ReplicaId::new(2),
                        database_path: PathBuf::from("authority-follower.sqlite"),
                        role: ReplicaRole::Follower,
                    },
                ],
            }],
        )
        .expect("open replicated partitions");

        // Appending without an epoch routes through the current leader and
        // reports the epoch (1) the write was accepted under.
        let appended = service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("authority"),
                leader_epoch: None,
                datoms: vec![sample_datom(1, 1, "worker-a", 1, None)],
            })
            .expect("append through leader");
        assert_eq!(appended.leader_epoch.expect("leader epoch").0, 1);

        // The follower must have replayed the append: both replicas report
        // healthy and caught up to element 1.
        let status = service.partition_status().expect("partition status");
        let authority = &status.partitions[0];
        assert_eq!(authority.replicas.len(), 2);
        assert!(authority.replicas.iter().all(|replica| replica.healthy));
        assert!(authority
            .replicas
            .iter()
            .all(|replica| replica.applied_element == Some(ElementId::new(1))));

        // Promoting the follower bumps the partition's leader epoch to 2.
        let promoted = service
            .promote_replica(PromoteReplicaRequest {
                partition: PartitionId::new("authority"),
                replica_id: ReplicaId::new(2),
            })
            .expect("promote follower");
        assert_eq!(promoted.leader_epoch.0, 2);

        // A write still carrying the old epoch (1) must be fenced with a
        // validation error that names both the stale and current epochs.
        let stale = service.append_partition(PartitionAppendRequest {
            partition: PartitionId::new("authority"),
            leader_epoch: Some(LeaderEpoch::new(1)),
            datoms: vec![sample_datom(1, 1, "worker-b", 2, None)],
        });
        assert!(matches!(
            stale,
            Err(crate::ApiError::Validation(message))
                if message.contains("stale leader epoch 1 for partition authority; current epoch is 2")
        ));

        // The identical write succeeds once it carries the current epoch (2).
        let appended = service
            .append_partition(PartitionAppendRequest {
                partition: PartitionId::new("authority"),
                leader_epoch: Some(LeaderEpoch::new(2)),
                datoms: vec![sample_datom(1, 1, "worker-b", 2, None)],
            })
            .expect("append after promotion");
        assert_eq!(appended.leader_epoch.expect("leader epoch").0, 2);

        // Both accepted datoms (the fenced one never landed) are visible in
        // the current-history cut.
        let history = service
            .partition_history(PartitionHistoryRequest {
                cut: PartitionCut::current("authority"),
                policy_context: None,
            })
            .expect("current history");
        assert_eq!(history.datoms.len(), 2);

        // Post-promotion status: replica 2 now leads at epoch 2, and every
        // replica has applied up to element 2.
        let status = service
            .partition_status()
            .expect("partition status after promotion");
        let authority = &status.partitions[0];
        assert!(authority.replicas.iter().any(|replica| {
            replica.replica_id == ReplicaId::new(2)
                && replica.role == ReplicaRole::Leader
                && replica.leader_epoch.0 == 2
        }));
        assert!(authority
            .replicas
            .iter()
            .all(|replica| replica.applied_element == Some(ElementId::new(2))));
    }
2572
2573    #[test]
2574    fn replicated_partition_service_reuses_and_invalidates_federated_reads() {
2575        let temp = TestPartitionDir::new("replicated-cache");
2576        let mut service = ReplicatedAuthorityPartitionService::open(
2577            temp.path(),
2578            vec![
2579                AuthorityPartitionConfig {
2580                    partition: PartitionId::new("readiness"),
2581                    replicas: vec![ReplicaConfig {
2582                        replica_id: ReplicaId::new(1),
2583                        database_path: PathBuf::from("readiness.sqlite"),
2584                        role: ReplicaRole::Leader,
2585                    }],
2586                },
2587                AuthorityPartitionConfig {
2588                    partition: PartitionId::new("authority"),
2589                    replicas: vec![ReplicaConfig {
2590                        replica_id: ReplicaId::new(1),
2591                        database_path: PathBuf::from("authority.sqlite"),
2592                        role: ReplicaRole::Leader,
2593                    }],
2594                },
2595            ],
2596        )
2597        .expect("open replicated partitions");
2598        service
2599            .append_partition(PartitionAppendRequest {
2600                partition: PartitionId::new("readiness"),
2601                leader_epoch: None,
2602                datoms: vec![sample_datom(1, 1, "ready", 1, None)],
2603            })
2604            .expect("append readiness");
2605        service
2606            .append_partition(PartitionAppendRequest {
2607                partition: PartitionId::new("authority"),
2608                leader_epoch: None,
2609                datoms: vec![sample_datom(1, 1, "worker-a", 2, None)],
2610            })
2611            .expect("append authority");
2612
2613        let request = FederatedRunDocumentRequest {
2614            dsl: federated_assignment_query_document(),
2615            imports: vec![
2616                ImportedFactQueryRequest {
2617                    cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2618                    dsl: readiness_document(),
2619                    predicate: PredicateRef {
2620                        id: PredicateId::new(11),
2621                        name: "imported_ready_task".into(),
2622                        arity: 1,
2623                    },
2624                    query_name: Some("ready_now".into()),
2625                },
2626                ImportedFactQueryRequest {
2627                    cut: PartitionCut::as_of("authority", ElementId::new(2)),
2628                    dsl: authority_document(),
2629                    predicate: PredicateRef {
2630                        id: PredicateId::new(12),
2631                        name: "imported_authorized_worker".into(),
2632                        arity: 2,
2633                    },
2634                    query_name: Some("authorized_now".into()),
2635                },
2636            ],
2637            policy_context: None,
2638        };
2639
2640        let first = service
2641            .federated_run_document(request.clone())
2642            .expect("first federated run");
2643        let second = service
2644            .federated_run_document(request.clone())
2645            .expect("second federated run");
2646        assert_eq!(first, second);
2647
2648        service
2649            .append_partition(PartitionAppendRequest {
2650                partition: PartitionId::new("authority"),
2651                leader_epoch: None,
2652                datoms: vec![sample_datom(1, 1, "worker-b", 3, None)],
2653            })
2654            .expect("append updated authority");
2655
2656        let updated = service
2657            .federated_run_document(FederatedRunDocumentRequest {
2658                dsl: federated_assignment_query_document(),
2659                imports: vec![
2660                    ImportedFactQueryRequest {
2661                        cut: PartitionCut::as_of("readiness", ElementId::new(1)),
2662                        dsl: readiness_document(),
2663                        predicate: PredicateRef {
2664                            id: PredicateId::new(11),
2665                            name: "imported_ready_task".into(),
2666                            arity: 1,
2667                        },
2668                        query_name: Some("ready_now".into()),
2669                    },
2670                    ImportedFactQueryRequest {
2671                        cut: PartitionCut::as_of("authority", ElementId::new(3)),
2672                        dsl: authority_document(),
2673                        predicate: PredicateRef {
2674                            id: PredicateId::new(12),
2675                            name: "imported_authorized_worker".into(),
2676                            arity: 2,
2677                        },
2678                        query_name: Some("authorized_now".into()),
2679                    },
2680                ],
2681                policy_context: None,
2682            })
2683            .expect("updated federated run");
2684        assert_eq!(
2685            updated.run.query.as_ref().expect("query result").rows[0].values,
2686            vec![
2687                Value::Entity(EntityId::new(1)),
2688                Value::String("worker-b".into())
2689            ]
2690        );
2691    }
2692
2693    fn schema() -> Schema {
2694        let mut schema = Schema::new("partitioned-v1");
2695        schema
2696            .register_attribute(AttributeSchema {
2697                id: AttributeId::new(1),
2698                name: "task.status".into(),
2699                class: AttributeClass::ScalarLww,
2700                value_type: ValueType::String,
2701            })
2702            .expect("register status attribute");
2703        schema
2704    }
2705
2706    fn readiness_document() -> String {
2707        r#"
2708schema {
2709  attr task.status: ScalarLWW<String>
2710}
2711
2712predicates {
2713  task_status(Entity, String)
2714  ready_task(Entity)
2715}
2716
2717rules {
2718  ready_task(t) <- task_status(t, "ready")
2719}
2720
2721materialize {
2722  ready_task
2723}
2724
2725query ready_now {
2726  current
2727  goal ready_task(t)
2728  keep t
2729}
2730"#
2731        .into()
2732    }
2733
2734    fn authority_document() -> String {
2735        r#"
2736schema {
2737  attr task.owner: ScalarLWW<String>
2738}
2739
2740predicates {
2741  task_owner(Entity, String)
2742  authorized_worker(Entity, String)
2743}
2744
2745rules {
2746  authorized_worker(t, worker) <- task_owner(t, worker)
2747}
2748
2749materialize {
2750  authorized_worker
2751}
2752
2753query authorized_now {
2754  current
2755  goal authorized_worker(t, worker)
2756  keep t, worker
2757}
2758"#
2759        .into()
2760    }
2761
2762    fn federated_assignment_document() -> String {
2763        r#"
2764schema {
2765}
2766
2767predicates {
2768  imported_ready_task(Entity)
2769  imported_authorized_worker(Entity, String)
2770  actionable_assignment(Entity, String)
2771}
2772
2773rules {
2774  actionable_assignment(t, worker) <- imported_ready_task(t), imported_authorized_worker(t, worker)
2775}
2776
2777materialize {
2778  actionable_assignment
2779}
2780
2781query actionable_now {
2782  current
2783  goal actionable_assignment(t, worker)
2784  keep t, worker
2785}
2786
2787explain actionable_trace {
2788  tuple actionable_assignment(entity(1), "worker-a")
2789}
2790"#
2791        .into()
2792    }
2793
2794    fn federated_assignment_query_document() -> String {
2795        r#"
2796schema {
2797}
2798
2799predicates {
2800  imported_ready_task(Entity)
2801  imported_authorized_worker(Entity, String)
2802  actionable_assignment(Entity, String)
2803}
2804
2805rules {
2806  actionable_assignment(t, worker) <- imported_ready_task(t), imported_authorized_worker(t, worker)
2807}
2808
2809materialize {
2810  actionable_assignment
2811}
2812
2813query actionable_now {
2814  current
2815  goal actionable_assignment(t, worker)
2816  keep t, worker
2817}
2818"#
2819        .into()
2820    }
2821
2822    fn joined_import_document() -> String {
2823        r#"
2824schema {
2825  attr task.status: ScalarLWW<String>
2826  attr task.owner: ScalarLWW<String>
2827}
2828
2829predicates {
2830  task_status(Entity, String)
2831  task_owner(Entity, String)
2832}
2833
2834rules {
2835}
2836
2837query joined_now {
2838  current
2839  goal task_status(t, "ready")
2840  goal task_owner(t, worker)
2841  keep t, worker
2842}
2843"#
2844        .into()
2845    }
2846
2847    fn sample_datom(
2848        entity: u64,
2849        attribute: u64,
2850        value: &str,
2851        element: u64,
2852        policy: Option<PolicyEnvelope>,
2853    ) -> Datom {
2854        Datom {
2855            entity: EntityId::new(entity),
2856            attribute: AttributeId::new(attribute),
2857            value: Value::String(value.into()),
2858            op: OperationKind::Assert,
2859            element: ElementId::new(element),
2860            replica: ReplicaId::new(1),
2861            causal_context: Default::default(),
2862            provenance: DatomProvenance::default(),
2863            policy,
2864        }
2865    }
2866
    /// Per-test scratch directory under the system temp dir; the path is
    /// removed (best-effort) when the value is dropped.
    struct TestPartitionDir {
        path: PathBuf,
    }
2870
2871    impl TestPartitionDir {
2872        fn new(name: &str) -> Self {
2873            let unique = NEXT_TEST_ID.fetch_add(1, Ordering::Relaxed);
2874            let nanos = SystemTime::now()
2875                .duration_since(UNIX_EPOCH)
2876                .expect("system time")
2877                .as_nanos();
2878            let mut path = std::env::temp_dir();
2879            path.push(format!("aether-partitions-{name}-{nanos}-{unique}"));
2880            Self { path }
2881        }
2882
2883        fn path(&self) -> &Path {
2884            &self.path
2885        }
2886    }
2887
2888    impl Drop for TestPartitionDir {
2889        fn drop(&mut self) {
2890            let _ = std::fs::remove_dir_all(&self.path);
2891        }
2892    }
2893}