Skip to content

Commit

Permalink
NetEndpoint支持序列化
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Sep 17, 2018
1 parent 3116bab commit f980460
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 93 deletions.
16 changes: 16 additions & 0 deletions lealone-common/src/main/java/org/lealone/net/NetEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.lealone.net;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -185,4 +188,17 @@ public int compareTo(NetEndpoint o) {
return v;
}

public void serialize(DataOutput out) throws IOException {
byte[] bytes = getAddress(); // Inet4Address是4个字节,Inet6Address是16个字节
out.writeByte(bytes.length);
out.write(bytes);
out.writeInt(getPort());
}

public static NetEndpoint deserialize(DataInput in) throws IOException {
byte[] bytes = new byte[in.readByte()];
in.readFully(bytes, 0, bytes.length);
int port = in.readInt();
return new NetEndpoint(InetAddress.getByAddress(bytes), port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
public class ConfigDescriptor {

private static Config config;
private static NetEndpoint localEndpoint;
private static NetEndpoint localP2pEndpoint;
private static IEndpointSnitch snitch;
private static String localDC;
private static Comparator<NetEndpoint> localComparator;
Expand All @@ -77,10 +77,10 @@ public static void applyConfig(Config config) throws ConfigException {
throw new ConfigException("phi_convict_threshold must be between 5 and 16");
}

localEndpoint = createLocalEndpoint(config);
localP2pEndpoint = createLocalP2pEndpoint(config);
snitch = createEndpointSnitch(config.cluster_config);

localDC = snitch.getDatacenter(localEndpoint);
localDC = snitch.getDatacenter(localP2pEndpoint);
localComparator = new Comparator<NetEndpoint>() {
@Override
public int compare(NetEndpoint endpoint1, NetEndpoint endpoint2) {
Expand Down Expand Up @@ -110,7 +110,7 @@ private static boolean isP2pServerEnabled() {
return p2pServerEnabled;
}

private static NetEndpoint createLocalEndpoint(Config config) throws ConfigException {
private static NetEndpoint createLocalP2pEndpoint(Config config) throws ConfigException {
InetAddress listenAddress = null;
// Local IP, hostname or interface to bind services to
if (config.listen_address != null && config.listen_interface != null) {
Expand Down Expand Up @@ -317,7 +317,7 @@ public static List<NetEndpoint> getSeedList() {
}

public static NetEndpoint getLocalEndpoint() {
return localEndpoint;
return localP2pEndpoint;
}

public static IInternodeAuthenticator getInternodeAuthenticator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;

import org.lealone.net.NetEndpoint;
import org.lealone.p2p.net.CompactEndpointSerializationHelper;
import org.lealone.p2p.net.IVersionedSerializer;

/**
Expand Down Expand Up @@ -75,14 +74,14 @@ public String toString() {
private static class GossipDigestSerializer implements IVersionedSerializer<GossipDigest> {
@Override
public void serialize(GossipDigest gDigest, DataOutput out, int version) throws IOException {
CompactEndpointSerializationHelper.serialize(gDigest.endpoint, out);
gDigest.endpoint.serialize(out);
out.writeInt(gDigest.generation);
out.writeInt(gDigest.maxVersion);
}

@Override
public GossipDigest deserialize(DataInput in, int version) throws IOException {
NetEndpoint endpoint = CompactEndpointSerializationHelper.deserialize(in);
NetEndpoint endpoint = NetEndpoint.deserialize(in);
int generation = in.readInt();
int maxVersion = in.readInt();
return new GossipDigest(endpoint, generation, maxVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;

import org.lealone.net.NetEndpoint;
import org.lealone.p2p.net.CompactEndpointSerializationHelper;
import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;
Expand Down Expand Up @@ -70,7 +69,7 @@ public void serialize(GossipDigestAck gDigestAckMessage, DataOutput out, int ver
out.writeInt(gDigestAckMessage.epStateMap.size());
for (Map.Entry<NetEndpoint, EndpointState> entry : gDigestAckMessage.epStateMap.entrySet()) {
NetEndpoint ep = entry.getKey();
CompactEndpointSerializationHelper.serialize(ep, out);
ep.serialize(out);
EndpointState.serializer.serialize(entry.getValue(), out, version);
}
}
Expand All @@ -82,7 +81,7 @@ public GossipDigestAck deserialize(DataInput in, int version) throws IOException
Map<NetEndpoint, EndpointState> epStateMap = new HashMap<>(size);

for (int i = 0; i < size; ++i) {
NetEndpoint ep = CompactEndpointSerializationHelper.deserialize(in);
NetEndpoint ep = NetEndpoint.deserialize(in);
EndpointState epState = EndpointState.serializer.deserialize(in, version);
epStateMap.put(ep, epState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;

import org.lealone.net.NetEndpoint;
import org.lealone.p2p.net.CompactEndpointSerializationHelper;
import org.lealone.p2p.net.IVersionedSerializer;
import org.lealone.p2p.net.Message;
import org.lealone.p2p.net.MessageType;
Expand Down Expand Up @@ -62,7 +61,7 @@ public void serialize(GossipDigestAck2 ack2, DataOutput out, int version) throws
out.writeInt(ack2.epStateMap.size());
for (Map.Entry<NetEndpoint, EndpointState> entry : ack2.epStateMap.entrySet()) {
NetEndpoint ep = entry.getKey();
CompactEndpointSerializationHelper.serialize(ep, out);
ep.serialize(out);
EndpointState.serializer.serialize(entry.getValue(), out, version);
}
}
Expand All @@ -73,7 +72,7 @@ public GossipDigestAck2 deserialize(DataInput in, int version) throws IOExceptio
Map<NetEndpoint, EndpointState> epStateMap = new HashMap<>(size);

for (int i = 0; i < size; ++i) {
NetEndpoint ep = CompactEndpointSerializationHelper.deserialize(in);
NetEndpoint ep = NetEndpoint.deserialize(in);
EndpointState epState = EndpointState.serializer.deserialize(in, version);
epStateMap.put(ep, epState);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
* The concrete implementation of this interface would provide the functionality
* for a given verb.
*/

public interface IVerbHandler<T> {
/**
* This method delivers a message to the implementing class (if the implementing
Expand Down
17 changes: 8 additions & 9 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/MessageIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@

public class MessageIn<T> {
public final NetEndpoint from;
public final Verb verb;
public final T payload;
public final Map<String, byte[]> parameters;
public final Verb verb;
public final int version;

private MessageIn(NetEndpoint from, T payload, Map<String, byte[]> parameters, Verb verb, int version) {
private MessageIn(NetEndpoint from, Verb verb, T payload, Map<String, byte[]> parameters, int version) {
this.from = from;
this.verb = verb;
this.payload = payload;
this.parameters = parameters;
this.verb = verb;
this.version = version;
}

Expand Down Expand Up @@ -67,22 +67,21 @@ public String toString() {
}

public static MessageIn<?> read(DataInput in, int version, int id) throws IOException {
NetEndpoint from = CompactEndpointSerializationHelper.deserialize(in);
NetEndpoint from = NetEndpoint.deserialize(in);

Verb verb = Verb.values()[in.readInt()];
int parameterCount = in.readInt();
Map<String, byte[]> parameters;
if (parameterCount == 0) {
parameters = Collections.emptyMap();
} else {
HashMap<String, byte[]> map = new HashMap<>(parameterCount);
parameters = new HashMap<>(parameterCount);
for (int i = 0; i < parameterCount; i++) {
String key = in.readUTF();
byte[] value = new byte[in.readInt()];
in.readFully(value);
map.put(key, value);
parameters.put(key, value);
}
parameters = map;
}

int payloadSize = in.readInt();
Expand All @@ -97,9 +96,9 @@ public static MessageIn<?> read(DataInput in, int version, int id) throws IOExce
serializer = callback.serializer;
}
if (payloadSize == 0 || serializer == null)
return new MessageIn<>(from, null, parameters, verb, version);
return new MessageIn<>(from, verb, null, parameters, version);

Object payload = serializer.deserialize(in, version);
return new MessageIn<>(from, payload, parameters, verb, version);
return new MessageIn<>(from, verb, payload, parameters, version);
}
}
29 changes: 13 additions & 16 deletions lealone-p2p/src/main/java/org/lealone/p2p/net/MessageOut.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,11 @@ public MessageOut(Verb verb) {
}

public MessageOut(Verb verb, T payload) {
this(verb, payload, Collections.<String, byte[]> emptyMap());
this(verb, payload, Collections.emptyMap());
}

private MessageOut(Verb verb, T payload, Map<String, byte[]> parameters) {
this(ConfigDescriptor.getLocalEndpoint(), verb, payload, parameters);
}

public MessageOut(NetEndpoint from, Verb verb, T payload, Map<String, byte[]> parameters) {
this.from = from;
this.from = ConfigDescriptor.getLocalEndpoint();
this.verb = verb;
this.payload = payload;
this.parameters = parameters;
Expand All @@ -57,7 +53,7 @@ public MessageOut(NetEndpoint from, Verb verb, T payload, Map<String, byte[]> pa
public MessageOut<T> withParameter(String key, byte[] value) {
HashMap<String, byte[]> map = new HashMap<>(parameters);
map.put(key, value);
return new MessageOut<T>(verb, payload, map);
return new MessageOut<>(verb, payload, map);
}

public Stage getStage() {
Expand All @@ -75,8 +71,8 @@ public String toString() {
return sbuf.toString();
}

public int serialize(Transfer transfer, DataOutput out, int version) throws IOException {
CompactEndpointSerializationHelper.serialize(from, out);
public void serialize(Transfer transfer, DataOutput out, int version) throws IOException {
from.serialize(out);

out.writeInt(verb.ordinal());
out.writeInt(parameters.size());
Expand All @@ -86,13 +82,14 @@ public int serialize(Transfer transfer, DataOutput out, int version) throws IOEx
out.write(entry.getValue());
}

// long longSize = payload == null ? 0 : serializer.serializedSize(payload, version);
// assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
int payloadStartPos = transfer.getDataOutputStreamSize();
// out.writeInt((int) longSize);
out.writeInt(0); // 写设为0,会回填
if (payload != null)
// 先设为0
out.writeInt(0);
if (payload != null) {
int payloadSizeStartPos = transfer.getDataOutputStreamSize();
payload.getSerializer().serialize(payload, out, version);
return payloadStartPos;
// 再回填
int payloadSize = transfer.getDataOutputStreamSize() - payloadSizeStartPos - 4; // 需要减掉4
transfer.setPayloadSize(payloadSizeStartPos, payloadSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,7 @@ private synchronized void sendMessage(MessageOut<?> message, int id, long timest
// int cast cuts off the high-order half of the timestamp, which we can assume remains
// the same between now and when the recipient reconstructs it.
out.writeInt((int) timestamp);
int payloadStartPos = message.serialize(transfer, out, targetVersion);
int size = transfer.getDataOutputStreamSize() - payloadStartPos - 4;
transfer.setPayloadSize(payloadStartPos, size);
message.serialize(transfer, out, targetVersion);
transfer.flush();
}

Expand Down

0 comments on commit f980460

Please sign in to comment.