Skip to content

Conversation

@mmenozzi
Copy link
Member

If for a given ProducerInstance for a flow, the method ProducerInstance::produceAndQueueJobs() is called concurrently multiple times (for example because the produces is an HttpRequestProducer and the same request arrives multiple times at the same time) there is the chance that same jobs are enqueued multiple times or, worst, some jobs are lost.

This is because the "batch" of the QueueManager is always the same and is shared between the multiple calls of ProducerInstance::produceAndQueueJobs().

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a race condition that occurs when ProducerInstance::produceAndQueueJobs() is called concurrently for the same flow (e.g., when multiple HTTP requests arrive simultaneously for an HttpRequestProducer). The race condition could cause jobs to be enqueued multiple times or lost entirely due to concurrent access to a shared batch in the QueueManager.

Key changes:

  • Introduced batch IDs (UUIDs) to isolate concurrent batches in the QueueManager
  • Modified the ProducerQueueManagerInterface to require a batchId parameter for enqueue and flush operations
  • Updated QueueManager to maintain separate batches per batchId instead of a single shared batch

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
tests/Integration/HttpRequestProducerAndWorkerTest.php Added new test case to verify concurrent HTTP requests are properly handled without job loss or duplication
src/Service/QueueManager.php Refactored from single batch array to multi-batch map keyed by batchId, updated all batch operations to use batchId parameter
src/Service/ProducerQueueManagerInterface.php Extended interface methods (enqueue and flush) to accept batchId parameter for batch isolation
src/ProducerInstance.php Generate unique UUID for each produceAndQueueJobs invocation and pass it to enqueue/flush operations

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

return;
}

$this->batches[$batchId] = [];
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak: The emptyBatch method sets the batch to an empty array but doesn't remove the batchId key from the batches array. This means every unique batchId will permanently occupy memory even after its batch is processed. Since a new UUID is generated for each call to produceAndQueueJobs, this will cause unbounded memory growth over time. The method should use unset($this->batches[$batchId]) instead of setting it to an empty array.

Suggested change
$this->batches[$batchId] = [];
unset($this->batches[$batchId]);

Copilot uses AI. Check for mistakes.
Comment on lines 181 to 191
} catch (\Throwable $error) {
$this->logger->error(
'An error occurred producing/queueing jobs.',
[
'producer' => \get_class($this->producer),
'last_job_payload_data' => $job ? NonUtf8Cleaner::clean($job->getPayloadData()) : null,
'error' => $error->getMessage(),
'batch_id' => $batchId,
]
);
}
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing cleanup on error path: When an exception is caught in the produceAndQueueJobs method, the batch is not cleaned up from the QueueManager's batches map. This means if an error occurs during job production or enqueueing, the partially filled batch will remain in memory indefinitely, contributing to a memory leak. Consider adding a finally block or catch-all cleanup that calls flush or manually cleans up the batch using a new cleanup method on the QueueManager interface.

Copilot uses AI. Check for mistakes.
);
$httpPort = self::$kernel->getContainer()->getParameter('http_server_port');

Loop::delay(100, function () use ($httpPort){
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space before opening brace. Should be function () use ($httpPort) { instead of function () use ($httpPort){ to maintain consistency with the code style used elsewhere in the file (e.g., line 175).

Suggested change
Loop::delay(100, function () use ($httpPort){
Loop::delay(100, function () use ($httpPort) {

Copilot uses AI. Check for mistakes.
unset($this->batches[$batchId][$jobUuid]);
}

private function emptyBatch(string $batchId): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private function emptyBatch(string $batchId): void
private function clearBatch(string $batchId): void

I would rename this method to "clearBatch" to make it clearer that it does not test whether a batch is empty, it actually clears it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants