Skip to content

Commit 7644e06

Browse files
feat(plugins): add the FileCompletionStrategy concept (EoF, Daily) to support managing long-lived files
1 parent fcfaa19 commit 7644e06

File tree

18 files changed

+1174
-25
lines changed

18 files changed

+1174
-25
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
<suppress checks="NPathComplexity" files="DefaultFileRecordsPollingConsumer.java"/>
4343
<suppress checks="NPathComplexity" files="DefaultFileSystemMonitor.java"/>
4444
<suppress checks="ParameterNumber" files="InternalFilterContext" />
45+
<suppress checks="ParameterNumber" files="DefaultFileSystemMonitor" />
4546
<suppress checks="Header" files="kafka-connect-source-file-pulse-version.properties"/>
4647
<suppress checks="Header" files=".*.properties"/>
4748
<suppress checks="[a-zA-Z0-9]*" files="src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/*"/>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) StreamThoughts
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.kafka.connect.filepulse.source;
8+
9+
import java.util.Map;
10+
11+
/**
12+
* Strategy interface for determining when a file should be marked as COMPLETED.
13+
*/
14+
public interface FileCompletionStrategy {
15+
16+
/**
17+
* Configure this strategy.
18+
*
19+
* @param configs the configuration properties.
20+
*/
21+
default void configure(final Map<String, ?> configs) {
22+
// Default: no-op
23+
}
24+
25+
/**
26+
* Check if the file should be marked as completed based on the strategy.
27+
*
28+
* @param context the file object context.
29+
* @return true if the file should be marked as COMPLETED, false otherwise.
30+
*/
31+
boolean shouldComplete(final FileObjectContext context);
32+
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public enum FileObjectStatus {
3232
*/
3333
READING,
3434

35+
/**
36+
* The file processing is partially completed (e.g. for long-lived processing).
37+
*/
38+
PARTIALLY_COMPLETED,
39+
3540
/**
3641
* The file processing is completed.
3742
*/
@@ -59,4 +64,4 @@ public boolean isOneOf(final FileObjectStatus...states) {
5964
public boolean isDone() {
6065
return isOneOf(COMMITTED, FAILED, CLEANED);
6166
}
62-
}
67+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) StreamThoughts
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.kafka.connect.filepulse.source;
8+
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* Optional strategy interface that can be implemented alongside {@link FileCompletionStrategy}
14+
* to influence when the connector should attempt to read from continuously appended files.
15+
*
16+
* <p>This is intended for long-lived, append-only files (for example, daily log files) where
17+
* it may be desirable to back off read attempts when no new data is expected for some time,
18+
* in order to avoid unnecessary polling or timeouts while still allowing the completion
19+
* strategy to control when the file is eventually marked as COMPLETED.
20+
*/
21+
public interface LongLivedFileReadStrategy {
22+
23+
Logger LOG = LoggerFactory.getLogger(LongLivedFileReadStrategy.class);
24+
25+
/**
26+
* Determines whether the connector should attempt to read from the given file
27+
* based on its current context.
28+
*
29+
* <p>The default implementation checks if the file has been modified since
30+
* the last read offset timestamp.
31+
*
32+
* @param objectMeta the file object metadata.
33+
* @param offset the last read offset for the file.
34+
* @return true if the connector should attempt to read from the file, false otherwise.
35+
*/
36+
default boolean shouldAttemptRead(final FileObjectMeta objectMeta, final FileObjectOffset offset) {
37+
return objectMeta.lastModified() > offset.timestamp();
38+
}
39+
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,19 @@ public interface StateListener {
4040
*/
4141
void onCompleted(final FileObjectContext context);
4242

43+
/**
44+
* This method is invoked when a source file processing is partially completed.
45+
* @see FileRecordsPollingConsumer
46+
*
47+
* @param context the file context.
48+
*/
49+
void onPartiallyCompleted(final FileObjectContext context);
50+
4351
/**
4452
* This method is invoked when an error occurred while processing a source file.
4553
* @see FileRecordsPollingConsumer
4654
*
4755
* @param context the file context.
4856
*/
4957
void onFailure(final FileObjectContext context, final Throwable t);
50-
}
58+
}

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/NonBlockingBufferReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.nio.charset.StandardCharsets;
1616
import java.util.LinkedList;
1717
import java.util.List;
18+
import org.apache.commons.lang3.StringUtils;
1819
import org.apache.kafka.connect.errors.ConnectException;
1920
import org.slf4j.Logger;
2021
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ private boolean fillWithBufferedLinesUntil(final List<TextBlock> records,
178179
TextBlock line;
179180
do {
180181
line = tryToExtractLine();
181-
if (line != null) {
182+
if (line != null && !StringUtils.isEmpty(line.data())) {
182183
records.add(line);
183184
}
184185
maxNumRecordsNotReached = records.size() < minRecords;

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
1515
import io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy;
1616
import io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner;
17+
import io.streamthoughts.kafka.connect.filepulse.source.EofCompletionStrategy;
18+
import io.streamthoughts.kafka.connect.filepulse.source.FileCompletionStrategy;
1719
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
1820
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
1921
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
@@ -42,6 +44,12 @@ public class CommonSourceConfig extends AbstractConfig {
4244
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
4345
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
4446

47+
public static final String FS_COMPLETION_STRATEGY_CLASS_CONFIG = "fs.completion.strategy.class";
48+
private static final String FS_COMPLETION_STRATEGY_CLASS_DOC =
49+
"The FileCompletionStrategy class to determine when files should be marked as COMPLETED. " +
50+
"Default is EofCompletionStrategy (completes when fully read). " +
51+
"Use DailyCompletionStrategy for time-based completion (e.g., daily files).";
52+
4553
public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
4654
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
4755

@@ -261,6 +269,13 @@ public static ConfigDef getConfigDef() {
261269
groupCounter++,
262270
ConfigDef.Width.NONE,
263271
CONNECT_SCHEMA_KEEP_LEADING_UNDERSCORES_ON_FIELD_NAME_CONFIG
272+
)
273+
.define(
274+
FS_COMPLETION_STRATEGY_CLASS_CONFIG,
275+
ConfigDef.Type.CLASS,
276+
EofCompletionStrategy.class,
277+
ConfigDef.Importance.MEDIUM,
278+
FS_COMPLETION_STRATEGY_CLASS_DOC
264279
);
265280
}
266281

@@ -303,6 +318,10 @@ public String getValueSchemaConditionTopicPattern() {
303318
return getString(RECORD_VALUE_SCHEMA_CONDITION_TOPIC_PATTERN_CONFIG);
304319
}
305320

321+
public FileCompletionStrategy getFileCompletionStrategy() {
322+
return getConfiguredInstance(FS_COMPLETION_STRATEGY_CLASS_CONFIG, FileCompletionStrategy.class);
323+
}
324+
306325
public Schema getValueConnectSchema() {
307326
String valueConnectSchemaTypeString = getString(RECORD_VALUE_SCHEMA_TYPE_CONFIG);
308327
ConnectSchemaType schemaType = ConnectSchemaType.getForNameIgnoreCase(valueConnectSchemaTypeString);
@@ -352,4 +371,4 @@ private Schema readConfigSchema() {
352371
"Failed to read connect-schema for '" + CommonSourceConfig.RECORD_VALUE_SCHEMA_CONFIG + "'", e);
353372
}
354373
}
355-
}
374+
}

0 commit comments

Comments
 (0)