aether_api/
http.rs

1use crate::{
2    deployment::PilotServiceConfig, ApiError, AppendRequest, AsOfRequest, AuthReloadResponse,
3    CoordinationDeltaReportRequest, CoordinationPilotReportRequest, CurrentStateRequest,
4    ExplainTupleRequest, FederatedExplainReport, FederatedHistoryRequest,
5    FederatedRunDocumentRequest, GetArtifactReferenceRequest, HistoryRequest, KernelService,
6    ParseDocumentRequest, PartitionAppendRequest, PartitionHistoryRequest, PartitionStateRequest,
7    PartitionStatusResponse, PromoteReplicaRequest, RegisterArtifactReferenceRequest,
8    RegisterVectorRecordRequest, ReplicatedAuthorityPartitionService, RunDocumentRequest,
9    SearchVectorsRequest, ServiceMode, ServiceStatusResponse,
10};
11use aether_ast::PolicyContext;
12use axum::{
13    extract::State,
14    http::{header::AUTHORIZATION, HeaderMap, StatusCode},
15    response::{IntoResponse, Response},
16    routing::{get, post},
17    Json, Router,
18};
19use serde::{Deserialize, Serialize};
20use std::{
21    collections::{BTreeSet, HashMap},
22    fs::{self, OpenOptions},
23    io::Write,
24    path::{Path, PathBuf},
25    sync::{Arc, Mutex},
26    time::{SystemTime, UNIX_EPOCH},
27};
28
/// Shared state handed to the HTTP handlers.
///
/// The type derives `Clone`; the services, the token table and the status
/// document sit behind `Arc<Mutex<..>>`, so clones share them.
#[derive(Clone)]
pub struct HttpKernelState {
    /// Kernel service that `execute` runs operations against.
    service: Arc<Mutex<Box<dyn KernelService + Send>>>,
    /// Partitioned service; `None` unless built via `with_partitioned_options`.
    partitioned: Option<Arc<Mutex<ReplicatedAuthorityPartitionService>>>,
    /// Token table consulted by `authorize`; replaced wholesale on auth reload.
    auth: Arc<Mutex<HttpAuth>>,
    /// Sink for audit entries.
    audit: AuditLog,
    /// Status document cloned by `status_snapshot` and updated on auth reload.
    status: Arc<Mutex<ServiceStatusResponse>>,
    /// Config file re-read by `reload_auth_from_config`; `None` disables reload.
    auth_reload_config_path: Option<PathBuf>,
}
38
39impl HttpKernelState {
    /// Builds state around `service` using `HttpKernelOptions::default()`.
    pub fn new(service: impl KernelService + Send + 'static) -> Self {
        Self::with_options(service, HttpKernelOptions::default())
    }
43
    /// Builds state that carries both the kernel `service` and a
    /// `partitioned` service, configured by `options`.
    pub fn with_partitioned_options(
        service: impl KernelService + Send + 'static,
        partitioned: ReplicatedAuthorityPartitionService,
        options: HttpKernelOptions,
    ) -> Self {
        Self::with_optional_partitioned(service, Some(partitioned), options)
    }
51
    /// Builds state around `service` with the given `options` and no
    /// partitioned service.
    pub fn with_options(
        service: impl KernelService + Send + 'static,
        options: HttpKernelOptions,
    ) -> Self {
        Self::with_optional_partitioned(service, None, options)
    }
58
59    fn with_optional_partitioned(
60        service: impl KernelService + Send + 'static,
61        partitioned: Option<ReplicatedAuthorityPartitionService>,
62        options: HttpKernelOptions,
63    ) -> Self {
64        Self {
65            service: Arc::new(Mutex::new(Box::new(service))),
66            partitioned: partitioned.map(|service| Arc::new(Mutex::new(service))),
67            auth: Arc::new(Mutex::new(HttpAuth::from_config(options.auth))),
68            audit: AuditLog::new(options.audit_log_path),
69            status: Arc::new(Mutex::new(options.service_status.unwrap_or_else(|| {
70                ServiceStatusResponse::single_node(env!("CARGO_PKG_VERSION"), "pilot-v1", "v1")
71            }))),
72            auth_reload_config_path: options.auth_reload_config_path,
73        }
74    }
75
76    fn service(
77        &self,
78    ) -> Result<std::sync::MutexGuard<'_, Box<dyn KernelService + Send>>, HttpError> {
79        self.service.lock().map_err(|_| HttpError::LockPoisoned)
80    }
81
82    fn partitioned_service(
83        &self,
84    ) -> Result<std::sync::MutexGuard<'_, ReplicatedAuthorityPartitionService>, HttpError> {
85        let partitioned = self.partitioned.as_ref().ok_or_else(|| {
86            HttpError::Api(ApiError::Validation(
87                "partitioned prototype is not configured for this service".into(),
88            ))
89        })?;
90        partitioned.lock().map_err(|_| HttpError::LockPoisoned)
91    }
92
93    fn authorize(
94        &self,
95        headers: &HeaderMap,
96        required_scope: AuthScope,
97    ) -> Result<AuthenticatedPrincipal, HttpError> {
98        self.auth
99            .lock()
100            .map_err(|_| HttpError::LockPoisoned)?
101            .authorize(headers, required_scope)
102    }
103
104    fn status_snapshot(&self) -> Result<ServiceStatusResponse, HttpError> {
105        let mut status = self
106            .status
107            .lock()
108            .map(|status| status.clone())
109            .map_err(|_| HttpError::LockPoisoned)?;
110        if let Some(partitioned) = &self.partitioned {
111            let partition_status = partitioned
112                .lock()
113                .map_err(|_| HttpError::LockPoisoned)?
114                .partition_status()
115                .map_err(HttpError::Api)?;
116            status.service_mode = ServiceMode::Partitioned;
117            status.replicas = flatten_replica_status(&partition_status);
118        }
119        Ok(status)
120    }
121
122    fn reload_auth_from_config(&self) -> Result<AuthReloadResponse, HttpError> {
123        let Some(config_path) = &self.auth_reload_config_path else {
124            return Err(HttpError::Api(ApiError::Validation(
125                "auth reload is not configured for this service".into(),
126            )));
127        };
128        let resolved = PilotServiceConfig::load(config_path)
129            .and_then(|config| config.resolve(config_path))
130            .map_err(|error| HttpError::Api(ApiError::Validation(error.to_string())))?;
131        let mut status = self.status.lock().map_err(|_| HttpError::LockPoisoned)?;
132        if status.bind_addr.as_deref() != Some(resolved.bind_addr.as_str())
133            || status.storage.database_path.as_ref() != Some(&resolved.database_path)
134            || status.storage.audit_log_path.as_ref() != Some(&resolved.audit_log_path)
135        {
136            return Err(HttpError::Api(ApiError::Validation(
137                "auth reload cannot change bind or storage paths".into(),
138            )));
139        }
140        {
141            let mut auth = self.auth.lock().map_err(|_| HttpError::LockPoisoned)?;
142            *auth = HttpAuth::from_config(resolved.auth.clone());
143        }
144        status.config_version.clone_from(&resolved.config_version);
145        status.schema_version.clone_from(&resolved.schema_version);
146        status.principals = resolved
147            .token_summaries
148            .iter()
149            .map(|summary| summary.status_summary())
150            .collect();
151        Ok(AuthReloadResponse {
152            reloaded_at_ms: now_millis(),
153            principal_count: status.principals.len(),
154            revoked_count: status
155                .principals
156                .iter()
157                .filter(|principal| principal.revoked)
158                .count(),
159        })
160    }
161
162    fn execute<T, F>(
163        &self,
164        headers: &HeaderMap,
165        method: &'static str,
166        path: &'static str,
167        required_scope: AuthScope,
168        mut context: AuditContext,
169        operation: F,
170    ) -> Result<T, HttpError>
171    where
172        F: FnOnce(
173            &mut dyn KernelService,
174            &AuthenticatedPrincipal,
175            &mut AuditContext,
176        ) -> Result<T, HttpError>,
177    {
178        let principal = match self.authorize(headers, required_scope) {
179            Ok(principal) => principal,
180            Err(error) => {
181                self.audit.record(AuditEntry::for_denied(
182                    method,
183                    path,
184                    error.status_code(),
185                    error.audit_principal(),
186                    None,
187                    None,
188                    required_scope,
189                    error.audit_message(),
190                    context,
191                ));
192                return Err(error);
193            }
194        };
195
196        let result = {
197            let mut service = self.service()?;
198            operation(service.as_mut(), &principal, &mut context)
199        };
200
201        let status = match &result {
202            Ok(_) => StatusCode::OK,
203            Err(error) => error.status_code(),
204        };
205        self.audit.record(AuditEntry::for_request(
206            method,
207            path,
208            status,
209            &principal,
210            required_scope,
211            context,
212        ));
213
214        result
215    }
216
217    fn execute_partitioned<T, F>(
218        &self,
219        headers: &HeaderMap,
220        method: &'static str,
221        path: &'static str,
222        required_scope: AuthScope,
223        context: AuditContext,
224        operation: F,
225    ) -> Result<T, HttpError>
226    where
227        F: FnOnce(
228            &mut ReplicatedAuthorityPartitionService,
229            &AuthenticatedPrincipal,
230            &mut AuditContext,
231        ) -> Result<T, HttpError>,
232    {
233        let principal = match self.authorize(headers, required_scope) {
234            Ok(principal) => principal,
235            Err(error) => {
236                self.audit.record(AuditEntry::for_denied(
237                    method,
238                    path,
239                    error.status_code(),
240                    error.audit_principal(),
241                    None,
242                    None,
243                    required_scope,
244                    error.audit_message(),
245                    context,
246                ));
247                return Err(error);
248            }
249        };
250
251        {
252            let mut service = self.partitioned_service()?;
253            let mut context = context;
254            let result = operation(&mut service, &principal, &mut context);
255            let status = match &result {
256                Ok(_) => StatusCode::OK,
257                Err(error) => error.status_code(),
258            };
259            self.audit.record(AuditEntry::for_request(
260                method,
261                path,
262                status,
263                &principal,
264                required_scope,
265                context,
266            ));
267            result
268        }
269    }
270
271    fn audit_entries(&self, headers: &HeaderMap) -> Result<AuditLogResponse, HttpError> {
272        let principal = match self.authorize(headers, AuthScope::Ops) {
273            Ok(principal) => principal,
274            Err(error) => {
275                self.audit.record(AuditEntry::for_denied(
276                    "GET",
277                    "/v1/audit",
278                    error.status_code(),
279                    error.audit_principal(),
280                    None,
281                    None,
282                    AuthScope::Ops,
283                    error.audit_message(),
284                    AuditContext::default(),
285                ));
286                return Err(error);
287            }
288        };
289
290        let response = AuditLogResponse {
291            entries: self.audit.snapshot()?,
292        };
293        self.audit.record(AuditEntry::for_request(
294            "GET",
295            "/v1/audit",
296            StatusCode::OK,
297            &principal,
298            AuthScope::Ops,
299            AuditContext::default(),
300        ));
301        Ok(response)
302    }
303}
304
/// Permission a token must hold for a request to be authorized.
///
/// Serialized with snake_case variant names.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuthScope {
    /// Rendered as `"append"` by `as_str`.
    Append,
    /// Rendered as `"query"` by `as_str`.
    Query,
    /// Rendered as `"explain"` by `as_str`.
    Explain,
    /// Rendered as `"ops"` by `as_str`; required to read the audit log.
    Ops,
}
313
impl AuthScope {
    /// Returns the lowercase name of the scope, as used in the
    /// "token lacks ... scope" denial message.
    fn as_str(self) -> &'static str {
        match self {
            Self::Append => "append",
            Self::Query => "query",
            Self::Explain => "explain",
            Self::Ops => "ops",
        }
    }
}
324
/// Token list from which the HTTP layer builds its authorization table.
///
/// An empty list makes `HttpAuth::authorize` treat every request as the
/// `"anonymous"` principal.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct HttpAuthConfig {
    /// Configured access tokens.
    pub tokens: Vec<HttpAccessToken>,
}
329
330impl HttpAuthConfig {
331    pub fn new() -> Self {
332        Self::default()
333    }
334
335    pub fn with_token(
336        mut self,
337        token: impl Into<String>,
338        principal: impl Into<String>,
339        scopes: impl IntoIterator<Item = AuthScope>,
340    ) -> Self {
341        self.tokens.push(HttpAccessToken {
342            token: token.into(),
343            token_id: String::new(),
344            principal: principal.into(),
345            principal_id: String::new(),
346            scopes: scopes.into_iter().collect(),
347            policy_context: None,
348            source: "inline".into(),
349            revoked: false,
350        });
351        self
352    }
353
354    pub fn with_token_context(
355        mut self,
356        token: impl Into<String>,
357        principal: impl Into<String>,
358        scopes: impl IntoIterator<Item = AuthScope>,
359        policy_context: PolicyContext,
360    ) -> Self {
361        self.tokens.push(HttpAccessToken {
362            token: token.into(),
363            token_id: String::new(),
364            principal: principal.into(),
365            principal_id: String::new(),
366            scopes: scopes.into_iter().collect(),
367            policy_context: normalize_policy_context(Some(policy_context)),
368            source: "inline".into(),
369            revoked: false,
370        });
371        self
372    }
373}
374
/// One configured bearer token and the principal it authenticates.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct HttpAccessToken {
    /// Bearer token value; used as the lookup key in the authorization table.
    pub token: String,
    /// Token identifier; when blank, `HttpAuth::from_config` derives
    /// `token:<principal>`.
    #[serde(default)]
    pub token_id: String,
    /// Principal name recorded in audit entries.
    pub principal: String,
    /// Principal identifier; when blank, `HttpAuth::from_config` derives
    /// `principal:<principal>`.
    #[serde(default)]
    pub principal_id: String,
    /// Scopes the token may be used for.
    pub scopes: Vec<AuthScope>,
    /// Policy context the token is bound to, if any.
    pub policy_context: Option<PolicyContext>,
    /// Origin label; the `HttpAuthConfig` builders set it to `"inline"`.
    #[serde(default)]
    pub source: String,
    /// When `true`, `HttpAuth::authorize` rejects the token.
    #[serde(default)]
    pub revoked: bool,
}
390
/// Options consumed when constructing `HttpKernelState`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct HttpKernelOptions {
    /// Token configuration for the authorization table.
    pub auth: HttpAuthConfig,
    /// Path handed to the audit log; `None` keeps audit entries in memory only.
    pub audit_log_path: Option<PathBuf>,
    /// Initial status document; a single-node default is used when `None`.
    pub service_status: Option<ServiceStatusResponse>,
    /// Config file re-read on auth reload; `None` disables reload.
    pub auth_reload_config_path: Option<PathBuf>,
}
398
399impl HttpKernelOptions {
400    pub fn new() -> Self {
401        Self::default()
402    }
403
404    pub fn with_auth(mut self, auth: HttpAuthConfig) -> Self {
405        self.auth = auth;
406        self
407    }
408
409    pub fn with_audit_log_path(mut self, path: impl Into<PathBuf>) -> Self {
410        self.audit_log_path = Some(path.into());
411        self
412    }
413
414    pub fn with_service_status(mut self, status: ServiceStatusResponse) -> Self {
415        self.service_status = Some(status);
416        self
417    }
418
419    pub fn with_auth_reload_config_path(mut self, path: impl Into<PathBuf>) -> Self {
420        self.auth_reload_config_path = Some(path.into());
421        self
422    }
423}
424
/// One record in the audit log.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AuditEntry {
    /// Value of `now_millis()` when the entry was created.
    pub timestamp_ms: u64,
    /// Principal name; `"aether"` for entries generated by the service itself.
    pub principal: String,
    /// Principal identifier, omitted from serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub principal_id: Option<String>,
    /// Token identifier, omitted from serialized output when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub token_id: Option<String>,
    /// HTTP method, or `"AUDIT"` for audit-write failures.
    pub method: String,
    /// Request path, or the audit file path for audit-write failures.
    pub path: String,
    /// Numeric HTTP status code.
    pub status: u16,
    /// Scope the request required.
    pub scope: AuthScope,
    /// Short outcome label set by the `AuditEntry` constructors.
    pub outcome: String,
    /// Extra detail; set for denied requests and audit-write failures.
    pub detail: Option<String>,
    /// Request-specific context captured while handling the request.
    pub context: AuditContext,
}
441
/// Request-specific details attached to an `AuditEntry`.
///
/// The policy fields at the end are filled in by `apply_policy_binding`; the
/// remaining fields are set by code outside this section.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AuditContext {
    pub temporal_view: Option<String>,
    pub query_goal: Option<String>,
    pub tuple_id: Option<u64>,
    pub requested_element: Option<u64>,
    pub datom_count: Option<usize>,
    pub entity_count: Option<usize>,
    pub row_count: Option<usize>,
    pub derived_tuple_count: Option<usize>,
    pub trace_tuple_count: Option<usize>,
    pub last_element: Option<u64>,
    /// Capabilities from the policy context supplied with the request.
    pub requested_capabilities: Vec<String>,
    /// Visibilities from the policy context supplied with the request.
    pub requested_visibilities: Vec<String>,
    /// Capabilities from the policy context attached to the principal.
    pub granted_capabilities: Vec<String>,
    /// Visibilities from the policy context attached to the principal.
    pub granted_visibilities: Vec<String>,
    /// Capabilities from the policy context actually applied.
    pub effective_capabilities: Vec<String>,
    /// Visibilities from the policy context actually applied.
    pub effective_visibilities: Vec<String>,
    /// Label describing how the effective policy context was chosen.
    pub policy_decision: Option<String>,
}
462
463impl AuditEntry {
464    fn for_request(
465        method: impl Into<String>,
466        path: impl Into<String>,
467        status: StatusCode,
468        principal: &AuthenticatedPrincipal,
469        scope: AuthScope,
470        context: AuditContext,
471    ) -> Self {
472        Self {
473            timestamp_ms: now_millis(),
474            principal: principal.id.clone(),
475            principal_id: principal.principal_id.clone(),
476            token_id: principal.token_id.clone(),
477            method: method.into(),
478            path: path.into(),
479            status: status.as_u16(),
480            scope,
481            outcome: if status.is_success() {
482                "ok".into()
483            } else {
484                "error".into()
485            },
486            detail: None,
487            context,
488        }
489    }
490
491    #[allow(clippy::too_many_arguments)]
492    fn for_denied(
493        method: impl Into<String>,
494        path: impl Into<String>,
495        status: StatusCode,
496        principal: impl Into<String>,
497        principal_id: Option<String>,
498        token_id: Option<String>,
499        scope: AuthScope,
500        detail: impl Into<String>,
501        context: AuditContext,
502    ) -> Self {
503        Self {
504            timestamp_ms: now_millis(),
505            principal: principal.into(),
506            principal_id,
507            token_id,
508            method: method.into(),
509            path: path.into(),
510            status: status.as_u16(),
511            scope,
512            outcome: if status == StatusCode::UNAUTHORIZED {
513                "unauthorized".into()
514            } else {
515                "forbidden".into()
516            },
517            detail: Some(detail.into()),
518            context,
519        }
520    }
521
522    fn audit_failure(path: &Path, error: &std::io::Error) -> Self {
523        Self {
524            timestamp_ms: now_millis(),
525            principal: "aether".into(),
526            principal_id: None,
527            token_id: None,
528            method: "AUDIT".into(),
529            path: path.display().to_string(),
530            status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
531            scope: AuthScope::Ops,
532            outcome: "audit_write_failed".into(),
533            detail: Some(error.to_string()),
534            context: AuditContext::default(),
535        }
536    }
537}
538
/// Response body carrying a snapshot of the in-memory audit entries.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AuditLogResponse {
    /// Audit entries in the order they were recorded.
    pub entries: Vec<AuditEntry>,
}
543
/// Health response body.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct HealthResponse {
    /// Health label; `"ok"` in the default value.
    pub status: String,
}
548
549impl Default for HealthResponse {
550    fn default() -> Self {
551        Self {
552            status: "ok".into(),
553        }
554    }
555}
556
/// Builds the HTTP router for `service` with default options.
pub fn http_router(service: impl KernelService + Send + 'static) -> Router {
    http_router_with_options(service, HttpKernelOptions::default())
}
560
561pub fn http_router_with_partitioned_options(
562    service: impl KernelService + Send + 'static,
563    partitioned: ReplicatedAuthorityPartitionService,
564    options: HttpKernelOptions,
565) -> Router {
566    Router::new()
567        .route("/health", get(health))
568        .route("/v1/status", get(service_status))
569        .route("/v1/history", get(history))
570        .route("/v1/audit", get(audit_log))
571        .route("/v1/admin/auth/reload", post(reload_auth))
572        .route("/v1/append", post(append))
573        .route("/v1/state/current", post(current_state))
574        .route("/v1/state/as-of", post(as_of))
575        .route("/v1/documents/parse", post(parse_document))
576        .route("/v1/documents/run", post(run_document))
577        .route(
578            "/v1/reports/pilot/coordination",
579            post(coordination_pilot_report),
580        )
581        .route(
582            "/v1/reports/pilot/coordination-delta",
583            post(coordination_delta_report),
584        )
585        .route("/v1/explain/tuple", post(explain_tuple))
586        .route("/v1/partitions/status", get(partition_status))
587        .route("/v1/partitions/promote", post(promote_replica))
588        .route("/v1/partitions/append", post(partition_append))
589        .route("/v1/partitions/history", post(partition_history))
590        .route("/v1/partitions/state", post(partition_state))
591        .route("/v1/federated/history", post(federated_history))
592        .route("/v1/federated/run", post(federated_run_document))
593        .route("/v1/federated/report", post(federated_report))
594        .route(
595            "/v1/sidecars/artifacts/register",
596            post(register_artifact_reference),
597        )
598        .route("/v1/sidecars/artifacts/get", post(get_artifact_reference))
599        .route(
600            "/v1/sidecars/vectors/register",
601            post(register_vector_record),
602        )
603        .route("/v1/sidecars/vectors/search", post(search_vectors))
604        .with_state(HttpKernelState::with_partitioned_options(
605            service,
606            partitioned,
607            options,
608        ))
609}
610
611pub fn http_router_with_options(
612    service: impl KernelService + Send + 'static,
613    options: HttpKernelOptions,
614) -> Router {
615    Router::new()
616        .route("/health", get(health))
617        .route("/v1/status", get(service_status))
618        .route("/v1/history", get(history))
619        .route("/v1/audit", get(audit_log))
620        .route("/v1/admin/auth/reload", post(reload_auth))
621        .route("/v1/append", post(append))
622        .route("/v1/state/current", post(current_state))
623        .route("/v1/state/as-of", post(as_of))
624        .route("/v1/documents/parse", post(parse_document))
625        .route("/v1/documents/run", post(run_document))
626        .route(
627            "/v1/reports/pilot/coordination",
628            post(coordination_pilot_report),
629        )
630        .route(
631            "/v1/reports/pilot/coordination-delta",
632            post(coordination_delta_report),
633        )
634        .route("/v1/explain/tuple", post(explain_tuple))
635        .route(
636            "/v1/sidecars/artifacts/register",
637            post(register_artifact_reference),
638        )
639        .route("/v1/sidecars/artifacts/get", post(get_artifact_reference))
640        .route(
641            "/v1/sidecars/vectors/register",
642            post(register_vector_record),
643        )
644        .route("/v1/sidecars/vectors/search", post(search_vectors))
645        .with_state(HttpKernelState::with_options(service, options))
646}
647
/// Identity produced by a successful `HttpAuth::authorize` call.
#[derive(Clone, Debug)]
struct AuthenticatedPrincipal {
    /// Principal name; `"anonymous"` when no tokens are configured.
    id: String,
    /// Principal identifier of the matched token, if any.
    principal_id: Option<String>,
    /// Identifier of the matched token, if any.
    token_id: Option<String>,
    /// Policy context attached to the matched token, if any.
    policy_context: Option<PolicyContext>,
    /// `true` when a token matched; `false` for the anonymous principal, in
    /// which case `bound_policy_context` passes the requested context through.
    policy_bound: bool,
}
656
/// Audit log kept in memory and optionally mirrored to a file.
///
/// Clones share the same in-memory entry list.
#[derive(Clone, Default)]
struct AuditLog {
    /// Entries recorded so far.
    entries: Arc<Mutex<Vec<AuditEntry>>>,
    /// File each entry is also appended to; `None` disables file output.
    path: Option<PathBuf>,
}
662
663impl AuditLog {
664    fn new(path: Option<PathBuf>) -> Self {
665        Self {
666            entries: Arc::new(Mutex::new(Vec::new())),
667            path,
668        }
669    }
670
671    fn record(&self, entry: AuditEntry) {
672        let mut entries = match self.entries.lock() {
673            Ok(entries) => entries,
674            Err(_) => return,
675        };
676        entries.push(entry.clone());
677
678        if let Some(path) = &self.path {
679            if let Err(error) = append_audit_entry(path, &entry) {
680                entries.push(AuditEntry::audit_failure(path, &error));
681            }
682        }
683    }
684
685    fn snapshot(&self) -> Result<Vec<AuditEntry>, HttpError> {
686        self.entries
687            .lock()
688            .map(|entries| entries.clone())
689            .map_err(|_| HttpError::LockPoisoned)
690    }
691}
692
/// Authorization table built from an `HttpAuthConfig`.
#[derive(Clone, Default)]
struct HttpAuth {
    /// Configured tokens keyed by their bearer token value.
    tokens: HashMap<String, AuthenticatedToken>,
}
697
698impl HttpAuth {
699    fn from_config(config: HttpAuthConfig) -> Self {
700        let mut tokens = HashMap::new();
701        for access in config.tokens {
702            let principal_id = if access.principal_id.trim().is_empty() {
703                Some(format!("principal:{}", access.principal))
704            } else {
705                Some(access.principal_id.clone())
706            };
707            let token_id = if access.token_id.trim().is_empty() {
708                Some(format!("token:{}", access.principal))
709            } else {
710                Some(access.token_id.clone())
711            };
712            tokens.insert(
713                access.token,
714                AuthenticatedToken {
715                    principal: access.principal,
716                    principal_id,
717                    token_id,
718                    scopes: access.scopes.into_iter().collect(),
719                    policy_context: access.policy_context,
720                    revoked: access.revoked,
721                },
722            );
723        }
724        Self { tokens }
725    }
726
727    fn authorize(
728        &self,
729        headers: &HeaderMap,
730        required_scope: AuthScope,
731    ) -> Result<AuthenticatedPrincipal, HttpError> {
732        if self.tokens.is_empty() {
733            return Ok(AuthenticatedPrincipal {
734                id: "anonymous".into(),
735                principal_id: None,
736                token_id: None,
737                policy_context: None,
738                policy_bound: false,
739            });
740        }
741
742        let header = headers.get(AUTHORIZATION).ok_or(HttpError::Unauthorized {
743            principal: "anonymous".into(),
744            message: "missing bearer token".into(),
745        })?;
746        let header = header.to_str().map_err(|_| HttpError::Unauthorized {
747            principal: "anonymous".into(),
748            message: "authorization header is not valid UTF-8".into(),
749        })?;
750        let token = header
751            .strip_prefix("Bearer ")
752            .ok_or(HttpError::Unauthorized {
753                principal: "anonymous".into(),
754                message: "authorization header must use Bearer auth".into(),
755            })?;
756
757        let Some(access) = self.tokens.get(token) else {
758            return Err(HttpError::Unauthorized {
759                principal: "anonymous".into(),
760                message: "unknown bearer token".into(),
761            });
762        };
763
764        if !access.scopes.contains(&required_scope) {
765            return Err(HttpError::Forbidden {
766                principal: access.principal.clone(),
767                message: format!("token lacks {} scope", required_scope.as_str()),
768            });
769        }
770        if access.revoked {
771            return Err(HttpError::Forbidden {
772                principal: access.principal.clone(),
773                message: "token is revoked".into(),
774            });
775        }
776
777        Ok(AuthenticatedPrincipal {
778            id: access.principal.clone(),
779            principal_id: access.principal_id.clone(),
780            token_id: access.token_id.clone(),
781            policy_context: access.policy_context.clone(),
782            policy_bound: true,
783        })
784    }
785}
786
/// Table entry for one configured token, as built by `HttpAuth::from_config`.
#[derive(Clone, Debug)]
struct AuthenticatedToken {
    /// Principal name the token authenticates.
    principal: String,
    /// Principal identifier (configured or derived).
    principal_id: Option<String>,
    /// Token identifier (configured or derived).
    token_id: Option<String>,
    /// Scopes the token may be used for.
    scopes: BTreeSet<AuthScope>,
    /// Policy context attached to the token, if any.
    policy_context: Option<PolicyContext>,
    /// When `true`, authorization with this token is rejected.
    revoked: bool,
}
796
797fn normalize_policy_context(policy_context: Option<PolicyContext>) -> Option<PolicyContext> {
798    match policy_context {
799        Some(policy_context) if policy_context.is_empty() => None,
800        other => other,
801    }
802}
803
804fn flatten_replica_status(status: &PartitionStatusResponse) -> Vec<crate::ReplicaStatusSummary> {
805    let mut replicas = Vec::new();
806    for partition in &status.partitions {
807        for replica in &partition.replicas {
808            replicas.push(crate::ReplicaStatusSummary {
809                partition: partition.partition.to_string(),
810                replica_id: replica.replica_id.0,
811                role: match replica.role {
812                    crate::ReplicaRole::Leader => "leader".into(),
813                    crate::ReplicaRole::Follower => "follower".into(),
814                },
815                leader_epoch: replica.leader_epoch.0,
816                applied_element: replica.applied_element.map(|element| element.0),
817                replication_lag: replica.replication_lag,
818                healthy: replica.healthy,
819                detail: replica.detail.clone(),
820            });
821        }
822    }
823    replicas
824}
825
826fn bound_policy_context(
827    principal: &AuthenticatedPrincipal,
828    requested: Option<PolicyContext>,
829) -> Result<Option<PolicyContext>, HttpError> {
830    let requested = normalize_policy_context(requested);
831    if !principal.policy_bound {
832        return Ok(requested);
833    }
834
835    let granted = normalize_policy_context(principal.policy_context.clone());
836    match (granted, requested) {
837        (None, None) => Ok(None),
838        (None, Some(_)) => Err(HttpError::Forbidden {
839            principal: principal.id.clone(),
840            message: "requested policy context exceeds token policy".into(),
841        }),
842        (Some(granted), None) => Ok(Some(granted)),
843        (Some(granted), Some(requested)) => {
844            if requested.subset_of(&granted) {
845                Ok(Some(requested))
846            } else {
847                Err(HttpError::Forbidden {
848                    principal: principal.id.clone(),
849                    message: "requested policy context exceeds token policy".into(),
850                })
851            }
852        }
853    }
854}
855
856fn write_policy_context_fields(
857    target_capabilities: &mut Vec<String>,
858    target_visibilities: &mut Vec<String>,
859    policy_context: Option<&PolicyContext>,
860) {
861    target_capabilities.clear();
862    target_visibilities.clear();
863    if let Some(policy_context) = policy_context {
864        target_capabilities.extend(policy_context.capabilities.iter().cloned());
865        target_visibilities.extend(policy_context.visibilities.iter().cloned());
866    }
867}
868
869fn apply_policy_binding(
870    principal: &AuthenticatedPrincipal,
871    requested: Option<PolicyContext>,
872    context: &mut AuditContext,
873) -> Result<Option<PolicyContext>, HttpError> {
874    let requested = normalize_policy_context(requested);
875    write_policy_context_fields(
876        &mut context.requested_capabilities,
877        &mut context.requested_visibilities,
878        requested.as_ref(),
879    );
880    write_policy_context_fields(
881        &mut context.granted_capabilities,
882        &mut context.granted_visibilities,
883        principal.policy_context.as_ref(),
884    );
885
886    match bound_policy_context(principal, requested.clone()) {
887        Ok(effective) => {
888            write_policy_context_fields(
889                &mut context.effective_capabilities,
890                &mut context.effective_visibilities,
891                effective.as_ref(),
892            );
893            context.policy_decision = Some(
894                match (
895                    normalize_policy_context(principal.policy_context.clone()),
896                    requested,
897                    effective.clone(),
898                ) {
899                    (None, None, None) => "public".into(),
900                    (None, Some(_), Some(_)) => "request_supplied".into(),
901                    (Some(_), None, Some(_)) => "token_default".into(),
902                    (Some(granted), Some(requested), Some(_)) if requested == granted => {
903                        "request_exact".into()
904                    }
905                    (Some(_), Some(_), Some(_)) => "request_narrowed".into(),
906                    _ => "public".into(),
907                },
908            );
909            Ok(effective)
910        }
911        Err(error) => {
912            context.policy_decision = Some("denied_escalation".into());
913            Err(error)
914        }
915    }
916}
917
918#[derive(Debug)]
919enum HttpError {
920    Api(ApiError),
921    Unauthorized { principal: String, message: String },
922    Forbidden { principal: String, message: String },
923    LockPoisoned,
924}
925
926impl HttpError {
927    fn status_code(&self) -> StatusCode {
928        match self {
929            Self::Api(error) => status_for_api_error(error),
930            Self::Unauthorized { .. } => StatusCode::UNAUTHORIZED,
931            Self::Forbidden { .. } => StatusCode::FORBIDDEN,
932            Self::LockPoisoned => StatusCode::INTERNAL_SERVER_ERROR,
933        }
934    }
935
936    fn audit_principal(&self) -> String {
937        match self {
938            Self::Unauthorized { principal, .. } | Self::Forbidden { principal, .. } => {
939                principal.clone()
940            }
941            Self::Api(_) | Self::LockPoisoned => "aether".into(),
942        }
943    }
944
945    fn audit_message(&self) -> String {
946        match self {
947            Self::Api(error) => error.to_string(),
948            Self::Unauthorized { message, .. } | Self::Forbidden { message, .. } => message.clone(),
949            Self::LockPoisoned => "internal service state is unavailable".into(),
950        }
951    }
952}
953
954impl From<ApiError> for HttpError {
955    fn from(value: ApiError) -> Self {
956        Self::Api(value)
957    }
958}
959
960#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
961struct ErrorBody {
962    error: String,
963}
964
965impl IntoResponse for HttpError {
966    fn into_response(self) -> Response {
967        let status = self.status_code();
968        let error = match self {
969            Self::Api(error) => error.to_string(),
970            Self::Unauthorized { message, .. } | Self::Forbidden { message, .. } => message,
971            Self::LockPoisoned => "internal service state is unavailable".into(),
972        };
973
974        (status, Json(ErrorBody { error })).into_response()
975    }
976}
977
978fn status_for_api_error(error: &ApiError) -> StatusCode {
979    match error {
980        ApiError::Validation(_)
981        | ApiError::Sidecar(_)
982        | ApiError::Resolve(_)
983        | ApiError::Parse(_)
984        | ApiError::Compile(_)
985        | ApiError::Runtime(_)
986        | ApiError::Explain(_) => StatusCode::BAD_REQUEST,
987        ApiError::Journal(_) => StatusCode::CONFLICT,
988    }
989}
990
991async fn health() -> Json<HealthResponse> {
992    Json(HealthResponse::default())
993}
994
995async fn service_status(
996    State(state): State<HttpKernelState>,
997    headers: HeaderMap,
998) -> Result<Json<ServiceStatusResponse>, HttpError> {
999    let principal = match state.authorize(&headers, AuthScope::Ops) {
1000        Ok(principal) => principal,
1001        Err(error) => {
1002            state.audit.record(AuditEntry::for_denied(
1003                "GET",
1004                "/v1/status",
1005                error.status_code(),
1006                error.audit_principal(),
1007                None,
1008                None,
1009                AuthScope::Ops,
1010                error.audit_message(),
1011                AuditContext {
1012                    temporal_view: Some("service_status".into()),
1013                    ..Default::default()
1014                },
1015            ));
1016            return Err(error);
1017        }
1018    };
1019    let response = state.status_snapshot()?;
1020    state.audit.record(AuditEntry::for_request(
1021        "GET",
1022        "/v1/status",
1023        StatusCode::OK,
1024        &principal,
1025        AuthScope::Ops,
1026        AuditContext {
1027            temporal_view: Some("service_status".into()),
1028            ..Default::default()
1029        },
1030    ));
1031    Ok(Json(response))
1032}
1033
1034async fn history(
1035    State(state): State<HttpKernelState>,
1036    headers: HeaderMap,
1037) -> Result<Json<crate::HistoryResponse>, HttpError> {
1038    let request_context = AuditContext {
1039        temporal_view: Some("history".into()),
1040        ..Default::default()
1041    };
1042    let response = state.execute(
1043        &headers,
1044        "GET",
1045        "/v1/history",
1046        AuthScope::Ops,
1047        request_context.clone(),
1048        |service, principal, context| {
1049            let policy_context = apply_policy_binding(principal, None, context)?;
1050            let response = service
1051                .history(HistoryRequest { policy_context })
1052                .map_err(HttpError::Api)?;
1053            context.datom_count = Some(response.datoms.len());
1054            context.last_element = response.datoms.last().map(|datom| datom.element.0);
1055            Ok(response)
1056        },
1057    )?;
1058    Ok(Json(response))
1059}
1060
1061async fn audit_log(
1062    State(state): State<HttpKernelState>,
1063    headers: HeaderMap,
1064) -> Result<Json<AuditLogResponse>, HttpError> {
1065    Ok(Json(state.audit_entries(&headers)?))
1066}
1067
1068async fn reload_auth(
1069    State(state): State<HttpKernelState>,
1070    headers: HeaderMap,
1071) -> Result<Json<AuthReloadResponse>, HttpError> {
1072    let principal = match state.authorize(&headers, AuthScope::Ops) {
1073        Ok(principal) => principal,
1074        Err(error) => {
1075            state.audit.record(AuditEntry::for_denied(
1076                "POST",
1077                "/v1/admin/auth/reload",
1078                error.status_code(),
1079                error.audit_principal(),
1080                None,
1081                None,
1082                AuthScope::Ops,
1083                error.audit_message(),
1084                AuditContext {
1085                    temporal_view: Some("auth_reload".into()),
1086                    ..Default::default()
1087                },
1088            ));
1089            return Err(error);
1090        }
1091    };
1092    let response = state.reload_auth_from_config()?;
1093    state.audit.record(AuditEntry::for_request(
1094        "POST",
1095        "/v1/admin/auth/reload",
1096        StatusCode::OK,
1097        &principal,
1098        AuthScope::Ops,
1099        AuditContext {
1100            temporal_view: Some("auth_reload".into()),
1101            ..Default::default()
1102        },
1103    ));
1104    Ok(Json(response))
1105}
1106
1107async fn append(
1108    State(state): State<HttpKernelState>,
1109    headers: HeaderMap,
1110    Json(request): Json<AppendRequest>,
1111) -> Result<Json<crate::AppendResponse>, HttpError> {
1112    let request_context = audit_context_for_append(&request);
1113    let response = state.execute(
1114        &headers,
1115        "POST",
1116        "/v1/append",
1117        AuthScope::Append,
1118        request_context.clone(),
1119        |service, _principal, _context| {
1120            let response = service.append(request).map_err(HttpError::Api)?;
1121            Ok(response)
1122        },
1123    )?;
1124    Ok(Json(response))
1125}
1126
1127async fn current_state(
1128    State(state): State<HttpKernelState>,
1129    headers: HeaderMap,
1130    Json(request): Json<CurrentStateRequest>,
1131) -> Result<Json<crate::CurrentStateResponse>, HttpError> {
1132    let request_context = AuditContext {
1133        temporal_view: Some("current".into()),
1134        datom_count: Some(request.datoms.len()),
1135        ..Default::default()
1136    };
1137    let response = state.execute(
1138        &headers,
1139        "POST",
1140        "/v1/state/current",
1141        AuthScope::Query,
1142        request_context.clone(),
1143        |service, principal, context| {
1144            let mut request = request;
1145            request.policy_context =
1146                apply_policy_binding(principal, request.policy_context, context)?;
1147            let response = service.current_state(request).map_err(HttpError::Api)?;
1148            context.entity_count = Some(response.state.entities.len());
1149            context.last_element = response.state.as_of.map(|element| element.0);
1150            Ok(response)
1151        },
1152    )?;
1153    Ok(Json(response))
1154}
1155
1156async fn as_of(
1157    State(state): State<HttpKernelState>,
1158    headers: HeaderMap,
1159    Json(request): Json<AsOfRequest>,
1160) -> Result<Json<crate::AsOfResponse>, HttpError> {
1161    let request_context = AuditContext {
1162        temporal_view: Some(format!("as_of(e{})", request.at.0)),
1163        requested_element: Some(request.at.0),
1164        datom_count: Some(request.datoms.len()),
1165        ..Default::default()
1166    };
1167    let response = state.execute(
1168        &headers,
1169        "POST",
1170        "/v1/state/as-of",
1171        AuthScope::Query,
1172        request_context.clone(),
1173        |service, principal, context| {
1174            let mut request = request;
1175            request.policy_context =
1176                apply_policy_binding(principal, request.policy_context, context)?;
1177            let response = service.as_of(request).map_err(HttpError::Api)?;
1178            context.entity_count = Some(response.state.entities.len());
1179            context.last_element = response.state.as_of.map(|element| element.0);
1180            Ok(response)
1181        },
1182    )?;
1183    Ok(Json(response))
1184}
1185
1186async fn parse_document(
1187    State(state): State<HttpKernelState>,
1188    headers: HeaderMap,
1189    Json(request): Json<ParseDocumentRequest>,
1190) -> Result<Json<crate::ParseDocumentResponse>, HttpError> {
1191    let request_context = audit_context_for_document(&request.dsl);
1192    let response = state.execute(
1193        &headers,
1194        "POST",
1195        "/v1/documents/parse",
1196        AuthScope::Query,
1197        request_context.clone(),
1198        |service, _principal, _context| {
1199            let response = service.parse_document(request).map_err(HttpError::Api)?;
1200            Ok(response)
1201        },
1202    )?;
1203    Ok(Json(response))
1204}
1205
1206async fn run_document(
1207    State(state): State<HttpKernelState>,
1208    headers: HeaderMap,
1209    Json(request): Json<RunDocumentRequest>,
1210) -> Result<Json<crate::RunDocumentResponse>, HttpError> {
1211    let request_context = audit_context_for_document(&request.dsl);
1212    let response = state.execute(
1213        &headers,
1214        "POST",
1215        "/v1/documents/run",
1216        AuthScope::Query,
1217        request_context.clone(),
1218        |service, principal, context| {
1219            let mut request = request;
1220            request.policy_context =
1221                apply_policy_binding(principal, request.policy_context, context)?;
1222            let response = service.run_document(request).map_err(HttpError::Api)?;
1223            context.entity_count = Some(response.state.entities.len());
1224            context.last_element = response.state.as_of.map(|element| element.0);
1225            context.derived_tuple_count = Some(response.derived.tuples.len());
1226            context.row_count = response.query.as_ref().map(|query| query.rows.len());
1227            Ok(response)
1228        },
1229    )?;
1230    Ok(Json(response))
1231}
1232
1233async fn coordination_pilot_report(
1234    State(state): State<HttpKernelState>,
1235    headers: HeaderMap,
1236    Json(request): Json<CoordinationPilotReportRequest>,
1237) -> Result<Json<crate::CoordinationPilotReport>, HttpError> {
1238    let request_context = AuditContext {
1239        temporal_view: Some("coordination_pilot_report".into()),
1240        ..Default::default()
1241    };
1242    let response = state.execute(
1243        &headers,
1244        "POST",
1245        "/v1/reports/pilot/coordination",
1246        AuthScope::Query,
1247        request_context.clone(),
1248        |service, principal, context| {
1249            let mut request = request;
1250            request.policy_context =
1251                apply_policy_binding(principal, request.policy_context, context)?;
1252            let response = service
1253                .coordination_pilot_report(request)
1254                .map_err(HttpError::Api)?;
1255            context.datom_count = Some(response.history_len);
1256            context.row_count = Some(
1257                response.pre_heartbeat_authorized.len()
1258                    + response.as_of_authorized.len()
1259                    + response.live_heartbeats.len()
1260                    + response.current_authorized.len()
1261                    + response.claimable.len()
1262                    + response.accepted_outcomes.len()
1263                    + response.rejected_outcomes.len(),
1264            );
1265            context.trace_tuple_count = response.trace.as_ref().map(|trace| trace.tuple_count);
1266            context.tuple_id = response.trace.as_ref().map(|trace| trace.root.0);
1267            Ok(response)
1268        },
1269    )?;
1270    Ok(Json(response))
1271}
1272
1273async fn coordination_delta_report(
1274    State(state): State<HttpKernelState>,
1275    headers: HeaderMap,
1276    Json(request): Json<CoordinationDeltaReportRequest>,
1277) -> Result<Json<crate::CoordinationDeltaReport>, HttpError> {
1278    let request_context = AuditContext {
1279        temporal_view: Some("coordination_delta_report".into()),
1280        ..Default::default()
1281    };
1282    let response = state.execute(
1283        &headers,
1284        "POST",
1285        "/v1/reports/pilot/coordination-delta",
1286        AuthScope::Query,
1287        request_context,
1288        |service, principal, context| {
1289            let mut request = request;
1290            request.policy_context =
1291                apply_policy_binding(principal, request.policy_context, context)?;
1292            let response = service
1293                .coordination_delta_report(request)
1294                .map_err(HttpError::Api)?;
1295            context.datom_count = Some(response.right_history_len);
1296            context.row_count = Some(
1297                response.current_authorized.added.len()
1298                    + response.current_authorized.removed.len()
1299                    + response.current_authorized.changed.len()
1300                    + response.claimable.added.len()
1301                    + response.claimable.removed.len()
1302                    + response.claimable.changed.len()
1303                    + response.live_heartbeats.added.len()
1304                    + response.live_heartbeats.removed.len()
1305                    + response.live_heartbeats.changed.len()
1306                    + response.accepted_outcomes.added.len()
1307                    + response.accepted_outcomes.removed.len()
1308                    + response.accepted_outcomes.changed.len()
1309                    + response.rejected_outcomes.added.len()
1310                    + response.rejected_outcomes.removed.len()
1311                    + response.rejected_outcomes.changed.len(),
1312            );
1313            Ok(response)
1314        },
1315    )?;
1316    Ok(Json(response))
1317}
1318
1319async fn partition_status(
1320    State(state): State<HttpKernelState>,
1321    headers: HeaderMap,
1322) -> Result<Json<PartitionStatusResponse>, HttpError> {
1323    let response = state.execute_partitioned(
1324        &headers,
1325        "GET",
1326        "/v1/partitions/status",
1327        AuthScope::Ops,
1328        AuditContext {
1329            temporal_view: Some("partition_status".into()),
1330            ..Default::default()
1331        },
1332        |service, _principal, context| {
1333            let response = service.partition_status().map_err(HttpError::Api)?;
1334            context.entity_count = Some(response.partitions.len());
1335            context.row_count = Some(
1336                response
1337                    .partitions
1338                    .iter()
1339                    .map(|partition| partition.replicas.len())
1340                    .sum(),
1341            );
1342            Ok(response)
1343        },
1344    )?;
1345    Ok(Json(response))
1346}
1347
1348async fn promote_replica(
1349    State(state): State<HttpKernelState>,
1350    headers: HeaderMap,
1351    Json(request): Json<PromoteReplicaRequest>,
1352) -> Result<Json<crate::PromoteReplicaResponse>, HttpError> {
1353    let request_context = AuditContext {
1354        temporal_view: Some(format!("partition({})", request.partition)),
1355        ..Default::default()
1356    };
1357    let response = state.execute_partitioned(
1358        &headers,
1359        "POST",
1360        "/v1/partitions/promote",
1361        AuthScope::Ops,
1362        request_context,
1363        |service, _principal, context| {
1364            let response = service.promote_replica(request).map_err(HttpError::Api)?;
1365            context.requested_element = Some(response.leader_epoch.0);
1366            Ok(response)
1367        },
1368    )?;
1369    Ok(Json(response))
1370}
1371
1372async fn partition_append(
1373    State(state): State<HttpKernelState>,
1374    headers: HeaderMap,
1375    Json(request): Json<PartitionAppendRequest>,
1376) -> Result<Json<crate::PartitionAppendResponse>, HttpError> {
1377    let request_context = AuditContext {
1378        temporal_view: Some(format!("partition({})", request.partition)),
1379        datom_count: Some(request.datoms.len()),
1380        last_element: request.datoms.last().map(|datom| datom.element.0),
1381        ..Default::default()
1382    };
1383    let response = state.execute_partitioned(
1384        &headers,
1385        "POST",
1386        "/v1/partitions/append",
1387        AuthScope::Append,
1388        request_context,
1389        |service, _principal, context| {
1390            let response = service.append_partition(request).map_err(HttpError::Api)?;
1391            context.requested_element = response.leader_epoch.as_ref().map(|epoch| epoch.0);
1392            Ok(response)
1393        },
1394    )?;
1395    Ok(Json(response))
1396}
1397
1398async fn partition_history(
1399    State(state): State<HttpKernelState>,
1400    headers: HeaderMap,
1401    Json(request): Json<PartitionHistoryRequest>,
1402) -> Result<Json<crate::PartitionHistoryResponse>, HttpError> {
1403    let request_context = AuditContext {
1404        temporal_view: Some(request.cut.to_string()),
1405        requested_element: request.cut.as_of.map(|element| element.0),
1406        ..Default::default()
1407    };
1408    let response = state.execute_partitioned(
1409        &headers,
1410        "POST",
1411        "/v1/partitions/history",
1412        AuthScope::Query,
1413        request_context,
1414        |service, principal, context| {
1415            let mut request = request;
1416            request.policy_context =
1417                apply_policy_binding(principal, request.policy_context, context)?;
1418            let response = service.partition_history(request).map_err(HttpError::Api)?;
1419            context.datom_count = Some(response.datoms.len());
1420            context.entity_count = Some(
1421                response
1422                    .datoms
1423                    .iter()
1424                    .map(|datom| datom.entity)
1425                    .collect::<BTreeSet<_>>()
1426                    .len(),
1427            );
1428            context.last_element = response.datoms.last().map(|datom| datom.element.0);
1429            Ok(response)
1430        },
1431    )?;
1432    Ok(Json(response))
1433}
1434
1435async fn partition_state(
1436    State(state): State<HttpKernelState>,
1437    headers: HeaderMap,
1438    Json(request): Json<PartitionStateRequest>,
1439) -> Result<Json<crate::PartitionStateResponse>, HttpError> {
1440    let request_context = AuditContext {
1441        temporal_view: Some(request.cut.to_string()),
1442        requested_element: request.cut.as_of.map(|element| element.0),
1443        ..Default::default()
1444    };
1445    let response = state.execute_partitioned(
1446        &headers,
1447        "POST",
1448        "/v1/partitions/state",
1449        AuthScope::Query,
1450        request_context,
1451        |service, principal, context| {
1452            let mut request = request;
1453            request.policy_context =
1454                apply_policy_binding(principal, request.policy_context, context)?;
1455            let response = service.partition_state(request).map_err(HttpError::Api)?;
1456            context.entity_count = Some(response.state.entities.len());
1457            context.last_element = response.cut.as_of.map(|element| element.0);
1458            Ok(response)
1459        },
1460    )?;
1461    Ok(Json(response))
1462}
1463
1464async fn federated_history(
1465    State(state): State<HttpKernelState>,
1466    headers: HeaderMap,
1467    Json(request): Json<FederatedHistoryRequest>,
1468) -> Result<Json<crate::FederatedHistoryResponse>, HttpError> {
1469    let request_context = AuditContext {
1470        temporal_view: Some("federated_history".into()),
1471        ..Default::default()
1472    };
1473    let response = state.execute_partitioned(
1474        &headers,
1475        "POST",
1476        "/v1/federated/history",
1477        AuthScope::Query,
1478        request_context,
1479        |service, principal, context| {
1480            let mut request = request;
1481            request.policy_context =
1482                apply_policy_binding(principal, request.policy_context, context)?;
1483            let response = service.federated_history(request).map_err(HttpError::Api)?;
1484            context.datom_count = Some(
1485                response
1486                    .partitions
1487                    .iter()
1488                    .map(|partition| partition.datoms.len())
1489                    .sum(),
1490            );
1491            context.entity_count = Some(response.partitions.len());
1492            Ok(response)
1493        },
1494    )?;
1495    Ok(Json(response))
1496}
1497
1498async fn federated_run_document(
1499    State(state): State<HttpKernelState>,
1500    headers: HeaderMap,
1501    Json(request): Json<FederatedRunDocumentRequest>,
1502) -> Result<Json<crate::FederatedRunDocumentResponse>, HttpError> {
1503    let request_context = AuditContext {
1504        temporal_view: Some("federated_run_document".into()),
1505        ..Default::default()
1506    };
1507    let response = state.execute_partitioned(
1508        &headers,
1509        "POST",
1510        "/v1/federated/run",
1511        AuthScope::Query,
1512        request_context,
1513        |service, principal, context| {
1514            let mut request = request;
1515            request.policy_context =
1516                apply_policy_binding(principal, request.policy_context, context)?;
1517            let response = service
1518                .federated_run_document(request)
1519                .map_err(HttpError::Api)?;
1520            context.entity_count = Some(response.cut.cuts.len());
1521            context.row_count = Some(
1522                response
1523                    .run
1524                    .query
1525                    .as_ref()
1526                    .map(|query| query.rows.len())
1527                    .unwrap_or(0),
1528            );
1529            context.derived_tuple_count = Some(response.run.derived.tuples.len());
1530            Ok(response)
1531        },
1532    )?;
1533    Ok(Json(response))
1534}
1535
1536async fn federated_report(
1537    State(state): State<HttpKernelState>,
1538    headers: HeaderMap,
1539    Json(request): Json<FederatedRunDocumentRequest>,
1540) -> Result<Json<FederatedExplainReport>, HttpError> {
1541    let request_context = AuditContext {
1542        temporal_view: Some("federated_report".into()),
1543        ..Default::default()
1544    };
1545    let response = state.execute_partitioned(
1546        &headers,
1547        "POST",
1548        "/v1/federated/report",
1549        AuthScope::Explain,
1550        request_context,
1551        |service, principal, context| {
1552            let mut request = request;
1553            request.policy_context =
1554                apply_policy_binding(principal, request.policy_context, context)?;
1555            let response = service
1556                .build_federated_explain_report(request)
1557                .map_err(HttpError::Api)?;
1558            context.entity_count = Some(response.cut.cuts.len());
1559            context.row_count = Some(
1560                response.primary_query.len()
1561                    + response
1562                        .named_queries
1563                        .iter()
1564                        .map(|query| query.rows.len())
1565                        .sum::<usize>(),
1566            );
1567            context.trace_tuple_count =
1568                Some(response.traces.iter().map(|trace| trace.tuple_count).sum());
1569            context.tuple_id = response.traces.first().map(|trace| trace.root.0);
1570            Ok(response)
1571        },
1572    )?;
1573    Ok(Json(response))
1574}
1575
1576async fn explain_tuple(
1577    State(state): State<HttpKernelState>,
1578    headers: HeaderMap,
1579    Json(request): Json<ExplainTupleRequest>,
1580) -> Result<Json<crate::ExplainTupleResponse>, HttpError> {
1581    let request_context = AuditContext {
1582        tuple_id: Some(request.tuple_id.0),
1583        ..Default::default()
1584    };
1585    let response = state.execute(
1586        &headers,
1587        "POST",
1588        "/v1/explain/tuple",
1589        AuthScope::Explain,
1590        request_context.clone(),
1591        |service, principal, context| {
1592            let mut request = request;
1593            request.policy_context =
1594                apply_policy_binding(principal, request.policy_context, context)?;
1595            let response = service.explain_tuple(request).map_err(HttpError::Api)?;
1596            context.trace_tuple_count = Some(response.trace.tuples.len());
1597            Ok(response)
1598        },
1599    )?;
1600    Ok(Json(response))
1601}
1602
1603async fn register_artifact_reference(
1604    State(state): State<HttpKernelState>,
1605    headers: HeaderMap,
1606    Json(request): Json<RegisterArtifactReferenceRequest>,
1607) -> Result<Json<crate::RegisterArtifactReferenceResponse>, HttpError> {
1608    let request_context = AuditContext {
1609        temporal_view: Some("sidecar_artifact_register".into()),
1610        requested_element: Some(request.reference.registered_at.0),
1611        ..Default::default()
1612    };
1613    let response = state.execute(
1614        &headers,
1615        "POST",
1616        "/v1/sidecars/artifacts/register",
1617        AuthScope::Append,
1618        request_context.clone(),
1619        |service, _principal, _context| {
1620            let response = service
1621                .register_artifact_reference(request)
1622                .map_err(HttpError::Api)?;
1623            Ok(response)
1624        },
1625    )?;
1626    Ok(Json(response))
1627}
1628
1629async fn get_artifact_reference(
1630    State(state): State<HttpKernelState>,
1631    headers: HeaderMap,
1632    Json(request): Json<GetArtifactReferenceRequest>,
1633) -> Result<Json<crate::GetArtifactReferenceResponse>, HttpError> {
1634    let request_context = AuditContext {
1635        temporal_view: Some("sidecar_artifact_lookup".into()),
1636        ..Default::default()
1637    };
1638    let response = state.execute(
1639        &headers,
1640        "POST",
1641        "/v1/sidecars/artifacts/get",
1642        AuthScope::Query,
1643        request_context.clone(),
1644        |service, principal, context| {
1645            let mut request = request;
1646            request.policy_context =
1647                apply_policy_binding(principal, request.policy_context, context)?;
1648            let response = service
1649                .get_artifact_reference(request)
1650                .map_err(HttpError::Api)?;
1651            Ok(response)
1652        },
1653    )?;
1654    Ok(Json(response))
1655}
1656
1657async fn register_vector_record(
1658    State(state): State<HttpKernelState>,
1659    headers: HeaderMap,
1660    Json(request): Json<RegisterVectorRecordRequest>,
1661) -> Result<Json<crate::RegisterVectorRecordResponse>, HttpError> {
1662    let request_context = AuditContext {
1663        temporal_view: Some("sidecar_vector_register".into()),
1664        requested_element: Some(request.record.registered_at.0),
1665        ..Default::default()
1666    };
1667    let response = state.execute(
1668        &headers,
1669        "POST",
1670        "/v1/sidecars/vectors/register",
1671        AuthScope::Append,
1672        request_context.clone(),
1673        |service, _principal, _context| {
1674            let response = service
1675                .register_vector_record(request)
1676                .map_err(HttpError::Api)?;
1677            Ok(response)
1678        },
1679    )?;
1680    Ok(Json(response))
1681}
1682
1683async fn search_vectors(
1684    State(state): State<HttpKernelState>,
1685    headers: HeaderMap,
1686    Json(request): Json<SearchVectorsRequest>,
1687) -> Result<Json<crate::SearchVectorsResponse>, HttpError> {
1688    let request_context = AuditContext {
1689        temporal_view: Some("sidecar_vector_search".into()),
1690        requested_element: request.as_of.map(|element| element.0),
1691        ..Default::default()
1692    };
1693    let response = state.execute(
1694        &headers,
1695        "POST",
1696        "/v1/sidecars/vectors/search",
1697        AuthScope::Query,
1698        request_context.clone(),
1699        |service, principal, context| {
1700            let mut request = request;
1701            request.policy_context =
1702                apply_policy_binding(principal, request.policy_context, context)?;
1703            let response = service.search_vectors(request).map_err(HttpError::Api)?;
1704            context.row_count = Some(response.matches.len());
1705            Ok(response)
1706        },
1707    )?;
1708    Ok(Json(response))
1709}
1710
1711fn audit_context_for_append(request: &AppendRequest) -> AuditContext {
1712    AuditContext {
1713        datom_count: Some(request.datoms.len()),
1714        last_element: request.datoms.last().map(|datom| datom.element.0),
1715        ..Default::default()
1716    }
1717}
1718
1719fn audit_context_for_document(dsl: &str) -> AuditContext {
1720    let summary = summarize_document_dsl(dsl);
1721    AuditContext {
1722        temporal_view: summary.temporal_view,
1723        query_goal: summary.query_goal,
1724        requested_element: summary.requested_element,
1725        ..Default::default()
1726    }
1727}
1728
/// Audit-relevant facts extracted from the first `query` block of a document.
#[derive(Default)]
struct DocumentAuditSummary {
    /// `"current"` or `"as_of(<element>)"`, when the query names a temporal view.
    temporal_view: Option<String>,
    /// Text following the first `goal ` or `find ` statement in the query.
    query_goal: Option<String>,
    /// Numeric part of an `as_of e<N>` element, when it parses as `u64`.
    requested_element: Option<u64>,
}

/// Scans `dsl` line by line and summarizes its first `query … {` block.
///
/// Blank lines are ignored. The block starts after the first line that begins
/// with `query` and ends with `{`, and stops at the first line that is exactly
/// `}`. Within the block the first temporal statement (`current` or
/// `as_of <element>`) and the first `goal `/`find ` statement are captured.
/// Every field is `None` when the document has no such block.
fn summarize_document_dsl(dsl: &str) -> DocumentAuditSummary {
    let mut summary = DocumentAuditSummary::default();

    let query_body = dsl
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        // Drop everything before the query header, then the header itself.
        .skip_while(|line| !(line.starts_with("query") && line.ends_with('{')))
        .skip(1)
        .take_while(|line| *line != "}");

    for statement in query_body {
        // Only the first temporal statement counts; once it is set, later
        // `current`/`as_of` lines fall through to the goal check below.
        if summary.temporal_view.is_none() {
            if statement == "current" {
                summary.temporal_view = Some("current".into());
                continue;
            }
            if let Some(rest) = statement.strip_prefix("as_of ") {
                let element = rest.trim();
                summary.temporal_view = Some(format!("as_of({})", element));
                summary.requested_element = element
                    .strip_prefix('e')
                    .and_then(|digits| digits.parse::<u64>().ok());
                continue;
            }
        }

        if summary.query_goal.is_none() {
            summary.query_goal = statement
                .strip_prefix("goal ")
                .or_else(|| statement.strip_prefix("find "))
                .map(|goal| goal.trim().to_string());
        }
    }

    summary
}
1783
1784fn append_audit_entry(path: &Path, entry: &AuditEntry) -> Result<(), std::io::Error> {
1785    if let Some(parent) = path.parent() {
1786        fs::create_dir_all(parent)?;
1787    }
1788
1789    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
1790    let json =
1791        serde_json::to_string(entry).map_err(|error| std::io::Error::other(error.to_string()))?;
1792    file.write_all(json.as_bytes())?;
1793    file.write_all(b"\n")?;
1794    Ok(())
1795}
1796
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value too large for `u64`
/// saturates at `u64::MAX`.
fn now_millis() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis());

    if millis > u128::from(u64::MAX) {
        u64::MAX
    } else {
        millis as u64
    }
}