Skip to content

Commit

Permalink
[FLINK-25685][rest] RestClusterClient supports uploading local file p…
Browse files Browse the repository at this point in the history
…ath with scheme
  • Loading branch information
Sxnan committed Feb 3, 2022
1 parent e6210d4 commit 46bb038
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
artifactFilePath.getName()));
filesToUpload.add(
new FileUpload(
Paths.get(artifacts.getValue().filePath),
Paths.get(artifactFilePath.getPath()),
RestConstants.CONTENT_TYPE_BINARY));
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -107,12 +109,15 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -718,6 +723,45 @@ testJobExecutionResultHandler, new TestJobSubmitHandler())) {
}
}

@Test(timeout = 120_000)
public void testJobSubmissionWithoutUserArtifact() throws Exception {
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(new TestJobSubmitHandler())) {
try (RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {

restClusterClient.submitJob(jobGraph).get();
}
}
}

@Test(timeout = 120_000)
public void testJobSubmissionWithUserArtifact() throws Exception {
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(new TestJobSubmitHandler())) {
try (RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {

TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
File file = temporaryFolder.newFile();
Files.write(file.toPath(), "hello world".getBytes(ConfigConstants.DEFAULT_CHARSET));

// Add file path with scheme
jobGraph.addUserArtifact(
"file",
new DistributedCache.DistributedCacheEntry(file.toURI().toString(), false));

// Add file path without scheme
jobGraph.addUserArtifact(
"file2",
new DistributedCache.DistributedCacheEntry(file.toURI().getPath(), false));

restClusterClient.submitJob(jobGraph).get();
}
}
}

@Test
public void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
try (final TestRestServerEndpoint restServerEndpoint =
Expand Down

0 comments on commit 46bb038

Please sign in to comment.