Skip to content

Commit

Permalink
[FLINK-14189][runtime] Do not store dynamic slots by index in TaskSlo…
Browse files Browse the repository at this point in the history
…tTable
  • Loading branch information
azagrebin committed Dec 8, 2019
1 parent 516aee3 commit db3dec5
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public int hashCode() {

@Override
public String toString() {
return resourceId + "_" + slotNumber;
return resourceId + "_" + (slotNumber >= 0 ? slotNumber : "dynamic");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class SlotOffer implements Serializable {
private final ResourceProfile resourceProfile;

public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) {
Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");
this.allocationId = Preconditions.checkNotNull(allocationID);
this.slotIndex = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public TaskSlot(
final int memoryPageSize,
final JobID jobId,
final AllocationID allocationId) {
Preconditions.checkArgument(0 <= index, "The index must be greater than 0.");

this.index = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
private final Map<Integer, TaskSlot> taskSlots;

/** Mapping from allocation id to task slot. */
private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
private final Map<AllocationID, TaskSlot> allocatedSlots;

/** Mapping from execution attempt id to task and task slot. */
private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
Expand All @@ -96,9 +96,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
/** Whether the table has been started. */
private volatile boolean started;

/** Index of next allocated slot, for dynamic slot allocation. */
private int nextSlotIndex;

private final ResourceBudgetManager budgetManager;

public TaskSlotTable(
Expand All @@ -120,15 +117,14 @@ public TaskSlotTable(

budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile));

allocationIDTaskSlotMap = new HashMap<>(numberSlots);
allocatedSlots = new HashMap<>(numberSlots);

taskSlotMappings = new HashMap<>(4 * numberSlots);

slotsPerJob = new HashMap<>(4);

slotActions = null;
started = false;
nextSlotIndex = numberSlots;
}

/**
Expand All @@ -150,14 +146,22 @@ public void start(SlotActions initialSlotActions) {
public void stop() {
started = false;
timerService.stop();
allocatedSlots
.values()
.stream()
.filter(slot -> !taskSlots.containsKey(slot.getIndex()))
.forEach(TaskSlot::close);
allocatedSlots.clear();
taskSlots.values().forEach(TaskSlot::close);
taskSlots.clear();
slotActions = null;
}

@VisibleForTesting
public boolean isStopped() {
return !started && taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown());
return !started &&
taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()) &&
allocatedSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown());
}

/**
Expand Down Expand Up @@ -205,9 +209,9 @@ public SlotReport createSlotReport(ResourceID resourceId) {
slotStatuses.add(slotStatus);
}

for (TaskSlot taskSlot : taskSlots.values()) {
if (taskSlot.getIndex() >= numberSlots) {
SlotID slotID = new SlotID(resourceId, taskSlot.getIndex());
for (TaskSlot taskSlot : allocatedSlots.values()) {
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
SlotStatus slotStatus = new SlotStatus(
slotID,
taskSlot.getResourceProfile(),
Expand Down Expand Up @@ -259,16 +263,12 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R

Preconditions.checkArgument(index < numberSlots);

TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
TaskSlot taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false;
}

if (index >= 0) {
resourceProfile = defaultSlotResourceProfile;
}

if (taskSlots.containsKey(index)) {
TaskSlot duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
Expand All @@ -278,11 +278,11 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R
duplicatedTaskSlot.getAllocationId());
return duplicatedTaskSlot.getJobId().equals(jobId) &&
duplicatedTaskSlot.getAllocationId().equals(allocationId);
} else if (allocatedSlots.containsKey(allocationId)) {
return true;
}

if (index < 0) {
index = nextSlotIndex++;
}
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;

if (!budgetManager.reserve(resourceProfile)) {
LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
Expand All @@ -294,10 +294,12 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R
}

taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, jobId, allocationId);
taskSlots.put(index, taskSlot);
if (index >= 0) {
taskSlots.put(index, taskSlot);
}

// update the allocation id to task slot map
allocationIDTaskSlotMap.put(allocationId, taskSlot);
allocatedSlots.put(allocationId, taskSlot);

// register a timeout for this slot since it's in state allocated
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
Expand Down Expand Up @@ -411,7 +413,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo

if (taskSlot.isEmpty()) {
// remove the allocation id to task slot mapping
allocationIDTaskSlotMap.remove(allocationId);
allocatedSlots.remove(allocationId);

// unregister a potential timeout
timerService.unregisterTimeout(allocationId);
Expand Down Expand Up @@ -475,8 +477,13 @@ public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
*/
public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
TaskSlot taskSlot = taskSlots.get(index);

return taskSlot != null && taskSlot.isAllocated(jobId, allocationId);
if (taskSlot != null) {
return taskSlot.isAllocated(jobId, allocationId);
} else if (index < 0) {
return allocatedSlots.containsKey(allocationId);
} else {
return false;
}
}

/**
Expand Down Expand Up @@ -694,7 +701,7 @@ public void notifyTimeout(AllocationID key, UUID ticket) {
private TaskSlot getTaskSlot(AllocationID allocationId) {
Preconditions.checkNotNull(allocationId);

return allocationIDTaskSlotMap.get(allocationId);
return allocatedSlots.get(allocationId);
}

private void checkInit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ public void testDynamicSlotAllocation() throws Exception {
assertThat(slotReport, containsInAnyOrder(
new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE),
new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE),
new SlotStatus(new SlotID(resourceId, 2), resourceProfile, jobId, allocationId)));
new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId)));
} finally {
RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void testFreeSlot() throws SlotNotFoundException {
assertThat(allocatedSlots.next().getIndex(), is(0));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false));
assertThat(taskSlotTable.isAllocated(1, jobId, allocationId2), is(false));
assertThat(taskSlotTable.isSlotFree(1), is(true));
} finally {
taskSlotTable.stop();
Expand All @@ -155,9 +156,9 @@ public void testSlotAllocationWithDynamicSlotId() {
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true));

Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
assertThat(allocatedSlots.next().getIndex(), is(2));
assertThat(allocatedSlots.next().getIndex(), is(-1));
assertThat(allocatedSlots.hasNext(), is(false));
assertThat(taskSlotTable.isAllocated(2, jobId, allocationId), is(true));
assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true));
} finally {
taskSlotTable.stop();
}
Expand All @@ -179,7 +180,7 @@ public void testSlotAllocationWithResourceProfile() {

Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
TaskSlot allocatedSlot = allocatedSlots.next();
assertThat(allocatedSlot.getIndex(), is(2));
assertThat(allocatedSlot.getIndex(), is(-1));
assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile));
assertThat(allocatedSlots.hasNext(), is(false));
} finally {
Expand Down Expand Up @@ -224,7 +225,7 @@ public void testGenerateSlotReport() throws SlotNotFoundException {
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId2, SLOT_TIMEOUT), is(true)); // index 3
assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId3, SLOT_TIMEOUT), is(true)); // index 4

assertThat(taskSlotTable.freeSlot(allocationId2), is(3));
assertThat(taskSlotTable.freeSlot(allocationId2), is(-1));

ResourceID resourceId = ResourceID.generate();
SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
Expand All @@ -236,7 +237,7 @@ public void testGenerateSlotReport() throws SlotNotFoundException {
is(new SlotStatus(new SlotID(resourceId, 0), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId1)),
is(new SlotStatus(new SlotID(resourceId, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
is(new SlotStatus(new SlotID(resourceId, 2), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)),
is(new SlotStatus(new SlotID(resourceId, 4), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3))));
is(new SlotStatus(SlotID.generateDynamicSlotID(resourceId), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3))));
} finally {
taskSlotTable.stop();
}
Expand Down

0 comments on commit db3dec5

Please sign in to comment.