aether_storage/
lib.rs

1use aether_ast::{Datom, ElementId};
2use rusqlite::{params, Connection, ErrorCode, OptionalExtension};
3use serde::{Deserialize, Serialize};
4use std::{
5    collections::BTreeSet,
6    fs,
7    path::{Path, PathBuf},
8};
9use thiserror::Error;
10
11pub trait Journal {
12    fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError>;
13    fn history(&self) -> Result<Vec<Datom>, JournalError>;
14    fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError>;
15}
16
17#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
18pub struct JournalSnapshot {
19    pub entries: Vec<Datom>,
20}
21
22#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
23pub struct InMemoryJournal {
24    entries: Vec<Datom>,
25}
26
27impl InMemoryJournal {
28    pub fn new() -> Self {
29        Self::default()
30    }
31}
32
33impl Journal for InMemoryJournal {
34    fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError> {
35        let mut batch_ids = BTreeSet::new();
36        for datom in datoms {
37            if self
38                .entries
39                .iter()
40                .any(|existing| existing.element == datom.element)
41                || !batch_ids.insert(datom.element)
42            {
43                return Err(JournalError::DuplicateElementId(datom.element));
44            }
45        }
46
47        self.entries.extend(datoms.iter().cloned());
48        Ok(())
49    }
50
51    fn history(&self) -> Result<Vec<Datom>, JournalError> {
52        Ok(self.entries.clone())
53    }
54
55    fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError> {
56        let end = self
57            .entries
58            .iter()
59            .position(|datom| datom.element == *at)
60            .ok_or(JournalError::UnknownElementId(*at))?;
61        Ok(self.entries[..=end].to_vec())
62    }
63}
64
65#[derive(Debug)]
66pub struct SqliteJournal {
67    connection: Connection,
68    path: PathBuf,
69}
70
71impl SqliteJournal {
72    pub fn open(path: impl AsRef<Path>) -> Result<Self, JournalError> {
73        let path = path.as_ref().to_path_buf();
74        if let Some(parent) = path.parent() {
75            fs::create_dir_all(parent)?;
76        }
77
78        let connection = Connection::open(&path)?;
79        configure_connection(&connection)?;
80        initialize_schema(&connection)?;
81
82        Ok(Self { connection, path })
83    }
84
85    pub fn path(&self) -> &Path {
86        &self.path
87    }
88}
89
90impl Journal for SqliteJournal {
91    fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError> {
92        let mut batch_ids = BTreeSet::new();
93        for datom in datoms {
94            if !batch_ids.insert(datom.element) {
95                return Err(JournalError::DuplicateElementId(datom.element));
96            }
97        }
98
99        let transaction = self.connection.transaction()?;
100        for datom in datoms {
101            let element = element_key(&datom.element);
102            let encoded = serde_json::to_string(datom)?;
103            let inserted = transaction.execute(
104                "INSERT INTO journal_entries (element, datom_json) VALUES (?1, ?2)",
105                params![element, encoded],
106            );
107            if let Err(error) = inserted {
108                return Err(map_insert_error(error, datom.element));
109            }
110        }
111        transaction.commit()?;
112        Ok(())
113    }
114
115    fn history(&self) -> Result<Vec<Datom>, JournalError> {
116        read_datoms(
117            &self.connection,
118            "SELECT datom_json FROM journal_entries ORDER BY seq ASC",
119            params![],
120        )
121    }
122
123    fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError> {
124        let Some(seq) = self
125            .connection
126            .query_row(
127                "SELECT seq FROM journal_entries WHERE element = ?1",
128                params![element_key(at)],
129                |row| row.get::<_, i64>(0),
130            )
131            .optional()?
132        else {
133            return Err(JournalError::UnknownElementId(*at));
134        };
135
136        read_datoms(
137            &self.connection,
138            "SELECT datom_json FROM journal_entries WHERE seq <= ?1 ORDER BY seq ASC",
139            params![seq],
140        )
141    }
142}
143
144fn configure_connection(connection: &Connection) -> Result<(), JournalError> {
145    connection.pragma_update(None, "journal_mode", "WAL")?;
146    connection.pragma_update(None, "synchronous", "NORMAL")?;
147    Ok(())
148}
149
150fn initialize_schema(connection: &Connection) -> Result<(), JournalError> {
151    connection.execute_batch(
152        "
153        CREATE TABLE IF NOT EXISTS journal_entries (
154            seq INTEGER PRIMARY KEY AUTOINCREMENT,
155            element TEXT NOT NULL UNIQUE,
156            datom_json TEXT NOT NULL
157        );
158        CREATE INDEX IF NOT EXISTS journal_entries_by_seq
159            ON journal_entries(seq);
160        ",
161    )?;
162    Ok(())
163}
164
165fn read_datoms<P>(connection: &Connection, sql: &str, params: P) -> Result<Vec<Datom>, JournalError>
166where
167    P: rusqlite::Params,
168{
169    let mut statement = connection.prepare(sql)?;
170    let rows = statement.query_map(params, |row| row.get::<_, String>(0))?;
171
172    let mut datoms = Vec::new();
173    for row in rows {
174        datoms.push(serde_json::from_str(&row?)?);
175    }
176    Ok(datoms)
177}
178
179fn element_key(element: &ElementId) -> String {
180    element.0.to_string()
181}
182
183fn map_insert_error(error: rusqlite::Error, element: ElementId) -> JournalError {
184    match error {
185        rusqlite::Error::SqliteFailure(details, _)
186            if details.code == ErrorCode::ConstraintViolation =>
187        {
188            JournalError::DuplicateElementId(element)
189        }
190        other => JournalError::Sqlite(other),
191    }
192}
193
194#[derive(Debug, Error)]
195pub enum JournalError {
196    #[error("duplicate element id {0}")]
197    DuplicateElementId(ElementId),
198    #[error("unknown element id {0}")]
199    UnknownElementId(ElementId),
200    #[error(transparent)]
201    Io(#[from] std::io::Error),
202    #[error(transparent)]
203    Sqlite(#[from] rusqlite::Error),
204    #[error(transparent)]
205    Serde(#[from] serde_json::Error),
206}
207
208#[cfg(test)]
209mod tests {
210    use super::{InMemoryJournal, Journal, JournalError, SqliteJournal};
211    use aether_ast::{
212        AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, ReplicaId, Value,
213    };
214    use std::{
215        path::{Path, PathBuf},
216        sync::atomic::{AtomicU64, Ordering},
217        time::{SystemTime, UNIX_EPOCH},
218    };
219
220    static NEXT_TEST_ID: AtomicU64 = AtomicU64::new(1);
221
222    fn sample_datom(element: u64, value: &str) -> Datom {
223        Datom {
224            entity: EntityId::new(1),
225            attribute: AttributeId::new(2),
226            value: Value::String(value.into()),
227            op: OperationKind::Assert,
228            element: ElementId::new(element),
229            replica: ReplicaId::new(1),
230            causal_context: Default::default(),
231            provenance: DatomProvenance::default(),
232            policy: None,
233        }
234    }
235
236    #[test]
237    fn append_preserves_order_and_history() {
238        let mut journal = InMemoryJournal::new();
239        journal
240            .append(&[sample_datom(1, "a"), sample_datom(2, "b")])
241            .expect("append entries");
242
243        let history = journal.history().expect("history");
244        assert_eq!(history.len(), 2);
245        assert_eq!(history[0].element, ElementId::new(1));
246        assert_eq!(history[1].element, ElementId::new(2));
247    }
248
249    #[test]
250    fn append_rejects_duplicates_without_partial_writes() {
251        let mut journal = InMemoryJournal::new();
252        journal
253            .append(&[sample_datom(1, "seed")])
254            .expect("append seed");
255
256        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(2, "dupe")]);
257        assert!(matches!(
258            duplicate,
259            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(2)
260        ));
261
262        let history = journal.history().expect("history");
263        assert_eq!(history.len(), 1);
264        assert_eq!(history[0].element, ElementId::new(1));
265    }
266
267    #[test]
268    fn prefix_returns_inclusive_journal_prefix() {
269        let mut journal = InMemoryJournal::new();
270        journal
271            .append(&[
272                sample_datom(1, "a"),
273                sample_datom(2, "b"),
274                sample_datom(3, "c"),
275            ])
276            .expect("append entries");
277
278        let prefix = journal.prefix(&ElementId::new(2)).expect("prefix");
279        assert_eq!(prefix.len(), 2);
280        assert_eq!(prefix[0].element, ElementId::new(1));
281        assert_eq!(prefix[1].element, ElementId::new(2));
282    }
283
284    #[test]
285    fn prefix_reports_unknown_elements() {
286        let journal = InMemoryJournal::new();
287        assert!(matches!(
288            journal.prefix(&ElementId::new(9)),
289            Err(JournalError::UnknownElementId(id)) if id == ElementId::new(9)
290        ));
291    }
292
293    #[test]
294    fn sqlite_journal_replays_history_after_restart() {
295        let temp = TestDbPath::new("history");
296        {
297            let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
298            journal
299                .append(&[
300                    sample_datom(1, "alpha"),
301                    sample_datom(3, "beta"),
302                    sample_datom(9, "gamma"),
303                ])
304                .expect("append sqlite entries");
305        }
306
307        let journal = SqliteJournal::open(temp.path()).expect("reopen sqlite journal");
308        let history = journal.history().expect("history");
309        assert_eq!(
310            history
311                .iter()
312                .map(|datom| datom.element.0)
313                .collect::<Vec<_>>(),
314            vec![1, 3, 9]
315        );
316    }
317
318    #[test]
319    fn sqlite_journal_prefix_is_inclusive_by_append_order() {
320        let temp = TestDbPath::new("prefix");
321        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
322        journal
323            .append(&[
324                sample_datom(10, "first"),
325                sample_datom(3, "second"),
326                sample_datom(7, "third"),
327            ])
328            .expect("append sqlite entries");
329
330        let prefix = journal.prefix(&ElementId::new(3)).expect("prefix");
331        assert_eq!(
332            prefix
333                .iter()
334                .map(|datom| datom.element.0)
335                .collect::<Vec<_>>(),
336            vec![10, 3]
337        );
338    }
339
340    #[test]
341    fn sqlite_journal_rejects_duplicates_without_partial_writes() {
342        let temp = TestDbPath::new("duplicates");
343        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
344        journal
345            .append(&[sample_datom(1, "seed")])
346            .expect("append seed");
347
348        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(2, "dupe")]);
349        assert!(matches!(
350            duplicate,
351            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(2)
352        ));
353
354        let history = journal.history().expect("history");
355        assert_eq!(
356            history
357                .iter()
358                .map(|datom| datom.element.0)
359                .collect::<Vec<_>>(),
360            vec![1]
361        );
362    }
363
364    #[test]
365    fn sqlite_journal_detects_existing_duplicate_elements() {
366        let temp = TestDbPath::new("existing-duplicate");
367        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
368        journal
369            .append(&[sample_datom(1, "seed")])
370            .expect("append seed");
371
372        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(1, "dupe")]);
373        assert!(matches!(
374            duplicate,
375            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(1)
376        ));
377
378        let history = journal.history().expect("history");
379        assert_eq!(
380            history
381                .iter()
382                .map(|datom| datom.element.0)
383                .collect::<Vec<_>>(),
384            vec![1]
385        );
386    }
387
388    struct TestDbPath {
389        path: PathBuf,
390    }
391
392    impl TestDbPath {
393        fn new(name: &str) -> Self {
394            let unique = NEXT_TEST_ID.fetch_add(1, Ordering::Relaxed);
395            let nanos = SystemTime::now()
396                .duration_since(UNIX_EPOCH)
397                .expect("system time")
398                .as_nanos();
399            let mut path = std::env::temp_dir();
400            path.push(format!("aether-storage-{name}-{nanos}-{unique}.sqlite"));
401            Self { path }
402        }
403
404        fn path(&self) -> &Path {
405            &self.path
406        }
407    }
408
409    impl Drop for TestDbPath {
410        fn drop(&mut self) {
411            let _ = std::fs::remove_file(&self.path);
412
413            let wal = PathBuf::from(format!("{}-wal", self.path.display()));
414            let shm = PathBuf::from(format!("{}-shm", self.path.display()));
415            let _ = std::fs::remove_file(wal);
416            let _ = std::fs::remove_file(shm);
417        }
418    }
419}