Skip to content

Commit

Permalink
GIRAPH-1132
Browse files Browse the repository at this point in the history
closes apache#21
  • Loading branch information
Sergey Edunov committed Mar 1, 2017
1 parent f37f373 commit 18c67ca
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ public BspService(
conf.getZookeeperOpsRetryWaitMsecs(),
this,
context);
connectedEvent.waitForever();
connectedEvent.waitForTimeoutOrFail(
GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(conf));
this.fs = FileSystem.get(getConfiguration());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.OutputFormat;

import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -1238,5 +1239,27 @@ public interface GiraphConstants {
BooleanConfOption PREFER_IP_ADDRESSES =
new BooleanConfOption("giraph.preferIP", false,
"Prefer IP addresses instead of host names");

/**
* Timeout for "waitForever", when we need to wait for zookeeper.
* Since we should never really have to wait forever.
* We should only wait some reasonable but large amount of time.
*/
LongConfOption WAIT_ZOOKEEPER_TIMEOUT_MSEC =
new LongConfOption("giraph.waitZookeeperTimeoutMsec",
MINUTES.toMillis(15),
"How long should we stay in waitForever loops in various " +
"places that require network connection");

/**
* Timeout for "waitForever", when we need to wait for other workers
* to complete their job.
* Since we should never really have to wait forever.
* We should only wait some reasonable but large amount of time.
*/
LongConfOption WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC =
new LongConfOption("giraph.waitForOtherWorkersMsec",
HOURS.toMillis(48),
"How long should workers wait to finish superstep");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,9 @@ public boolean becomeMaster() {
return isMaster;
}
LOG.info("becomeMaster: Waiting to become the master...");
getMasterElectionChildrenChangedEvent().waitForever();
getMasterElectionChildrenChangedEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
getMasterElectionChildrenChangedEvent().reset();
} catch (KeeperException e) {
throw new IllegalStateException(
Expand Down Expand Up @@ -1832,7 +1834,9 @@ private void cleanUpZooKeeper() {
return;
}

getCleanedUpChildrenChangedEvent().waitForever();
getCleanedUpChildrenChangedEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
getCleanedUpChildrenChangedEvent().reset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ private void markCurrentWorkerDoneReadingThenWaitForOthers() {
if (inputSplitsDoneStat != null) {
break;
}
getInputSplitsAllDoneEvent().waitForever();
getInputSplitsAllDoneEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
getConfiguration()));
getInputSplitsAllDoneEvent().reset();
}
}
Expand Down Expand Up @@ -647,7 +649,9 @@ private void registerHealth(long superstep) {
"from previous failure): " + myHealthPath +
". Waiting for change in attempts " +
"to re-join the application");
getApplicationAttemptChangedEvent().waitForever();
getApplicationAttemptChangedEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_ZOOKEEPER_TIMEOUT_MSEC.get(
getConfiguration()));
if (LOG.isInfoEnabled()) {
LOG.info("registerHealth: Got application " +
"attempt changed event, killing self");
Expand Down Expand Up @@ -868,7 +872,9 @@ private void waitForRequestsToFinish() {
private void waitForOtherWorkers(String superstepFinishedNode) {
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
getSuperstepFinishedEvent().waitForever();
getSuperstepFinishedEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
getConfiguration()));
getSuperstepFinishedEvent().reset();
}
} catch (KeeperException e) {
Expand Down Expand Up @@ -1683,7 +1689,9 @@ public final void exchangeVertexPartitions(
LOG.info("exchangeVertexPartitions: Waiting for workers " +
workerIdSet);
}
getPartitionExchangeChildrenChangedEvent().waitForever();
getPartitionExchangeChildrenChangedEvent().waitForTimeoutOrFail(
GiraphConstants.WAIT_FOR_OTHER_WORKERS_TIMEOUT_MSEC.get(
getConfiguration()));
getPartitionExchangeChildrenChangedEvent().reset();
}
} catch (KeeperException | InterruptedException e) {
Expand Down
5 changes: 3 additions & 2 deletions giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public interface BspEvent {
boolean waitMsecs(int msecs);

/**
* Wait indefinitely until the event occurs.
* Waits until timeout or fails with runtime exception.
* @param timeout Throws exception if waiting takes longer than timeout.
*/
void waitForever();
void waitForTimeoutOrFail(long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,12 @@ public boolean waitMsecs(int msecs) {
}

@Override
public void waitForever() {
public void waitForTimeoutOrFail(long timeout) {
long t0 = System.currentTimeMillis();
while (!waitMsecs(msecPeriod)) {
if (System.currentTimeMillis() > t0 + timeout) {
throw new RuntimeException("Timeout waiting");
}
progressable.progress();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.giraph.time.Time;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.util.Progressable;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -111,14 +109,14 @@ public void testEvent() {
}

/**
* Thread signaled test for {@link PredicateLock#waitForever()}
* Thread signaled test for {@link PredicateLock#waitForTimeoutOrFail(long)}
*/
@Test
public void testWaitForever() {
BspEvent event = new PredicateLock(getStubProgressable());
Thread signalThread = new SignalThread(event);
signalThread.start();
event.waitForever();
event.waitForTimeoutOrFail(5 * 60_000);
try {
signalThread.join();
} catch (InterruptedException e) {
Expand Down

0 comments on commit 18c67ca

Please sign in to comment.