Skip to content

Commit

Permalink
Fix UpdateSchemaDestination when source format is set to AVRO
Browse files (browse the repository at this point in the history)
  • Loading branch information
steveniemitz committed Jul 22, 2022
1 parent caaefc0 commit 884c0f9
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 27 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -381,10 +381,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
Expand Down Expand Up @@ -480,10 +477,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_NEVER,
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions,
dynamicDestinations))
.withSideInputs(sideInputsForUpdateSchema))
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -53,17 +53,13 @@ public class UpdateSchemaDestination
private final PCollectionView<String> loadJobIdPrefixView;
private final ValueProvider<String> loadJobProjectId;
private transient @Nullable DatasetService datasetService;

private final int maxRetryJobs;
private final @Nullable String kmsKey;
private final String sourceFormat;
private final boolean useAvroLogicalTypes;
private @Nullable BigQueryServices.JobService jobService;
private final boolean ignoreUnknownValues;
private final Set<BigQueryIO.Write.SchemaUpdateOption> schemaUpdateOptions;
private BigQueryIO.Write.WriteDisposition writeDisposition;
private BigQueryIO.Write.CreateDisposition createDisposition;
private DynamicDestinations dynamicDestinations;
private final BigQueryIO.Write.WriteDisposition writeDisposition;
private final BigQueryIO.Write.CreateDisposition createDisposition;
private final DynamicDestinations dynamicDestinations;

private static class PendingJobData {
final BigQueryHelpers.PendingJob retryJob;
Expand All @@ -80,7 +76,7 @@ public PendingJobData(
}
}

private List<UpdateSchemaDestination.PendingJobData> pendingJobs = Lists.newArrayList();
private final List<UpdateSchemaDestination.PendingJobData> pendingJobs = Lists.newArrayList();

public UpdateSchemaDestination(
BigQueryServices bqServices,
Expand All @@ -89,20 +85,14 @@ public UpdateSchemaDestination(
BigQueryIO.Write.WriteDisposition writeDisposition,
BigQueryIO.Write.CreateDisposition createDisposition,
int maxRetryJobs,
boolean ignoreUnknownValues,
@Nullable String kmsKey,
String sourceFormat,
boolean useAvroLogicalTypes,
Set<BigQueryIO.Write.SchemaUpdateOption> schemaUpdateOptions,
DynamicDestinations dynamicDestinations) {
this.loadJobProjectId = loadJobProjectId;
this.loadJobIdPrefixView = loadJobIdPrefixView;
this.bqServices = bqServices;
this.maxRetryJobs = maxRetryJobs;
this.ignoreUnknownValues = ignoreUnknownValues;
this.kmsKey = kmsKey;
this.sourceFormat = sourceFormat;
this.useAvroLogicalTypes = useAvroLogicalTypes;
this.schemaUpdateOptions = schemaUpdateOptions;
this.createDisposition = createDisposition;
this.writeDisposition = writeDisposition;
Expand Down Expand Up @@ -217,14 +207,10 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
.setSchema(schema)
.setWriteDisposition(writeDisposition.name())
.setCreateDisposition(createDisposition.name())
.setSourceFormat(sourceFormat)
.setIgnoreUnknownValues(ignoreUnknownValues)
.setUseAvroLogicalTypes(useAvroLogicalTypes);
.setSourceFormat("NEWLINE_DELIMITED_JSON");
if (schemaUpdateOptions != null) {
List<String> options =
schemaUpdateOptions.stream()
.map(Enum<BigQueryIO.Write.SchemaUpdateOption>::name)
.collect(Collectors.toList());
schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
loadConfig.setSchemaUpdateOptions(options);
}
if (!loadConfig
Expand All @@ -235,7 +221,7 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
.equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) {
return null;
}
Table destinationTable = null;
final Table destinationTable;
try {
destinationTable = datasetService.getTable(tableReference);
if (destinationTable == null) {
Expand Down

0 comments on commit 884c0f9

Please sign in to comment.