1use crate::{
2 deployment::PilotServiceConfig, ApiError, AppendRequest, AsOfRequest, AuthReloadResponse,
3 CoordinationDeltaReportRequest, CoordinationPilotReportRequest, CurrentStateRequest,
4 ExplainTupleRequest, FederatedExplainReport, FederatedHistoryRequest,
5 FederatedRunDocumentRequest, GetArtifactReferenceRequest, HistoryRequest, KernelService,
6 ParseDocumentRequest, PartitionAppendRequest, PartitionHistoryRequest, PartitionStateRequest,
7 PartitionStatusResponse, PromoteReplicaRequest, RegisterArtifactReferenceRequest,
8 RegisterVectorRecordRequest, ReplicatedAuthorityPartitionService, RunDocumentRequest,
9 SearchVectorsRequest, ServiceMode, ServiceStatusResponse,
10};
11use aether_ast::PolicyContext;
12use axum::{
13 extract::State,
14 http::{header::AUTHORIZATION, HeaderMap, StatusCode},
15 response::{IntoResponse, Response},
16 routing::{get, post},
17 Json, Router,
18};
19use serde::{Deserialize, Serialize};
20use std::{
21 collections::{BTreeSet, HashMap},
22 fs::{self, OpenOptions},
23 io::Write,
24 path::{Path, PathBuf},
25 sync::{Arc, Mutex},
26 time::{SystemTime, UNIX_EPOCH},
27};
28
29#[derive(Clone)]
30pub struct HttpKernelState {
31 service: Arc<Mutex<Box<dyn KernelService + Send>>>,
32 partitioned: Option<Arc<Mutex<ReplicatedAuthorityPartitionService>>>,
33 auth: Arc<Mutex<HttpAuth>>,
34 audit: AuditLog,
35 status: Arc<Mutex<ServiceStatusResponse>>,
36 auth_reload_config_path: Option<PathBuf>,
37}
38
39impl HttpKernelState {
40 pub fn new(service: impl KernelService + Send + 'static) -> Self {
41 Self::with_options(service, HttpKernelOptions::default())
42 }
43
44 pub fn with_partitioned_options(
45 service: impl KernelService + Send + 'static,
46 partitioned: ReplicatedAuthorityPartitionService,
47 options: HttpKernelOptions,
48 ) -> Self {
49 Self::with_optional_partitioned(service, Some(partitioned), options)
50 }
51
52 pub fn with_options(
53 service: impl KernelService + Send + 'static,
54 options: HttpKernelOptions,
55 ) -> Self {
56 Self::with_optional_partitioned(service, None, options)
57 }
58
59 fn with_optional_partitioned(
60 service: impl KernelService + Send + 'static,
61 partitioned: Option<ReplicatedAuthorityPartitionService>,
62 options: HttpKernelOptions,
63 ) -> Self {
64 Self {
65 service: Arc::new(Mutex::new(Box::new(service))),
66 partitioned: partitioned.map(|service| Arc::new(Mutex::new(service))),
67 auth: Arc::new(Mutex::new(HttpAuth::from_config(options.auth))),
68 audit: AuditLog::new(options.audit_log_path),
69 status: Arc::new(Mutex::new(options.service_status.unwrap_or_else(|| {
70 ServiceStatusResponse::single_node(env!("CARGO_PKG_VERSION"), "pilot-v1", "v1")
71 }))),
72 auth_reload_config_path: options.auth_reload_config_path,
73 }
74 }
75
76 fn service(
77 &self,
78 ) -> Result<std::sync::MutexGuard<'_, Box<dyn KernelService + Send>>, HttpError> {
79 self.service.lock().map_err(|_| HttpError::LockPoisoned)
80 }
81
82 fn partitioned_service(
83 &self,
84 ) -> Result<std::sync::MutexGuard<'_, ReplicatedAuthorityPartitionService>, HttpError> {
85 let partitioned = self.partitioned.as_ref().ok_or_else(|| {
86 HttpError::Api(ApiError::Validation(
87 "partitioned prototype is not configured for this service".into(),
88 ))
89 })?;
90 partitioned.lock().map_err(|_| HttpError::LockPoisoned)
91 }
92
93 fn authorize(
94 &self,
95 headers: &HeaderMap,
96 required_scope: AuthScope,
97 ) -> Result<AuthenticatedPrincipal, HttpError> {
98 self.auth
99 .lock()
100 .map_err(|_| HttpError::LockPoisoned)?
101 .authorize(headers, required_scope)
102 }
103
104 fn status_snapshot(&self) -> Result<ServiceStatusResponse, HttpError> {
105 let mut status = self
106 .status
107 .lock()
108 .map(|status| status.clone())
109 .map_err(|_| HttpError::LockPoisoned)?;
110 if let Some(partitioned) = &self.partitioned {
111 let partition_status = partitioned
112 .lock()
113 .map_err(|_| HttpError::LockPoisoned)?
114 .partition_status()
115 .map_err(HttpError::Api)?;
116 status.service_mode = ServiceMode::Partitioned;
117 status.replicas = flatten_replica_status(&partition_status);
118 }
119 Ok(status)
120 }
121
122 fn reload_auth_from_config(&self) -> Result<AuthReloadResponse, HttpError> {
123 let Some(config_path) = &self.auth_reload_config_path else {
124 return Err(HttpError::Api(ApiError::Validation(
125 "auth reload is not configured for this service".into(),
126 )));
127 };
128 let resolved = PilotServiceConfig::load(config_path)
129 .and_then(|config| config.resolve(config_path))
130 .map_err(|error| HttpError::Api(ApiError::Validation(error.to_string())))?;
131 let mut status = self.status.lock().map_err(|_| HttpError::LockPoisoned)?;
132 if status.bind_addr.as_deref() != Some(resolved.bind_addr.as_str())
133 || status.storage.database_path.as_ref() != Some(&resolved.database_path)
134 || status.storage.audit_log_path.as_ref() != Some(&resolved.audit_log_path)
135 {
136 return Err(HttpError::Api(ApiError::Validation(
137 "auth reload cannot change bind or storage paths".into(),
138 )));
139 }
140 {
141 let mut auth = self.auth.lock().map_err(|_| HttpError::LockPoisoned)?;
142 *auth = HttpAuth::from_config(resolved.auth.clone());
143 }
144 status.config_version.clone_from(&resolved.config_version);
145 status.schema_version.clone_from(&resolved.schema_version);
146 status.principals = resolved
147 .token_summaries
148 .iter()
149 .map(|summary| summary.status_summary())
150 .collect();
151 Ok(AuthReloadResponse {
152 reloaded_at_ms: now_millis(),
153 principal_count: status.principals.len(),
154 revoked_count: status
155 .principals
156 .iter()
157 .filter(|principal| principal.revoked)
158 .count(),
159 })
160 }
161
162 fn execute<T, F>(
163 &self,
164 headers: &HeaderMap,
165 method: &'static str,
166 path: &'static str,
167 required_scope: AuthScope,
168 mut context: AuditContext,
169 operation: F,
170 ) -> Result<T, HttpError>
171 where
172 F: FnOnce(
173 &mut dyn KernelService,
174 &AuthenticatedPrincipal,
175 &mut AuditContext,
176 ) -> Result<T, HttpError>,
177 {
178 let principal = match self.authorize(headers, required_scope) {
179 Ok(principal) => principal,
180 Err(error) => {
181 self.audit.record(AuditEntry::for_denied(
182 method,
183 path,
184 error.status_code(),
185 error.audit_principal(),
186 None,
187 None,
188 required_scope,
189 error.audit_message(),
190 context,
191 ));
192 return Err(error);
193 }
194 };
195
196 let result = {
197 let mut service = self.service()?;
198 operation(service.as_mut(), &principal, &mut context)
199 };
200
201 let status = match &result {
202 Ok(_) => StatusCode::OK,
203 Err(error) => error.status_code(),
204 };
205 self.audit.record(AuditEntry::for_request(
206 method,
207 path,
208 status,
209 &principal,
210 required_scope,
211 context,
212 ));
213
214 result
215 }
216
217 fn execute_partitioned<T, F>(
218 &self,
219 headers: &HeaderMap,
220 method: &'static str,
221 path: &'static str,
222 required_scope: AuthScope,
223 context: AuditContext,
224 operation: F,
225 ) -> Result<T, HttpError>
226 where
227 F: FnOnce(
228 &mut ReplicatedAuthorityPartitionService,
229 &AuthenticatedPrincipal,
230 &mut AuditContext,
231 ) -> Result<T, HttpError>,
232 {
233 let principal = match self.authorize(headers, required_scope) {
234 Ok(principal) => principal,
235 Err(error) => {
236 self.audit.record(AuditEntry::for_denied(
237 method,
238 path,
239 error.status_code(),
240 error.audit_principal(),
241 None,
242 None,
243 required_scope,
244 error.audit_message(),
245 context,
246 ));
247 return Err(error);
248 }
249 };
250
251 {
252 let mut service = self.partitioned_service()?;
253 let mut context = context;
254 let result = operation(&mut service, &principal, &mut context);
255 let status = match &result {
256 Ok(_) => StatusCode::OK,
257 Err(error) => error.status_code(),
258 };
259 self.audit.record(AuditEntry::for_request(
260 method,
261 path,
262 status,
263 &principal,
264 required_scope,
265 context,
266 ));
267 result
268 }
269 }
270
271 fn audit_entries(&self, headers: &HeaderMap) -> Result<AuditLogResponse, HttpError> {
272 let principal = match self.authorize(headers, AuthScope::Ops) {
273 Ok(principal) => principal,
274 Err(error) => {
275 self.audit.record(AuditEntry::for_denied(
276 "GET",
277 "/v1/audit",
278 error.status_code(),
279 error.audit_principal(),
280 None,
281 None,
282 AuthScope::Ops,
283 error.audit_message(),
284 AuditContext::default(),
285 ));
286 return Err(error);
287 }
288 };
289
290 let response = AuditLogResponse {
291 entries: self.audit.snapshot()?,
292 };
293 self.audit.record(AuditEntry::for_request(
294 "GET",
295 "/v1/audit",
296 StatusCode::OK,
297 &principal,
298 AuthScope::Ops,
299 AuditContext::default(),
300 ));
301 Ok(response)
302 }
303}
304
305#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
306#[serde(rename_all = "snake_case")]
307pub enum AuthScope {
308 Append,
309 Query,
310 Explain,
311 Ops,
312}
313
314impl AuthScope {
315 fn as_str(self) -> &'static str {
316 match self {
317 Self::Append => "append",
318 Self::Query => "query",
319 Self::Explain => "explain",
320 Self::Ops => "ops",
321 }
322 }
323}
324
325#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
326pub struct HttpAuthConfig {
327 pub tokens: Vec<HttpAccessToken>,
328}
329
330impl HttpAuthConfig {
331 pub fn new() -> Self {
332 Self::default()
333 }
334
335 pub fn with_token(
336 mut self,
337 token: impl Into<String>,
338 principal: impl Into<String>,
339 scopes: impl IntoIterator<Item = AuthScope>,
340 ) -> Self {
341 self.tokens.push(HttpAccessToken {
342 token: token.into(),
343 token_id: String::new(),
344 principal: principal.into(),
345 principal_id: String::new(),
346 scopes: scopes.into_iter().collect(),
347 policy_context: None,
348 source: "inline".into(),
349 revoked: false,
350 });
351 self
352 }
353
354 pub fn with_token_context(
355 mut self,
356 token: impl Into<String>,
357 principal: impl Into<String>,
358 scopes: impl IntoIterator<Item = AuthScope>,
359 policy_context: PolicyContext,
360 ) -> Self {
361 self.tokens.push(HttpAccessToken {
362 token: token.into(),
363 token_id: String::new(),
364 principal: principal.into(),
365 principal_id: String::new(),
366 scopes: scopes.into_iter().collect(),
367 policy_context: normalize_policy_context(Some(policy_context)),
368 source: "inline".into(),
369 revoked: false,
370 });
371 self
372 }
373}
374
375#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
376pub struct HttpAccessToken {
377 pub token: String,
378 #[serde(default)]
379 pub token_id: String,
380 pub principal: String,
381 #[serde(default)]
382 pub principal_id: String,
383 pub scopes: Vec<AuthScope>,
384 pub policy_context: Option<PolicyContext>,
385 #[serde(default)]
386 pub source: String,
387 #[serde(default)]
388 pub revoked: bool,
389}
390
391#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
392pub struct HttpKernelOptions {
393 pub auth: HttpAuthConfig,
394 pub audit_log_path: Option<PathBuf>,
395 pub service_status: Option<ServiceStatusResponse>,
396 pub auth_reload_config_path: Option<PathBuf>,
397}
398
399impl HttpKernelOptions {
400 pub fn new() -> Self {
401 Self::default()
402 }
403
404 pub fn with_auth(mut self, auth: HttpAuthConfig) -> Self {
405 self.auth = auth;
406 self
407 }
408
409 pub fn with_audit_log_path(mut self, path: impl Into<PathBuf>) -> Self {
410 self.audit_log_path = Some(path.into());
411 self
412 }
413
414 pub fn with_service_status(mut self, status: ServiceStatusResponse) -> Self {
415 self.service_status = Some(status);
416 self
417 }
418
419 pub fn with_auth_reload_config_path(mut self, path: impl Into<PathBuf>) -> Self {
420 self.auth_reload_config_path = Some(path.into());
421 self
422 }
423}
424
425#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
426pub struct AuditEntry {
427 pub timestamp_ms: u64,
428 pub principal: String,
429 #[serde(default, skip_serializing_if = "Option::is_none")]
430 pub principal_id: Option<String>,
431 #[serde(default, skip_serializing_if = "Option::is_none")]
432 pub token_id: Option<String>,
433 pub method: String,
434 pub path: String,
435 pub status: u16,
436 pub scope: AuthScope,
437 pub outcome: String,
438 pub detail: Option<String>,
439 pub context: AuditContext,
440}
441
442#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
443pub struct AuditContext {
444 pub temporal_view: Option<String>,
445 pub query_goal: Option<String>,
446 pub tuple_id: Option<u64>,
447 pub requested_element: Option<u64>,
448 pub datom_count: Option<usize>,
449 pub entity_count: Option<usize>,
450 pub row_count: Option<usize>,
451 pub derived_tuple_count: Option<usize>,
452 pub trace_tuple_count: Option<usize>,
453 pub last_element: Option<u64>,
454 pub requested_capabilities: Vec<String>,
455 pub requested_visibilities: Vec<String>,
456 pub granted_capabilities: Vec<String>,
457 pub granted_visibilities: Vec<String>,
458 pub effective_capabilities: Vec<String>,
459 pub effective_visibilities: Vec<String>,
460 pub policy_decision: Option<String>,
461}
462
463impl AuditEntry {
464 fn for_request(
465 method: impl Into<String>,
466 path: impl Into<String>,
467 status: StatusCode,
468 principal: &AuthenticatedPrincipal,
469 scope: AuthScope,
470 context: AuditContext,
471 ) -> Self {
472 Self {
473 timestamp_ms: now_millis(),
474 principal: principal.id.clone(),
475 principal_id: principal.principal_id.clone(),
476 token_id: principal.token_id.clone(),
477 method: method.into(),
478 path: path.into(),
479 status: status.as_u16(),
480 scope,
481 outcome: if status.is_success() {
482 "ok".into()
483 } else {
484 "error".into()
485 },
486 detail: None,
487 context,
488 }
489 }
490
491 #[allow(clippy::too_many_arguments)]
492 fn for_denied(
493 method: impl Into<String>,
494 path: impl Into<String>,
495 status: StatusCode,
496 principal: impl Into<String>,
497 principal_id: Option<String>,
498 token_id: Option<String>,
499 scope: AuthScope,
500 detail: impl Into<String>,
501 context: AuditContext,
502 ) -> Self {
503 Self {
504 timestamp_ms: now_millis(),
505 principal: principal.into(),
506 principal_id,
507 token_id,
508 method: method.into(),
509 path: path.into(),
510 status: status.as_u16(),
511 scope,
512 outcome: if status == StatusCode::UNAUTHORIZED {
513 "unauthorized".into()
514 } else {
515 "forbidden".into()
516 },
517 detail: Some(detail.into()),
518 context,
519 }
520 }
521
522 fn audit_failure(path: &Path, error: &std::io::Error) -> Self {
523 Self {
524 timestamp_ms: now_millis(),
525 principal: "aether".into(),
526 principal_id: None,
527 token_id: None,
528 method: "AUDIT".into(),
529 path: path.display().to_string(),
530 status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
531 scope: AuthScope::Ops,
532 outcome: "audit_write_failed".into(),
533 detail: Some(error.to_string()),
534 context: AuditContext::default(),
535 }
536 }
537}
538
539#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
540pub struct AuditLogResponse {
541 pub entries: Vec<AuditEntry>,
542}
543
544#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
545pub struct HealthResponse {
546 pub status: String,
547}
548
549impl Default for HealthResponse {
550 fn default() -> Self {
551 Self {
552 status: "ok".into(),
553 }
554 }
555}
556
557pub fn http_router(service: impl KernelService + Send + 'static) -> Router {
558 http_router_with_options(service, HttpKernelOptions::default())
559}
560
561pub fn http_router_with_partitioned_options(
562 service: impl KernelService + Send + 'static,
563 partitioned: ReplicatedAuthorityPartitionService,
564 options: HttpKernelOptions,
565) -> Router {
566 Router::new()
567 .route("/health", get(health))
568 .route("/v1/status", get(service_status))
569 .route("/v1/history", get(history))
570 .route("/v1/audit", get(audit_log))
571 .route("/v1/admin/auth/reload", post(reload_auth))
572 .route("/v1/append", post(append))
573 .route("/v1/state/current", post(current_state))
574 .route("/v1/state/as-of", post(as_of))
575 .route("/v1/documents/parse", post(parse_document))
576 .route("/v1/documents/run", post(run_document))
577 .route(
578 "/v1/reports/pilot/coordination",
579 post(coordination_pilot_report),
580 )
581 .route(
582 "/v1/reports/pilot/coordination-delta",
583 post(coordination_delta_report),
584 )
585 .route("/v1/explain/tuple", post(explain_tuple))
586 .route("/v1/partitions/status", get(partition_status))
587 .route("/v1/partitions/promote", post(promote_replica))
588 .route("/v1/partitions/append", post(partition_append))
589 .route("/v1/partitions/history", post(partition_history))
590 .route("/v1/partitions/state", post(partition_state))
591 .route("/v1/federated/history", post(federated_history))
592 .route("/v1/federated/run", post(federated_run_document))
593 .route("/v1/federated/report", post(federated_report))
594 .route(
595 "/v1/sidecars/artifacts/register",
596 post(register_artifact_reference),
597 )
598 .route("/v1/sidecars/artifacts/get", post(get_artifact_reference))
599 .route(
600 "/v1/sidecars/vectors/register",
601 post(register_vector_record),
602 )
603 .route("/v1/sidecars/vectors/search", post(search_vectors))
604 .with_state(HttpKernelState::with_partitioned_options(
605 service,
606 partitioned,
607 options,
608 ))
609}
610
611pub fn http_router_with_options(
612 service: impl KernelService + Send + 'static,
613 options: HttpKernelOptions,
614) -> Router {
615 Router::new()
616 .route("/health", get(health))
617 .route("/v1/status", get(service_status))
618 .route("/v1/history", get(history))
619 .route("/v1/audit", get(audit_log))
620 .route("/v1/admin/auth/reload", post(reload_auth))
621 .route("/v1/append", post(append))
622 .route("/v1/state/current", post(current_state))
623 .route("/v1/state/as-of", post(as_of))
624 .route("/v1/documents/parse", post(parse_document))
625 .route("/v1/documents/run", post(run_document))
626 .route(
627 "/v1/reports/pilot/coordination",
628 post(coordination_pilot_report),
629 )
630 .route(
631 "/v1/reports/pilot/coordination-delta",
632 post(coordination_delta_report),
633 )
634 .route("/v1/explain/tuple", post(explain_tuple))
635 .route(
636 "/v1/sidecars/artifacts/register",
637 post(register_artifact_reference),
638 )
639 .route("/v1/sidecars/artifacts/get", post(get_artifact_reference))
640 .route(
641 "/v1/sidecars/vectors/register",
642 post(register_vector_record),
643 )
644 .route("/v1/sidecars/vectors/search", post(search_vectors))
645 .with_state(HttpKernelState::with_options(service, options))
646}
647
648#[derive(Clone, Debug)]
649struct AuthenticatedPrincipal {
650 id: String,
651 principal_id: Option<String>,
652 token_id: Option<String>,
653 policy_context: Option<PolicyContext>,
654 policy_bound: bool,
655}
656
657#[derive(Clone, Default)]
658struct AuditLog {
659 entries: Arc<Mutex<Vec<AuditEntry>>>,
660 path: Option<PathBuf>,
661}
662
663impl AuditLog {
664 fn new(path: Option<PathBuf>) -> Self {
665 Self {
666 entries: Arc::new(Mutex::new(Vec::new())),
667 path,
668 }
669 }
670
671 fn record(&self, entry: AuditEntry) {
672 let mut entries = match self.entries.lock() {
673 Ok(entries) => entries,
674 Err(_) => return,
675 };
676 entries.push(entry.clone());
677
678 if let Some(path) = &self.path {
679 if let Err(error) = append_audit_entry(path, &entry) {
680 entries.push(AuditEntry::audit_failure(path, &error));
681 }
682 }
683 }
684
685 fn snapshot(&self) -> Result<Vec<AuditEntry>, HttpError> {
686 self.entries
687 .lock()
688 .map(|entries| entries.clone())
689 .map_err(|_| HttpError::LockPoisoned)
690 }
691}
692
693#[derive(Clone, Default)]
694struct HttpAuth {
695 tokens: HashMap<String, AuthenticatedToken>,
696}
697
698impl HttpAuth {
699 fn from_config(config: HttpAuthConfig) -> Self {
700 let mut tokens = HashMap::new();
701 for access in config.tokens {
702 let principal_id = if access.principal_id.trim().is_empty() {
703 Some(format!("principal:{}", access.principal))
704 } else {
705 Some(access.principal_id.clone())
706 };
707 let token_id = if access.token_id.trim().is_empty() {
708 Some(format!("token:{}", access.principal))
709 } else {
710 Some(access.token_id.clone())
711 };
712 tokens.insert(
713 access.token,
714 AuthenticatedToken {
715 principal: access.principal,
716 principal_id,
717 token_id,
718 scopes: access.scopes.into_iter().collect(),
719 policy_context: access.policy_context,
720 revoked: access.revoked,
721 },
722 );
723 }
724 Self { tokens }
725 }
726
727 fn authorize(
728 &self,
729 headers: &HeaderMap,
730 required_scope: AuthScope,
731 ) -> Result<AuthenticatedPrincipal, HttpError> {
732 if self.tokens.is_empty() {
733 return Ok(AuthenticatedPrincipal {
734 id: "anonymous".into(),
735 principal_id: None,
736 token_id: None,
737 policy_context: None,
738 policy_bound: false,
739 });
740 }
741
742 let header = headers.get(AUTHORIZATION).ok_or(HttpError::Unauthorized {
743 principal: "anonymous".into(),
744 message: "missing bearer token".into(),
745 })?;
746 let header = header.to_str().map_err(|_| HttpError::Unauthorized {
747 principal: "anonymous".into(),
748 message: "authorization header is not valid UTF-8".into(),
749 })?;
750 let token = header
751 .strip_prefix("Bearer ")
752 .ok_or(HttpError::Unauthorized {
753 principal: "anonymous".into(),
754 message: "authorization header must use Bearer auth".into(),
755 })?;
756
757 let Some(access) = self.tokens.get(token) else {
758 return Err(HttpError::Unauthorized {
759 principal: "anonymous".into(),
760 message: "unknown bearer token".into(),
761 });
762 };
763
764 if !access.scopes.contains(&required_scope) {
765 return Err(HttpError::Forbidden {
766 principal: access.principal.clone(),
767 message: format!("token lacks {} scope", required_scope.as_str()),
768 });
769 }
770 if access.revoked {
771 return Err(HttpError::Forbidden {
772 principal: access.principal.clone(),
773 message: "token is revoked".into(),
774 });
775 }
776
777 Ok(AuthenticatedPrincipal {
778 id: access.principal.clone(),
779 principal_id: access.principal_id.clone(),
780 token_id: access.token_id.clone(),
781 policy_context: access.policy_context.clone(),
782 policy_bound: true,
783 })
784 }
785}
786
787#[derive(Clone, Debug)]
788struct AuthenticatedToken {
789 principal: String,
790 principal_id: Option<String>,
791 token_id: Option<String>,
792 scopes: BTreeSet<AuthScope>,
793 policy_context: Option<PolicyContext>,
794 revoked: bool,
795}
796
797fn normalize_policy_context(policy_context: Option<PolicyContext>) -> Option<PolicyContext> {
798 match policy_context {
799 Some(policy_context) if policy_context.is_empty() => None,
800 other => other,
801 }
802}
803
804fn flatten_replica_status(status: &PartitionStatusResponse) -> Vec<crate::ReplicaStatusSummary> {
805 let mut replicas = Vec::new();
806 for partition in &status.partitions {
807 for replica in &partition.replicas {
808 replicas.push(crate::ReplicaStatusSummary {
809 partition: partition.partition.to_string(),
810 replica_id: replica.replica_id.0,
811 role: match replica.role {
812 crate::ReplicaRole::Leader => "leader".into(),
813 crate::ReplicaRole::Follower => "follower".into(),
814 },
815 leader_epoch: replica.leader_epoch.0,
816 applied_element: replica.applied_element.map(|element| element.0),
817 replication_lag: replica.replication_lag,
818 healthy: replica.healthy,
819 detail: replica.detail.clone(),
820 });
821 }
822 }
823 replicas
824}
825
826fn bound_policy_context(
827 principal: &AuthenticatedPrincipal,
828 requested: Option<PolicyContext>,
829) -> Result<Option<PolicyContext>, HttpError> {
830 let requested = normalize_policy_context(requested);
831 if !principal.policy_bound {
832 return Ok(requested);
833 }
834
835 let granted = normalize_policy_context(principal.policy_context.clone());
836 match (granted, requested) {
837 (None, None) => Ok(None),
838 (None, Some(_)) => Err(HttpError::Forbidden {
839 principal: principal.id.clone(),
840 message: "requested policy context exceeds token policy".into(),
841 }),
842 (Some(granted), None) => Ok(Some(granted)),
843 (Some(granted), Some(requested)) => {
844 if requested.subset_of(&granted) {
845 Ok(Some(requested))
846 } else {
847 Err(HttpError::Forbidden {
848 principal: principal.id.clone(),
849 message: "requested policy context exceeds token policy".into(),
850 })
851 }
852 }
853 }
854}
855
856fn write_policy_context_fields(
857 target_capabilities: &mut Vec<String>,
858 target_visibilities: &mut Vec<String>,
859 policy_context: Option<&PolicyContext>,
860) {
861 target_capabilities.clear();
862 target_visibilities.clear();
863 if let Some(policy_context) = policy_context {
864 target_capabilities.extend(policy_context.capabilities.iter().cloned());
865 target_visibilities.extend(policy_context.visibilities.iter().cloned());
866 }
867}
868
869fn apply_policy_binding(
870 principal: &AuthenticatedPrincipal,
871 requested: Option<PolicyContext>,
872 context: &mut AuditContext,
873) -> Result<Option<PolicyContext>, HttpError> {
874 let requested = normalize_policy_context(requested);
875 write_policy_context_fields(
876 &mut context.requested_capabilities,
877 &mut context.requested_visibilities,
878 requested.as_ref(),
879 );
880 write_policy_context_fields(
881 &mut context.granted_capabilities,
882 &mut context.granted_visibilities,
883 principal.policy_context.as_ref(),
884 );
885
886 match bound_policy_context(principal, requested.clone()) {
887 Ok(effective) => {
888 write_policy_context_fields(
889 &mut context.effective_capabilities,
890 &mut context.effective_visibilities,
891 effective.as_ref(),
892 );
893 context.policy_decision = Some(
894 match (
895 normalize_policy_context(principal.policy_context.clone()),
896 requested,
897 effective.clone(),
898 ) {
899 (None, None, None) => "public".into(),
900 (None, Some(_), Some(_)) => "request_supplied".into(),
901 (Some(_), None, Some(_)) => "token_default".into(),
902 (Some(granted), Some(requested), Some(_)) if requested == granted => {
903 "request_exact".into()
904 }
905 (Some(_), Some(_), Some(_)) => "request_narrowed".into(),
906 _ => "public".into(),
907 },
908 );
909 Ok(effective)
910 }
911 Err(error) => {
912 context.policy_decision = Some("denied_escalation".into());
913 Err(error)
914 }
915 }
916}
917
918#[derive(Debug)]
919enum HttpError {
920 Api(ApiError),
921 Unauthorized { principal: String, message: String },
922 Forbidden { principal: String, message: String },
923 LockPoisoned,
924}
925
926impl HttpError {
927 fn status_code(&self) -> StatusCode {
928 match self {
929 Self::Api(error) => status_for_api_error(error),
930 Self::Unauthorized { .. } => StatusCode::UNAUTHORIZED,
931 Self::Forbidden { .. } => StatusCode::FORBIDDEN,
932 Self::LockPoisoned => StatusCode::INTERNAL_SERVER_ERROR,
933 }
934 }
935
936 fn audit_principal(&self) -> String {
937 match self {
938 Self::Unauthorized { principal, .. } | Self::Forbidden { principal, .. } => {
939 principal.clone()
940 }
941 Self::Api(_) | Self::LockPoisoned => "aether".into(),
942 }
943 }
944
945 fn audit_message(&self) -> String {
946 match self {
947 Self::Api(error) => error.to_string(),
948 Self::Unauthorized { message, .. } | Self::Forbidden { message, .. } => message.clone(),
949 Self::LockPoisoned => "internal service state is unavailable".into(),
950 }
951 }
952}
953
954impl From<ApiError> for HttpError {
955 fn from(value: ApiError) -> Self {
956 Self::Api(value)
957 }
958}
959
960#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
961struct ErrorBody {
962 error: String,
963}
964
965impl IntoResponse for HttpError {
966 fn into_response(self) -> Response {
967 let status = self.status_code();
968 let error = match self {
969 Self::Api(error) => error.to_string(),
970 Self::Unauthorized { message, .. } | Self::Forbidden { message, .. } => message,
971 Self::LockPoisoned => "internal service state is unavailable".into(),
972 };
973
974 (status, Json(ErrorBody { error })).into_response()
975 }
976}
977
978fn status_for_api_error(error: &ApiError) -> StatusCode {
979 match error {
980 ApiError::Validation(_)
981 | ApiError::Sidecar(_)
982 | ApiError::Resolve(_)
983 | ApiError::Parse(_)
984 | ApiError::Compile(_)
985 | ApiError::Runtime(_)
986 | ApiError::Explain(_) => StatusCode::BAD_REQUEST,
987 ApiError::Journal(_) => StatusCode::CONFLICT,
988 }
989}
990
991async fn health() -> Json<HealthResponse> {
992 Json(HealthResponse::default())
993}
994
995async fn service_status(
996 State(state): State<HttpKernelState>,
997 headers: HeaderMap,
998) -> Result<Json<ServiceStatusResponse>, HttpError> {
999 let principal = match state.authorize(&headers, AuthScope::Ops) {
1000 Ok(principal) => principal,
1001 Err(error) => {
1002 state.audit.record(AuditEntry::for_denied(
1003 "GET",
1004 "/v1/status",
1005 error.status_code(),
1006 error.audit_principal(),
1007 None,
1008 None,
1009 AuthScope::Ops,
1010 error.audit_message(),
1011 AuditContext {
1012 temporal_view: Some("service_status".into()),
1013 ..Default::default()
1014 },
1015 ));
1016 return Err(error);
1017 }
1018 };
1019 let response = state.status_snapshot()?;
1020 state.audit.record(AuditEntry::for_request(
1021 "GET",
1022 "/v1/status",
1023 StatusCode::OK,
1024 &principal,
1025 AuthScope::Ops,
1026 AuditContext {
1027 temporal_view: Some("service_status".into()),
1028 ..Default::default()
1029 },
1030 ));
1031 Ok(Json(response))
1032}
1033
1034async fn history(
1035 State(state): State<HttpKernelState>,
1036 headers: HeaderMap,
1037) -> Result<Json<crate::HistoryResponse>, HttpError> {
1038 let request_context = AuditContext {
1039 temporal_view: Some("history".into()),
1040 ..Default::default()
1041 };
1042 let response = state.execute(
1043 &headers,
1044 "GET",
1045 "/v1/history",
1046 AuthScope::Ops,
1047 request_context.clone(),
1048 |service, principal, context| {
1049 let policy_context = apply_policy_binding(principal, None, context)?;
1050 let response = service
1051 .history(HistoryRequest { policy_context })
1052 .map_err(HttpError::Api)?;
1053 context.datom_count = Some(response.datoms.len());
1054 context.last_element = response.datoms.last().map(|datom| datom.element.0);
1055 Ok(response)
1056 },
1057 )?;
1058 Ok(Json(response))
1059}
1060
1061async fn audit_log(
1062 State(state): State<HttpKernelState>,
1063 headers: HeaderMap,
1064) -> Result<Json<AuditLogResponse>, HttpError> {
1065 Ok(Json(state.audit_entries(&headers)?))
1066}
1067
1068async fn reload_auth(
1069 State(state): State<HttpKernelState>,
1070 headers: HeaderMap,
1071) -> Result<Json<AuthReloadResponse>, HttpError> {
1072 let principal = match state.authorize(&headers, AuthScope::Ops) {
1073 Ok(principal) => principal,
1074 Err(error) => {
1075 state.audit.record(AuditEntry::for_denied(
1076 "POST",
1077 "/v1/admin/auth/reload",
1078 error.status_code(),
1079 error.audit_principal(),
1080 None,
1081 None,
1082 AuthScope::Ops,
1083 error.audit_message(),
1084 AuditContext {
1085 temporal_view: Some("auth_reload".into()),
1086 ..Default::default()
1087 },
1088 ));
1089 return Err(error);
1090 }
1091 };
1092 let response = state.reload_auth_from_config()?;
1093 state.audit.record(AuditEntry::for_request(
1094 "POST",
1095 "/v1/admin/auth/reload",
1096 StatusCode::OK,
1097 &principal,
1098 AuthScope::Ops,
1099 AuditContext {
1100 temporal_view: Some("auth_reload".into()),
1101 ..Default::default()
1102 },
1103 ));
1104 Ok(Json(response))
1105}
1106
1107async fn append(
1108 State(state): State<HttpKernelState>,
1109 headers: HeaderMap,
1110 Json(request): Json<AppendRequest>,
1111) -> Result<Json<crate::AppendResponse>, HttpError> {
1112 let request_context = audit_context_for_append(&request);
1113 let response = state.execute(
1114 &headers,
1115 "POST",
1116 "/v1/append",
1117 AuthScope::Append,
1118 request_context.clone(),
1119 |service, _principal, _context| {
1120 let response = service.append(request).map_err(HttpError::Api)?;
1121 Ok(response)
1122 },
1123 )?;
1124 Ok(Json(response))
1125}
1126
1127async fn current_state(
1128 State(state): State<HttpKernelState>,
1129 headers: HeaderMap,
1130 Json(request): Json<CurrentStateRequest>,
1131) -> Result<Json<crate::CurrentStateResponse>, HttpError> {
1132 let request_context = AuditContext {
1133 temporal_view: Some("current".into()),
1134 datom_count: Some(request.datoms.len()),
1135 ..Default::default()
1136 };
1137 let response = state.execute(
1138 &headers,
1139 "POST",
1140 "/v1/state/current",
1141 AuthScope::Query,
1142 request_context.clone(),
1143 |service, principal, context| {
1144 let mut request = request;
1145 request.policy_context =
1146 apply_policy_binding(principal, request.policy_context, context)?;
1147 let response = service.current_state(request).map_err(HttpError::Api)?;
1148 context.entity_count = Some(response.state.entities.len());
1149 context.last_element = response.state.as_of.map(|element| element.0);
1150 Ok(response)
1151 },
1152 )?;
1153 Ok(Json(response))
1154}
1155
1156async fn as_of(
1157 State(state): State<HttpKernelState>,
1158 headers: HeaderMap,
1159 Json(request): Json<AsOfRequest>,
1160) -> Result<Json<crate::AsOfResponse>, HttpError> {
1161 let request_context = AuditContext {
1162 temporal_view: Some(format!("as_of(e{})", request.at.0)),
1163 requested_element: Some(request.at.0),
1164 datom_count: Some(request.datoms.len()),
1165 ..Default::default()
1166 };
1167 let response = state.execute(
1168 &headers,
1169 "POST",
1170 "/v1/state/as-of",
1171 AuthScope::Query,
1172 request_context.clone(),
1173 |service, principal, context| {
1174 let mut request = request;
1175 request.policy_context =
1176 apply_policy_binding(principal, request.policy_context, context)?;
1177 let response = service.as_of(request).map_err(HttpError::Api)?;
1178 context.entity_count = Some(response.state.entities.len());
1179 context.last_element = response.state.as_of.map(|element| element.0);
1180 Ok(response)
1181 },
1182 )?;
1183 Ok(Json(response))
1184}
1185
1186async fn parse_document(
1187 State(state): State<HttpKernelState>,
1188 headers: HeaderMap,
1189 Json(request): Json<ParseDocumentRequest>,
1190) -> Result<Json<crate::ParseDocumentResponse>, HttpError> {
1191 let request_context = audit_context_for_document(&request.dsl);
1192 let response = state.execute(
1193 &headers,
1194 "POST",
1195 "/v1/documents/parse",
1196 AuthScope::Query,
1197 request_context.clone(),
1198 |service, _principal, _context| {
1199 let response = service.parse_document(request).map_err(HttpError::Api)?;
1200 Ok(response)
1201 },
1202 )?;
1203 Ok(Json(response))
1204}
1205
1206async fn run_document(
1207 State(state): State<HttpKernelState>,
1208 headers: HeaderMap,
1209 Json(request): Json<RunDocumentRequest>,
1210) -> Result<Json<crate::RunDocumentResponse>, HttpError> {
1211 let request_context = audit_context_for_document(&request.dsl);
1212 let response = state.execute(
1213 &headers,
1214 "POST",
1215 "/v1/documents/run",
1216 AuthScope::Query,
1217 request_context.clone(),
1218 |service, principal, context| {
1219 let mut request = request;
1220 request.policy_context =
1221 apply_policy_binding(principal, request.policy_context, context)?;
1222 let response = service.run_document(request).map_err(HttpError::Api)?;
1223 context.entity_count = Some(response.state.entities.len());
1224 context.last_element = response.state.as_of.map(|element| element.0);
1225 context.derived_tuple_count = Some(response.derived.tuples.len());
1226 context.row_count = response.query.as_ref().map(|query| query.rows.len());
1227 Ok(response)
1228 },
1229 )?;
1230 Ok(Json(response))
1231}
1232
1233async fn coordination_pilot_report(
1234 State(state): State<HttpKernelState>,
1235 headers: HeaderMap,
1236 Json(request): Json<CoordinationPilotReportRequest>,
1237) -> Result<Json<crate::CoordinationPilotReport>, HttpError> {
1238 let request_context = AuditContext {
1239 temporal_view: Some("coordination_pilot_report".into()),
1240 ..Default::default()
1241 };
1242 let response = state.execute(
1243 &headers,
1244 "POST",
1245 "/v1/reports/pilot/coordination",
1246 AuthScope::Query,
1247 request_context.clone(),
1248 |service, principal, context| {
1249 let mut request = request;
1250 request.policy_context =
1251 apply_policy_binding(principal, request.policy_context, context)?;
1252 let response = service
1253 .coordination_pilot_report(request)
1254 .map_err(HttpError::Api)?;
1255 context.datom_count = Some(response.history_len);
1256 context.row_count = Some(
1257 response.pre_heartbeat_authorized.len()
1258 + response.as_of_authorized.len()
1259 + response.live_heartbeats.len()
1260 + response.current_authorized.len()
1261 + response.claimable.len()
1262 + response.accepted_outcomes.len()
1263 + response.rejected_outcomes.len(),
1264 );
1265 context.trace_tuple_count = response.trace.as_ref().map(|trace| trace.tuple_count);
1266 context.tuple_id = response.trace.as_ref().map(|trace| trace.root.0);
1267 Ok(response)
1268 },
1269 )?;
1270 Ok(Json(response))
1271}
1272
1273async fn coordination_delta_report(
1274 State(state): State<HttpKernelState>,
1275 headers: HeaderMap,
1276 Json(request): Json<CoordinationDeltaReportRequest>,
1277) -> Result<Json<crate::CoordinationDeltaReport>, HttpError> {
1278 let request_context = AuditContext {
1279 temporal_view: Some("coordination_delta_report".into()),
1280 ..Default::default()
1281 };
1282 let response = state.execute(
1283 &headers,
1284 "POST",
1285 "/v1/reports/pilot/coordination-delta",
1286 AuthScope::Query,
1287 request_context,
1288 |service, principal, context| {
1289 let mut request = request;
1290 request.policy_context =
1291 apply_policy_binding(principal, request.policy_context, context)?;
1292 let response = service
1293 .coordination_delta_report(request)
1294 .map_err(HttpError::Api)?;
1295 context.datom_count = Some(response.right_history_len);
1296 context.row_count = Some(
1297 response.current_authorized.added.len()
1298 + response.current_authorized.removed.len()
1299 + response.current_authorized.changed.len()
1300 + response.claimable.added.len()
1301 + response.claimable.removed.len()
1302 + response.claimable.changed.len()
1303 + response.live_heartbeats.added.len()
1304 + response.live_heartbeats.removed.len()
1305 + response.live_heartbeats.changed.len()
1306 + response.accepted_outcomes.added.len()
1307 + response.accepted_outcomes.removed.len()
1308 + response.accepted_outcomes.changed.len()
1309 + response.rejected_outcomes.added.len()
1310 + response.rejected_outcomes.removed.len()
1311 + response.rejected_outcomes.changed.len(),
1312 );
1313 Ok(response)
1314 },
1315 )?;
1316 Ok(Json(response))
1317}
1318
1319async fn partition_status(
1320 State(state): State<HttpKernelState>,
1321 headers: HeaderMap,
1322) -> Result<Json<PartitionStatusResponse>, HttpError> {
1323 let response = state.execute_partitioned(
1324 &headers,
1325 "GET",
1326 "/v1/partitions/status",
1327 AuthScope::Ops,
1328 AuditContext {
1329 temporal_view: Some("partition_status".into()),
1330 ..Default::default()
1331 },
1332 |service, _principal, context| {
1333 let response = service.partition_status().map_err(HttpError::Api)?;
1334 context.entity_count = Some(response.partitions.len());
1335 context.row_count = Some(
1336 response
1337 .partitions
1338 .iter()
1339 .map(|partition| partition.replicas.len())
1340 .sum(),
1341 );
1342 Ok(response)
1343 },
1344 )?;
1345 Ok(Json(response))
1346}
1347
1348async fn promote_replica(
1349 State(state): State<HttpKernelState>,
1350 headers: HeaderMap,
1351 Json(request): Json<PromoteReplicaRequest>,
1352) -> Result<Json<crate::PromoteReplicaResponse>, HttpError> {
1353 let request_context = AuditContext {
1354 temporal_view: Some(format!("partition({})", request.partition)),
1355 ..Default::default()
1356 };
1357 let response = state.execute_partitioned(
1358 &headers,
1359 "POST",
1360 "/v1/partitions/promote",
1361 AuthScope::Ops,
1362 request_context,
1363 |service, _principal, context| {
1364 let response = service.promote_replica(request).map_err(HttpError::Api)?;
1365 context.requested_element = Some(response.leader_epoch.0);
1366 Ok(response)
1367 },
1368 )?;
1369 Ok(Json(response))
1370}
1371
1372async fn partition_append(
1373 State(state): State<HttpKernelState>,
1374 headers: HeaderMap,
1375 Json(request): Json<PartitionAppendRequest>,
1376) -> Result<Json<crate::PartitionAppendResponse>, HttpError> {
1377 let request_context = AuditContext {
1378 temporal_view: Some(format!("partition({})", request.partition)),
1379 datom_count: Some(request.datoms.len()),
1380 last_element: request.datoms.last().map(|datom| datom.element.0),
1381 ..Default::default()
1382 };
1383 let response = state.execute_partitioned(
1384 &headers,
1385 "POST",
1386 "/v1/partitions/append",
1387 AuthScope::Append,
1388 request_context,
1389 |service, _principal, context| {
1390 let response = service.append_partition(request).map_err(HttpError::Api)?;
1391 context.requested_element = response.leader_epoch.as_ref().map(|epoch| epoch.0);
1392 Ok(response)
1393 },
1394 )?;
1395 Ok(Json(response))
1396}
1397
1398async fn partition_history(
1399 State(state): State<HttpKernelState>,
1400 headers: HeaderMap,
1401 Json(request): Json<PartitionHistoryRequest>,
1402) -> Result<Json<crate::PartitionHistoryResponse>, HttpError> {
1403 let request_context = AuditContext {
1404 temporal_view: Some(request.cut.to_string()),
1405 requested_element: request.cut.as_of.map(|element| element.0),
1406 ..Default::default()
1407 };
1408 let response = state.execute_partitioned(
1409 &headers,
1410 "POST",
1411 "/v1/partitions/history",
1412 AuthScope::Query,
1413 request_context,
1414 |service, principal, context| {
1415 let mut request = request;
1416 request.policy_context =
1417 apply_policy_binding(principal, request.policy_context, context)?;
1418 let response = service.partition_history(request).map_err(HttpError::Api)?;
1419 context.datom_count = Some(response.datoms.len());
1420 context.entity_count = Some(
1421 response
1422 .datoms
1423 .iter()
1424 .map(|datom| datom.entity)
1425 .collect::<BTreeSet<_>>()
1426 .len(),
1427 );
1428 context.last_element = response.datoms.last().map(|datom| datom.element.0);
1429 Ok(response)
1430 },
1431 )?;
1432 Ok(Json(response))
1433}
1434
1435async fn partition_state(
1436 State(state): State<HttpKernelState>,
1437 headers: HeaderMap,
1438 Json(request): Json<PartitionStateRequest>,
1439) -> Result<Json<crate::PartitionStateResponse>, HttpError> {
1440 let request_context = AuditContext {
1441 temporal_view: Some(request.cut.to_string()),
1442 requested_element: request.cut.as_of.map(|element| element.0),
1443 ..Default::default()
1444 };
1445 let response = state.execute_partitioned(
1446 &headers,
1447 "POST",
1448 "/v1/partitions/state",
1449 AuthScope::Query,
1450 request_context,
1451 |service, principal, context| {
1452 let mut request = request;
1453 request.policy_context =
1454 apply_policy_binding(principal, request.policy_context, context)?;
1455 let response = service.partition_state(request).map_err(HttpError::Api)?;
1456 context.entity_count = Some(response.state.entities.len());
1457 context.last_element = response.cut.as_of.map(|element| element.0);
1458 Ok(response)
1459 },
1460 )?;
1461 Ok(Json(response))
1462}
1463
1464async fn federated_history(
1465 State(state): State<HttpKernelState>,
1466 headers: HeaderMap,
1467 Json(request): Json<FederatedHistoryRequest>,
1468) -> Result<Json<crate::FederatedHistoryResponse>, HttpError> {
1469 let request_context = AuditContext {
1470 temporal_view: Some("federated_history".into()),
1471 ..Default::default()
1472 };
1473 let response = state.execute_partitioned(
1474 &headers,
1475 "POST",
1476 "/v1/federated/history",
1477 AuthScope::Query,
1478 request_context,
1479 |service, principal, context| {
1480 let mut request = request;
1481 request.policy_context =
1482 apply_policy_binding(principal, request.policy_context, context)?;
1483 let response = service.federated_history(request).map_err(HttpError::Api)?;
1484 context.datom_count = Some(
1485 response
1486 .partitions
1487 .iter()
1488 .map(|partition| partition.datoms.len())
1489 .sum(),
1490 );
1491 context.entity_count = Some(response.partitions.len());
1492 Ok(response)
1493 },
1494 )?;
1495 Ok(Json(response))
1496}
1497
1498async fn federated_run_document(
1499 State(state): State<HttpKernelState>,
1500 headers: HeaderMap,
1501 Json(request): Json<FederatedRunDocumentRequest>,
1502) -> Result<Json<crate::FederatedRunDocumentResponse>, HttpError> {
1503 let request_context = AuditContext {
1504 temporal_view: Some("federated_run_document".into()),
1505 ..Default::default()
1506 };
1507 let response = state.execute_partitioned(
1508 &headers,
1509 "POST",
1510 "/v1/federated/run",
1511 AuthScope::Query,
1512 request_context,
1513 |service, principal, context| {
1514 let mut request = request;
1515 request.policy_context =
1516 apply_policy_binding(principal, request.policy_context, context)?;
1517 let response = service
1518 .federated_run_document(request)
1519 .map_err(HttpError::Api)?;
1520 context.entity_count = Some(response.cut.cuts.len());
1521 context.row_count = Some(
1522 response
1523 .run
1524 .query
1525 .as_ref()
1526 .map(|query| query.rows.len())
1527 .unwrap_or(0),
1528 );
1529 context.derived_tuple_count = Some(response.run.derived.tuples.len());
1530 Ok(response)
1531 },
1532 )?;
1533 Ok(Json(response))
1534}
1535
1536async fn federated_report(
1537 State(state): State<HttpKernelState>,
1538 headers: HeaderMap,
1539 Json(request): Json<FederatedRunDocumentRequest>,
1540) -> Result<Json<FederatedExplainReport>, HttpError> {
1541 let request_context = AuditContext {
1542 temporal_view: Some("federated_report".into()),
1543 ..Default::default()
1544 };
1545 let response = state.execute_partitioned(
1546 &headers,
1547 "POST",
1548 "/v1/federated/report",
1549 AuthScope::Explain,
1550 request_context,
1551 |service, principal, context| {
1552 let mut request = request;
1553 request.policy_context =
1554 apply_policy_binding(principal, request.policy_context, context)?;
1555 let response = service
1556 .build_federated_explain_report(request)
1557 .map_err(HttpError::Api)?;
1558 context.entity_count = Some(response.cut.cuts.len());
1559 context.row_count = Some(
1560 response.primary_query.len()
1561 + response
1562 .named_queries
1563 .iter()
1564 .map(|query| query.rows.len())
1565 .sum::<usize>(),
1566 );
1567 context.trace_tuple_count =
1568 Some(response.traces.iter().map(|trace| trace.tuple_count).sum());
1569 context.tuple_id = response.traces.first().map(|trace| trace.root.0);
1570 Ok(response)
1571 },
1572 )?;
1573 Ok(Json(response))
1574}
1575
1576async fn explain_tuple(
1577 State(state): State<HttpKernelState>,
1578 headers: HeaderMap,
1579 Json(request): Json<ExplainTupleRequest>,
1580) -> Result<Json<crate::ExplainTupleResponse>, HttpError> {
1581 let request_context = AuditContext {
1582 tuple_id: Some(request.tuple_id.0),
1583 ..Default::default()
1584 };
1585 let response = state.execute(
1586 &headers,
1587 "POST",
1588 "/v1/explain/tuple",
1589 AuthScope::Explain,
1590 request_context.clone(),
1591 |service, principal, context| {
1592 let mut request = request;
1593 request.policy_context =
1594 apply_policy_binding(principal, request.policy_context, context)?;
1595 let response = service.explain_tuple(request).map_err(HttpError::Api)?;
1596 context.trace_tuple_count = Some(response.trace.tuples.len());
1597 Ok(response)
1598 },
1599 )?;
1600 Ok(Json(response))
1601}
1602
1603async fn register_artifact_reference(
1604 State(state): State<HttpKernelState>,
1605 headers: HeaderMap,
1606 Json(request): Json<RegisterArtifactReferenceRequest>,
1607) -> Result<Json<crate::RegisterArtifactReferenceResponse>, HttpError> {
1608 let request_context = AuditContext {
1609 temporal_view: Some("sidecar_artifact_register".into()),
1610 requested_element: Some(request.reference.registered_at.0),
1611 ..Default::default()
1612 };
1613 let response = state.execute(
1614 &headers,
1615 "POST",
1616 "/v1/sidecars/artifacts/register",
1617 AuthScope::Append,
1618 request_context.clone(),
1619 |service, _principal, _context| {
1620 let response = service
1621 .register_artifact_reference(request)
1622 .map_err(HttpError::Api)?;
1623 Ok(response)
1624 },
1625 )?;
1626 Ok(Json(response))
1627}
1628
1629async fn get_artifact_reference(
1630 State(state): State<HttpKernelState>,
1631 headers: HeaderMap,
1632 Json(request): Json<GetArtifactReferenceRequest>,
1633) -> Result<Json<crate::GetArtifactReferenceResponse>, HttpError> {
1634 let request_context = AuditContext {
1635 temporal_view: Some("sidecar_artifact_lookup".into()),
1636 ..Default::default()
1637 };
1638 let response = state.execute(
1639 &headers,
1640 "POST",
1641 "/v1/sidecars/artifacts/get",
1642 AuthScope::Query,
1643 request_context.clone(),
1644 |service, principal, context| {
1645 let mut request = request;
1646 request.policy_context =
1647 apply_policy_binding(principal, request.policy_context, context)?;
1648 let response = service
1649 .get_artifact_reference(request)
1650 .map_err(HttpError::Api)?;
1651 Ok(response)
1652 },
1653 )?;
1654 Ok(Json(response))
1655}
1656
1657async fn register_vector_record(
1658 State(state): State<HttpKernelState>,
1659 headers: HeaderMap,
1660 Json(request): Json<RegisterVectorRecordRequest>,
1661) -> Result<Json<crate::RegisterVectorRecordResponse>, HttpError> {
1662 let request_context = AuditContext {
1663 temporal_view: Some("sidecar_vector_register".into()),
1664 requested_element: Some(request.record.registered_at.0),
1665 ..Default::default()
1666 };
1667 let response = state.execute(
1668 &headers,
1669 "POST",
1670 "/v1/sidecars/vectors/register",
1671 AuthScope::Append,
1672 request_context.clone(),
1673 |service, _principal, _context| {
1674 let response = service
1675 .register_vector_record(request)
1676 .map_err(HttpError::Api)?;
1677 Ok(response)
1678 },
1679 )?;
1680 Ok(Json(response))
1681}
1682
1683async fn search_vectors(
1684 State(state): State<HttpKernelState>,
1685 headers: HeaderMap,
1686 Json(request): Json<SearchVectorsRequest>,
1687) -> Result<Json<crate::SearchVectorsResponse>, HttpError> {
1688 let request_context = AuditContext {
1689 temporal_view: Some("sidecar_vector_search".into()),
1690 requested_element: request.as_of.map(|element| element.0),
1691 ..Default::default()
1692 };
1693 let response = state.execute(
1694 &headers,
1695 "POST",
1696 "/v1/sidecars/vectors/search",
1697 AuthScope::Query,
1698 request_context.clone(),
1699 |service, principal, context| {
1700 let mut request = request;
1701 request.policy_context =
1702 apply_policy_binding(principal, request.policy_context, context)?;
1703 let response = service.search_vectors(request).map_err(HttpError::Api)?;
1704 context.row_count = Some(response.matches.len());
1705 Ok(response)
1706 },
1707 )?;
1708 Ok(Json(response))
1709}
1710
1711fn audit_context_for_append(request: &AppendRequest) -> AuditContext {
1712 AuditContext {
1713 datom_count: Some(request.datoms.len()),
1714 last_element: request.datoms.last().map(|datom| datom.element.0),
1715 ..Default::default()
1716 }
1717}
1718
1719fn audit_context_for_document(dsl: &str) -> AuditContext {
1720 let summary = summarize_document_dsl(dsl);
1721 AuditContext {
1722 temporal_view: summary.temporal_view,
1723 query_goal: summary.query_goal,
1724 requested_element: summary.requested_element,
1725 ..Default::default()
1726 }
1727}
1728
1729#[derive(Default)]
1730struct DocumentAuditSummary {
1731 temporal_view: Option<String>,
1732 query_goal: Option<String>,
1733 requested_element: Option<u64>,
1734}
1735
1736fn summarize_document_dsl(dsl: &str) -> DocumentAuditSummary {
1737 let mut summary = DocumentAuditSummary::default();
1738 let mut in_query = false;
1739
1740 for line in dsl.lines() {
1741 let trimmed = line.trim();
1742 if trimmed.is_empty() {
1743 continue;
1744 }
1745 if !in_query {
1746 if trimmed.starts_with("query") && trimmed.ends_with('{') {
1747 in_query = true;
1748 }
1749 continue;
1750 }
1751
1752 if trimmed == "}" {
1753 break;
1754 }
1755
1756 if summary.temporal_view.is_none() {
1757 if trimmed == "current" {
1758 summary.temporal_view = Some("current".into());
1759 continue;
1760 }
1761 if let Some(element) = trimmed.strip_prefix("as_of ") {
1762 summary.temporal_view = Some(format!("as_of({})", element.trim()));
1763 summary.requested_element = element
1764 .trim()
1765 .strip_prefix('e')
1766 .and_then(|value| value.parse::<u64>().ok());
1767 continue;
1768 }
1769 }
1770
1771 if summary.query_goal.is_none() {
1772 if let Some(goal) = trimmed
1773 .strip_prefix("goal ")
1774 .or_else(|| trimmed.strip_prefix("find "))
1775 {
1776 summary.query_goal = Some(goal.trim().to_string());
1777 }
1778 }
1779 }
1780
1781 summary
1782}
1783
1784fn append_audit_entry(path: &Path, entry: &AuditEntry) -> Result<(), std::io::Error> {
1785 if let Some(parent) = path.parent() {
1786 fs::create_dir_all(parent)?;
1787 }
1788
1789 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
1790 let json =
1791 serde_json::to_string(entry).map_err(|error| std::io::Error::other(error.to_string()))?;
1792 file.write_all(json.as_bytes())?;
1793 file.write_all(b"\n")?;
1794 Ok(())
1795}
1796
1797fn now_millis() -> u64 {
1798 let duration = SystemTime::now()
1799 .duration_since(UNIX_EPOCH)
1800 .unwrap_or_default();
1801 duration.as_millis().min(u128::from(u64::MAX)) as u64
1802}