Skip to content

Commit

Permalink
Add new TriggersPolicy configuration to allow operators to disable tr…
Browse files Browse the repository at this point in the history
…iggers

patch by Abe Ratnofsky; reviewed by Stefan Miklosovic and Sam Tunnicliffe for CASSANDRA-19532
  • Loading branch information
aratno authored and smiklosovic committed Apr 16, 2024
1 parent 9abd84b commit 8d705b3
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Add new TriggersPolicy configuration to allow operators to disable triggers (CASSANDRA-19532)
* Use Transformation.Kind.id in local and distributed log tables (CASSANDRA-19516)
* Remove period field from ClusterMetadata and metadata log tables (CASSANDRA-19482)
* Enrich system_views.pending_hints vtable with hints sizes (CASSANDRA-19486)
Expand Down
6 changes: 6 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,12 @@ trace_type_repair_ttl: 7d
# As of Cassandra 3.0 there is a sandbox in place that should prevent execution of evil code.
user_defined_functions_enabled: false

# Triggers are enabled by default.
# `enabled` executes queries and their triggers.
# `disabled` executes queries but skips trigger execution, and logs a warning.
# `forbidden` fails queries that would execute triggers with TriggerDisabledException.
triggers_policy: enabled

# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from
# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
# the "key_alias" is the only key that will be used for encrypt opertaions; previously used keys
Expand Down
6 changes: 6 additions & 0 deletions conf/cassandra_latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,12 @@ trace_type_repair_ttl: 7d
# As of Cassandra 3.0 there is a sandbox in place that should prevent execution of evil code.
user_defined_functions_enabled: false

# Triggers are enabled by default.
# `enabled` executes queries and their triggers.
# `disabled` executes queries but skips trigger execution, and logs a warning.
# `forbidden` fails queries that would execute triggers with TriggerDisabledException.
triggers_policy: enabled

# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from
# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
# the "key_alias" is the only key that will be used for encrypt opertaions; previously used keys
Expand Down
4 changes: 4 additions & 0 deletions doc/modules/cassandra/pages/developing/cql/triggers.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ For instance:
----
include::cassandra:example$CQL/drop_trigger.cql[]
----

Triggers can be disabled in two steps. `triggers_policy` is `enabled` by default, which runs all created triggers as
mutations are executed. `disabled` skips trigger execution but otherwise executes query operations as normal (and logs a
warning). `forbidden` will fail queries that would execute triggers.
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1300,4 +1300,16 @@ public static void log(Config config)
public volatile DurationSpec.LongMillisecondsBound progress_barrier_backoff = new DurationSpec.LongMillisecondsBound("1000ms");
public volatile DurationSpec.LongSecondsBound discovery_timeout = new DurationSpec.LongSecondsBound("30s");
public boolean unsafe_tcm_mode = false;

public enum TriggersPolicy
{
// Execute triggers
enabled,
// Don't execute triggers when executing queries
disabled,
// Throw an exception when attempting to execute a trigger
forbidden
}

public TriggersPolicy triggers_policy = TriggersPolicy.enabled;
}
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5158,4 +5158,16 @@ public static int getSaiSSTableIndexesPerQueryFailThreshold()
{
return conf.sai_sstable_indexes_per_query_fail_threshold;
}

@VisibleForTesting
public static void setTriggersPolicy(Config.TriggersPolicy policy)
{
logger.info("triggers_policy set to {}", policy);
conf.triggers_policy = policy;
}

public static Config.TriggersPolicy getTriggersPolicy()
{
return conf.triggers_policy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QualifiedName;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
Expand Down Expand Up @@ -72,11 +73,13 @@ public Keyspaces apply(ClusterMetadata metadata)

try
{
TriggerExecutor.instance.loadTriggerInstance(triggerClass);
TriggerExecutor.instance.loadTriggerClass(triggerClass);
}
catch (Exception e)
{
throw ire("Trigger class '%s' couldn't be loaded", triggerClass);
InvalidRequestException thrown = ire("Trigger class '%s' couldn't be loaded", triggerClass);
thrown.initCause(e);
throw thrown;
}

TableMetadata newTable = table.withSwapped(table.triggers.with(TriggerMetadata.create(triggerName, triggerClass)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.triggers;

import org.apache.cassandra.exceptions.InvalidRequestException;

public class TriggerDisabledException extends InvalidRequestException
{
public TriggerDisabledException(String message)
{
super(message);
}
}
35 changes: 35 additions & 0 deletions src/java/org/apache/cassandra/triggers/TriggerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
Expand All @@ -37,10 +43,14 @@
import org.apache.cassandra.schema.TriggerMetadata;
import org.apache.cassandra.schema.Triggers;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;

public class TriggerExecutor
{
private static final Logger logger = LoggerFactory.getLogger(TriggerExecutor.class);
private static final NoSpamLogger skippedTriggerLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);

public static final TriggerExecutor instance = new TriggerExecutor();

private final Map<String, ITrigger> cachedTriggers = Maps.newConcurrentMap();
Expand Down Expand Up @@ -220,6 +230,16 @@ private List<Mutation> executeInternal(PartitionUpdate update)
Triggers triggers = update.metadata().triggers;
if (triggers.isEmpty())
return null;
Config.TriggersPolicy policy = DatabaseDescriptor.getTriggersPolicy();
if (policy == Config.TriggersPolicy.disabled)
{
skippedTriggerLogger.warn("Skipping execution of triggers due to configuration TriggersPolicy.disabled: {}", triggers);
return null;
}
if (policy == Config.TriggersPolicy.forbidden)
{
throw new TriggerDisabledException(String.format("Triggers are present but TriggersPolicy.forbidden is configured. Failing query that would execute triggers: %s", triggers));
}
List<Mutation> tmutations = Lists.newLinkedList();
Thread.currentThread().setContextClassLoader(customClassLoader);
try
Expand Down Expand Up @@ -252,8 +272,23 @@ private List<Mutation> executeInternal(PartitionUpdate update)
}
}

public synchronized void loadTriggerClass(String triggerClass) throws Exception
{
// Allow loading the class regardless of Config, since this could happen as part of TCM replay via
// CreateTriggerStatement#apply.
// Check that triggerClass is available on the classpath, but do not initialize the class since that would
// execute static blocks.
customClassLoader.loadClass(triggerClass).getConstructor();
}

public synchronized ITrigger loadTriggerInstance(String triggerClass) throws Exception
{
Config.TriggersPolicy policy = DatabaseDescriptor.getTriggersPolicy();
if (policy == Config.TriggersPolicy.disabled || policy == Config.TriggersPolicy.forbidden)
{
throw new TriggerDisabledException(String.format("Refusing to load new trigger class %s with TriggersPolicy.%s", triggerClass, policy));
}

// double check.
if (cachedTriggers.get(triggerClass) != null)
return cachedTriggers.get(triggerClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.Config$PaxosVariant",
"org.apache.cassandra.config.Config$RepairCommandPoolFullStrategy",
"org.apache.cassandra.config.Config$SSTableConfig",
"org.apache.cassandra.config.Config$TriggersPolicy",
"org.apache.cassandra.config.Config$UserFunctionTimeoutPolicy",
"org.apache.cassandra.config.ConfigBeanInfo",
"org.apache.cassandra.config.ConfigCustomizer",
Expand Down
98 changes: 91 additions & 7 deletions test/unit/org/apache/cassandra/triggers/TriggersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,31 @@

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.config.Config.TriggersPolicy;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.assertj.core.api.Assertions;

import static org.apache.cassandra.utils.ByteBufferUtil.toInt;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
Expand All @@ -42,6 +52,8 @@

public class TriggersTest
{
private static final Logger logger = LoggerFactory.getLogger(TriggersTest.class);
private TriggersPolicy originalTriggersPolicy;
private static boolean triggerCreated = false;

private static String ksName = "triggers_test_ks";
Expand All @@ -58,6 +70,7 @@ public static void beforeTest() throws ConfigurationException
public void setup() throws Exception
{
StorageService.instance.initServer();
originalTriggersPolicy = DatabaseDescriptor.getTriggersPolicy();

String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
"WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}",
Expand All @@ -70,8 +83,10 @@ public void setup() throws Exception
cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, otherCf);
QueryProcessor.process(cql, ConsistencyLevel.ONE);

DatabaseDescriptor.setTriggersPolicy(TriggersPolicy.enabled);

// no conditional execution of create trigger stmt yet
if (! triggerCreated)
if (!triggerCreated)
{
cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
ksName, cfName, TestTrigger.class.getName());
Expand All @@ -80,12 +95,56 @@ public void setup() throws Exception
}
}

@After
public void after()
{
DatabaseDescriptor.setTriggersPolicy(originalTriggersPolicy);
}

@Test
public void testTriggersPolicy()
{
QueryProcessor.process(String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName), ConsistencyLevel.ONE);
assertUpdateIsAugmented(0, "v1", 0);
QueryProcessor.process(String.format("DELETE FROM %s.%s WHERE k = 0", ksName, cfName), ConsistencyLevel.ONE);

DatabaseDescriptor.setTriggersPolicy(TriggersPolicy.disabled);
QueryProcessor.process(String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName), ConsistencyLevel.ONE);

UntypedResultSet rs = QueryProcessor.process(String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, 0), ConsistencyLevel.ONE);
assertRowValue(rs.one(), 0, "v1", 0); // from original update
assertEquals(-1, rs.one().getInt("v2", -1)); // from trigger
QueryProcessor.process(String.format("DELETE FROM %s.%s WHERE k = 0", ksName, cfName), ConsistencyLevel.ONE);

DatabaseDescriptor.setTriggersPolicy(TriggersPolicy.forbidden);
Assertions.assertThatThrownBy(() -> {
QueryProcessor.process(String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName), ConsistencyLevel.ONE);
})
.isInstanceOf(TriggerDisabledException.class)
.hasMessageContaining(TestTrigger.class.getName());
}

@Test
public void triggerClassNotInitializedOnCreate()
{
for (TriggersPolicy policy : new TriggersPolicy[]{TriggersPolicy.disabled, TriggersPolicy.forbidden})
{
DatabaseDescriptor.setTriggersPolicy(policy);
String cql = String.format("CREATE TRIGGER initializationdetector ON %s.%s USING '%s'", ksName, cfName, InitializationDetector.class.getName());
QueryProcessor.process(cql, ConsistencyLevel.ONE);
Assert.assertFalse(INITIALIZATION_DETECTOR_MARKER.get());

cql = String.format("DROP TRIGGER initializationdetector ON %s.%s", ksName, cfName);
QueryProcessor.process(cql, ConsistencyLevel.ONE);
}
}

@Test
public void executeTriggerOnCqlInsert() throws Exception
{
String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName);
String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (3, 3)", ksName, cfName);
QueryProcessor.process(cql, ConsistencyLevel.ONE);
assertUpdateIsAugmented(0, "v1", 0);
assertUpdateIsAugmented(3, "v1", 3);
}

@Test
Expand Down Expand Up @@ -190,7 +249,6 @@ private void assertUpdateIsAugmented(int key, String originColumnName, Object or
assertRowValue(rs.one(), key, "v2", 999); // from trigger
assertRowValue(rs.one(), key, originColumnName, originColumnValue); // from original update
}

private void assertRowValue(UntypedResultSet.Row row, int key, String columnName, Object columnValue)
{
assertTrue(String.format("Expected value (%s) for augmented cell %s was not found", key, columnName),
Expand All @@ -209,7 +267,8 @@ public static class TestTrigger implements ITrigger
{
public Collection<Mutation> augment(Partition partition)
{
RowUpdateBuilder update = new RowUpdateBuilder(partition.metadata(), FBUtilities.timestampMicros(), partition.partitionKey().getKey());
// Use a fixed early timestamp so this update can be deleted
RowUpdateBuilder update = new RowUpdateBuilder(partition.metadata(), 1L, partition.partitionKey().getKey());
update.add("v2", 999);

return Collections.singletonList(update.build());
Expand Down Expand Up @@ -247,4 +306,29 @@ public Collection<Mutation> augment(Partition partition)
throw new org.apache.cassandra.exceptions.InvalidRequestException(MESSAGE);
}
}

public static class NoOpTrigger implements ITrigger
{
public Collection<Mutation> augment(Partition partition)
{
return null;
}
}

// This is not part of InitializationDetector because if it was, accessing it would initialize the class
final static AtomicBoolean INITIALIZATION_DETECTOR_MARKER = new AtomicBoolean();
public static class InitializationDetector implements ITrigger
{

static
{
logger.info("{} static block was executed", InitializationDetector.class.getSimpleName());
INITIALIZATION_DETECTOR_MARKER.set(true);
}

public Collection<Mutation> augment(Partition partition)
{
return null;
}
}
}

0 comments on commit 8d705b3

Please sign in to comment.