Skip to content

Conversation

@umputun
Copy link
Member

@umputun umputun commented Dec 28, 2025

Summary

  • Fix critical bugs in pool implementation: batch channel closure, metrics timer recording, pool completion callback timing
  • Improve test coverage: add tests for edge cases in batching, chunking, metrics, and middleware
  • Expand examples: add 3 new examples (basic, chunking, pool_completion) with READMEs
  • Fix linter issues across all examples

Bug Fixes

  • Fix batch channel closure: close dedicated batch channels when using WithChunkFn
  • Fix metrics timer: record processing time even when worker returns error
  • Fix pool completion callback: ensure it runs on context deadline exceeded (not just cancellation)

Test Coverage Improvements

  • Add tests for batch channel closure with chunking
  • Add tests for metrics timer recording on errors
  • Add tests for pool completion callback edge cases
  • Add middleware Timeout tests

New Examples

  • basic: minimal hello world (~40 lines)
  • chunking: demonstrates WithChunkFn for key-based routing
  • pool_completion: shows pool vs worker completion callbacks

Example Fixes

Fixed structural linter issues across all 10 examples:

  • exitAfterDefer: use return instead of log.Fatal after defer
  • shadow: rename shadowed variables
  • intrange: use range over int (Go 1.22+)
  • modernize: use strings.FieldsSeq, interface{} -> any
  • prealloc: preallocate slices where size is known

- Close() now always closes channels even on context timeout
- Close() extracted flushAccumulators() for cleaner code structure
- Go() activation check made atomic with CompareAndSwap
- WithBatchSize() handles negative values (treats as 0)
- Wait-time metrics now calculated after processing completes
- Added GetStats() thread-safety documentation

Tests added for: double Go() activation, negative batch size,
Close/Wait context timeout, pre-cancelled context, batch flush
deadlock, worker completion error combination, middleware edge
cases (invalid Retry/Timeout values, Recovery with int panic)
- add basic example: minimal hello world for getting started
- add chunking example: demonstrates WithChunkFn for key-based routing
- add pool_completion example: shows pool completion callback usage
- add README.md for all new examples
- fix structural linter issues across all examples:
  - exitAfterDefer: use return instead of log.Fatal after defer
  - shadow: rename shadowed variables
  - intrange: use range over int (Go 1.22+)
  - modernize: use strings.FieldsSeq, interface{} -> any
  - prealloc: preallocate slices where size is known
- update examples/README.md with new examples
Copilot AI review requested due to automatic review settings December 28, 2025 20:58
@umputun umputun merged commit 7799cbe into master Dec 28, 2025
6 checks passed
@umputun umputun deleted the fix-pool-bugs-and-tests branch December 28, 2025 21:00
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes critical pool bugs, significantly improves test coverage, and enhances documentation with three new examples. The changes include fixing batch channel closure, metrics timer recording, and pool completion callback timing, along with addressing linter issues across all examples.

Key Changes:

  • Fixed metrics timer to record processing time even on worker errors by ensuring procEndTmr() is called before early returns
  • Enhanced Close/Wait methods to respect context timeouts and deadlines, preventing indefinite hangs
  • Added comprehensive edge case tests for batching, timeouts, double activation, and worker completion

Reviewed changes

Copilot reviewed 21 out of 31 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
pool.go Fixed metrics timer placement, added atomic.Bool for activation guard, improved Close/Wait with context-aware timeout handling, added negative batch size normalization, fixed typo in error message
pool_test.go Added 10 new tests covering edge cases: double activation, negative batch size, timeout handling, pre-cancelled contexts, deadlock prevention, and worker completion error scenarios
middleware/middleware_test.go Added tests for invalid values in Retry and Timeout middlewares, added panic recovery test with non-error types
metrics/metrics.go Enhanced documentation for GetStats() warning about concurrent access during active processing
examples/basic/* New minimal "hello world" example demonstrating simplest pool usage
examples/chunking/* New example showing WithChunkFn for consistent work distribution by key
examples/pool_completion/* New example demonstrating pool completion callback for final aggregation
examples/tokenizer_stateless/main.go Fixed exitAfterDefer linter issue, modernized with strings.FieldsSeq
examples/tokenizer_stateful/main.go Fixed exitAfterDefer linter issue, modernized with strings.FieldsSeq
examples/parallel_files/main.go Fixed shadow variable linter issue
examples/middleware/main.go Updated to use any instead of interface{}, used range over int
examples/direct_chain/main.go Preallocated results slice with expected capacity
examples/collectors_chain/main.go Fixed shadowed variable name, preallocated results slice
examples/collector_errors/main.go Removed unused os import, preallocated results slice, used range over int, removed emoji in output for better compatibility
examples/*/go.sum Updated dependencies to latest versions (stretchr/testify 1.11.1, golang.org/x/sync 0.19.0, golang.org/x/time 0.14.0)
examples/README.md Added documentation for new examples and reorganized structure

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +467 to +477
done := make(chan error, 1)
go func() {
done <- p.eg.Wait()
}()

select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential goroutine leak: when the context times out (line 476), the goroutine spawned at line 468-470 continues to wait on p.eg.Wait() indefinitely. Since the done channel is buffered with size 1, the goroutine won't block on the send, but it will remain alive until p.eg.Wait() completes. While this might eventually complete, in scenarios where workers are truly stuck, this could leak goroutines. Consider documenting this behavior or ensuring workers respect context cancellation.

Copilot uses AI. Check for mistakes.
Comment on lines +522 to 532
done := make(chan error, 1)
go func() {
done <- p.eg.Wait()
}()

select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
default:
}
Copy link

Copilot AI Dec 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential goroutine leak: when the context times out (line 531), the goroutine spawned at line 523-525 continues to wait on p.eg.Wait() indefinitely. Since the done channel is buffered with size 1, the goroutine won't block on the send, but it will remain alive until p.eg.Wait() completes. While this might eventually complete, in scenarios where workers are truly stuck, this could leak goroutines. Consider documenting this behavior or ensuring workers respect context cancellation.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants