Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **Console Tenant Events (breaking)**: Removed `GET /api/v1/namespaces/{namespace}/tenants/{tenant}/events`. Events are delivered via **SSE** `GET .../tenants/{tenant}/events/stream` (`text/event-stream`; each `data:` line is JSON `EventListResponse`). The tenant detail **Events** tab uses `EventSource` with the same session cookie as other API calls. Aggregates `core/v1` Events for the Tenant CR, Pods, StatefulSets, and PVCs scoped with `rustfs.tenant` / pool naming (see PRD); Tenant rows require `involvedObject.kind=Tenant` and matching name.

- **Tenant `spec.encryption.vault`**: Removed `tlsSkipVerify` and `customCertificates` (they were never wired to `rustfs-kms`). Vault TLS should rely on system-trusted CAs or TLS upstream. The project is still pre-production; if you have old YAML with these keys, remove them before apply.

- **KMS pod environment** ([`tenant/workloads.rs`](src/types/v1alpha1/tenant/workloads.rs)): Align variable names with the RustFS server and `rustfs-kms` (`RUSTFS_KMS_ENABLE`, `RUSTFS_KMS_VAULT_ADDRESS`, KV mount and key prefix, local `RUSTFS_KMS_KEY_DIR` / `RUSTFS_KMS_DEFAULT_KEY_ID`, etc.); remove Vault TLS certificate volume mounts; `ping_seconds` remains documented as reserved (not injected).
Expand Down
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ chrono = { version = "0.4", features = ["serde"] }
const-str = "1.0.0"
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.49.0", features = ["rt", "rt-multi-thread", "macros", "fs", "io-std", "io-util"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.7", features = ["io", "compat"] }
futures = "0.3.31"
tracing = "0.1.44"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type {
PoolDetails,
PodListItem,
EventItem,
EventListResponse,
AddPoolRequest,
EncryptionInfoResponse,
UpdateEncryptionRequest,
Expand Down Expand Up @@ -72,6 +73,7 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi
const [pools, setPools] = useState<PoolDetails[]>([])
const [pods, setPods] = useState<PodListItem[]>([])
const [events, setEvents] = useState<EventItem[]>([])
const [eventsLoading, setEventsLoading] = useState(false)
const [loading, setLoading] = useState(true)
const [deleting, setDeleting] = useState(false)
const [addPoolOpen, setAddPoolOpen] = useState(false)
Expand Down Expand Up @@ -132,26 +134,20 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi
})

const loadTenant = async () => {
const [detailResult, poolResult, podResult, eventResult] = await Promise.allSettled([
const [detailResult, poolResult, podResult] = await Promise.allSettled([
api.getTenant(namespace, name),
api.listPools(namespace, name),
api.listPods(namespace, name),
api.listTenantEvents(namespace, name),
])

const detailOk = detailResult.status === "fulfilled"
const poolOk = poolResult.status === "fulfilled"
const podOk = podResult.status === "fulfilled"
const eventOk = eventResult.status === "fulfilled"

if (detailOk && poolOk && podOk) {
setTenant(detailResult.value)
setPools(poolResult.value.pools)
setPods(podResult.value.pods)
setEvents(eventOk ? eventResult.value.events : [])
if (!eventOk) {
toast.error(t("Events could not be loaded"))
}
} else {
const err = !detailOk
? (detailResult as PromiseRejectedResult).reason
Expand Down Expand Up @@ -182,6 +178,36 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi
loadTenant()
}, [namespace, name]) // eslint-disable-line react-hooks/exhaustive-deps -- reload when route params change

useEffect(() => {
setEvents([])
}, [namespace, name])

useEffect(() => {
if (tab !== "events") return
setEventsLoading(true)
let cleaned = false
const url = api.getTenantEventsStreamUrl(namespace, name)
const es = new EventSource(url, { withCredentials: true })
es.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data) as EventListResponse
setEvents(data.events ?? [])
} catch {
/* ignore malformed chunk */
}
setEventsLoading(false)
}
es.onerror = () => {
if (cleaned) return
toast.error(t("Events stream could not be loaded"))
setEventsLoading(false)
}
return () => {
cleaned = true
es.close()
}
}, [tab, namespace, name, t])

useEffect(() => {
setTenantYaml("")
setTenantYamlSnapshot("")
Expand Down Expand Up @@ -1181,7 +1207,13 @@ export function TenantDetailClient({ namespace, name, initialTab, initialYamlEdi
</TableRow>
</TableHeader>
<TableBody>
{events.length === 0 ? (
{eventsLoading && events.length === 0 ? (
<TableRow>
<TableCell colSpan={5} className="text-center text-muted-foreground py-8">
<Spinner className="inline size-4" />
</TableCell>
</TableRow>
) : events.length === 0 ? (
<TableRow>
<TableCell colSpan={5} className="text-center text-muted-foreground py-8">
{t("No events")}
Expand Down
20 changes: 14 additions & 6 deletions console-web/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import type {
PodListResponse,
PodDetails,
DeletePodResponse,
EventListResponse,
NodeListResponse,
NamespaceListResponse,
ClusterResourcesResponse,
Expand All @@ -28,6 +27,7 @@ import type {
SecurityContextUpdateResponse,
} from "@/types/api"
import type { TopologyOverviewResponse } from "@/types/topology"
import { getApiBaseUrl } from "@/lib/config"

const ns = (namespace: string) => `/namespaces/${encodeURIComponent(namespace)}`
const tenant = (namespace: string, name: string) => `${ns(namespace)}/tenants/${encodeURIComponent(name)}`
Expand All @@ -37,8 +37,6 @@ const pool = (namespace: string, name: string, poolName: string) =>
const pods = (namespace: string, name: string) => `${tenant(namespace, name)}/pods`
const pod = (namespace: string, name: string, podName: string) =>
`${pods(namespace, name)}/${encodeURIComponent(podName)}`
const events = (namespace: string, tenantName: string) =>
`${ns(namespace)}/tenants/${encodeURIComponent(tenantName)}/events`
const tenantYaml = (namespace: string, name: string) => `${tenant(namespace, name)}/yaml`
const tenantStateCounts = "/tenants/state-counts"
const tenantStateCountsByNs = (namespace: string) => `${ns(namespace)}/tenants/state-counts`
Expand Down Expand Up @@ -190,9 +188,19 @@ export async function updateSecurityContext(
return apiClient.put<SecurityContextUpdateResponse>(securityContext(namespace, name), body)
}

// ----- Events -----
export async function listTenantEvents(namespace: string, tenantName: string): Promise<EventListResponse> {
return apiClient.get<EventListResponse>(events(namespace, tenantName))
// ----- Events (SSE) -----
/** Absolute URL for `EventSource` (cookie session + `withCredentials`). */
export function getTenantEventsStreamUrl(namespace: string, tenantName: string): string {
const path = `${ns(namespace)}/tenants/${encodeURIComponent(tenantName)}/events/stream`
if (typeof window === "undefined") {
return path
}
const base = getApiBaseUrl()
if (base.startsWith("http://") || base.startsWith("https://")) {
return `${base.replace(/\/$/, "")}${path}`
}
const baseNorm = base.startsWith("/") ? base : `/${base}`
return `${window.location.origin}${baseNorm}${path}`
}

// ----- Cluster -----
Expand Down
9 changes: 8 additions & 1 deletion src/console/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ pub enum Error {
Json { source: serde_json::Error },
}

/// Map `kube::Error` to a console error (404 -> NotFound, 409 -> Conflict).
/// Map `kube::Error` to a console error (403 -> Forbidden, 404 -> NotFound, 409 -> Conflict).
pub fn map_kube_error(e: kube::Error, not_found_resource: impl Into<String>) -> Error {
match &e {
kube::Error::Api(ae) if ae.code == 403 => Error::Forbidden {
message: if ae.message.is_empty() {
"Kubernetes API access denied".to_string()
} else {
ae.message.clone()
},
},
kube::Error::Api(ae) if ae.code == 404 => Error::NotFound {
resource: not_found_resource.into(),
},
Expand Down
159 changes: 114 additions & 45 deletions src/console/handlers/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,134 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;
use std::result::Result as StdResult;
use std::time::Duration;

use crate::console::{
error::{Error, Result},
error::{self, Error, Result},
models::event::{EventItem, EventListResponse},
state::Claims,
tenant_event_scope::{discover_tenant_event_scope, merge_namespace_events},
};
use axum::{
Extension,
extract::Path,
response::sse::{Event, KeepAlive, Sse},
};
use axum::{Extension, Json, extract::Path};
use futures::StreamExt;
use k8s_openapi::api::core::v1 as corev1;
use kube::{Api, Client, api::ListParams};
use kube::{
Api, Client,
api::ListParams,
runtime::{WatchStreamExt, watcher},
};
use tokio_stream::wrappers::ReceiverStream;

/// List Kubernetes events for objects named like the tenant.
/// SSE stream of merged tenant-scoped Kubernetes events (PRD §5.1).
///
/// On list failure (RBAC, field selector, etc.) returns an empty list and logs a warning so the
/// tenant detail page does not 500.
pub async fn list_tenant_events(
/// Uses the same `session` cookie JWT as other console routes. Payload each tick is JSON
/// `EventListResponse` (full snapshot, max [`tenant_event_scope::MAX_EVENTS_SNAPSHOT`]).
pub async fn stream_tenant_events(
Path((namespace, tenant)): Path<(String, String)>,
Extension(claims): Extension<Claims>,
) -> Result<Json<EventListResponse>> {
let client = match create_client(&claims).await {
Ok(c) => c,
Err(e) => return Err(e),
};
let api: Api<corev1::Event> = Api::namespaced(client, &namespace);
) -> Result<Sse<ReceiverStream<StdResult<Event, Infallible>>>> {
let client = create_client(&claims).await?;
// Preflight: fail the HTTP request if snapshot cannot be built (avoids 200 + empty SSE).
let first_json = build_snapshot_json(&client, &namespace, &tenant).await?;
let (tx, rx) = tokio::sync::mpsc::channel::<StdResult<Event, Infallible>>(16);
let ns = namespace.clone();
let tenant_name = tenant.clone();

let field_selector = format!("involvedObject.name={}", tenant);
let events = match api
.list(&ListParams::default().fields(&field_selector))
tokio::spawn(async move {
if let Err(e) = run_event_sse_loop(client, ns, tenant_name, tx, first_json).await {
tracing::warn!("Tenant events SSE ended with error: {}", e);
}
});

let stream = ReceiverStream::new(rx);
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("ping"),
))
}

async fn run_event_sse_loop(
client: Client,
namespace: String,
tenant: String,
tx: tokio::sync::mpsc::Sender<StdResult<Event, Infallible>>,
first_json: String,
) -> Result<()> {
if tx
.send(Ok(Event::default().data(first_json)))
.await
.is_err()
{
Ok(ev) => ev,
Err(e) => {
tracing::warn!(
"List events for tenant {}/{} failed (returning empty): {}",
namespace,
tenant,
e
);
return Ok(Json(EventListResponse { events: vec![] }));
}
};
return Ok(());
}

let event_api: Api<corev1::Event> = Api::namespaced(client.clone(), &namespace);
let mut watch = watcher(event_api, watcher::Config::default())
.default_backoff()
.boxed();
let mut scope_tick = tokio::time::interval(Duration::from_secs(30));
scope_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let items: Vec<EventItem> = events
.items
.into_iter()
.map(|e| EventItem {
event_type: e.type_.unwrap_or_default(),
reason: e.reason.unwrap_or_default(),
message: e.message.unwrap_or_default(),
involved_object: format!(
"{}/{}",
e.involved_object.kind.unwrap_or_default(),
e.involved_object.name.unwrap_or_default()
),
first_timestamp: e.first_timestamp.map(|ts| ts.0.to_rfc3339()),
last_timestamp: e.last_timestamp.map(|ts| ts.0.to_rfc3339()),
count: e.count.unwrap_or(0),
})
.collect();
loop {
tokio::select! {
_ = scope_tick.tick() => {
let json = match build_snapshot_json(&client, &namespace, &tenant).await {
Ok(j) => j,
Err(e) => {
tracing::warn!("tenant events snapshot failed: {}", e);
continue;
}
};
if tx.send(Ok(Event::default().data(json))).await.is_err() {
return Ok(());
}
}
ev = watch.next() => {
match ev {
Some(Ok(_)) => {
let json = match build_snapshot_json(&client, &namespace, &tenant).await {
Ok(j) => j,
Err(e) => {
tracing::warn!("tenant events snapshot failed: {}", e);
continue;
}
};
if tx.send(Ok(Event::default().data(json))).await.is_err() {
return Ok(());
}
}
Some(Err(e)) => {
tracing::warn!("Kubernetes Event watch error: {}", e);
}
None => return Ok(()),
}
}
}
}
}

Ok(Json(EventListResponse { events: items }))
async fn build_snapshot_json(client: &Client, namespace: &str, tenant: &str) -> Result<String> {
let scope = discover_tenant_event_scope(client, namespace, tenant).await?;
let api: Api<corev1::Event> = Api::namespaced(client.clone(), namespace);
let list = api.list(&ListParams::default()).await.map_err(|e| {
tracing::warn!(
"List events for tenant {}/{} failed: {}",
namespace,
tenant,
e
);
error::map_kube_error(e, format!("Events for tenant '{}'", tenant))
})?;
let items: Vec<EventItem> = merge_namespace_events(&scope, list.items);
let body = EventListResponse { events: items };
serde_json::to_string(&body).map_err(|e| Error::Json { source: e })
}

/// Build a client impersonating the session token.
Expand Down
1 change: 1 addition & 0 deletions src/console/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ pub mod openapi;
pub mod routes;
pub mod server;
pub mod state;
pub mod tenant_event_scope;
Loading
Loading