mirror of
https://github.com/deepseek-ai/3FS
synced 2025-06-26 18:16:45 +00:00
Abstract KV layer options to eliminate dynamic casts to FDBTransaction
This commit is contained in:
parent
49e0ad50ae
commit
47a3ed4d74
@ -31,6 +31,13 @@ static constexpr std::string_view kMetadataVersionKey = "\xff/metadataVersion";
|
|||||||
using Versionstamp = std::array<uint8_t, 10>;
|
using Versionstamp = std::array<uint8_t, 10>;
|
||||||
static_assert(sizeof(Versionstamp) == 10);
|
static_assert(sizeof(Versionstamp) == 10);
|
||||||
|
|
||||||
|
enum class Priority : uint8_t {
|
||||||
|
MIN = 0,
|
||||||
|
LOW = 1,
|
||||||
|
HIGH = 2,
|
||||||
|
MAX = 3,
|
||||||
|
};
|
||||||
|
|
||||||
class IReadOnlyTransaction {
|
class IReadOnlyTransaction {
|
||||||
public:
|
public:
|
||||||
virtual ~IReadOnlyTransaction() = default;
|
virtual ~IReadOnlyTransaction() = default;
|
||||||
@ -83,6 +90,8 @@ class IReadOnlyTransaction {
|
|||||||
|
|
||||||
virtual CoTryTask<void> cancel() = 0;
|
virtual CoTryTask<void> cancel() = 0;
|
||||||
virtual void reset() = 0;
|
virtual void reset() = 0;
|
||||||
|
|
||||||
|
virtual Result<Void> enableStaleRead() = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class IReadWriteTransaction : public IReadOnlyTransaction {
|
class IReadWriteTransaction : public IReadOnlyTransaction {
|
||||||
@ -109,6 +118,8 @@ class IReadWriteTransaction : public IReadOnlyTransaction {
|
|||||||
|
|
||||||
virtual CoTryTask<void> commit() = 0;
|
virtual CoTryTask<void> commit() = 0;
|
||||||
virtual int64_t getCommittedVersion() = 0;
|
virtual int64_t getCommittedVersion() = 0;
|
||||||
|
|
||||||
|
virtual Result<Void> setPriority(Priority priority) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct TransactionHelper {
|
struct TransactionHelper {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <boost/core/ignore_unused.hpp>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <folly/Likely.h>
|
#include <folly/Likely.h>
|
||||||
#include <folly/Synchronized.h>
|
#include <folly/Synchronized.h>
|
||||||
@ -245,6 +246,12 @@ class MemTransaction : public IReadWriteTransaction {
|
|||||||
writeConflicts_.clear();
|
writeConflicts_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Result<Void> enableStaleRead() override { return Void{}; }
|
||||||
|
Result<Void> setPriority(Priority priority) override {
|
||||||
|
boost::ignore_unused(priority);
|
||||||
|
return Void{};
|
||||||
|
}
|
||||||
|
|
||||||
// check txn1 updated txn2's read set
|
// check txn1 updated txn2's read set
|
||||||
static bool checkConflict(MemTransaction &txn1, MemTransaction &txn2) {
|
static bool checkConflict(MemTransaction &txn1, MemTransaction &txn2) {
|
||||||
std::scoped_lock<std::mutex> guard1(txn1.mutex_);
|
std::scoped_lock<std::mutex> guard1(txn1.mutex_);
|
||||||
|
|||||||
@ -392,6 +392,22 @@ void FDBTransaction::reset() {
|
|||||||
tr_.reset();
|
tr_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Result<Void> FDBTransaction::enableStaleRead() {
|
||||||
|
return setOption(FDBTransactionOption::FDB_TR_OPTION_USE_GRV_CACHE, {});
|
||||||
|
}
|
||||||
|
|
||||||
|
Result<Void> FDBTransaction::setPriority(Priority priority) {
|
||||||
|
assert(priority > PRIORITY::MIN && priority < PRIORITY::MAX);
|
||||||
|
switch (priority) {
|
||||||
|
case Priority::LOW: {
|
||||||
|
return setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {});
|
||||||
|
} break;
|
||||||
|
default: {
|
||||||
|
return Void{};
|
||||||
|
} break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Result<Void> FDBTransaction::setOption(FDBTransactionOption option, std::string_view value) {
|
Result<Void> FDBTransaction::setOption(FDBTransactionOption option, std::string_view value) {
|
||||||
fdb_error_t errcode = tr_.setOption(option, value);
|
fdb_error_t errcode = tr_.setOption(option, value);
|
||||||
if (errcode != 0) {
|
if (errcode != 0) {
|
||||||
|
|||||||
@ -40,6 +40,8 @@ class FDBTransaction : public IReadWriteTransaction {
|
|||||||
CoTryTask<void> commit() override;
|
CoTryTask<void> commit() override;
|
||||||
void reset() override;
|
void reset() override;
|
||||||
|
|
||||||
|
Result<Void> enableStaleRead() override;
|
||||||
|
Result<Void> setPriority(Priority priority) override;
|
||||||
Result<Void> setOption(FDBTransactionOption option, std::string_view value = {});
|
Result<Void> setOption(FDBTransactionOption option, std::string_view value = {});
|
||||||
CoTask<bool> onError(fdb_error_t errcode);
|
CoTask<bool> onError(fdb_error_t errcode);
|
||||||
|
|
||||||
|
|||||||
@ -306,9 +306,8 @@ CoTryTask<void> GcManager::GcTask::gcDirectory(GcManager &manager) {
|
|||||||
auto finished = false;
|
auto finished = false;
|
||||||
auto checked = false;
|
auto checked = false;
|
||||||
auto handler = [&](IReadWriteTransaction &txn) -> CoTryTask<void> {
|
auto handler = [&](IReadWriteTransaction &txn) -> CoTryTask<void> {
|
||||||
auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(&txn);
|
if (manager.config_.gc().txn_low_priority()) {
|
||||||
if (fdbTxn && manager.config_.gc().txn_low_priority()) {
|
txn.setPriority(kv::Priority::LOW);
|
||||||
fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {});
|
|
||||||
}
|
}
|
||||||
if (!checked) {
|
if (!checked) {
|
||||||
auto loadInode = co_await Inode::load(txn, taskEntry.id);
|
auto loadInode = co_await Inode::load(txn, taskEntry.id);
|
||||||
@ -375,9 +374,8 @@ CoTryTask<void> GcManager::GcTask::gcFile(GcManager &manager) {
|
|||||||
XLOGF(DBG, "Gc file {}", taskEntry.id);
|
XLOGF(DBG, "Gc file {}", taskEntry.id);
|
||||||
|
|
||||||
auto load = co_await manager.runReadOnly([&](auto &txn) -> CoTryTask<std::optional<Inode>> {
|
auto load = co_await manager.runReadOnly([&](auto &txn) -> CoTryTask<std::optional<Inode>> {
|
||||||
auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(&txn);
|
if (manager.config_.gc().txn_low_priority()) {
|
||||||
if (fdbTxn && manager.config_.gc().txn_low_priority()) {
|
txn.setPriority(kv::Priority::LOW);
|
||||||
fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_PRIORITY_BATCH, {});
|
|
||||||
}
|
}
|
||||||
if (!manager.config_.gc().check_session()) {
|
if (!manager.config_.gc().check_session()) {
|
||||||
co_return co_await Inode::snapshotLoad(txn, taskEntry.id);
|
co_return co_await Inode::snapshotLoad(txn, taskEntry.id);
|
||||||
|
|||||||
@ -168,9 +168,8 @@ class OperationDriver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto grvCache = operation_.isReadOnly() && enableGrvCache;
|
auto grvCache = operation_.isReadOnly() && enableGrvCache;
|
||||||
if (grvCache && dynamic_cast<kv::FDBTransaction *>(txn.get())) {
|
if (grvCache) {
|
||||||
auto fdbTxn = dynamic_cast<kv::FDBTransaction *>(txn.get());
|
CO_RETURN_ON_ERROR(txn->enableStaleRead());
|
||||||
CO_RETURN_ON_ERROR(fdbTxn->setOption(FDBTransactionOption::FDB_TR_OPTION_USE_GRV_CACHE, {}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Result<Rsp> result = makeError(MetaCode::kOperationTimeout);
|
Result<Rsp> result = makeError(MetaCode::kOperationTimeout);
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
#include <boost/core/ignore_unused.hpp>
|
||||||
#include <folly/experimental/coro/BlockingWait.h>
|
#include <folly/experimental/coro/BlockingWait.h>
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
@ -65,6 +66,8 @@ class MockROTxn : public IReadOnlyTransaction {
|
|||||||
void reset() override {}
|
void reset() override {}
|
||||||
|
|
||||||
void setReadVersion(int64_t) override {}
|
void setReadVersion(int64_t) override {}
|
||||||
|
|
||||||
|
Result<Void> enableStaleRead() override { return Void{}; }
|
||||||
};
|
};
|
||||||
|
|
||||||
class MockRWTxn : public IReadWriteTransaction {
|
class MockRWTxn : public IReadWriteTransaction {
|
||||||
@ -113,6 +116,12 @@ class MockRWTxn : public IReadWriteTransaction {
|
|||||||
|
|
||||||
void reset() override {}
|
void reset() override {}
|
||||||
|
|
||||||
|
Result<Void> enableStaleRead() override { return Void{}; }
|
||||||
|
Result<Void> setPriority(Priority priority) override {
|
||||||
|
boost::ignore_unused(priority);
|
||||||
|
return Void{};
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
OpResultSeq &commitSeq_;
|
OpResultSeq &commitSeq_;
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user