aether_api/
sidecar.rs

1use aether_ast::{
2    merge_policy_envelopes, policy_allows, Datom, DatomProvenance, ElementId, EntityId,
3    ExtensionalFact, FactProvenance, PolicyContext, PolicyEnvelope, PredicateRef, SidecarKind,
4    SidecarOrigin, SourceRef, Value,
5};
6use indexmap::IndexMap;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde::{Deserialize, Serialize};
9use std::{
10    cmp::Ordering,
11    collections::{BTreeMap, BTreeSet},
12    fs,
13    path::{Path, PathBuf},
14};
15use thiserror::Error;
16
/// Federated catalog of sidecar records (artifact references and vector records)
/// whose registrations are anchored to elements of a journal.
///
/// Both implementations in this file only accept a registration whose
/// `registered_at` element is the journal's current tail and has not already been
/// claimed by another sidecar record.
pub trait SidecarFederation {
    /// Registers an artifact reference at `request.reference.registered_at`.
    ///
    /// # Errors
    /// In the implementations in this file: the element is unknown or not the
    /// current journal tail, the element is already claimed, or the
    /// `(sidecar_id, artifact_id)` pair is already registered; plus storage errors.
    fn register_artifact_reference(
        &mut self,
        request: RegisterArtifactReferenceRequest,
        journal: &JournalCatalog,
    ) -> Result<RegisterArtifactReferenceResponse, SidecarError>;
    /// Looks up an artifact reference by `(sidecar_id, artifact_id)`.
    ///
    /// # Errors
    /// In the implementations in this file: `UnknownArtifactId` when absent and
    /// `PolicyDenied` when the request's policy context does not satisfy the
    /// artifact's policy envelope.
    fn get_artifact_reference(
        &self,
        request: GetArtifactReferenceRequest,
    ) -> Result<GetArtifactReferenceResponse, SidecarError>;
    /// Registers a vector record and its embedding at `request.record.registered_at`.
    ///
    /// # Errors
    /// In the implementations in this file, besides the element checks of
    /// `register_artifact_reference`: the embedding length differs from the declared
    /// dimensions, the source artifact is unknown or registered later in the
    /// journal, or the `(sidecar_id, vector_id)` pair is already registered.
    fn register_vector_record(
        &mut self,
        request: RegisterVectorRecordRequest,
        journal: &JournalCatalog,
    ) -> Result<RegisterVectorRecordResponse, SidecarError>;
    /// Scores the sidecar's vectors against `request.query_embedding` and returns the
    /// best matches, optionally projected into extensional facts.
    ///
    /// In the implementations in this file, only vectors with the request's metric and
    /// dimensionality that are visible at `request.as_of` and allowed by the request's
    /// policy context (for both the vector and its source artifact) are considered.
    fn search_vectors(
        &self,
        request: SearchVectorsRequest,
        journal: &JournalCatalog,
    ) -> Result<SearchVectorsResponse, SidecarError>;
}
38
/// Position index over a journal history, used to validate and order the journal
/// elements that sidecar records are registered at.
#[derive(Clone, Debug, Default)]
pub struct JournalCatalog {
    // Index of each element within the history passed to `from_history`; when an
    // element occurs more than once, the last index wins.
    positions: BTreeMap<ElementId, usize>,
    // Element of the final datom in the history (the "current tail"), or `None` for
    // an empty history.
    latest: Option<ElementId>,
}
44
45impl JournalCatalog {
46    pub fn from_history(history: &[Datom]) -> Self {
47        let mut positions = BTreeMap::new();
48        let mut latest = None;
49        for (position, datom) in history.iter().enumerate() {
50            positions.insert(datom.element, position);
51            latest = Some(datom.element);
52        }
53
54        Self { positions, latest }
55    }
56
57    fn ensure_known(&self, element: ElementId) -> Result<usize, SidecarError> {
58        self.positions
59            .get(&element)
60            .copied()
61            .ok_or(SidecarError::UnknownJournalElement(element))
62    }
63
64    fn ensure_current_tail(&self, element: ElementId) -> Result<(), SidecarError> {
65        self.ensure_known(element)?;
66        match self.latest {
67            Some(current_tail) if current_tail == element => Ok(()),
68            Some(current_tail) => Err(SidecarError::NonCurrentJournalElement {
69                element,
70                current_tail,
71            }),
72            None => Err(SidecarError::UnknownJournalElement(element)),
73        }
74    }
75
76    fn position_of(&self, element: ElementId) -> Result<usize, SidecarError> {
77        self.ensure_known(element)
78    }
79
80    fn is_visible_at(
81        &self,
82        element: ElementId,
83        as_of: Option<ElementId>,
84    ) -> Result<bool, SidecarError> {
85        let position = self.position_of(element)?;
86        match as_of {
87            Some(cutoff) => Ok(position <= self.position_of(cutoff)?),
88            None => Ok(true),
89        }
90    }
91}
92
/// Similarity metric a vector is registered with and searched by; serialized in
/// snake_case (`"cosine"`, `"dot_product"`).
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum VectorMetric {
    /// Cosine similarity (the default); scores 0.0 when either vector has zero norm.
    #[default]
    Cosine,
    /// Plain dot product of query and candidate.
    DotProduct,
}
100
/// Reference to an artifact held outside the journal, catalogued in a sidecar and
/// registered at a journal element.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ArtifactReference {
    /// Sidecar namespace; `(sidecar_id, artifact_id)` must be unique in a federation.
    pub sidecar_id: String,
    /// Identifier of the artifact within its sidecar.
    pub artifact_id: String,
    /// Entity the artifact is associated with.
    pub entity: EntityId,
    /// Location of the artifact; reported as the `SourceRef` URI of vector matches
    /// derived from it.
    pub uri: String,
    /// Media type of the artifact (not interpreted by this module).
    pub media_type: String,
    /// Presumably the artifact size in bytes — not validated here.
    pub byte_length: u64,
    /// Optional content digest; reported as the `SourceRef` digest of derived matches.
    pub digest: Option<String>,
    /// Free-form metadata carried with the reference.
    pub metadata: BTreeMap<String, Value>,
    /// Provenance carried with the reference (stored, not interpreted here).
    pub provenance: DatomProvenance,
    /// Policy checked on lookup and when searching vectors derived from this artifact.
    pub policy: Option<PolicyEnvelope>,
    /// Journal element the artifact is registered at; must be the current journal tail
    /// and not claimed by another sidecar record at registration time.
    pub registered_at: ElementId,
}
115
/// Catalog metadata for one embedding vector registered in a sidecar.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorRecordMetadata {
    /// Sidecar namespace; `(sidecar_id, vector_id)` must be unique in a federation.
    pub sidecar_id: String,
    /// Identifier of the vector within its sidecar; also the final tie-breaker when
    /// ranking search matches.
    pub vector_id: String,
    /// Entity the vector describes; reported as the matched entity in searches.
    pub entity: EntityId,
    /// Optional artifact (in the same sidecar) the vector was derived from; it must be
    /// registered no later in the journal than the vector itself.
    pub source_artifact_id: Option<String>,
    /// Reference to the embedding; used as the `SourceRef` URI of matches when there
    /// is no source artifact.
    pub embedding_ref: String,
    /// Declared embedding length; must equal the embedding's length at registration,
    /// and only queries of this length consider the vector.
    pub dimensions: usize,
    /// Metric the vector is searchable under.
    pub metric: VectorMetric,
    /// Free-form metadata, copied into every search match for this vector.
    pub metadata: BTreeMap<String, Value>,
    /// Provenance carried with the record (stored, not interpreted here).
    pub provenance: DatomProvenance,
    /// Policy checked against the search request's policy context.
    pub policy: Option<PolicyEnvelope>,
    /// Journal element the vector is registered at; governs `as_of` visibility.
    pub registered_at: ElementId,
}
130
/// Requests that search matches also be emitted as ternary extensional facts
/// `predicate(query_entity, matched_entity, score)`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorFactProjection {
    /// Predicate of the emitted facts; its arity must be 3.
    pub predicate: PredicateRef,
    /// Entity placed in the first position of every emitted fact.
    pub query_entity: EntityId,
}
136
/// Input to `SidecarFederation::register_artifact_reference`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterArtifactReferenceRequest {
    /// Reference to register.
    pub reference: ArtifactReference,
}
141
/// Output of `SidecarFederation::register_artifact_reference`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterArtifactReferenceResponse {
    /// The reference as stored (the implementations here echo the request's reference).
    pub reference: ArtifactReference,
}
146
/// Input to `SidecarFederation::get_artifact_reference`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct GetArtifactReferenceRequest {
    /// Sidecar the artifact belongs to.
    pub sidecar_id: String,
    /// Identifier of the artifact within the sidecar.
    pub artifact_id: String,
    /// Caller's policy context, checked against the artifact's policy envelope;
    /// omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
154
/// Output of `SidecarFederation::get_artifact_reference`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct GetArtifactReferenceResponse {
    /// The stored reference.
    pub reference: ArtifactReference,
}
159
/// Input to `SidecarFederation::register_vector_record`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterVectorRecordRequest {
    /// Catalog metadata of the vector.
    pub record: VectorRecordMetadata,
    /// Embedding values; the length must equal `record.dimensions`.
    pub embedding: Vec<f32>,
}
165
/// Output of `SidecarFederation::register_vector_record`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterVectorRecordResponse {
    /// The metadata as stored (the implementations here echo the request's record).
    pub record: VectorRecordMetadata,
}
170
/// Input to `SidecarFederation::search_vectors`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct SearchVectorsRequest {
    /// Only vectors of this sidecar are searched.
    pub sidecar_id: String,
    /// Query vector; only vectors whose `dimensions` equal its length are compared.
    pub query_embedding: Vec<f32>,
    /// Maximum number of matches returned; `build_search_response` treats 0 as 1.
    pub top_k: usize,
    /// Scoring metric; only vectors registered with this metric are compared.
    pub metric: VectorMetric,
    /// When set, only vectors registered at or before this journal element are visible.
    pub as_of: Option<ElementId>,
    /// When set, matches are also returned as extensional facts.
    pub projection: Option<VectorFactProjection>,
    /// Caller's policy context, checked against each vector's and source artifact's
    /// policy; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
182
/// One ranked result of a vector search.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorSearchMatch {
    /// Identifier of the matched vector.
    pub vector_id: String,
    /// Entity the matched vector describes.
    pub entity: EntityId,
    /// Source artifact id recorded on the vector, if any.
    pub source_artifact_id: Option<String>,
    /// URI of the source artifact, when it was found.
    pub source_artifact_uri: Option<String>,
    /// Similarity under the request's metric (higher ranks first).
    pub score: f64,
    /// Sidecar origin, registering journal elements and source reference of the match.
    pub provenance: FactProvenance,
    /// Metadata copied from the vector record.
    pub metadata: BTreeMap<String, Value>,
    /// Merge of the vector's and the source artifact's policy envelopes.
    pub policy: Option<PolicyEnvelope>,
}
194
/// Output of `SidecarFederation::search_vectors`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct SearchVectorsResponse {
    /// Matches ordered by descending score, ties broken by ascending `vector_id`.
    pub matches: Vec<VectorSearchMatch>,
    /// One fact per match when a projection was requested; empty otherwise.
    pub facts: Vec<ExtensionalFact>,
}
200
/// Non-persistent [`SidecarFederation`] that keeps every record in memory.
#[derive(Clone, Debug, Default)]
pub struct InMemorySidecarFederation {
    // Artifact references keyed by `(sidecar_id, artifact_id)`, in insertion order.
    artifacts: IndexMap<(String, String), ArtifactReference>,
    // Vector records keyed by `(sidecar_id, vector_id)`, in insertion order.
    vectors: IndexMap<(String, String), StoredVectorRecord>,
    // Journal elements already claimed by a registered artifact or vector.
    catalog_elements: BTreeSet<ElementId>,
}
207
/// [`SidecarFederation`] persisted in a SQLite database.
#[derive(Debug)]
pub struct SqliteSidecarFederation {
    // Open connection to the catalog database.
    connection: Connection,
    // Path the catalog was opened from, exposed through `path()`.
    path: PathBuf,
}
213
/// A vector record's metadata together with its embedding values.
#[derive(Clone, Debug)]
struct StoredVectorRecord {
    // Catalog metadata of the vector.
    metadata: VectorRecordMetadata,
    // Embedding values; registration ensures the length equals `metadata.dimensions`.
    embedding: Vec<f32>,
}
219
/// Returns the path of the SQLite sidecar catalog that accompanies a journal file.
///
/// The catalog sits next to the journal and is named by appending `.sidecars.sqlite`
/// to the journal's full path (e.g. `data/journal.log` →
/// `data/journal.log.sidecars.sqlite`).
///
/// The suffix is appended to the raw `OsStr`, so journal paths that are not valid
/// UTF-8 are preserved exactly. Going through `Path::display` would replace invalid
/// sequences with U+FFFD and point at a different file.
pub fn sidecar_catalog_path_for_journal(journal_path: impl AsRef<Path>) -> PathBuf {
    let mut catalog_path = journal_path.as_ref().as_os_str().to_os_string();
    catalog_path.push(".sidecars.sqlite");
    PathBuf::from(catalog_path)
}
226
227impl SidecarFederation for InMemorySidecarFederation {
228    fn register_artifact_reference(
229        &mut self,
230        request: RegisterArtifactReferenceRequest,
231        journal: &JournalCatalog,
232    ) -> Result<RegisterArtifactReferenceResponse, SidecarError> {
233        self.ensure_catalog_element_is_available(request.reference.registered_at, journal)?;
234        let key = artifact_key(
235            &request.reference.sidecar_id,
236            &request.reference.artifact_id,
237        );
238        if self.artifacts.contains_key(&key) {
239            return Err(SidecarError::DuplicateArtifactId {
240                sidecar_id: request.reference.sidecar_id.clone(),
241                artifact_id: request.reference.artifact_id.clone(),
242            });
243        }
244
245        self.record_catalog_element(request.reference.registered_at);
246        self.artifacts.insert(key, request.reference.clone());
247        Ok(RegisterArtifactReferenceResponse {
248            reference: request.reference,
249        })
250    }
251
252    fn get_artifact_reference(
253        &self,
254        request: GetArtifactReferenceRequest,
255    ) -> Result<GetArtifactReferenceResponse, SidecarError> {
256        let key = artifact_key(&request.sidecar_id, &request.artifact_id);
257        let reference =
258            self.artifacts
259                .get(&key)
260                .cloned()
261                .ok_or(SidecarError::UnknownArtifactId {
262                    sidecar_id: request.sidecar_id,
263                    artifact_id: request.artifact_id,
264                })?;
265        ensure_policy_allows_artifact(&reference, request.policy_context.as_ref())?;
266        Ok(GetArtifactReferenceResponse { reference })
267    }
268
269    fn register_vector_record(
270        &mut self,
271        request: RegisterVectorRecordRequest,
272        journal: &JournalCatalog,
273    ) -> Result<RegisterVectorRecordResponse, SidecarError> {
274        self.ensure_catalog_element_is_available(request.record.registered_at, journal)?;
275        if request.record.dimensions != request.embedding.len() {
276            return Err(SidecarError::EmbeddingDimensionMismatch {
277                vector_id: request.record.vector_id.clone(),
278                expected: request.record.dimensions,
279                actual: request.embedding.len(),
280            });
281        }
282        if let Some(artifact_id) = &request.record.source_artifact_id {
283            let artifact_key = artifact_key(&request.record.sidecar_id, artifact_id);
284            let Some(artifact) = self.artifacts.get(&artifact_key) else {
285                return Err(SidecarError::UnknownArtifactId {
286                    sidecar_id: request.record.sidecar_id.clone(),
287                    artifact_id: artifact_id.clone(),
288                });
289            };
290            ensure_artifact_precedes_vector(artifact, &request.record, journal)?;
291        }
292
293        let key = vector_key(&request.record.sidecar_id, &request.record.vector_id);
294        if self.vectors.contains_key(&key) {
295            return Err(SidecarError::DuplicateVectorId {
296                sidecar_id: request.record.sidecar_id.clone(),
297                vector_id: request.record.vector_id.clone(),
298            });
299        }
300
301        self.record_catalog_element(request.record.registered_at);
302        self.vectors.insert(
303            key,
304            StoredVectorRecord {
305                metadata: request.record.clone(),
306                embedding: request.embedding,
307            },
308        );
309        Ok(RegisterVectorRecordResponse {
310            record: request.record,
311        })
312    }
313
314    fn search_vectors(
315        &self,
316        request: SearchVectorsRequest,
317        journal: &JournalCatalog,
318    ) -> Result<SearchVectorsResponse, SidecarError> {
319        validate_projection(request.projection.as_ref())?;
320        let mut records = Vec::new();
321        for record in self.vectors.values() {
322            if record.metadata.sidecar_id != request.sidecar_id
323                || record.metadata.metric != request.metric
324                || record.metadata.dimensions != request.query_embedding.len()
325                || !journal.is_visible_at(record.metadata.registered_at, request.as_of)?
326                || !policy_allows(
327                    request.policy_context.as_ref(),
328                    record.metadata.policy.as_ref(),
329                )
330            {
331                continue;
332            }
333
334            let artifact = record
335                .metadata
336                .source_artifact_id
337                .as_ref()
338                .and_then(|artifact_id| {
339                    self.artifacts
340                        .get(&artifact_key(&record.metadata.sidecar_id, artifact_id))
341                })
342                .cloned();
343            if artifact.as_ref().is_some_and(|artifact| {
344                !policy_allows(request.policy_context.as_ref(), artifact.policy.as_ref())
345            }) {
346                continue;
347            }
348            records.push((record.clone(), artifact));
349        }
350        build_search_response(request, records)
351    }
352}
353
354impl InMemorySidecarFederation {
355    fn ensure_catalog_element_is_available(
356        &self,
357        element: ElementId,
358        journal: &JournalCatalog,
359    ) -> Result<(), SidecarError> {
360        journal.ensure_current_tail(element)?;
361        if self.catalog_elements.contains(&element) {
362            return Err(SidecarError::DuplicateCatalogElement(element));
363        }
364        Ok(())
365    }
366
367    fn record_catalog_element(&mut self, element: ElementId) {
368        self.catalog_elements.insert(element);
369    }
370}
371
372impl SqliteSidecarFederation {
373    pub fn open(path: impl AsRef<Path>) -> Result<Self, SidecarError> {
374        let path = path.as_ref().to_path_buf();
375        if let Some(parent) = path.parent() {
376            fs::create_dir_all(parent)?;
377        }
378
379        let connection = Connection::open(&path)?;
380        initialize_sqlite_schema(&connection)?;
381
382        Ok(Self { connection, path })
383    }
384
385    pub fn path(&self) -> &Path {
386        &self.path
387    }
388
389    fn artifact_exists(&self, sidecar_id: &str, artifact_id: &str) -> Result<bool, SidecarError> {
390        Ok(self
391            .connection
392            .query_row(
393                "SELECT 1 FROM artifact_references WHERE sidecar_id = ?1 AND artifact_id = ?2",
394                params![sidecar_id, artifact_id],
395                |_row| Ok(()),
396            )
397            .optional()?
398            .is_some())
399    }
400
401    fn vector_exists(&self, sidecar_id: &str, vector_id: &str) -> Result<bool, SidecarError> {
402        Ok(self
403            .connection
404            .query_row(
405                "SELECT 1 FROM vector_records WHERE sidecar_id = ?1 AND vector_id = ?2",
406                params![sidecar_id, vector_id],
407                |_row| Ok(()),
408            )
409            .optional()?
410            .is_some())
411    }
412
413    fn catalog_element_exists(&self, element: ElementId) -> Result<bool, SidecarError> {
414        Ok(self
415            .connection
416            .query_row(
417                "SELECT 1 FROM sidecar_catalog WHERE element = ?1",
418                params![element_key(element)],
419                |_row| Ok(()),
420            )
421            .optional()?
422            .is_some())
423    }
424
425    fn lookup_artifact(
426        &self,
427        sidecar_id: &str,
428        artifact_id: &str,
429    ) -> Result<Option<ArtifactReference>, SidecarError> {
430        let json = self
431            .connection
432            .query_row(
433                "SELECT reference_json FROM artifact_references WHERE sidecar_id = ?1 AND artifact_id = ?2",
434                params![sidecar_id, artifact_id],
435                |row| row.get::<_, String>(0),
436            )
437            .optional()?;
438        json.map(|json| serde_json::from_str(&json))
439            .transpose()
440            .map_err(SidecarError::from)
441    }
442
443    fn visible_vector_records(
444        &self,
445        request: &SearchVectorsRequest,
446        journal: &JournalCatalog,
447    ) -> Result<Vec<(StoredVectorRecord, Option<ArtifactReference>)>, SidecarError> {
448        let mut statement = self.connection.prepare(
449            "SELECT metadata_json, embedding_json
450             FROM vector_records
451             WHERE sidecar_id = ?1",
452        )?;
453        let rows = statement.query_map(params![&request.sidecar_id], |row| {
454            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
455        })?;
456
457        let mut records = Vec::new();
458        for row in rows {
459            let (metadata_json, embedding_json) = row?;
460            let metadata: VectorRecordMetadata = serde_json::from_str(&metadata_json)?;
461            if metadata.metric != request.metric
462                || metadata.dimensions != request.query_embedding.len()
463            {
464                continue;
465            }
466            if !journal.is_visible_at(metadata.registered_at, request.as_of)? {
467                continue;
468            }
469            if !policy_allows(request.policy_context.as_ref(), metadata.policy.as_ref()) {
470                continue;
471            }
472            let embedding: Vec<f32> = serde_json::from_str(&embedding_json)?;
473            let artifact = match &metadata.source_artifact_id {
474                Some(artifact_id) => self.lookup_artifact(&metadata.sidecar_id, artifact_id)?,
475                None => None,
476            };
477            if artifact.as_ref().is_some_and(|artifact| {
478                !policy_allows(request.policy_context.as_ref(), artifact.policy.as_ref())
479            }) {
480                continue;
481            }
482            records.push((
483                StoredVectorRecord {
484                    metadata,
485                    embedding,
486                },
487                artifact,
488            ));
489        }
490        Ok(records)
491    }
492}
493
494impl SidecarFederation for SqliteSidecarFederation {
495    fn register_artifact_reference(
496        &mut self,
497        request: RegisterArtifactReferenceRequest,
498        journal: &JournalCatalog,
499    ) -> Result<RegisterArtifactReferenceResponse, SidecarError> {
500        journal.ensure_current_tail(request.reference.registered_at)?;
501        if self.catalog_element_exists(request.reference.registered_at)? {
502            return Err(SidecarError::DuplicateCatalogElement(
503                request.reference.registered_at,
504            ));
505        }
506        if self.artifact_exists(
507            &request.reference.sidecar_id,
508            &request.reference.artifact_id,
509        )? {
510            return Err(SidecarError::DuplicateArtifactId {
511                sidecar_id: request.reference.sidecar_id.clone(),
512                artifact_id: request.reference.artifact_id.clone(),
513            });
514        }
515
516        let transaction = self.connection.transaction()?;
517        transaction.execute(
518            "INSERT INTO sidecar_catalog (element, kind, sidecar_id, record_id)
519             VALUES (?1, ?2, ?3, ?4)",
520            params![
521                element_key(request.reference.registered_at),
522                "artifact",
523                &request.reference.sidecar_id,
524                &request.reference.artifact_id,
525            ],
526        )?;
527        transaction.execute(
528            "INSERT INTO artifact_references (sidecar_id, artifact_id, catalog_element, reference_json)
529             VALUES (?1, ?2, ?3, ?4)",
530            params![
531                &request.reference.sidecar_id,
532                &request.reference.artifact_id,
533                element_key(request.reference.registered_at),
534                serde_json::to_string(&request.reference)?,
535            ],
536        )?;
537        transaction.commit()?;
538
539        Ok(RegisterArtifactReferenceResponse {
540            reference: request.reference,
541        })
542    }
543
544    fn get_artifact_reference(
545        &self,
546        request: GetArtifactReferenceRequest,
547    ) -> Result<GetArtifactReferenceResponse, SidecarError> {
548        let Some(reference) = self.lookup_artifact(&request.sidecar_id, &request.artifact_id)?
549        else {
550            return Err(SidecarError::UnknownArtifactId {
551                sidecar_id: request.sidecar_id,
552                artifact_id: request.artifact_id,
553            });
554        };
555        ensure_policy_allows_artifact(&reference, request.policy_context.as_ref())?;
556        Ok(GetArtifactReferenceResponse { reference })
557    }
558
559    fn register_vector_record(
560        &mut self,
561        request: RegisterVectorRecordRequest,
562        journal: &JournalCatalog,
563    ) -> Result<RegisterVectorRecordResponse, SidecarError> {
564        journal.ensure_current_tail(request.record.registered_at)?;
565        if self.catalog_element_exists(request.record.registered_at)? {
566            return Err(SidecarError::DuplicateCatalogElement(
567                request.record.registered_at,
568            ));
569        }
570        if request.record.dimensions != request.embedding.len() {
571            return Err(SidecarError::EmbeddingDimensionMismatch {
572                vector_id: request.record.vector_id.clone(),
573                expected: request.record.dimensions,
574                actual: request.embedding.len(),
575            });
576        }
577        if let Some(artifact_id) = &request.record.source_artifact_id {
578            let Some(artifact) = self.lookup_artifact(&request.record.sidecar_id, artifact_id)?
579            else {
580                return Err(SidecarError::UnknownArtifactId {
581                    sidecar_id: request.record.sidecar_id.clone(),
582                    artifact_id: artifact_id.clone(),
583                });
584            };
585            ensure_artifact_precedes_vector(&artifact, &request.record, journal)?;
586        }
587        if self.vector_exists(&request.record.sidecar_id, &request.record.vector_id)? {
588            return Err(SidecarError::DuplicateVectorId {
589                sidecar_id: request.record.sidecar_id.clone(),
590                vector_id: request.record.vector_id.clone(),
591            });
592        }
593
594        let transaction = self.connection.transaction()?;
595        transaction.execute(
596            "INSERT INTO sidecar_catalog (element, kind, sidecar_id, record_id)
597             VALUES (?1, ?2, ?3, ?4)",
598            params![
599                element_key(request.record.registered_at),
600                "vector",
601                &request.record.sidecar_id,
602                &request.record.vector_id,
603            ],
604        )?;
605        transaction.execute(
606            "INSERT INTO vector_records (sidecar_id, vector_id, catalog_element, metadata_json, embedding_json)
607             VALUES (?1, ?2, ?3, ?4, ?5)",
608            params![
609                &request.record.sidecar_id,
610                &request.record.vector_id,
611                element_key(request.record.registered_at),
612                serde_json::to_string(&request.record)?,
613                serde_json::to_string(&request.embedding)?,
614            ],
615        )?;
616        transaction.commit()?;
617
618        Ok(RegisterVectorRecordResponse {
619            record: request.record,
620        })
621    }
622
623    fn search_vectors(
624        &self,
625        request: SearchVectorsRequest,
626        journal: &JournalCatalog,
627    ) -> Result<SearchVectorsResponse, SidecarError> {
628        validate_projection(request.projection.as_ref())?;
629        let records = self.visible_vector_records(&request, journal)?;
630        build_search_response(request, records)
631    }
632}
633
634fn initialize_sqlite_schema(connection: &Connection) -> Result<(), SidecarError> {
635    connection.execute_batch(
636        "
637        CREATE TABLE IF NOT EXISTS sidecar_catalog (
638            seq INTEGER PRIMARY KEY AUTOINCREMENT,
639            element TEXT NOT NULL UNIQUE,
640            kind TEXT NOT NULL,
641            sidecar_id TEXT NOT NULL,
642            record_id TEXT NOT NULL
643        );
644        CREATE TABLE IF NOT EXISTS artifact_references (
645            sidecar_id TEXT NOT NULL,
646            artifact_id TEXT NOT NULL,
647            catalog_element TEXT NOT NULL UNIQUE,
648            reference_json TEXT NOT NULL,
649            PRIMARY KEY (sidecar_id, artifact_id)
650        );
651        CREATE TABLE IF NOT EXISTS vector_records (
652            sidecar_id TEXT NOT NULL,
653            vector_id TEXT NOT NULL,
654            catalog_element TEXT NOT NULL UNIQUE,
655            metadata_json TEXT NOT NULL,
656            embedding_json TEXT NOT NULL,
657            PRIMARY KEY (sidecar_id, vector_id)
658        );
659        CREATE INDEX IF NOT EXISTS sidecar_catalog_by_element
660            ON sidecar_catalog(element);
661        CREATE INDEX IF NOT EXISTS artifact_references_by_sidecar
662            ON artifact_references(sidecar_id, artifact_id);
663        CREATE INDEX IF NOT EXISTS vector_records_by_sidecar
664            ON vector_records(sidecar_id, vector_id);
665        ",
666    )?;
667    Ok(())
668}
669
/// Owned `(sidecar_id, artifact_id)` map key for artifact lookups.
fn artifact_key(sidecar_id: &str, artifact_id: &str) -> (String, String) {
    let sidecar = String::from(sidecar_id);
    let artifact = String::from(artifact_id);
    (sidecar, artifact)
}
673
/// Owned `(sidecar_id, vector_id)` map key for vector lookups.
fn vector_key(sidecar_id: &str, vector_id: &str) -> (String, String) {
    let sidecar = sidecar_id.to_owned();
    let vector = vector_id.to_owned();
    (sidecar, vector)
}
677
678fn element_key(element: ElementId) -> String {
679    element.0.to_string()
680}
681
682fn validate_projection(projection: Option<&VectorFactProjection>) -> Result<(), SidecarError> {
683    if let Some(projection) = projection {
684        if projection.predicate.arity != 3 {
685            return Err(SidecarError::UnsupportedProjectionArity {
686                predicate: projection.predicate.name.clone(),
687                arity: projection.predicate.arity,
688            });
689        }
690    }
691    Ok(())
692}
693
694fn ensure_policy_allows_artifact(
695    reference: &ArtifactReference,
696    policy_context: Option<&PolicyContext>,
697) -> Result<(), SidecarError> {
698    if policy_allows(policy_context, reference.policy.as_ref()) {
699        Ok(())
700    } else {
701        Err(SidecarError::PolicyDenied {
702            subject: format!(
703                "artifact {}:{}",
704                reference.sidecar_id, reference.artifact_id
705            ),
706        })
707    }
708}
709
710fn build_search_response(
711    request: SearchVectorsRequest,
712    records: Vec<(StoredVectorRecord, Option<ArtifactReference>)>,
713) -> Result<SearchVectorsResponse, SidecarError> {
714    let mut matches = records
715        .into_iter()
716        .map(|(record, artifact)| {
717            let provenance = FactProvenance {
718                source_datom_ids: source_datom_ids_for_record(&record.metadata, artifact.as_ref()),
719                imported_cuts: Vec::new(),
720                sidecar_origin: Some(SidecarOrigin {
721                    kind: SidecarKind::Vector,
722                    sidecar_id: record.metadata.sidecar_id.clone(),
723                    record_id: record.metadata.vector_id.clone(),
724                }),
725                source_ref: Some(artifact.as_ref().map_or_else(
726                    || SourceRef {
727                        uri: record.metadata.embedding_ref.clone(),
728                        digest: None,
729                    },
730                    |artifact| SourceRef {
731                        uri: artifact.uri.clone(),
732                        digest: artifact.digest.clone(),
733                    },
734                )),
735            };
736            VectorSearchMatch {
737                vector_id: record.metadata.vector_id.clone(),
738                entity: record.metadata.entity,
739                source_artifact_id: record.metadata.source_artifact_id.clone(),
740                source_artifact_uri: artifact.as_ref().map(|artifact| artifact.uri.clone()),
741                score: similarity_score(
742                    request.metric,
743                    &request.query_embedding,
744                    &record.embedding,
745                ),
746                provenance,
747                metadata: record.metadata.metadata.clone(),
748                policy: merge_policy_envelopes([
749                    record.metadata.policy.as_ref(),
750                    artifact
751                        .as_ref()
752                        .and_then(|artifact| artifact.policy.as_ref()),
753                ]),
754            }
755        })
756        .collect::<Vec<_>>();
757
758    matches.sort_by(|left, right| {
759        right
760            .score
761            .partial_cmp(&left.score)
762            .unwrap_or(Ordering::Equal)
763            .then_with(|| left.vector_id.cmp(&right.vector_id))
764    });
765    matches.truncate(request.top_k.max(1));
766
767    let facts = match &request.projection {
768        Some(projection) => matches
769            .iter()
770            .map(|item| ExtensionalFact {
771                predicate: projection.predicate.clone(),
772                values: vec![
773                    Value::Entity(projection.query_entity),
774                    Value::Entity(item.entity),
775                    Value::F64(item.score),
776                ],
777                policy: item.policy.clone(),
778                provenance: Some(item.provenance.clone()),
779            })
780            .collect(),
781        None => Vec::new(),
782    };
783
784    Ok(SearchVectorsResponse { matches, facts })
785}
786
787fn ensure_artifact_precedes_vector(
788    artifact: &ArtifactReference,
789    record: &VectorRecordMetadata,
790    journal: &JournalCatalog,
791) -> Result<(), SidecarError> {
792    let artifact_position = journal.position_of(artifact.registered_at)?;
793    let vector_position = journal.position_of(record.registered_at)?;
794    if artifact_position > vector_position {
795        return Err(SidecarError::SourceArtifactRegisteredAfterVector {
796            vector_id: record.vector_id.clone(),
797            artifact_id: artifact.artifact_id.clone(),
798        });
799    }
800    Ok(())
801}
802
803fn source_datom_ids_for_record(
804    record: &VectorRecordMetadata,
805    artifact: Option<&ArtifactReference>,
806) -> Vec<ElementId> {
807    let mut source_datom_ids = vec![record.registered_at];
808    if let Some(artifact) = artifact {
809        if artifact.registered_at != record.registered_at {
810            source_datom_ids.push(artifact.registered_at);
811        }
812    }
813    source_datom_ids
814}
815
816fn similarity_score(metric: VectorMetric, query: &[f32], candidate: &[f32]) -> f64 {
817    match metric {
818        VectorMetric::Cosine => cosine_similarity(query, candidate),
819        VectorMetric::DotProduct => dot_product(query, candidate),
820    }
821}
822
823fn cosine_similarity(query: &[f32], candidate: &[f32]) -> f64 {
824    let numerator = dot_product(query, candidate);
825    let query_norm = query
826        .iter()
827        .map(|value| f64::from(*value) * f64::from(*value))
828        .sum::<f64>()
829        .sqrt();
830    let candidate_norm = candidate
831        .iter()
832        .map(|value| f64::from(*value) * f64::from(*value))
833        .sum::<f64>()
834        .sqrt();
835    if query_norm == 0.0 || candidate_norm == 0.0 {
836        0.0
837    } else {
838        numerator / (query_norm * candidate_norm)
839    }
840}
841
/// Dot product accumulated in `f64`; only the overlapping prefix of the two slices
/// contributes when their lengths differ.
fn dot_product(query: &[f32], candidate: &[f32]) -> f64 {
    let widened_query = query.iter().copied().map(f64::from);
    let widened_candidate = candidate.iter().copied().map(f64::from);
    widened_query
        .zip(widened_candidate)
        .map(|(left, right)| left * right)
        .sum()
}
849
/// Errors produced by sidecar federation operations.
#[derive(Debug, Error)]
pub enum SidecarError {
    /// The journal element is already claimed by another sidecar record.
    #[error("sidecar catalog already contains element {0}")]
    DuplicateCatalogElement(ElementId),
    /// The element is not in the journal history the `JournalCatalog` was built from.
    #[error("journal does not contain element {0}")]
    UnknownJournalElement(ElementId),
    /// A registration used a journal element other than the current journal tail.
    #[error(
        "sidecar registration element {element} must match the current journal tail {current_tail}"
    )]
    NonCurrentJournalElement {
        element: ElementId,
        current_tail: ElementId,
    },
    /// The `(sidecar_id, artifact_id)` pair is already registered.
    #[error("sidecar {sidecar_id} already contains artifact {artifact_id}")]
    DuplicateArtifactId {
        sidecar_id: String,
        artifact_id: String,
    },
    /// No artifact is registered under `(sidecar_id, artifact_id)`.
    #[error("sidecar {sidecar_id} does not contain artifact {artifact_id}")]
    UnknownArtifactId {
        sidecar_id: String,
        artifact_id: String,
    },
    /// The `(sidecar_id, vector_id)` pair is already registered.
    #[error("sidecar {sidecar_id} already contains vector {vector_id}")]
    DuplicateVectorId {
        sidecar_id: String,
        vector_id: String,
    },
    /// The embedding length differs from the record's declared `dimensions`.
    #[error("vector {vector_id} declared dimension {expected}, but received {actual}")]
    EmbeddingDimensionMismatch {
        vector_id: String,
        expected: usize,
        actual: usize,
    },
    /// The vector's source artifact is registered later in the journal than the vector.
    #[error(
        "vector {vector_id} cannot point at artifact {artifact_id} registered later in the journal"
    )]
    SourceArtifactRegisteredAfterVector {
        vector_id: String,
        artifact_id: String,
    },
    /// A fact projection named a predicate whose arity is not 3.
    #[error("vector fact projection for predicate {predicate} requires arity 3, found {arity}")]
    UnsupportedProjectionArity { predicate: String, arity: usize },
    /// The caller's policy context does not satisfy the subject's policy envelope.
    #[error("policy denied access to {subject}")]
    PolicyDenied { subject: String },
    /// Filesystem failure, e.g. creating the catalog's parent directory.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Failure reported by SQLite.
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
    /// JSON encoding/decoding failure for persisted records.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
902
// Unit tests for the in-memory sidecar federation: artifact round-trips, vector
// search with fact projection, `as_of` cuts, journal-tail enforcement, and policy
// filtering.
#[cfg(test)]
mod tests {
    use super::{
        ArtifactReference, GetArtifactReferenceRequest, InMemorySidecarFederation, JournalCatalog,
        RegisterArtifactReferenceRequest, RegisterVectorRecordRequest, SearchVectorsRequest,
        SidecarError, SidecarFederation, VectorFactProjection, VectorMetric, VectorRecordMetadata,
    };
    use aether_ast::{
        AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, PolicyContext,
        PolicyEnvelope, PredicateId, PredicateRef, ReplicaId, SidecarKind, Value,
    };
    use std::collections::BTreeMap;

    // Builds a journal catalog holding one synthetic `Annotate` datom per element id,
    // in the order given. The last id in `elements` becomes the journal tail.
    fn journal_catalog(elements: &[u64]) -> JournalCatalog {
        JournalCatalog::from_history(
            &elements
                .iter()
                .map(|element| Datom {
                    entity: EntityId::new(1),
                    attribute: AttributeId::new(1),
                    value: Value::String(format!("anchor-{element}")),
                    op: OperationKind::Annotate,
                    element: ElementId::new(*element),
                    replica: ReplicaId::new(1),
                    causal_context: Default::default(),
                    provenance: DatomProvenance::default(),
                    policy: None,
                })
                .collect::<Vec<_>>(),
        )
    }

    // An artifact reference registered at the journal tail is returned unchanged
    // by `get_artifact_reference`.
    #[test]
    fn artifact_references_remain_external_metadata() {
        let mut federation = InMemorySidecarFederation::default();
        let journal = journal_catalog(&[1]);
        let reference = ArtifactReference {
            sidecar_id: "artifact-store".into(),
            artifact_id: "artifact-1".into(),
            entity: EntityId::new(10),
            uri: "s3://aether/artifacts/artifact-1.pdf".into(),
            media_type: "application/pdf".into(),
            byte_length: 4096,
            digest: Some("sha256:artifact-1".into()),
            metadata: BTreeMap::from([("title".into(), Value::String("Runbook".into()))]),
            provenance: DatomProvenance::default(),
            policy: None,
            registered_at: ElementId::new(1),
        };

        federation
            .register_artifact_reference(
                RegisterArtifactReferenceRequest {
                    reference: reference.clone(),
                },
                &journal,
            )
            .expect("register artifact reference");
        let fetched = federation
            .get_artifact_reference(GetArtifactReferenceRequest {
                sidecar_id: "artifact-store".into(),
                artifact_id: "artifact-1".into(),
                policy_context: None,
            })
            .expect("fetch artifact reference")
            .reference;

        assert_eq!(fetched, reference);
        assert_eq!(fetched.uri, "s3://aether/artifacts/artifact-1.pdf");
    }

    // Registers an artifact at element 1 and a vector at element 2, each against the
    // journal as it stood at that moment, so each registration hits the current tail.
    // A projected search must then:
    // - surface the artifact URI;
    // - tag the match with a vector sidecar origin;
    // - cite the vector's element before the artifact's in the fact provenance.
    #[test]
    fn vector_search_projects_provenance_bearing_semantic_facts() {
        let mut federation = InMemorySidecarFederation::default();
        let artifact_journal = journal_catalog(&[1]);
        federation
            .register_artifact_reference(
                RegisterArtifactReferenceRequest {
                    reference: ArtifactReference {
                        sidecar_id: "vector-store".into(),
                        artifact_id: "doc-1".into(),
                        entity: EntityId::new(21),
                        uri: "s3://aether/docs/doc-1.md".into(),
                        media_type: "text/markdown".into(),
                        byte_length: 512,
                        digest: Some("sha256:doc-1".into()),
                        metadata: BTreeMap::new(),
                        provenance: DatomProvenance::default(),
                        policy: None,
                        registered_at: ElementId::new(1),
                    },
                },
                &artifact_journal,
            )
            .expect("register artifact");
        let vector_journal = journal_catalog(&[1, 2]);
        federation
            .register_vector_record(
                RegisterVectorRecordRequest {
                    record: VectorRecordMetadata {
                        sidecar_id: "vector-store".into(),
                        vector_id: "vec-1".into(),
                        entity: EntityId::new(21),
                        source_artifact_id: Some("doc-1".into()),
                        embedding_ref: "s3://aether/vectors/vec-1.bin".into(),
                        dimensions: 3,
                        metric: VectorMetric::Cosine,
                        metadata: BTreeMap::from([(
                            "topic".into(),
                            Value::String("coordination".into()),
                        )]),
                        provenance: DatomProvenance::default(),
                        policy: None,
                        registered_at: ElementId::new(2),
                    },
                    embedding: vec![0.8, 0.1, 0.0],
                },
                &vector_journal,
            )
            .expect("register vector");

        let response = federation
            .search_vectors(
                SearchVectorsRequest {
                    sidecar_id: "vector-store".into(),
                    query_embedding: vec![1.0, 0.0, 0.0],
                    top_k: 1,
                    metric: VectorMetric::Cosine,
                    as_of: Some(ElementId::new(2)),
                    projection: Some(VectorFactProjection {
                        predicate: PredicateRef {
                            id: PredicateId::new(90),
                            name: "similar_document".into(),
                            arity: 3,
                        },
                        query_entity: EntityId::new(1),
                    }),
                    policy_context: None,
                },
                &vector_journal,
            )
            .expect("search vectors");

        assert_eq!(response.matches.len(), 1);
        assert_eq!(
            response.matches[0].source_artifact_uri.as_deref(),
            Some("s3://aether/docs/doc-1.md")
        );
        assert_eq!(
            response.matches[0]
                .provenance
                .sidecar_origin
                .as_ref()
                .map(|origin| origin.kind),
            Some(SidecarKind::Vector)
        );
        // Vector registration element first, then the distinct artifact element.
        assert_eq!(
            response.facts[0]
                .provenance
                .as_ref()
                .expect("fact provenance")
                .source_datom_ids,
            vec![ElementId::new(2), ElementId::new(1)]
        );
    }

    // Vectors registered after the `as_of` cut are excluded from results, and an
    // `as_of` element unknown to the journal is rejected with `UnknownJournalElement`.
    #[test]
    fn vector_search_respects_catalog_as_of_cut() {
        let mut federation = InMemorySidecarFederation::default();
        let first_journal = journal_catalog(&[1]);
        let second_journal = journal_catalog(&[1, 2]);
        for (vector_id, element, embedding) in
            [("vec-1", 1, vec![1.0, 0.0]), ("vec-2", 2, vec![0.0, 1.0])]
        {
            // Each vector registers while its own element is the journal tail.
            let journal = if element == 1 {
                &first_journal
            } else {
                &second_journal
            };
            federation
                .register_vector_record(
                    RegisterVectorRecordRequest {
                        record: VectorRecordMetadata {
                            sidecar_id: "vector-store".into(),
                            vector_id: vector_id.into(),
                            entity: EntityId::new(element),
                            source_artifact_id: None,
                            embedding_ref: format!("mem://{}", vector_id),
                            dimensions: 2,
                            metric: VectorMetric::DotProduct,
                            metadata: BTreeMap::new(),
                            provenance: DatomProvenance::default(),
                            policy: None,
                            registered_at: ElementId::new(element),
                        },
                        embedding,
                    },
                    journal,
                )
                .expect("register vector");
        }

        // Search against the full journal, so only the `as_of` cut limits visibility.
        // The query matches vec-2 best, yet vec-2 must still be hidden at cut 1.
        let current_journal = journal_catalog(&[1, 2]);
        let before_second = federation
            .search_vectors(
                SearchVectorsRequest {
                    sidecar_id: "vector-store".into(),
                    query_embedding: vec![0.0, 1.0],
                    top_k: 4,
                    metric: VectorMetric::DotProduct,
                    as_of: Some(ElementId::new(1)),
                    projection: None,
                    policy_context: None,
                },
                &current_journal,
            )
            .expect("search vectors before second insert");
        assert_eq!(
            before_second
                .matches
                .iter()
                .map(|item| item.vector_id.as_str())
                .collect::<Vec<_>>(),
            vec!["vec-1"]
        );

        let unknown = federation.search_vectors(
            SearchVectorsRequest {
                sidecar_id: "vector-store".into(),
                query_embedding: vec![0.0, 1.0],
                top_k: 4,
                metric: VectorMetric::DotProduct,
                as_of: Some(ElementId::new(9)),
                projection: None,
                policy_context: None,
            },
            &current_journal,
        );
        assert!(matches!(
            unknown,
            Err(SidecarError::UnknownJournalElement(id)) if id == ElementId::new(9)
        ));
    }

    // Registering at element 1 while the journal tail is already element 2 is rejected
    // with `NonCurrentJournalElement`, which reports both elements.
    #[test]
    fn sidecar_registration_requires_the_current_journal_tail() {
        let mut federation = InMemorySidecarFederation::default();
        let journal = journal_catalog(&[1, 2]);

        let result = federation.register_artifact_reference(
            RegisterArtifactReferenceRequest {
                reference: ArtifactReference {
                    sidecar_id: "artifact-store".into(),
                    artifact_id: "artifact-1".into(),
                    entity: EntityId::new(10),
                    uri: "s3://aether/artifacts/artifact-1.pdf".into(),
                    media_type: "application/pdf".into(),
                    byte_length: 4096,
                    digest: Some("sha256:artifact-1".into()),
                    metadata: BTreeMap::new(),
                    provenance: DatomProvenance::default(),
                    policy: None,
                    registered_at: ElementId::new(1),
                },
            },
            &journal,
        );

        assert!(matches!(
            result,
            Err(SidecarError::NonCurrentJournalElement {
                element,
                current_tail,
            }) if element == ElementId::new(1) && current_tail == ElementId::new(2)
        ));
    }

    // Entries guarded by a `memory_reader` capability:
    // - a direct read without a policy context fails with `PolicyDenied`;
    // - a direct read with the capability succeeds;
    // - a search without the capability returns no matches rather than an error;
    // - a search with the capability returns the match.
    #[test]
    fn sidecar_reads_and_searches_respect_policy_context() {
        let mut federation = InMemorySidecarFederation::default();
        let artifact_journal = journal_catalog(&[1]);
        federation
            .register_artifact_reference(
                RegisterArtifactReferenceRequest {
                    reference: ArtifactReference {
                        sidecar_id: "vector-store".into(),
                        artifact_id: "doc-1".into(),
                        entity: EntityId::new(21),
                        uri: "s3://aether/docs/doc-1.md".into(),
                        media_type: "text/markdown".into(),
                        byte_length: 512,
                        digest: Some("sha256:doc-1".into()),
                        metadata: BTreeMap::new(),
                        provenance: DatomProvenance::default(),
                        policy: Some(PolicyEnvelope {
                            capabilities: vec!["memory_reader".into()],
                            visibilities: Vec::new(),
                        }),
                        registered_at: ElementId::new(1),
                    },
                },
                &artifact_journal,
            )
            .expect("register artifact");
        let vector_journal = journal_catalog(&[1, 2]);
        federation
            .register_vector_record(
                RegisterVectorRecordRequest {
                    record: VectorRecordMetadata {
                        sidecar_id: "vector-store".into(),
                        vector_id: "vec-1".into(),
                        entity: EntityId::new(21),
                        source_artifact_id: Some("doc-1".into()),
                        embedding_ref: "s3://aether/vectors/vec-1.bin".into(),
                        dimensions: 3,
                        metric: VectorMetric::Cosine,
                        metadata: BTreeMap::new(),
                        provenance: DatomProvenance::default(),
                        policy: Some(PolicyEnvelope {
                            capabilities: vec!["memory_reader".into()],
                            visibilities: Vec::new(),
                        }),
                        registered_at: ElementId::new(2),
                    },
                    embedding: vec![0.8, 0.1, 0.0],
                },
                &vector_journal,
            )
            .expect("register vector");

        let denied = federation.get_artifact_reference(GetArtifactReferenceRequest {
            sidecar_id: "vector-store".into(),
            artifact_id: "doc-1".into(),
            policy_context: None,
        });
        assert!(matches!(denied, Err(SidecarError::PolicyDenied { .. })));

        let allowed = federation
            .get_artifact_reference(GetArtifactReferenceRequest {
                sidecar_id: "vector-store".into(),
                artifact_id: "doc-1".into(),
                policy_context: Some(PolicyContext {
                    capabilities: vec!["memory_reader".into()],
                    visibilities: Vec::new(),
                }),
            })
            .expect("fetch allowed artifact");
        assert_eq!(allowed.reference.artifact_id, "doc-1");

        // Searches filter out policy-hidden matches instead of returning an error.
        let hidden_search = federation
            .search_vectors(
                SearchVectorsRequest {
                    sidecar_id: "vector-store".into(),
                    query_embedding: vec![1.0, 0.0, 0.0],
                    top_k: 1,
                    metric: VectorMetric::Cosine,
                    as_of: Some(ElementId::new(2)),
                    projection: None,
                    policy_context: None,
                },
                &vector_journal,
            )
            .expect("search without policy");
        assert!(hidden_search.matches.is_empty());

        let visible_search = federation
            .search_vectors(
                SearchVectorsRequest {
                    sidecar_id: "vector-store".into(),
                    query_embedding: vec![1.0, 0.0, 0.0],
                    top_k: 1,
                    metric: VectorMetric::Cosine,
                    as_of: Some(ElementId::new(2)),
                    projection: None,
                    policy_context: Some(PolicyContext {
                        capabilities: vec!["memory_reader".into()],
                        visibilities: Vec::new(),
                    }),
                },
                &vector_journal,
            )
            .expect("search with policy");
        assert_eq!(visible_search.matches.len(), 1);
    }
}