@AlexisSouquiere AlexisSouquiere commented Dec 15, 2025

🎯 Overview

This PR introduces a file completion strategy system that lets users control when files are marked as COMPLETED. It addresses the limitation of the previous behavior, which completed every file immediately on EOF. This is particularly valuable for long-lived, continuously appended files such as daily log files.

💡 Motivation

Problem

Previously, Kafka Connect File Pulse would always mark files as COMPLETED immediately when they were fully read (EOF reached). While this works perfectly for static files, it creates significant challenges for continuously appended files:

  1. Daily/Rolling Files: Files that are written throughout the day (e.g., app-2025-12-08.log) would be marked complete as soon as they're first read, preventing the connector from reading additional data appended later
  2. Streaming Data Files: Files receiving real-time data streams would be completed prematurely
  3. No Control: Users had no way to specify custom completion logic based on their use case

Solution

I have implemented a strategy pattern that allows users to:

  • Configure different completion strategies based on their use case
  • Implement custom completion logic without modifying core connector code
  • Maintain full backward compatibility with existing configurations

🏗️ Architecture

New Components

1. FileCompletionStrategy Interface (API)

Location: connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileCompletionStrategy.java

Core strategy interface that defines:

public interface FileCompletionStrategy {
    void configure(Map<String, ?> configs);
    boolean shouldComplete(FileObjectContext context);
}

2. LongLivedFileReadStrategy Interface (API)

Location: connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/LongLivedFileReadStrategy.java

Optional interface for strategies managing long-lived files:

public interface LongLivedFileReadStrategy {
    boolean shouldAttemptRead(FileObjectMeta objectMeta, FileObjectOffset offset);
}

This interface enables read deferral - strategies can signal to the connector when NOT to attempt reading a file (e.g., when no new data is expected), avoiding unnecessary polling and timeouts while still allowing eventual completion.
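As a rough illustration of read deferral, the sketch below skips a file whenever it has not grown past the last committed position. `MetaSketch` and `OffsetSketch` are hypothetical stand-ins for `FileObjectMeta` and `FileObjectOffset`, whose real accessors are not shown in this PR description, and the "no new bytes" rule is an assumption rather than the logic used by the PR.

```java
// Hypothetical stand-in for FileObjectMeta: only the fields this sketch needs.
final class MetaSketch {
    final String name;
    final long contentLength;

    MetaSketch(String name, long contentLength) {
        this.name = name;
        this.contentLength = contentLength;
    }
}

// Hypothetical stand-in for FileObjectOffset: the last committed byte position.
final class OffsetSketch {
    final long position;

    OffsetSketch(long position) {
        this.position = position;
    }
}

// Mirrors the shape of LongLivedFileReadStrategy using the stand-in types.
interface ReadDeferralSketch {
    boolean shouldAttemptRead(MetaSketch meta, OffsetSketch offset);
}

// Assumed rule: only attempt a read when the file has bytes beyond the offset.
final class SkipWhenNoNewBytes implements ReadDeferralSketch {
    @Override
    public boolean shouldAttemptRead(MetaSketch meta, OffsetSketch offset) {
        return meta.contentLength > offset.position;
    }
}
```

A strategy like this would typically be combined with a completion rule, so that a file that never grows again is still completed eventually.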

3. EofCompletionStrategy (Default)

Location: connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/EofCompletionStrategy.java

Default implementation that maintains 100% backward compatibility:

  • Marks files as COMPLETED immediately when fully read
  • No configuration required
  • Optimal for static files

4. DailyCompletionStrategy

Location: connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DailyCompletionStrategy.java

New strategy for daily files that:

  • Extracts the date from the filename using a regex pattern
  • Schedules completion for the next day at a configured time

Example:

  • File: logs-2025-12-08.log (created Dec 8, 2025 at 6:00 AM)
  • Completion time: 01:00:00
  • Completion scheduled for: December 9, 2025 at 01:00:00
  • Benefit: Captures all data written during Dec 8, plus a 1-hour buffer period
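The scheduling rule in this example can be sketched with `java.util.regex` and `java.time`. This is a minimal sketch, not the PR's `DailyCompletionStrategy`: the class name `DailyCompletionSketch` and its methods are hypothetical, and it assumes the schedule is evaluated in local time with no time-zone handling.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the "next day at a configured time" rule.
final class DailyCompletionSketch {

    private final Pattern datePattern;
    private final DateTimeFormatter dateFormat;
    private final LocalTime completionTime;

    DailyCompletionSketch(String dateRegex, String dateFormat, String completionTime) {
        this.datePattern = Pattern.compile(dateRegex);
        this.dateFormat = DateTimeFormatter.ofPattern(dateFormat);
        // LocalTime.parse accepts the ISO form HH:mm:ss.
        this.completionTime = LocalTime.parse(completionTime);
    }

    // Returns the day after the date embedded in the file name, at the configured time.
    LocalDateTime scheduledCompletion(String fileName) {
        Matcher m = datePattern.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("No date found in file name: " + fileName);
        }
        LocalDate fileDate = LocalDate.parse(m.group(1), dateFormat);
        return fileDate.plusDays(1).atTime(completionTime);
    }

    // True once 'now' has reached the scheduled completion instant.
    boolean shouldComplete(String fileName, LocalDateTime now) {
        return !now.isBefore(scheduledCompletion(fileName));
    }
}
```

With the default pattern and format and a completion time of `01:00:00`, `scheduledCompletion("logs-2025-12-08.log")` yields December 9, 2025 at 01:00:00, matching the example above.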

5. DailyCompletionStrategyConfig

Location: connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/DailyCompletionStrategyConfig.java

Configuration class with validators for:

  • daily.completion.schedule.time: Time in HH:mm:ss format (default: 00:01:00)
  • daily.completion.schedule.date.pattern: Regex to extract date (default: .*?(\\d{4}-\\d{2}-\\d{2}).*)
  • daily.completion.schedule.date.format: Date format pattern (default: yyyy-MM-dd)

Modified Components

1. CommonSourceConfig

  • Added FS_COMPLETION_STRATEGY_CLASS_CONFIG configuration property
  • Default value: EofCompletionStrategy.class (backward compatible)
  • Added getFileCompletionStrategy() method to instantiate configured strategy
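One way a "class name in config, with a default" lookup can work is plain reflection, sketched below. All names here (`ConfigurableStrategySketch`, `AlwaysCompleteSketch`, `StrategyLoaderSketch`) are hypothetical, and this is not necessarily how `getFileCompletionStrategy()` is implemented; Kafka's own configuration classes offer helpers for this that the sketch deliberately avoids so it stays dependency-free.

```java
import java.util.Map;

// Hypothetical stand-in for a configurable strategy interface.
interface ConfigurableStrategySketch {
    void configure(Map<String, ?> configs);
    boolean shouldComplete();
}

// Hypothetical default: completes immediately, like an EOF-based strategy.
final class AlwaysCompleteSketch implements ConfigurableStrategySketch {
    boolean configured;

    public AlwaysCompleteSketch() {}

    @Override
    public void configure(Map<String, ?> configs) {
        configured = true;
    }

    @Override
    public boolean shouldComplete() {
        return true;
    }
}

final class StrategyLoaderSketch {
    // Reads the class name under 'key', falls back to 'defaultClass',
    // instantiates it via its no-arg constructor, then configures it.
    static ConfigurableStrategySketch load(Map<String, ?> configs, String key, String defaultClass) {
        Object value = configs.get(key);
        String className = value != null ? value.toString() : defaultClass;
        try {
            ConfigurableStrategySketch strategy = Class.forName(className)
                    .asSubclass(ConfigurableStrategySketch.class)
                    .getDeclaredConstructor()
                    .newInstance();
            strategy.configure(configs);
            return strategy;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot instantiate strategy: " + className, e);
        }
    }
}
```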

2. DefaultFileRecordsPollingConsumer

  • Integrated FileCompletionStrategy into the polling logic
  • Uses strategy's shouldComplete() to determine when to mark files as COMPLETED

3. DefaultFileSystemMonitor

  • Integrated LongLivedFileReadStrategy support
  • Filters files based on shouldAttemptRead() to avoid unnecessary polling

4. FilePulseSourceTask & FilePulseSourceConnector

  • Configure and pass FileCompletionStrategy to consumer
  • Ensure proper initialization and configuration propagation

🔄 Backward Compatibility

100% Backward Compatible

  • Default behavior unchanged: Existing connectors will continue to use EofCompletionStrategy (immediate completion)
  • No configuration changes required: Users don't need to modify existing configurations
  • No breaking API changes: All existing interfaces and classes remain unchanged

Migration Path

Existing configurations continue to work as-is. When fs.completion.strategy.class is not specified, the connector uses EofCompletionStrategy (the default):

{
  "name": "my-existing-connector",
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
}

To adopt new functionality, simply add the configuration:

{
  "name": "my-connector",
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.completion.strategy.class": "io.streamthoughts.kafka.connect.filepulse.source.DailyCompletionStrategy",
  "daily.completion.schedule.time": "01:00:00",
  "daily.completion.schedule.date.pattern": ".*?(\\d{4}-\\d{2}-\\d{2}).*",
  "daily.completion.schedule.date.format": "yyyy-MM-dd"
}

📋 Configuration

Common Configuration

| Property | Description | Default | Importance |
|---|---|---|---|
| fs.completion.strategy.class | FileCompletionStrategy class to use | EofCompletionStrategy | MEDIUM |

DailyCompletionStrategy Configuration

| Property | Description | Default | Importance |
|---|---|---|---|
| daily.completion.schedule.time | Time to complete files (HH:mm:ss) | 00:01:00 | HIGH |
| daily.completion.schedule.date.pattern | Regex to extract date from filename | .*?(\\d{4}-\\d{2}-\\d{2}).* | HIGH |
| daily.completion.schedule.date.format | Date format pattern for parsing | yyyy-MM-dd | HIGH |

🎯 Use Cases

Use Case 1: Daily Log Files

Scenario: Application writes to app-2025-12-08.log throughout December 8th. You want to continuously read new data but only mark the file complete at 1 AM on December 9th.

Configuration:

{
  "fs.completion.strategy.class": "io.streamthoughts.kafka.connect.filepulse.source.DailyCompletionStrategy",
  "daily.completion.schedule.time": "01:00:00",
  "daily.completion.schedule.date.pattern": ".*?(\\d{4}-\\d{2}-\\d{2}).*",
  "daily.completion.schedule.date.format": "yyyy-MM-dd"
}

Use Case 2: Static Files (Backward Compatible)

Scenario: Standard file ingestion - complete files immediately when fully read.

Configuration: None needed (default behavior).

Use Case 3: Custom Completion Logic

Scenario: Implement your own completion strategy based on file size, record count, external triggers, etc.

Implementation: Implement the FileCompletionStrategy interface and set fs.completion.strategy.class to your custom class.
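A custom strategy might look like the sketch below, which completes a file once it has been idle for a configurable period. To stay self-contained it uses a hypothetical `CompletionStrategySketch` interface that takes timestamps directly instead of the real `FileObjectContext`, and the property name `idle.completion.timeout.ms` is invented for illustration only.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical simplification of FileCompletionStrategy: the real interface
// receives a FileObjectContext rather than raw timestamps.
interface CompletionStrategySketch {
    void configure(Map<String, ?> configs);
    boolean shouldComplete(long lastModifiedMillis, long nowMillis);
}

// Completes a file once it has not been modified for the configured idle period.
final class IdleCompletionSketch implements CompletionStrategySketch {

    // Invented property name, not part of the PR.
    static final String IDLE_MS_CONFIG = "idle.completion.timeout.ms";

    private long idleMs = Duration.ofMinutes(5).toMillis();

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(IDLE_MS_CONFIG);
        if (value != null) {
            idleMs = Long.parseLong(value.toString());
        }
    }

    @Override
    public boolean shouldComplete(long lastModifiedMillis, long nowMillis) {
        return nowMillis - lastModifiedMillis >= idleMs;
    }
}
```

Passing the current time as a parameter keeps the rule deterministic and easy to unit test; a real implementation would read it from a clock.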

🚀 Benefits

  1. Flexibility: Users can choose the right completion strategy for their use case
  2. Extensibility: Easy to implement custom strategies without modifying core code
  3. Efficiency: LongLivedFileReadStrategy reduces unnecessary polling and resource usage
  4. Backward Compatible: Zero impact on existing deployments

  do {
      line = tryToExtractLine();
-     if (line != null) {
+     if (line != null && !StringUtils.isEmpty(line.data())) {
Contributor Author

When new lines are appended, a "\n" is written after the last line read in the previous poll. The reader resumes at that \n and emits an empty line (because tryToExtractLine strips the \n), which causes an empty message to be produced.
Adding this isEmpty check solves the issue.
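The effect can be reproduced with a small, self-contained sketch. `ResumeReadSketch.extractLines` is a hypothetical simplification of the reader loop, not the connector's actual code: it resumes at a character offset, splits on "\n", and optionally drops empty lines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of a line reader that resumes from an offset.
final class ResumeReadSketch {

    // Extracts lines from 'content' starting at 'offset', stripping the "\n"
    // terminator. When skipEmpty is true, empty lines are not emitted.
    static List<String> extractLines(String content, int offset, boolean skipEmpty) {
        List<String> lines = new ArrayList<>();
        int start = offset;
        while (start < content.length()) {
            int newline = content.indexOf('\n', start);
            int end = (newline == -1) ? content.length() : newline;
            String line = content.substring(start, end);
            if (!skipEmpty || !line.isEmpty()) {
                lines.add(line);
            }
            start = end + 1;
        }
        return lines;
    }
}
```

If the first poll read "a\nb" (offset 3) and the file later becomes "a\nb\nc\n", resuming at offset 3 without the filter produces an empty line followed by "c"; with the filter only "c" is emitted. In this sketch the filter also drops blank lines that are genuinely part of the data.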

@AlexisSouquiere AlexisSouquiere marked this pull request as ready for review December 18, 2025 09:55