mirror of
https://github.com/deepseek-ai/3FS
synced 2025-06-11 09:04:06 +00:00
Merge e26f5a901a
into 91bfcf3606
This commit is contained in:
commit
f26cb1cee4
@ -25,7 +25,6 @@ namespace {
|
||||
auto getParser() {
|
||||
argparse::ArgumentParser parser("dump-dentries");
|
||||
parser.add_argument("-n", "--num-dentries-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
|
||||
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
|
||||
parser.add_argument("-d", "--dentry-dir").default_value(std::string{"dentries"});
|
||||
parser.add_argument("-t", "--threads").default_value(uint32_t(4)).scan<'u', uint32_t>();
|
||||
return parser;
|
||||
@ -37,14 +36,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv,
|
||||
auto &env = dynamic_cast<AdminEnv &>(ienv);
|
||||
ENSURE_USAGE(args.empty());
|
||||
ENSURE_USAGE(env.mgmtdClientGetter);
|
||||
ENSURE_USAGE(env.kvEngineGetter);
|
||||
|
||||
const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
|
||||
const auto &numEntriesPerFile = parser.get<uint32_t>("num-dentries-perfile");
|
||||
const auto &dentryDir = parser.get<std::string>("dentry-dir");
|
||||
const auto threads = parser.get<uint32_t>("threads");
|
||||
|
||||
ENSURE_USAGE(threads > 0);
|
||||
ENSURE_USAGE(!fdbClusterFile.empty());
|
||||
ENSURE_USAGE(!dentryDir.empty());
|
||||
ENSURE_USAGE(numEntriesPerFile > 0);
|
||||
|
||||
@ -55,13 +53,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv,
|
||||
|
||||
Dispatcher::OutputTable table;
|
||||
auto dumpRes =
|
||||
co_await dumpDirEntriesFromFdb(fdbClusterFile, numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads));
|
||||
co_await dumpDirEntriesFromFdb(env.kvEngineGetter(), numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads));
|
||||
if (!dumpRes) co_return makeError(dumpRes.error());
|
||||
co_return table;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
|
||||
CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
|
||||
const uint32_t numEntriesPerDir,
|
||||
const std::string dentryDir,
|
||||
const uint32_t threads) {
|
||||
@ -77,10 +75,9 @@ CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
|
||||
XLOGF(CRITICAL, "Saving directory entries to directory: {}", dentryDir);
|
||||
|
||||
meta::server::MetaScan::Options options;
|
||||
options.fdb_cluster_file = fdbClusterFile;
|
||||
options.threads = 8;
|
||||
options.coroutines = 32;
|
||||
auto scan = std::make_unique<meta::server::MetaScan>(options);
|
||||
auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine);
|
||||
auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16);
|
||||
|
||||
time_t timestamp = UtcClock::secondsSinceEpoch();
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <cstdint>
|
||||
#include <vector>
|
||||
|
||||
#include "common/kv/IKVEngine.h"
|
||||
#include "common/serde/Serde.h"
|
||||
#include "fbs/meta/Schema.h"
|
||||
namespace hf3fs::client::cli {
|
||||
@ -21,7 +22,7 @@ struct DirEntryTable {
|
||||
};
|
||||
static_assert(serde::Serializable<DirEntryTable>);
|
||||
|
||||
CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile,
|
||||
CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
|
||||
const uint32_t numEntriesPerDir,
|
||||
const std::string dentryDir,
|
||||
const uint32_t threads);
|
||||
|
@ -30,7 +30,6 @@ namespace {
|
||||
auto getParser() {
|
||||
argparse::ArgumentParser parser("dump-inodes");
|
||||
parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
|
||||
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
|
||||
parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes"});
|
||||
parser.add_argument("-q", "--parquet-format").default_value(false).implicit_value(true);
|
||||
parser.add_argument("-a", "--all-inodes").default_value(false).implicit_value(true);
|
||||
@ -44,8 +43,8 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,
|
||||
auto &env = dynamic_cast<AdminEnv &>(ienv);
|
||||
ENSURE_USAGE(args.empty());
|
||||
ENSURE_USAGE(env.mgmtdClientGetter);
|
||||
ENSURE_USAGE(env.kvEngineGetter);
|
||||
|
||||
const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
|
||||
const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile");
|
||||
const auto &inodeDir = parser.get<std::string>("inode-dir");
|
||||
const auto &parquetFormat = parser.get<bool>("parquet-format");
|
||||
@ -53,7 +52,6 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,
|
||||
const auto &threads = parser.get<uint32_t>("threads");
|
||||
|
||||
ENSURE_USAGE(threads > 0);
|
||||
ENSURE_USAGE(!fdbClusterFile.empty());
|
||||
|
||||
if (boost::filesystem::exists(inodeDir)) {
|
||||
XLOGF(CRITICAL, "Output directory for inodes already exists: {}", inodeDir);
|
||||
@ -61,7 +59,7 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,
|
||||
}
|
||||
|
||||
Dispatcher::OutputTable table;
|
||||
auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile,
|
||||
auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(),
|
||||
numInodesPerFile,
|
||||
inodeDir,
|
||||
parquetFormat,
|
||||
@ -73,7 +71,7 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv,
|
||||
|
||||
} // namespace
|
||||
|
||||
CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
|
||||
CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
|
||||
const uint32_t numInodesPerFile,
|
||||
const std::string inodeDir,
|
||||
const bool parquetFormat,
|
||||
@ -90,10 +88,9 @@ CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
|
||||
XLOGF(CRITICAL, "Saving inodes to directory: {}", inodeDir);
|
||||
|
||||
meta::server::MetaScan::Options options;
|
||||
options.fdb_cluster_file = fdbClusterFile;
|
||||
options.threads = 8;
|
||||
options.coroutines = 32;
|
||||
auto scan = std::make_unique<meta::server::MetaScan>(options);
|
||||
auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine);
|
||||
auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16);
|
||||
|
||||
time_t timestamp = UtcClock::secondsSinceEpoch();
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "common/kv/IKVEngine.h"
|
||||
#include "common/serde/Serde.h"
|
||||
#include "common/utils/Coroutine.h"
|
||||
#include "fbs/meta/Schema.h"
|
||||
@ -25,7 +26,7 @@ static_assert(serde::Serializable<InodeTable>);
|
||||
|
||||
std::vector<Path> listFilesFromPath(const Path path);
|
||||
|
||||
CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile,
|
||||
CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine,
|
||||
const uint32_t numInodesPerFile,
|
||||
const std::string inodeDir,
|
||||
const bool parquetFormat = false,
|
||||
|
@ -16,7 +16,6 @@ namespace {
|
||||
auto getParser() {
|
||||
argparse::ArgumentParser parser("remove-chunks");
|
||||
parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>();
|
||||
parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"});
|
||||
parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes2"});
|
||||
parser.add_argument("-o", "--orphaned-path").default_value(std::string{"orphaned"});
|
||||
parser.add_argument("-r", "--do-remove").default_value(false).implicit_value(true);
|
||||
@ -30,11 +29,11 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv,
|
||||
auto &env = dynamic_cast<AdminEnv &>(ienv);
|
||||
ENSURE_USAGE(args.empty());
|
||||
ENSURE_USAGE(env.mgmtdClientGetter);
|
||||
ENSURE_USAGE(env.kvEngineGetter);
|
||||
Dispatcher::OutputTable table;
|
||||
|
||||
// dump latest inodes
|
||||
|
||||
const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file");
|
||||
const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile");
|
||||
const auto &inodeDir = parser.get<std::string>("inode-dir");
|
||||
const auto &parquetFormat = parser.get<bool>("parquet-format");
|
||||
@ -44,7 +43,7 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv,
|
||||
co_return makeError(StatusCode::kInvalidArg);
|
||||
}
|
||||
|
||||
auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile, numInodesPerFile, inodeDir, parquetFormat);
|
||||
auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(), numInodesPerFile, inodeDir, parquetFormat);
|
||||
if (!dumpRes) co_return makeError(dumpRes.error());
|
||||
|
||||
// load the inode dump
|
||||
|
@ -79,15 +79,13 @@ MetaScan::MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine)
|
||||
if (options_.threads < 0 || options_.coroutines < 0) {
|
||||
throw std::runtime_error("Invalid options, thread < 0 or coroutines < 0");
|
||||
}
|
||||
if (!kvEngine && options_.fdb_cluster_file.empty()) {
|
||||
throw std::runtime_error("Should set kvEngine or fdb cluster file");
|
||||
if (!kvEngine) {
|
||||
throw std::runtime_error("kvEngine is required");
|
||||
}
|
||||
if (!options_.logging.empty()) {
|
||||
XLOGF(INFO, "Setup log: {}", options_.logging);
|
||||
logging::initOrDie(options_.logging);
|
||||
}
|
||||
|
||||
createKVEngine();
|
||||
}
|
||||
|
||||
MetaScan::~MetaScan() {
|
||||
@ -98,24 +96,6 @@ MetaScan::~MetaScan() {
|
||||
scanDirEntryTask_->cancel.requestCancellation();
|
||||
}
|
||||
exec_.stop();
|
||||
if (fdbNetwork_) {
|
||||
kv::fdb::DB::stopNetwork();
|
||||
fdbNetwork_->join();
|
||||
}
|
||||
}
|
||||
|
||||
void MetaScan::createKVEngine() {
|
||||
if (kvEngine_) {
|
||||
return;
|
||||
}
|
||||
|
||||
kv::fdb::DB::selectAPIVersion(FDB_API_VERSION);
|
||||
auto error = kv::fdb::DB::setupNetwork();
|
||||
if (error) {
|
||||
throw std::runtime_error(fmt::format("Failed to setup fdb network, error {}", kv::fdb::DB::errorMsg(error)));
|
||||
}
|
||||
fdbNetwork_ = std::jthread([&]() { kv::fdb::DB::runNetwork(); });
|
||||
kvEngine_ = std::make_shared<kv::FDBKVEngine>(kv::fdb::DB(options_.fdb_cluster_file, true /* readonly */));
|
||||
}
|
||||
|
||||
std::vector<Inode> MetaScan::getInodes() {
|
||||
|
@ -40,12 +40,9 @@ class MetaScan {
|
||||
double backoff_total_wait = 60; // 60s
|
||||
// log level
|
||||
std::string logging;
|
||||
// create FDB client with given config path
|
||||
std::string fdb_cluster_file;
|
||||
};
|
||||
|
||||
MetaScan(Options options,
|
||||
std::shared_ptr<kv::IKVEngine> kvEngine = {} /* create new fdb client if kvEngine is not set */);
|
||||
MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine);
|
||||
~MetaScan();
|
||||
|
||||
std::vector<Inode> getInodes();
|
||||
@ -105,7 +102,6 @@ class MetaScan {
|
||||
|
||||
std::mutex mutex_;
|
||||
Options options_;
|
||||
std::optional<std::jthread> fdbNetwork_;
|
||||
std::shared_ptr<kv::IKVEngine> kvEngine_;
|
||||
folly::CPUThreadPoolExecutor exec_;
|
||||
std::optional<BackgroundTask<Inode>> scanInodeTask_;
|
||||
|
@ -162,112 +162,4 @@ TYPED_TEST(TestScan, Exit) {
|
||||
ASSERT_FALSE(entries.empty());
|
||||
}
|
||||
|
||||
DEFINE_string(fdb_cluster, "", "fdb cluster file path");
|
||||
DEFINE_uint32(scan_threads, 4, "scan threads");
|
||||
DEFINE_uint32(scan_coroutines, 8, "scan coroutines");
|
||||
DEFINE_bool(scan_all, false, "scan all inodes and direntries");
|
||||
DEFINE_bool(scan_check, false, "check parent exists");
|
||||
|
||||
TEST(TestScanFDB, DISABLED_FDB) {
|
||||
MetaScan::Options options;
|
||||
options.fdb_cluster_file = FLAGS_fdb_cluster;
|
||||
options.threads = FLAGS_scan_threads;
|
||||
options.coroutines = FLAGS_scan_coroutines;
|
||||
MetaScan scan(options);
|
||||
|
||||
std::set<InodeId> directories;
|
||||
std::set<InodeId> missing;
|
||||
|
||||
auto begin = SteadyClock::now();
|
||||
size_t totalInodes = 0;
|
||||
while (true) {
|
||||
auto inodes = scan.getInodes();
|
||||
totalInodes += inodes.size();
|
||||
if (inodes.empty() || !FLAGS_scan_all) {
|
||||
break;
|
||||
}
|
||||
if (FLAGS_scan_check) {
|
||||
for (auto &inode : inodes) {
|
||||
if (inode.isDirectory()) {
|
||||
directories.insert(inode.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
XLOGF(DBG, "MetaScan get {} inodes", inodes.size());
|
||||
}
|
||||
XLOGF(INFO,
|
||||
"MetaScan scan Inode finished, duration {}, get {} inodes\n",
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - begin),
|
||||
totalInodes);
|
||||
|
||||
begin = SteadyClock::now();
|
||||
size_t totalEntries = 0;
|
||||
while (true) {
|
||||
auto entries = scan.getDirEntries();
|
||||
totalEntries += entries.size();
|
||||
if (entries.empty() || !FLAGS_scan_all) {
|
||||
break;
|
||||
}
|
||||
if (FLAGS_scan_check) {
|
||||
for (auto &entry : entries) {
|
||||
if (!directories.contains(entry.parent)) {
|
||||
missing.insert(entry.parent);
|
||||
}
|
||||
}
|
||||
}
|
||||
XLOGF(DBG, "MetaScan get {} entries", entries.size());
|
||||
}
|
||||
XLOGF(INFO,
|
||||
"MetaScan scan DirEntry finished, duration {}, get {} entries",
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - begin),
|
||||
totalEntries);
|
||||
XLOGF_IF(WARNING, !missing.empty(), "Missing parent: {}", fmt::join(missing.begin(), missing.end(), ", "));
|
||||
}
|
||||
|
||||
DEFINE_int64(inode_id, 0, "inode id");
|
||||
|
||||
TEST(TestScanFDB, DISABLED_PrintInode) {
|
||||
MetaScan::Options options;
|
||||
options.fdb_cluster_file = FLAGS_fdb_cluster;
|
||||
options.threads = FLAGS_scan_threads;
|
||||
options.coroutines = FLAGS_scan_coroutines;
|
||||
MetaScan scan(options);
|
||||
auto &kv = scan.kvEngine();
|
||||
|
||||
folly::coro::blockingWait([&]() -> CoTask<void> {
|
||||
auto txn = kv.createReadonlyTransaction();
|
||||
auto inodeId = InodeId(FLAGS_inode_id);
|
||||
auto inode = co_await Inode::snapshotLoad(*txn, inodeId);
|
||||
CO_ASSERT_OK(inode);
|
||||
if (inode->has_value()) {
|
||||
fmt::print("inode: {}\n", **inode);
|
||||
} else {
|
||||
fmt::print("inode {} doesn't exist!", inodeId);
|
||||
}
|
||||
fmt::print("============\n");
|
||||
|
||||
std::string prev;
|
||||
while (true) {
|
||||
auto txn = kv.createReadonlyTransaction();
|
||||
auto entries = co_await DirEntryList::snapshotLoad(*txn, inodeId, prev, -1);
|
||||
CO_ASSERT_OK(entries);
|
||||
for (auto entry : entries->entries) {
|
||||
auto inode = co_await Inode::snapshotLoad(*kv.createReadWriteTransaction(), entry.id);
|
||||
CO_ASSERT_OK(inode);
|
||||
fmt::print("entry: {} inode: {}\n", entry, inode->has_value() ? fmt::format("{}", **inode) : "");
|
||||
prev = entry.name;
|
||||
}
|
||||
if (!entries->more) break;
|
||||
}
|
||||
fmt::print("============\n");
|
||||
|
||||
if (inode->has_value() && inode->value().isDirectory()) {
|
||||
auto txn = kv.createReadonlyTransaction();
|
||||
auto entry = co_await inode->value().snapshotLoadDirEntry(*txn);
|
||||
CO_ASSERT_OK(entry);
|
||||
fmt::print("entry points to {}: {}\n", inodeId, *entry);
|
||||
}
|
||||
}());
|
||||
}
|
||||
|
||||
} // namespace hf3fs::meta::server
|
||||
|
Loading…
Reference in New Issue
Block a user