Skip to content

Commit

Permalink
Use Transformation.Kind.id in local and distributed log tables
Browse files Browse the repository at this point in the history
Patch by Sam Tunnicliffe and marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19516

Co-authored-by: Sam Tunnicliffe <samt@apache.org>
Co-authored-by: Marcus Eriksson <marcuse@apache.org>
  • Loading branch information
krummas and beobal committed Apr 12, 2024
1 parent 3e4f1e2 commit d548396
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Use Transformation.Kind.id in local and distributed log tables (CASSANDRA-19516)
* Remove period field from ClusterMetadata and metadata log tables (CASSANDRA-19482)
* Enrich system_views.pending_hints vtable with hints sizes (CASSANDRA-19486)
* Expose all dropwizard metrics in virtual tables (CASSANDRA-14572)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.cql3.functions;

import java.nio.ByteBuffer;

import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.utils.ByteBufferUtil;

public class ClusterMetadataFcts
{
public static void addFunctionsTo(NativeFunctions functions)
{
functions.add(transformationKind);
}

public static final NativeScalarFunction transformationKind = new TransformationKind();
private static final class TransformationKind extends NativeScalarFunction
{

private TransformationKind()
{
super("transformation_kind", UTF8Type.instance, Int32Type.instance);
}

@Override
public ByteBuffer execute(Arguments arguments) throws InvalidRequestException
{
Number id = arguments.get(0);
if (id.intValue() < 0 || id.intValue() > Transformation.Kind.values().length -1)
throw new InvalidRequestException(id + " is not a valid Transformation.Kind id");

Transformation.Kind kind = Transformation.Kind.fromId(id.intValue());
return ByteBufferUtil.bytes(kind.name());

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class NativeFunctions
MathFcts.addFunctionsTo(this);
MaskingFcts.addFunctionsTo(this);
VectorFcts.addFunctionsTo(this);
ClusterMetadataFcts.addFunctionsTo(this);
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private SystemKeyspace()
+ "epoch bigint,"
+ "entry_id bigint,"
+ "transformation blob,"
+ "kind text,"
+ "kind int,"
+ "PRIMARY KEY (epoch))")
.partitioner(MetaStrategy.partitioner)
.compaction(CompactionParams.twcs(ImmutableMap.of("compaction_window_unit","DAYS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public DataSet data()
"FROM %s.%s", METADATA_KEYSPACE_NAME, TABLE_NAME), ConsistencyLevel.QUORUM);
for (UntypedResultSet.Row r : res)
{
Transformation.Kind kind = Transformation.Kind.valueOf(r.getString("kind"));
Transformation.Kind kind = Transformation.Kind.fromId(r.getInt("kind"));
Transformation transformation = kind.fromVersionedBytes(r.getBlob("transformation"));

result.row(r.getLong("epoch"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private DistributedMetadataLogKeyspace(){}
+ "epoch bigint,"
+ "entry_id bigint,"
+ "transformation blob,"
+ "kind text,"
+ "kind int,"
+ "PRIMARY KEY (epoch))";

public static final TableMetadata Log =
Expand All @@ -88,7 +88,7 @@ public static boolean initialize() throws IOException
UntypedResultSet result = QueryProcessor.execute(init, ConsistencyLevel.QUORUM,
FIRST.getEpoch(),
Transformation.Kind.PRE_INITIALIZE_CMS.toVersionedBytes(PreInitialize.blank()),
Transformation.Kind.PRE_INITIALIZE_CMS.toString(),
Transformation.Kind.PRE_INITIALIZE_CMS.id,
Entry.Id.NONE.entryId);

UntypedResultSet.Row row = result.one();
Expand All @@ -97,7 +97,7 @@ public static boolean initialize() throws IOException

if (row.getLong("epoch") == FIRST.getEpoch() &&
row.getLong("entry_id") == Entry.Id.NONE.entryId &&
Transformation.Kind.PRE_INITIALIZE_CMS.toString().equals(row.getString("kind")))
Transformation.Kind.PRE_INITIALIZE_CMS.id == row.getInt("kind"))
return true;

throw new IllegalStateException("Could not initialize log.");
Expand Down Expand Up @@ -137,7 +137,7 @@ public static boolean tryCommit(Entry.Id entryId,
nextEpoch.getEpoch(),
entryId.entryId,
serializedEvent,
transform.kind().toString());
transform.kind().id);

return result.one().getBoolean("[applied]");
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public EntryHolder getEntries(Epoch since) throws IOException
{
long entryId = row.getLong("entry_id");
Epoch epoch = Epoch.create(row.getLong("epoch"));
Transformation.Kind kind = Transformation.Kind.valueOf(row.getString("kind"));
Transformation.Kind kind = Transformation.Kind.fromId(row.getInt("kind"));
Transformation transform = kind.fromVersionedBytes(row.getBlob("transformation"));
entryHolder.add(new Entry(new Entry.Id(entryId), epoch, transform));
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/tcm/Transformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ enum Kind
;

private final Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer;
private final int id;
public final int id;

private static final Kind[] idToKindMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void append(Entry entry)
ByteBuffer serializedTransformation = entry.transform.kind().toVersionedBytes(entry.transform);
String query = String.format("INSERT INTO %s.%s (epoch, entry_id, transformation, kind) VALUES (?,?,?,?)",
SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
executeInternal(query, entry.epoch.getEpoch(), entry.id.entryId, serializedTransformation, entry.transform.kind().toString());
executeInternal(query, entry.epoch.getEpoch(), entry.id.entryId, serializedTransformation, entry.transform.kind().id);
// todo; should probably not flush every time, but it simplifies tests
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(NAME).forceBlockingFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED);
}
Expand Down Expand Up @@ -149,7 +149,7 @@ private static EntryHolder toEntryHolder(Epoch since, UntypedResultSet resultSet
{
long entryId = row.getLong("entry_id");
Epoch epoch = Epoch.create(row.getLong("epoch"));
Transformation.Kind kind = Transformation.Kind.valueOf(row.getString("kind"));
Transformation.Kind kind = Transformation.Kind.fromId(row.getInt("kind"));
Transformation transform = kind.fromVersionedBytes(row.getBlob("transformation"));
holder.add(new Entry(new Entry.Id(entryId), epoch, transform));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.cql3.functions;

import java.nio.charset.CharacterCodingException;

import org.junit.Test;

import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.assertj.core.api.Assertions;

import static org.apache.cassandra.cql3.functions.ClusterMetadataFcts.transformationKind;
import static org.junit.Assert.assertEquals;

public class ClusterMetadataFctsTest
{

@Test
public void testTransformationKind() throws CharacterCodingException
{
int max = -1;
for (Transformation.Kind kind : Transformation.Kind.values())
{
Arguments arguments = transformationKind.newArguments(ProtocolVersion.CURRENT);
arguments.set(0, Int32Type.instance.decompose(kind.id));
assertEquals(kind.name(), ByteBufferUtil.string(transformationKind.execute(arguments)));
if (kind.id > max)
max = kind.id;
}

for (int boundary : new int[]{-1, max+1})
{
Arguments arguments = transformationKind.newArguments(ProtocolVersion.CURRENT);
arguments.set(0, Int32Type.instance.decompose(boundary));
try
{
transformationKind.execute(arguments);
}
catch (Exception e)
{
Assertions.assertThat(e)
.isInstanceOf(InvalidRequestException.class)
.hasMessageContaining(boundary + " is not a valid Transformation.Kind id");
}
}

}
}

0 comments on commit d548396

Please sign in to comment.