Skip to content

Commit

Permalink
Revisit metadata log schema to remove period field
Browse files Browse the repository at this point in the history
Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Alex Petrov for CASSANDRA-19482

Co-authored-by: Marcus Eriksson <marcuse@apache.org>
Co-authored-by: Sam Tunnicliffe <samt@apache.org>
  • Loading branch information
beobal and krummas committed Apr 11, 2024
1 parent c156258 commit 728b9ec
Show file tree
Hide file tree
Showing 96 changed files with 1,264 additions and 991 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Remove period field from ClusterMetadata and metadata log tables (CASSANDRA-19482)
* Enrich system_views.pending_hints vtable with hints sizes (CASSANDRA-19486)
* Expose all dropwizard metrics in virtual tables (CASSANDRA-14572)
* Ensured that PropertyFileSnitchTest do not overwrite cassandra-toploogy.properties (CASSANDRA-19502)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/PartitionPosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/
package org.apache.cassandra.db;

import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
Expand Down Expand Up @@ -100,7 +100,7 @@ public void serialize(PartitionPosition pos, DataOutputPlus out, int version) th
Token.serializer.serialize(pos.getToken(), out, version);
}

public PartitionPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
public PartitionPosition deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException
{
Kind kind = Kind.fromOrdinal(in.readByte());
if (kind == Kind.ROW_KEY)
Expand Down
43 changes: 29 additions & 14 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -72,7 +71,6 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.ReversedLongLocalPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.EndpointState;
Expand All @@ -86,6 +84,7 @@
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.metrics.TopPartitionTracker;
import org.apache.cassandra.net.MessagingService;
Expand Down Expand Up @@ -487,16 +486,14 @@ private SystemKeyspace()
parse(METADATA_LOG,
"Local Metadata Log",
"CREATE TABLE %s ("
+ "period bigint,"
+ "current_epoch bigint static,"
+ "epoch bigint,"
+ "entry_id bigint,"
+ "transformation blob,"
+ "kind text,"
+ "PRIMARY KEY (period, epoch))")
+ "PRIMARY KEY (epoch))")
.partitioner(MetaStrategy.partitioner)
.compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
"compaction_window_size","1")))
.partitioner(new LocalPartitioner(LongType.instance))
.build();

public static final TableMetadata Snapshots = parse(SNAPSHOT_TABLE_NAME,
Expand All @@ -505,7 +502,7 @@ private SystemKeyspace()
"epoch bigint PRIMARY KEY," +
"period bigint," +
"snapshot blob)")
.partitioner(ReversedLongLocalPartitioner.instance)
.partitioner(MetaStrategy.partitioner)
.build();

@Deprecated(since = "4.0")
Expand Down Expand Up @@ -1878,7 +1875,7 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
try
{
// See rangeToBytes above for why version is 0.
return (Range<Token>) Range.tokenSerializer.deserialize(ByteStreams.newDataInput(ByteBufferUtil.getArray(rawRange)),
return (Range<Token>) Range.tokenSerializer.deserialize(new DataInputBuffer(ByteBufferUtil.getArray(rawRange)),
partitioner,
0);
}
Expand Down Expand Up @@ -1985,22 +1982,27 @@ public static TopPartitionTracker.StoredTopPartitions getTopPartitions(TableMeta
}
}

public static void storeSnapshot(Epoch epoch, long period, ByteBuffer snapshot)
public static void storeSnapshot(Epoch epoch, ByteBuffer snapshot)
{
logger.info("Storing snapshot of cluster metadata at epoch {} (period {})", epoch, period);
String query = String.format("INSERT INTO %s.%s (epoch, period, snapshot) VALUES (?, ?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
executeInternal(query, epoch.getEpoch(), period, snapshot);
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot store a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Storing snapshot of cluster metadata at epoch {}", epoch);
String query = String.format("INSERT INTO %s.%s (epoch, snapshot) VALUES (?, ?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
executeInternal(query, epoch.getEpoch(), snapshot);
forceBlockingFlush(SNAPSHOT_TABLE_NAME);
}

public static ByteBuffer getSnapshot(Epoch epoch)
{
Preconditions.checkArgument(epoch.isAfter(Epoch.FIRST), "Cannot retrieve a snapshot for an epoch less than " + Epoch.FIRST.getEpoch());
logger.info("Getting snapshot of epoch = {}", epoch);
String query = String.format("SELECT snapshot FROM %s.%s WHERE epoch = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
UntypedResultSet res = executeInternal(query, epoch.getEpoch());
if (res == null || res.isEmpty())
return null;
return res.one().getBytes("snapshot").duplicate();
ByteBuffer bytes = res.one().getBytes("snapshot");
if (bytes == null || !bytes.hasRemaining())
return null;
return bytes.duplicate();
}

/**
Expand All @@ -2020,14 +2022,27 @@ public static ByteBuffer getSnapshot(Epoch epoch)
*/
public static ByteBuffer findSnapshotBefore(Epoch search)
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
String query = String.format("SELECT snapshot FROM %s.%s WHERE token(epoch) >= token(?) LIMIT 1", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);

UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res != null && !res.isEmpty())
return res.one().getBytes("snapshot").duplicate();
return null;
}

public static List<Epoch> listSnapshotsSince(Epoch search)
{
// during gossip upgrade we have epoch = Long.MIN_VALUE + 1 (and the reverse partitioner doesn't support negative keys)
search = search.isBefore(Epoch.EMPTY) ? Epoch.EMPTY : search;
String query = String.format("SELECT epoch FROM %s.%s WHERE token(epoch) < token(?)", SchemaConstants.SYSTEM_KEYSPACE_NAME, SNAPSHOT_TABLE_NAME);
UntypedResultSet res = executeInternal(query, search.getEpoch());
if (res == null)
return Collections.emptyList();

return res.stream().map(row -> Epoch.create(row.getLong("epoch"))).collect(Collectors.toList());
}

/**
* Find the latest snapshot we have in the log.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.tcm.Transformation;

Expand All @@ -36,7 +36,6 @@

final class ClusterMetadataLogTable extends AbstractVirtualTable
{
private static final String PERIOD = "period";
private static final String EPOCH = "epoch";
private static final String KIND = "kind";
private static final String TRANSFORMATION = "transformation";
Expand All @@ -48,9 +47,8 @@ final class ClusterMetadataLogTable extends AbstractVirtualTable
super(TableMetadata.builder(keyspace, "cluster_metadata_log")
.comment("cluster metadata log")
.kind(TableMetadata.Kind.VIRTUAL)
.partitioner(new LocalPartitioner(LongType.instance))
.addPartitionKeyColumn(PERIOD, LongType.instance)
.addClusteringColumn(EPOCH, LongType.instance)
.partitioner(MetaStrategy.partitioner)
.addPartitionKeyColumn(EPOCH, LongType.instance)
.addRegularColumn(KIND, UTF8Type.instance)
.addRegularColumn(TRANSFORMATION, UTF8Type.instance)
.addRegularColumn(ENTRY_ID, LongType.instance)
Expand All @@ -64,14 +62,14 @@ public DataSet data()
try
{
SimpleDataSet result = new SimpleDataSet(metadata());
UntypedResultSet res = execute(format("SELECT period, epoch, kind, transformation, entry_id, writetime(kind) as wt " +
UntypedResultSet res = execute(format("SELECT epoch, kind, transformation, entry_id, writetime(kind) as wt " +
"FROM %s.%s", METADATA_KEYSPACE_NAME, TABLE_NAME), ConsistencyLevel.QUORUM);
for (UntypedResultSet.Row r : res)
{
Transformation.Kind kind = Transformation.Kind.valueOf(r.getString("kind"));
Transformation transformation = kind.fromVersionedBytes(r.getBlob("transformation"));

result.row(r.getLong("period"), r.getLong("epoch"))
result.row(r.getLong("epoch"))
.column(KIND, kind.toString())
.column(TRANSFORMATION, transformation.toString())
.column(ENTRY_ID, r.getLong("entry_id"))
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/dht/AbstractBounds.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.cassandra.dht;

import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
Expand All @@ -27,6 +26,7 @@
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -195,7 +195,7 @@ public void serialize(AbstractBounds<T> range, DataOutputPlus out, int version)
serializer.serialize(range.right, out, version);
}

public AbstractBounds<T> deserialize(DataInput in, IPartitioner p, int version) throws IOException
public AbstractBounds<T> deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException
{
boolean isToken, startInclusive, endInclusive;
// !WARNING! See serialize method above for why we still need to have that condition.
Expand Down
15 changes: 0 additions & 15 deletions src/java/org/apache/cassandra/dht/IPartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.cassandra.dht;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -37,20 +36,6 @@ static IPartitioner global()
return DatabaseDescriptor.getPartitioner();
}

static void validate(Collection<? extends AbstractBounds<?>> allBounds)
{
for (AbstractBounds<?> bounds : allBounds)
validate(bounds);
}

static void validate(AbstractBounds<?> bounds)
{
if (global() != bounds.left.getPartitioner())
throw new AssertionError(String.format("Partitioner in bounds serialization. Expected %s, was %s.",
global().getClass().getName(),
bounds.left.getPartitioner().getClass().getName()));
}

/**
* Transform key to object representation of the on-disk format.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.cassandra.dht;

import java.io.DataInput;
import java.io.IOException;

import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;

/**
Expand Down Expand Up @@ -49,7 +49,7 @@ public interface IPartitionerDependentSerializer<T>
* @return the type that was deserialized
* @throws IOException if deserialization fails
*/
public T deserialize(DataInput in, IPartitioner p, int version) throws IOException;
public T deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException;

/**
* Calculate serialized size of object without actually serializing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.dht;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
Expand All @@ -30,7 +31,9 @@
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
Expand All @@ -53,9 +56,14 @@ public DecoratedKey decorateKey(ByteBuffer key)
return new CachedHashDecoratedKey(getToken(key), key); // CachedHashDecoratedKey is used for bloom filter hash calculation
}

public Token midpoint(Token left, Token right)
public Token midpoint(Token ltoken, Token rtoken)
{
throw new UnsupportedOperationException();
// the symbolic MINIMUM token should act as ZERO: the empty bit array
BigInteger left = ltoken.equals(MIN_TOKEN) ? BigInteger.ZERO : BigInteger.valueOf(ltoken.getLongValue());
BigInteger right = rtoken.equals(MIN_TOKEN) ? BigInteger.ZERO : BigInteger.valueOf(rtoken.getLongValue());
Pair<BigInteger, Boolean> midpair = FBUtilities.midpoint(left, right, 63);
// discard the remainder
return new ReversedLongLocalToken(midpair.left.longValue());
}

public Token split(Token left, Token right, double ratioToLeft)
Expand Down Expand Up @@ -174,6 +182,12 @@ public Object getTokenValue()
return token;
}

@Override
public long getLongValue()
{
return token;
}

@Override
public ByteSource asComparableBytes(ByteComparable.Version version)
{
Expand Down
17 changes: 14 additions & 3 deletions src/java/org/apache/cassandra/dht/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.cassandra.dht;

import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -99,7 +98,13 @@ public static class MetadataSerializer implements PartitionerAwareMetadataSerial
{
private static final int SERDE_VERSION = MessagingService.VERSION_40;

// Convenience method as Token has a reference to its Partitioner
public void serialize(Token t, DataOutputPlus out, Version version) throws IOException
{
serialize(t, out, t.getPartitioner(), version);
}

public void serialize(Token t, DataOutputPlus out, IPartitioner partitioner, Version version) throws IOException
{
serializer.serialize(t, out, SERDE_VERSION);
}
Expand All @@ -111,7 +116,13 @@ public Token deserialize(DataInputPlus in, IPartitioner partitioner, Version ver
return serializer.deserialize(in, partitioner, SERDE_VERSION);
}

// Convenience method as Token has a reference to its Partitioner
public long serializedSize(Token t, Version version)
{
return serializedSize(t, t.getPartitioner(), version);
}

public long serializedSize(Token t, IPartitioner partitioner, Version version)
{
return serializer.serializedSize(t, SERDE_VERSION);
}
Expand All @@ -126,15 +137,15 @@ public void serialize(Token token, DataOutputPlus out, int version) throws IOExc
p.getTokenFactory().serialize(token, out);
}

public Token deserialize(DataInput in, IPartitioner p, int version) throws IOException
public Token deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException
{
int size = deserializeSize(in);
byte[] bytes = new byte[size];
in.readFully(bytes);
return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
}

public int deserializeSize(DataInput in) throws IOException
public int deserializeSize(DataInputPlus in) throws IOException
{
return in.readInt();
}
Expand Down
Loading

0 comments on commit 728b9ec

Please sign in to comment.