Skip to content

Commit

Permalink
[FLINK-1478] [jobmanager] Scheduler support for external location con…
Browse files Browse the repository at this point in the history
…straints
  • Loading branch information
StephanEwen committed Feb 5, 2015
1 parent a9ac7aa commit 970b2b7
Show file tree
Hide file tree
Showing 7 changed files with 586 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
//---------------------------------------------------------------------------------------------

public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {

// ExecutionVertex[] vertices = this.taskVertices;
//
// for (int i = 0; i < vertices.length; i++) {
// ExecutionVertex v = vertices[i];
//
// if (v.get
// }

for (ExecutionVertex ev : getTaskVertices()) {
ev.scheduleForExecution(scheduler, queued);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -82,6 +83,11 @@ public class ExecutionVertex implements Serializable {

private volatile Execution currentExecution; // this field must never be null


private volatile List<Instance> locationConstraintInstances;

private volatile boolean scheduleLocalOnly;

// --------------------------------------------------------------------------------------------

public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
Expand Down Expand Up @@ -294,10 +300,22 @@ else if (numSources < parallelism) {
}
}

public void setTargetHostConstraint(String hostname) {
public void setLocationConstraintHosts(List<Instance> instances) {
this.locationConstraintInstances = instances;
}

public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
}

this.scheduleLocalOnly = scheduleLocalOnly;
}

public boolean isScheduleLocalOnly() {
return scheduleLocalOnly;
}

/**
* Gets the location preferences of this task, determined by the locations of the predecessors from which
* it receives input data.
Expand All @@ -307,23 +325,37 @@ public void setTargetHostConstraint(String hostname) {
* @return The preferred locations for this vertex execution, or null, if there is no preference.
*/
public Iterable<Instance> getPreferredLocations() {
HashSet<Instance> locations = new HashSet<Instance>();
// if we have hard location constraints, use those
{
List<Instance> constraintInstances = this.locationConstraintInstances;
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}
}

// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
HashSet<Instance> locations = new HashSet<Instance>();

for (int i = 0; i < inputEdges.length; i++) {
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
for (int k = 0; k < sources.length; k++) {
SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
locations.add(sourceSlot.getInstance());
if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
return null;
for (int i = 0; i < inputEdges.length; i++) {
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
for (int k = 0; k < sources.length; k++) {
SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
locations.add(sourceSlot.getInstance());
if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
return null;
}
}
}
}
}
return locations;
}
return locations;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -407,6 +439,7 @@ public void prepareForArchiving() {
// clear the unnecessary fields in this class
this.resultPartitions = null;
this.inputEdges = null;
this.locationConstraintInstances = null;
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.LinkedBlockingQueue;

import akka.dispatch.Futures;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {

private int nonLocalizedAssignments;


public Scheduler() {
this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
}
Expand Down Expand Up @@ -164,6 +166,10 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
}

final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

final Iterable<Instance> preferredLocations = vertex.getPreferredLocations();
final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
preferredLocations != null && preferredLocations.iterator().hasNext();

synchronized (globalLock) {

Expand All @@ -179,6 +185,12 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
final CoLocationConstraint constraint = task.getLocationConstraint();

// sanity check that we do not use an externally forced location and a co-location constraint together
if (constraint != null && forceExternalLocation) {
throw new IllegalArgumentException("The scheduling cannot be contrained simultaneously by a "
+ "co-location constriaint and an external location constraint.");
}

// get a slot from the group, if the group has one for us (and can fulfill the constraint)
SimpleSlot slotFromGroup;
if (constraint == null) {
Expand Down Expand Up @@ -206,16 +218,28 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());

// get a new slot, since we could not place it into the group, or we could not place it locally
newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint);
newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);

SimpleSlot toUse;

if (newSlot == null) {
if (slotFromGroup == null) {
// both null
if (constraint == null || constraint.isUnassigned()) {
throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
} else {
if (forceExternalLocation) {
// could not satisfy the external location constraint
String hosts = getHostnamesFromInstances(preferredLocations);
throw new NoResourceAvailableException("Could not schedule task " + vertex
+ " to any of the required hosts: " + hosts);
}
else {
// simply nothing is available
throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
}
else {
// nothing is available on the node where the co-location constraint pushes us
throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint.");
}
Expand Down Expand Up @@ -269,26 +293,49 @@ else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
}

// 2) === schedule without hints and sharing ===

SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations());
if (slot != null) {
updateLocalityCounters(slot.getLocality());
return slot;
}
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
SlotAllocationFuture future = new SlotAllocationFuture();
this.taskQueue.add(new QueuedTask(task, future));
return future;
{
SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
if (slot != null) {
updateLocalityCounters(slot.getLocality());
return slot;
}
else {
throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
// no resource available now, so queue the request
if (queueIfNoResource) {
SlotAllocationFuture future = new SlotAllocationFuture();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
else if (forceExternalLocation) {
String hosts = getHostnamesFromInstances(preferredLocations);
throw new NoResourceAvailableException("Could not schedule task " + vertex
+ " to any of the required hosts: " + hosts);
}
else {
throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
}
}
}
}

private String getHostnamesFromInstances(Iterable<Instance> instances) {
StringBuilder bld = new StringBuilder();

for (Instance i : instances) {
bld.append(i.getInstanceConnectionInfo().getHostname());
bld.append(", ");
}

if (bld.length() == 0) {
return "";
}
else {
bld.setLength(bld.length() - 2);
return bld.toString();
}
}

/**
* Gets a suitable instance to schedule the vertex execution to.
* <p>
Expand All @@ -297,21 +344,21 @@ else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
* @param vertex The task to run.
* @return The instance to run the vertex on, it {@code null}, if no instance is available.
*/
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) {
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) {

// we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances
while (true) {
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

if(instanceLocalityPair == null){
if (instanceLocalityPair == null){
return null;
}

Instance instanceToUse = instanceLocalityPair.getLeft();
Locality locality = instanceLocalityPair.getRight();

if(LOG.isDebugEnabled()){
if (LOG.isDebugEnabled()){
if(locality == Locality.LOCAL){
LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.NON_LOCAL){
Expand Down Expand Up @@ -348,25 +395,26 @@ protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instanc
protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
Iterable<Instance> requestedLocations,
SlotSharingGroupAssignment groupAssignment,
CoLocationConstraint constraint) {
CoLocationConstraint constraint,
boolean localOnly) {
// we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances
while (true) {
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations);
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);

if(instanceLocalityPair == null){
if (instanceLocalityPair == null) {
return null;
}

Instance instanceToUse = instanceLocalityPair.getLeft();
Locality locality = instanceLocalityPair.getRight();

if(LOG.isDebugEnabled()){
if(locality == Locality.LOCAL){
if (LOG.isDebugEnabled()) {
if (locality == Locality.LOCAL) {
LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.NON_LOCAL){
} else if(locality == Locality.NON_LOCAL) {
LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.UNCONSTRAINED) {
} else if(locality == Locality.UNCONSTRAINED) {
LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}
}
Expand Down Expand Up @@ -409,7 +457,8 @@ protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
*
* @param requestedLocations
*/
private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){
private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){

if (this.instancesWithAvailableResources.isEmpty()) {
// check if the asynchronous calls did not yet return the queues
Instance queuedInstance = this.newlyAvailableInstances.poll();
Expand All @@ -434,14 +483,18 @@ private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocati
if (location != null && this.instancesWithAvailableResources.remove(location)) {
instanceToUse = location;
locality = Locality.LOCAL;

break;
}
}

if (instanceToUse == null) {
instanceToUse = this.instancesWithAvailableResources.poll();
locality = Locality.NON_LOCAL;
if (localOnly) {
return null;
}
else {
instanceToUse = this.instancesWithAvailableResources.poll();
locality = Locality.NON_LOCAL;
}
}
}
else {
Expand Down Expand Up @@ -603,8 +656,8 @@ public void instanceDied(Instance instance) {
public int getNumberOfAvailableInstances() {
int numberAvailableInstances = 0;
synchronized (this.globalLock) {
for(Instance instance: allInstances){
if(instance.isAlive()){
for (Instance instance: allInstances ){
if (instance.isAlive()){
numberAvailableInstances++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -41,13 +40,11 @@
import org.slf4j.Logger;


public class SlotSharingGroupAssignment implements Serializable {

static final long serialVersionUID = 42L;
public class SlotSharingGroupAssignment {

private static final Logger LOG = Scheduler.LOG;

private transient final Object lock = new Object();
private final Object lock = new Object();

/** All slots currently allocated to this sharing group */
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ public static Instance getInstance(final ActorRef taskManager) throws
return getInstance(taskManager, 1);
}

public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws
Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);

Expand Down
Loading

0 comments on commit 970b2b7

Please sign in to comment.