From db3dec55d545989642510f3bf8f307605e732a6d Mon Sep 17 00:00:00 2001 From: Andrey Zagrebin Date: Fri, 6 Dec 2019 22:57:39 +0100 Subject: [PATCH] [FLINK-14189][runtime] Do not store dynamic slots by index in TaskSlotTable --- .../clusterframework/types/SlotID.java | 2 +- .../runtime/taskexecutor/slot/SlotOffer.java | 1 - .../runtime/taskexecutor/slot/TaskSlot.java | 2 +- .../taskexecutor/slot/TaskSlotTable.java | 55 +++++++++++-------- .../taskexecutor/TaskExecutorTest.java | 2 +- .../taskexecutor/slot/TaskSlotTableTest.java | 11 ++-- 6 files changed, 40 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index fb430405a8fe0..c579c0e0774ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -87,7 +87,7 @@ public int hashCode() { @Override public String toString() { - return resourceId + "_" + slotNumber; + return resourceId + "_" + (slotNumber >= 0 ? slotNumber : "dynamic"); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java index f8d7e6ca2eea5..1f62e2823c10b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotOffer.java @@ -41,7 +41,6 @@ public class SlotOffer implements Serializable { private final ResourceProfile resourceProfile; public SlotOffer(final AllocationID allocationID, final int index, final ResourceProfile resourceProfile) { - Preconditions.checkArgument(0 <= index, "The index must be greater than 0."); this.allocationId = Preconditions.checkNotNull(allocationID); this.slotIndex = index; this.resourceProfile = Preconditions.checkNotNull(resourceProfile); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java index 9f81f1c776357..9124957134998 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java @@ -83,7 +83,7 @@ public TaskSlot( final int memoryPageSize, final JobID jobId, final AllocationID allocationId) { - Preconditions.checkArgument(0 <= index, "The index must be greater than 0."); + this.index = index; this.resourceProfile = Preconditions.checkNotNull(resourceProfile); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 16ed4a59bcf4e..5053040497597 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -82,7 +82,7 @@ public class TaskSlotTable implements TimeoutListener { private final Map taskSlots; /** Mapping from allocation id to task slot. */ - private final Map allocationIDTaskSlotMap; + private final Map allocatedSlots; /** Mapping from execution attempt id to task and task slot. */ private final Map taskSlotMappings; @@ -96,9 +96,6 @@ public class TaskSlotTable implements TimeoutListener { /** Whether the table has been started. */ private volatile boolean started; - /** Index of next allocated slot, for dynamic slot allocation. */ - private int nextSlotIndex; - private final ResourceBudgetManager budgetManager; public TaskSlotTable( @@ -120,7 +117,7 @@ public TaskSlotTable( budgetManager = new ResourceBudgetManager(Preconditions.checkNotNull(totalAvailableResourceProfile)); - allocationIDTaskSlotMap = new HashMap<>(numberSlots); + allocatedSlots = new HashMap<>(numberSlots); taskSlotMappings = new HashMap<>(4 * numberSlots); @@ -128,7 +125,6 @@ public TaskSlotTable( slotActions = null; started = false; - nextSlotIndex = numberSlots; } /** @@ -150,6 +146,12 @@ public void start(SlotActions initialSlotActions) { public void stop() { started = false; timerService.stop(); + allocatedSlots + .values() + .stream() + .filter(slot -> !taskSlots.containsKey(slot.getIndex())) + .forEach(TaskSlot::close); + allocatedSlots.clear(); taskSlots.values().forEach(TaskSlot::close); taskSlots.clear(); slotActions = null; @@ -157,7 +159,9 @@ public void stop() { @VisibleForTesting public boolean isStopped() { - return !started && taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()); + return !started && + taskSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()) && + allocatedSlots.values().stream().allMatch(slot -> slot.getMemoryManager().isShutdown()); } /** @@ -205,9 +209,9 @@ public SlotReport createSlotReport(ResourceID resourceId) { slotStatuses.add(slotStatus); } - for (TaskSlot taskSlot : taskSlots.values()) { - if (taskSlot.getIndex() >= numberSlots) { - SlotID slotID = new SlotID(resourceId, taskSlot.getIndex()); + for (TaskSlot taskSlot : allocatedSlots.values()) { + if (taskSlot.getIndex() < 0) { + SlotID slotID = SlotID.generateDynamicSlotID(resourceId); SlotStatus slotStatus = new SlotStatus( slotID, taskSlot.getResourceProfile(), @@ -259,16 +263,12 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R Preconditions.checkArgument(index < numberSlots); - TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId); + TaskSlot taskSlot = allocatedSlots.get(allocationId); if (taskSlot != null) { LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot); return false; } - if (index >= 0) { - resourceProfile = defaultSlotResourceProfile; - } - if (taskSlots.containsKey(index)) { TaskSlot duplicatedTaskSlot = taskSlots.get(index); LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.", @@ -278,11 +278,11 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R duplicatedTaskSlot.getAllocationId()); return duplicatedTaskSlot.getJobId().equals(jobId) && duplicatedTaskSlot.getAllocationId().equals(allocationId); + } else if (allocatedSlots.containsKey(allocationId)) { + return true; } - if (index < 0) { - index = nextSlotIndex++; - } + resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile; if (!budgetManager.reserve(resourceProfile)) { LOG.info("Cannot allocate the requested resources. Trying to allocate {}, " @@ -294,10 +294,12 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, R } taskSlot = new TaskSlot(index, resourceProfile, memoryPageSize, jobId, allocationId); - taskSlots.put(index, taskSlot); + if (index >= 0) { + taskSlots.put(index, taskSlot); + } // update the allocation id to task slot map - allocationIDTaskSlotMap.put(allocationId, taskSlot); + allocatedSlots.put(allocationId, taskSlot); // register a timeout for this slot since it's in state allocated timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit()); @@ -411,7 +413,7 @@ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFo if (taskSlot.isEmpty()) { // remove the allocation id to task slot mapping - allocationIDTaskSlotMap.remove(allocationId); + allocatedSlots.remove(allocationId); // unregister a potential timeout timerService.unregisterTimeout(allocationId); @@ -475,8 +477,13 @@ public boolean isValidTimeout(AllocationID allocationId, UUID ticket) { */ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { TaskSlot taskSlot = taskSlots.get(index); - - return taskSlot != null && taskSlot.isAllocated(jobId, allocationId); + if (taskSlot != null) { + return taskSlot.isAllocated(jobId, allocationId); + } else if (index < 0) { + return allocatedSlots.containsKey(allocationId); + } else { + return false; + } } /** @@ -694,7 +701,7 @@ public void notifyTimeout(AllocationID key, UUID ticket) { private TaskSlot getTaskSlot(AllocationID allocationId) { Preconditions.checkNotNull(allocationId); - return allocationIDTaskSlotMap.get(allocationId); + return allocatedSlots.get(allocationId); } private void checkInit() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 6370f21c04790..3715612efa6e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -1976,7 +1976,7 @@ public void testDynamicSlotAllocation() throws Exception { assertThat(slotReport, containsInAnyOrder( new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE), new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE), - new SlotStatus(new SlotID(resourceId, 2), resourceProfile, jobId, allocationId))); + new SlotStatus(SlotID.generateDynamicSlotID(resourceId), resourceProfile, jobId, allocationId))); } finally { RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java index f3fd725b094b4..2655b1ee3eff4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java @@ -136,6 +136,7 @@ public void testFreeSlot() throws SlotNotFoundException { assertThat(allocatedSlots.next().getIndex(), is(0)); assertThat(allocatedSlots.hasNext(), is(false)); assertThat(taskSlotTable.isAllocated(1, jobId, allocationId1), is(false)); + assertThat(taskSlotTable.isAllocated(1, jobId, allocationId2), is(false)); assertThat(taskSlotTable.isSlotFree(1), is(true)); } finally { taskSlotTable.stop(); @@ -155,9 +156,9 @@ public void testSlotAllocationWithDynamicSlotId() { assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true)); Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); - assertThat(allocatedSlots.next().getIndex(), is(2)); + assertThat(allocatedSlots.next().getIndex(), is(-1)); assertThat(allocatedSlots.hasNext(), is(false)); - assertThat(taskSlotTable.isAllocated(2, jobId, allocationId), is(true)); + assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true)); } finally { taskSlotTable.stop(); } @@ -179,7 +180,7 @@ public void testSlotAllocationWithResourceProfile() { Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); TaskSlot allocatedSlot = allocatedSlots.next(); - assertThat(allocatedSlot.getIndex(), is(2)); + assertThat(allocatedSlot.getIndex(), is(-1)); assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile)); assertThat(allocatedSlots.hasNext(), is(false)); } finally { @@ -224,7 +225,7 @@ public void testGenerateSlotReport() throws SlotNotFoundException { assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId2, SLOT_TIMEOUT), is(true)); // index 3 assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId3, SLOT_TIMEOUT), is(true)); // index 4 - assertThat(taskSlotTable.freeSlot(allocationId2), is(3)); + assertThat(taskSlotTable.freeSlot(allocationId2), is(-1)); ResourceID resourceId = ResourceID.generate(); SlotReport slotReport = taskSlotTable.createSlotReport(resourceId); @@ -236,7 +237,7 @@ public void testGenerateSlotReport() throws SlotNotFoundException { is(new SlotStatus(new SlotID(resourceId, 0), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId1)), is(new SlotStatus(new SlotID(resourceId, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)), is(new SlotStatus(new SlotID(resourceId, 2), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null)), - is(new SlotStatus(new SlotID(resourceId, 4), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3)))); + is(new SlotStatus(SlotID.generateDynamicSlotID(resourceId), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3)))); } finally { taskSlotTable.stop(); }