Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions crates/cli/src/commands/admin/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Serialize;
use super::get_admin_client;
use crate::exit_code::ExitCode;
use crate::output::Formatter;
use rc_core::admin::{AdminApi, PoolDecommissionInfo, PoolStatus, PoolTarget};
use rc_core::admin::{AdminApi, ClusterInfo, PoolDecommissionInfo, PoolStatus, PoolTarget};

/// Pool subcommands
#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -98,8 +98,9 @@ async fn execute_status(args: StatusArgs, formatter: &Formatter) -> ExitCode {
}
}
} else {
match client.list_pools().await {
Ok(pools) => {
match client.cluster_info().await {
Ok(info) => {
let pools = pool_statuses_from_cluster_info(&info);
if formatter.is_json() {
formatter.json(&PoolListOutput { pools });
} else {
Expand All @@ -115,6 +116,60 @@ async fn execute_status(args: StatusArgs, formatter: &Formatter) -> ExitCode {
}
}

fn pool_statuses_from_cluster_info(info: &ClusterInfo) -> Vec<PoolStatus> {
let mut pools: Vec<PoolStatus> = info
.pools
.as_ref()
.map(|pools| {
pools
.iter()
.filter_map(|(pool_id, sets)| {
let id = usize::try_from(*pool_id).ok()?;
let raw_capacity = sets.values().map(|set| set.raw_capacity).sum::<u64>();
let raw_usage = sets.values().map(|set| set.raw_usage).sum::<u64>();

Some(PoolStatus {
id,
cmd_line: format!("pool {pool_id}"),
decommission: Some(PoolDecommissionInfo {
total_size: raw_capacity,
current_size: raw_capacity.saturating_sub(raw_usage),
..Default::default()
}),
..Default::default()
})
})
.collect()
})
.unwrap_or_default();

if pools.is_empty() {
pools = info
.servers
.as_ref()
.map(|servers| {
let mut pool_ids = servers
.iter()
.flat_map(|server| &server.disks)
.filter_map(|disk| usize::try_from(disk.pool_index).ok())
.collect::<Vec<_>>();
pool_ids.sort_unstable();
pool_ids.dedup();
pool_ids
.into_iter()
.map(|id| PoolStatus {
id,
cmd_line: format!("pool {id}"),
..Default::default()
})
.collect()
})
.unwrap_or_default();
}

pools
}

pub(super) fn print_pool_list(pools: &[PoolStatus], formatter: &Formatter) {
formatter.println(&formatter.style_name("Pools:"));
if pools.is_empty() {
Expand Down
43 changes: 43 additions & 0 deletions crates/cli/tests/admin_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#![cfg(not(windows))]

mod admin_support;

use std::process::Command;
use std::time::Duration;

use admin_support::{rc_binary, rc_host_alias, start_admin_test_server};

#[test]
fn cluster_info_displays_disk_location_indexes_from_snake_case_fields() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"{"mode":"distributed","servers":[{"endpoint":"http://node1:9000","state":"online","drives":[{"endpoint":"http://node1:9000/data1","path":"/data1","state":"ok","totalspace":100,"usedspace":40,"availspace":60,"pool_index":1,"set_index":2,"disk_index":3}]}]}"#,
);

let output = Command::new(rc_binary())
.args(["admin", "info", "cluster", "myalias"])
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stderr: {}",
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
assert!(
stdout.contains("Location: pool:1 set:2 disk:3"),
"stdout: {stdout}"
);

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "GET");
assert_eq!(request.target, "/rustfs/admin/v3/info");

handle.join().expect("admin test server finished");
}
36 changes: 36 additions & 0 deletions crates/cli/tests/admin_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,42 @@ fn pool_list_dispatches_to_pool_list_json() {
handle.join().expect("admin test server finished");
}

#[test]
fn pool_status_without_target_dispatches_to_cluster_info_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
let (endpoint, receiver, handle) = start_admin_test_server(
r#"{"pools":{"0":{"0":{"id":0,"rawUsage":10,"rawCapacity":100,"usage":4,"objectsCount":1,"versionsCount":1,"deleteMarkersCount":0,"healDisks":0}},"1":{"0":{"id":0,"rawUsage":20,"rawCapacity":200,"usage":8,"objectsCount":2,"versionsCount":2,"deleteMarkersCount":0,"healDisks":0}}}}"#,
);

let output = Command::new(rc_binary())
.args(["--json", "admin", "pool", "status", "myalias"])
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_myalias", rc_host_alias(&endpoint))
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stderr: {}",
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
let pools = payload["pools"].as_array().expect("pools array");
assert_eq!(pools.len(), 2);
assert_eq!(pools[0]["id"], 0);
assert_eq!(pools[1]["id"], 1);

let request = receiver
.recv_timeout(Duration::from_secs(5))
.expect("captured admin request");
assert_eq!(request.method, "GET");
assert_eq!(request.target, "/rustfs/admin/v3/info");

handle.join().expect("admin test server finished");
}

#[test]
fn pool_status_dispatches_by_id_pool_json() {
let config_dir = tempfile::tempdir().expect("create config dir");
Expand Down
60 changes: 56 additions & 4 deletions crates/core/src/admin/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! including server information, disk status, and heal operations.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};

/// Server information representing a RustFS node
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -96,15 +96,15 @@ pub struct DiskInfo {
pub available_space: u64,

/// Pool index
#[serde(default)]
#[serde(default, alias = "pool_index")]
pub pool_index: i32,

/// Set index
#[serde(default)]
#[serde(default, alias = "set_index")]
pub set_index: i32,

/// Disk index within set
#[serde(default)]
#[serde(default, alias = "disk_index")]
pub disk_index: i32,

/// Healing info if disk is being healed
Expand Down Expand Up @@ -284,6 +284,43 @@ pub struct ObjectsInfo {
pub error: Option<String>,
}

/// Pool erasure set metrics returned by cluster information.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct PoolErasureSetInfo {
/// Erasure set ID within the pool.
#[serde(default)]
pub id: i32,

/// Raw used capacity in bytes.
#[serde(default, rename = "rawUsage")]
pub raw_usage: u64,

/// Raw total capacity in bytes.
#[serde(default, rename = "rawCapacity")]
pub raw_capacity: u64,

/// Object data usage in bytes.
#[serde(default)]
pub usage: u64,

/// Number of objects in the set.
#[serde(default, rename = "objectsCount")]
pub objects_count: u64,

/// Number of versions in the set.
#[serde(default, rename = "versionsCount")]
pub versions_count: u64,

/// Number of delete markers in the set.
#[serde(default, rename = "deleteMarkersCount")]
pub delete_markers_count: u64,

/// Number of healing disks in the set.
#[serde(default, rename = "healDisks")]
pub heal_disks: i32,
}

/// Complete cluster information response
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -323,6 +360,10 @@ pub struct ClusterInfo {
/// Server information
#[serde(default)]
pub servers: Option<Vec<ServerInfo>>,

/// Pool metrics keyed by pool and erasure set index.
#[serde(default)]
pub pools: Option<BTreeMap<i32, BTreeMap<i32, PoolErasureSetInfo>>>,
}

impl ClusterInfo {
Expand Down Expand Up @@ -826,6 +867,17 @@ mod tests {
assert_eq!(disk.total_space, 0);
}

#[test]
fn test_disk_info_deserializes_snake_case_location_indexes() {
let json = r#"{"pool_index":1,"set_index":2,"disk_index":3}"#;

let disk: DiskInfo = serde_json::from_str(json).unwrap();

assert_eq!(disk.pool_index, 1);
assert_eq!(disk.set_index, 2);
assert_eq!(disk.disk_index, 3);
}

#[test]
fn test_server_info_default() {
let server = ServerInfo::default();
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ mod types;
pub use cluster::{
BackendInfo, BackendType, BucketsInfo, ClusterInfo, DiskInfo, HealDriveInfo, HealDriveInfos,
HealResultItem, HealScanMode, HealStartRequest, HealStatus, HealingDiskInfo, MemStats,
ObjectsInfo, PoolDecommissionInfo, PoolStatus, PoolTarget, RebalancePoolProgress,
RebalancePoolStatus, RebalanceStartResult, RebalanceStatus, ServerInfo, UsageInfo,
ObjectsInfo, PoolDecommissionInfo, PoolErasureSetInfo, PoolStatus, PoolTarget,
RebalancePoolProgress, RebalancePoolStatus, RebalanceStartResult, RebalanceStatus, ServerInfo,
UsageInfo,
};
pub use tier::{
TierAliyun, TierAzure, TierConfig, TierCreds, TierGCS, TierHuaweicloud, TierMinIO, TierR2,
Expand Down
Loading