aether_resolver/
lib.rs

1use aether_ast::{AttributeId, Datom, ElementId, EntityId, OperationKind, PolicyEnvelope, Value};
2use aether_schema::{AttributeClass, AttributeSchema, Schema};
3use indexmap::IndexMap;
4use serde::{Deserialize, Serialize};
5use thiserror::Error;
6
7pub trait Resolver {
8    fn current(&self, schema: &Schema, datoms: &[Datom]) -> Result<ResolvedState, ResolveError>;
9    fn as_of(
10        &self,
11        schema: &Schema,
12        datoms: &[Datom],
13        at: &ElementId,
14    ) -> Result<ResolvedState, ResolveError>;
15}
16
17#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
18pub enum ResolvedValue {
19    Scalar(Option<Value>),
20    Set(Vec<Value>),
21    Sequence(Vec<Value>),
22}
23
24impl Default for ResolvedValue {
25    fn default() -> Self {
26        Self::Scalar(None)
27    }
28}
29
30#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
31pub struct ResolvedFact {
32    pub value: Value,
33    pub source_datom_ids: Vec<ElementId>,
34    pub policy: Option<PolicyEnvelope>,
35}
36
37#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
38pub struct EntityState {
39    pub attributes: IndexMap<AttributeId, ResolvedValue>,
40    pub facts: IndexMap<AttributeId, Vec<ResolvedFact>>,
41}
42
43impl EntityState {
44    pub fn attribute(&self, id: &AttributeId) -> Option<&ResolvedValue> {
45        self.attributes.get(id)
46    }
47
48    pub fn facts(&self, id: &AttributeId) -> &[ResolvedFact] {
49        self.facts.get(id).map(Vec::as_slice).unwrap_or(&[])
50    }
51}
52
53#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
54pub struct ResolvedState {
55    pub entities: IndexMap<EntityId, EntityState>,
56    pub as_of: Option<ElementId>,
57}
58
59impl ResolvedState {
60    pub fn entity(&self, id: &EntityId) -> Option<&EntityState> {
61        self.entities.get(id)
62    }
63}
64
65#[derive(Clone, Debug)]
66struct SequenceEntry {
67    element: ElementId,
68    anchor: Option<ElementId>,
69    fact: ResolvedFact,
70    removed: bool,
71}
72
73#[derive(Clone, Debug, Default)]
74struct ResolverWorkingState {
75    resolved: ResolvedState,
76    sequences: IndexMap<(EntityId, AttributeId), Vec<SequenceEntry>>,
77}
78
79#[derive(Default)]
80pub struct MaterializedResolver;
81
82impl Resolver for MaterializedResolver {
83    fn current(&self, schema: &Schema, datoms: &[Datom]) -> Result<ResolvedState, ResolveError> {
84        let as_of = datoms.last().map(|datom| datom.element);
85        resolve_datoms(schema, datoms, as_of)
86    }
87
88    fn as_of(
89        &self,
90        schema: &Schema,
91        datoms: &[Datom],
92        at: &ElementId,
93    ) -> Result<ResolvedState, ResolveError> {
94        let end = datoms
95            .iter()
96            .position(|datom| datom.element == *at)
97            .ok_or(ResolveError::UnknownElementId(*at))?;
98        resolve_datoms(schema, &datoms[..=end], Some(*at))
99    }
100}
101
102fn resolve_datoms(
103    schema: &Schema,
104    datoms: &[Datom],
105    as_of: Option<ElementId>,
106) -> Result<ResolvedState, ResolveError> {
107    let mut state = ResolverWorkingState {
108        resolved: ResolvedState {
109            entities: IndexMap::new(),
110            as_of,
111        },
112        sequences: IndexMap::new(),
113    };
114
115    for datom in datoms {
116        let attribute = schema
117            .attribute(&datom.attribute)
118            .ok_or(ResolveError::UnknownAttribute(datom.attribute))?;
119        validate_operation(attribute, datom)?;
120        apply_datom(&mut state, attribute, datom)?;
121    }
122
123    Ok(state.resolved)
124}
125
126fn default_value_for(attribute: &AttributeSchema) -> ResolvedValue {
127    match attribute.class {
128        AttributeClass::ScalarLww | AttributeClass::RefScalar => ResolvedValue::Scalar(None),
129        AttributeClass::SetAddWins | AttributeClass::RefSet => ResolvedValue::Set(Vec::new()),
130        AttributeClass::SequenceRga => ResolvedValue::Sequence(Vec::new()),
131    }
132}
133
134fn resolved_fact(datom: &Datom) -> ResolvedFact {
135    ResolvedFact {
136        value: datom.value.clone(),
137        source_datom_ids: vec![datom.element],
138        policy: datom.policy.clone().map(PolicyEnvelope::normalized),
139    }
140}
141
142fn validate_operation(attribute: &AttributeSchema, datom: &Datom) -> Result<(), ResolveError> {
143    let valid = match attribute.class {
144        AttributeClass::ScalarLww | AttributeClass::RefScalar => matches!(
145            datom.op,
146            OperationKind::Assert
147                | OperationKind::Claim
148                | OperationKind::LeaseOpen
149                | OperationKind::LeaseRenew
150                | OperationKind::Annotate
151                | OperationKind::Retract
152                | OperationKind::Release
153                | OperationKind::LeaseExpire
154        ),
155        AttributeClass::SetAddWins | AttributeClass::RefSet => matches!(
156            datom.op,
157            OperationKind::Add
158                | OperationKind::Claim
159                | OperationKind::Annotate
160                | OperationKind::Remove
161                | OperationKind::Release
162                | OperationKind::LeaseExpire
163                | OperationKind::Retract
164        ),
165        AttributeClass::SequenceRga => matches!(
166            datom.op,
167            OperationKind::InsertAfter | OperationKind::Remove | OperationKind::Retract
168        ),
169    };
170
171    if valid {
172        Ok(())
173    } else {
174        Err(ResolveError::InvalidOperationForAttribute {
175            attribute: datom.attribute,
176            class: attribute.class,
177            op: datom.op,
178        })
179    }
180}
181
182fn apply_datom(
183    state: &mut ResolverWorkingState,
184    attribute: &AttributeSchema,
185    datom: &Datom,
186) -> Result<(), ResolveError> {
187    match attribute.class {
188        AttributeClass::ScalarLww | AttributeClass::RefScalar => {
189            let entity_state = state.resolved.entities.entry(datom.entity).or_default();
190            let slot = entity_state
191                .attributes
192                .entry(datom.attribute)
193                .or_insert_with(|| default_value_for(attribute));
194            let ResolvedValue::Scalar(value) = slot else {
195                return Err(ResolveError::AttributeClassMismatch(datom.attribute));
196            };
197
198            match datom.op {
199                OperationKind::Retract | OperationKind::Release | OperationKind::LeaseExpire => {
200                    *value = None;
201                    entity_state
202                        .facts
203                        .entry(datom.attribute)
204                        .or_default()
205                        .clear();
206                }
207                OperationKind::Assert
208                | OperationKind::Claim
209                | OperationKind::LeaseOpen
210                | OperationKind::LeaseRenew
211                | OperationKind::Annotate => {
212                    *value = Some(datom.value.clone());
213                    let facts = entity_state.facts.entry(datom.attribute).or_default();
214                    facts.clear();
215                    facts.push(resolved_fact(datom));
216                }
217                _ => unreachable!("operation validity is checked before applying datoms"),
218            }
219        }
220        AttributeClass::SetAddWins | AttributeClass::RefSet => {
221            let entity_state = state.resolved.entities.entry(datom.entity).or_default();
222            let slot = entity_state
223                .attributes
224                .entry(datom.attribute)
225                .or_insert_with(|| default_value_for(attribute));
226            let ResolvedValue::Set(values) = slot else {
227                return Err(ResolveError::AttributeClassMismatch(datom.attribute));
228            };
229            let facts = entity_state.facts.entry(datom.attribute).or_default();
230
231            match datom.op {
232                OperationKind::Remove
233                | OperationKind::Release
234                | OperationKind::LeaseExpire
235                | OperationKind::Retract => {
236                    values.retain(|value| value != &datom.value);
237                    facts.retain(|fact| fact.value != datom.value);
238                }
239                OperationKind::Add | OperationKind::Claim | OperationKind::Annotate => {
240                    if !values.iter().any(|value| value == &datom.value) {
241                        values.push(datom.value.clone());
242                    }
243                    if !facts.iter().any(|fact| fact.value == datom.value) {
244                        facts.push(resolved_fact(datom));
245                    }
246                }
247                _ => unreachable!("operation validity is checked before applying datoms"),
248            }
249        }
250        AttributeClass::SequenceRga => apply_sequence_datom(state, attribute, datom)?,
251    }
252
253    Ok(())
254}
255
256fn apply_sequence_datom(
257    state: &mut ResolverWorkingState,
258    attribute: &AttributeSchema,
259    datom: &Datom,
260) -> Result<(), ResolveError> {
261    let sequence = state
262        .sequences
263        .entry((datom.entity, datom.attribute))
264        .or_default();
265
266    match datom.op {
267        OperationKind::InsertAfter => {
268            let anchor = match datom.provenance.parent_datom_ids.as_slice() {
269                [] if sequence.is_empty() => None,
270                [] => {
271                    return Err(ResolveError::MissingSequenceAnchor {
272                        attribute: datom.attribute,
273                        element: datom.element,
274                    });
275                }
276                [anchor] => Some(*anchor),
277                parents => {
278                    return Err(ResolveError::MalformedSequenceAnchor {
279                        attribute: datom.attribute,
280                        element: datom.element,
281                        parent_count: parents.len(),
282                    });
283                }
284            };
285
286            if let Some(anchor) = anchor {
287                if !sequence.iter().any(|entry| entry.element == anchor) {
288                    return Err(ResolveError::UnknownSequenceAnchor {
289                        attribute: datom.attribute,
290                        element: datom.element,
291                        anchor,
292                    });
293                }
294            }
295
296            sequence.push(SequenceEntry {
297                element: datom.element,
298                anchor,
299                fact: resolved_fact(datom),
300                removed: false,
301            });
302        }
303        OperationKind::Remove | OperationKind::Retract => {
304            for entry in sequence
305                .iter_mut()
306                .filter(|entry| !entry.removed && entry.fact.value == datom.value)
307            {
308                entry.removed = true;
309            }
310        }
311        _ => unreachable!("operation validity is checked before applying datoms"),
312    }
313
314    rebuild_sequence_attribute(state, attribute, datom.entity);
315    Ok(())
316}
317
318fn rebuild_sequence_attribute(
319    state: &mut ResolverWorkingState,
320    attribute: &AttributeSchema,
321    entity: EntityId,
322) {
323    let sequence = state
324        .sequences
325        .get(&(entity, attribute.id))
326        .cloned()
327        .unwrap_or_default();
328    let mut children: IndexMap<Option<ElementId>, Vec<SequenceEntry>> = IndexMap::new();
329    for entry in sequence {
330        children.entry(entry.anchor).or_default().push(entry);
331    }
332    for values in children.values_mut() {
333        values.sort_by_key(|entry| entry.element);
334    }
335
336    let mut ordered_facts = Vec::new();
337    collect_sequence_facts(None, &children, &mut ordered_facts);
338    let ordered_values = ordered_facts
339        .iter()
340        .map(|fact| fact.value.clone())
341        .collect::<Vec<_>>();
342
343    let entity_state = state.resolved.entities.entry(entity).or_default();
344    entity_state
345        .attributes
346        .insert(attribute.id, ResolvedValue::Sequence(ordered_values));
347    entity_state.facts.insert(attribute.id, ordered_facts);
348}
349
350fn collect_sequence_facts(
351    anchor: Option<ElementId>,
352    children: &IndexMap<Option<ElementId>, Vec<SequenceEntry>>,
353    ordered_facts: &mut Vec<ResolvedFact>,
354) {
355    let Some(entries) = children.get(&anchor) else {
356        return;
357    };
358
359    for entry in entries {
360        if !entry.removed {
361            ordered_facts.push(entry.fact.clone());
362        }
363        collect_sequence_facts(Some(entry.element), children, ordered_facts);
364    }
365}
366
367#[derive(Debug, Error)]
368pub enum ResolveError {
369    #[error("unknown attribute {0}")]
370    UnknownAttribute(AttributeId),
371    #[error("unknown element id {0}")]
372    UnknownElementId(ElementId),
373    #[error("attribute class mismatch for attribute {0}")]
374    AttributeClassMismatch(AttributeId),
375    #[error("operation {op:?} is invalid for attribute {attribute} with class {class:?}")]
376    InvalidOperationForAttribute {
377        attribute: AttributeId,
378        class: AttributeClass,
379        op: OperationKind,
380    },
381    #[error("sequence insert {element} on attribute {attribute} requires exactly one anchor")]
382    MalformedSequenceAnchor {
383        attribute: AttributeId,
384        element: ElementId,
385        parent_count: usize,
386    },
387    #[error("sequence insert {element} on attribute {attribute} requires an anchor")]
388    MissingSequenceAnchor {
389        attribute: AttributeId,
390        element: ElementId,
391    },
392    #[error(
393        "sequence insert {element} on attribute {attribute} references unknown anchor {anchor}"
394    )]
395    UnknownSequenceAnchor {
396        attribute: AttributeId,
397        element: ElementId,
398        anchor: ElementId,
399    },
400}
401
402#[cfg(test)]
403mod tests {
404    use super::{MaterializedResolver, ResolveError, ResolvedFact, ResolvedValue, Resolver};
405    use aether_ast::{
406        AttributeId, Datom, DatomProvenance, ElementId, EntityId, OperationKind, PolicyEnvelope,
407        ReplicaId, Value,
408    };
409    use aether_schema::{AttributeClass, AttributeSchema, Schema, ValueType};
410
411    const SCALAR_ATTR: AttributeId = AttributeId(1);
412    const SET_ATTR: AttributeId = AttributeId(2);
413    const SEQUENCE_ATTR: AttributeId = AttributeId(3);
414
415    fn schema() -> Schema {
416        let mut schema = Schema::new("v1");
417        for (id, name, class) in [
418            (SCALAR_ATTR, "task.status", AttributeClass::ScalarLww),
419            (SET_ATTR, "task.tags", AttributeClass::SetAddWins),
420            (SEQUENCE_ATTR, "task.steps", AttributeClass::SequenceRga),
421        ] {
422            schema
423                .register_attribute(AttributeSchema {
424                    id,
425                    name: name.into(),
426                    class,
427                    value_type: ValueType::String,
428                })
429                .expect("register attribute");
430        }
431        schema
432    }
433
434    fn datom(attribute: AttributeId, element: u64, op: OperationKind, value: &str) -> Datom {
435        Datom {
436            entity: EntityId::new(1),
437            attribute,
438            value: Value::String(value.into()),
439            op,
440            element: ElementId::new(element),
441            replica: ReplicaId::new(1),
442            causal_context: Default::default(),
443            provenance: DatomProvenance::default(),
444            policy: None,
445        }
446    }
447
448    fn datom_with_policy(
449        attribute: AttributeId,
450        element: u64,
451        op: OperationKind,
452        value: &str,
453        policy: PolicyEnvelope,
454    ) -> Datom {
455        let mut datom = datom(attribute, element, op, value);
456        datom.policy = Some(policy);
457        datom
458    }
459
460    fn sequence_insert_after(element: u64, value: &str, anchors: &[u64]) -> Datom {
461        let mut datom = datom(SEQUENCE_ATTR, element, OperationKind::InsertAfter, value);
462        datom.provenance.parent_datom_ids = anchors.iter().copied().map(ElementId::new).collect();
463        datom
464    }
465
466    #[test]
467    fn scalar_lww_and_retract_behavior_are_deterministic() {
468        let schema = schema();
469        let datoms = vec![
470            datom(SCALAR_ATTR, 1, OperationKind::Assert, "open"),
471            datom(SCALAR_ATTR, 2, OperationKind::Assert, "closed"),
472            datom(SCALAR_ATTR, 3, OperationKind::Retract, "closed"),
473        ];
474        let resolver = MaterializedResolver;
475
476        let as_of = resolver
477            .as_of(&schema, &datoms, &ElementId::new(2))
478            .expect("resolve as_of");
479        let current = resolver.current(&schema, &datoms).expect("resolve current");
480
481        assert_eq!(
482            as_of
483                .entity(&EntityId::new(1))
484                .and_then(|entity| entity.attribute(&SCALAR_ATTR)),
485            Some(&ResolvedValue::Scalar(Some(Value::String("closed".into()))))
486        );
487        assert_eq!(
488            current
489                .entity(&EntityId::new(1))
490                .and_then(|entity| entity.attribute(&SCALAR_ATTR)),
491            Some(&ResolvedValue::Scalar(None))
492        );
493        assert_eq!(
494            as_of
495                .entity(&EntityId::new(1))
496                .map(|entity| entity.facts(&SCALAR_ATTR)),
497            Some(
498                [ResolvedFact {
499                    value: Value::String("closed".into()),
500                    source_datom_ids: vec![ElementId::new(2)],
501                    policy: None,
502                }]
503                .as_slice()
504            )
505        );
506        assert!(current
507            .entity(&EntityId::new(1))
508            .is_some_and(|entity| entity.facts(&SCALAR_ATTR).is_empty()));
509    }
510
511    #[test]
512    fn set_add_and_remove_behavior_is_preserved() {
513        let schema = schema();
514        let datoms = vec![
515            datom(SET_ATTR, 1, OperationKind::Add, "alpha"),
516            datom(SET_ATTR, 2, OperationKind::Add, "beta"),
517            datom(SET_ATTR, 3, OperationKind::Remove, "alpha"),
518        ];
519        let resolver = MaterializedResolver;
520
521        let current = resolver.current(&schema, &datoms).expect("resolve current");
522        assert_eq!(
523            current
524                .entity(&EntityId::new(1))
525                .and_then(|entity| entity.attribute(&SET_ATTR)),
526            Some(&ResolvedValue::Set(vec![Value::String("beta".into())]))
527        );
528        assert_eq!(
529            current
530                .entity(&EntityId::new(1))
531                .map(|entity| entity.facts(&SET_ATTR)),
532            Some(
533                [ResolvedFact {
534                    value: Value::String("beta".into()),
535                    source_datom_ids: vec![ElementId::new(2)],
536                    policy: None,
537                }]
538                .as_slice()
539            )
540        );
541    }
542
543    #[test]
544    fn sequence_replay_is_stable() {
545        let schema = schema();
546        let datoms = vec![
547            sequence_insert_after(1, "a", &[]),
548            sequence_insert_after(2, "b", &[1]),
549            datom(SEQUENCE_ATTR, 3, OperationKind::Remove, "a"),
550            sequence_insert_after(4, "c", &[1]),
551        ];
552        let resolver = MaterializedResolver;
553
554        let current = resolver.current(&schema, &datoms).expect("resolve current");
555        assert_eq!(
556            current
557                .entity(&EntityId::new(1))
558                .and_then(|entity| entity.attribute(&SEQUENCE_ATTR)),
559            Some(&ResolvedValue::Sequence(vec![
560                Value::String("b".into()),
561                Value::String("c".into()),
562            ]))
563        );
564        assert_eq!(
565            current
566                .entity(&EntityId::new(1))
567                .map(|entity| entity.facts(&SEQUENCE_ATTR)),
568            Some(
569                [
570                    ResolvedFact {
571                        value: Value::String("b".into()),
572                        source_datom_ids: vec![ElementId::new(2)],
573                        policy: None,
574                    },
575                    ResolvedFact {
576                        value: Value::String("c".into()),
577                        source_datom_ids: vec![ElementId::new(4)],
578                        policy: None,
579                    },
580                ]
581                .as_slice()
582            )
583        );
584    }
585
586    #[test]
587    fn current_equals_as_of_last_element() {
588        let schema = schema();
589        let datoms = vec![
590            datom(SCALAR_ATTR, 1, OperationKind::Assert, "queued"),
591            datom(SCALAR_ATTR, 2, OperationKind::Assert, "running"),
592        ];
593        let resolver = MaterializedResolver;
594
595        let current = resolver.current(&schema, &datoms).expect("resolve current");
596        let as_of = resolver
597            .as_of(&schema, &datoms, &ElementId::new(2))
598            .expect("resolve as_of");
599
600        assert_eq!(current, as_of);
601    }
602
603    #[test]
604    fn scalar_resolved_fact_preserves_policy() {
605        let schema = schema();
606        let datoms = vec![datom_with_policy(
607            SCALAR_ATTR,
608            1,
609            OperationKind::Assert,
610            "ready",
611            PolicyEnvelope {
612                capabilities: vec!["executor".into()],
613                visibilities: vec!["ops".into()],
614            },
615        )];
616
617        let current = MaterializedResolver
618            .current(&schema, &datoms)
619            .expect("resolve current with policy");
620        let facts = current
621            .entity(&EntityId::new(1))
622            .expect("entity state")
623            .facts(&SCALAR_ATTR);
624
625        assert_eq!(facts.len(), 1);
626        assert_eq!(
627            facts[0].policy,
628            Some(PolicyEnvelope {
629                capabilities: vec!["executor".into()],
630                visibilities: vec!["ops".into()],
631            })
632        );
633    }
634
635    #[test]
636    fn invalid_operation_for_attribute_is_rejected() {
637        let schema = schema();
638        let error = MaterializedResolver
639            .current(
640                &schema,
641                &[datom(SCALAR_ATTR, 1, OperationKind::Add, "invalid")],
642            )
643            .expect_err("invalid scalar add should fail");
644
645        assert!(matches!(
646            error,
647            ResolveError::InvalidOperationForAttribute {
648                attribute,
649                class: AttributeClass::ScalarLww,
650                op: OperationKind::Add,
651            } if attribute == SCALAR_ATTR
652        ));
653    }
654
655    #[test]
656    fn every_v1_operation_has_a_valid_home_class() {
657        let schema = schema();
658
659        let passing_cases = [
660            datom(SCALAR_ATTR, 1, OperationKind::Assert, "scalar-assert"),
661            datom(SCALAR_ATTR, 2, OperationKind::Retract, "scalar-assert"),
662            datom(SET_ATTR, 3, OperationKind::Add, "set-add"),
663            datom(SET_ATTR, 4, OperationKind::Remove, "set-add"),
664            datom(SCALAR_ATTR, 5, OperationKind::Claim, "worker-a"),
665            datom(SCALAR_ATTR, 6, OperationKind::Release, "worker-a"),
666            datom(SCALAR_ATTR, 7, OperationKind::LeaseOpen, "active"),
667            datom(SCALAR_ATTR, 8, OperationKind::LeaseRenew, "active"),
668            datom(SCALAR_ATTR, 9, OperationKind::LeaseExpire, "active"),
669            datom(SCALAR_ATTR, 10, OperationKind::Annotate, "annotated"),
670        ];
671
672        for datom in passing_cases {
673            MaterializedResolver
674                .current(&schema, &[datom])
675                .expect("documented valid operation should resolve");
676        }
677
678        MaterializedResolver
679            .current(
680                &schema,
681                &[sequence_insert_after(11, "sequence-bootstrap", &[])],
682            )
683            .expect("insert-after bootstrap should resolve");
684    }
685
686    #[test]
687    fn documented_invalid_operation_matrix_examples_are_rejected() {
688        let schema = schema();
689
690        let invalid_cases = [
691            (SCALAR_ATTR, OperationKind::Add, "scalar-invalid-add"),
692            (SCALAR_ATTR, OperationKind::Remove, "scalar-invalid-remove"),
693            (
694                SCALAR_ATTR,
695                OperationKind::InsertAfter,
696                "scalar-invalid-insert",
697            ),
698            (SET_ATTR, OperationKind::Assert, "set-invalid-assert"),
699            (SET_ATTR, OperationKind::LeaseOpen, "set-invalid-lease-open"),
700            (
701                SET_ATTR,
702                OperationKind::LeaseRenew,
703                "set-invalid-lease-renew",
704            ),
705            (SET_ATTR, OperationKind::InsertAfter, "set-invalid-insert"),
706            (
707                SEQUENCE_ATTR,
708                OperationKind::Assert,
709                "sequence-invalid-assert",
710            ),
711            (SEQUENCE_ATTR, OperationKind::Add, "sequence-invalid-add"),
712            (
713                SEQUENCE_ATTR,
714                OperationKind::Claim,
715                "sequence-invalid-claim",
716            ),
717            (
718                SEQUENCE_ATTR,
719                OperationKind::LeaseOpen,
720                "sequence-invalid-lease-open",
721            ),
722            (
723                SEQUENCE_ATTR,
724                OperationKind::LeaseRenew,
725                "sequence-invalid-lease-renew",
726            ),
727            (
728                SEQUENCE_ATTR,
729                OperationKind::LeaseExpire,
730                "sequence-invalid-lease-expire",
731            ),
732            (
733                SEQUENCE_ATTR,
734                OperationKind::Annotate,
735                "sequence-invalid-annotate",
736            ),
737            (
738                SEQUENCE_ATTR,
739                OperationKind::Release,
740                "sequence-invalid-release",
741            ),
742        ];
743
744        for (attribute, op, value) in invalid_cases {
745            let error = MaterializedResolver
746                .current(&schema, &[datom(attribute, 1, op, value)])
747                .expect_err("documented invalid operation should fail");
748            assert!(matches!(
749                error,
750                ResolveError::InvalidOperationForAttribute { attribute: actual, op: actual_op, .. }
751                    if actual == attribute && actual_op == op
752            ));
753        }
754    }
755
756    #[test]
757    fn claim_release_and_lease_cycles_replay_deterministically() {
758        let schema = schema();
759        let datoms = vec![
760            datom(SCALAR_ATTR, 1, OperationKind::Claim, "worker-a"),
761            datom(SCALAR_ATTR, 2, OperationKind::Release, "worker-a"),
762            datom(SCALAR_ATTR, 3, OperationKind::LeaseOpen, "active"),
763            datom(SCALAR_ATTR, 4, OperationKind::LeaseRenew, "active"),
764            datom(SCALAR_ATTR, 5, OperationKind::LeaseExpire, "active"),
765        ];
766
767        let as_of_claim = MaterializedResolver
768            .as_of(&schema, &datoms, &ElementId::new(1))
769            .expect("resolve claim cut");
770        let as_of_lease = MaterializedResolver
771            .as_of(&schema, &datoms, &ElementId::new(4))
772            .expect("resolve lease-renew cut");
773        let current = MaterializedResolver
774            .current(&schema, &datoms)
775            .expect("resolve current");
776
777        assert_eq!(
778            as_of_claim
779                .entity(&EntityId::new(1))
780                .and_then(|entity| entity.attribute(&SCALAR_ATTR)),
781            Some(&ResolvedValue::Scalar(Some(Value::String(
782                "worker-a".into()
783            ))))
784        );
785        assert_eq!(
786            as_of_lease
787                .entity(&EntityId::new(1))
788                .and_then(|entity| entity.attribute(&SCALAR_ATTR)),
789            Some(&ResolvedValue::Scalar(Some(Value::String("active".into()))))
790        );
791        assert_eq!(
792            current
793                .entity(&EntityId::new(1))
794                .and_then(|entity| entity.attribute(&SCALAR_ATTR)),
795            Some(&ResolvedValue::Scalar(None))
796        );
797    }
798
799    #[test]
800    fn sequence_insert_after_orders_children_by_anchor_then_element() {
801        let schema = schema();
802        let datoms = vec![
803            sequence_insert_after(1, "a", &[]),
804            sequence_insert_after(2, "c", &[1]),
805            sequence_insert_after(3, "b", &[1]),
806        ];
807
808        let current = MaterializedResolver
809            .current(&schema, &datoms)
810            .expect("resolve anchored sequence");
811        assert_eq!(
812            current
813                .entity(&EntityId::new(1))
814                .and_then(|entity| entity.attribute(&SEQUENCE_ATTR)),
815            Some(&ResolvedValue::Sequence(vec![
816                Value::String("a".into()),
817                Value::String("c".into()),
818                Value::String("b".into()),
819            ]))
820        );
821    }
822
823    #[test]
824    fn non_bootstrap_sequence_insert_requires_exactly_one_known_anchor() {
825        let schema = schema();
826
827        let missing_anchor = MaterializedResolver
828            .current(
829                &schema,
830                &[
831                    sequence_insert_after(1, "a", &[]),
832                    sequence_insert_after(2, "b", &[]),
833                ],
834            )
835            .expect_err("non-bootstrap insert without anchor should fail");
836        assert!(matches!(
837            missing_anchor,
838            ResolveError::MissingSequenceAnchor {
839                attribute,
840                element,
841            } if attribute == SEQUENCE_ATTR && element == ElementId::new(2)
842        ));
843
844        let malformed = MaterializedResolver
845            .current(
846                &schema,
847                &[
848                    sequence_insert_after(1, "a", &[]),
849                    sequence_insert_after(2, "b", &[1, 9]),
850                ],
851            )
852            .expect_err("multi-anchor insert should fail");
853        assert!(matches!(
854            malformed,
855            ResolveError::MalformedSequenceAnchor {
856                attribute,
857                element,
858                parent_count,
859            } if attribute == SEQUENCE_ATTR
860                && element == ElementId::new(2)
861                && parent_count == 2
862        ));
863
864        let unknown = MaterializedResolver
865            .current(
866                &schema,
867                &[
868                    sequence_insert_after(1, "a", &[]),
869                    sequence_insert_after(2, "b", &[9]),
870                ],
871            )
872            .expect_err("unknown anchor should fail");
873        assert!(matches!(
874            unknown,
875            ResolveError::UnknownSequenceAnchor {
876                attribute,
877                element,
878                anchor,
879            } if attribute == SEQUENCE_ATTR
880                && element == ElementId::new(2)
881                && anchor == ElementId::new(9)
882        ));
883    }
884}