Skip to content

Commit

Permalink
Merge pull request apache#24381: [Spark Dataset runner] Remove obsolete unusable AggregatorsAccumulator / NamedAggregators
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev committed Dec 12, 2022
2 parents 8367921 + f0c7f86 commit 3f2aecf
Show file tree
Hide file tree
Showing 11 changed files with 10 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.structuredstreaming.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.EvaluationContext;
Expand Down Expand Up @@ -152,12 +149,8 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
PipelineTranslator.detectStreamingMode(pipeline, options);
checkArgument(!options.isStreaming(), "Streaming is not supported.");

// clear state of Aggregators, Metrics and Watermarks if exists.
AggregatorsAccumulator.clear();
MetricsAccumulator.clear();

final SparkSession sparkSession = SparkSessionFactory.getOrCreateSession(options);
initAccumulators(sparkSession.sparkContext());
initMetrics(sparkSession.sparkContext());

final Future<?> submissionFuture =
runAsync(() -> translatePipeline(sparkSession, pipeline).evaluate());
Expand Down Expand Up @@ -204,24 +197,17 @@ private EvaluationContext translatePipeline(SparkSession sparkSession, Pipeline

private void registerMetricsSource(String appName) {
final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
final AggregatorMetricSource aggregatorMetricSource =
new AggregatorMetricSource(null, AggregatorsAccumulator.getInstance().value());
final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(null);
final CompositeSource compositeSource =
new CompositeSource(
appName + ".Beam",
metricsSource.metricRegistry(),
aggregatorMetricSource.metricRegistry());
final SparkBeamMetricSource metricsSource = new SparkBeamMetricSource(appName + ".Beam");
// re-register the metrics in case of context re-use
metricsSystem.removeSource(compositeSource);
metricsSystem.registerSource(compositeSource);
metricsSystem.removeSource(metricsSource);
metricsSystem.registerSource(metricsSource);
}

/** Init Metrics/Aggregators accumulators. This method is idempotent. */
private static void initAccumulators(SparkContext sparkContext) {
// Init metrics accumulators
private static void initMetrics(SparkContext sparkContext) {
// Clear and init metrics accumulators
MetricsAccumulator.clear();
MetricsAccumulator.init(sparkContext);
AggregatorsAccumulator.init(sparkContext);
}

private static Future<?> runAsync(Runnable task) {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 3f2aecf

Please sign in to comment.