1use aether_ast::{
2 merge_policy_envelopes, policy_allows, Datom, DatomProvenance, ElementId, EntityId,
3 ExtensionalFact, FactProvenance, PolicyContext, PolicyEnvelope, PredicateRef, SidecarKind,
4 SidecarOrigin, SourceRef, Value,
5};
6use indexmap::IndexMap;
7use rusqlite::{params, Connection, OptionalExtension};
8use serde::{Deserialize, Serialize};
9use std::{
10 cmp::Ordering,
11 collections::{BTreeMap, BTreeSet},
12 fs,
13 path::{Path, PathBuf},
14};
15use thiserror::Error;
16
/// Registry of external ("sidecar") records — artifact references and vector
/// embeddings — that are anchored to elements of the journal.
///
/// Registration methods receive a [`JournalCatalog`] describing the journal at
/// call time; both implementations in this module require the record's
/// `registered_at` element to be the journal's current tail and refuse to reuse
/// an element that already backs another registration.
pub trait SidecarFederation {
    /// Registers `request.reference`, anchored at its `registered_at` element.
    fn register_artifact_reference(
        &mut self,
        request: RegisterArtifactReferenceRequest,
        journal: &JournalCatalog,
    ) -> Result<RegisterArtifactReferenceResponse, SidecarError>;
    /// Fetches a registered artifact reference by `(sidecar_id, artifact_id)`,
    /// checking the artifact's policy against the request's policy context.
    fn get_artifact_reference(
        &self,
        request: GetArtifactReferenceRequest,
    ) -> Result<GetArtifactReferenceResponse, SidecarError>;
    /// Registers a vector record together with its embedding.
    fn register_vector_record(
        &mut self,
        request: RegisterVectorRecordRequest,
        journal: &JournalCatalog,
    ) -> Result<RegisterVectorRecordResponse, SidecarError>;
    /// Scores the visible vectors of one sidecar against the query embedding and
    /// returns the best matches, optionally projected into facts.
    fn search_vectors(
        &self,
        request: SearchVectorsRequest,
        journal: &JournalCatalog,
    ) -> Result<SearchVectorsResponse, SidecarError>;
}
38
/// Index of journal element ids built from a datom history.
///
/// Used to check that sidecar registrations reference the current journal tail
/// and to decide which registrations are visible at an `as_of` cut.
#[derive(Clone, Debug, Default)]
pub struct JournalCatalog {
    /// Position of each element in the history the catalog was built from
    /// (if an element repeats, the last occurrence wins).
    positions: BTreeMap<ElementId, usize>,
    /// Element of the last datom in the history, i.e. the current journal tail;
    /// `None` for an empty history.
    latest: Option<ElementId>,
}
44
45impl JournalCatalog {
46 pub fn from_history(history: &[Datom]) -> Self {
47 let mut positions = BTreeMap::new();
48 let mut latest = None;
49 for (position, datom) in history.iter().enumerate() {
50 positions.insert(datom.element, position);
51 latest = Some(datom.element);
52 }
53
54 Self { positions, latest }
55 }
56
57 fn ensure_known(&self, element: ElementId) -> Result<usize, SidecarError> {
58 self.positions
59 .get(&element)
60 .copied()
61 .ok_or(SidecarError::UnknownJournalElement(element))
62 }
63
64 fn ensure_current_tail(&self, element: ElementId) -> Result<(), SidecarError> {
65 self.ensure_known(element)?;
66 match self.latest {
67 Some(current_tail) if current_tail == element => Ok(()),
68 Some(current_tail) => Err(SidecarError::NonCurrentJournalElement {
69 element,
70 current_tail,
71 }),
72 None => Err(SidecarError::UnknownJournalElement(element)),
73 }
74 }
75
76 fn position_of(&self, element: ElementId) -> Result<usize, SidecarError> {
77 self.ensure_known(element)
78 }
79
80 fn is_visible_at(
81 &self,
82 element: ElementId,
83 as_of: Option<ElementId>,
84 ) -> Result<bool, SidecarError> {
85 let position = self.position_of(element)?;
86 match as_of {
87 Some(cutoff) => Ok(position <= self.position_of(cutoff)?),
88 None => Ok(true),
89 }
90 }
91}
92
/// Similarity function used to score a query embedding against stored vectors.
///
/// Serialized in snake_case (`"cosine"`, `"dot_product"`). A search only
/// considers records registered with the same metric as the request.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum VectorMetric {
    /// Cosine similarity (the default); scores 0.0 when either vector has zero norm.
    #[default]
    Cosine,
    /// Plain dot product of the two vectors.
    DotProduct,
}
100
/// Reference to an artifact stored outside the journal; only its location and
/// descriptive metadata are kept by the sidecar federation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ArtifactReference {
    /// Sidecar owning the artifact; with `artifact_id` it forms the lookup key.
    pub sidecar_id: String,
    /// Artifact identifier, unique within `sidecar_id`.
    pub artifact_id: String,
    /// Entity associated with the artifact (not interpreted by this module).
    pub entity: EntityId,
    /// Artifact location; surfaced as `source_artifact_uri` and as the
    /// provenance source URI of vector matches that point at this artifact.
    pub uri: String,
    /// Media type as supplied by the registrant; not validated here.
    pub media_type: String,
    /// Size in bytes as supplied by the registrant; not validated here.
    pub byte_length: u64,
    /// Optional content digest, copied into the provenance `SourceRef` of
    /// vector matches that point at this artifact.
    pub digest: Option<String>,
    /// Free-form descriptive metadata.
    pub metadata: BTreeMap<String, Value>,
    /// Provenance supplied by the registrant.
    pub provenance: DatomProvenance,
    /// Optional access policy, checked on lookup and on vector search results
    /// that reference this artifact.
    pub policy: Option<PolicyEnvelope>,
    /// Journal element anchoring the registration; it must be the current
    /// journal tail when registering and may back only one sidecar record.
    pub registered_at: ElementId,
}
115
/// Catalog metadata describing one stored embedding vector.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorRecordMetadata {
    /// Sidecar owning the vector; with `vector_id` it forms the lookup key.
    pub sidecar_id: String,
    /// Vector identifier, unique within `sidecar_id`; also the final tie-breaker
    /// when ranking equal scores.
    pub vector_id: String,
    /// Entity the vector stands for; reported on each search match.
    pub entity: EntityId,
    /// Optional artifact (in the same sidecar) the embedding was derived from;
    /// it must already be registered, earlier in the journal.
    pub source_artifact_id: Option<String>,
    /// Reference to the raw embedding; used as the provenance source URI when
    /// there is no source artifact.
    pub embedding_ref: String,
    /// Declared embedding length; must equal the registered embedding's length.
    pub dimensions: usize,
    /// Metric the vector participates in; searches only match records with the
    /// same metric.
    pub metric: VectorMetric,
    /// Free-form metadata, copied into every search match for this vector.
    pub metadata: BTreeMap<String, Value>,
    /// Provenance supplied by the registrant.
    pub provenance: DatomProvenance,
    /// Optional access policy; merged with the source artifact's policy on matches.
    pub policy: Option<PolicyEnvelope>,
    /// Journal element anchoring the registration; it must be the current
    /// journal tail when registering and may back only one sidecar record.
    pub registered_at: ElementId,
}
130
/// Requests that search matches also be emitted as extensional facts of the
/// form `predicate(query_entity, match_entity, score)`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorFactProjection {
    /// Target predicate; its arity must be exactly 3.
    pub predicate: PredicateRef,
    /// Entity placed in the first argument of every projected fact.
    pub query_entity: EntityId,
}
136
/// Input for [`SidecarFederation::register_artifact_reference`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterArtifactReferenceRequest {
    /// The reference to store.
    pub reference: ArtifactReference,
}
141
/// Output of [`SidecarFederation::register_artifact_reference`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterArtifactReferenceResponse {
    /// The reference exactly as it was stored.
    pub reference: ArtifactReference,
}
146
/// Input for [`SidecarFederation::get_artifact_reference`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct GetArtifactReferenceRequest {
    /// Sidecar that owns the artifact.
    pub sidecar_id: String,
    /// Identifier of the artifact to fetch.
    pub artifact_id: String,
    /// Policy context checked against the artifact's policy; omitted from the
    /// serialized form when `None` and defaulted to `None` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
154
/// Output of [`SidecarFederation::get_artifact_reference`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct GetArtifactReferenceResponse {
    /// The stored reference.
    pub reference: ArtifactReference,
}
159
/// Input for [`SidecarFederation::register_vector_record`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterVectorRecordRequest {
    /// Catalog metadata for the vector.
    pub record: VectorRecordMetadata,
    /// Embedding values; the length must equal `record.dimensions`.
    pub embedding: Vec<f32>,
}
165
/// Output of [`SidecarFederation::register_vector_record`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RegisterVectorRecordResponse {
    /// The metadata exactly as it was stored.
    pub record: VectorRecordMetadata,
}
170
/// Input for [`SidecarFederation::search_vectors`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct SearchVectorsRequest {
    /// Only vectors registered in this sidecar are searched.
    pub sidecar_id: String,
    /// Query vector; only records with the same dimension count are scored.
    pub query_embedding: Vec<f32>,
    /// Maximum number of matches to return; 0 is treated as 1.
    pub top_k: usize,
    /// Scoring metric; only records registered with this metric are scored.
    pub metric: VectorMetric,
    /// Optional journal cut: records registered after this element are hidden.
    pub as_of: Option<ElementId>,
    /// Optional projection of matches into facts.
    pub projection: Option<VectorFactProjection>,
    /// Policy context checked against vector and source-artifact policies;
    /// omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_context: Option<PolicyContext>,
}
182
/// One ranked result of a vector search.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct VectorSearchMatch {
    /// Identifier of the matched vector.
    pub vector_id: String,
    /// Entity the matched vector stands for.
    pub entity: EntityId,
    /// Source artifact id recorded on the vector, if any.
    pub source_artifact_id: Option<String>,
    /// URI of the source artifact, when it resolved.
    pub source_artifact_uri: Option<String>,
    /// Similarity score under the request's metric (higher ranks first).
    pub score: f64,
    /// Provenance linking the match back to its journal elements and sidecar.
    pub provenance: FactProvenance,
    /// Metadata copied from the vector record.
    pub metadata: BTreeMap<String, Value>,
    /// Merge of the vector's and the source artifact's policy envelopes.
    pub policy: Option<PolicyEnvelope>,
}
194
/// Output of [`SidecarFederation::search_vectors`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct SearchVectorsResponse {
    /// Matches in ranked order.
    pub matches: Vec<VectorSearchMatch>,
    /// One projected fact per match, in the same order; empty without a projection.
    pub facts: Vec<ExtensionalFact>,
}
200
/// Non-persistent [`SidecarFederation`] that keeps every record in memory.
#[derive(Clone, Debug, Default)]
pub struct InMemorySidecarFederation {
    /// Artifact references keyed by `(sidecar_id, artifact_id)`.
    artifacts: IndexMap<(String, String), ArtifactReference>,
    /// Vector records keyed by `(sidecar_id, vector_id)`, iterated in
    /// registration order during search.
    vectors: IndexMap<(String, String), StoredVectorRecord>,
    /// Journal elements already claimed by a registration.
    catalog_elements: BTreeSet<ElementId>,
}
207
/// [`SidecarFederation`] persisted in a SQLite database file.
#[derive(Debug)]
pub struct SqliteSidecarFederation {
    /// Open connection to the catalog database.
    connection: Connection,
    /// Filesystem path the database was opened from.
    path: PathBuf,
}
213
/// A vector record's metadata together with its embedding values.
#[derive(Clone, Debug)]
struct StoredVectorRecord {
    /// Catalog metadata as registered.
    metadata: VectorRecordMetadata,
    /// Embedding values (length equals `metadata.dimensions`).
    embedding: Vec<f32>,
}
219
/// Returns the path of the SQLite sidecar catalog that accompanies a journal.
///
/// The catalog sits next to the journal, with `.sidecars.sqlite` appended to the
/// complete journal file name (`data/journal.log` becomes
/// `data/journal.log.sidecars.sqlite`).
///
/// The suffix is appended to the raw OS string rather than to `Path::display()`
/// output. As a result, journal paths that are not valid UTF-8 keep their exact
/// bytes instead of being lossily replaced with U+FFFD.
pub fn sidecar_catalog_path_for_journal(journal_path: impl AsRef<Path>) -> PathBuf {
    let mut catalog_path = journal_path.as_ref().as_os_str().to_os_string();
    catalog_path.push(".sidecars.sqlite");
    PathBuf::from(catalog_path)
}
226
227impl SidecarFederation for InMemorySidecarFederation {
228 fn register_artifact_reference(
229 &mut self,
230 request: RegisterArtifactReferenceRequest,
231 journal: &JournalCatalog,
232 ) -> Result<RegisterArtifactReferenceResponse, SidecarError> {
233 self.ensure_catalog_element_is_available(request.reference.registered_at, journal)?;
234 let key = artifact_key(
235 &request.reference.sidecar_id,
236 &request.reference.artifact_id,
237 );
238 if self.artifacts.contains_key(&key) {
239 return Err(SidecarError::DuplicateArtifactId {
240 sidecar_id: request.reference.sidecar_id.clone(),
241 artifact_id: request.reference.artifact_id.clone(),
242 });
243 }
244
245 self.record_catalog_element(request.reference.registered_at);
246 self.artifacts.insert(key, request.reference.clone());
247 Ok(RegisterArtifactReferenceResponse {
248 reference: request.reference,
249 })
250 }
251
252 fn get_artifact_reference(
253 &self,
254 request: GetArtifactReferenceRequest,
255 ) -> Result<GetArtifactReferenceResponse, SidecarError> {
256 let key = artifact_key(&request.sidecar_id, &request.artifact_id);
257 let reference =
258 self.artifacts
259 .get(&key)
260 .cloned()
261 .ok_or(SidecarError::UnknownArtifactId {
262 sidecar_id: request.sidecar_id,
263 artifact_id: request.artifact_id,
264 })?;
265 ensure_policy_allows_artifact(&reference, request.policy_context.as_ref())?;
266 Ok(GetArtifactReferenceResponse { reference })
267 }
268
269 fn register_vector_record(
270 &mut self,
271 request: RegisterVectorRecordRequest,
272 journal: &JournalCatalog,
273 ) -> Result<RegisterVectorRecordResponse, SidecarError> {
274 self.ensure_catalog_element_is_available(request.record.registered_at, journal)?;
275 if request.record.dimensions != request.embedding.len() {
276 return Err(SidecarError::EmbeddingDimensionMismatch {
277 vector_id: request.record.vector_id.clone(),
278 expected: request.record.dimensions,
279 actual: request.embedding.len(),
280 });
281 }
282 if let Some(artifact_id) = &request.record.source_artifact_id {
283 let artifact_key = artifact_key(&request.record.sidecar_id, artifact_id);
284 let Some(artifact) = self.artifacts.get(&artifact_key) else {
285 return Err(SidecarError::UnknownArtifactId {
286 sidecar_id: request.record.sidecar_id.clone(),
287 artifact_id: artifact_id.clone(),
288 });
289 };
290 ensure_artifact_precedes_vector(artifact, &request.record, journal)?;
291 }
292
293 let key = vector_key(&request.record.sidecar_id, &request.record.vector_id);
294 if self.vectors.contains_key(&key) {
295 return Err(SidecarError::DuplicateVectorId {
296 sidecar_id: request.record.sidecar_id.clone(),
297 vector_id: request.record.vector_id.clone(),
298 });
299 }
300
301 self.record_catalog_element(request.record.registered_at);
302 self.vectors.insert(
303 key,
304 StoredVectorRecord {
305 metadata: request.record.clone(),
306 embedding: request.embedding,
307 },
308 );
309 Ok(RegisterVectorRecordResponse {
310 record: request.record,
311 })
312 }
313
314 fn search_vectors(
315 &self,
316 request: SearchVectorsRequest,
317 journal: &JournalCatalog,
318 ) -> Result<SearchVectorsResponse, SidecarError> {
319 validate_projection(request.projection.as_ref())?;
320 let mut records = Vec::new();
321 for record in self.vectors.values() {
322 if record.metadata.sidecar_id != request.sidecar_id
323 || record.metadata.metric != request.metric
324 || record.metadata.dimensions != request.query_embedding.len()
325 || !journal.is_visible_at(record.metadata.registered_at, request.as_of)?
326 || !policy_allows(
327 request.policy_context.as_ref(),
328 record.metadata.policy.as_ref(),
329 )
330 {
331 continue;
332 }
333
334 let artifact = record
335 .metadata
336 .source_artifact_id
337 .as_ref()
338 .and_then(|artifact_id| {
339 self.artifacts
340 .get(&artifact_key(&record.metadata.sidecar_id, artifact_id))
341 })
342 .cloned();
343 if artifact.as_ref().is_some_and(|artifact| {
344 !policy_allows(request.policy_context.as_ref(), artifact.policy.as_ref())
345 }) {
346 continue;
347 }
348 records.push((record.clone(), artifact));
349 }
350 build_search_response(request, records)
351 }
352}
353
354impl InMemorySidecarFederation {
355 fn ensure_catalog_element_is_available(
356 &self,
357 element: ElementId,
358 journal: &JournalCatalog,
359 ) -> Result<(), SidecarError> {
360 journal.ensure_current_tail(element)?;
361 if self.catalog_elements.contains(&element) {
362 return Err(SidecarError::DuplicateCatalogElement(element));
363 }
364 Ok(())
365 }
366
367 fn record_catalog_element(&mut self, element: ElementId) {
368 self.catalog_elements.insert(element);
369 }
370}
371
372impl SqliteSidecarFederation {
373 pub fn open(path: impl AsRef<Path>) -> Result<Self, SidecarError> {
374 let path = path.as_ref().to_path_buf();
375 if let Some(parent) = path.parent() {
376 fs::create_dir_all(parent)?;
377 }
378
379 let connection = Connection::open(&path)?;
380 initialize_sqlite_schema(&connection)?;
381
382 Ok(Self { connection, path })
383 }
384
385 pub fn path(&self) -> &Path {
386 &self.path
387 }
388
389 fn artifact_exists(&self, sidecar_id: &str, artifact_id: &str) -> Result<bool, SidecarError> {
390 Ok(self
391 .connection
392 .query_row(
393 "SELECT 1 FROM artifact_references WHERE sidecar_id = ?1 AND artifact_id = ?2",
394 params![sidecar_id, artifact_id],
395 |_row| Ok(()),
396 )
397 .optional()?
398 .is_some())
399 }
400
401 fn vector_exists(&self, sidecar_id: &str, vector_id: &str) -> Result<bool, SidecarError> {
402 Ok(self
403 .connection
404 .query_row(
405 "SELECT 1 FROM vector_records WHERE sidecar_id = ?1 AND vector_id = ?2",
406 params![sidecar_id, vector_id],
407 |_row| Ok(()),
408 )
409 .optional()?
410 .is_some())
411 }
412
413 fn catalog_element_exists(&self, element: ElementId) -> Result<bool, SidecarError> {
414 Ok(self
415 .connection
416 .query_row(
417 "SELECT 1 FROM sidecar_catalog WHERE element = ?1",
418 params![element_key(element)],
419 |_row| Ok(()),
420 )
421 .optional()?
422 .is_some())
423 }
424
425 fn lookup_artifact(
426 &self,
427 sidecar_id: &str,
428 artifact_id: &str,
429 ) -> Result<Option<ArtifactReference>, SidecarError> {
430 let json = self
431 .connection
432 .query_row(
433 "SELECT reference_json FROM artifact_references WHERE sidecar_id = ?1 AND artifact_id = ?2",
434 params![sidecar_id, artifact_id],
435 |row| row.get::<_, String>(0),
436 )
437 .optional()?;
438 json.map(|json| serde_json::from_str(&json))
439 .transpose()
440 .map_err(SidecarError::from)
441 }
442
443 fn visible_vector_records(
444 &self,
445 request: &SearchVectorsRequest,
446 journal: &JournalCatalog,
447 ) -> Result<Vec<(StoredVectorRecord, Option<ArtifactReference>)>, SidecarError> {
448 let mut statement = self.connection.prepare(
449 "SELECT metadata_json, embedding_json
450 FROM vector_records
451 WHERE sidecar_id = ?1",
452 )?;
453 let rows = statement.query_map(params![&request.sidecar_id], |row| {
454 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
455 })?;
456
457 let mut records = Vec::new();
458 for row in rows {
459 let (metadata_json, embedding_json) = row?;
460 let metadata: VectorRecordMetadata = serde_json::from_str(&metadata_json)?;
461 if metadata.metric != request.metric
462 || metadata.dimensions != request.query_embedding.len()
463 {
464 continue;
465 }
466 if !journal.is_visible_at(metadata.registered_at, request.as_of)? {
467 continue;
468 }
469 if !policy_allows(request.policy_context.as_ref(), metadata.policy.as_ref()) {
470 continue;
471 }
472 let embedding: Vec<f32> = serde_json::from_str(&embedding_json)?;
473 let artifact = match &metadata.source_artifact_id {
474 Some(artifact_id) => self.lookup_artifact(&metadata.sidecar_id, artifact_id)?,
475 None => None,
476 };
477 if artifact.as_ref().is_some_and(|artifact| {
478 !policy_allows(request.policy_context.as_ref(), artifact.policy.as_ref())
479 }) {
480 continue;
481 }
482 records.push((
483 StoredVectorRecord {
484 metadata,
485 embedding,
486 },
487 artifact,
488 ));
489 }
490 Ok(records)
491 }
492}
493
494impl SidecarFederation for SqliteSidecarFederation {
495 fn register_artifact_reference(
496 &mut self,
497 request: RegisterArtifactReferenceRequest,
498 journal: &JournalCatalog,
499 ) -> Result<RegisterArtifactReferenceResponse, SidecarError> {
500 journal.ensure_current_tail(request.reference.registered_at)?;
501 if self.catalog_element_exists(request.reference.registered_at)? {
502 return Err(SidecarError::DuplicateCatalogElement(
503 request.reference.registered_at,
504 ));
505 }
506 if self.artifact_exists(
507 &request.reference.sidecar_id,
508 &request.reference.artifact_id,
509 )? {
510 return Err(SidecarError::DuplicateArtifactId {
511 sidecar_id: request.reference.sidecar_id.clone(),
512 artifact_id: request.reference.artifact_id.clone(),
513 });
514 }
515
516 let transaction = self.connection.transaction()?;
517 transaction.execute(
518 "INSERT INTO sidecar_catalog (element, kind, sidecar_id, record_id)
519 VALUES (?1, ?2, ?3, ?4)",
520 params![
521 element_key(request.reference.registered_at),
522 "artifact",
523 &request.reference.sidecar_id,
524 &request.reference.artifact_id,
525 ],
526 )?;
527 transaction.execute(
528 "INSERT INTO artifact_references (sidecar_id, artifact_id, catalog_element, reference_json)
529 VALUES (?1, ?2, ?3, ?4)",
530 params![
531 &request.reference.sidecar_id,
532 &request.reference.artifact_id,
533 element_key(request.reference.registered_at),
534 serde_json::to_string(&request.reference)?,
535 ],
536 )?;
537 transaction.commit()?;
538
539 Ok(RegisterArtifactReferenceResponse {
540 reference: request.reference,
541 })
542 }
543
544 fn get_artifact_reference(
545 &self,
546 request: GetArtifactReferenceRequest,
547 ) -> Result<GetArtifactReferenceResponse, SidecarError> {
548 let Some(reference) = self.lookup_artifact(&request.sidecar_id, &request.artifact_id)?
549 else {
550 return Err(SidecarError::UnknownArtifactId {
551 sidecar_id: request.sidecar_id,
552 artifact_id: request.artifact_id,
553 });
554 };
555 ensure_policy_allows_artifact(&reference, request.policy_context.as_ref())?;
556 Ok(GetArtifactReferenceResponse { reference })
557 }
558
559 fn register_vector_record(
560 &mut self,
561 request: RegisterVectorRecordRequest,
562 journal: &JournalCatalog,
563 ) -> Result<RegisterVectorRecordResponse, SidecarError> {
564 journal.ensure_current_tail(request.record.registered_at)?;
565 if self.catalog_element_exists(request.record.registered_at)? {
566 return Err(SidecarError::DuplicateCatalogElement(
567 request.record.registered_at,
568 ));
569 }
570 if request.record.dimensions != request.embedding.len() {
571 return Err(SidecarError::EmbeddingDimensionMismatch {
572 vector_id: request.record.vector_id.clone(),
573 expected: request.record.dimensions,
574 actual: request.embedding.len(),
575 });
576 }
577 if let Some(artifact_id) = &request.record.source_artifact_id {
578 let Some(artifact) = self.lookup_artifact(&request.record.sidecar_id, artifact_id)?
579 else {
580 return Err(SidecarError::UnknownArtifactId {
581 sidecar_id: request.record.sidecar_id.clone(),
582 artifact_id: artifact_id.clone(),
583 });
584 };
585 ensure_artifact_precedes_vector(&artifact, &request.record, journal)?;
586 }
587 if self.vector_exists(&request.record.sidecar_id, &request.record.vector_id)? {
588 return Err(SidecarError::DuplicateVectorId {
589 sidecar_id: request.record.sidecar_id.clone(),
590 vector_id: request.record.vector_id.clone(),
591 });
592 }
593
594 let transaction = self.connection.transaction()?;
595 transaction.execute(
596 "INSERT INTO sidecar_catalog (element, kind, sidecar_id, record_id)
597 VALUES (?1, ?2, ?3, ?4)",
598 params![
599 element_key(request.record.registered_at),
600 "vector",
601 &request.record.sidecar_id,
602 &request.record.vector_id,
603 ],
604 )?;
605 transaction.execute(
606 "INSERT INTO vector_records (sidecar_id, vector_id, catalog_element, metadata_json, embedding_json)
607 VALUES (?1, ?2, ?3, ?4, ?5)",
608 params![
609 &request.record.sidecar_id,
610 &request.record.vector_id,
611 element_key(request.record.registered_at),
612 serde_json::to_string(&request.record)?,
613 serde_json::to_string(&request.embedding)?,
614 ],
615 )?;
616 transaction.commit()?;
617
618 Ok(RegisterVectorRecordResponse {
619 record: request.record,
620 })
621 }
622
623 fn search_vectors(
624 &self,
625 request: SearchVectorsRequest,
626 journal: &JournalCatalog,
627 ) -> Result<SearchVectorsResponse, SidecarError> {
628 validate_projection(request.projection.as_ref())?;
629 let records = self.visible_vector_records(&request, journal)?;
630 build_search_response(request, records)
631 }
632}
633
634fn initialize_sqlite_schema(connection: &Connection) -> Result<(), SidecarError> {
635 connection.execute_batch(
636 "
637 CREATE TABLE IF NOT EXISTS sidecar_catalog (
638 seq INTEGER PRIMARY KEY AUTOINCREMENT,
639 element TEXT NOT NULL UNIQUE,
640 kind TEXT NOT NULL,
641 sidecar_id TEXT NOT NULL,
642 record_id TEXT NOT NULL
643 );
644 CREATE TABLE IF NOT EXISTS artifact_references (
645 sidecar_id TEXT NOT NULL,
646 artifact_id TEXT NOT NULL,
647 catalog_element TEXT NOT NULL UNIQUE,
648 reference_json TEXT NOT NULL,
649 PRIMARY KEY (sidecar_id, artifact_id)
650 );
651 CREATE TABLE IF NOT EXISTS vector_records (
652 sidecar_id TEXT NOT NULL,
653 vector_id TEXT NOT NULL,
654 catalog_element TEXT NOT NULL UNIQUE,
655 metadata_json TEXT NOT NULL,
656 embedding_json TEXT NOT NULL,
657 PRIMARY KEY (sidecar_id, vector_id)
658 );
659 CREATE INDEX IF NOT EXISTS sidecar_catalog_by_element
660 ON sidecar_catalog(element);
661 CREATE INDEX IF NOT EXISTS artifact_references_by_sidecar
662 ON artifact_references(sidecar_id, artifact_id);
663 CREATE INDEX IF NOT EXISTS vector_records_by_sidecar
664 ON vector_records(sidecar_id, vector_id);
665 ",
666 )?;
667 Ok(())
668}
669
/// Owned `(sidecar_id, artifact_id)` key used to index artifact references.
fn artifact_key(sidecar_id: &str, artifact_id: &str) -> (String, String) {
    let sidecar = String::from(sidecar_id);
    let artifact = String::from(artifact_id);
    (sidecar, artifact)
}
673
/// Owned `(sidecar_id, vector_id)` key used to index vector records.
fn vector_key(sidecar_id: &str, vector_id: &str) -> (String, String) {
    let sidecar = String::from(sidecar_id);
    let vector = String::from(vector_id);
    (sidecar, vector)
}
677
678fn element_key(element: ElementId) -> String {
679 element.0.to_string()
680}
681
682fn validate_projection(projection: Option<&VectorFactProjection>) -> Result<(), SidecarError> {
683 if let Some(projection) = projection {
684 if projection.predicate.arity != 3 {
685 return Err(SidecarError::UnsupportedProjectionArity {
686 predicate: projection.predicate.name.clone(),
687 arity: projection.predicate.arity,
688 });
689 }
690 }
691 Ok(())
692}
693
694fn ensure_policy_allows_artifact(
695 reference: &ArtifactReference,
696 policy_context: Option<&PolicyContext>,
697) -> Result<(), SidecarError> {
698 if policy_allows(policy_context, reference.policy.as_ref()) {
699 Ok(())
700 } else {
701 Err(SidecarError::PolicyDenied {
702 subject: format!(
703 "artifact {}:{}",
704 reference.sidecar_id, reference.artifact_id
705 ),
706 })
707 }
708}
709
710fn build_search_response(
711 request: SearchVectorsRequest,
712 records: Vec<(StoredVectorRecord, Option<ArtifactReference>)>,
713) -> Result<SearchVectorsResponse, SidecarError> {
714 let mut matches = records
715 .into_iter()
716 .map(|(record, artifact)| {
717 let provenance = FactProvenance {
718 source_datom_ids: source_datom_ids_for_record(&record.metadata, artifact.as_ref()),
719 imported_cuts: Vec::new(),
720 sidecar_origin: Some(SidecarOrigin {
721 kind: SidecarKind::Vector,
722 sidecar_id: record.metadata.sidecar_id.clone(),
723 record_id: record.metadata.vector_id.clone(),
724 }),
725 source_ref: Some(artifact.as_ref().map_or_else(
726 || SourceRef {
727 uri: record.metadata.embedding_ref.clone(),
728 digest: None,
729 },
730 |artifact| SourceRef {
731 uri: artifact.uri.clone(),
732 digest: artifact.digest.clone(),
733 },
734 )),
735 };
736 VectorSearchMatch {
737 vector_id: record.metadata.vector_id.clone(),
738 entity: record.metadata.entity,
739 source_artifact_id: record.metadata.source_artifact_id.clone(),
740 source_artifact_uri: artifact.as_ref().map(|artifact| artifact.uri.clone()),
741 score: similarity_score(
742 request.metric,
743 &request.query_embedding,
744 &record.embedding,
745 ),
746 provenance,
747 metadata: record.metadata.metadata.clone(),
748 policy: merge_policy_envelopes([
749 record.metadata.policy.as_ref(),
750 artifact
751 .as_ref()
752 .and_then(|artifact| artifact.policy.as_ref()),
753 ]),
754 }
755 })
756 .collect::<Vec<_>>();
757
758 matches.sort_by(|left, right| {
759 right
760 .score
761 .partial_cmp(&left.score)
762 .unwrap_or(Ordering::Equal)
763 .then_with(|| left.vector_id.cmp(&right.vector_id))
764 });
765 matches.truncate(request.top_k.max(1));
766
767 let facts = match &request.projection {
768 Some(projection) => matches
769 .iter()
770 .map(|item| ExtensionalFact {
771 predicate: projection.predicate.clone(),
772 values: vec![
773 Value::Entity(projection.query_entity),
774 Value::Entity(item.entity),
775 Value::F64(item.score),
776 ],
777 policy: item.policy.clone(),
778 provenance: Some(item.provenance.clone()),
779 })
780 .collect(),
781 None => Vec::new(),
782 };
783
784 Ok(SearchVectorsResponse { matches, facts })
785}
786
787fn ensure_artifact_precedes_vector(
788 artifact: &ArtifactReference,
789 record: &VectorRecordMetadata,
790 journal: &JournalCatalog,
791) -> Result<(), SidecarError> {
792 let artifact_position = journal.position_of(artifact.registered_at)?;
793 let vector_position = journal.position_of(record.registered_at)?;
794 if artifact_position > vector_position {
795 return Err(SidecarError::SourceArtifactRegisteredAfterVector {
796 vector_id: record.vector_id.clone(),
797 artifact_id: artifact.artifact_id.clone(),
798 });
799 }
800 Ok(())
801}
802
803fn source_datom_ids_for_record(
804 record: &VectorRecordMetadata,
805 artifact: Option<&ArtifactReference>,
806) -> Vec<ElementId> {
807 let mut source_datom_ids = vec![record.registered_at];
808 if let Some(artifact) = artifact {
809 if artifact.registered_at != record.registered_at {
810 source_datom_ids.push(artifact.registered_at);
811 }
812 }
813 source_datom_ids
814}
815
816fn similarity_score(metric: VectorMetric, query: &[f32], candidate: &[f32]) -> f64 {
817 match metric {
818 VectorMetric::Cosine => cosine_similarity(query, candidate),
819 VectorMetric::DotProduct => dot_product(query, candidate),
820 }
821}
822
823fn cosine_similarity(query: &[f32], candidate: &[f32]) -> f64 {
824 let numerator = dot_product(query, candidate);
825 let query_norm = query
826 .iter()
827 .map(|value| f64::from(*value) * f64::from(*value))
828 .sum::<f64>()
829 .sqrt();
830 let candidate_norm = candidate
831 .iter()
832 .map(|value| f64::from(*value) * f64::from(*value))
833 .sum::<f64>()
834 .sqrt();
835 if query_norm == 0.0 || candidate_norm == 0.0 {
836 0.0
837 } else {
838 numerator / (query_norm * candidate_norm)
839 }
840}
841
/// Dot product of two vectors, computed in `f64`.
///
/// Only the overlapping prefix is used if the lengths differ; callers ensure
/// the dimensions match.
fn dot_product(query: &[f32], candidate: &[f32]) -> f64 {
    let widened_query = query.iter().copied().map(f64::from);
    let widened_candidate = candidate.iter().copied().map(f64::from);
    widened_query
        .zip(widened_candidate)
        .map(|(left, right)| left * right)
        .sum()
}
849
/// Errors produced by sidecar federations.
#[derive(Debug, Error)]
pub enum SidecarError {
    /// The journal element is already claimed by another sidecar registration.
    #[error("sidecar catalog already contains element {0}")]
    DuplicateCatalogElement(ElementId),
    /// The element does not occur in the supplied [`JournalCatalog`].
    #[error("journal does not contain element {0}")]
    UnknownJournalElement(ElementId),
    /// A registration referenced a known element that is not the journal tail.
    #[error(
        "sidecar registration element {element} must match the current journal tail {current_tail}"
    )]
    NonCurrentJournalElement {
        element: ElementId,
        current_tail: ElementId,
    },
    /// The `(sidecar_id, artifact_id)` pair is already registered.
    #[error("sidecar {sidecar_id} already contains artifact {artifact_id}")]
    DuplicateArtifactId {
        sidecar_id: String,
        artifact_id: String,
    },
    /// No artifact is registered under `(sidecar_id, artifact_id)`.
    #[error("sidecar {sidecar_id} does not contain artifact {artifact_id}")]
    UnknownArtifactId {
        sidecar_id: String,
        artifact_id: String,
    },
    /// The `(sidecar_id, vector_id)` pair is already registered.
    #[error("sidecar {sidecar_id} already contains vector {vector_id}")]
    DuplicateVectorId {
        sidecar_id: String,
        vector_id: String,
    },
    /// The embedding length differs from the record's declared `dimensions`.
    #[error("vector {vector_id} declared dimension {expected}, but received {actual}")]
    EmbeddingDimensionMismatch {
        vector_id: String,
        expected: usize,
        actual: usize,
    },
    /// The vector's source artifact sits later in the journal than the vector.
    #[error(
        "vector {vector_id} cannot point at artifact {artifact_id} registered later in the journal"
    )]
    SourceArtifactRegisteredAfterVector {
        vector_id: String,
        artifact_id: String,
    },
    /// A fact projection targeted a predicate whose arity is not 3.
    #[error("vector fact projection for predicate {predicate} requires arity 3, found {arity}")]
    UnsupportedProjectionArity { predicate: String, arity: usize },
    /// The caller's policy context is not admitted by the subject's policy.
    #[error("policy denied access to {subject}")]
    PolicyDenied { subject: String },
    /// Filesystem failure (e.g. creating the catalog's parent directory).
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// SQLite failure in the persistent federation.
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
    /// JSON encoding/decoding failure of stored records.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
902
903#[cfg(test)]
904mod tests {
905 use super::{
906 ArtifactReference, GetArtifactReferenceRequest, InMemorySidecarFederation, JournalCatalog,
907 RegisterArtifactReferenceRequest, RegisterVectorRecordRequest, SearchVectorsRequest,
908 SidecarError, SidecarFederation, VectorFactProjection, VectorMetric, VectorRecordMetadata,
909 };
910 use aether_ast::{
911 AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, PolicyContext,
912 PolicyEnvelope, PredicateId, PredicateRef, ReplicaId, SidecarKind, Value,
913 };
914 use std::collections::BTreeMap;
915
916 fn journal_catalog(elements: &[u64]) -> JournalCatalog {
917 JournalCatalog::from_history(
918 &elements
919 .iter()
920 .map(|element| Datom {
921 entity: EntityId::new(1),
922 attribute: AttributeId::new(1),
923 value: Value::String(format!("anchor-{element}")),
924 op: OperationKind::Annotate,
925 element: ElementId::new(*element),
926 replica: ReplicaId::new(1),
927 causal_context: Default::default(),
928 provenance: DatomProvenance::default(),
929 policy: None,
930 })
931 .collect::<Vec<_>>(),
932 )
933 }
934
    #[test]
    fn artifact_references_remain_external_metadata() {
        // Registering an artifact stores only its reference; fetching it back
        // must return exactly what was registered.
        let mut federation = InMemorySidecarFederation::default();
        // Element 1 is the journal tail, so it can anchor the registration.
        let journal = journal_catalog(&[1]);
        let reference = ArtifactReference {
            sidecar_id: "artifact-store".into(),
            artifact_id: "artifact-1".into(),
            entity: EntityId::new(10),
            uri: "s3://aether/artifacts/artifact-1.pdf".into(),
            media_type: "application/pdf".into(),
            byte_length: 4096,
            digest: Some("sha256:artifact-1".into()),
            metadata: BTreeMap::from([("title".into(), Value::String("Runbook".into()))]),
            provenance: DatomProvenance::default(),
            policy: None,
            registered_at: ElementId::new(1),
        };

        federation
            .register_artifact_reference(
                RegisterArtifactReferenceRequest {
                    reference: reference.clone(),
                },
                &journal,
            )
            .expect("register artifact reference");
        // No policy on the artifact, so a request without a policy context succeeds.
        let fetched = federation
            .get_artifact_reference(GetArtifactReferenceRequest {
                sidecar_id: "artifact-store".into(),
                artifact_id: "artifact-1".into(),
                policy_context: None,
            })
            .expect("fetch artifact reference")
            .reference;

        assert_eq!(fetched, reference);
        assert_eq!(fetched.uri, "s3://aether/artifacts/artifact-1.pdf");
    }
973
    #[test]
    fn vector_search_projects_provenance_bearing_semantic_facts() {
        // A vector derived from an artifact is found by search and projected
        // into a fact whose provenance points at both registrations.
        let mut federation = InMemorySidecarFederation::default();
        // Element 1 is the journal tail when the artifact is registered.
        let artifact_journal = journal_catalog(&[1]);
        federation
            .register_artifact_reference(
                RegisterArtifactReferenceRequest {
                    reference: ArtifactReference {
                        sidecar_id: "vector-store".into(),
                        artifact_id: "doc-1".into(),
                        entity: EntityId::new(21),
                        uri: "s3://aether/docs/doc-1.md".into(),
                        media_type: "text/markdown".into(),
                        byte_length: 512,
                        digest: Some("sha256:doc-1".into()),
                        metadata: BTreeMap::new(),
                        provenance: DatomProvenance::default(),
                        policy: None,
                        registered_at: ElementId::new(1),
                    },
                },
                &artifact_journal,
            )
            .expect("register artifact");
        // The journal then advances to element 2, which anchors the vector.
        let vector_journal = journal_catalog(&[1, 2]);
        federation
            .register_vector_record(
                RegisterVectorRecordRequest {
                    record: VectorRecordMetadata {
                        sidecar_id: "vector-store".into(),
                        vector_id: "vec-1".into(),
                        entity: EntityId::new(21),
                        source_artifact_id: Some("doc-1".into()),
                        embedding_ref: "s3://aether/vectors/vec-1.bin".into(),
                        dimensions: 3,
                        metric: VectorMetric::Cosine,
                        metadata: BTreeMap::from([(
                            "topic".into(),
                            Value::String("coordination".into()),
                        )]),
                        provenance: DatomProvenance::default(),
                        policy: None,
                        registered_at: ElementId::new(2),
                    },
                    embedding: vec![0.8, 0.1, 0.0],
                },
                &vector_journal,
            )
            .expect("register vector");

        // Project matches into a ternary `similar_document` predicate.
        let response = federation
            .search_vectors(
                SearchVectorsRequest {
                    sidecar_id: "vector-store".into(),
                    query_embedding: vec![1.0, 0.0, 0.0],
                    top_k: 1,
                    metric: VectorMetric::Cosine,
                    as_of: Some(ElementId::new(2)),
                    projection: Some(VectorFactProjection {
                        predicate: PredicateRef {
                            id: PredicateId::new(90),
                            name: "similar_document".into(),
                            arity: 3,
                        },
                        query_entity: EntityId::new(1),
                    }),
                    policy_context: None,
                },
                &vector_journal,
            )
            .expect("search vectors");

        assert_eq!(response.matches.len(), 1);
        // The match resolves back to the source artifact's URI.
        assert_eq!(
            response.matches[0].source_artifact_uri.as_deref(),
            Some("s3://aether/docs/doc-1.md")
        );
        assert_eq!(
            response.matches[0]
                .provenance
                .sidecar_origin
                .as_ref()
                .map(|origin| origin.kind),
            Some(SidecarKind::Vector)
        );
        // Provenance lists the vector's element first, then the artifact's.
        assert_eq!(
            response.facts[0]
                .provenance
                .as_ref()
                .expect("fact provenance")
                .source_datom_ids,
            vec![ElementId::new(2), ElementId::new(1)]
        );
    }
1068
#[test]
fn vector_search_respects_catalog_as_of_cut() {
    // Scenario: `vec-1` is registered at element 1 and `vec-2` at element 2. A search cut at
    // element 1 must only see `vec-1`, and a cut at an element the catalog has never seen must
    // be rejected.
    let mut federation = InMemorySidecarFederation::default();
    let first_journal = journal_catalog(&[1]);
    let second_journal = journal_catalog(&[1, 2]);
    for (vector_id, element, embedding) in
        [("vec-1", 1, vec![1.0, 0.0]), ("vec-2", 2, vec![0.0, 1.0])]
    {
        // Registration is validated against the catalog tail, so each record must be
        // registered with a catalog whose latest element is the record's own element.
        let journal = if element == 1 {
            &first_journal
        } else {
            &second_journal
        };
        federation
            .register_vector_record(
                RegisterVectorRecordRequest {
                    record: VectorRecordMetadata {
                        sidecar_id: "vector-store".into(),
                        vector_id: vector_id.into(),
                        entity: EntityId::new(element),
                        source_artifact_id: None,
                        embedding_ref: format!("mem://{}", vector_id),
                        dimensions: 2,
                        metric: VectorMetric::DotProduct,
                        metadata: BTreeMap::new(),
                        provenance: DatomProvenance::default(),
                        policy: None,
                        registered_at: ElementId::new(element),
                    },
                    embedding,
                },
                journal,
            )
            .expect("register vector");
    }

    // The query [0, 1] aligns with `vec-2`'s embedding. Even so, `vec-2` must be excluded
    // because it was registered after the `as_of` cut.
    let current_journal = journal_catalog(&[1, 2]);
    let before_second = federation
        .search_vectors(
            SearchVectorsRequest {
                sidecar_id: "vector-store".into(),
                query_embedding: vec![0.0, 1.0],
                top_k: 4,
                metric: VectorMetric::DotProduct,
                as_of: Some(ElementId::new(1)),
                projection: None,
                policy_context: None,
            },
            &current_journal,
        )
        .expect("search vectors before second insert");
    assert_eq!(
        before_second
            .matches
            .iter()
            .map(|item| item.vector_id.as_str())
            .collect::<Vec<_>>(),
        vec!["vec-1"]
    );

    // Element 9 was never recorded in the catalog, so the cut is rejected.
    let unknown = federation.search_vectors(
        SearchVectorsRequest {
            sidecar_id: "vector-store".into(),
            query_embedding: vec![0.0, 1.0],
            top_k: 4,
            metric: VectorMetric::DotProduct,
            as_of: Some(ElementId::new(9)),
            projection: None,
            policy_context: None,
        },
        &current_journal,
    );
    assert!(matches!(
        unknown,
        Err(SidecarError::UnknownJournalElement(id)) if id == ElementId::new(9)
    ));
}
1146
#[test]
fn sidecar_registration_requires_the_current_journal_tail() {
    // The catalog's tail is element 2, but the artifact claims it was registered at element 1.
    // The federation must refuse the stale registration and report both elements.
    let journal = journal_catalog(&[1, 2]);
    let stale_reference = ArtifactReference {
        sidecar_id: "artifact-store".into(),
        artifact_id: "artifact-1".into(),
        entity: EntityId::new(10),
        uri: "s3://aether/artifacts/artifact-1.pdf".into(),
        media_type: "application/pdf".into(),
        byte_length: 4096,
        digest: Some("sha256:artifact-1".into()),
        metadata: BTreeMap::new(),
        provenance: DatomProvenance::default(),
        policy: None,
        registered_at: ElementId::new(1),
    };

    let mut federation = InMemorySidecarFederation::default();
    let outcome = federation.register_artifact_reference(
        RegisterArtifactReferenceRequest {
            reference: stale_reference,
        },
        &journal,
    );

    let expected_element = ElementId::new(1);
    let expected_tail = ElementId::new(2);
    assert!(matches!(
        outcome,
        Err(SidecarError::NonCurrentJournalElement {
            element,
            current_tail,
        }) if element == expected_element && current_tail == expected_tail
    ));
}
1179
#[test]
fn sidecar_reads_and_searches_respect_policy_context() {
    // Both the artifact and the vector derived from it are guarded by the `memory_reader`
    // capability. Callers without it are denied on direct reads and see nothing when searching.
    let reader_policy = || PolicyEnvelope {
        capabilities: vec!["memory_reader".into()],
        visibilities: Vec::new(),
    };
    let reader_context = || PolicyContext {
        capabilities: vec!["memory_reader".into()],
        visibilities: Vec::new(),
    };

    let mut federation = InMemorySidecarFederation::default();

    let artifact_journal = journal_catalog(&[1]);
    federation
        .register_artifact_reference(
            RegisterArtifactReferenceRequest {
                reference: ArtifactReference {
                    sidecar_id: "vector-store".into(),
                    artifact_id: "doc-1".into(),
                    entity: EntityId::new(21),
                    uri: "s3://aether/docs/doc-1.md".into(),
                    media_type: "text/markdown".into(),
                    byte_length: 512,
                    digest: Some("sha256:doc-1".into()),
                    metadata: BTreeMap::new(),
                    provenance: DatomProvenance::default(),
                    policy: Some(reader_policy()),
                    registered_at: ElementId::new(1),
                },
            },
            &artifact_journal,
        )
        .expect("register artifact");

    let vector_journal = journal_catalog(&[1, 2]);
    federation
        .register_vector_record(
            RegisterVectorRecordRequest {
                record: VectorRecordMetadata {
                    sidecar_id: "vector-store".into(),
                    vector_id: "vec-1".into(),
                    entity: EntityId::new(21),
                    source_artifact_id: Some("doc-1".into()),
                    embedding_ref: "s3://aether/vectors/vec-1.bin".into(),
                    dimensions: 3,
                    metric: VectorMetric::Cosine,
                    metadata: BTreeMap::new(),
                    provenance: DatomProvenance::default(),
                    policy: Some(reader_policy()),
                    registered_at: ElementId::new(2),
                },
                embedding: vec![0.8, 0.1, 0.0],
            },
            &vector_journal,
        )
        .expect("register vector");

    // Direct reads: no policy context is an explicit denial; the reader capability is allowed.
    let fetch = |policy_context: Option<PolicyContext>| {
        federation.get_artifact_reference(GetArtifactReferenceRequest {
            sidecar_id: "vector-store".into(),
            artifact_id: "doc-1".into(),
            policy_context,
        })
    };
    assert!(matches!(
        fetch(None),
        Err(SidecarError::PolicyDenied { .. })
    ));
    let allowed = fetch(Some(reader_context())).expect("fetch allowed artifact");
    assert_eq!(allowed.reference.artifact_id, "doc-1");

    // Searches: records the caller may not see are filtered out, not reported as an error.
    let search = |policy_context: Option<PolicyContext>| {
        federation.search_vectors(
            SearchVectorsRequest {
                sidecar_id: "vector-store".into(),
                query_embedding: vec![1.0, 0.0, 0.0],
                top_k: 1,
                metric: VectorMetric::Cosine,
                as_of: Some(ElementId::new(2)),
                projection: None,
                policy_context,
            },
            &vector_journal,
        )
    };
    let hidden_search = search(None).expect("search without policy");
    assert!(hidden_search.matches.is_empty());

    let visible_search = search(Some(reader_context())).expect("search with policy");
    assert_eq!(visible_search.matches.len(), 1);
}
1287}