BufferTrigger Simple Introduction
Li Wei
Title: Simple Introduction to BufferTrigger
Overview
Aggregate large amounts of data or requests and then perform batch operations. Ideal for high‑concurrency or large‑data‑volume scenarios.
Choosing an Implementation
Should you use SimpleBufferTrigger or BatchConsumeBlockingQueueTrigger?
SimpleBufferTrigger
Test class:
Set the TriggerStrategy
The source code provides a
MultiIntervalTriggerStrategy.
Just configure the time intervals and counts, e.g., “add 100 items every 1 s, 10 items every 3 s, 1 item every 5 s” – each condition will trigger.Note: The trigger fires only when both the time interval and the count thresholds are met.
Set the maximum buffer size
When the number of elements in the buffer exceeds this limit, a rejection policy (implemented by you) is applied—e.g., discard the items, push them to Kafka, etc.
Back‑pressure
This occurs when the consumption rate is slower than the production rate.
If back‑pressure is enabled, the defaultBackPressureHandlerstrategy is activated: downstream components suppress upstream production. When the buffer reaches its maximum, the current thread waits onwriteCondition. Upon consumption, a new buffer is created and all threads waiting onwriteConditionare awakened, then the old buffer is consumed.Tip: Enabling back‑pressure often exhausts the thread pool, so in such cases it’s usually better to forward data to Kafka instead.
Implement the consumption logic
Important: The callback receives the current consumption timestamp and all elements still stored in the buffer—not a per‑element iteration.
Set the bufferFactory
Each consumption creates a new buffer and drains the previous one, so you need to provide a
bufferFactory.
BatchConsumeBlockingQueueTrigger
Set
batchSizeThe number of items each consumer thread processes per batch.
Implement the consumption logic
Each batch processes exactly
batchSizeitems.Set the queue size (
bufferSize)Implemented with a
LinkedBlockingQueue. If the number of elements exceedsbufferSize, producer threads block.Set the linger interval
The consumption logic is executed every
lingerperiod.
Code Walkthrough
Methods in the bufferTrigger Interface
(details omitted)
Two Implementations of bufferTrigger
Class diagram:
SimpleBufferTrigger
- Triggered only by a scheduled task that calls
consume. - General‑purpose implementation suitable for most business cases.
- The trigger strategy accounts for the execution time of the consumption callback:
actual interval = scheduled interval – callback execution time.
If the callback runs longer than the scheduled interval, the next consumption runs immediately. - Both the time interval and the count must be satisfied to fire.
- Multi‑threaded consumption (each thread consumes one buffer) with single‑threaded writes.
- Each consumption drains all elements in the current buffer and creates a new buffer.
- Triggered only by a scheduled task that calls
BatchConsumeBlockingQueueTrigger
- Consumption can be triggered either by elements being enqueued or by the scheduled task.
- Enqueue flow → Scheduled consumption flow
- Built on
LinkedBlockingQueue, fitting the classic producer‑consumer pattern. - Trigger strategy similar to Kafka’s
linger: consumption occurs when either the batch size threshold or the time‑delay threshold is reached. - Multi‑threaded writes, single‑threaded consumption. When the threshold or time interval is met, a callback consumes a batch of elements.
FAQ
Why check
queue.size() > batchSizetwice?
After acquiring the lock, a scheduled task might have already consumed elements. Checking both before and after locking reduces lock contention (first check) and provides a safety net (second check) to avoid consuming an empty batch.Why use the
runningflag?
When enqueuing an element triggers consumption, a new thread is spawned to run the consumption logic.
Withoutrunning, the following race can occur:
Thread A acquires the lock, seesqueue.size() > batchSize, spawns a consumer thread, and releases the lock. Immediately after, Thread B acquires the lock, sees the same condition, and also spawns a consumer thread. The first consumer processes the items; the subsequent threads wake up only to find nothing to consume, wasting resources.Why does SimpleBufferTrigger use a read‑write lock?
A read‑write lock allows:- Read‑read concurrency (multiple readers simultaneously)
- Read‑write mutual exclusion
- Write‑write mutual exclusion
Enqueueing acquires the write lock, while consumption acquires the read lock. This enables single‑threaded writes to the buffer but allows multiple threads to consume different buffers concurrently.
Originally written by Li Wei (李唯_) and published in Chinese on 后端技术栈全书 (Full-Stack Backend Engineering). Translated and adapted for DriftSeas with permission.