mirror of
https://github.com/deepseek-ai/3FS
synced 2025-05-10 15:40:38 +00:00
316 lines
9.9 KiB
OpenEdge ABL
316 lines
9.9 KiB
OpenEdge ABL
/* Storage Client */
|
|
|
|
type tWriteArgs = (from: machine, chunkId: tChunkId, offset: int, length: int, dataBytes: tBytes);
|
|
type tReadArgs = (from: machine, chunkId: tChunkId, offset: int, length: int);
|
|
type tWriteRes = (status: tErrorCode, chunkId: tChunkId, commitVer: tChunkVer);
|
|
type tReadRes = (status: tErrorCode, chunkId: tChunkId, chunkMetadata: tChunkMetadata, dataBytes: tBytes);
|
|
|
|
event eSubmitWrite : tWriteArgs;
|
|
event eSubmitRead : tReadArgs;
|
|
event eWriteComplete : tWriteRes;
|
|
event eReadComplete : tReadRes;
|
|
event eWaitConnected : machine;
|
|
event eClientConnected;
|
|
|
|
machine StorageClient {
|
|
var clientId: tNodeId;
|
|
var mgmtService: MgmtService;
|
|
var mgmtClient: MgmtClient;
|
|
// var timer: Timer;
|
|
|
|
var routingVer: tRoutingVer;
|
|
var replicaChains: tReplicaChainMap;
|
|
|
|
var nextRequestId: tRequestId;
|
|
|
|
var clientUsers: set[machine];
|
|
var submittedWrites: map[tMessageTag, tWriteArgs];
|
|
var submittedReads: map[tMessageTag, tReadArgs];
|
|
var inflightWriteReqs: map[tMessageTag, tWriteReq];
|
|
var inflightReadReqs: map[tMessageTag, tReadReq];
|
|
|
|
fun newMessageTag(): tMessageTag {
|
|
nextRequestId = nextRequestId + 1;
|
|
return (nodeId = clientId, requestId = nextRequestId);
|
|
}
|
|
|
|
fun calcGlobalKeyFromChunkId(chunkId: tChainId): tGlobalKey {
|
|
var chainIds: seq[tChainId];
|
|
var targetChain: tChainId;
|
|
var replicaChain: tReplicaChain;
|
|
|
|
chainIds = keys(replicaChains);
|
|
targetChain = chainIds[chunkId % sizeof(chainIds)];
|
|
replicaChain = replicaChains[targetChain];
|
|
|
|
return (vChainId = replicaChain.vChainId, chunkId = chunkId);
|
|
}
|
|
|
|
fun processRoutingInfo(routingInfo: tRoutingInfo) {
|
|
var newRoutingVer: tRoutingVer;
|
|
var newReplicaChains: tReplicaChainMap;
|
|
var replicaChain: tReplicaChain;
|
|
var targetId: tTargetId;
|
|
var chainId: tChainId;
|
|
var nodeId: tNodeId;
|
|
var services: seq[StorageService];
|
|
|
|
newRoutingVer = routingInfo.routingVer;
|
|
newReplicaChains = routingInfo.replicaChains;
|
|
|
|
if (routingVer > newRoutingVer) {
|
|
print format("{0}: error: routingVer {1} > newRoutingVer {2}", this, routingVer, newRoutingVer);
|
|
return;
|
|
} else if (routingVer == newRoutingVer) {
|
|
print format("{0}: ignore: routingVer {1} == newRoutingVer {2}", this, routingVer, newRoutingVer);
|
|
return;
|
|
}
|
|
|
|
print format("{0}: updating replica chains from version {1} to {2}", this, routingVer, newRoutingVer);
|
|
routingVer = newRoutingVer;
|
|
|
|
foreach (chainId in keys(replicaChains)) {
|
|
if (!(chainId in newReplicaChains))
|
|
replicaChains -= (chainId);
|
|
}
|
|
|
|
foreach (chainId in keys(newReplicaChains)) {
|
|
replicaChains[chainId] = newReplicaChains[chainId];
|
|
replicaChain = replicaChains[chainId];
|
|
|
|
print format("{0}: new replica chain {1}, targets: {2}, services: {3}",
|
|
this, newReplicaChains[chainId].vChainId, newReplicaChains[chainId].targets, newReplicaChains[chainId].services);
|
|
}
|
|
}
|
|
|
|
// fun onSendHeartbeatEvent(heartbeatConns: tHeartbeatConns) {
|
|
// send heartbeatConns.mgmtService, eRegisterClientMsg, (from = heartbeatConns.mgmtClient, nodeId = clientId, storageClient = this);
|
|
// }
|
|
|
|
fun chooseServingTarget(replicaChain: tReplicaChain): tTargetId {
|
|
var targetId: tTargetId;
|
|
var servingTargetIds: set[tTargetId];
|
|
|
|
targetId = replicaChain.targets[0];
|
|
|
|
if (replicaChain.states[targetId] == PublicTargetState_SERVING) {
|
|
return targetId;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
fun sendWriteReq(writeReq: tWriteReq) {
|
|
var replicaChain: tReplicaChain;
|
|
var targetId: tTargetId;
|
|
var targetService: StorageService;
|
|
|
|
// get the latest chain and update versioned chain id
|
|
replicaChain = replicaChains[writeReq.key.vChainId.chainId];
|
|
writeReq.key.vChainId = replicaChain.vChainId;
|
|
|
|
targetId = chooseServingTarget(replicaChain);
|
|
if (targetId > 0) {
|
|
print format("{0}: send write request #{1}: {2}", this, writeReq.retries, writeReq);
|
|
targetService = replicaChain.services[targetId];
|
|
send targetService, eWriteReq, writeReq;
|
|
}
|
|
}
|
|
|
|
fun reissueWriteReq(reqTag: tMessageTag) {
|
|
inflightWriteReqs[reqTag].retries = inflightWriteReqs[reqTag].retries + 1;
|
|
sendWriteReq(inflightWriteReqs[reqTag]);
|
|
}
|
|
|
|
fun sendReadReq(readReq: tReadReq) {
|
|
var replicaChain: tReplicaChain;
|
|
var targetId: tTargetId;
|
|
var targetService: StorageService;
|
|
|
|
// get the latest chain and update versioned chain id
|
|
replicaChain = replicaChains[readReq.key.vChainId.chainId];
|
|
readReq.key.vChainId = replicaChain.vChainId;
|
|
|
|
targetId = chooseServingTarget(replicaChain);
|
|
if (targetId > 0) {
|
|
print format("{0}: send read request #{1}: {2}", this, readReq.retries, readReq);
|
|
targetService = replicaChain.services[targetId];
|
|
send targetService, eReadReq, readReq;
|
|
}
|
|
}
|
|
|
|
fun reissueReadReq(reqTag: tMessageTag) {
|
|
sendReadReq(inflightReadReqs[reqTag]);
|
|
inflightReadReqs[reqTag].retries = inflightReadReqs[reqTag].retries + 1;
|
|
}
|
|
|
|
fun processInflightWriteReqs() {
|
|
var oldChainId: tVersionedChainId;
|
|
var newChainId: tVersionedChainId;
|
|
var writeReq: tWriteReq;
|
|
|
|
foreach (writeReq in values(inflightWriteReqs)) {
|
|
oldChainId = writeReq.key.vChainId;
|
|
newChainId = replicaChains[oldChainId.chainId].vChainId;
|
|
if (oldChainId != newChainId) {
|
|
print format("{0}: chain version updated: {1} --> {2}, reissuing request {3}", this, oldChainId, newChainId, writeReq);
|
|
reissueWriteReq(writeReq.tag);
|
|
}
|
|
}
|
|
}
|
|
|
|
fun processInflightReadReqs() {
|
|
var oldChainId: tVersionedChainId;
|
|
var newChainId: tVersionedChainId;
|
|
var readReq: tReadReq;
|
|
|
|
foreach (readReq in values(inflightReadReqs)) {
|
|
oldChainId = readReq.key.vChainId;
|
|
newChainId = replicaChains[oldChainId.chainId].vChainId;
|
|
if (oldChainId != newChainId) {
|
|
print format("{0}: chain version updated: {1} --> {2}, reissuing request {3}", this, oldChainId, newChainId, readReq);
|
|
reissueReadReq(readReq.tag);
|
|
}
|
|
}
|
|
}
|
|
|
|
start state Init {
|
|
ignore eSendHeartbeat;
|
|
|
|
entry (args: (clientId: tNodeId, mgmtService: MgmtService)) {
|
|
clientId = args.clientId;
|
|
mgmtService = args.mgmtService;
|
|
mgmtClient = new MgmtClient((nodeId = clientId, clientHost = this, mgmtService = mgmtService, sendHeartbeats = false));
|
|
// timer = new Timer(this);
|
|
}
|
|
|
|
on eWaitConnected do (user: machine) {
|
|
clientUsers += (user);
|
|
}
|
|
|
|
// on eSendHeartbeat do onSendHeartbeatEvent;
|
|
|
|
on eNewRoutingInfo goto WaitForReqs with processRoutingInfo;
|
|
}
|
|
|
|
state WaitForReqs {
|
|
ignore eSendHeartbeat;
|
|
|
|
entry {
|
|
var user: machine;
|
|
foreach (user in clientUsers) {
|
|
send user, eClientConnected;
|
|
}
|
|
// StartTimer(timer);
|
|
}
|
|
|
|
on eWaitConnected do (user: machine) {
|
|
send user, eClientConnected;
|
|
}
|
|
|
|
on eShutDown goto Stopped with (from: machine) {
|
|
print format("{0} is going to shutdown", this);
|
|
send mgmtClient, eShutDown, this;
|
|
}
|
|
|
|
// on eSendHeartbeat do onSendHeartbeatEvent;
|
|
|
|
on eNewRoutingInfo do (routingInfo: tRoutingInfo) {
|
|
processRoutingInfo(routingInfo);
|
|
processInflightWriteReqs();
|
|
processInflightReadReqs();
|
|
}
|
|
|
|
// on eTimeOut do {
|
|
// processInflightWriteReqs();
|
|
// processInflightReadReqs();
|
|
// StartTimer(timer);
|
|
// }
|
|
|
|
on eSubmitWrite do (writeArgs: tWriteArgs) {
|
|
var writeReq: tWriteReq;
|
|
|
|
writeReq = (from = this,
|
|
retries = 1,
|
|
tag = newMessageTag(),
|
|
key = calcGlobalKeyFromChunkId(writeArgs.chunkId),
|
|
updateVer = 0,
|
|
commitChainVer = 0,
|
|
fullChunkReplace = false,
|
|
removeChunk = writeArgs.dataBytes == default(tBytes),
|
|
fromClient = true,
|
|
offset = writeArgs.offset, length = writeArgs.length,
|
|
dataBytes = writeArgs.dataBytes);
|
|
|
|
sendWriteReq(writeReq);
|
|
|
|
submittedWrites += (writeReq.tag, writeArgs);
|
|
inflightWriteReqs += (writeReq.tag, writeReq);
|
|
}
|
|
|
|
on eSubmitRead do (readArgs: tReadArgs) {
|
|
var readReq: tReadReq;
|
|
|
|
readReq = (from = this,
|
|
retries = 1,
|
|
tag = newMessageTag(),
|
|
key = calcGlobalKeyFromChunkId(readArgs.chunkId),
|
|
offset = readArgs.offset, length = readArgs.length);
|
|
|
|
sendReadReq(readReq);
|
|
|
|
submittedReads += (readReq.tag, readArgs);
|
|
inflightReadReqs += (readReq.tag, readReq);
|
|
}
|
|
|
|
on eWriteResp do (writeResp: tWriteResp) {
|
|
if (!(writeResp.tag in inflightWriteReqs)) {
|
|
print format("{0}: got response for completed write request: {1}", this, writeResp.key);
|
|
return;
|
|
}
|
|
|
|
if (writeResp.status == ErrorCode_CHAIN_VERION_MISMATCH) {
|
|
print format("{0}: retry write request: {1}", this, writeResp.key);
|
|
reissueWriteReq(writeResp.tag);
|
|
return;
|
|
}
|
|
|
|
print format("{0}: write response {1}", this, writeResp);
|
|
|
|
send submittedWrites[writeResp.tag].from, eWriteComplete, (status = writeResp.status, chunkId = writeResp.key.chunkId, commitVer = writeResp.commitVer);
|
|
|
|
submittedWrites -= (writeResp.tag);
|
|
inflightWriteReqs -= (writeResp.tag);
|
|
}
|
|
|
|
on eReadResp do (readResp: tReadResp) {
|
|
if (!(readResp.tag in inflightReadReqs)) {
|
|
return;
|
|
}
|
|
|
|
if ((readResp.status == ErrorCode_CHAIN_VERION_MISMATCH) || (readResp.status == ErrorCode_CHUNK_NOT_COMMIT)) {
|
|
print format("{0}: retry read request: {1}", this, readResp.key);
|
|
reissueReadReq(readResp.tag);
|
|
return;
|
|
}
|
|
|
|
print format("{0}: read response {1}", this, readResp);
|
|
|
|
send submittedReads[readResp.tag].from, eReadComplete, (status = readResp.status, chunkId = readResp.key.chunkId,
|
|
chunkMetadata = readResp.chunkMetadata, dataBytes = readResp.dataBytes);
|
|
|
|
submittedReads -= (readResp.tag);
|
|
inflightReadReqs -= (readResp.tag);
|
|
}
|
|
}
|
|
|
|
state Stopped {
|
|
ignore eReadResp, eWriteResp, eSendHeartbeat, eNewRoutingInfo, eTimeOut;
|
|
|
|
entry {
|
|
// CancelTimer(timer);
|
|
print format("{0} stopped", this);
|
|
}
|
|
}
|
|
}
|