3FS/specs/DataStorage/PSrc/StorageClient.p
2025-02-27 21:53:53 +08:00

316 lines
9.9 KiB
OpenEdge ABL

/* Storage Client */
type tWriteArgs = (from: machine, chunkId: tChunkId, offset: int, length: int, dataBytes: tBytes);
type tReadArgs = (from: machine, chunkId: tChunkId, offset: int, length: int);
type tWriteRes = (status: tErrorCode, chunkId: tChunkId, commitVer: tChunkVer);
type tReadRes = (status: tErrorCode, chunkId: tChunkId, chunkMetadata: tChunkMetadata, dataBytes: tBytes);
event eSubmitWrite : tWriteArgs;
event eSubmitRead : tReadArgs;
event eWriteComplete : tWriteRes;
event eReadComplete : tReadRes;
event eWaitConnected : machine;
event eClientConnected;
machine StorageClient {
var clientId: tNodeId;
var mgmtService: MgmtService;
var mgmtClient: MgmtClient;
// var timer: Timer;
var routingVer: tRoutingVer;
var replicaChains: tReplicaChainMap;
var nextRequestId: tRequestId;
var clientUsers: set[machine];
var submittedWrites: map[tMessageTag, tWriteArgs];
var submittedReads: map[tMessageTag, tReadArgs];
var inflightWriteReqs: map[tMessageTag, tWriteReq];
var inflightReadReqs: map[tMessageTag, tReadReq];
fun newMessageTag(): tMessageTag {
nextRequestId = nextRequestId + 1;
return (nodeId = clientId, requestId = nextRequestId);
}
fun calcGlobalKeyFromChunkId(chunkId: tChainId): tGlobalKey {
var chainIds: seq[tChainId];
var targetChain: tChainId;
var replicaChain: tReplicaChain;
chainIds = keys(replicaChains);
targetChain = chainIds[chunkId % sizeof(chainIds)];
replicaChain = replicaChains[targetChain];
return (vChainId = replicaChain.vChainId, chunkId = chunkId);
}
fun processRoutingInfo(routingInfo: tRoutingInfo) {
var newRoutingVer: tRoutingVer;
var newReplicaChains: tReplicaChainMap;
var replicaChain: tReplicaChain;
var targetId: tTargetId;
var chainId: tChainId;
var nodeId: tNodeId;
var services: seq[StorageService];
newRoutingVer = routingInfo.routingVer;
newReplicaChains = routingInfo.replicaChains;
if (routingVer > newRoutingVer) {
print format("{0}: error: routingVer {1} > newRoutingVer {2}", this, routingVer, newRoutingVer);
return;
} else if (routingVer == newRoutingVer) {
print format("{0}: ignore: routingVer {1} == newRoutingVer {2}", this, routingVer, newRoutingVer);
return;
}
print format("{0}: updating replica chains from version {1} to {2}", this, routingVer, newRoutingVer);
routingVer = newRoutingVer;
foreach (chainId in keys(replicaChains)) {
if (!(chainId in newReplicaChains))
replicaChains -= (chainId);
}
foreach (chainId in keys(newReplicaChains)) {
replicaChains[chainId] = newReplicaChains[chainId];
replicaChain = replicaChains[chainId];
print format("{0}: new replica chain {1}, targets: {2}, services: {3}",
this, newReplicaChains[chainId].vChainId, newReplicaChains[chainId].targets, newReplicaChains[chainId].services);
}
}
// fun onSendHeartbeatEvent(heartbeatConns: tHeartbeatConns) {
// send heartbeatConns.mgmtService, eRegisterClientMsg, (from = heartbeatConns.mgmtClient, nodeId = clientId, storageClient = this);
// }
fun chooseServingTarget(replicaChain: tReplicaChain): tTargetId {
var targetId: tTargetId;
var servingTargetIds: set[tTargetId];
targetId = replicaChain.targets[0];
if (replicaChain.states[targetId] == PublicTargetState_SERVING) {
return targetId;
}
return 0;
}
fun sendWriteReq(writeReq: tWriteReq) {
var replicaChain: tReplicaChain;
var targetId: tTargetId;
var targetService: StorageService;
// get the latest chain and update versioned chain id
replicaChain = replicaChains[writeReq.key.vChainId.chainId];
writeReq.key.vChainId = replicaChain.vChainId;
targetId = chooseServingTarget(replicaChain);
if (targetId > 0) {
print format("{0}: send write request #{1}: {2}", this, writeReq.retries, writeReq);
targetService = replicaChain.services[targetId];
send targetService, eWriteReq, writeReq;
}
}
fun reissueWriteReq(reqTag: tMessageTag) {
inflightWriteReqs[reqTag].retries = inflightWriteReqs[reqTag].retries + 1;
sendWriteReq(inflightWriteReqs[reqTag]);
}
fun sendReadReq(readReq: tReadReq) {
var replicaChain: tReplicaChain;
var targetId: tTargetId;
var targetService: StorageService;
// get the latest chain and update versioned chain id
replicaChain = replicaChains[readReq.key.vChainId.chainId];
readReq.key.vChainId = replicaChain.vChainId;
targetId = chooseServingTarget(replicaChain);
if (targetId > 0) {
print format("{0}: send read request #{1}: {2}", this, readReq.retries, readReq);
targetService = replicaChain.services[targetId];
send targetService, eReadReq, readReq;
}
}
fun reissueReadReq(reqTag: tMessageTag) {
sendReadReq(inflightReadReqs[reqTag]);
inflightReadReqs[reqTag].retries = inflightReadReqs[reqTag].retries + 1;
}
fun processInflightWriteReqs() {
var oldChainId: tVersionedChainId;
var newChainId: tVersionedChainId;
var writeReq: tWriteReq;
foreach (writeReq in values(inflightWriteReqs)) {
oldChainId = writeReq.key.vChainId;
newChainId = replicaChains[oldChainId.chainId].vChainId;
if (oldChainId != newChainId) {
print format("{0}: chain version updated: {1} --> {2}, reissuing request {3}", this, oldChainId, newChainId, writeReq);
reissueWriteReq(writeReq.tag);
}
}
}
fun processInflightReadReqs() {
var oldChainId: tVersionedChainId;
var newChainId: tVersionedChainId;
var readReq: tReadReq;
foreach (readReq in values(inflightReadReqs)) {
oldChainId = readReq.key.vChainId;
newChainId = replicaChains[oldChainId.chainId].vChainId;
if (oldChainId != newChainId) {
print format("{0}: chain version updated: {1} --> {2}, reissuing request {3}", this, oldChainId, newChainId, readReq);
reissueReadReq(readReq.tag);
}
}
}
start state Init {
ignore eSendHeartbeat;
entry (args: (clientId: tNodeId, mgmtService: MgmtService)) {
clientId = args.clientId;
mgmtService = args.mgmtService;
mgmtClient = new MgmtClient((nodeId = clientId, clientHost = this, mgmtService = mgmtService, sendHeartbeats = false));
// timer = new Timer(this);
}
on eWaitConnected do (user: machine) {
clientUsers += (user);
}
// on eSendHeartbeat do onSendHeartbeatEvent;
on eNewRoutingInfo goto WaitForReqs with processRoutingInfo;
}
state WaitForReqs {
ignore eSendHeartbeat;
entry {
var user: machine;
foreach (user in clientUsers) {
send user, eClientConnected;
}
// StartTimer(timer);
}
on eWaitConnected do (user: machine) {
send user, eClientConnected;
}
on eShutDown goto Stopped with (from: machine) {
print format("{0} is going to shutdown", this);
send mgmtClient, eShutDown, this;
}
// on eSendHeartbeat do onSendHeartbeatEvent;
on eNewRoutingInfo do (routingInfo: tRoutingInfo) {
processRoutingInfo(routingInfo);
processInflightWriteReqs();
processInflightReadReqs();
}
// on eTimeOut do {
// processInflightWriteReqs();
// processInflightReadReqs();
// StartTimer(timer);
// }
on eSubmitWrite do (writeArgs: tWriteArgs) {
var writeReq: tWriteReq;
writeReq = (from = this,
retries = 1,
tag = newMessageTag(),
key = calcGlobalKeyFromChunkId(writeArgs.chunkId),
updateVer = 0,
commitChainVer = 0,
fullChunkReplace = false,
removeChunk = writeArgs.dataBytes == default(tBytes),
fromClient = true,
offset = writeArgs.offset, length = writeArgs.length,
dataBytes = writeArgs.dataBytes);
sendWriteReq(writeReq);
submittedWrites += (writeReq.tag, writeArgs);
inflightWriteReqs += (writeReq.tag, writeReq);
}
on eSubmitRead do (readArgs: tReadArgs) {
var readReq: tReadReq;
readReq = (from = this,
retries = 1,
tag = newMessageTag(),
key = calcGlobalKeyFromChunkId(readArgs.chunkId),
offset = readArgs.offset, length = readArgs.length);
sendReadReq(readReq);
submittedReads += (readReq.tag, readArgs);
inflightReadReqs += (readReq.tag, readReq);
}
on eWriteResp do (writeResp: tWriteResp) {
if (!(writeResp.tag in inflightWriteReqs)) {
print format("{0}: got response for completed write request: {1}", this, writeResp.key);
return;
}
if (writeResp.status == ErrorCode_CHAIN_VERION_MISMATCH) {
print format("{0}: retry write request: {1}", this, writeResp.key);
reissueWriteReq(writeResp.tag);
return;
}
print format("{0}: write response {1}", this, writeResp);
send submittedWrites[writeResp.tag].from, eWriteComplete, (status = writeResp.status, chunkId = writeResp.key.chunkId, commitVer = writeResp.commitVer);
submittedWrites -= (writeResp.tag);
inflightWriteReqs -= (writeResp.tag);
}
on eReadResp do (readResp: tReadResp) {
if (!(readResp.tag in inflightReadReqs)) {
return;
}
if ((readResp.status == ErrorCode_CHAIN_VERION_MISMATCH) || (readResp.status == ErrorCode_CHUNK_NOT_COMMIT)) {
print format("{0}: retry read request: {1}", this, readResp.key);
reissueReadReq(readResp.tag);
return;
}
print format("{0}: read response {1}", this, readResp);
send submittedReads[readResp.tag].from, eReadComplete, (status = readResp.status, chunkId = readResp.key.chunkId,
chunkMetadata = readResp.chunkMetadata, dataBytes = readResp.dataBytes);
submittedReads -= (readResp.tag);
inflightReadReqs -= (readResp.tag);
}
}
state Stopped {
ignore eReadResp, eWriteResp, eSendHeartbeat, eNewRoutingInfo, eTimeOut;
entry {
// CancelTimer(timer);
print format("{0} stopped", this);
}
}
}