mirror of
				https://github.com/deepseek-ai/3FS
				synced 2025-06-26 18:16:45 +00:00 
			
		
		
		
	Merge 47a3ed4d74 into 2db69ced80
				
					
				
			This commit is contained in:
		
						commit
						05e67187c3
					
				| @ -31,6 +31,13 @@ static constexpr std::string_view kMetadataVersionKey = "\xff/metadataVersion"; | ||||
| using Versionstamp = std::array<uint8_t, 10>; | ||||
| static_assert(sizeof(Versionstamp) == 10); | ||||
| 
 | ||||
| enum class Priority : uint8_t { | ||||
|   MIN = 0, | ||||
|   LOW = 1, | ||||
|   HIGH = 2, | ||||
|   MAX = 3, | ||||
| }; | ||||
| 
 | ||||
| class IReadOnlyTransaction { | ||||
|  public: | ||||
|   virtual ~IReadOnlyTransaction() = default; | ||||
| @ -83,6 +90,8 @@ class IReadOnlyTransaction { | ||||
| 
 | ||||
|   virtual CoTryTask<void> cancel() = 0; | ||||
|   virtual void reset() = 0; | ||||
| 
 | ||||
|   virtual Result<Void> enableStaleRead() = 0; | ||||
| }; | ||||
| 
 | ||||
| class IReadWriteTransaction : public IReadOnlyTransaction { | ||||
| @ -109,6 +118,8 @@ class IReadWriteTransaction : public IReadOnlyTransaction { | ||||
| 
 | ||||
|   virtual CoTryTask<void> commit() = 0; | ||||
|   virtual int64_t getCommittedVersion() = 0; | ||||
| 
 | ||||
|   virtual Result<Void> setPriority(Priority priority) = 0; | ||||
| }; | ||||
| 
 | ||||
| struct TransactionHelper { | ||||
|  | ||||
| @ -1,6 +1,7 @@ | ||||
| #pragma once | ||||
| 
 | ||||
| #include <algorithm> | ||||
| #include <boost/core/ignore_unused.hpp> | ||||
| #include <cassert> | ||||
| #include <folly/Likely.h> | ||||
| #include <folly/Synchronized.h> | ||||
| @ -245,6 +246,12 @@ class MemTransaction : public IReadWriteTransaction { | ||||
|     writeConflicts_.clear(); | ||||
|   } | ||||
| 
 | ||||
|   Result<Void> enableStaleRead() override { return Void{}; } | ||||
|   Result<Void> setPriority(Priority priority) override { | ||||
|     boost::ignore_unused(priority); | ||||
|     return Void{}; | ||||
|   } | ||||
| 
 | ||||
|   // check txn1 updated txn2's read set
 | ||||
|   static bool checkConflict(MemTransaction &txn1, MemTransaction &txn2) { | ||||
|     std::scoped_lock<std::mutex> guard1(txn1.mutex_); | ||||
|  | ||||
| @ -392,6 +392,22 @@ void FDBTransaction::reset() { | ||||
|   tr_.reset(); | ||||
| } | ||||
| 
 | ||||
| Result<Void> FDBTransaction::enableStaleRead() { | ||||
|   return setOption(FDBTransactionOption::FDB_TR_OPTION_USE_GRV_CACHE, {}); | ||||
| } | ||||
| 
 | ||||
| Result<Void> FDBTransaction::setPriority(Priority priority) { | ||||
|   assert(priority > PRIORITY::MIN && priority < PRIORITY::MAX); | ||||
|   switch (priority) { | ||||
|     case Priority::LOW: { | ||||
|       return setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {}); | ||||
|     } break; | ||||
|     default: { | ||||
|       return Void{}; | ||||
|     } break; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| Result<Void> FDBTransaction::setOption(FDBTransactionOption option, std::string_view value) { | ||||
|   fdb_error_t errcode = tr_.setOption(option, value); | ||||
|   if (errcode != 0) { | ||||
|  | ||||
| @ -40,6 +40,8 @@ class FDBTransaction : public IReadWriteTransaction { | ||||
|   CoTryTask<void> commit() override; | ||||
|   void reset() override; | ||||
| 
 | ||||
|   Result<Void> enableStaleRead() override; | ||||
|   Result<Void> setPriority(Priority priority) override; | ||||
|   Result<Void> setOption(FDBTransactionOption option, std::string_view value = {}); | ||||
|   CoTask<bool> onError(fdb_error_t errcode); | ||||
| 
 | ||||
|  | ||||
| @ -306,9 +306,8 @@ CoTryTask<void> GcManager::GcTask::gcDirectory(GcManager &manager) { | ||||
|   auto finished = false; | ||||
|   auto checked = false; | ||||
|   auto handler = [&](IReadWriteTransaction &txn) -> CoTryTask<void> { | ||||
|     auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(&txn); | ||||
|     if (fdbTxn && manager.config_.gc().txn_low_priority()) { | ||||
|       fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {}); | ||||
|     if (manager.config_.gc().txn_low_priority()) { | ||||
|       txn.setPriority(kv::Priority::LOW); | ||||
|     } | ||||
|     if (!checked) { | ||||
|       auto loadInode = co_await Inode::load(txn, taskEntry.id); | ||||
| @ -375,9 +374,8 @@ CoTryTask<void> GcManager::GcTask::gcFile(GcManager &manager) { | ||||
|   XLOGF(DBG, "Gc file {}", taskEntry.id); | ||||
| 
 | ||||
|   auto load = co_await manager.runReadOnly([&](auto &txn) -> CoTryTask<std::optional<Inode>> { | ||||
|     auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(&txn); | ||||
|     if (fdbTxn && manager.config_.gc().txn_low_priority()) { | ||||
|       fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {}); | ||||
|     if (manager.config_.gc().txn_low_priority()) { | ||||
|       txn.setPriority(kv::Priority::LOW); | ||||
|     } | ||||
|     if (!manager.config_.gc().check_session()) { | ||||
|       co_return co_await Inode::snapshotLoad(txn, taskEntry.id); | ||||
|  | ||||
| @ -168,9 +168,8 @@ class OperationDriver { | ||||
|     } | ||||
| 
 | ||||
|     auto grvCache = operation_.isReadOnly() && enableGrvCache; | ||||
|     if (grvCache && dynamic_cast<kv::FDBTransaction *>(txn.get())) { | ||||
|       auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(txn.get()); | ||||
|       CO_RETURN_ON_ERROR(fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_USE_GRV_CACHE, {})); | ||||
|     if (grvCache) { | ||||
|       CO_RETURN_ON_ERROR(txn->enableStaleRead()); | ||||
|     } | ||||
| 
 | ||||
|     Result<Rsp> result = makeError(MetaCode::kOperationTimeout); | ||||
|  | ||||
| @ -1,3 +1,4 @@ | ||||
| #include <boost/core/ignore_unused.hpp> | ||||
| #include <folly/experimental/coro/BlockingWait.h> | ||||
| #include <gtest/gtest.h> | ||||
| #include <string_view> | ||||
| @ -65,6 +66,8 @@ class MockROTxn : public IReadOnlyTransaction { | ||||
|   void reset() override {} | ||||
| 
 | ||||
|   void setReadVersion(int64_t) override {} | ||||
| 
 | ||||
|   Result<Void> enableStaleRead() override { return Void{}; } | ||||
| }; | ||||
| 
 | ||||
| class MockRWTxn : public IReadWriteTransaction { | ||||
| @ -113,6 +116,12 @@ class MockRWTxn : public IReadWriteTransaction { | ||||
| 
 | ||||
|   void reset() override {} | ||||
| 
 | ||||
|   Result<Void> enableStaleRead() override { return Void{}; } | ||||
|   Result<Void> setPriority(Priority priority) override { | ||||
|     boost::ignore_unused(priority); | ||||
|     return Void{}; | ||||
|   } | ||||
| 
 | ||||
|  private: | ||||
|   OpResultSeq &commitSeq_; | ||||
| }; | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user