Skip to content

Commit

Permalink
[FLINK-21298][table] Support 'USE MODULES' syntax both in SQL parser, TableEnvironment and SQL CLI
Browse files Browse the repository at this point in the history
… TableEnvironment and SQL CLI

This closes apache#15005
  • Loading branch information
LadyForest committed Mar 3, 2021
1 parent 816ce96 commit 57decce
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ private void callCommand(SqlCommandCall cmdCall) {
CliStrings.MESSAGE_UNLOAD_MODULE_SUCCEEDED,
CliStrings.MESSAGE_UNLOAD_MODULE_FAILED);
break;
case USE_MODULES:
callDdl(
cmdCall.operands[0],
CliStrings.MESSAGE_USE_MODULES_SUCCEEDED,
CliStrings.MESSAGE_USE_MODULES_FAILED);
break;
default:
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ private CliStrings() {
formatCommand(
SqlCommand.UNLOAD_MODULE,
"Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
.append(
formatCommand(
SqlCommand.USE_MODULES,
"Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'"))
.style(AttributedStyle.DEFAULT.underline())
.append("\nHint")
.style(AttributedStyle.DEFAULT)
Expand All @@ -120,6 +124,7 @@ private CliStrings() {
.toAttributedString();

public static final String MESSAGE_WELCOME;

// make findbugs happy
static {
MESSAGE_WELCOME =
Expand Down Expand Up @@ -232,10 +237,14 @@ private CliStrings() {

public static final String MESSAGE_UNLOAD_MODULE_SUCCEEDED = "Unload module succeeded!";

public static final String MESSAGE_USE_MODULES_SUCCEEDED = "Use modules succeeded!";

public static final String MESSAGE_LOAD_MODULE_FAILED = "Load module failed!";

public static final String MESSAGE_UNLOAD_MODULE_FAILED = "Unload module failed!";

public static final String MESSAGE_USE_MODULES_FAILED = "Use modules failed!";

// --------------------------------------------------------------------------------------------

public static final String RESULT_TITLE = "SQL Query Result";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
Expand Down Expand Up @@ -178,6 +179,8 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
cmd = SqlCommand.LOAD_MODULE;
} else if (operation instanceof UnloadModuleOperation) {
cmd = SqlCommand.UNLOAD_MODULE;
} else if (operation instanceof UseModulesOperation) {
cmd = SqlCommand.USE_MODULES;
} else if (operation instanceof DescribeTableOperation) {
cmd = SqlCommand.DESCRIBE;
operands =
Expand Down Expand Up @@ -268,6 +271,8 @@ enum SqlCommand {

UNLOAD_MODULE,

USE_MODULES,

SHOW_PARTITIONS,

USE_CATALOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void setSessionProperty(String sessionId, String key, String value)
/** Executes a SQL statement, and return {@link TableResult} as execution result. */
TableResult executeSql(String sessionId, String statement) throws SqlExecutionException;

/** Lists all modules known to the executor in their loaded order. */
/** Lists used modules known to the executor in their resolution order. */
List<String> listModules(String sessionId) throws SqlExecutionException;

/** Returns a sql parser instance. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
Expand Down Expand Up @@ -423,6 +424,13 @@ public ProgramTargetDescriptor executeUpdate(String sessionId, String statement)
return executeUpdateInternal(sessionId, context, statement);
}

@VisibleForTesting
List<ModuleEntry> listFullModules(String sessionId) throws SqlExecutionException {
    // Look up the execution context for this session, then query the table
    // environment for all modules (used and unused) with the session class
    // loader installed, converting the returned array into a list.
    final ExecutionContext<?> executionContext = getExecutionContext(sessionId);
    final TableEnvironment environment = executionContext.getTableEnvironment();
    return executionContext.wrapClassLoader(
            () -> Arrays.asList(environment.listFullModules()));
}

// --------------------------------------------------------------------------------------------

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,28 @@ public void testCommands() throws Exception {
"UNLOAD MODULE dummy",
SqlCommand.UNLOAD_MODULE,
"UNLOAD MODULE dummy"),
// use one module
TestItem.validSql(
"USE MODULES dummy", SqlCommand.USE_MODULES, "USE MODULES dummy"),
// use multiple modules
TestItem.validSql(
"USE MODULES x, y, z",
SqlCommand.USE_MODULES,
"USE MODULES x, y, z"),
// use modules with module names as reserved keywords
TestItem.validSql(
"USE MODULES `MODULE`, `MODULES`",
SqlCommand.USE_MODULES,
"USE MODULES `MODULE`, `MODULES`"),
// use modules with module names as literals
TestItem.invalidSql(
"USE MODULES 'dummy'",
SqlExecutionException.class,
"Encountered \"\\'dummy\\'\""),
TestItem.invalidSql(
"USE MODULES",
SqlExecutionException.class,
"Encountered \"<EOF>\""),
// Test create function.
TestItem.invalidSql(
"CREATE FUNCTION ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.table.client.gateway.utils.SimpleCatalogFactory;
import org.apache.flink.table.client.gateway.utils.TestUserClassLoaderJar;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
Expand Down Expand Up @@ -1713,6 +1714,111 @@ public void testHiveBuiltInFunctionWithHiveModuleEnabled() throws Exception {
hasItems("www.apache"));
}

@Test
public void testUseModulesWithModuleConfEnabled() throws Exception {
    // only blink planner supports USE MODULES syntax
    Assume.assumeTrue(planner.equals("blink"));
    final LocalExecutor executor =
            createModifiedExecutor(
                    MODULES_ENVIRONMENT_FILE, clusterClient, createModuleReplaceVars());
    final SessionContext session = new SessionContext("test-session", new Environment());
    String sessionId = executor.openSession(session);
    assertEquals("test-session", sessionId);

    // initial state: all configured modules are used, in their declared order.
    // NOTE: JUnit's assertEquals takes (expected, actual) in that order; the
    // expected value goes first so failure messages read correctly.
    assertEquals(
            Arrays.asList("core", "mymodule", "myhive", "myhive2"),
            executor.listModules(sessionId));
    assertEquals(
            Arrays.asList(
                    new ModuleEntry("core", true),
                    new ModuleEntry("mymodule", true),
                    new ModuleEntry("myhive", true),
                    new ModuleEntry("myhive2", true)),
            executor.listFullModules(sessionId));

    // change resolution order
    executor.executeSql(sessionId, "use modules myhive2, core, mymodule, myhive");
    assertEquals(
            Arrays.asList("myhive2", "core", "mymodule", "myhive"),
            executor.listModules(sessionId));
    assertEquals(
            Arrays.asList(
                    new ModuleEntry("myhive2", true),
                    new ModuleEntry("core", true),
                    new ModuleEntry("mymodule", true),
                    new ModuleEntry("myhive", true)),
            executor.listFullModules(sessionId));

    // disable modules by not using them; unused modules are listed last
    executor.executeSql(sessionId, "use modules core, myhive");
    assertEquals(Arrays.asList("core", "myhive"), executor.listModules(sessionId));
    assertEquals(
            Arrays.asList(
                    new ModuleEntry("core", true),
                    new ModuleEntry("myhive", true),
                    new ModuleEntry("mymodule", false),
                    new ModuleEntry("myhive2", false)),
            executor.listFullModules(sessionId));

    // use duplicate module names
    assertThrows(
            "Could not execute statement: use modules core, myhive, core",
            SqlExecutionException.class,
            () -> executor.executeSql(sessionId, "use modules core, myhive, core"));

    // use non-existed module name
    assertThrows(
            "Could not execute statement: use modules core, dummy",
            SqlExecutionException.class,
            () -> executor.executeSql(sessionId, "use modules core, dummy"));
}

@Test
public void testHiveBuiltInFunctionWithoutUsingHiveModule() throws Exception {
    // only blink planner supports USE MODULES syntax
    Assume.assumeTrue(planner.equals("blink"));

    final URL url = getClass().getClassLoader().getResource("test-data.csv");
    Objects.requireNonNull(url);
    final Map<String, String> replaceVars = new HashMap<>();
    replaceVars.put("$VAR_PLANNER", planner);
    replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
    replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
    replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
    replaceVars.put("$VAR_MAX_ROWS", "100");
    replaceVars.put("$VAR_RESULT_MODE", "table");

    final LocalExecutor executor = createModifiedExecutor(clusterClient, replaceVars);
    final SessionContext session = new SessionContext("test-session", new Environment());
    String sessionId = executor.openSession(session);
    assertEquals("test-session", sessionId);

    // load the hive module; both core and hive are then used.
    // NOTE: JUnit's assertEquals takes (expected, actual) in that order; the
    // expected value goes first so failure messages read correctly.
    executor.executeSql(sessionId, "load module hive");
    assertEquals(Arrays.asList("core", "hive"), executor.listModules(sessionId));
    assertEquals(
            Arrays.asList(new ModuleEntry("core", true), new ModuleEntry("hive", true)),
            executor.listFullModules(sessionId));

    assertShowResult(
            executor.executeSql(
                    sessionId,
                    "select substring_index('www.apache.org', '.', 2) from TableNumber1"),
            hasItems("www.apache"));

    // cannot use hive built-in function without using hive module
    executor.executeSql(sessionId, "use modules core");
    assertEquals(Collections.singletonList("core"), executor.listModules(sessionId));
    assertEquals(
            Arrays.asList(new ModuleEntry("core", true), new ModuleEntry("hive", false)),
            executor.listFullModules(sessionId));
    assertThrows(
            "Could not execute statement: select substring_index('www.apache.org', '.', 2) from TableNumber1",
            SqlExecutionException.class,
            () ->
                    executor.executeSql(
                            sessionId,
                            "select substring_index('www.apache.org', '.', 2) from TableNumber1"));
}

private void executeStreamQueryTable(
Map<String, String> replaceVars, String query, List<String> expectedResults)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"org.apache.flink.sql.parser.ddl.SqlTableOption"
"org.apache.flink.sql.parser.ddl.SqlUseCatalog"
"org.apache.flink.sql.parser.ddl.SqlUseDatabase"
"org.apache.flink.sql.parser.ddl.SqlUseModules"
"org.apache.flink.sql.parser.ddl.SqlWatermark"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
Expand Down Expand Up @@ -129,6 +130,7 @@
"LINES"
"LOAD"
"LOCATION"
"MODULES"
"NORELY"
"NOVALIDATE"
"OUTPUTFORMAT"
Expand Down Expand Up @@ -532,6 +534,7 @@
"SqlAlterView()"
"SqlShowPartitions()"
"SqlUnloadModule()"
"SqlUseModules()"
]

# List of methods for parsing custom literals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1541,8 +1541,8 @@ SqlLoadModule SqlLoadModule() :
*/
SqlUnloadModule SqlUnloadModule() :
{
SqlParserPos startPos;
SqlIdentifier moduleName;
SqlParserPos startPos;
SqlIdentifier moduleName;
}
{
<UNLOAD> <MODULE> { startPos = getPos(); }
Expand All @@ -1551,3 +1551,33 @@ SqlUnloadModule SqlUnloadModule() :
return new SqlUnloadModule(startPos.plus(getPos()), moduleName);
}
}

/**
* Parses a USE MODULES statement, e.g.
* USE MODULES module_name1 [, module_name2, ...];
* At least one module name is required; each name is a simple identifier
* (string literals are rejected by the grammar).
*/
SqlUseModules SqlUseModules() :
{
final Span s;
SqlIdentifier moduleName;
final List<SqlIdentifier> moduleNames = new ArrayList<SqlIdentifier>();
}
{
<USE> <MODULES> { s = span(); }
moduleName = SimpleIdentifier()
{
moduleNames.add(moduleName);
}
[
(
<COMMA>
moduleName = SimpleIdentifier()
{
moduleNames.add(moduleName);
}
)+
]
{
return new SqlUseModules(s.end(this), moduleNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -454,4 +454,16 @@ public void testLoadModule() {
public void testUnloadModule() {
sql("unload module hive").ok("UNLOAD MODULE `HIVE`");
}

@Test
public void testUseModules() {
// a single module name is unparsed back as a quoted identifier
sql("use modules hive").ok("USE MODULES `HIVE`");

// multiple comma-separated module names are preserved in order
sql("use modules x, y, z").ok("USE MODULES `X`, `Y`, `Z`");

// a trailing comma is a syntax error (the ^...^ markers pin the error position)
sql("use modules x^,^").fails("(?s).*Encountered \"<EOF>\" at line 1, column 14.\n.*");

// module names must be identifiers, not string literals
sql("use modules ^'hive'^")
.fails("(?s).*Encountered \"\\\\'hive\\\\'\" at line 1, column 13.\n.*");
}
}
3 changes: 3 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"org.apache.flink.sql.parser.ddl.SqlTableOption"
"org.apache.flink.sql.parser.ddl.SqlUseCatalog"
"org.apache.flink.sql.parser.ddl.SqlUseDatabase"
"org.apache.flink.sql.parser.ddl.SqlUseModules"
"org.apache.flink.sql.parser.ddl.SqlWatermark"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
Expand Down Expand Up @@ -95,6 +96,7 @@
"IF"
"LOAD"
"METADATA"
"MODULES"
"OVERWRITE"
"OVERWRITING"
"PARTITIONED"
Expand Down Expand Up @@ -472,6 +474,7 @@
"SqlAlterTable()"
"SqlShowViews()"
"SqlUnloadModule()"
"SqlUseModules()"
]

# List of methods for parsing custom literals.
Expand Down
Loading

0 comments on commit 57decce

Please sign in to comment.