Skip to content

Commit

Permalink
feat: add ARRAY_LENGTH UDF (#4725)
Browse files Browse the repository at this point in the history
* fix: add ARRAY_LENGTH UDF

fixes: #4724

`ARRAY_LENGTH` returns the length of any array passed to it, or `0` if null is passed.
  • Loading branch information
big-andy-coates committed Mar 9, 2020
1 parent 0f7de31 commit 31a9d9d
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.List;

/**
* Returns the length of an array
*/
@SuppressWarnings("MethodMayBeStatic") // UDF methods can not be static.
@UdfDescription(name = "ARRAY_LENGTH", description = "Returns the length on an array")
public class ArrayLength {

@Udf
public <T> int calcArrayLength(
@UdfParameter(description = "The array") final List<T> array
) {
return array == null
? 0
: array.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.Test;

public class ArrayLengthTest {

private ArrayLength udf;

@Before
public void setUp() {
udf = new ArrayLength();
}

@Test
public void shouldReturnZeroForNullArray() {
assertThat(udf.calcArrayLength(null), is(0));
}

@Test
public void shouldReturnArraySize() {
assertThat(udf.calcArrayLength(ImmutableList.of()), is(0));
assertThat(udf.calcArrayLength(ImmutableList.of(1)), is(1));
assertThat(udf.calcArrayLength(ImmutableList.of("one", "two")), is(2));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (BOOLEAN_ARRAY ARRAY<BOOLEAN>, INT_ARRAY ARRAY<INTEGER>, BIGINT_ARRAY ARRAY<BIGINT>, DOUBLE_ARRAY ARRAY<DOUBLE>, STRING_ARRAY ARRAY<STRING>, DECIMAL_ARRAY ARRAY<DECIMAL(2, 1)>) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `BOOLEAN_ARRAY` ARRAY<BOOLEAN>, `INT_ARRAY` ARRAY<INTEGER>, `BIGINT_ARRAY` ARRAY<BIGINT>, `DOUBLE_ARRAY` ARRAY<DOUBLE>, `STRING_ARRAY` ARRAY<STRING>, `DECIMAL_ARRAY` ARRAY<DECIMAL(2, 1)>",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n ARRAY_LENGTH(INPUT.BOOLEAN_ARRAY) BOOLEAN_LEN,\n ARRAY_LENGTH(INPUT.INT_ARRAY) INT_LEN,\n ARRAY_LENGTH(INPUT.BIGINT_ARRAY) BIGINT_LEN,\n ARRAY_LENGTH(INPUT.DOUBLE_ARRAY) DOUBLE_LEN,\n ARRAY_LENGTH(INPUT.STRING_ARRAY) STRING_LEN,\n ARRAY_LENGTH(INPUT.DECIMAL_ARRAY) DECIMAL_LEN\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `BOOLEAN_LEN` INTEGER, `INT_LEN` INTEGER, `BIGINT_LEN` INTEGER, `DOUBLE_LEN` INTEGER, `STRING_LEN` INTEGER, `DECIMAL_LEN` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `BOOLEAN_ARRAY` ARRAY<BOOLEAN>, `INT_ARRAY` ARRAY<INTEGER>, `BIGINT_ARRAY` ARRAY<BIGINT>, `DOUBLE_ARRAY` ARRAY<DOUBLE>, `STRING_ARRAY` ARRAY<STRING>, `DECIMAL_ARRAY` ARRAY<DECIMAL(2, 1)>"
},
"selectExpressions" : [ "ARRAY_LENGTH(BOOLEAN_ARRAY) AS BOOLEAN_LEN", "ARRAY_LENGTH(INT_ARRAY) AS INT_LEN", "ARRAY_LENGTH(BIGINT_ARRAY) AS BIGINT_LEN", "ARRAY_LENGTH(DOUBLE_ARRAY) AS DOUBLE_LEN", "ARRAY_LENGTH(STRING_ARRAY) AS STRING_LEN", "ARRAY_LENGTH(DECIMAL_ARRAY) AS DECIMAL_LEN" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT",
"timestampColumn" : null
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent6704432109653647691",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.any.key.name.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"version" : "6.0.0",
"timestamp" : 1583509764019,
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<BOOLEAN_ARRAY ARRAY<BOOLEAN>, INT_ARRAY ARRAY<INT>, BIGINT_ARRAY ARRAY<BIGINT>, DOUBLE_ARRAY ARRAY<DOUBLE>, STRING_ARRAY ARRAY<VARCHAR>, DECIMAL_ARRAY ARRAY<DECIMAL(2, 1)>> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<BOOLEAN_LEN INT, INT_LEN INT, BIGINT_LEN INT, DOUBLE_LEN INT, STRING_LEN INT, DECIMAL_LEN INT> NOT NULL"
},
"inputs" : [ {
"topic" : "test_topic",
"key" : "",
"value" : {
"boolean_array" : [ true ],
"int_array" : [ -1, 0 ],
"bigint_array" : [ -1, 0, 1 ],
"double_array" : [ 0.0, 0.1, 0.2, 0.3 ],
"string_array" : [ "a", "b", "c", "d", "e" ],
"decimal_array" : [ 1.0, 1.1, 1.2, 1.3, 1.4, 1.5 ]
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : { }
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"BOOLEAN_LEN" : 1,
"INT_LEN" : 2,
"BIGINT_LEN" : 3,
"DOUBLE_LEN" : 4,
"STRING_LEN" : 5,
"DECIMAL_LEN" : 6
}
}, {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"BOOLEAN_LEN" : 0,
"INT_LEN" : 0,
"BIGINT_LEN" : 0,
"DOUBLE_LEN" : 0,
"STRING_LEN" : 0,
"DECIMAL_LEN" : 0
}
} ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Loading

0 comments on commit 31a9d9d

Please sign in to comment.