Skip to content

Commit

Permalink
Storing partitioning info
Browse files Browse the repository at this point in the history
  • Loading branch information
KidEinstein committed Apr 10, 2017
1 parent 07c746a commit 6414512
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void sendToNeighbors(Iterable<M> message) {
throw new UnsupportedOperationException();
}

DefaultSubgraph<S, I, V, E, SV, EI> subgraph;
private DefaultSubgraph<S, I, V, E, SV, EI> subgraph;

public Subgraph<S, I, V, E, SV, EI> getSubgraph() {
return subgraph;
Expand Down Expand Up @@ -104,6 +104,13 @@ public void sendToNeighbors(M message) {
super.sendMessageToAllEdges(subgraph, sm);
}

public void sendToNeighbors(DefaultSubgraph<S, I, V, E, SV, EI> subgraph, M message) {
WritableComparable subgraphId = subgraph.getSubgraphId();
SubgraphMessage sm = new SubgraphMessage(subgraphId, message);
super.sendMessageToAllEdges(subgraph, sm);
}




public void sendMessage(SubgraphId<S> subgraphId, M message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package in.dream_lab.goffish.giraph;

import in.dream_lab.goffish.AbstractSubgraphComputation;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.hadoop.io.*;
import org.apache.log4j.Logger;
Expand All @@ -12,11 +14,15 @@
/**
* Created by anirudh on 02/11/16.
*/
public class RemoteVerticesFinder extends AbstractSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
public class RemoteVerticesFinder extends GiraphSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
public static final Logger LOG = Logger.getLogger(RemoteVerticesFinder.class);

@Override
public void compute(Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
DefaultSubgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) getSubgraph();
public void compute(Vertex<SubgraphId<LongWritable>, SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable>, DoubleWritable> vertex, Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
DefaultSubgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) vertex;
MapWritable subgraphPartitionMap = new MapWritable();
subgraphPartitionMap.put(subgraph.getSubgraphId(), new IntWritable(subgraph.getPartitionId()));
aggregate(SubgraphMasterCompute.ID, subgraphPartitionMap);
SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraphVertices = subgraph.getSubgraphVertices();
//System.out.println("SV in RVF 1 : " + subgraphVertices);
HashMap<LongWritable, SubgraphVertex<LongWritable, LongWritable, DoubleWritable, DoubleWritable, LongWritable>> vertices = subgraphVertices.getLocalVertices();
Expand Down Expand Up @@ -78,6 +84,6 @@ public void compute(Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgr
//LOG.info("Test 2, Free memory: " + freeMemoryMB());

// LOG.info("Test, All messages sent");
sendToNeighbors(bw);
sendToNeighbors(subgraph, bw);
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
package in.dream_lab.goffish.giraph;

import in.dream_lab.goffish.AbstractSubgraphComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.*;
import org.apache.hadoop.io.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

/**
* Created by anirudh on 06/11/16.
*/
public class RemoteVerticesFinder2 extends AbstractSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
public class RemoteVerticesFinder2 extends GiraphSubgraphComputation<LongWritable, LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {
@Override
public void compute(Iterable<SubgraphMessage<LongWritable, BytesWritable>> messages) throws IOException {
DefaultSubgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) getSubgraph();
public void compute(Vertex<SubgraphId<LongWritable>, SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable>, DoubleWritable> vertex, Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
DefaultSubgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) vertex;
MapWritable subgraphPartitionMapping = getAggregatedValue(SubgraphMasterCompute.ID);
subgraph.getSubgraphVertices().setSubgraphParitionMapping(subgraphPartitionMapping);
SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraphVertices = subgraph.getSubgraphVertices();
//System.out.println("RVF2 Subgraph ID: " + subgraph.getId().getSubgraphId());
HashMap<LongWritable, SubgraphVertex<LongWritable, LongWritable, DoubleWritable, DoubleWritable, LongWritable>> vertices = subgraphVertices.getLocalVertices();
for (SubgraphMessage<LongWritable, BytesWritable> message : messages) {
for (SubgraphMessage<LongWritable, BytesWritable> message : subgraphMessages) {
LinkedList<LongWritable> vertexIdsFound = new LinkedList();
ExtendedByteArrayDataOutput dataOutput = new ExtendedByteArrayDataOutput();
SubgraphId<LongWritable> senderSubgraphId = org.apache.giraph.utils.WritableUtils.createWritable(SubgraphId.class, getConf());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package in.dream_lab.goffish.giraph;

import in.dream_lab.goffish.AbstractSubgraphComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.hadoop.io.*;

Expand All @@ -10,15 +11,14 @@
/**
* Created by anirudh on 06/11/16.
*/
public class RemoteVerticesFinder3 extends AbstractSubgraphComputation<LongWritable,
public class RemoteVerticesFinder3 extends GiraphSubgraphComputation<LongWritable,
LongWritable, DoubleWritable, DoubleWritable, BytesWritable, NullWritable, LongWritable> {

@Override
public void compute(Iterable<SubgraphMessage<LongWritable, BytesWritable>> messages) throws IOException {
Subgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = getSubgraph();
public void compute(Vertex<SubgraphId<LongWritable>, SubgraphVertices<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable>, DoubleWritable> vertex, Iterable<SubgraphMessage<LongWritable, BytesWritable>> subgraphMessages) throws IOException {
Subgraph<LongWritable, LongWritable, DoubleWritable, DoubleWritable, NullWritable, LongWritable> subgraph = (DefaultSubgraph) vertex;
HashMap<LongWritable, RemoteSubgraphVertex<LongWritable, LongWritable, DoubleWritable, DoubleWritable, LongWritable>> remoteVertices = subgraph.getSubgraphVertices().getRemoteVertices();
//System.out.println("IN RVF 3\n");
for (SubgraphMessage<LongWritable, BytesWritable> message : messages) {
for (SubgraphMessage<LongWritable, BytesWritable> message : subgraphMessages) {
ExtendedByteArrayDataInput dataInput = new ExtendedByteArrayDataInput(message.getMessage().getBytes());
SubgraphId<LongWritable> senderSubgraphId = org.apache.giraph.utils.WritableUtils.createWritable(SubgraphId.class, getConf());
senderSubgraphId.readFields(dataInput);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package in.dream_lab.goffish.giraph;

import org.apache.giraph.master.DefaultMasterCompute;

/**
* Created by anirudh on 17/03/17.
*/
public class SubgraphMasterCompute extends DefaultMasterCompute {
public static final String ID = "SubgraphPartitionMappingAggregator";

@Override
public void compute() {
long superstep = getSuperstep();
if (superstep == 0) {
setComputation(RemoteVerticesFinder.class);
} else if(superstep == 1) {
setComputation(RemoteVerticesFinder2.class);
} else if(superstep == 2) {
setComputation(RemoteVerticesFinder3.class);
} else {
setComputation(GiraphSubgraphComputation.class);
}
}

@Override
public void initialize() throws InstantiationException, IllegalAccessException {
registerAggregator(ID, SubgraphPartitionMappingAggregator.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package in.dream_lab.goffish.giraph;

import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.aggregators.BasicAggregator;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.hadoop.io.*;

import java.io.IOException;

/**
* Created by anirudh on 17/03/17.
*/
public class SubgraphPartitionMappingAggregator implements Aggregator<MapWritable> {
MapWritable subgraphPartitionMap;

public SubgraphPartitionMappingAggregator() {
subgraphPartitionMap = (MapWritable) createInitialValue();
}

@Override
public void aggregate(MapWritable value) {
subgraphPartitionMap.putAll(value);
}

@Override
public MapWritable createInitialValue() {
return new MapWritable();
}

@Override
public MapWritable getAggregatedValue() {
return subgraphPartitionMap;
}

@Override
public void setAggregatedValue(MapWritable value) {
subgraphPartitionMap = value;
}

@Override
public void reset() {
subgraphPartitionMap = createInitialValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package in.dream_lab.goffish.giraph;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;

/**
* Created by anirudh on 17/03/17.
*/
public class SubgraphPartitionMappingWritable implements Writable {
public HashMap<LongWritable, IntWritable> subgraphPartitionMap = new HashMap<>();
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void readFields(DataInput dataInput) throws IOException {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.giraph.conf.GiraphConfigurationSettable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

Expand All @@ -33,6 +34,8 @@ public class SubgraphVertices<S extends WritableComparable,

private ImmutableClassesGiraphConfiguration conf;

MapWritable subgraphParitionMapping;

public SubgraphVertices() {
//// System.out.println("Calling subgraph vertices constructor");
// try {
Expand Down Expand Up @@ -160,4 +163,8 @@ public void readFields(DataInput dataInput) throws IOException {
public void setConf(ImmutableClassesGiraphConfiguration configuration) {
conf = configuration;
}

public void setSubgraphParitionMapping(MapWritable subgraphParitionMapping) {
this.subgraphParitionMapping = subgraphParitionMapping;
}
}

0 comments on commit 6414512

Please sign in to comment.