DataDrivenETLDBInputFormat.java
@@ -16,14 +16,18 @@

package io.cdap.plugin.db.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.JDBCDriverShim;
import io.cdap.plugin.db.NoOpCommitConnection;
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -41,6 +45,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;
import java.util.Properties;

/**
@@ -64,6 +69,58 @@ public static void setInput(Configuration conf,
new ConnectionConfigAccessor(conf).setAutoCommitEnabled(enableAutoCommit);
}

/**
* A wrapper around the superclass getSplits method. It exists to make this class testable:
* a direct call to the superclass implementation is hard to mock, so unit tests stub this
* wrapper instead, isolating the logic in the overridden getSplits method below.
*
* @param job The job context.
* @return The list of input splits generated by the parent class.
* @throws IOException If an error occurs while getting the splits.
*/
@VisibleForTesting
List<InputSplit> getBaseSplits(JobContext job) throws IOException {
return super.getSplits(job);
}

@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = getBaseSplits(job);

// Handle the edge case where the base implementation returns no splits. In this scenario,
// create a single, all-encompassing split to ensure the job can proceed.
if (splits == null || splits.isEmpty()) {
return ImmutableList.<InputSplit>of(new DataDrivenDBInputSplit("1=1", "1=1"));
}

// Check if a split for NULL values or an all-encompassing split ("1=1") already exists.
boolean shouldAddNullSplit = splits.stream()
.map(DataDrivenDBInputFormat.DataDrivenDBInputSplit.class::cast)
.map(DataDrivenDBInputFormat.DataDrivenDBInputSplit::getLowerClause)
.noneMatch(lowerClause ->
!Strings.isNullOrEmpty(lowerClause)
&& (lowerClause.contains("IS NULL") || lowerClause.contains("1=1")));

if (shouldAddNullSplit) {
// Create a dedicated split to handle potential NULL values in the split-by column.
String splitByColumn = getDBConf().getInputOrderBy();
LOG.debug("No split found for NULL values. Adding a split for '{} IS NULL'.", splitByColumn);
String clause = String.format("%s IS NULL", splitByColumn);
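// For example, with split-by column "id" this produces "id IS NULL" as both the lower and upper clause.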
InputSplit nullSplit =
new DataDrivenDBInputFormat.DataDrivenDBInputSplit(clause, clause);

return ImmutableList.<InputSplit>builder()
.addAll(splits)
.add(nullSplit)
.build();
}

return splits;
}

@Override
public Connection getConnection() {
if (this.connection == null) {
DataDrivenETLDBInputFormatTest.java (new file)
@@ -0,0 +1,121 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.db.source;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

@RunWith(MockitoJUnitRunner.class)
public class DataDrivenETLDBInputFormatTest {

@Mock
private JobContext mockJobContext;
@Mock
private DBConfiguration mockDbConfiguration;

private DataDrivenETLDBInputFormat inputFormat;

@Before
public void setUp() {
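// Spy the real input format so getSplits() executes its own logic while
// getBaseSplits() and getDBConf() are stubbed.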
inputFormat = Mockito.spy(new DataDrivenETLDBInputFormat());
Mockito.doReturn(mockDbConfiguration).when(inputFormat).getDBConf();
Mockito.doReturn("id").when(mockDbConfiguration).getInputOrderBy();
}

@Test
public void testGetSplitsAddsNullSplit() throws IOException {
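// The base implementation returns only a range split, so getSplits() should append an "id IS NULL" split.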
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);

List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);

Assert.assertEquals("A new split for NULLs should be added", 2, finalSplits.size());

DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(1);
Assert.assertEquals("id IS NULL", nullSplit.getLowerClause());
Assert.assertEquals("id IS NULL", nullSplit.getUpperClause());
}

@Test
public void testGetSplitsDoesNotAddNullSplitIfPresent() throws IOException {
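// A NULL-handling split already exists, so getSplits() should return the list unchanged.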
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id IS NULL", "id IS NULL");
List<InputSplit> initialSplits = ImmutableList.of(existingSplit, nullSplit);

Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);

List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);

Assert.assertEquals("Should not add a duplicate NULL split", 2, finalSplits.size());
}

@Test
public void testGetSplitsDoesNotAddNullSplitIfSelectAllPresent() throws IOException {
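// An all-encompassing "1=1" split already covers NULL rows, so no extra split should be added.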
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1");
List<InputSplit> initialSplits = ImmutableList.of(existingSplit);

Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);

List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);

Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
}

@Test
public void testGetSplitsReturnsSelectAllSplitIfBaseReturnsNull() throws IOException {
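// A null result from the base implementation should be replaced by a single all-encompassing "1=1" split.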
Mockito.doReturn(null).when(inputFormat).getBaseSplits(mockJobContext);

List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());

DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
Assert.assertEquals("1=1", split.getLowerClause());
}

@Test
public void testGetSplitsReturnsSelectAllSplitIfBaseReturnsEmptyList() throws IOException {
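// An empty base result should likewise yield a single all-encompassing "1=1" split.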
Mockito.doReturn(Collections.emptyList()).when(inputFormat).getBaseSplits(mockJobContext);

List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());

DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
Assert.assertEquals("1=1", split.getLowerClause());
}
}