mirror of
https://github.com/deepseek-ai/3FS
synced 2025-06-26 18:16:45 +00:00
Resolve mgmtd address name
Name resolution is very convenient for service discovery. We can just write the domain name in the config file and let external component to manage the DNS, e.g. Kubernetes coredns.
This commit is contained in:
parent
3a30c53bf2
commit
1c00672322
@ -169,13 +169,13 @@ bool runBenchmarks() {
|
||||
endpointRawStrs.clear();
|
||||
boost::split(endpointRawStrs, FLAGS_mgmtdEndpoints, boost::is_any_of(", "));
|
||||
|
||||
std::vector<net::Address> mgmtdEndpoints;
|
||||
std::vector<net::NamedAddress> mgmtdEndpoints;
|
||||
|
||||
for (auto str : endpointRawStrs) {
|
||||
boost::trim(str);
|
||||
if (str.empty()) continue;
|
||||
|
||||
auto endpoint = net::Address::fromString(str);
|
||||
auto endpoint = net::NamedAddress::from(str).value();
|
||||
mgmtdEndpoints.push_back(endpoint);
|
||||
XLOGF(WARN, "Add mgmtd endpoint: {}", endpoint);
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ class StorageBench : public test::UnitTestFabric {
|
||||
const std::string statsFilePath = "./perfstats.csv";
|
||||
const std::vector<std::string> ibvDevices = {};
|
||||
const std::vector<std::string> ibnetZones = {};
|
||||
const std::vector<net::Address> mgmtdEndpoints = {};
|
||||
const std::vector<net::NamedAddress> mgmtdEndpoints = {};
|
||||
const std::string clusterId = kClusterId;
|
||||
const uint32_t chainTableId = 0;
|
||||
const uint32_t chainTableVersion = 0;
|
||||
|
||||
@ -9,12 +9,15 @@
|
||||
#include <folly/experimental/coro/Promise.h>
|
||||
#include <folly/experimental/coro/Sleep.h>
|
||||
#include <folly/logging/xlog.h>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
#include <scn/scn.h>
|
||||
|
||||
#include "common/app/ApplicationBase.h"
|
||||
#include "common/utils/BackgroundRunner.h"
|
||||
#include "common/utils/LogCommands.h"
|
||||
#include "common/utils/Result.h"
|
||||
#include "common/utils/Status.h"
|
||||
#include "core/utils/ServiceOperation.h"
|
||||
#include "core/utils/runOp.h"
|
||||
|
||||
@ -461,7 +464,18 @@ struct MgmtdClient::Impl {
|
||||
if (serverAddrs.empty()) {
|
||||
return makeError(StatusCode::kInvalidConfig, "Empty mgmtdServers");
|
||||
}
|
||||
for (auto addr : serverAddrs) {
|
||||
std::vector<net::Address> resolved;
|
||||
for (const auto &addr : serverAddrs) {
|
||||
auto res = addr.resolve(std::back_inserter(resolved));
|
||||
if (res.hasError()) {
|
||||
XLOG(WARN, "resolve mgmtd address: {}", res.error().describe());
|
||||
}
|
||||
}
|
||||
if (resolved.empty()) {
|
||||
return makeError(StatusCode::kInvalidConfig, "No resolved mgmtdServers");
|
||||
}
|
||||
|
||||
for (auto addr : resolved) {
|
||||
if (addr == net::Address(0))
|
||||
return makeError(StatusCode::kInvalidConfig, "Invalid MGMTD address: " + addr.toString());
|
||||
if (config_.network_type() && addr.type != *config_.network_type()) {
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
#include "RoutingInfo.h"
|
||||
#include "common/utils/ConfigBase.h"
|
||||
#include "common/utils/Duration.h"
|
||||
#include "common/utils/NamedAddress.h"
|
||||
#include "stubs/common/IStubFactory.h"
|
||||
#include "stubs/mgmtd/IMgmtdServiceStub.h"
|
||||
|
||||
@ -12,7 +13,7 @@ namespace hf3fs::client {
|
||||
class MgmtdClient {
|
||||
public:
|
||||
struct Config : public ConfigBase<Config> {
|
||||
CONFIG_ITEM(mgmtd_server_addresses, std::vector<net::Address>{});
|
||||
CONFIG_ITEM(mgmtd_server_addresses, std::vector<net::NamedAddress>{});
|
||||
CONFIG_ITEM(work_queue_size, 100, ConfigCheckers::checkPositive);
|
||||
CONFIG_ITEM(network_type, std::optional<net::Address::Type>{});
|
||||
|
||||
|
||||
71
src/common/utils/NamedAddress.cc
Normal file
71
src/common/utils/NamedAddress.cc
Normal file
@ -0,0 +1,71 @@
|
||||
#include "common/utils/NamedAddress.h"
|
||||
|
||||
#include <netdb.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
namespace hf3fs::net {
|
||||
|
||||
template<typename It>
|
||||
requires std::output_iterator<It, Address>
|
||||
Result<Void> NamedAddress::resolve(It out) const {
|
||||
struct addrinfo req{
|
||||
.ai_family = AF_INET,
|
||||
.ai_socktype = SOCK_STREAM,
|
||||
};
|
||||
struct addrinfo *res;
|
||||
int err = getaddrinfo(node.c_str(), service.c_str(), &req, &res);
|
||||
if (err != 0) {
|
||||
return MAKE_ERROR_F(StatusCode::kInvalidFormat, "failed to resolve {}:{}: {}", node, service, gai_strerror(err));
|
||||
}
|
||||
SCOPE_EXIT { freeaddrinfo(res); };
|
||||
|
||||
auto iter = res;
|
||||
while (iter != nullptr) {
|
||||
if (iter->ai_family == AF_INET) {
|
||||
auto sin = (struct sockaddr_in *)iter->ai_addr;
|
||||
if (sin->sin_family == AF_INET) {
|
||||
*out++ = Address(sin->sin_addr.s_addr, ntohs(sin->sin_port), type);
|
||||
}
|
||||
}
|
||||
iter = iter->ai_next;
|
||||
}
|
||||
return Void{};
|
||||
}
|
||||
|
||||
template Result<Void> NamedAddress::resolve(std::back_insert_iterator<std::vector<Address>> out) const;
|
||||
|
||||
Result<NamedAddress> NamedAddress::from(std::string_view sv) {
|
||||
constexpr std::string_view delimiter = "://";
|
||||
auto pos = sv.find(delimiter);
|
||||
auto tp = Address::Type::TCP;
|
||||
if (pos != sv.npos) {
|
||||
auto tpStr = sv.substr(0, pos);
|
||||
auto opt = magic_enum::enum_cast<Address::Type>(tpStr, magic_enum::case_insensitive);
|
||||
if (!opt) {
|
||||
return makeError(StatusCode::kInvalidFormat, "invalid address type: {}", tpStr);
|
||||
}
|
||||
tp = *opt;
|
||||
sv = sv.substr(pos + delimiter.size());
|
||||
}
|
||||
pos = sv.find_last_of(':');
|
||||
if (pos == sv.npos) {
|
||||
return makeError(StatusCode::kInvalidFormat, "service not found in address: {}", sv);
|
||||
}
|
||||
return NamedAddress(std::string(sv.substr(0, pos)), std::string(sv.substr(pos + 1)), tp);
|
||||
}
|
||||
|
||||
NamedAddress to_named(Address addr) {
|
||||
return NamedAddress(addr.ipStr(), std::to_string(addr.port), addr.type);
|
||||
}
|
||||
|
||||
std::vector<NamedAddress> to_named(const std::vector<Address> &addrs) {
|
||||
std::vector<NamedAddress> named;
|
||||
named.reserve(addrs.size());
|
||||
for (auto a : addrs) {
|
||||
named.push_back(to_named(a));
|
||||
}
|
||||
return named;
|
||||
}
|
||||
|
||||
}
|
||||
38
src/common/utils/NamedAddress.h
Normal file
38
src/common/utils/NamedAddress.h
Normal file
@ -0,0 +1,38 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <iterator>
|
||||
|
||||
#include "common/utils/Address.h"
|
||||
#include "common/utils/Result.h"
|
||||
|
||||
namespace hf3fs::net {
|
||||
|
||||
struct NamedAddress {
|
||||
std::string node, service;
|
||||
Address::Type type = Address::Type::TCP;
|
||||
|
||||
NamedAddress(std::string node, std::string service, Address::Type type)
|
||||
: node(node), service(service), type(type) {}
|
||||
|
||||
bool operator==(const NamedAddress &other) const {
|
||||
return node == other.node && service == other.service && type == other.type;
|
||||
}
|
||||
|
||||
template<typename It>
|
||||
requires std::output_iterator<It, Address>
|
||||
Result<Void> resolve(It out) const;
|
||||
|
||||
std::string toString() const {
|
||||
return fmt::format("{}://{}:{}", magic_enum::enum_name(type), node, service);
|
||||
}
|
||||
|
||||
static Result<NamedAddress> from(std::string_view sv);
|
||||
};
|
||||
|
||||
inline auto format_as(const NamedAddress &a) { return a.toString(); }
|
||||
|
||||
net::NamedAddress to_named(net::Address addr);
|
||||
std::vector<net::NamedAddress> to_named(const std::vector<net::Address> &addrs);
|
||||
|
||||
}
|
||||
@ -16,7 +16,7 @@ struct MgmtdClientWithConfig {
|
||||
MgmtdClientWithConfig(String clusterId,
|
||||
stubs::ClientContextCreator clientContextCreator,
|
||||
std::vector<net::Address> mgmtdServerAddrs) {
|
||||
config.set_mgmtd_server_addresses(mgmtdServerAddrs);
|
||||
config.set_mgmtd_server_addresses(to_named(mgmtdServerAddrs));
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
config.set_enable_auto_extend_client_session(false);
|
||||
|
||||
@ -81,7 +81,7 @@ struct MetaServerWithConfig : ServerWithConfig<meta::server::MetaServer> {
|
||||
MetaServerWithConfig(String clusterId, flat::NodeId nodeId, std::vector<net::Address> mgmtdServerAddrs)
|
||||
: Base(std::move(clusterId), nodeId) {
|
||||
auto &mgmtdClientConfig = config.mgmtd_client();
|
||||
mgmtdClientConfig.set_mgmtd_server_addresses(mgmtdServerAddrs);
|
||||
mgmtdClientConfig.set_mgmtd_server_addresses(to_named(mgmtdServerAddrs));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -144,7 +144,7 @@ TEST_F(MgmtdClientTest, testWhenNoPrimary) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({ada, adb, adc});
|
||||
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
@ -158,7 +158,7 @@ TEST_F(MgmtdClientTest, testWhenNoPrimary) {
|
||||
|
||||
TEST_F(MgmtdClientTest, testInvalidAddress) {
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({ada, ade});
|
||||
config.set_mgmtd_server_addresses({to_named(ada), to_named(ade)});
|
||||
MgmtdClient client("", nullptr, config);
|
||||
|
||||
folly::coro::blockingWait([&]() -> CoTask<void> {
|
||||
@ -169,7 +169,7 @@ TEST_F(MgmtdClientTest, testInvalidAddress) {
|
||||
|
||||
TEST_F(MgmtdClientTest, testAddressTypeMismatch) {
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({ada, adf});
|
||||
config.set_mgmtd_server_addresses({to_named(ada), to_named(adf)});
|
||||
config.set_network_type(net::Address::TCP);
|
||||
MgmtdClient client("", nullptr, config);
|
||||
|
||||
@ -195,7 +195,7 @@ TEST_F(MgmtdClientTest, testWhenLastIsPrimary) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({ada, adb, adc});
|
||||
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
@ -229,7 +229,7 @@ TEST_F(MgmtdClientTest, testWhenPrimaryNotInConfig) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({ada, adb, adc});
|
||||
config.set_mgmtd_server_addresses({to_named(ada), to_named(adb), to_named(adc)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
@ -272,7 +272,7 @@ TEST_F(MgmtdClientTest, testWhenGetPrimaryLoop) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses(addresses);
|
||||
config.set_mgmtd_server_addresses(to_named(addresses));
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(addresses), config);
|
||||
@ -321,7 +321,7 @@ TEST_F(MgmtdClientTest, testRetryOnRefreshFail) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses(addresses);
|
||||
config.set_mgmtd_server_addresses(to_named(addresses));
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
@ -356,7 +356,7 @@ TEST_F(MgmtdClientTest, testRefreshRoutingInfoCallback) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses(addresses);
|
||||
config.set_mgmtd_server_addresses(to_named(addresses));
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
@ -400,7 +400,7 @@ TEST_F(MgmtdClientTest, testRetryAllAvailableAddresses) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({adj});
|
||||
config.set_mgmtd_server_addresses({to_named(adj)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
config.set_network_type(net::Address::TCP);
|
||||
@ -462,7 +462,7 @@ TEST_F(MgmtdClientTest, testRetryEndWhenNoPrimary) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({adj});
|
||||
config.set_mgmtd_server_addresses({to_named(adj)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
config.set_network_type(net::Address::TCP);
|
||||
@ -522,7 +522,7 @@ TEST_F(MgmtdClientTest, testRetryUnknownAddrs) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses({adj, add});
|
||||
config.set_mgmtd_server_addresses({to_named(adj), to_named(add)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
config.set_network_type(net::Address::TCP);
|
||||
@ -562,7 +562,6 @@ TEST_F(MgmtdClientTest, testRetryUnknownAddrs) {
|
||||
}
|
||||
|
||||
TEST_F(MgmtdClientTest, testSetGetConfigViaInvoke) {
|
||||
std::vector<net::Address> addresses = {ada};
|
||||
struct StubFactory : public stubs::IStubFactory<mgmtd::IMgmtdServiceStub> {
|
||||
std::map<flat::NodeType, flat::ConfigInfo> configs;
|
||||
std::unique_ptr<mgmtd::IMgmtdServiceStub> create(net::Address) {
|
||||
@ -585,7 +584,7 @@ TEST_F(MgmtdClientTest, testSetGetConfigViaInvoke) {
|
||||
};
|
||||
|
||||
MgmtdClient::Config config;
|
||||
config.set_mgmtd_server_addresses(addresses);
|
||||
config.set_mgmtd_server_addresses({to_named(ada)});
|
||||
config.set_enable_auto_refresh(false);
|
||||
config.set_enable_auto_heartbeat(false);
|
||||
MgmtdClient client("", std::make_unique<StubFactory>(), config);
|
||||
|
||||
@ -195,7 +195,7 @@ std::unique_ptr<storage::StorageServer> UnitTestFabric::createStorageServer(size
|
||||
serverConfig.targets().storage_target().kv_store().set_type(metaStoreType);
|
||||
serverConfig.targets().storage_target().file_store().set_preopen_chunk_size_list(
|
||||
std::set(chunkSizeList.begin(), chunkSizeList.end()));
|
||||
serverConfig.mgmtd().set_mgmtd_server_addresses(mgmtdAddressList);
|
||||
serverConfig.mgmtd().set_mgmtd_server_addresses(to_named(mgmtdAddressList));
|
||||
serverConfig.coroutines_pool_read().set_threads_num(32);
|
||||
serverConfig.coroutines_pool_read().set_coroutines_num(4096);
|
||||
serverConfig.coroutines_pool_update().set_threads_num(32);
|
||||
@ -417,7 +417,7 @@ bool UnitTestFabric::setUpStorageSystem() {
|
||||
|
||||
mgmtdForClient_.reset((new FakeMgmtdClient(rawRoutingInfo_))->asCommon());
|
||||
} else {
|
||||
mgmtdClientConfig_.set_mgmtd_server_addresses(mgmtdAddressList);
|
||||
mgmtdClientConfig_.set_mgmtd_server_addresses(to_named(mgmtdAddressList));
|
||||
mgmtdClientConfig_.set_enable_auto_refresh(true);
|
||||
mgmtdClientConfig_.set_enable_auto_heartbeat(false);
|
||||
mgmtdClientConfig_.set_auto_refresh_interval(mgmtdServer_.config.service().check_status_interval());
|
||||
|
||||
@ -371,7 +371,7 @@ class MockCluster {
|
||||
CONFIG_OBJ(mock_mgmtd, mgmtd::MockMgmtd::Config);
|
||||
CONFIG_OBJ(mgmtd_client, client::MgmtdClientForServer::Config, [](client::MgmtdClientForServer::Config &cfg) {
|
||||
cfg.set_enable_auto_heartbeat(false); // currently heartbeat is not needed in meta test
|
||||
cfg.set_mgmtd_server_addresses({net::Address::from("TCP://127.0.0.1:8000").value()}); // just a fake TCP address.
|
||||
cfg.set_mgmtd_server_addresses({net::NamedAddress::from("TCP://127.0.0.1:8000").value()}); // just a fake TCP address.
|
||||
});
|
||||
|
||||
CONFIG_OBJ_ARRAY(chain_tables, ChainTableConfig, 64, [](std::array<ChainTableConfig, 64> &cfg) {
|
||||
|
||||
@ -65,7 +65,7 @@ TEST_F(TestTestMigrationService, StartAndStopServer) {
|
||||
const auto &mgmtdAddressList = mgmtdServer_.collectAddressList("Mgmtd");
|
||||
|
||||
server::MigrationServer::Config config;
|
||||
config.mgmtd_client().set_mgmtd_server_addresses(mgmtdAddressList);
|
||||
config.mgmtd_client().set_mgmtd_server_addresses(to_named(mgmtdAddressList));
|
||||
server::MigrationServer server(config);
|
||||
|
||||
auto result = server.setup();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user