Skip to content

Commit

Permalink
Merge db63340 into 3f695a9
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed May 7, 2023
2 parents 3f695a9 + db63340 commit 028546e
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package redis.clients.jedis;

public abstract class BinaryJedisShardedPubSub extends JedisShardedPubSubBase<byte[]> {

@Override
protected final byte[] encode(byte[] raw) {
return raw;
}
}
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -3106,6 +3106,14 @@ public final CommandObject<Long> publish(String channel, String message) {
public final CommandObject<Long> publish(byte[] channel, byte[] message) {
return new CommandObject<>(commandArguments(PUBLISH).add(channel).add(message), BuilderFactory.LONG);
}

public final CommandObject<Long> spublish(String channel, String message) {
return new CommandObject<>(commandArguments(SPUBLISH).key(channel).add(message), BuilderFactory.LONG);
}

public final CommandObject<Long> spublish(byte[] channel, byte[] message) {
return new CommandObject<>(commandArguments(SPUBLISH).key(channel).add(message), BuilderFactory.LONG);
}
// Miscellaneous commands

// RediSearch commands
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -7935,6 +7935,24 @@ public Map<String, Long> pubsubNumSub(String... channels) {
return BuilderFactory.PUBSUB_NUMSUB_MAP.build(connection.getOne());
}

public List<String> pubsubShardChannels() {
checkIsInMultiOrPipeline();
connection.sendCommand(PUBSUB, SHARDCHANNELS);
return connection.getMultiBulkReply();
}

public List<String> pubsubShardChannels(final String pattern) {
checkIsInMultiOrPipeline();
connection.sendCommand(PUBSUB, SHARDCHANNELS.name(), pattern);
return connection.getMultiBulkReply();
}

public Map<String, Long> pubsubShardNumSub(String... channels) {
checkIsInMultiOrPipeline();
connection.sendCommand(PUBSUB, joinParameters(SHARDNUMSUB.name(), channels));
return BuilderFactory.PUBSUB_NUMSUB_MAP.build(connection.getOne());
}

@Override
public Object eval(final String script, final int keyCount, final String... params) {
checkIsInMultiOrPipeline();
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisCluster extends UnifiedJedis {

Expand Down Expand Up @@ -205,6 +208,28 @@ public Connection getConnectionFromSlot(int slot) {
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
}

// commands
public long spublish(String channel, String message) {
return executeCommand(commandObjects.spublish(channel, message));
}

public long spublish(byte[] channel, byte[] message) {
return executeCommand(commandObjects.spublish(channel, message));
}

public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) {
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
jedisPubSub.proceed(connection, channels);
}
}

public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) {
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
jedisPubSub.proceed(connection, channels);
}
}
// commands

@Override
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/redis/clients/jedis/JedisShardedPubSub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package redis.clients.jedis;

import redis.clients.jedis.util.SafeEncoder;

public abstract class JedisShardedPubSub extends JedisShardedPubSubBase<String> {

@Override
protected final String encode(byte[] raw) {
return SafeEncoder.encode(raw);
}
}
108 changes: 108 additions & 0 deletions src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package redis.clients.jedis;

import static redis.clients.jedis.Protocol.ResponseKeyword.*;

import java.util.Arrays;
import java.util.List;

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.exceptions.JedisException;

public abstract class JedisShardedPubSubBase<T> {

private int subscribedChannels = 0;
private volatile Connection client;

public void onSMessage(T channel, T message) {
}

public void onSSubscribe(T channel, int subscribedChannels) {
}

public void onSUnsubscribe(T channel, int subscribedChannels) {
}

private void sendAndFlushCommand(Command command, T... args) {
if (client == null) {
throw new JedisException(getClass() + " is not connected to a Connection.");
}
CommandArguments cargs = new CommandArguments(command).addObjects(args);
client.sendCommand(cargs);
client.flush();
}

public final void sunsubscribe() {
sendAndFlushCommand(Command.SUNSUBSCRIBE);
}

public final void sunsubscribe(T... channels) {
sendAndFlushCommand(Command.SUNSUBSCRIBE, channels);
}

public final void ssubscribe(T... channels) {
sendAndFlushCommand(Command.SSUBSCRIBE, channels);
}

public final boolean isSubscribed() {
return subscribedChannels > 0;
}

public final int getSubscribedChannels() {
return subscribedChannels;
}

public final void proceed(Connection client, T... channels) {
this.client = client;
this.client.setTimeoutInfinite();
try {
ssubscribe(channels);
process();
} finally {
this.client.rollbackTimeout();
}
}

protected abstract T encode(byte[] raw);

// private void process(Client client) {
private void process() {

do {
Object reply = client.getUnflushedObject();

if (reply instanceof List) {
List<Object> listReply = (List<Object>) reply;
final Object firstObj = listReply.get(0);
if (!(firstObj instanceof byte[])) {
throw new JedisException("Unknown message type: " + firstObj);
}
final byte[] resp = (byte[]) firstObj;
if (Arrays.equals(SSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
final T enchannel = (bchannel == null) ? null : encode(bchannel);
onSSubscribe(enchannel, subscribedChannels);
} else if (Arrays.equals(SUNSUBSCRIBE.getRaw(), resp)) {
subscribedChannels = ((Long) listReply.get(2)).intValue();
final byte[] bchannel = (byte[]) listReply.get(1);
final T enchannel = (bchannel == null) ? null : encode(bchannel);
onSUnsubscribe(enchannel, subscribedChannels);
} else if (Arrays.equals(SMESSAGE.getRaw(), resp)) {
final byte[] bchannel = (byte[]) listReply.get(1);
final byte[] bmesg = (byte[]) listReply.get(2);
final T enchannel = (bchannel == null) ? null : encode(bchannel);
final T enmesg = (bmesg == null) ? null : encode(bmesg);
onSMessage(enchannel, enmesg);
} else {
System.out.println(redis.clients.jedis.util.SafeEncoder.encodeObject(resp));
throw new JedisException("Unknown message type: " + firstObj);
}
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
}
}
8 changes: 5 additions & 3 deletions src/main/java/redis/clients/jedis/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ public static enum Keyword implements Rawable {
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
NX, XX, EX, PX, EXAT, PXAT, CH, ABSTTL, KEEPTTL, INCR, INFO, CHANNELS, NUMPAT, NUMSUB, NOW, REV,
NX, XX, EX, PX, EXAT, PXAT, CH, ABSTTL, KEEPTTL, INCR, INFO, NOW, REV,
WITHCOORD, WITHDIST, WITHHASH, ANY, FROMMEMBER, FROMLONLAT, BYRADIUS, BYBOX, BYLEX, BYSCORE,
STOREDIST, TO, FORCE, TIMEOUT, DB, UNLOAD, ABORT, IDX, MINMATCHLEN, WITHMATCHLEN, FULL,
DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP,
MODULE, ACLCAT, PATTERN, DOCTOR, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS;
MODULE, ACLCAT, PATTERN, DOCTOR, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS,
CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB;

private final byte[] raw;

Expand Down Expand Up @@ -328,7 +329,8 @@ public byte[] getRaw() {

public static enum ResponseKeyword implements Rawable {

SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, MESSAGE, PMESSAGE, PONG;
SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, MESSAGE, PMESSAGE, PONG,
SSUBSCRIBE, SUNSUBSCRIBE, SMESSAGE;

private final byte[] raw;

Expand Down
15 changes: 7 additions & 8 deletions src/main/java/redis/clients/jedis/util/JedisClusterCRC16.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package redis.clients.jedis.util;

import redis.clients.jedis.exceptions.JedisClusterOperationException;

/**
* CRC16 Implementation according to CCITT standard Polynomial : 1021 (x^16 + x^12 + x^5 + 1) See <a
* href="http://redis.io/topics/cluster-spec">Appendix A. CRC16 reference implementation in ANSI
* C</a>
*/
public final class JedisClusterCRC16 {

private static final int[] LOOKUP_TABLE = { 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5,
0x60C6, 0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231,
0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A,
Expand All @@ -33,13 +32,9 @@ public final class JedisClusterCRC16 {
0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0, };

private JedisClusterCRC16() {
throw new InstantiationError("Must not instantiate this class");
}

public static int getSlot(String key) {
if (key == null) {
throw new JedisClusterOperationException("Slot calculation of null is impossible");
throw new NullPointerException("Slot calculation of null is impossible");
}

key = JedisClusterHashTag.getHashTag(key);
Expand All @@ -49,7 +44,7 @@ public static int getSlot(String key) {

public static int getSlot(byte[] key) {
if (key == null) {
throw new JedisClusterOperationException("Slot calculation of null is impossible");
throw new NullPointerException("Slot calculation of null is impossible");
}

int s = -1;
Expand Down Expand Up @@ -97,4 +92,8 @@ public static int getCRC16(String key) {
byte[] bytesKey = SafeEncoder.encode(key);
return getCRC16(bytesKey, 0, bytesKey.length);
}

private JedisClusterCRC16() {
throw new InstantiationError("Must not instantiate this class");
}
}
Loading

0 comments on commit 028546e

Please sign in to comment.