mirror of
https://github.com/deepseek-ai/3FS
synced 2025-06-26 18:16:45 +00:00
Fix naming typo getRecorderWithTag (#210)
This commit is contained in:
parent
0caca67ee1
commit
b68df2817b
@ -679,7 +679,7 @@ typename hf3fs::storage::BatchReadReq buildBatchRequest(const ClientRequestConte
|
||||
payloads.reserve(ops.size());
|
||||
size_t requestedBytes = 0;
|
||||
auto requestTagSet = monitor::instanceTagSet("batchRead");
|
||||
auto tagged_bytes_per_operation = bytes_per_operation.getRecoderWithTag(requestTagSet);
|
||||
auto tagged_bytes_per_operation = bytes_per_operation.getRecorderWithTag(requestTagSet);
|
||||
|
||||
hf3fs::storage::RequestId requestId(nextRequestId.fetch_add(1));
|
||||
hf3fs::storage::MessageTag tag{clientId, requestId};
|
||||
|
||||
@ -50,15 +50,15 @@ void Recorder::setHostname(std::string hostname, std::string podname) {
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
Recorder::TagRef<T> Recorder::getRecoderWithTag(const TagSet &tag) {
|
||||
Recorder::TagRef<T> Recorder::getRecorderWithTag(const TagSet &tag) {
|
||||
auto iter = map_.find(tag);
|
||||
if (UNLIKELY(iter == map_.end())) {
|
||||
TagSet new_tag_set = tag_;
|
||||
for (const auto &kv : tag) {
|
||||
new_tag_set.addTag(kv.first, kv.second);
|
||||
}
|
||||
auto newRecoder = std::make_unique<T>(*reinterpret_cast<T *>(this), new_tag_set);
|
||||
auto result = map_.insert(tag, std::move(newRecoder));
|
||||
auto newRecorder = std::make_unique<T>(*reinterpret_cast<T *>(this), new_tag_set);
|
||||
auto result = map_.insert(tag, std::move(newRecorder));
|
||||
iter = std::move(result.first);
|
||||
}
|
||||
return iter;
|
||||
@ -72,7 +72,7 @@ CountRecorderWithTLSTag<ThreadLocalTag>::CountRecorderWithTLSTag(std::string_vie
|
||||
|
||||
template <class ThreadLocalTag>
|
||||
void CountRecorderWithTLSTag<ThreadLocalTag>::addSample(int64_t val, const TagSet &tag) {
|
||||
getRecoderWithTag(tag)->addSample(val);
|
||||
getRecorderWithTag(tag)->addSample(val);
|
||||
}
|
||||
|
||||
template <class ThreadLocalTag>
|
||||
@ -159,7 +159,7 @@ void DistributionRecorder::collect(std::vector<Sample> &samples) {
|
||||
}
|
||||
}
|
||||
|
||||
void DistributionRecorder::addSample(double val, const TagSet &tag) { getRecoderWithTag(tag)->addSample(val); }
|
||||
void DistributionRecorder::addSample(double val, const TagSet &tag) { getRecorderWithTag(tag)->addSample(val); }
|
||||
|
||||
void DistributionRecorder::logPer30s(const folly::TDigest &digest) {
|
||||
XLOGF(DBG3,
|
||||
@ -222,10 +222,10 @@ void SimpleDistributionRecorder::collect(std::vector<Sample> &samples) {
|
||||
}
|
||||
}
|
||||
|
||||
void SimpleDistributionRecorder::addSample(int64_t val, const TagSet &tag) { getRecoderWithTag(tag)->addSample(val); }
|
||||
void SimpleDistributionRecorder::addSample(int64_t val, const TagSet &tag) { getRecorderWithTag(tag)->addSample(val); }
|
||||
|
||||
void LatencyRecorder::addSample(std::chrono::nanoseconds duration, const TagSet &tag) {
|
||||
getRecoderWithTag(tag)->addSample(duration);
|
||||
getRecorderWithTag(tag)->addSample(duration);
|
||||
}
|
||||
|
||||
void LatencyRecorder::logPer30s(const folly::TDigest &digest) {
|
||||
@ -303,7 +303,7 @@ template class OperationRecorderT<SimpleDistributionRecorder>;
|
||||
ValueRecorder::ValueRecorder(std::string_view name, std::optional<TagSet> tag, bool resetWhenCollect)
|
||||
: ValueRecorder(Monitor::getDefaultInstance(), name, tag, resetWhenCollect) {}
|
||||
|
||||
void ValueRecorder::set(int64_t val, const TagSet &tag) { getRecoderWithTag(tag)->set(val); }
|
||||
void ValueRecorder::set(int64_t val, const TagSet &tag) { getRecorderWithTag(tag)->set(val); }
|
||||
|
||||
void ValueRecorder::collect(std::vector<Sample> &samples) {
|
||||
int64_t val = resetWhenCollect_ ? val_.exchange(0) : val_.load();
|
||||
|
||||
@ -71,7 +71,7 @@ class Recorder {
|
||||
|
||||
// Expose recorder associated with a tag set
|
||||
template <typename T>
|
||||
TagRef<T> getRecoderWithTag(const TagSet &tag);
|
||||
TagRef<T> getRecorderWithTag(const TagSet &tag);
|
||||
|
||||
protected:
|
||||
friend class Collector;
|
||||
@ -116,8 +116,8 @@ class CountRecorderWithTLSTag final : public Recorder {
|
||||
void addSample(int64_t val) { tls_->addSample(val); }
|
||||
void addSample(int64_t val, const TagSet &tag);
|
||||
|
||||
TagRef<CountRecorderWithTLSTag> getRecoderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecoderWithTag<CountRecorderWithTLSTag>(tag);
|
||||
TagRef<CountRecorderWithTLSTag> getRecorderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecorderWithTag<CountRecorderWithTLSTag>(tag);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -172,8 +172,8 @@ class DistributionRecorder : public Recorder {
|
||||
void addSample(double val, const TagSet &tag);
|
||||
virtual void logPer30s(const folly::TDigest &digest);
|
||||
|
||||
TagRef<DistributionRecorder> getRecoderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecoderWithTag<DistributionRecorder>(tag);
|
||||
TagRef<DistributionRecorder> getRecorderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecorderWithTag<DistributionRecorder>(tag);
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -203,8 +203,8 @@ class LatencyRecorder : public DistributionRecorder {
|
||||
void addSample(std::chrono::nanoseconds duration, const TagSet &tag);
|
||||
void logPer30s(const folly::TDigest &digest) override;
|
||||
|
||||
TagRef<LatencyRecorder> getRecoderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecoderWithTag<LatencyRecorder>(tag);
|
||||
TagRef<LatencyRecorder> getRecorderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecorderWithTag<LatencyRecorder>(tag);
|
||||
}
|
||||
};
|
||||
|
||||
@ -224,8 +224,8 @@ class SimpleDistributionRecorder : public Recorder {
|
||||
void addSample(std::chrono::nanoseconds duration) { addSample(duration.count()); }
|
||||
void addSample(std::chrono::nanoseconds duration, const TagSet &tag) { addSample(duration.count(), tag); }
|
||||
|
||||
TagRef<SimpleDistributionRecorder> getRecoderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecoderWithTag<SimpleDistributionRecorder>(tag);
|
||||
TagRef<SimpleDistributionRecorder> getRecorderWithTag(const TagSet &tag) {
|
||||
return Recorder::getRecorderWithTag<SimpleDistributionRecorder>(tag);
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -366,7 +366,7 @@ class ValueRecorder : public Recorder {
|
||||
void set(int64_t val, const TagSet &tag);
|
||||
auto value() { return val_.load(); }
|
||||
|
||||
TagRef<ValueRecorder> getRecoderWithTag(const TagSet &tag) { return Recorder::getRecoderWithTag<ValueRecorder>(tag); }
|
||||
TagRef<ValueRecorder> getRecorderWithTag(const TagSet &tag) { return Recorder::getRecorderWithTag<ValueRecorder>(tag); }
|
||||
|
||||
private:
|
||||
ValueRecorder(MonitorInstance &monitor,
|
||||
|
||||
@ -18,7 +18,7 @@ DynamicCoroutinesPool::DynamicCoroutinesPool(const Config &config, std::string_v
|
||||
guard_(config_.addCallbackGuard([&] { setCoroutinesNum(config_.coroutines_num()); })),
|
||||
executor_(std::make_pair(config.threads_num(), config.threads_num()),
|
||||
std::make_shared<folly::NamedThreadFactory>(name)),
|
||||
coroutinesNumRecorder_(currentCoroutinesNum.getRecoderWithTag(monitor::instanceTagSet(std::string{name}))) {}
|
||||
coroutinesNumRecorder_(currentCoroutinesNum.getRecorderWithTag(monitor::instanceTagSet(std::string{name}))) {}
|
||||
|
||||
Result<Void> DynamicCoroutinesPool::start() { return setCoroutinesNum(config_.coroutines_num()); }
|
||||
|
||||
|
||||
@ -18,12 +18,12 @@ monitor::CountRecorder totalTaskCountRecorder{"thread_pool_group.total_task_coun
|
||||
template <class T>
|
||||
ExecutorStatsReporter<T>::ExecutorStatsReporter(const T &executor)
|
||||
: executor_(executor),
|
||||
threadCountRecorder_(threadCountRecorder.getRecoderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
idleThreadCountRecorder_(idleThreadCountRecorder.getRecoderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
threadCountRecorder_(threadCountRecorder.getRecorderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
idleThreadCountRecorder_(idleThreadCountRecorder.getRecorderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
activeThreadCountRecorder_(
|
||||
activeThreadCountRecorder.getRecoderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
pendingTaskCountRecorder_(pendingTaskCountRecorder.getRecoderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
totalTaskCountRecorder_(totalTaskCountRecorder.getRecoderWithTag(monitor::threadTagSet(executor.getName()))) {}
|
||||
activeThreadCountRecorder.getRecorderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
pendingTaskCountRecorder_(pendingTaskCountRecorder.getRecorderWithTag(monitor::threadTagSet(executor.getName()))),
|
||||
totalTaskCountRecorder_(totalTaskCountRecorder.getRecorderWithTag(monitor::threadTagSet(executor.getName()))) {}
|
||||
|
||||
template <class T>
|
||||
void ExecutorStatsReporter<T>::report() {
|
||||
|
||||
@ -15,16 +15,16 @@
|
||||
|
||||
namespace hf3fs::storage {
|
||||
|
||||
monitor::OperationRecorder storageAioEnqueueRecoder{"storage.aio_enqueue"};
|
||||
monitor::OperationRecorder storageWaitAioRecoder{"storage.wait_aio"};
|
||||
monitor::OperationRecorder storageWaitSemRecoder{"storage.wait_sem"};
|
||||
monitor::OperationRecorder storageWaitBatchRecoder{"storage.wait_batch"};
|
||||
monitor::OperationRecorder storageWaitPostRecoder{"storage.wait_post"};
|
||||
monitor::OperationRecorder storageWaitAioAndPostRecoder{"storage.wait_aio_and_post"};
|
||||
monitor::OperationRecorder storageAioEnqueueRecorder{"storage.aio_enqueue"};
|
||||
monitor::OperationRecorder storageWaitAioRecorder{"storage.wait_aio"};
|
||||
monitor::OperationRecorder storageWaitSemRecorder{"storage.wait_sem"};
|
||||
monitor::OperationRecorder storageWaitBatchRecorder{"storage.wait_batch"};
|
||||
monitor::OperationRecorder storageWaitPostRecorder{"storage.wait_post"};
|
||||
monitor::OperationRecorder storageWaitAioAndPostRecorder{"storage.wait_aio_and_post"};
|
||||
monitor::OperationRecorder storageReadPrepareTarget{"storage.read_prepare_target"};
|
||||
monitor::OperationRecorder storageReadPrepareBuffer{"storage.read_prepare_buffer"};
|
||||
|
||||
monitor::OperationRecorder storageReqReadRecoder{"storage.req_read"};
|
||||
monitor::OperationRecorder storageReqReadRecorder{"storage.req_read"};
|
||||
monitor::DistributionRecorder storageReqReadSize{"storage.req_read.size"};
|
||||
monitor::CountRecorder storageReadCount{"storage.read.count"};
|
||||
monitor::CountRecorder storageReadBytes{"storage.read.bytes"};
|
||||
@ -33,29 +33,29 @@ monitor::CountRecorder aioTotalHeadLength{"storage.aio_align.total_head_length"}
|
||||
monitor::CountRecorder aioTotalTailLength{"storage.aio_align.total_tail_length"};
|
||||
monitor::CountRecorder aioTotalAlignedLength{"storage.aio_align.total_length"};
|
||||
|
||||
monitor::OperationRecorder storageReqWriteRecoder{"storage.req_write"};
|
||||
monitor::OperationRecorder storageReqWriteRecorder{"storage.req_write"};
|
||||
monitor::CountRecorder storageWriteBytes{"storage.req_write.bytes"};
|
||||
|
||||
monitor::OperationRecorder storageReqUpdateRecoder{"storage.req_update"};
|
||||
monitor::OperationRecorder storageReqUpdateRecorder{"storage.req_update"};
|
||||
monitor::CountRecorder storageUpdateBytes{"storage.req_update.bytes"};
|
||||
|
||||
monitor::CountRecorder storageTotalWriteBytes{"storage.write.bytes"};
|
||||
|
||||
monitor::OperationRecorder storageDoUpdateRecoder{"storage.do_update"};
|
||||
monitor::OperationRecorder storageWriteWaitSemRecoder{"storage.write_wait_sem"};
|
||||
monitor::OperationRecorder storageWriteWaitPostRecoder{"storage.write_wait_post"};
|
||||
monitor::OperationRecorder storageDoCommitRecoder{"storage.do_commit"};
|
||||
monitor::OperationRecorder storageDoQueryRecoder{"storage.do_query"};
|
||||
monitor::OperationRecorder storageDoUpdateRecorder{"storage.do_update"};
|
||||
monitor::OperationRecorder storageWriteWaitSemRecorder{"storage.write_wait_sem"};
|
||||
monitor::OperationRecorder storageWriteWaitPostRecorder{"storage.write_wait_post"};
|
||||
monitor::OperationRecorder storageDoCommitRecorder{"storage.do_commit"};
|
||||
monitor::OperationRecorder storageDoQueryRecorder{"storage.do_query"};
|
||||
monitor::CountRecorder storageNumChunksInQueryRes{"storage.do_query.num_chunks"};
|
||||
monitor::OperationRecorder waitChunkLockRecorder{"storage.wait_chunk_lock"};
|
||||
monitor::OperationRecorder storageDoTruncateRecoder{"storage.do_truncate"};
|
||||
monitor::OperationRecorder storageDoRemoveRecoder{"storage.do_remove"};
|
||||
monitor::OperationRecorder storageDoTruncateRecorder{"storage.do_truncate"};
|
||||
monitor::OperationRecorder storageDoRemoveRecorder{"storage.do_remove"};
|
||||
monitor::CountRecorder storageNumChunksRemoved{"storage.do_remove.num_chunks"};
|
||||
monitor::OperationRecorder syncStartRecorder{"storage.sync_start"};
|
||||
monitor::OperationRecorder syncDoneRecorder{"storage.sync_done"};
|
||||
|
||||
monitor::OperationRecorder storageReqRemoveChunksRecoder{"storage.req_remove_chunks"};
|
||||
monitor::OperationRecorder storageRemoveRangeRecoder{"storage.remove_range"};
|
||||
monitor::OperationRecorder storageReqRemoveChunksRecorder{"storage.req_remove_chunks"};
|
||||
monitor::OperationRecorder storageRemoveRangeRecorder{"storage.remove_range"};
|
||||
|
||||
Result<Void> StorageOperator::init(uint32_t numberOfDisks) {
|
||||
storageReadAvgBytes.setLambda([&] {
|
||||
@ -84,7 +84,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
serde::CallContext &ctx) {
|
||||
XLOGF(DBG5, "Received batch read request {} with tag {} and {} IOs", fmt::ptr(&req), req.tag, req.payloads.size());
|
||||
|
||||
auto recordGuard = storageReqReadRecoder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));
|
||||
auto recordGuard = storageReqReadRecorder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));
|
||||
|
||||
auto prepareTargetRecordGuard = storageReadPrepareTarget.record();
|
||||
auto snapshot = components_.targetMap.snapshot();
|
||||
@ -160,7 +160,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
batch.finish(&*it);
|
||||
}
|
||||
} else {
|
||||
auto recordGuard = storageAioEnqueueRecoder.record();
|
||||
auto recordGuard = storageAioEnqueueRecorder.record();
|
||||
auto splitSize = config_.batch_read_job_split_size();
|
||||
for (uint32_t start = 0; start < batchSize; start += splitSize) {
|
||||
co_await components_.aioReadWorker.enqueue(AioReadJobIterator(&batch, start, splitSize));
|
||||
@ -168,8 +168,8 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
recordGuard.report(true);
|
||||
}
|
||||
|
||||
auto waitAioAndPostRecordGuard = storageWaitAioAndPostRecoder.record();
|
||||
auto waitAioRecordGuard = storageWaitAioRecoder.record();
|
||||
auto waitAioAndPostRecordGuard = storageWaitAioAndPostRecorder.record();
|
||||
auto waitAioRecordGuard = storageWaitAioRecorder.record();
|
||||
co_await batch.complete();
|
||||
waitAioRecordGuard.report(true);
|
||||
|
||||
@ -182,7 +182,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
co_return makeError(StatusCode::kInvalidArg, "batch read no RDMA socket");
|
||||
}
|
||||
|
||||
auto waitBatchRecordGuard = storageWaitBatchRecoder.record();
|
||||
auto waitBatchRecordGuard = storageWaitBatchRecorder.record();
|
||||
auto writeBatch = ctx.writeTransmission();
|
||||
batch.addBufferToBatch(writeBatch);
|
||||
waitBatchRecordGuard.report(true);
|
||||
@ -203,7 +203,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
}
|
||||
|
||||
auto ibdevTagSet = monitor::instanceTagSet(ibSocket->device()->name());
|
||||
auto waitSemRecordGuard = storageWaitSemRecoder.record(ibdevTagSet);
|
||||
auto waitSemRecordGuard = storageWaitSemRecorder.record(ibdevTagSet);
|
||||
SemaphoreGuard guard(rdmaSemaphoreIter->second);
|
||||
co_await guard.coWait();
|
||||
waitSemRecordGuard.report(true);
|
||||
@ -212,7 +212,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
|
||||
}
|
||||
|
||||
auto waitPostRecordGuard = storageWaitPostRecoder.record(ibdevTagSet);
|
||||
auto waitPostRecordGuard = storageWaitPostRecorder.record(ibdevTagSet);
|
||||
auto postResult = FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
|
||||
makeError(RPCCode::kRDMAPostFailed),
|
||||
(co_await writeBatch.post()));
|
||||
@ -233,7 +233,7 @@ CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &reques
|
||||
CoTryTask<WriteRsp> StorageOperator::write(ServiceRequestContext &requestCtx,
|
||||
const WriteReq &req,
|
||||
net::IBSocket *ibSocket) {
|
||||
auto recordGuard = storageReqWriteRecoder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));
|
||||
auto recordGuard = storageReqWriteRecorder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));
|
||||
|
||||
XLOGF(DBG1,
|
||||
"Received write request {} with tag {} to chunk {} on {}",
|
||||
@ -284,7 +284,7 @@ CoTryTask<WriteRsp> StorageOperator::write(ServiceRequestContext &requestCtx,
|
||||
CoTryTask<UpdateRsp> StorageOperator::update(ServiceRequestContext &requestCtx,
|
||||
const UpdateReq &updateReq,
|
||||
net::IBSocket *ibSocket) {
|
||||
auto recordGuard = storageReqUpdateRecoder.record(monitor::instanceTagSet(std::to_string(updateReq.userInfo.uid)));
|
||||
auto recordGuard = storageReqUpdateRecorder.record(monitor::instanceTagSet(std::to_string(updateReq.userInfo.uid)));
|
||||
|
||||
auto req = updateReq;
|
||||
XLOGF(DBG1,
|
||||
@ -523,7 +523,7 @@ CoTask<IOResult> StorageOperator::doUpdate(ServiceRequestContext &requestCtx,
|
||||
net::RDMARemoteBuf &remoteBuf,
|
||||
ChunkEngineUpdateJob &chunkEngineJob,
|
||||
bool allowToAllocate) {
|
||||
auto recordGuard = storageDoUpdateRecoder.record();
|
||||
auto recordGuard = storageDoUpdateRecorder.record();
|
||||
UpdateJob job(requestCtx, updateIO, updateOptions, chunkEngineJob, target, allowToAllocate);
|
||||
|
||||
if (BITFLAGS_CONTAIN(featureFlags, FeatureFlags::SEND_DATA_INLINE)) {
|
||||
@ -575,12 +575,12 @@ CoTask<IOResult> StorageOperator::doUpdate(ServiceRequestContext &requestCtx,
|
||||
}
|
||||
|
||||
auto ibdevTagSet = monitor::instanceTagSet(ibSocket->device()->name());
|
||||
auto waitSemRecordGuard = storageWriteWaitSemRecoder.record(ibdevTagSet);
|
||||
auto waitSemRecordGuard = storageWriteWaitSemRecorder.record(ibdevTagSet);
|
||||
SemaphoreGuard guard(rdmaSemaphoreIter->second);
|
||||
co_await guard.coWait();
|
||||
waitSemRecordGuard.report(true);
|
||||
|
||||
auto waitPostRecordGuard = storageWriteWaitPostRecoder.record(ibdevTagSet);
|
||||
auto waitPostRecordGuard = storageWriteWaitPostRecorder.record(ibdevTagSet);
|
||||
auto postResult = co_await readBatch.post();
|
||||
if (UNLIKELY(!postResult)) {
|
||||
XLOGF(ERR, "write post RDMA failed, req {}, error {}", updateIO, postResult.error());
|
||||
@ -614,7 +614,7 @@ CoTask<IOResult> StorageOperator::doCommit(ServiceRequestContext &requestCtx,
|
||||
ChunkEngineUpdateJob &chunkEngineJob,
|
||||
uint32_t featureFlags,
|
||||
const std::shared_ptr<StorageTarget> &target) {
|
||||
auto recordGuard = storageDoCommitRecoder.record();
|
||||
auto recordGuard = storageDoCommitRecorder.record();
|
||||
UpdateJob job(requestCtx, commitIO, updateOptions, chunkEngineJob, target);
|
||||
if (BITFLAGS_CONTAIN(featureFlags, FeatureFlags::BYPASS_DISKIO)) {
|
||||
job.setResult(0);
|
||||
@ -633,7 +633,7 @@ CoTask<IOResult> StorageOperator::doCommit(ServiceRequestContext &requestCtx,
|
||||
Result<std::vector<std::pair<ChunkId, ChunkMetadata>>> StorageOperator::doQuery(ServiceRequestContext &requestCtx,
|
||||
const VersionedChainId &vChainId,
|
||||
const ChunkIdRange &chunkIdRange) {
|
||||
auto recordGuard = storageDoQueryRecoder.record();
|
||||
auto recordGuard = storageDoQueryRecorder.record();
|
||||
// get target for chunk query from client.
|
||||
CHECK_RESULT(target, components_.targetMap.getByChainId(vChainId));
|
||||
|
||||
@ -754,7 +754,7 @@ CoTask<IOResult> StorageOperator::doTruncate(ServiceRequestContext &requestCtx,
|
||||
const TruncateChunkOp &op,
|
||||
flat::UserInfo userInfo,
|
||||
uint32_t featureFlags) {
|
||||
auto recordGuard = storageDoTruncateRecoder.record();
|
||||
auto recordGuard = storageDoTruncateRecorder.record();
|
||||
UpdateIO updateIO{0 /*offset*/,
|
||||
op.chunkLen,
|
||||
op.chunkSize,
|
||||
@ -802,7 +802,7 @@ CoTask<IOResult> StorageOperator::doRemove(ServiceRequestContext &requestCtx,
|
||||
const RemoveChunksOp &op,
|
||||
flat::UserInfo userInfo,
|
||||
uint32_t featureFlags) {
|
||||
auto recordGuard = storageDoRemoveRecoder.record();
|
||||
auto recordGuard = storageDoRemoveRecorder.record();
|
||||
// this method requires that the chunk id range specifies one chunk
|
||||
assert(op.chunkIdRange.begin == op.chunkIdRange.end);
|
||||
UpdateIO updateIO{0 /*offset*/,
|
||||
@ -935,14 +935,14 @@ CoTryTask<TruncateChunksRsp> StorageOperator::truncateChunks(ServiceRequestConte
|
||||
|
||||
CoTryTask<RemoveChunksRsp> StorageOperator::removeChunks(ServiceRequestContext &requestCtx,
|
||||
const RemoveChunksReq &req) {
|
||||
auto recordGuard = storageReqRemoveChunksRecoder.record();
|
||||
auto recordGuard = storageReqRemoveChunksRecorder.record();
|
||||
XLOGF(DBG7, "Remove request {} with {} ops", fmt::ptr(&req), req.payloads.size());
|
||||
|
||||
RemoveChunksRsp rsp;
|
||||
rsp.results.reserve(req.payloads.size());
|
||||
|
||||
for (const auto &payload : req.payloads) {
|
||||
auto recordGuard = storageRemoveRangeRecoder.record();
|
||||
auto recordGuard = storageRemoveRangeRecorder.record();
|
||||
RemoveChunksResult removeRes{Void{}, 0 /*numChunksRemoved*/, false /*moreChunksInRange*/};
|
||||
auto removeOp = payload;
|
||||
|
||||
|
||||
@ -67,13 +67,13 @@ StorageTarget::StorageTarget(const Config &config,
|
||||
engine_(engine),
|
||||
diskTag_(monitor::instanceTagSet(std::to_string(diskIndex))),
|
||||
targetTag_(monitor::instanceTagSet(std::to_string(0))),
|
||||
readCountPerDisk_(aioReadCountPerDisk.getRecoderWithTag(diskTag_)),
|
||||
readBytesPerDisk_(aioReadBytesPerDisk.getRecoderWithTag(diskTag_)),
|
||||
readSuccBytesPerDisk_(aioReadSuccBytesPerDisk.getRecoderWithTag(diskTag_)),
|
||||
readSuccLatencyPerDisk_(aioReadSuccLatencyPerDisk.getRecoderWithTag(diskTag_)),
|
||||
targetUsedSize_(targetUsedSize.getRecoderWithTag(targetTag_)),
|
||||
targetReservedSize_(targetReservedSize.getRecoderWithTag(targetTag_)),
|
||||
targetUnrecycledSize_(targetUnrecycledSize.getRecoderWithTag(targetTag_)),
|
||||
readCountPerDisk_(aioReadCountPerDisk.getRecorderWithTag(diskTag_)),
|
||||
readBytesPerDisk_(aioReadBytesPerDisk.getRecorderWithTag(diskTag_)),
|
||||
readSuccBytesPerDisk_(aioReadSuccBytesPerDisk.getRecorderWithTag(diskTag_)),
|
||||
readSuccLatencyPerDisk_(aioReadSuccLatencyPerDisk.getRecorderWithTag(diskTag_)),
|
||||
targetUsedSize_(targetUsedSize.getRecorderWithTag(targetTag_)),
|
||||
targetReservedSize_(targetReservedSize.getRecorderWithTag(targetTag_)),
|
||||
targetUnrecycledSize_(targetUnrecycledSize.getRecorderWithTag(targetTag_)),
|
||||
chunkStore_(config_, globalFileStore) {}
|
||||
|
||||
StorageTarget::~StorageTarget() {
|
||||
@ -154,9 +154,9 @@ Result<Void> StorageTarget::create(const PhysicalConfig &config) {
|
||||
}
|
||||
*chunkSizeList_.lock() = {targetConfig_.chunk_size_list.begin(), targetConfig_.chunk_size_list.end()};
|
||||
targetTag_ = monitor::instanceTagSet(std::to_string(targetConfig_.target_id));
|
||||
targetUsedSize_ = targetUsedSize.getRecoderWithTag(targetTag_);
|
||||
targetReservedSize_ = targetReservedSize.getRecoderWithTag(targetTag_);
|
||||
targetUnrecycledSize_ = targetUnrecycledSize.getRecoderWithTag(targetTag_);
|
||||
targetUsedSize_ = targetUsedSize.getRecorderWithTag(targetTag_);
|
||||
targetReservedSize_ = targetReservedSize.getRecorderWithTag(targetTag_);
|
||||
targetUnrecycledSize_ = targetUnrecycledSize.getRecorderWithTag(targetTag_);
|
||||
return Void{};
|
||||
}
|
||||
|
||||
@ -203,9 +203,9 @@ Result<Void> StorageTarget::load(const Path &path) {
|
||||
}
|
||||
*chunkSizeList_.lock() = {targetConfig_.chunk_size_list.begin(), targetConfig_.chunk_size_list.end()};
|
||||
targetTag_ = monitor::instanceTagSet(std::to_string(targetConfig_.target_id));
|
||||
targetUsedSize_ = targetUsedSize.getRecoderWithTag(targetTag_);
|
||||
targetReservedSize_ = targetReservedSize.getRecoderWithTag(targetTag_);
|
||||
targetUnrecycledSize_ = targetUnrecycledSize.getRecoderWithTag(targetTag_);
|
||||
targetUsedSize_ = targetUsedSize.getRecorderWithTag(targetTag_);
|
||||
targetReservedSize_ = targetReservedSize.getRecorderWithTag(targetTag_);
|
||||
targetUnrecycledSize_ = targetUnrecycledSize.getRecorderWithTag(targetTag_);
|
||||
return Void{};
|
||||
}
|
||||
|
||||
|
||||
@ -146,7 +146,7 @@ CoTryTask<void> ResyncWorker::handleSync(VersionedChainId vChainId) {
|
||||
uint32_t currentSyncingSkipCount = 0;
|
||||
auto recordGuard = resyncRecorder.record(tag);
|
||||
|
||||
auto remainingChunksCount = syncingRemainingChunksCount.getRecoderWithTag(tag);
|
||||
auto remainingChunksCount = syncingRemainingChunksCount.getRecorderWithTag(tag);
|
||||
SCOPE_EXIT { remainingChunksCount->set(0); };
|
||||
|
||||
// 3. sync start.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user