1use aether_ast::{Datom, ElementId};
2use rusqlite::{params, Connection, ErrorCode, OptionalExtension};
3use serde::{Deserialize, Serialize};
4use std::{
5 collections::BTreeSet,
6 fs,
7 path::{Path, PathBuf},
8};
9use thiserror::Error;
10
/// Append-only log of [`Datom`]s, each identified by a unique [`ElementId`].
pub trait Journal {
    /// Appends `datoms` atomically. Fails with
    /// [`JournalError::DuplicateElementId`] if any element id is already
    /// journaled or repeated within the batch; on failure nothing is written.
    fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError>;
    /// Returns every journaled datom in append order.
    fn history(&self) -> Result<Vec<Datom>, JournalError>;
    /// Returns the journal prefix up to and including the entry whose element
    /// id equals `at`, in append order. Fails with
    /// [`JournalError::UnknownElementId`] when `at` is not journaled.
    fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError>;
}
16
/// Serializable point-in-time copy of a journal's contents.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct JournalSnapshot {
    // Entries preserved in their original append order.
    pub entries: Vec<Datom>,
}
21
/// Volatile [`Journal`] backed by a plain `Vec`; contents are lost on drop.
/// Useful for tests and as a reference implementation.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct InMemoryJournal {
    // Datoms in append order; element ids are kept unique by `append`.
    entries: Vec<Datom>,
}
26
27impl InMemoryJournal {
28 pub fn new() -> Self {
29 Self::default()
30 }
31}
32
33impl Journal for InMemoryJournal {
34 fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError> {
35 let mut batch_ids = BTreeSet::new();
36 for datom in datoms {
37 if self
38 .entries
39 .iter()
40 .any(|existing| existing.element == datom.element)
41 || !batch_ids.insert(datom.element)
42 {
43 return Err(JournalError::DuplicateElementId(datom.element));
44 }
45 }
46
47 self.entries.extend(datoms.iter().cloned());
48 Ok(())
49 }
50
51 fn history(&self) -> Result<Vec<Datom>, JournalError> {
52 Ok(self.entries.clone())
53 }
54
55 fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError> {
56 let end = self
57 .entries
58 .iter()
59 .position(|datom| datom.element == *at)
60 .ok_or(JournalError::UnknownElementId(*at))?;
61 Ok(self.entries[..=end].to_vec())
62 }
63}
64
/// Durable [`Journal`] persisted in a single SQLite database file.
#[derive(Debug)]
pub struct SqliteJournal {
    // Open handle to the backing database (WAL mode, schema initialized).
    connection: Connection,
    // Filesystem location of the database file, kept for `path()`.
    path: PathBuf,
}
70
71impl SqliteJournal {
72 pub fn open(path: impl AsRef<Path>) -> Result<Self, JournalError> {
73 let path = path.as_ref().to_path_buf();
74 if let Some(parent) = path.parent() {
75 fs::create_dir_all(parent)?;
76 }
77
78 let connection = Connection::open(&path)?;
79 configure_connection(&connection)?;
80 initialize_schema(&connection)?;
81
82 Ok(Self { connection, path })
83 }
84
85 pub fn path(&self) -> &Path {
86 &self.path
87 }
88}
89
impl Journal for SqliteJournal {
    /// Appends `datoms` in a single transaction; nothing is committed if any
    /// datom's element id is a duplicate.
    fn append(&mut self, datoms: &[Datom]) -> Result<(), JournalError> {
        // Catch duplicates inside the batch up front; duplicates against rows
        // already on disk are caught below by the UNIQUE constraint on
        // `element`.
        let mut batch_ids = BTreeSet::new();
        for datom in datoms {
            if !batch_ids.insert(datom.element) {
                return Err(JournalError::DuplicateElementId(datom.element));
            }
        }

        // All inserts share one transaction so a mid-batch failure rolls the
        // whole batch back (the transaction is rolled back on drop if not
        // committed).
        let transaction = self.connection.transaction()?;
        for datom in datoms {
            let element = element_key(&datom.element);
            let encoded = serde_json::to_string(datom)?;
            let inserted = transaction.execute(
                "INSERT INTO journal_entries (element, datom_json) VALUES (?1, ?2)",
                params![element, encoded],
            );
            if let Err(error) = inserted {
                // Translate the UNIQUE-constraint failure into the domain
                // error so callers see which element id collided.
                return Err(map_insert_error(error, datom.element));
            }
        }
        transaction.commit()?;
        Ok(())
    }

    /// Replays every journaled datom ordered by insertion sequence.
    fn history(&self) -> Result<Vec<Datom>, JournalError> {
        read_datoms(
            &self.connection,
            "SELECT datom_json FROM journal_entries ORDER BY seq ASC",
            params![],
        )
    }

    /// Returns the inclusive prefix ending at the row whose element id is
    /// `at`, ordered by insertion sequence.
    fn prefix(&self, at: &ElementId) -> Result<Vec<Datom>, JournalError> {
        // `element` is UNIQUE, so this lookup yields at most one row;
        // `optional()` turns the no-row case into `None`.
        let Some(seq) = self
            .connection
            .query_row(
                "SELECT seq FROM journal_entries WHERE element = ?1",
                params![element_key(at)],
                |row| row.get::<_, i64>(0),
            )
            .optional()?
        else {
            return Err(JournalError::UnknownElementId(*at));
        };

        // `<=` keeps the matching row itself in the prefix.
        read_datoms(
            &self.connection,
            "SELECT datom_json FROM journal_entries WHERE seq <= ?1 ORDER BY seq ASC",
            params![seq],
        )
    }
}
143
144fn configure_connection(connection: &Connection) -> Result<(), JournalError> {
145 connection.pragma_update(None, "journal_mode", "WAL")?;
146 connection.pragma_update(None, "synchronous", "NORMAL")?;
147 Ok(())
148}
149
150fn initialize_schema(connection: &Connection) -> Result<(), JournalError> {
151 connection.execute_batch(
152 "
153 CREATE TABLE IF NOT EXISTS journal_entries (
154 seq INTEGER PRIMARY KEY AUTOINCREMENT,
155 element TEXT NOT NULL UNIQUE,
156 datom_json TEXT NOT NULL
157 );
158 CREATE INDEX IF NOT EXISTS journal_entries_by_seq
159 ON journal_entries(seq);
160 ",
161 )?;
162 Ok(())
163}
164
165fn read_datoms<P>(connection: &Connection, sql: &str, params: P) -> Result<Vec<Datom>, JournalError>
166where
167 P: rusqlite::Params,
168{
169 let mut statement = connection.prepare(sql)?;
170 let rows = statement.query_map(params, |row| row.get::<_, String>(0))?;
171
172 let mut datoms = Vec::new();
173 for row in rows {
174 datoms.push(serde_json::from_str(&row?)?);
175 }
176 Ok(datoms)
177}
178
/// Canonical TEXT value stored in the `element` column for an [`ElementId`]
/// (the decimal rendering of its inner integer).
fn element_key(element: &ElementId) -> String {
    element.0.to_string()
}
182
183fn map_insert_error(error: rusqlite::Error, element: ElementId) -> JournalError {
184 match error {
185 rusqlite::Error::SqliteFailure(details, _)
186 if details.code == ErrorCode::ConstraintViolation =>
187 {
188 JournalError::DuplicateElementId(element)
189 }
190 other => JournalError::Sqlite(other),
191 }
192}
193
/// Errors produced by [`Journal`] implementations.
#[derive(Debug, Error)]
pub enum JournalError {
    /// An appended element id already exists in the journal or repeats
    /// within the same batch.
    #[error("duplicate element id {0}")]
    DuplicateElementId(ElementId),
    /// `prefix` was asked for an element id that is not journaled.
    #[error("unknown element id {0}")]
    UnknownElementId(ElementId),
    /// Filesystem failure (e.g. creating the database's parent directory).
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Underlying SQLite failure.
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
    /// Datom (de)serialization failure.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
207
#[cfg(test)]
mod tests {
    use super::{InMemoryJournal, Journal, JournalError, SqliteJournal};
    use aether_ast::{
        AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, ReplicaId, Value,
    };
    use std::{
        path::{Path, PathBuf},
        sync::atomic::{AtomicU64, Ordering},
        time::{SystemTime, UNIX_EPOCH},
    };

    // Monotonic counter so concurrently running tests never collide on a
    // database path, even when created within the same nanosecond.
    static NEXT_TEST_ID: AtomicU64 = AtomicU64::new(1);

    /// Builds a minimal datom identified by `element` and carrying `value` as
    /// its string payload; every other field is a fixed or default value.
    fn sample_datom(element: u64, value: &str) -> Datom {
        Datom {
            entity: EntityId::new(1),
            attribute: AttributeId::new(2),
            value: Value::String(value.into()),
            op: OperationKind::Assert,
            element: ElementId::new(element),
            replica: ReplicaId::new(1),
            causal_context: Default::default(),
            provenance: DatomProvenance::default(),
            policy: None,
        }
    }

    // History must reflect insertion order, not element-id order.
    #[test]
    fn append_preserves_order_and_history() {
        let mut journal = InMemoryJournal::new();
        journal
            .append(&[sample_datom(1, "a"), sample_datom(2, "b")])
            .expect("append entries");

        let history = journal.history().expect("history");
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].element, ElementId::new(1));
        assert_eq!(history[1].element, ElementId::new(2));
    }

    // A batch containing an internal duplicate must be rejected as a whole:
    // even its non-duplicate entries must not reach the journal.
    #[test]
    fn append_rejects_duplicates_without_partial_writes() {
        let mut journal = InMemoryJournal::new();
        journal
            .append(&[sample_datom(1, "seed")])
            .expect("append seed");

        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(2, "dupe")]);
        assert!(matches!(
            duplicate,
            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(2)
        ));

        let history = journal.history().expect("history");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].element, ElementId::new(1));
    }

    // The prefix must include the requested element itself (inclusive bound).
    #[test]
    fn prefix_returns_inclusive_journal_prefix() {
        let mut journal = InMemoryJournal::new();
        journal
            .append(&[
                sample_datom(1, "a"),
                sample_datom(2, "b"),
                sample_datom(3, "c"),
            ])
            .expect("append entries");

        let prefix = journal.prefix(&ElementId::new(2)).expect("prefix");
        assert_eq!(prefix.len(), 2);
        assert_eq!(prefix[0].element, ElementId::new(1));
        assert_eq!(prefix[1].element, ElementId::new(2));
    }

    // Asking for a prefix of an id that was never appended is an error,
    // not an empty result.
    #[test]
    fn prefix_reports_unknown_elements() {
        let journal = InMemoryJournal::new();
        assert!(matches!(
            journal.prefix(&ElementId::new(9)),
            Err(JournalError::UnknownElementId(id)) if id == ElementId::new(9)
        ));
    }

    // Durability check: entries written through one connection must be
    // readable, in order, after the journal is dropped and reopened.
    #[test]
    fn sqlite_journal_replays_history_after_restart() {
        let temp = TestDbPath::new("history");
        {
            let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
            journal
                .append(&[
                    sample_datom(1, "alpha"),
                    sample_datom(3, "beta"),
                    sample_datom(9, "gamma"),
                ])
                .expect("append sqlite entries");
        }

        let journal = SqliteJournal::open(temp.path()).expect("reopen sqlite journal");
        let history = journal.history().expect("history");
        assert_eq!(
            history
                .iter()
                .map(|datom| datom.element.0)
                .collect::<Vec<_>>(),
            vec![1, 3, 9]
        );
    }

    // Element ids are deliberately out of numeric order (10, 3, 7) to prove
    // that prefixes follow append order, not id order.
    #[test]
    fn sqlite_journal_prefix_is_inclusive_by_append_order() {
        let temp = TestDbPath::new("prefix");
        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
        journal
            .append(&[
                sample_datom(10, "first"),
                sample_datom(3, "second"),
                sample_datom(7, "third"),
            ])
            .expect("append sqlite entries");

        let prefix = journal.prefix(&ElementId::new(3)).expect("prefix");
        assert_eq!(
            prefix
                .iter()
                .map(|datom| datom.element.0)
                .collect::<Vec<_>>(),
            vec![10, 3]
        );
    }

    // Duplicate *within the batch*: the batch-level pre-check should fire
    // and the transaction must leave no partial rows behind.
    #[test]
    fn sqlite_journal_rejects_duplicates_without_partial_writes() {
        let temp = TestDbPath::new("duplicates");
        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
        journal
            .append(&[sample_datom(1, "seed")])
            .expect("append seed");

        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(2, "dupe")]);
        assert!(matches!(
            duplicate,
            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(2)
        ));

        let history = journal.history().expect("history");
        assert_eq!(
            history
                .iter()
                .map(|datom| datom.element.0)
                .collect::<Vec<_>>(),
            vec![1]
        );
    }

    // Duplicate against an *already persisted* row: this path is caught by
    // the UNIQUE constraint at insert time, and the rollback must also
    // discard the valid entry (id 2) appended earlier in the same batch.
    #[test]
    fn sqlite_journal_detects_existing_duplicate_elements() {
        let temp = TestDbPath::new("existing-duplicate");
        let mut journal = SqliteJournal::open(temp.path()).expect("open sqlite journal");
        journal
            .append(&[sample_datom(1, "seed")])
            .expect("append seed");

        let duplicate = journal.append(&[sample_datom(2, "next"), sample_datom(1, "dupe")]);
        assert!(matches!(
            duplicate,
            Err(JournalError::DuplicateElementId(id)) if id == ElementId::new(1)
        ));

        let history = journal.history().expect("history");
        assert_eq!(
            history
                .iter()
                .map(|datom| datom.element.0)
                .collect::<Vec<_>>(),
            vec![1]
        );
    }

    /// RAII guard around a unique temp-file path; removes the database and
    /// its WAL sidecar files when the test finishes.
    struct TestDbPath {
        path: PathBuf,
    }

    impl TestDbPath {
        /// Builds a unique path in the OS temp dir from the test `name`, a
        /// timestamp, and the process-wide counter.
        fn new(name: &str) -> Self {
            let unique = NEXT_TEST_ID.fetch_add(1, Ordering::Relaxed);
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system time")
                .as_nanos();
            let mut path = std::env::temp_dir();
            path.push(format!("aether-storage-{name}-{nanos}-{unique}.sqlite"));
            Self { path }
        }

        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TestDbPath {
        fn drop(&mut self) {
            // Best-effort cleanup; failures are ignored because the file may
            // never have been created (e.g. when open itself failed).
            let _ = std::fs::remove_file(&self.path);

            // WAL mode leaves `-wal` and `-shm` sidecar files next to the
            // database; remove them too.
            let wal = PathBuf::from(format!("{}-wal", self.path.display()));
            let shm = PathBuf::from(format!("{}-shm", self.path.display()));
            let _ = std::fs::remove_file(wal);
            let _ = std::fs::remove_file(shm);
        }
    }
}