1use aether_ast::{AttributeId, Datom, ElementId, EntityId, OperationKind, PolicyEnvelope, Value};
2use aether_schema::{AttributeClass, AttributeSchema, Schema};
3use indexmap::IndexMap;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7pub trait Resolver {
8 fn current(&self, schema: &Schema, datoms: &[Datom]) -> Result<ResolvedState, ResolveError>;
9 fn as_of(
10 &self,
11 schema: &Schema,
12 datoms: &[Datom],
13 at: &ElementId,
14 ) -> Result<ResolvedState, ResolveError>;
15}
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18pub enum ResolvedValue {
19 Scalar(Option<Value>),
20 Set(Vec<Value>),
21 Sequence(Vec<Value>),
22}
23
24impl Default for ResolvedValue {
25 fn default() -> Self {
26 Self::Scalar(None)
27 }
28}
29
30#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
31pub struct ResolvedFact {
32 pub value: Value,
33 pub source_datom_ids: Vec<ElementId>,
34 pub policy: Option<PolicyEnvelope>,
35}
36
37#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
38pub struct EntityState {
39 pub attributes: IndexMap<AttributeId, ResolvedValue>,
40 pub facts: IndexMap<AttributeId, Vec<ResolvedFact>>,
41}
42
43impl EntityState {
44 pub fn attribute(&self, id: &AttributeId) -> Option<&ResolvedValue> {
45 self.attributes.get(id)
46 }
47
48 pub fn facts(&self, id: &AttributeId) -> &[ResolvedFact] {
49 self.facts.get(id).map(Vec::as_slice).unwrap_or(&[])
50 }
51}
52
53#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
54pub struct ResolvedState {
55 pub entities: IndexMap<EntityId, EntityState>,
56 pub as_of: Option<ElementId>,
57}
58
59impl ResolvedState {
60 pub fn entity(&self, id: &EntityId) -> Option<&EntityState> {
61 self.entities.get(id)
62 }
63}
64
65#[derive(Clone, Debug)]
66struct SequenceEntry {
67 element: ElementId,
68 anchor: Option<ElementId>,
69 fact: ResolvedFact,
70 removed: bool,
71}
72
73#[derive(Clone, Debug, Default)]
74struct ResolverWorkingState {
75 resolved: ResolvedState,
76 sequences: IndexMap<(EntityId, AttributeId), Vec<SequenceEntry>>,
77}
78
/// Stateless [`Resolver`] that materializes state by replaying the datom
/// slice it is given on every call.
///
/// The type carries no data, so it is cheap to copy and can be printed for
/// diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct MaterializedResolver;
81
82impl Resolver for MaterializedResolver {
83 fn current(&self, schema: &Schema, datoms: &[Datom]) -> Result<ResolvedState, ResolveError> {
84 let as_of = datoms.last().map(|datom| datom.element);
85 resolve_datoms(schema, datoms, as_of)
86 }
87
88 fn as_of(
89 &self,
90 schema: &Schema,
91 datoms: &[Datom],
92 at: &ElementId,
93 ) -> Result<ResolvedState, ResolveError> {
94 let end = datoms
95 .iter()
96 .position(|datom| datom.element == *at)
97 .ok_or(ResolveError::UnknownElementId(*at))?;
98 resolve_datoms(schema, &datoms[..=end], Some(*at))
99 }
100}
101
102fn resolve_datoms(
103 schema: &Schema,
104 datoms: &[Datom],
105 as_of: Option<ElementId>,
106) -> Result<ResolvedState, ResolveError> {
107 let mut state = ResolverWorkingState {
108 resolved: ResolvedState {
109 entities: IndexMap::new(),
110 as_of,
111 },
112 sequences: IndexMap::new(),
113 };
114
115 for datom in datoms {
116 let attribute = schema
117 .attribute(&datom.attribute)
118 .ok_or(ResolveError::UnknownAttribute(datom.attribute))?;
119 validate_operation(attribute, datom)?;
120 apply_datom(&mut state, attribute, datom)?;
121 }
122
123 Ok(state.resolved)
124}
125
126fn default_value_for(attribute: &AttributeSchema) -> ResolvedValue {
127 match attribute.class {
128 AttributeClass::ScalarLww | AttributeClass::RefScalar => ResolvedValue::Scalar(None),
129 AttributeClass::SetAddWins | AttributeClass::RefSet => ResolvedValue::Set(Vec::new()),
130 AttributeClass::SequenceRga => ResolvedValue::Sequence(Vec::new()),
131 }
132}
133
134fn resolved_fact(datom: &Datom) -> ResolvedFact {
135 ResolvedFact {
136 value: datom.value.clone(),
137 source_datom_ids: vec![datom.element],
138 policy: datom.policy.clone().map(PolicyEnvelope::normalized),
139 }
140}
141
142fn validate_operation(attribute: &AttributeSchema, datom: &Datom) -> Result<(), ResolveError> {
143 let valid = match attribute.class {
144 AttributeClass::ScalarLww | AttributeClass::RefScalar => matches!(
145 datom.op,
146 OperationKind::Assert
147 | OperationKind::Claim
148 | OperationKind::LeaseOpen
149 | OperationKind::LeaseRenew
150 | OperationKind::Annotate
151 | OperationKind::Retract
152 | OperationKind::Release
153 | OperationKind::LeaseExpire
154 ),
155 AttributeClass::SetAddWins | AttributeClass::RefSet => matches!(
156 datom.op,
157 OperationKind::Add
158 | OperationKind::Claim
159 | OperationKind::Annotate
160 | OperationKind::Remove
161 | OperationKind::Release
162 | OperationKind::LeaseExpire
163 | OperationKind::Retract
164 ),
165 AttributeClass::SequenceRga => matches!(
166 datom.op,
167 OperationKind::InsertAfter | OperationKind::Remove | OperationKind::Retract
168 ),
169 };
170
171 if valid {
172 Ok(())
173 } else {
174 Err(ResolveError::InvalidOperationForAttribute {
175 attribute: datom.attribute,
176 class: attribute.class,
177 op: datom.op,
178 })
179 }
180}
181
182fn apply_datom(
183 state: &mut ResolverWorkingState,
184 attribute: &AttributeSchema,
185 datom: &Datom,
186) -> Result<(), ResolveError> {
187 match attribute.class {
188 AttributeClass::ScalarLww | AttributeClass::RefScalar => {
189 let entity_state = state.resolved.entities.entry(datom.entity).or_default();
190 let slot = entity_state
191 .attributes
192 .entry(datom.attribute)
193 .or_insert_with(|| default_value_for(attribute));
194 let ResolvedValue::Scalar(value) = slot else {
195 return Err(ResolveError::AttributeClassMismatch(datom.attribute));
196 };
197
198 match datom.op {
199 OperationKind::Retract | OperationKind::Release | OperationKind::LeaseExpire => {
200 *value = None;
201 entity_state
202 .facts
203 .entry(datom.attribute)
204 .or_default()
205 .clear();
206 }
207 OperationKind::Assert
208 | OperationKind::Claim
209 | OperationKind::LeaseOpen
210 | OperationKind::LeaseRenew
211 | OperationKind::Annotate => {
212 *value = Some(datom.value.clone());
213 let facts = entity_state.facts.entry(datom.attribute).or_default();
214 facts.clear();
215 facts.push(resolved_fact(datom));
216 }
217 _ => unreachable!("operation validity is checked before applying datoms"),
218 }
219 }
220 AttributeClass::SetAddWins | AttributeClass::RefSet => {
221 let entity_state = state.resolved.entities.entry(datom.entity).or_default();
222 let slot = entity_state
223 .attributes
224 .entry(datom.attribute)
225 .or_insert_with(|| default_value_for(attribute));
226 let ResolvedValue::Set(values) = slot else {
227 return Err(ResolveError::AttributeClassMismatch(datom.attribute));
228 };
229 let facts = entity_state.facts.entry(datom.attribute).or_default();
230
231 match datom.op {
232 OperationKind::Remove
233 | OperationKind::Release
234 | OperationKind::LeaseExpire
235 | OperationKind::Retract => {
236 values.retain(|value| value != &datom.value);
237 facts.retain(|fact| fact.value != datom.value);
238 }
239 OperationKind::Add | OperationKind::Claim | OperationKind::Annotate => {
240 if !values.iter().any(|value| value == &datom.value) {
241 values.push(datom.value.clone());
242 }
243 if !facts.iter().any(|fact| fact.value == datom.value) {
244 facts.push(resolved_fact(datom));
245 }
246 }
247 _ => unreachable!("operation validity is checked before applying datoms"),
248 }
249 }
250 AttributeClass::SequenceRga => apply_sequence_datom(state, attribute, datom)?,
251 }
252
253 Ok(())
254}
255
256fn apply_sequence_datom(
257 state: &mut ResolverWorkingState,
258 attribute: &AttributeSchema,
259 datom: &Datom,
260) -> Result<(), ResolveError> {
261 let sequence = state
262 .sequences
263 .entry((datom.entity, datom.attribute))
264 .or_default();
265
266 match datom.op {
267 OperationKind::InsertAfter => {
268 let anchor = match datom.provenance.parent_datom_ids.as_slice() {
269 [] if sequence.is_empty() => None,
270 [] => {
271 return Err(ResolveError::MissingSequenceAnchor {
272 attribute: datom.attribute,
273 element: datom.element,
274 });
275 }
276 [anchor] => Some(*anchor),
277 parents => {
278 return Err(ResolveError::MalformedSequenceAnchor {
279 attribute: datom.attribute,
280 element: datom.element,
281 parent_count: parents.len(),
282 });
283 }
284 };
285
286 if let Some(anchor) = anchor {
287 if !sequence.iter().any(|entry| entry.element == anchor) {
288 return Err(ResolveError::UnknownSequenceAnchor {
289 attribute: datom.attribute,
290 element: datom.element,
291 anchor,
292 });
293 }
294 }
295
296 sequence.push(SequenceEntry {
297 element: datom.element,
298 anchor,
299 fact: resolved_fact(datom),
300 removed: false,
301 });
302 }
303 OperationKind::Remove | OperationKind::Retract => {
304 for entry in sequence
305 .iter_mut()
306 .filter(|entry| !entry.removed && entry.fact.value == datom.value)
307 {
308 entry.removed = true;
309 }
310 }
311 _ => unreachable!("operation validity is checked before applying datoms"),
312 }
313
314 rebuild_sequence_attribute(state, attribute, datom.entity);
315 Ok(())
316}
317
318fn rebuild_sequence_attribute(
319 state: &mut ResolverWorkingState,
320 attribute: &AttributeSchema,
321 entity: EntityId,
322) {
323 let sequence = state
324 .sequences
325 .get(&(entity, attribute.id))
326 .cloned()
327 .unwrap_or_default();
328 let mut children: IndexMap<Option<ElementId>, Vec<SequenceEntry>> = IndexMap::new();
329 for entry in sequence {
330 children.entry(entry.anchor).or_default().push(entry);
331 }
332 for values in children.values_mut() {
333 values.sort_by_key(|entry| entry.element);
334 }
335
336 let mut ordered_facts = Vec::new();
337 collect_sequence_facts(None, &children, &mut ordered_facts);
338 let ordered_values = ordered_facts
339 .iter()
340 .map(|fact| fact.value.clone())
341 .collect::<Vec<_>>();
342
343 let entity_state = state.resolved.entities.entry(entity).or_default();
344 entity_state
345 .attributes
346 .insert(attribute.id, ResolvedValue::Sequence(ordered_values));
347 entity_state.facts.insert(attribute.id, ordered_facts);
348}
349
350fn collect_sequence_facts(
351 anchor: Option<ElementId>,
352 children: &IndexMap<Option<ElementId>, Vec<SequenceEntry>>,
353 ordered_facts: &mut Vec<ResolvedFact>,
354) {
355 let Some(entries) = children.get(&anchor) else {
356 return;
357 };
358
359 for entry in entries {
360 if !entry.removed {
361 ordered_facts.push(entry.fact.clone());
362 }
363 collect_sequence_facts(Some(entry.element), children, ordered_facts);
364 }
365}
366
367#[derive(Debug, Error)]
368pub enum ResolveError {
369 #[error("unknown attribute {0}")]
370 UnknownAttribute(AttributeId),
371 #[error("unknown element id {0}")]
372 UnknownElementId(ElementId),
373 #[error("attribute class mismatch for attribute {0}")]
374 AttributeClassMismatch(AttributeId),
375 #[error("operation {op:?} is invalid for attribute {attribute} with class {class:?}")]
376 InvalidOperationForAttribute {
377 attribute: AttributeId,
378 class: AttributeClass,
379 op: OperationKind,
380 },
381 #[error("sequence insert {element} on attribute {attribute} requires exactly one anchor")]
382 MalformedSequenceAnchor {
383 attribute: AttributeId,
384 element: ElementId,
385 parent_count: usize,
386 },
387 #[error("sequence insert {element} on attribute {attribute} requires an anchor")]
388 MissingSequenceAnchor {
389 attribute: AttributeId,
390 element: ElementId,
391 },
392 #[error(
393 "sequence insert {element} on attribute {attribute} references unknown anchor {anchor}"
394 )]
395 UnknownSequenceAnchor {
396 attribute: AttributeId,
397 element: ElementId,
398 anchor: ElementId,
399 },
400}
401
#[cfg(test)]
mod tests {
    use super::{
        MaterializedResolver, ResolveError, ResolvedFact, ResolvedState, ResolvedValue, Resolver,
    };
    use aether_ast::{
        AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, PolicyEnvelope,
        ReplicaId, Value,
    };
    use aether_schema::{AttributeClass, AttributeSchema, Schema, ValueType};

    const SCALAR_ATTR: AttributeId = AttributeId(1);
    const SET_ATTR: AttributeId = AttributeId(2);
    const SEQUENCE_ATTR: AttributeId = AttributeId(3);

    /// Schema with one scalar, one set and one sequence attribute.
    fn schema() -> Schema {
        let attributes = [
            (SCALAR_ATTR, "task.status", AttributeClass::ScalarLww),
            (SET_ATTR, "task.tags", AttributeClass::SetAddWins),
            (SEQUENCE_ATTR, "task.steps", AttributeClass::SequenceRga),
        ];

        let mut schema = Schema::new("v1");
        for (id, name, class) in attributes {
            let registration = AttributeSchema {
                id,
                name: name.into(),
                class,
                value_type: ValueType::String,
            };
            schema
                .register_attribute(registration)
                .expect("register attribute");
        }
        schema
    }

    /// String value used throughout the tests.
    fn text(value: &str) -> Value {
        Value::String(value.into())
    }

    /// Policy-free fact backed by a single datom.
    fn fact(value: &str, element: u64) -> ResolvedFact {
        ResolvedFact {
            value: text(value),
            source_datom_ids: vec![ElementId::new(element)],
            policy: None,
        }
    }

    /// Resolved value of `attribute` on the single test entity.
    fn attribute_of(state: &ResolvedState, attribute: AttributeId) -> Option<&ResolvedValue> {
        state
            .entity(&EntityId::new(1))
            .and_then(|entity| entity.attribute(&attribute))
    }

    /// Facts of `attribute` on the single test entity.
    fn facts_of(state: &ResolvedState, attribute: AttributeId) -> Option<&[ResolvedFact]> {
        state
            .entity(&EntityId::new(1))
            .map(|entity| entity.facts(&attribute))
    }

    /// Datom on entity 1 with default provenance and no policy.
    fn datom(attribute: AttributeId, element: u64, op: OperationKind, value: &str) -> Datom {
        Datom {
            entity: EntityId::new(1),
            attribute,
            value: text(value),
            op,
            element: ElementId::new(element),
            replica: ReplicaId::new(1),
            causal_context: Default::default(),
            provenance: DatomProvenance::default(),
            policy: None,
        }
    }

    /// Same as [`datom`], with a policy envelope attached.
    fn datom_with_policy(
        attribute: AttributeId,
        element: u64,
        op: OperationKind,
        value: &str,
        policy: PolicyEnvelope,
    ) -> Datom {
        let mut with_policy = datom(attribute, element, op, value);
        with_policy.policy = Some(policy);
        with_policy
    }

    /// Sequence insert whose provenance parents are the given anchors.
    fn sequence_insert_after(element: u64, value: &str, anchors: &[u64]) -> Datom {
        let mut insert = datom(SEQUENCE_ATTR, element, OperationKind::InsertAfter, value);
        insert.provenance.parent_datom_ids =
            anchors.iter().copied().map(ElementId::new).collect();
        insert
    }

    #[test]
    fn scalar_lww_and_retract_behavior_are_deterministic() {
        let schema = schema();
        let datoms = [
            datom(SCALAR_ATTR, 1, OperationKind::Assert, "open"),
            datom(SCALAR_ATTR, 2, OperationKind::Assert, "closed"),
            datom(SCALAR_ATTR, 3, OperationKind::Retract, "closed"),
        ];
        let resolver = MaterializedResolver;

        let as_of = resolver
            .as_of(&schema, &datoms, &ElementId::new(2))
            .expect("resolve as_of");
        let current = resolver.current(&schema, &datoms).expect("resolve current");

        assert_eq!(
            attribute_of(&as_of, SCALAR_ATTR),
            Some(&ResolvedValue::Scalar(Some(text("closed"))))
        );
        assert_eq!(
            attribute_of(&current, SCALAR_ATTR),
            Some(&ResolvedValue::Scalar(None))
        );

        let expected_facts = [fact("closed", 2)];
        assert_eq!(
            facts_of(&as_of, SCALAR_ATTR),
            Some(expected_facts.as_slice())
        );
        assert!(facts_of(&current, SCALAR_ATTR).is_some_and(|facts| facts.is_empty()));
    }

    #[test]
    fn set_add_and_remove_behavior_is_preserved() {
        let schema = schema();
        let datoms = [
            datom(SET_ATTR, 1, OperationKind::Add, "alpha"),
            datom(SET_ATTR, 2, OperationKind::Add, "beta"),
            datom(SET_ATTR, 3, OperationKind::Remove, "alpha"),
        ];

        let current = MaterializedResolver
            .current(&schema, &datoms)
            .expect("resolve current");

        assert_eq!(
            attribute_of(&current, SET_ATTR),
            Some(&ResolvedValue::Set(vec![text("beta")]))
        );
        let expected_facts = [fact("beta", 2)];
        assert_eq!(
            facts_of(&current, SET_ATTR),
            Some(expected_facts.as_slice())
        );
    }

    #[test]
    fn sequence_replay_is_stable() {
        let schema = schema();
        let datoms = [
            sequence_insert_after(1, "a", &[]),
            sequence_insert_after(2, "b", &[1]),
            datom(SEQUENCE_ATTR, 3, OperationKind::Remove, "a"),
            sequence_insert_after(4, "c", &[1]),
        ];

        let current = MaterializedResolver
            .current(&schema, &datoms)
            .expect("resolve current");

        assert_eq!(
            attribute_of(&current, SEQUENCE_ATTR),
            Some(&ResolvedValue::Sequence(vec![text("b"), text("c")]))
        );
        let expected_facts = [fact("b", 2), fact("c", 4)];
        assert_eq!(
            facts_of(&current, SEQUENCE_ATTR),
            Some(expected_facts.as_slice())
        );
    }

    #[test]
    fn current_equals_as_of_last_element() {
        let schema = schema();
        let datoms = [
            datom(SCALAR_ATTR, 1, OperationKind::Assert, "queued"),
            datom(SCALAR_ATTR, 2, OperationKind::Assert, "running"),
        ];
        let resolver = MaterializedResolver;

        let as_of = resolver
            .as_of(&schema, &datoms, &ElementId::new(2))
            .expect("resolve as_of");
        let current = resolver.current(&schema, &datoms).expect("resolve current");

        assert_eq!(current, as_of);
    }

    #[test]
    fn scalar_resolved_fact_preserves_policy() {
        let schema = schema();
        let envelope = PolicyEnvelope {
            capabilities: vec!["executor".into()],
            visibilities: vec!["ops".into()],
        };
        let datoms = [datom_with_policy(
            SCALAR_ATTR,
            1,
            OperationKind::Assert,
            "ready",
            envelope.clone(),
        )];

        let current = MaterializedResolver
            .current(&schema, &datoms)
            .expect("resolve current with policy");
        let facts = current
            .entity(&EntityId::new(1))
            .expect("entity state")
            .facts(&SCALAR_ATTR);

        assert_eq!(facts.len(), 1);
        assert_eq!(facts[0].policy, Some(envelope));
    }

    #[test]
    fn invalid_operation_for_attribute_is_rejected() {
        let schema = schema();
        let rejected = [datom(SCALAR_ATTR, 1, OperationKind::Add, "invalid")];

        let error = MaterializedResolver
            .current(&schema, &rejected)
            .expect_err("invalid scalar add should fail");

        assert!(matches!(
            error,
            ResolveError::InvalidOperationForAttribute {
                attribute,
                class: AttributeClass::ScalarLww,
                op: OperationKind::Add,
            } if attribute == SCALAR_ATTR
        ));
    }

    #[test]
    fn every_v1_operation_has_a_valid_home_class() {
        let schema = schema();

        // Each case is resolved on its own, as a one-datom log.
        let passing_cases = [
            datom(SCALAR_ATTR, 1, OperationKind::Assert, "scalar-assert"),
            datom(SCALAR_ATTR, 2, OperationKind::Retract, "scalar-assert"),
            datom(SET_ATTR, 3, OperationKind::Add, "set-add"),
            datom(SET_ATTR, 4, OperationKind::Remove, "set-add"),
            datom(SCALAR_ATTR, 5, OperationKind::Claim, "worker-a"),
            datom(SCALAR_ATTR, 6, OperationKind::Release, "worker-a"),
            datom(SCALAR_ATTR, 7, OperationKind::LeaseOpen, "active"),
            datom(SCALAR_ATTR, 8, OperationKind::LeaseRenew, "active"),
            datom(SCALAR_ATTR, 9, OperationKind::LeaseExpire, "active"),
            datom(SCALAR_ATTR, 10, OperationKind::Annotate, "annotated"),
        ];

        for case in passing_cases {
            MaterializedResolver
                .current(&schema, &[case])
                .expect("documented valid operation should resolve");
        }

        let bootstrap = [sequence_insert_after(11, "sequence-bootstrap", &[])];
        MaterializedResolver
            .current(&schema, &bootstrap)
            .expect("insert-after bootstrap should resolve");
    }

    #[test]
    fn documented_invalid_operation_matrix_examples_are_rejected() {
        let schema = schema();

        let invalid_cases = [
            (SCALAR_ATTR, OperationKind::Add, "scalar-invalid-add"),
            (SCALAR_ATTR, OperationKind::Remove, "scalar-invalid-remove"),
            (SCALAR_ATTR, OperationKind::InsertAfter, "scalar-invalid-insert"),
            (SET_ATTR, OperationKind::Assert, "set-invalid-assert"),
            (SET_ATTR, OperationKind::LeaseOpen, "set-invalid-lease-open"),
            (SET_ATTR, OperationKind::LeaseRenew, "set-invalid-lease-renew"),
            (SET_ATTR, OperationKind::InsertAfter, "set-invalid-insert"),
            (SEQUENCE_ATTR, OperationKind::Assert, "sequence-invalid-assert"),
            (SEQUENCE_ATTR, OperationKind::Add, "sequence-invalid-add"),
            (SEQUENCE_ATTR, OperationKind::Claim, "sequence-invalid-claim"),
            (SEQUENCE_ATTR, OperationKind::LeaseOpen, "sequence-invalid-lease-open"),
            (SEQUENCE_ATTR, OperationKind::LeaseRenew, "sequence-invalid-lease-renew"),
            (SEQUENCE_ATTR, OperationKind::LeaseExpire, "sequence-invalid-lease-expire"),
            (SEQUENCE_ATTR, OperationKind::Annotate, "sequence-invalid-annotate"),
            (SEQUENCE_ATTR, OperationKind::Release, "sequence-invalid-release"),
        ];

        for (attribute, op, value) in invalid_cases {
            let rejected = [datom(attribute, 1, op, value)];
            let error = MaterializedResolver
                .current(&schema, &rejected)
                .expect_err("documented invalid operation should fail");

            assert!(matches!(
                error,
                ResolveError::InvalidOperationForAttribute { attribute: actual, op: actual_op, .. }
                    if actual == attribute && actual_op == op
            ));
        }
    }

    #[test]
    fn claim_release_and_lease_cycles_replay_deterministically() {
        let schema = schema();
        let datoms = [
            datom(SCALAR_ATTR, 1, OperationKind::Claim, "worker-a"),
            datom(SCALAR_ATTR, 2, OperationKind::Release, "worker-a"),
            datom(SCALAR_ATTR, 3, OperationKind::LeaseOpen, "active"),
            datom(SCALAR_ATTR, 4, OperationKind::LeaseRenew, "active"),
            datom(SCALAR_ATTR, 5, OperationKind::LeaseExpire, "active"),
        ];
        let resolver = MaterializedResolver;

        let as_of_claim = resolver
            .as_of(&schema, &datoms, &ElementId::new(1))
            .expect("resolve claim cut");
        let as_of_lease = resolver
            .as_of(&schema, &datoms, &ElementId::new(4))
            .expect("resolve lease-renew cut");
        let current = resolver.current(&schema, &datoms).expect("resolve current");

        assert_eq!(
            attribute_of(&as_of_claim, SCALAR_ATTR),
            Some(&ResolvedValue::Scalar(Some(text("worker-a"))))
        );
        assert_eq!(
            attribute_of(&as_of_lease, SCALAR_ATTR),
            Some(&ResolvedValue::Scalar(Some(text("active"))))
        );
        assert_eq!(
            attribute_of(&current, SCALAR_ATTR),
            Some(&ResolvedValue::Scalar(None))
        );
    }

    #[test]
    fn sequence_insert_after_orders_children_by_anchor_then_element() {
        let schema = schema();
        let datoms = [
            sequence_insert_after(1, "a", &[]),
            sequence_insert_after(2, "c", &[1]),
            sequence_insert_after(3, "b", &[1]),
        ];

        let current = MaterializedResolver
            .current(&schema, &datoms)
            .expect("resolve anchored sequence");

        assert_eq!(
            attribute_of(&current, SEQUENCE_ATTR),
            Some(&ResolvedValue::Sequence(vec![
                text("a"),
                text("c"),
                text("b"),
            ]))
        );
    }

    #[test]
    fn non_bootstrap_sequence_insert_requires_exactly_one_known_anchor() {
        let schema = schema();
        // Resolves a two-datom log: the bootstrap insert followed by `second`.
        let resolve_after_bootstrap = |second: Datom| {
            MaterializedResolver.current(&schema, &[sequence_insert_after(1, "a", &[]), second])
        };

        let missing_anchor = resolve_after_bootstrap(sequence_insert_after(2, "b", &[]))
            .expect_err("non-bootstrap insert without anchor should fail");
        assert!(matches!(
            missing_anchor,
            ResolveError::MissingSequenceAnchor {
                attribute,
                element,
            } if attribute == SEQUENCE_ATTR && element == ElementId::new(2)
        ));

        let malformed = resolve_after_bootstrap(sequence_insert_after(2, "b", &[1, 9]))
            .expect_err("multi-anchor insert should fail");
        assert!(matches!(
            malformed,
            ResolveError::MalformedSequenceAnchor {
                attribute,
                element,
                parent_count,
            } if attribute == SEQUENCE_ATTR
                && element == ElementId::new(2)
                && parent_count == 2
        ));

        let unknown = resolve_after_bootstrap(sequence_insert_after(2, "b", &[9]))
            .expect_err("unknown anchor should fail");
        assert!(matches!(
            unknown,
            ResolveError::UnknownSequenceAnchor {
                attribute,
                element,
                anchor,
            } if attribute == SEQUENCE_ATTR
                && element == ElementId::new(2)
                && anchor == ElementId::new(9)
        ));
    }
}