Skip to content

Commit

Permalink
[FLINK-25192][checkpointing] Implement no-claim mode support
Browse files Browse the repository at this point in the history
We introduce the NO_CLAIM restore mode, in which we do not take ownership of
the restored checkpoint. We do enforce that the first successful checkpoint
after a restore is "full", meaning it cannot share any artefacts with
the initial (restored) one.

Taking a "full" snapshot needs support from the state backend; therefore we
add a method supportsNoClaimMode to the Snapshottable interface, so
that state backends can add support incrementally. If a state backend
does not support forced full snapshots, users can switch to either the LEGACY
mode or the CLAIM mode.

This closes apache#18086
  • Loading branch information
dawidwys committed Dec 17, 2021
1 parent 50be4e6 commit 08794a6
Show file tree
Hide file tree
Showing 49 changed files with 796 additions and 125 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@
},
"restoreMode" : {
"type" : "string",
"enum" : [ "CLAIM", "LEGACY" ]
"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
},
"savepointPath" : {
"type" : "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
<tbody>
<tr>
<td><h5>execution.savepoint-restore-mode</h5></td>
<td style="word-wrap: break-word;">LEGACY</td>
<td style="word-wrap: break-word;">NO_CLAIM</td>
<td><p>Enum</p></td>
<td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"LEGACY": Flink will not claim ownership of the snapshot and will not delete the files. However, it can directly depend on the existence of the files of the restored checkpoint. It might not be safe to delete checkpoints that were restored in legacy mode </li></ul></td>
<td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"NO_CLAIM": Flink will not claim ownership of the snapshot files. However it will make sure it does not depend on any artefacts from the restored snapshot. In order to do that, Flink will take the first checkpoint as a full one, which means it might reupload/duplicate files that are part of the restored checkpoint.</li><li>"LEGACY": This is the mode in which Flink worked so far. It will not claim ownership of the snapshot and will not delete the files. However, it can directly depend on the existence of the files of the restored checkpoint. It might not be safe to delete checkpoints that were restored in legacy mode </li></ul></td>
</tr>
<tr>
<td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ public class CliFrontendParser {
true,
"Defines how should we restore from the given savepoint. Supported options: "
+ "[claim - claim ownership of the savepoint and delete once it is"
+ " subsumed, legacy (default) - do not assume ownership of the"
+ " savepoint files.");
+ " subsumed, no_claim (default) - do not claim ownership, the first"
+ " checkpoint will not reuse any files from the restored one, legacy "
+ "- the old behaviour, do not assume ownership of the savepoint files,"
+ " but can reuse some shared files.");

static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ public void testLegacyRestoreModeParsing() throws Exception {
assertTrue(savepointSettings.allowNonRestoredState());
}

/**
 * Verifies that passing {@code -restoreMode no_claim} on the command line yields savepoint
 * restore settings with {@link RestoreMode#NO_CLAIM}, the expected restore path, and
 * non-restored state allowed.
 */
@Test
public void testNoClaimRestoreModeParsing() throws Exception {
// test configure savepoint with no_claim mode
String[] parameters = {
"-s", "expectedSavepointPath", "-n", "-restoreMode", "no_claim", getTestJarPath()
};

// parse the raw arguments with the RUN options and turn them into program options
CommandLine commandLine =
CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true);
ProgramOptions programOptions = ProgramOptions.create(commandLine);
ExecutionConfigAccessor executionOptions =
ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList());

// the restore settings derived from the options must reflect every argument given above
SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
assertTrue(savepointSettings.allowNonRestoredState());
}

@Test(expected = CliArgsException.class)
public void testUnrecognizedOption() throws Exception {
// test unrecognized option
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ public OperatorStateBackend createOperatorStateBackend(
return backend.createOperatorStateBackend(
env, operatorIdentifier, stateHandles, cancelStreamRegistry);
}

/**
 * Reports support for the NO_CLAIM restore mode by forwarding the question to the wrapped
 * {@code backend}; this wrapper adds no decision of its own.
 */
@Override
public boolean supportsNoClaimRestoreMode() {
return backend.supportsNoClaimRestoreMode();
}
}
4 changes: 2 additions & 2 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@
},
"restoreMode" : {
"type" : "string",
"enum" : [ "CLAIM", "LEGACY" ]
"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
}
}
},
Expand Down Expand Up @@ -3378,4 +3378,4 @@
}
}
} ]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class CheckpointCoordinator {
private final ExecutionAttemptMappingProvider attemptMappingProvider;

private boolean baseLocationsForCheckpointInitialized = false;
private boolean forceFullSnapshot;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -684,9 +685,16 @@ private CompletableFuture<Void> triggerTasks(
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointID();

final CheckpointType type;
if (this.forceFullSnapshot && !request.props.isSavepoint()) {
type = CheckpointType.FULL_CHECKPOINT;
} else {
type = request.props.getCheckpointType();
}

final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
request.props.getCheckpointType(),
type,
checkpoint.getCheckpointStorageLocation().getLocationReference(),
isExactlyOnceMode,
unalignedCheckpointsEnabled,
Expand Down Expand Up @@ -1333,8 +1341,13 @@ private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest(
List<ExecutionVertex> tasksToAbort)
throws CheckpointException {
try {
return completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
final CompletedCheckpoint subsumedCheckpoint =
completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
// reset the force full snapshot flag, we should've completed at least one full
// snapshot by now
this.forceFullSnapshot = false;
return subsumedCheckpoint;
} catch (Exception exception) {
if (exception instanceof PossibleInconsistentStateException) {
LOG.warn(
Expand Down Expand Up @@ -1591,6 +1604,8 @@ private OptionalLong restoreLatestCheckpointedStateInternal(

LOG.info("Restoring job {} from {}.", job, latest);

this.forceFullSnapshot = latest.getProperties().isUnclaimed();

// re-assign the task states
final Map<OperatorID, OperatorState> operatorStates = extractOperatorStates(latest);

Expand Down Expand Up @@ -1695,6 +1710,9 @@ public boolean restoreSavepoint(
case LEGACY:
checkpointProperties = CheckpointProperties.forSavepoint(false);
break;
case NO_CLAIM:
checkpointProperties = CheckpointProperties.forUnclaimedSnapshot();
break;
default:
throw new IllegalArgumentException("Unknown snapshot restore mode");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,29 @@ public static CheckpointOptions alignedNoTimeout(
type, location, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) {
public static CheckpointOptions unaligned(
CheckpointType type, CheckpointStorageLocationReference location) {
checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
return new CheckpointOptions(
CheckpointType.CHECKPOINT,
location,
AlignmentType.UNALIGNED,
NO_ALIGNED_CHECKPOINT_TIME_OUT);
type, location, AlignmentType.UNALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

public static CheckpointOptions alignedWithTimeout(
CheckpointStorageLocationReference location, long alignedCheckpointTimeout) {
CheckpointType type,
CheckpointStorageLocationReference location,
long alignedCheckpointTimeout) {
checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
return new CheckpointOptions(
CheckpointType.CHECKPOINT,
location,
AlignmentType.ALIGNED,
alignedCheckpointTimeout);
type, location, AlignmentType.ALIGNED, alignedCheckpointTimeout);
}

private static CheckpointOptions forceAligned(
CheckpointStorageLocationReference location, long alignedCheckpointTimeout) {
CheckpointType type,
CheckpointStorageLocationReference location,
long alignedCheckpointTimeout) {
checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
return new CheckpointOptions(
CheckpointType.CHECKPOINT,
location,
AlignmentType.FORCED_ALIGNED,
alignedCheckpointTimeout);
type, location, AlignmentType.FORCED_ALIGNED, alignedCheckpointTimeout);
}

public static CheckpointOptions forConfig(
Expand All @@ -113,9 +112,9 @@ public static CheckpointOptions forConfig(
return alignedNoTimeout(checkpointType, locationReference);
} else if (alignedCheckpointTimeout == 0
|| alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT) {
return unaligned(locationReference);
return unaligned(checkpointType, locationReference);
} else {
return alignedWithTimeout(locationReference, alignedCheckpointTimeout);
return alignedWithTimeout(checkpointType, locationReference, alignedCheckpointTimeout);
}
}

Expand Down Expand Up @@ -190,15 +189,15 @@ public boolean isUnalignedCheckpoint() {
public CheckpointOptions withUnalignedSupported() {
if (alignmentType == AlignmentType.FORCED_ALIGNED) {
return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT
? alignedWithTimeout(targetLocation, alignedCheckpointTimeout)
: unaligned(targetLocation);
? alignedWithTimeout(checkpointType, targetLocation, alignedCheckpointTimeout)
: unaligned(checkpointType, targetLocation);
}
return this;
}

public CheckpointOptions withUnalignedUnsupported() {
if (isUnalignedCheckpoint() || isTimeoutable()) {
return forceAligned(targetLocation, alignedCheckpointTimeout);
return forceAligned(checkpointType, targetLocation, alignedCheckpointTimeout);
}
return this;
}
Expand Down Expand Up @@ -228,18 +227,17 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "CheckpointOptions {"
+ "checkpointType = "
return "CheckpointOptions{"
+ "checkpointType="
+ checkpointType
+ ", targetLocation = "
+ ", targetLocation="
+ targetLocation
+ ", alignment = "
+ ", alignmentType="
+ alignmentType
+ ", alignedCheckpointTimeout = "
+ ", alignedCheckpointTimeout="
+ alignedCheckpointTimeout
+ "}";
+ '}';
}

// ------------------------------------------------------------------------
// Factory methods
// ------------------------------------------------------------------------
Expand All @@ -255,6 +253,6 @@ public static CheckpointOptions forCheckpointWithDefaultLocation() {

public CheckpointOptions toUnaligned() {
checkState(alignmentType == AlignmentType.ALIGNED);
return unaligned(targetLocation);
return unaligned(checkpointType, targetLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.RestoreMode;

import java.io.Serializable;

Expand Down Expand Up @@ -53,6 +54,8 @@ public class CheckpointProperties implements Serializable {
private final boolean discardFailed;
private final boolean discardSuspended;

private final boolean unclaimed;

@VisibleForTesting
CheckpointProperties(
boolean forced,
Expand All @@ -61,7 +64,8 @@ public class CheckpointProperties implements Serializable {
boolean discardFinished,
boolean discardCancelled,
boolean discardFailed,
boolean discardSuspended) {
boolean discardSuspended,
boolean unclaimed) {

this.forced = forced;
this.checkpointType = checkNotNull(checkpointType);
Expand All @@ -70,6 +74,7 @@ public class CheckpointProperties implements Serializable {
this.discardCancelled = discardCancelled;
this.discardFailed = discardFailed;
this.discardSuspended = discardSuspended;
this.unclaimed = unclaimed;
}

// ------------------------------------------------------------------------
Expand All @@ -89,6 +94,11 @@ boolean forceCheckpoint() {
return forced;
}

/**
 * Returns whether the checkpoint should be restored in {@link RestoreMode#NO_CLAIM} mode.
 *
 * @return the {@code unclaimed} flag these properties were constructed with
 */
public boolean isUnclaimed() {
return unclaimed;
}

// ------------------------------------------------------------------------
// Garbage collection behaviour
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -240,11 +250,11 @@ public String toString() {

private static final CheckpointProperties SAVEPOINT =
new CheckpointProperties(
true, CheckpointType.SAVEPOINT, false, false, false, false, false);
true, CheckpointType.SAVEPOINT, false, false, false, false, false, false);

private static final CheckpointProperties SAVEPOINT_NO_FORCE =
new CheckpointProperties(
false, CheckpointType.SAVEPOINT, false, false, false, false, false);
false, CheckpointType.SAVEPOINT, false, false, false, false, false, false);

private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED =
new CheckpointProperties(
Expand All @@ -254,7 +264,8 @@ public String toString() {
true, // Delete on success
true, // Delete on cancellation
true, // Delete on failure
true); // Delete on suspension
true, // Delete on suspension
false);

private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE =
new CheckpointProperties(
Expand All @@ -264,7 +275,8 @@ public String toString() {
true, // Delete on success
true, // Delete on cancellation
false, // Retain on failure
true); // Delete on suspension
true, // Delete on suspension
false);

private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION =
new CheckpointProperties(
Expand All @@ -274,7 +286,8 @@ public String toString() {
true, // Delete on success
false, // Retain on cancellation
false, // Retain on failure
false); // Retain on suspension
false, // Retain on suspension
false);

/**
* Creates the checkpoint properties for a (manually triggered) savepoint.
Expand All @@ -288,6 +301,25 @@ public static CheckpointProperties forSavepoint(boolean forced) {
return forced ? SAVEPOINT : SAVEPOINT_NO_FORCE;
}

/**
 * Creates the checkpoint properties for a snapshot restored in {@link RestoreMode#NO_CLAIM}.
 * Those properties should not be used when triggering a checkpoint/savepoint. They're useful
 * when restoring a {@link CompletedCheckpointStore} after a JM failover.
 *
 * <p>The returned properties are not forced, use the savepoint checkpoint type, disable every
 * discard flag, and have the {@code unclaimed} flag set.
 *
 * @return Checkpoint properties for a snapshot restored in {@link RestoreMode#NO_CLAIM}.
 */
public static CheckpointProperties forUnclaimedSnapshot() {
return new CheckpointProperties(
false, // forced
CheckpointType.SAVEPOINT, // unclaimed snapshot is similar to a savepoint
false, // NOTE(review): presumably discardSubsumed; parameter name not visible here
false, // discardFinished
false, // discardCancelled
false, // discardFailed
false, // discardSuspended
true); // unclaimed
}

public static CheckpointProperties forSyncSavepoint(boolean forced, boolean terminate) {
return new CheckpointProperties(
forced,
Expand All @@ -296,6 +328,7 @@ public static CheckpointProperties forSyncSavepoint(boolean forced, boolean term
false,
false,
false,
false,
false);
}

Expand Down
Loading

0 comments on commit 08794a6

Please sign in to comment.