Skip to content

Commit

Permalink
[FLINK-19188][examples-table] Add a new streaming SQL example
Browse files Browse the repository at this point in the history
This closes apache#13413.
  • Loading branch information
twalthr committed Nov 24, 2020
1 parent cdcc57c commit e4286e7
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 0 deletions.
29 changes: 29 additions & 0 deletions flink-examples/flink-examples-table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ under the License.
<version>${project.version}</version>
</dependency>

<!-- Table connectors and formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -128,6 +135,28 @@ under the License.
</configuration>
</execution>

<!-- Builds a self-contained example jar for UpdatingTopCityExample.
     The program-class manifest entry tells the Flink client which main class
     to run when the jar is submitted without an explicit entry point. -->
<execution>
<id>UpdatingTopCityExample</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>

<configuration>
<!-- classifier distinguishes this artifact from the module's main jar -->
<classifier>UpdatingTopCityExample</classifier>

<archive>
<manifestEntries>
<program-class>org.apache.flink.table.examples.java.basics.UpdatingTopCityExample</program-class>
</manifestEntries>
</archive>

<!-- include only this example's classes (wildcard catches inner/anonymous classes) -->
<includes>
<include>org/apache/flink/table/examples/java/basics/UpdatingTopCityExample*</include>
</includes>
</configuration>
</execution>

<execution>
<id>StreamSQLExample</id>
<phase>package</phase>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.examples.java.basics;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
* Example for aggregating and ranking data using Flink SQL on updating tables.
*
* <p>The example shows how to declare a table using SQL DDL for reading bounded, insert-only data and
* how to handle updating results in streaming mode. It should give a first impression about Flink SQL as
* a changelog processor. The example uses some streaming operations that produce a stream of updates.
* See the other examples for pure CDC processing and more complex operations.
*
* <p>In particular, the example shows how to
* <ul>
* <li>setup a {@link TableEnvironment},
* <li>use the environment for creating a CSV file with bounded example data,
* <li>aggregate the incoming INSERT changes in an updating table,
* <li>compute an updating top-N result,
* <li>collect and materialize the table locally.
* </ul>
*
* <p>The example executes two Flink jobs. The results are written to stdout.
*
* <p>Note: Make sure to include the SQL CSV format when submitting this example to Flink (e.g. via
* command line). This step is not necessary when executing the example in an IDE.
*/
public final class UpdatingTopCityExample {

    public static void main(String[] args) throws Exception {
        // prepare the session: a plain TableEnvironment in streaming mode is
        // sufficient, no DataStream API interop is needed for this example
        final EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        final TableEnvironment env = TableEnvironment.create(settings);

        // create an empty temporary CSV directory for this example
        final String populationDirPath = createTemporaryDirectory();

        // register a table in the catalog that points to the CSV file
        env.executeSql(
                "CREATE TABLE PopulationUpdates (" +
                "  city STRING," +
                "  state STRING," +
                "  update_year INT," +
                "  population_diff INT" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = '" + populationDirPath + "'," +
                "  'format' = 'csv'" +
                ")"
        );

        // insert some example data into the table;
        // this submits the first Flink job and writes CSV files into the directory
        final TableResult insertionResult = env.executeSql(
                "INSERT INTO PopulationUpdates VALUES" +
                "  ('Los Angeles', 'CA', 2013, 13106100), " +
                "  ('Los Angeles', 'CA', 2014, 72600), " +
                "  ('Los Angeles', 'CA', 2015, 72300), " +
                "  ('Chicago', 'IL', 2013, 9553270), " +
                "  ('Chicago', 'IL', 2014, 11340), " +
                "  ('Chicago', 'IL', 2015, -6730), " +
                "  ('Houston', 'TX', 2013, 6330660), " +
                "  ('Houston', 'TX', 2014, 172960), " +
                "  ('Houston', 'TX', 2015, 172940), " +
                "  ('Phoenix', 'AZ', 2013, 4404680), " +
                "  ('Phoenix', 'AZ', 2014, 86740), " +
                "  ('Phoenix', 'AZ', 2015, 89700), " +
                "  ('San Antonio', 'TX', 2013, 2280580), " +
                "  ('San Antonio', 'TX', 2014, 49180), " +
                "  ('San Antonio', 'TX', 2015, 50870), " +
                "  ('San Francisco', 'CA', 2013, 4521310), " +
                "  ('San Francisco', 'CA', 2014, 65940), " +
                "  ('San Francisco', 'CA', 2015, 62290), " +
                "  ('Dallas', 'TX', 2013, 6817520), " +
                "  ('Dallas', 'TX', 2014, 137740), " +
                "  ('Dallas', 'TX', 2015, 154020)"
        );

        // since all cluster operations of the Table API are executed asynchronously,
        // we need to wait until the insertion has been completed,
        // an exception is thrown in case of an error
        insertionResult.await();

        // read from table and aggregate the total population per city;
        // SUM over the per-year diffs yields an updating (non-append-only) table
        final Table currentPopulation = env.sqlQuery(
                "SELECT city, state, MAX(update_year) AS latest_year, SUM(population_diff) AS population " +
                "FROM PopulationUpdates " +
                "GROUP BY city, state"
        );

        // either define a nested SQL statement with sub-queries
        // or divide the problem into sub-views which will be optimized
        // as a whole during planning
        env.createTemporaryView("CurrentPopulation", currentPopulation);

        // find the top 2 cities with the highest population per state,
        // we use a sub-query that is correlated with every unique state,
        // for every state we rank by population and return the top 2 cities
        final Table topCitiesPerState = env.sqlQuery(
                "SELECT state, city, latest_year, population " +
                "FROM " +
                "  (SELECT DISTINCT state FROM CurrentPopulation) States," +
                "  LATERAL (" +
                "    SELECT city, latest_year, population" +
                "    FROM CurrentPopulation" +
                "    WHERE state = States.state" +
                "    ORDER BY population DESC, latest_year DESC" +
                "    LIMIT 2" +
                "  )"
        );

        // uncomment the following line to get insights into the continuously evaluated query,
        // execute this pipeline by using the local client as an implicit sink
        // topCitiesPerState.execute().print();

        // because we execute the pipeline in streaming mode, the output shows how the result evolves
        // and is updated over time when new information for a city is ingested:
        // +----+-------+-------------+-------------+-------------+
        // | op | state |        city | latest_year |  population |
        // +----+-------+-------------+-------------+-------------+
        // | +I |    CA | Los Angeles |        2013 |    13106100 |
        // | -D |    CA | Los Angeles |        2013 |    13106100 |
        // | +I |    CA | Los Angeles |        2014 |    13178700 |
        // | -D |    CA | Los Angeles |        2014 |    13178700 |
        // | +I |    CA | Los Angeles |        2015 |    13251000 |
        // ...

        // the changelog can be applied (i.e. materialized) in an external system,
        // usually a key-value store can be used as a table sink,
        // but to show the underlying changelog capabilities we simply use
        // execute().collect() and a List where we maintain updates
        try (CloseableIterator<Row> iterator = topCitiesPerState.execute().collect()) {
            final List<Row> materializedUpdates = new ArrayList<>();
            iterator.forEachRemaining(row -> {
                final RowKind kind = row.getKind();
                switch (kind) {
                    case INSERT:
                    case UPDATE_AFTER:
                        // BUGFIX: an UPDATE_AFTER carries the NEW value of an updated row
                        // and must be added to the materialized view (the original code
                        // added UPDATE_BEFORE and removed UPDATE_AFTER, inverting the
                        // changelog semantics and materializing stale rows)
                        row.setKind(RowKind.INSERT); // for full equality
                        materializedUpdates.add(row);
                        break;
                    case UPDATE_BEFORE:
                    case DELETE:
                        // an UPDATE_BEFORE carries the OLD value of an updated row,
                        // so it is retracted just like a DELETE
                        row.setKind(RowKind.INSERT); // for full equality
                        materializedUpdates.remove(row);
                        break;
                }
            });
            // show the final output table if the result is bounded,
            // the output should exclude San Antonio because it has a smaller population than
            // Houston or Dallas in Texas (TX)
            materializedUpdates.forEach(System.out::println);
        }
    }

    /**
     * Creates an empty temporary directory for CSV files and returns the absolute path.
     *
     * <p>Note: the directory is not cleaned up on exit; it only holds a few small
     * CSV files written by the insert job above.
     */
    private static String createTemporaryDirectory() throws IOException {
        final Path tempDirectory = Files.createTempDirectory("population");
        return tempDirectory.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.examples.java.basics;

import org.apache.flink.table.examples.utils.ExampleOutputTestBase;

import org.junit.Test;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertThat;

/**
* Test for {@link UpdatingTopCityExample}.
*/
public class UpdatingTopCityExampleITCase extends ExampleOutputTestBase {

    @Test
    public void testExample() throws Exception {
        // run the example end-to-end; it prints the materialized result to stdout,
        // which the base class captures for inspection
        UpdatingTopCityExample.main(new String[0]);

        final String output = getOutputString();

        // the final materialized table must contain the top 2 cities per state
        // (San Antonio is absent: both Houston and Dallas are larger in TX)
        final String[] expectedRows = {
                "AZ,Phoenix,2015,4581120",
                "IL,Chicago,2015,9557880",
                "CA,San Francisco,2015,4649540",
                "CA,Los Angeles,2015,13251000",
                "TX,Dallas,2015,7109280",
                "TX,Houston,2015,6676560"
        };
        for (String expectedRow : expectedRows) {
            assertThat(output, containsString(expectedRow));
        }
    }
}

0 comments on commit e4286e7

Please sign in to comment.