mirror of
				https://github.com/deepseek-ai/3FS
				synced 2025-06-26 18:16:45 +00:00 
			
		
		
		
	Merge e26f5a901a into 2db69ced80
				
					
				
			This commit is contained in:
		
						commit
						deb17498e6
					
				| @ -25,7 +25,6 @@ namespace { | ||||
| auto getParser() { | ||||
|   argparse::ArgumentParser parser("dump-dentries"); | ||||
|   parser.add_argument("-n", "--num-dentries-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>(); | ||||
|   parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"}); | ||||
|   parser.add_argument("-d", "--dentry-dir").default_value(std::string{"dentries"}); | ||||
|   parser.add_argument("-t", "--threads").default_value(uint32_t(4)).scan<'u', uint32_t>(); | ||||
|   return parser; | ||||
| @ -37,14 +36,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv, | ||||
|   auto &env = dynamic_cast<AdminEnv &>(ienv); | ||||
|   ENSURE_USAGE(args.empty()); | ||||
|   ENSURE_USAGE(env.mgmtdClientGetter); | ||||
|   ENSURE_USAGE(env.kvEngineGetter); | ||||
| 
 | ||||
|   const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file"); | ||||
|   const auto &numEntriesPerFile = parser.get<uint32_t>("num-dentries-perfile"); | ||||
|   const auto &dentryDir = parser.get<std::string>("dentry-dir"); | ||||
|   const auto threads = parser.get<uint32_t>("threads"); | ||||
| 
 | ||||
|   ENSURE_USAGE(threads > 0); | ||||
|   ENSURE_USAGE(!fdbClusterFile.empty()); | ||||
|   ENSURE_USAGE(!dentryDir.empty()); | ||||
|   ENSURE_USAGE(numEntriesPerFile > 0); | ||||
| 
 | ||||
| @ -55,13 +53,13 @@ CoTryTask<Dispatcher::OutputTable> dumpDirEntries(IEnv &ienv, | ||||
| 
 | ||||
|   Dispatcher::OutputTable table; | ||||
|   auto dumpRes = | ||||
|       co_await dumpDirEntriesFromFdb(fdbClusterFile, numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads)); | ||||
|       co_await dumpDirEntriesFromFdb(env.kvEngineGetter(), numEntriesPerFile, dentryDir, std::max(uint32_t(1), threads)); | ||||
|   if (!dumpRes) co_return makeError(dumpRes.error()); | ||||
|   co_return table; | ||||
| } | ||||
| }  // namespace
 | ||||
| 
 | ||||
| CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile, | ||||
| CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine, | ||||
|                                       const uint32_t numEntriesPerDir, | ||||
|                                       const std::string dentryDir, | ||||
|                                       const uint32_t threads) { | ||||
| @ -77,10 +75,9 @@ CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile, | ||||
|   XLOGF(CRITICAL, "Saving directory entries to directory: {}", dentryDir); | ||||
| 
 | ||||
|   meta::server::MetaScan::Options options; | ||||
|   options.fdb_cluster_file = fdbClusterFile; | ||||
|   options.threads = 8; | ||||
|   options.coroutines = 32; | ||||
|   auto scan = std::make_unique<meta::server::MetaScan>(options); | ||||
|   auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine); | ||||
|   auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16); | ||||
| 
 | ||||
|   time_t timestamp = UtcClock::secondsSinceEpoch(); | ||||
|  | ||||
| @ -3,6 +3,7 @@ | ||||
| #include <cstdint> | ||||
| #include <vector> | ||||
| 
 | ||||
| #include "common/kv/IKVEngine.h" | ||||
| #include "common/serde/Serde.h" | ||||
| #include "fbs/meta/Schema.h" | ||||
| namespace hf3fs::client::cli { | ||||
| @ -21,7 +22,7 @@ struct DirEntryTable { | ||||
| }; | ||||
| static_assert(serde::Serializable<DirEntryTable>); | ||||
| 
 | ||||
| CoTryTask<Void> dumpDirEntriesFromFdb(const std::string fdbClusterFile, | ||||
| CoTryTask<Void> dumpDirEntriesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine, | ||||
|                                       const uint32_t numEntriesPerDir, | ||||
|                                       const std::string dentryDir, | ||||
|                                       const uint32_t threads); | ||||
|  | ||||
| @ -30,7 +30,6 @@ namespace { | ||||
| auto getParser() { | ||||
|   argparse::ArgumentParser parser("dump-inodes"); | ||||
|   parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>(); | ||||
|   parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"}); | ||||
|   parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes"}); | ||||
|   parser.add_argument("-q", "--parquet-format").default_value(false).implicit_value(true); | ||||
|   parser.add_argument("-a", "--all-inodes").default_value(false).implicit_value(true); | ||||
| @ -44,8 +43,8 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv, | ||||
|   auto &env = dynamic_cast<AdminEnv &>(ienv); | ||||
|   ENSURE_USAGE(args.empty()); | ||||
|   ENSURE_USAGE(env.mgmtdClientGetter); | ||||
|   ENSURE_USAGE(env.kvEngineGetter); | ||||
| 
 | ||||
|   const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file"); | ||||
|   const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile"); | ||||
|   const auto &inodeDir = parser.get<std::string>("inode-dir"); | ||||
|   const auto &parquetFormat = parser.get<bool>("parquet-format"); | ||||
| @ -53,7 +52,6 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv, | ||||
|   const auto &threads = parser.get<uint32_t>("threads"); | ||||
| 
 | ||||
|   ENSURE_USAGE(threads > 0); | ||||
|   ENSURE_USAGE(!fdbClusterFile.empty()); | ||||
| 
 | ||||
|   if (boost::filesystem::exists(inodeDir)) { | ||||
|     XLOGF(CRITICAL, "Output directory for inodes already exists: {}", inodeDir); | ||||
| @ -61,7 +59,7 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv, | ||||
|   } | ||||
| 
 | ||||
|   Dispatcher::OutputTable table; | ||||
|   auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile, | ||||
|   auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(), | ||||
|                                             numInodesPerFile, | ||||
|                                             inodeDir, | ||||
|                                             parquetFormat, | ||||
| @ -73,7 +71,7 @@ CoTryTask<Dispatcher::OutputTable> dumpInodes(IEnv &ienv, | ||||
| 
 | ||||
| }  // namespace
 | ||||
| 
 | ||||
| CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile, | ||||
| CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine, | ||||
|                                   const uint32_t numInodesPerFile, | ||||
|                                   const std::string inodeDir, | ||||
|                                   const bool parquetFormat, | ||||
| @ -90,10 +88,9 @@ CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile, | ||||
|   XLOGF(CRITICAL, "Saving inodes to directory: {}", inodeDir); | ||||
| 
 | ||||
|   meta::server::MetaScan::Options options; | ||||
|   options.fdb_cluster_file = fdbClusterFile; | ||||
|   options.threads = 8; | ||||
|   options.coroutines = 32; | ||||
|   auto scan = std::make_unique<meta::server::MetaScan>(options); | ||||
|   auto scan = std::make_unique<meta::server::MetaScan>(options, kvEngine); | ||||
|   auto exec = std::make_unique<folly::CPUThreadPoolExecutor>(16); | ||||
| 
 | ||||
|   time_t timestamp = UtcClock::secondsSinceEpoch(); | ||||
|  | ||||
| @ -1,5 +1,6 @@ | ||||
| #pragma once | ||||
| 
 | ||||
| #include "common/kv/IKVEngine.h" | ||||
| #include "common/serde/Serde.h" | ||||
| #include "common/utils/Coroutine.h" | ||||
| #include "fbs/meta/Schema.h" | ||||
| @ -25,7 +26,7 @@ static_assert(serde::Serializable<InodeTable>); | ||||
| 
 | ||||
| std::vector<Path> listFilesFromPath(const Path path); | ||||
| 
 | ||||
| CoTryTask<Void> dumpInodesFromFdb(const std::string fdbClusterFile, | ||||
| CoTryTask<Void> dumpInodesFromFdb(std::shared_ptr<kv::IKVEngine> kvEngine, | ||||
|                                   const uint32_t numInodesPerFile, | ||||
|                                   const std::string inodeDir, | ||||
|                                   const bool parquetFormat = false, | ||||
|  | ||||
| @ -16,7 +16,6 @@ namespace { | ||||
| auto getParser() { | ||||
|   argparse::ArgumentParser parser("remove-chunks"); | ||||
|   parser.add_argument("-n", "--num-inodes-perfile").default_value(uint32_t{10'000'000}).scan<'u', uint32_t>(); | ||||
|   parser.add_argument("-f", "--fdb-cluster-file").default_value(std::string{"./fdb.cluster"}); | ||||
|   parser.add_argument("-i", "--inode-dir").default_value(std::string{"inodes2"}); | ||||
|   parser.add_argument("-o", "--orphaned-path").default_value(std::string{"orphaned"}); | ||||
|   parser.add_argument("-r", "--do-remove").default_value(false).implicit_value(true); | ||||
| @ -30,11 +29,11 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv, | ||||
|   auto &env = dynamic_cast<AdminEnv &>(ienv); | ||||
|   ENSURE_USAGE(args.empty()); | ||||
|   ENSURE_USAGE(env.mgmtdClientGetter); | ||||
|   ENSURE_USAGE(env.kvEngineGetter); | ||||
|   Dispatcher::OutputTable table; | ||||
| 
 | ||||
|   // dump latest inodes
 | ||||
| 
 | ||||
|   const auto &fdbClusterFile = parser.get<std::string>("fdb-cluster-file"); | ||||
|   const auto &numInodesPerFile = parser.get<uint32_t>("num-inodes-perfile"); | ||||
|   const auto &inodeDir = parser.get<std::string>("inode-dir"); | ||||
|   const auto &parquetFormat = parser.get<bool>("parquet-format"); | ||||
| @ -44,7 +43,7 @@ CoTryTask<Dispatcher::OutputTable> removeChunks(IEnv &ienv, | ||||
|     co_return makeError(StatusCode::kInvalidArg); | ||||
|   } | ||||
| 
 | ||||
|   auto dumpRes = co_await dumpInodesFromFdb(fdbClusterFile, numInodesPerFile, inodeDir, parquetFormat); | ||||
|   auto dumpRes = co_await dumpInodesFromFdb(env.kvEngineGetter(), numInodesPerFile, inodeDir, parquetFormat); | ||||
|   if (!dumpRes) co_return makeError(dumpRes.error()); | ||||
| 
 | ||||
|   // load the inode dump
 | ||||
|  | ||||
| @ -79,15 +79,13 @@ MetaScan::MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine) | ||||
|   if (options_.threads < 0 || options_.coroutines < 0) { | ||||
|     throw std::runtime_error("Invalid options, thread < 0 or coroutines < 0"); | ||||
|   } | ||||
|   if (!kvEngine && options_.fdb_cluster_file.empty()) { | ||||
|     throw std::runtime_error("Should set kvEngine or fdb cluster file"); | ||||
|   if (!kvEngine) { | ||||
|     throw std::runtime_error("kvEngine is required"); | ||||
|   } | ||||
|   if (!options_.logging.empty()) { | ||||
|     XLOGF(INFO, "Setup log: {}", options_.logging); | ||||
|     logging::initOrDie(options_.logging); | ||||
|   } | ||||
| 
 | ||||
|   createKVEngine(); | ||||
| } | ||||
| 
 | ||||
| MetaScan::~MetaScan() { | ||||
| @ -98,24 +96,6 @@ MetaScan::~MetaScan() { | ||||
|     scanDirEntryTask_->cancel.requestCancellation(); | ||||
|   } | ||||
|   exec_.stop(); | ||||
|   if (fdbNetwork_) { | ||||
|     kv::fdb::DB::stopNetwork(); | ||||
|     fdbNetwork_->join(); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| void MetaScan::createKVEngine() { | ||||
|   if (kvEngine_) { | ||||
|     return; | ||||
|   } | ||||
| 
 | ||||
|   kv::fdb::DB::selectAPIVersion(FDB_API_VERSION); | ||||
|   auto error = kv::fdb::DB::setupNetwork(); | ||||
|   if (error) { | ||||
|     throw std::runtime_error(fmt::format("Failed to setup fdb network, error {}", kv::fdb::DB::errorMsg(error))); | ||||
|   } | ||||
|   fdbNetwork_ = std::jthread([&]() { kv::fdb::DB::runNetwork(); }); | ||||
|   kvEngine_ = std::make_shared<kv::FDBKVEngine>(kv::fdb::DB(options_.fdb_cluster_file, true /* readonly */)); | ||||
| } | ||||
| 
 | ||||
| std::vector<Inode> MetaScan::getInodes() { | ||||
|  | ||||
| @ -40,12 +40,9 @@ class MetaScan { | ||||
|     double backoff_total_wait = 60;  // 60s
 | ||||
|     // log level
 | ||||
|     std::string logging; | ||||
|     // create FDB client with given config path
 | ||||
|     std::string fdb_cluster_file; | ||||
|   }; | ||||
| 
 | ||||
|   MetaScan(Options options, | ||||
|            std::shared_ptr<kv::IKVEngine> kvEngine = {} /* create new fdb client if kvEngine is not set */); | ||||
|   MetaScan(Options options, std::shared_ptr<kv::IKVEngine> kvEngine); | ||||
|   ~MetaScan(); | ||||
| 
 | ||||
|   std::vector<Inode> getInodes(); | ||||
| @ -105,7 +102,6 @@ class MetaScan { | ||||
| 
 | ||||
|   std::mutex mutex_; | ||||
|   Options options_; | ||||
|   std::optional<std::jthread> fdbNetwork_; | ||||
|   std::shared_ptr<kv::IKVEngine> kvEngine_; | ||||
|   folly::CPUThreadPoolExecutor exec_; | ||||
|   std::optional<BackgroundTask<Inode>> scanInodeTask_; | ||||
|  | ||||
| @ -162,112 +162,4 @@ TYPED_TEST(TestScan, Exit) { | ||||
|   ASSERT_FALSE(entries.empty()); | ||||
| } | ||||
| 
 | ||||
| DEFINE_string(fdb_cluster, "", "fdb cluster file path"); | ||||
| DEFINE_uint32(scan_threads, 4, "scan threads"); | ||||
| DEFINE_uint32(scan_coroutines, 8, "scan coroutines"); | ||||
| DEFINE_bool(scan_all, false, "scan all inodes and direntries"); | ||||
| DEFINE_bool(scan_check, false, "check parent exists"); | ||||
| 
 | ||||
| TEST(TestScanFDB, DISABLED_FDB) { | ||||
|   MetaScan::Options options; | ||||
|   options.fdb_cluster_file = FLAGS_fdb_cluster; | ||||
|   options.threads = FLAGS_scan_threads; | ||||
|   options.coroutines = FLAGS_scan_coroutines; | ||||
|   MetaScan scan(options); | ||||
| 
 | ||||
|   std::set<InodeId> directories; | ||||
|   std::set<InodeId> missing; | ||||
| 
 | ||||
|   auto begin = SteadyClock::now(); | ||||
|   size_t totalInodes = 0; | ||||
|   while (true) { | ||||
|     auto inodes = scan.getInodes(); | ||||
|     totalInodes += inodes.size(); | ||||
|     if (inodes.empty() || !FLAGS_scan_all) { | ||||
|       break; | ||||
|     } | ||||
|     if (FLAGS_scan_check) { | ||||
|       for (auto &inode : inodes) { | ||||
|         if (inode.isDirectory()) { | ||||
|           directories.insert(inode.id); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|     XLOGF(DBG, "MetaScan get {} inodes", inodes.size()); | ||||
|   } | ||||
|   XLOGF(INFO, | ||||
|         "MetaScan scan Inode finished, duration {}, get {} inodes\n", | ||||
|         std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - begin), | ||||
|         totalInodes); | ||||
| 
 | ||||
|   begin = SteadyClock::now(); | ||||
|   size_t totalEntries = 0; | ||||
|   while (true) { | ||||
|     auto entries = scan.getDirEntries(); | ||||
|     totalEntries += entries.size(); | ||||
|     if (entries.empty() || !FLAGS_scan_all) { | ||||
|       break; | ||||
|     } | ||||
|     if (FLAGS_scan_check) { | ||||
|       for (auto &entry : entries) { | ||||
|         if (!directories.contains(entry.parent)) { | ||||
|           missing.insert(entry.parent); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|     XLOGF(DBG, "MetaScan get {} entries", entries.size()); | ||||
|   } | ||||
|   XLOGF(INFO, | ||||
|         "MetaScan scan DirEntry finished, duration {}, get {} entries", | ||||
|         std::chrono::duration_cast<std::chrono::milliseconds>(SteadyClock::now() - begin), | ||||
|         totalEntries); | ||||
|   XLOGF_IF(WARNING, !missing.empty(), "Missing parent: {}", fmt::join(missing.begin(), missing.end(), ", ")); | ||||
| } | ||||
| 
 | ||||
| DEFINE_int64(inode_id, 0, "inode id"); | ||||
| 
 | ||||
| TEST(TestScanFDB, DISABLED_PrintInode) { | ||||
|   MetaScan::Options options; | ||||
|   options.fdb_cluster_file = FLAGS_fdb_cluster; | ||||
|   options.threads = FLAGS_scan_threads; | ||||
|   options.coroutines = FLAGS_scan_coroutines; | ||||
|   MetaScan scan(options); | ||||
|   auto &kv = scan.kvEngine(); | ||||
| 
 | ||||
|   folly::coro::blockingWait([&]() -> CoTask<void> { | ||||
|     auto txn = kv.createReadonlyTransaction(); | ||||
|     auto inodeId = InodeId(FLAGS_inode_id); | ||||
|     auto inode = co_await Inode::snapshotLoad(*txn, inodeId); | ||||
|     CO_ASSERT_OK(inode); | ||||
|     if (inode->has_value()) { | ||||
|       fmt::print("inode: {}\n", **inode); | ||||
|     } else { | ||||
|       fmt::print("inode {} doesn't exist!", inodeId); | ||||
|     } | ||||
|     fmt::print("============\n"); | ||||
| 
 | ||||
|     std::string prev; | ||||
|     while (true) { | ||||
|       auto txn = kv.createReadonlyTransaction(); | ||||
|       auto entries = co_await DirEntryList::snapshotLoad(*txn, inodeId, prev, -1); | ||||
|       CO_ASSERT_OK(entries); | ||||
|       for (auto entry : entries->entries) { | ||||
|         auto inode = co_await Inode::snapshotLoad(*kv.createReadWriteTransaction(), entry.id); | ||||
|         CO_ASSERT_OK(inode); | ||||
|         fmt::print("entry: {} inode: {}\n", entry, inode->has_value() ? fmt::format("{}", **inode) : ""); | ||||
|         prev = entry.name; | ||||
|       } | ||||
|       if (!entries->more) break; | ||||
|     } | ||||
|     fmt::print("============\n"); | ||||
| 
 | ||||
|     if (inode->has_value() && inode->value().isDirectory()) { | ||||
|       auto txn = kv.createReadonlyTransaction(); | ||||
|       auto entry = co_await inode->value().snapshotLoadDirEntry(*txn); | ||||
|       CO_ASSERT_OK(entry); | ||||
|       fmt::print("entry points to {}: {}\n", inodeId, *entry); | ||||
|     } | ||||
|   }()); | ||||
| } | ||||
| 
 | ||||
| }  // namespace hf3fs::meta::server
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user