Skip to content

Commit

Permalink
API Fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
KidEinstein committed Apr 10, 2017
1 parent 605c308 commit 7e39f91
Show file tree
Hide file tree
Showing 37 changed files with 1,280 additions and 499 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public interface GiraphConstants {
DefaultSubgraphVertexValueFactory.class, SubgraphVertexValueFactory.class,
"Subgraph Value Factory class- optional");


ClassConfOption<EdgeIdFactory> EDGE_ID_FACTORY_CLASS =
ClassConfOption.create("giraph.edgeIdFactoryClass",
DefaultEdgeIdFactory.class, EdgeIdFactory.class,
Expand Down Expand Up @@ -350,6 +351,11 @@ public interface GiraphConstants {
DoubleWritable.class, Writable.class,
"Subgraph vertex value class");

ClassConfOption<Writable> SUBGRAPH_EDGE_VALUE_CLASS =
ClassConfOption.create("giraph.subgraphEdgeValueClass",
NullWritable.class, Writable.class,
"Subgraph vertex value class");

LongConfOption SUBGRAPH_SOURCE_VERTEX =
new LongConfOption("giraph.subgraphSourceVertex", 0, "Source vertex for algorithms like SSP");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,14 +785,21 @@ public Writable createSubgraphVertexValue() {
return getSubgraphVertexValueFactory().newInstance();
}

public Writable createSubgraphEdgeValue() {
return WritableUtils.createWritable(SUBGRAPH_EDGE_VALUE_CLASS.get(this), this);
}

public SubgraphVertexIdFactory<? extends WritableComparable> getSubgraphVertexIdFactory() {
return valueFactories.getSubgraphVertexIdFactory();
}



public WritableComparable createSubgraphVertexId() {
return getSubgraphVertexIdFactory().newInstance();
}


public Writable getSubgraphVertexValue() {
return getSubgraphVertexValueFactory().newInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private PartitionStats computePartition(
context.progress();
long tempTime = System.currentTimeMillis();
computation.compute(vertex, messages);
// LOG.info("Superstep,PartitionID,subgraphID,Time:" + serviceWorker.getSuperstep() + "," + partition.getId() + "," + ((Subgraph<SubgraphId<LongWritable>,?,?,?,?,?>)vertex).getId().getSubgraphId() + "," + (System.currentTimeMillis() - tempTime));
LOG.info("Superstep,PartitionID,subgraphID,Time:" + serviceWorker.getSuperstep() + "," + partition.getId() + "," + ((Subgraph<SubgraphId<LongWritable>,?,?,?,?,?>)vertex).getSubgraphId() + "," + (System.currentTimeMillis() - tempTime));
// Need to unwrap the mutated edges (possibly)
vertex.unwrapMutableEdges();
//Compact edges representation if possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public void setSubgraphId(SubgraphId<S> subgraphId) {
this.subgraphId = subgraphId;
}

@Override
public V getValue() {
throw new UnsupportedOperationException("getValue() not supported for remote vertices");
}
// @Override
// public V getValue() {
// throw new UnsupportedOperationException("getValue() not supported for remote vertices");
// }
}
120 changes: 116 additions & 4 deletions giraph-core/src/main/java/org/apache/giraph/graph/DefaultSubgraph.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.apache.giraph.graph;

import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Iterator;

/**
* Created by anirudh on 27/09/16.
Expand All @@ -21,14 +19,128 @@ public class DefaultSubgraph<S extends WritableComparable,
I extends WritableComparable, V extends Writable, E extends Writable, SV extends Writable, EI extends WritableComparable>
extends DefaultVertex<SubgraphId<S>, SubgraphVertices<S, I, V, E, SV, EI>, E> implements Subgraph<S, I, V, E, SV, EI> {

public void setRemoteVertices(HashMap<S, RemoteSubgraphVertex<S, I, V, E, EI>> remoteVertices) {
public void setRemoteVertices(HashMap<I, RemoteSubgraphVertex<S, I, V, E, EI>> remoteVertices) {
SubgraphVertices<S, I, V, E, SV, EI> subgraphVertices = getValue();
subgraphVertices.setRemoteVertices(remoteVertices);
}

public Iterable<RemoteSubgraphVertex<S, I, V, E, EI>> getRemoteVertices() {
SubgraphVertices<S, I, V, E, SV, EI> subgraphVertices = getValue();
return subgraphVertices.getRemoteVertices().values();
}

@Override
public SubgraphEdge<I, E, EI> getEdgeById(EI edgeId) {
return null;
}

@Override
public void setSubgraphValue(SV value) {
getSubgraphVertices().setSubgraphValue(value);
}

@Override
public SV getSubgraphValue() {
return getSubgraphVertices().getSubgraphValue();
}

@Override
public SubgraphVertices<S, I, V, E, SV, EI> getSubgraphVertices() {
return getValue();
}

@Override
public SubgraphVertex<S, I, V, E, EI> getVertexById(I vertexId) {
return null;
}

@Override
public S getSubgraphId() {
return getId().getSubgraphId();
}

@Override
public long getVertexCount() {
return getSubgraphVertices().getNumVertices() + getSubgraphVertices().getNumRemoteVertices();
}

@Override
public long getLocalVertexCount() {
return getSubgraphVertices().getNumVertices();
}

@Override
public Iterable<SubgraphVertex<S, I, V, E, EI>> getVertices() {
return getSubgraphVertices().getVertices();
}

@Override
public Iterable<SubgraphVertex<S, I, V, E, EI>> getLocalVertices() {
return getSubgraphVertices().getLocalVertices().values();
}

public int getPartitionId() {
return getId().getPartitionId();
}

public Iterable<SubgraphEdge<I, E, EI>> getVertexEdges() {
return new Iterable<SubgraphEdge<I, E, EI>>() {
@Override
public Iterator<SubgraphEdge<I, E, EI>> iterator() {
return new EdgeIterator();
}
};
}

private final class EdgeIterator implements Iterator<SubgraphEdge<I, E, EI>> {
Iterator<SubgraphVertex<S, I, V, E, EI>> vertexMapIterator;
Iterator<SubgraphEdge<I, E, EI>> edgeIterator;

public EdgeIterator() {
vertexMapIterator = getVertices().iterator();
SubgraphVertex<S, I, V, E, EI> nextVertex = vertexMapIterator.next();
edgeIterator = nextVertex.getOutEdges().iterator();
}

@Override
public boolean hasNext() {
if (edgeIterator.hasNext()) {
return true;
} else {
while (vertexMapIterator.hasNext()) {
SubgraphVertex<S, I, V, E, EI> nextVertex = vertexMapIterator.next();
edgeIterator = nextVertex.getOutEdges().iterator();
if (edgeIterator.hasNext()) {
return true;
}
}
}
return false;
}

public SubgraphEdge<I, E, EI> next() {
if (edgeIterator.hasNext()) {
return edgeIterator.next();
} else {
while (vertexMapIterator.hasNext()) {
SubgraphVertex<S, I, V, E, EI> nextVertex = vertexMapIterator.next();
edgeIterator = nextVertex.getOutEdges().iterator();
if (edgeIterator.hasNext()) {
return edgeIterator.next();
}
}
}
return null;
}

// TODO: Raise exception on call to remove
@Override
public void remove() {
throw new UnsupportedOperationException();
}

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

Expand Down Expand Up @@ -76,6 +77,7 @@ public void write(DataOutput dataOutput) throws IOException {
for (SubgraphEdge<I, E, EI> edge : outEdges) {
// System.out.println("Write: " + "Edge:" + edge.getSinkVertexId() + " Class: " + edge.getSinkVertexId().getClass().getSimpleName());
edge.getSinkVertexId().write(dataOutput);
edge.getValue().write(dataOutput);
}
}

Expand Down Expand Up @@ -104,8 +106,10 @@ public void readFields(ImmutableClassesGiraphConfiguration conf, DataInput dataI
//System.out.println("\n THIS IS I : "+ i);
DefaultSubgraphEdge<I, E, EI> se = new DefaultSubgraphEdge<>();
I targetId = (I) conf.createSubgraphVertexId();
E edgeValue = (E) conf.createSubgraphEdgeValue();
targetId.readFields(dataInput);
se.initialize(null, null, targetId);
edgeValue.readFields(dataInput);
se.initialize(null, edgeValue, targetId);
//System.out.println("Read: " + "Edge:" + se.getSinkVertexId() + " Class: " + se.getSinkVertexId().getClass().getSimpleName());
outEdges.add(se);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.apache.giraph.graph;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.giraph.comm.messages.SubgraphMessage;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
* Created by anirudh on 27/09/16.
*
* @param <S> Subgraph id
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge data
* @param <M> Message type
* @param <SV> Subgraph Value type
*/


// S subgraph value type ----- SV now
// V vertex object type -- V is the vertex value
// E edge value type -- E is the edge value
// M msg object type -- M is the message value type
// I vertex id --- I is the vertex id here
// J edge id -- EI
// K subgraph id ---- S

public class GiraphSubgraphComputation<S extends WritableComparable,
I extends WritableComparable, V extends WritableComparable, E extends Writable, M extends Writable, SV extends Writable, EI extends WritableComparable> extends BasicComputation<SubgraphId<S>, SubgraphVertices<S, I, V, E, SV, EI>, E, SubgraphMessage<S, M>> {

private static final Logger LOG = Logger.getLogger(GiraphSubgraphComputation.class);

private static final ClassConfOption<UserSubgraphComputation> SUBGRAPH_COMPUTATION_CLASS = ClassConfOption.create("subgraphComputationClass",
null, UserSubgraphComputation.class, "Subgraph Computation Class");

private UserSubgraphComputation<S, I, V, E, M, SV, EI> userSubgraphComputation;

// TODO: Have class be specified in conf

DefaultSubgraph<S, I, V, E, SV, EI> subgraph;

public Subgraph<S, I, V, E, SV, EI> getSubgraph() {
return subgraph;
}
// TODO: Take care of state changes for the subgraph passed

public void compute(Vertex<SubgraphId<S>, SubgraphVertices<S, I, V, E, SV, EI>, E> vertex, Iterable<SubgraphMessage<S, M>> messages) throws IOException {
Class userSubgraphComputationClass;
long superstep = getSuperstep();
if (superstep == 0) {
userSubgraphComputationClass = RemoteVerticesFinder.class;
} else if (superstep == 1) {
userSubgraphComputationClass = RemoteVerticesFinder2.class;
} else if (superstep == 2) {
userSubgraphComputationClass = RemoteVerticesFinder3.class;
} else {
userSubgraphComputationClass = SUBGRAPH_COMPUTATION_CLASS.get(getConf());
LOG.info("User Class: " + userSubgraphComputationClass);
}
userSubgraphComputation = (UserSubgraphComputation<S, I, V, E, M, SV, EI>) ReflectionUtils.newInstance(userSubgraphComputationClass, getConf());
LOG.info("User Object: " + userSubgraphComputation);
userSubgraphComputation.setGiraphSubgraphComputation(this);
subgraph = (DefaultSubgraph) vertex;
userSubgraphComputation.compute(messages);
}

public void sendMessageToAllNeighboringSubgraphs(M message) {
WritableComparable subgraphId = subgraph.getSubgraphId();
SubgraphMessage sm = new SubgraphMessage(subgraphId, message);
super.sendMessageToAllEdges(subgraph, sm);
}

public void sendMessage(SubgraphId<S> subgraphId, M message) {
SubgraphMessage sm = new SubgraphMessage(subgraphId.getSubgraphId(), message);
sendMessage(subgraphId, sm);
}

public void voteToHalt() {
subgraph.voteToHalt();
}

SubgraphEdge<I, E, EI> getEdgeById(EI id) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.giraph.graph;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

/**
* Created by anirudh on 02/03/17.
*/
public class GiraphSubgraphComputationImpl extends GiraphSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable>{
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
package org.apache.giraph.examples;
package org.apache.giraph.graph;

import org.apache.giraph.comm.messages.SubgraphMessage;
import org.apache.giraph.graph.*;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.hadoop.io.*;
import org.apache.log4j.Logger;

import java.io.DataOutput;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;

import static org.apache.giraph.utils.MemoryUtils.freeMemoryMB;

/**
* Created by anirudh on 02/11/16.
*/
public class RemoteVerticesFinder extends SubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
public class RemoteVerticesFinder extends UserSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
public static final Logger LOG = Logger.getLogger(RemoteVerticesFinder.class);
@Override
public void compute(Subgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph, Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
public void compute(Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
DefaultSubgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) getSubgraph();
SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraphVertices = subgraph.getSubgraphVertices();
//System.out.println("SV in RVF 1 : " + subgraphVertices);
HashMap<LongWritable, SubgraphVertex<LongWritable, LongWritable, DoubleWritable, DoubleWritable, LongWritable>> vertices = subgraphVertices.getVertices();
HashMap<LongWritable, SubgraphVertex<LongWritable, LongWritable, DoubleWritable, DoubleWritable, LongWritable>> vertices = subgraphVertices.getLocalVertices();
//System.out.println("SV Linked List in 1 : " + vertices);

// for (MemoryPoolMXBean mpBean: ManagementFactory.getMemoryPoolMXBeans()) {
Expand Down Expand Up @@ -58,7 +51,7 @@ public void compute(Subgraph<LongWritable, LongWritable, DoubleWritable, DoubleW
}
}

LOG.info("Partition,Subgraph,Vertices,RemoteVertices,Edges:" + subgraph.getId().getPartitionId() + "," + subgraph.getId().getSubgraphId() + "," + subgraph.getSubgraphVertices().getVertices().size() + "," + remoteVertexIds.size() + "," + edgeCount);
LOG.info("Partition,Subgraph,Vertices,RemoteVertices,Edges:" + subgraph.getPartitionId() + "," + subgraph.getSubgraphId() + "," + subgraph.getSubgraphVertices().getLocalVertices().size() + "," + remoteVertexIds.size() + "," + edgeCount);

subgraph.getId().write(dataOutput);
// LOG.info("Test, Sender subgraphID is : " + subgraph.getId());
Expand All @@ -85,6 +78,6 @@ public void compute(Subgraph<LongWritable, LongWritable, DoubleWritable, DoubleW
//LOG.info("Test 2, Free memory: " + freeMemoryMB());

// LOG.info("Test, All messages sent");
sendMessageToAllNeighboringSubgraphs(subgraph, bw);
sendToNeighbors(bw);
}
}
Loading

0 comments on commit 7e39f91

Please sign in to comment.