diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
index 1c347dd03..1a67f09ea 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java
@@ -16,7 +16,10 @@
 
 package io.cdap.plugin.db.source;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import io.cdap.plugin.db.ConnectionConfigAccessor;
 import io.cdap.plugin.db.JDBCDriverShim;
 import io.cdap.plugin.db.NoOpCommitConnection;
@@ -24,6 +27,7 @@
 import io.cdap.plugin.util.DBUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -41,6 +45,7 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -64,6 +69,68 @@ public static void setInput(Configuration conf,
     new ConnectionConfigAccessor(conf).setAutoCommitEnabled(enableAutoCommit);
   }
 
+  /**
+   * A wrapper around the superclass getSplits method. This is required for testing,
+   * as mocking a method that calls its superclass implementation is challenging.
+   * This wrapper allows unit tests to override this method and mock the behavior of
+   * the superclass call, isolating the logic within the overridden getSplits method.
+   *
+   * @param job The job context.
+   * @return The list of input splits generated by the parent class.
+   * @throws IOException If an error occurs while getting the splits.
+   */
+  @VisibleForTesting
+  List<InputSplit> getBaseSplits(JobContext job) throws IOException {
+    return super.getSplits(job);
+  }
+
+  /**
+   * Returns the splits produced by the parent class, adding a split for rows whose split-by column
+   * is NULL unless a NULL split or an all-encompassing split ("1=1") is already present.
+   *
+   * @param job The job context.
+   * @return The splits to read; never null or empty.
+   * @throws IOException If an error occurs while getting the splits.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> splits = getBaseSplits(job);
+
+    // Handle the edge case where the base implementation returns no splits. In this scenario,
+    // create a single, all-encompassing split to ensure the job can proceed.
+    if (splits == null || splits.isEmpty()) {
+      return ImmutableList.<InputSplit>of(
+        new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1"));
+    }
+
+    // Check if a split for NULL values or an all-encompassing split ("1=1") already exists.
+    boolean shouldAddNullSplit = splits.stream()
+      .map(DataDrivenDBInputFormat.DataDrivenDBInputSplit.class::cast)
+      .map(DataDrivenDBInputFormat.DataDrivenDBInputSplit::getLowerClause)
+      .noneMatch(lowerClause ->
+        !Strings.isNullOrEmpty(lowerClause)
+          && (lowerClause.contains("IS NULL") || lowerClause.contains("1=1")));
+    if (!shouldAddNullSplit) {
+      return splits;
+    }
+
+    String splitByColumn = getDBConf().getInputOrderBy();
+    if (Strings.isNullOrEmpty(splitByColumn)) {
+      // Without a column name the clause would be 'null IS NULL' (true for every row, so the extra
+      // split would re-read the whole table) or a malformed ' IS NULL'. Keep the base splits as they are.
+      LOG.debug("No split-by column is configured. Skipping the split for NULL values.");
+      return splits;
+    }
+
+    // Create a dedicated split to handle potential NULL values in the split-by column.
+    LOG.debug("No split found for NULL values. Adding a split for '{} IS NULL'.", splitByColumn);
+    String clause = String.format("%s IS NULL", splitByColumn);
+    return ImmutableList.<InputSplit>builder()
+      .addAll(splits)
+      .add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(clause, clause))
+      .build();
+  }
+
   @Override
   public Connection getConnection() {
     if (this.connection == null) {
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java
new file mode 100644
index 000000000..b369d008b
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DataDrivenETLDBInputFormatTest {
+
+  @Mock
+  private JobContext mockJobContext;
+  @Mock
+  private DBConfiguration mockDbConfiguration;
+
+  private DataDrivenETLDBInputFormat inputFormat;
+
+  @Before
+  public void setUp() {
+    inputFormat = Mockito.spy(new DataDrivenETLDBInputFormat());
+    Mockito.doReturn(mockDbConfiguration).when(inputFormat).getDBConf();
+    Mockito.doReturn("id").when(mockDbConfiguration).getInputOrderBy();
+  }
+
+  @Test
+  public void testGetSplitsAddsNullSplit() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("A new split for NULLs should be added", 2, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(1);
+    Assert.assertEquals("id IS NULL", nullSplit.getLowerClause());
+    Assert.assertEquals("id IS NULL", nullSplit.getUpperClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id IS NULL", "id IS NULL");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit, nullSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a duplicate NULL split", 2, finalSplits.size());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfSelectAllPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsNull() throws IOException {
+    Mockito.doReturn(null).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
+    Assert.assertEquals("1=1", split.getLowerClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsEmptyList() throws IOException {
+    Mockito.doReturn(Collections.emptyList()).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
+    Assert.assertEquals("1=1", split.getLowerClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfSplitColumnMissing() throws IOException {
+    Mockito.doReturn(null).when(mockDbConfiguration).getInputOrderBy();
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a split matching every row", 1, finalSplits.size());
+  }
+}