Skip to content

Commit

Permalink
HDDS-3685. Remove replay logic from actual request logic. (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
bharatviswa504 committed Jul 14, 2020
1 parent 92926ae commit 90e8211
Show file tree
Hide file tree
Showing 83 changed files with 282 additions and 1,947 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,5 @@ public enum ResultCodes {
DIRECTORY_ALREADY_EXISTS,

INVALID_VOLUME_NAME,

REPLAY // When ratis logs are replayed.
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,6 @@ enum Status {
DIRECTORY_ALREADY_EXISTS = 60;

INVALID_VOLUME_NAME = 61;

// When transactions are replayed
REPLAY = 100;
}

/**
Expand Down
4 changes: 0 additions & 4 deletions hadoop-ozone/interface-client/src/main/proto/proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,6 @@
{
"name": "INVALID_VOLUME_NAME",
"integer": 61
},
{
"name": "REPLAY",
"integer": 100
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
import java.io.IOException;
Expand Down Expand Up @@ -64,7 +63,6 @@

import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.METADATA_ERROR;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.REPLAY;

/**
* The OM StateMachine is the state machine for OM Ratis server. It is
Expand Down Expand Up @@ -258,12 +256,6 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
} else if (omResponse.getStatus() == METADATA_ERROR) {
terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
} else if (omResponse.getStatus() == REPLAY) {
// For replay we do not add response to double buffer, so update
// LastAppliedIndex for the replay transactions here.
computeAndUpdateLastAppliedIndex(trxLogIndex,
trx.getLogEntry().getTerm(), Lists.newArrayList(trxLogIndex),
true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.WithObjectID;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
Expand All @@ -54,7 +53,6 @@

import javax.annotation.Nonnull;

import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.REPLAY;

/**
* OMClientRequest provides methods which every write OM request should
Expand All @@ -71,8 +69,6 @@ public abstract class OMClientRequest implements RequestAuditor {
public enum Result {
SUCCESS, // The request was executed successfully

REPLAY, // The request is a replay and was ignored

FAILURE // The request failed and exception was thrown
}

Expand Down Expand Up @@ -256,7 +252,6 @@ protected OMResponse createOperationKeysErrorOMResponse(

/**
* Add the client response to double buffer and set the flush future.
* For responses which has status set to REPLAY it is a no-op.
* @param trxIndex
* @param omClientResponse
* @param omDoubleBufferHelper
Expand All @@ -265,13 +260,8 @@ protected void addResponseToDoubleBuffer(long trxIndex,
OMClientResponse omClientResponse,
OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
if (omClientResponse != null) {
// For replay transaction we do not need to add to double buffer, as
// for these transactions there is nothing needs to be done for
// addDBToBatch.
if (omClientResponse.getOMResponse().getStatus() != REPLAY) {
omClientResponse.setFlushFuture(
omDoubleBufferHelper.add(omClientResponse, trxIndex));
}
omClientResponse.setFlushFuture(
omDoubleBufferHelper.add(omClientResponse, trxIndex));
}
}

Expand Down Expand Up @@ -313,29 +303,4 @@ public Map<String, String> buildVolumeAuditMap(String volume) {
auditMap.put(OzoneConsts.VOLUME, volume);
return auditMap;
}

/**
* Check if the transaction is a replay.
* @param ozoneObj OMVolumeArgs or OMBucketInfo or OMKeyInfo object whose
* updateID needs to be compared with
* @param transactionID the current transaction ID
* @return true if transactionID is less than or equal to updateID, false
* otherwise.
*/
protected boolean isReplay(OzoneManager om, WithObjectID ozoneObj,
long transactionID) {
return om.isRatisEnabled() && ozoneObj.isUpdateIDset() &&
transactionID <= ozoneObj.getUpdateID();
}

/**
* Return a dummy OMClientResponse for when the transactions are replayed.
*/
protected OMResponse createReplayOMResponse(
@Nonnull OMResponse.Builder omResponse) {

omResponse.setSuccess(false);
omResponse.setStatus(REPLAY);
return omResponse.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_ALREADY_EXISTS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
Expand Down Expand Up @@ -167,27 +169,13 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
//Check if the volume exists
if (omVolumeArgs == null) {
LOG.debug("volume: {} not found ", volumeName);
throw new OMException("Volume doesn't exist",
OMException.ResultCodes.VOLUME_NOT_FOUND);
throw new OMException("Volume doesn't exist", VOLUME_NOT_FOUND);
}

//Check if bucket already exists
OmBucketInfo dbBucketInfo = metadataManager.getBucketTable()
.getReadCopy(bucketKey);
if (dbBucketInfo != null) {
// Check if this transaction is a replay of ratis logs.
if (isReplay(ozoneManager, dbBucketInfo, transactionLogIndex)) {
// Replay implies the response has already been returned to
// the client. So take no further action and return a dummy
// OMClientResponse.
LOG.debug("Replayed Transaction {} ignored. Request: {}",
transactionLogIndex, createBucketRequest);
return new OMBucketCreateResponse(createReplayOMResponse(omResponse));
} else {
LOG.debug("bucket: {} already exists ", bucketName);
throw new OMException("Bucket already exist",
OMException.ResultCodes.BUCKET_ALREADY_EXISTS);
}
if (metadataManager.getBucketTable().isExist(bucketKey)) {
LOG.debug("bucket: {} already exists ", bucketName);
throw new OMException("Bucket already exist", BUCKET_ALREADY_EXISTS);
}

// Add objectID and updateID
Expand All @@ -211,7 +199,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
} catch (IOException ex) {
exception = ex;
omClientResponse = new OMBucketCreateResponse(
createErrorOMResponse(omResponse, exception), omBucketInfo);
createErrorOMResponse(omResponse, exception));
} finally {
addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
ozoneManagerDoubleBufferHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.response.bucket.OMBucketDeleteResponse;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.OzoneConsts;
Expand All @@ -52,6 +51,7 @@
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;

Expand Down Expand Up @@ -102,7 +102,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
volumeName, bucketName, null);
}


// acquire lock
acquiredVolumeLock =
omMetadataManager.getLock().acquireReadLock(VOLUME_LOCK, volumeName);
Expand All @@ -111,25 +110,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
volumeName, bucketName);

// No need to check volume exists here, as bucket cannot be created
// with out volume creation.
//Check if bucket exists
// with out volume creation. Check if bucket exists
String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
OmBucketInfo omBucketInfo = omMetadataManager.getBucketTable()
.getReadCopy(bucketKey);
if (omBucketInfo == null) {
LOG.debug("bucket: {} not found ", bucketName);
throw new OMException("Bucket doesn't exist",
OMException.ResultCodes.BUCKET_NOT_FOUND);
}

// Check if this transaction is a replay of ratis logs.
// If this is a replay, then the response has already been returned to
// the client. So take no further action and return a dummy
// OMClientResponse.
if (isReplay(ozoneManager, omBucketInfo, transactionLogIndex)) {
LOG.debug("Replayed Transaction {} ignored. Request: {}",
transactionLogIndex, deleteBucketRequest);
return new OMBucketDeleteResponse(createReplayOMResponse(omResponse));
if (!omMetadataManager.getBucketTable().isExist(bucketKey)) {
LOG.debug("bucket: {} not found ", bucketName);
throw new OMException("Bucket already exist", BUCKET_NOT_FOUND);
}

//Check if bucket is empty
Expand All @@ -155,7 +141,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
success = false;
exception = ex;
omClientResponse = new OMBucketDeleteResponse(
createErrorOMResponse(omResponse, exception), volumeName, bucketName);
createErrorOMResponse(omResponse, exception));
} finally {
addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
ozoneManagerDoubleBufferHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
OMException.ResultCodes.BUCKET_NOT_FOUND);
}

// Check if this transaction is a replay of ratis logs.
// If a replay, then the response has already been returned to the
// client. So take no further action and return a dummy OMClientResponse.
if (isReplay(ozoneManager, dbBucketInfo, transactionLogIndex)) {
LOG.debug("Replayed Transaction {} ignored. Request: {}",
transactionLogIndex, setBucketPropertyRequest);
return new OMBucketSetPropertyResponse(
createReplayOMResponse(omResponse));
}

OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder();
bucketInfoBuilder.setVolumeName(dbBucketInfo.getVolumeName())
.setBucketName(dbBucketInfo.getBucketName())
Expand Down Expand Up @@ -190,7 +180,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
success = false;
exception = ex;
omClientResponse = new OMBucketSetPropertyResponse(
createErrorOMResponse(omResponse, exception), omBucketInfo);
createErrorOMResponse(omResponse, exception));
} finally {
addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
ozoneManagerDoubleBufferHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.response.bucket.acl.OMBucketAclResponse;
import org.apache.hadoop.ozone.util.BooleanBiFunction;
import org.apache.hadoop.ozone.om.request.util.ObjectParser;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
Expand Down Expand Up @@ -106,16 +105,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
throw new OMException(OMException.ResultCodes.BUCKET_NOT_FOUND);
}

// Check if this transaction is a replay of ratis logs.
// If this is a replay, then the response has already been returned to
// the client. So take no further action and return a dummy
// OMClientResponse.
if (isReplay(ozoneManager, omBucketInfo, transactionLogIndex)) {
LOG.debug("Replayed Transaction {} ignored. Request: {}",
transactionLogIndex, getOmRequest());
return new OMBucketAclResponse(createReplayOMResponse(omResponse));
}

operationResult = omBucketAclOp.apply(ozoneAcls, omBucketInfo);
omBucketInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
Expand Down
Loading

0 comments on commit 90e8211

Please sign in to comment.