Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion crates/cli/src/commands/admin/rebalance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Serialize;
use super::get_admin_client;
use crate::exit_code::ExitCode;
use crate::output::Formatter;
use rc_core::admin::{AdminApi, RebalancePoolStatus, RebalanceStatus};
use rc_core::admin::{AdminApi, RebalanceCleanupWarnings, RebalancePoolStatus, RebalanceStatus};

/// Rebalance subcommands
#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -217,6 +217,14 @@ fn print_rebalance_pool(pool: &RebalancePoolStatus, formatter: &Formatter) {
formatter.println(&format!(" Last error: {error}"));
}

if pool.cleanup_warnings.count > 0 {
formatter.println(&format!(
" {} {}",
formatter.theme().warning.apply_to("Cleanup warnings:"),
format_cleanup_warnings(&pool.cleanup_warnings)
));
}

if let Some(progress) = &pool.progress {
formatter.println(&format!(
" Progress: {} moved, {} objects, {} versions",
Expand All @@ -242,6 +250,41 @@ fn print_rebalance_pool(pool: &RebalancePoolStatus, formatter: &Formatter) {
}
}

fn format_cleanup_warnings(warnings: &RebalanceCleanupWarnings) -> String {
let mut details = vec![format!("{} warning(s)", warnings.count)];

if let Some(message) = warnings
.last_message
.as_deref()
.filter(|value| !value.is_empty())
{
details.push(format!("last message: {message}"));
}
if let Some(bucket) = warnings
.last_bucket
.as_deref()
.filter(|value| !value.is_empty())
{
details.push(format!("bucket: {bucket}"));
}
if let Some(object) = warnings
.last_object
.as_deref()
.filter(|value| !value.is_empty())
{
details.push(format!("object: {object}"));
}
if let Some(at) = warnings
.last_at
.as_deref()
.filter(|value| !value.is_empty())
{
details.push(format!("at: {at}"));
}

details.join(", ")
}

fn style_status(status: &str, formatter: &Formatter) -> String {
match status {
"Started" | "running" => formatter.style_name(status),
Expand Down Expand Up @@ -326,4 +369,20 @@ mod tests {
assert_eq!(style_status("idle", &formatter), "idle");
assert_eq!(style_status("Queued", &formatter), "Queued");
}

#[test]
fn test_format_cleanup_warnings() {
let warnings = RebalanceCleanupWarnings {
count: 2,
last_message: Some("cleanup warning".to_string()),
last_bucket: Some("bucket-a".to_string()),
last_object: Some("object-a".to_string()),
last_at: Some("2026-06-12T00:00:00Z".to_string()),
};

assert_eq!(
format_cleanup_warnings(&warnings),
"2 warning(s), last message: cleanup warning, bucket: bucket-a, object: object-a, at: 2026-06-12T00:00:00Z"
);
}
}
9 changes: 7 additions & 2 deletions crates/cli/tests/admin_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn scale_start_dispatches_to_rebalance_start_with_expansion_json() {
fn expand_status_dispatches_to_rebalance_status_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
r#"{"id":"rebalance-123","pools":[{"id":0,"status":"Completed","used":0.5,"lastError":null,"cleanupWarnings":{"count":1,"lastMsg":"cleanup warning","lastBucket":"test-bucket","lastObject":"object-a","lastAt":"2026-06-12T00:00:00Z"},"progress":null}],"stoppedAt":"2026-05-06T00:00:00Z"}"#,
);

let output = Command::new(rc_binary())
Expand All @@ -73,7 +73,12 @@ fn expand_status_dispatches_to_rebalance_status_json() {
.as_array()
.expect("pools should be an array")
.len(),
0
1
);
assert_eq!(payload["pools"][0]["cleanupWarnings"]["count"], 1);
assert_eq!(
payload["pools"][0]["cleanupWarnings"]["lastMsg"],
"cleanup warning"
);

let request = receiver
Expand Down
13 changes: 11 additions & 2 deletions crates/cli/tests/admin_rebalance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn rebalance_start_dispatches_to_rebalance_start_json() {
fn rebalance_status_dispatches_to_rebalance_status_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"{"id":"rebalance-123","pools":[],"stoppedAt":"2026-05-07T00:00:00Z"}"#,
r#"{"id":"rebalance-123","pools":[{"id":0,"status":"Completed","used":0.5,"lastError":null,"cleanupWarnings":{"count":1,"lastMsg":"cleanup warning","lastBucket":"test-bucket","lastObject":"object-a","lastAt":"2026-06-12T00:00:00Z"},"progress":null}],"stoppedAt":"2026-05-07T00:00:00Z"}"#,
);

let output = Command::new(rc_binary())
Expand All @@ -65,7 +65,16 @@ fn rebalance_status_dispatches_to_rebalance_status_json() {
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
assert_eq!(payload["id"], "rebalance-123");
assert_eq!(payload["stoppedAt"], "2026-05-07T00:00:00Z");
assert_eq!(payload["pools"].as_array().expect("pools array").len(), 0);
assert_eq!(payload["pools"].as_array().expect("pools array").len(), 1);
assert_eq!(payload["pools"][0]["cleanupWarnings"]["count"], 1);
assert_eq!(
payload["pools"][0]["cleanupWarnings"]["lastMsg"],
"cleanup warning"
);
assert_eq!(
payload["pools"][0]["cleanupWarnings"]["lastBucket"],
"test-bucket"
);

let request = receiver
.recv_timeout(Duration::from_secs(5))
Expand Down
45 changes: 44 additions & 1 deletion crates/core/src/admin/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,11 +730,39 @@ pub struct RebalancePoolStatus {
#[serde(default, rename = "lastError")]
pub last_error: Option<String>,

/// Cleanup warnings observed after this pool finishes rebalance.
#[serde(default, rename = "cleanupWarnings")]
pub cleanup_warnings: RebalanceCleanupWarnings,

/// Rebalance progress, if this pool is active.
#[serde(default)]
pub progress: Option<RebalancePoolProgress>,
}

/// Cleanup warnings recorded for a rebalanced pool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RebalanceCleanupWarnings {
/// Number of cleanup warnings observed.
#[serde(default)]
pub count: u64,

/// Last cleanup warning message.
#[serde(default, rename = "lastMsg")]
pub last_message: Option<String>,

/// Bucket associated with the last cleanup warning.
#[serde(default, rename = "lastBucket")]
pub last_bucket: Option<String>,

/// Object associated with the last cleanup warning.
#[serde(default, rename = "lastObject")]
pub last_object: Option<String>,

/// Timestamp of the last cleanup warning.
#[serde(default, rename = "lastAt")]
pub last_at: Option<String>,
}

/// Rebalance progress for a single pool.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RebalancePoolProgress {
Expand Down Expand Up @@ -919,13 +947,18 @@ mod tests {

#[test]
fn test_rebalance_status_deserialization() {
let json = r#"{"id":"rebalance-1","pools":[{"id":0,"status":"Started","used":0.5,"lastError":null,"progress":{"objects":3,"versions":4,"bytes":1024,"remainingBuckets":2,"bucket":"bucket","object":"object","elapsed":10,"eta":20}}],"stoppedAt":null}"#;
let json = r#"{"id":"rebalance-1","pools":[{"id":0,"status":"Started","used":0.5,"lastError":null,"cleanupWarnings":{"count":1,"lastMsg":"cleanup warning","lastBucket":"bucket","lastObject":"object","lastAt":"2026-06-12T00:00:00Z"},"progress":{"objects":3,"versions":4,"bytes":1024,"remainingBuckets":2,"bucket":"bucket","object":"object","elapsed":10,"eta":20}}],"stoppedAt":null}"#;

let status: RebalanceStatus = serde_json::from_str(json).unwrap();

assert_eq!(status.id, "rebalance-1");
assert_eq!(status.pools.len(), 1);
assert_eq!(status.pools[0].used, 0.5);
assert_eq!(status.pools[0].cleanup_warnings.count, 1);
assert_eq!(
status.pools[0].cleanup_warnings.last_message.as_deref(),
Some("cleanup warning")
);
let progress = status.pools[0]
.progress
.as_ref()
Expand All @@ -934,6 +967,16 @@ mod tests {
assert_eq!(progress.remaining_buckets, 2);
}

#[test]
fn test_rebalance_status_defaults_cleanup_warnings() {
let json = r#"{"id":"rebalance-1","pools":[{"id":0,"status":"Completed","used":0.5,"lastError":null,"progress":null}],"stoppedAt":null}"#;

let status: RebalanceStatus = serde_json::from_str(json).unwrap();

assert_eq!(status.pools[0].cleanup_warnings.count, 0);
assert_eq!(status.pools[0].cleanup_warnings.last_message, None);
}

#[test]
fn test_serialization() {
let info = ClusterInfo {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pub use cluster::{
BackendInfo, BackendType, BucketsInfo, ClusterInfo, DiskInfo, HealDriveInfo, HealDriveInfos,
HealResultItem, HealScanMode, HealStartRequest, HealStatus, HealingDiskInfo, MemStats,
ObjectsInfo, PoolDecommissionInfo, PoolErasureSetInfo, PoolStatus, PoolTarget,
RebalancePoolProgress, RebalancePoolStatus, RebalanceStartResult, RebalanceStatus, ServerInfo,
UsageInfo,
RebalanceCleanupWarnings, RebalancePoolProgress, RebalancePoolStatus, RebalanceStartResult,
RebalanceStatus, ServerInfo, UsageInfo,
};
pub use tier::{
TierAliyun, TierAzure, TierConfig, TierCreds, TierGCS, TierHuaweicloud, TierMinIO, TierR2,
Expand Down
52 changes: 52 additions & 0 deletions schemas/output_v2.json
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,62 @@
}
}
},
"rebalanceCleanupWarnings": {
"type": "object",
"required": [
"count",
"lastMsg",
"lastBucket",
"lastObject",
"lastAt"
],
"properties": {
"count": {
"type": "integer",
"description": "Cleanup warning count"
},
"lastMsg": {
"type": [
"string",
"null"
],
"description": "Last cleanup warning message"
},
"lastBucket": {
"type": [
"string",
"null"
],
"description": "Bucket from the last cleanup warning"
},
"lastObject": {
"type": [
"string",
"null"
],
"description": "Object from the last cleanup warning"
},
"lastAt": {
"oneOf": [
{
"$ref": "#/definitions/timestamp"
},
{
"type": "null"
}
],
"description": "Timestamp of the last cleanup warning"
}
}
},
"rebalancePoolStatus": {
"type": "object",
"required": [
"id",
"status",
"used",
"lastError",
"cleanupWarnings",
"progress"
],
"properties": {
Expand All @@ -464,6 +513,9 @@
],
"description": "Last rebalance error for this pool"
},
"cleanupWarnings": {
"$ref": "#/definitions/rebalanceCleanupWarnings"
},
"progress": {
"oneOf": [
{
Expand Down
Loading