A Simple Introduction to BufferTrigger

Li Wei

September 5, 2025 · 4 min read

Overview

BufferTrigger aggregates large amounts of data or requests and then processes them in batches. It is well suited to high-concurrency or high-volume scenarios.

Choosing an Implementation

Should you use SimpleBufferTrigger or BatchConsumeBlockingQueueTrigger?

SimpleBufferTrigger

Setting up a test class involves the following steps:

  • Set the TriggerStrategy

    The source code provides a MultiIntervalTriggerStrategy.
    You configure pairs of time interval and count, e.g., "100 items added within 1 s, 10 items added within 3 s, 1 item added within 5 s". Any pair that is satisfied fires the trigger.

    Note: Within a pair, the trigger fires only when both the time interval and the count threshold are met.

  • Set the maximum buffer size

    When the number of elements in the buffer reaches this limit, a rejection policy (implemented by you) is applied, e.g., discard the items or push them to Kafka.

  • Back‑pressure

    Back-pressure applies when the consumption rate is slower than the production rate.
    If back-pressure is enabled, the default BackPressureHandler strategy is used: the downstream side throttles upstream production. When the buffer reaches its maximum size, the producing thread waits on writeCondition. On the next consumption, a new buffer is created, all threads waiting on writeCondition are woken, and the old buffer is then consumed.

    Tip: Blocked producers hold on to their threads, so enabling back-pressure can easily exhaust the calling thread pool. In such cases it is usually better to forward the overflow to Kafka instead.

  • Implement the consumption logic

    Important: The callback receives the current consumption timestamp and all elements accumulated in the buffer in a single call. It is not invoked once per element.

  • Set the bufferFactory

    Each consumption creates a new buffer and drains the previous one, so you need to provide a bufferFactory.
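
The configuration points above can be sketched with plain JDK classes. This is a minimal illustration, not the library's SimpleBufferTrigger: the class MiniBufferTrigger, its constructor parameters, and the shouldFire helper are all hypothetical names invented here, and a simple synchronized block stands in for the library's locking. The scheduler that would call consume() is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical teaching sketch; NOT the library's SimpleBufferTrigger.
class MiniBufferTrigger<E> {
    private final Supplier<List<E>> bufferFactory; // creates a fresh buffer per consumption
    private final int maxBufferSize;               // capacity before the rejection policy applies
    private final Consumer<E> rejectHandler;       // caller-supplied rejection policy
    private final Consumer<List<E>> consumer;      // receives the whole drained buffer
    private List<E> buffer;                        // guarded by "this"

    MiniBufferTrigger(Supplier<List<E>> bufferFactory, int maxBufferSize,
                      Consumer<E> rejectHandler, Consumer<List<E>> consumer) {
        this.bufferFactory = bufferFactory;
        this.maxBufferSize = maxBufferSize;
        this.rejectHandler = rejectHandler;
        this.consumer = consumer;
        this.buffer = bufferFactory.get();
    }

    /** rules[i] = {intervalMs, minCount}; a rule fires only when both of its thresholds are met. */
    static boolean shouldFire(long[][] rules, long elapsedMs, long changedCount) {
        for (long[] rule : rules) {
            if (elapsedMs >= rule[0] && changedCount >= rule[1]) {
                return true;
            }
        }
        return false;
    }

    /** Adds an element, or hands it to the rejection policy when the buffer is full. */
    synchronized boolean enqueue(E element) {
        if (buffer.size() >= maxBufferSize) {
            rejectHandler.accept(element);
            return false;
        }
        buffer.add(element);
        return true;
    }

    /** Swaps in a new buffer, then passes the old one to the callback in a single call. */
    void consume() {
        List<E> old;
        synchronized (this) {
            old = buffer;
            buffer = bufferFactory.get();
        }
        if (!old.isEmpty()) {
            consumer.accept(old);
        }
    }
}
```

With the rules {1000, 100}, {3000, 10}, {5000, 1} from the example above, 50 changes after 1.2 s do not fire, while the same 50 changes after 3 s do, because the second rule's interval and count are then both satisfied.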

BatchConsumeBlockingQueueTrigger

  • Set batchSize

    The number of items each consumer thread processes per batch.

  • Implement the consumption logic

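    Each batch contains at most batchSize items. A batch may be smaller when consumption is triggered by the linger interval rather than by a full batch.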

  • Set the queue size (bufferSize)

    Implemented with a LinkedBlockingQueue. Once the queue holds bufferSize elements, producer threads block until space is freed.

  • Set the linger interval

    The consumption logic also runs once per linger interval, whether or not a full batch has accumulated.
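
The four settings above map naturally onto a bounded LinkedBlockingQueue. The following is a rough sketch under stated assumptions, not the library's BatchConsumeBlockingQueueTrigger: MiniBatchQueueTrigger, tryEnqueue, drain, and startLinger are hypothetical names, and the sketch only covers the time-based trigger (it does not start consumption when a full batch is enqueued).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical teaching sketch; NOT the library's BatchConsumeBlockingQueueTrigger.
class MiniBatchQueueTrigger<E> {
    private final LinkedBlockingQueue<E> queue;
    private final int batchSize;
    private final Consumer<List<E>> consumer;

    MiniBatchQueueTrigger(int bufferSize, int batchSize, Consumer<List<E>> consumer) {
        this.queue = new LinkedBlockingQueue<>(bufferSize); // bounded: capacity = bufferSize
        this.batchSize = batchSize;
        this.consumer = consumer;
    }

    /** Blocks the producer while the queue is full. */
    void enqueue(E element) throws InterruptedException {
        queue.put(element);
    }

    /** Non-blocking variant, used here only to make the capacity limit observable. */
    boolean tryEnqueue(E element) {
        return queue.offer(element);
    }

    /** Single consumer: hands over batches of at most batchSize until the queue is empty. */
    synchronized void drain() {
        while (!queue.isEmpty()) {
            List<E> batch = new ArrayList<>(batchSize);
            queue.drainTo(batch, batchSize);
            if (!batch.isEmpty()) {
                consumer.accept(batch);
            }
        }
    }

    /** Runs drain() once per linger interval on the given scheduler. */
    ScheduledFuture<?> startLinger(ScheduledExecutorService scheduler, long lingerMs) {
        return scheduler.scheduleWithFixedDelay(this::drain, lingerMs, lingerMs, TimeUnit.MILLISECONDS);
    }
}
```

With bufferSize 5 and batchSize 2, five queued items are delivered as batches of 2, 2, and 1, which is why a callback should not assume that every batch is full.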

Code Walkthrough

Methods in the BufferTrigger Interface

(details omitted)

Two Implementations of BufferTrigger

(Class diagram not reproduced here.)

  • SimpleBufferTrigger

    • Triggered only by a scheduled task that calls consume.
    • General‑purpose implementation suitable for most business cases.
    • The trigger strategy accounts for the execution time of the consumption callback:
      actual interval = scheduled interval – callback execution time.
      If the callback runs longer than the scheduled interval, the next consumption runs immediately.
    • Both the time interval and the count must be satisfied to fire.
    • Multi‑threaded consumption (each thread consumes one buffer) with single‑threaded writes.
    • Each consumption drains all elements in the current buffer and creates a new buffer.
  • BatchConsumeBlockingQueueTrigger

    • Consumption can be triggered either by elements being enqueued or by the scheduled task.
    • (Figures: enqueue flow; scheduled consumption flow.)
    • Built on LinkedBlockingQueue, fitting the classic producer‑consumer pattern.
    • Trigger strategy similar to Kafka’s linger: consumption occurs when either the batch size threshold or the time‑delay threshold is reached.
    • Multi‑threaded writes, single‑threaded consumption. When the threshold or time interval is met, a callback consumes a batch of elements.
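
The timing rule given for SimpleBufferTrigger above (actual interval = scheduled interval minus callback execution time, with an immediate rerun when the callback overruns) comes down to a clamped subtraction. A minimal sketch, where ConsumeTiming and nextDelayMs are hypothetical names used only for illustration:

```java
// Hypothetical helper illustrating the timing rule; not part of the library.
class ConsumeTiming {
    /** Delay before the next consumption, given how long the last callback ran. */
    static long nextDelayMs(long scheduledIntervalMs, long callbackMs) {
        // A callback slower than the interval yields 0: the next run starts immediately.
        return Math.max(0L, scheduledIntervalMs - callbackMs);
    }
}
```

For a 1000 ms interval, a 300 ms callback leaves a 700 ms wait, and a 1500 ms callback leaves no wait at all.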

FAQ

  • Why check queue.size() > batchSize twice?
    Between the first check and acquiring the lock, a scheduled task might already have consumed the elements. The first check, made without the lock, avoids locking when there is nothing to do and so reduces lock contention. The second check, made under the lock, is a safety net that avoids consuming an empty batch.

  • Why use the running flag?
    When enqueuing an element triggers consumption, a new thread is spawned to run the consumption logic.
    Without running, the following race can occur:
    Thread A acquires the lock, sees queue.size() > batchSize, spawns a consumer thread, and releases the lock. Immediately after, Thread B acquires the lock, sees the same condition, and also spawns a consumer thread. The first consumer processes the items; the subsequent threads wake up only to find nothing to consume, wasting resources.

  • Why does SimpleBufferTrigger use a read‑write lock?
    A read‑write lock allows:

    • Read‑read concurrency (multiple readers simultaneously)
    • Read‑write mutual exclusion
    • Write‑write mutual exclusion

    Enqueueing acquires the write lock, while consumption acquires the read lock. This enables single‑threaded writes to the buffer but allows multiple threads to consume different buffers concurrently.
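
The first two answers above (the double check and the running flag) can be combined in one small sketch. It is an illustration under assumptions, not the library's code: RunningFlagSketch and its members are hypothetical, a counter stands in for actually submitting a consumer task, and the size check mirrors the `queue.size() > batchSize` condition quoted in the question.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching sketch of the double check plus running flag.
class RunningFlagSketch {
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final int batchSize;
    private boolean running; // guarded by lock
    final AtomicInteger consumerTasksSubmitted = new AtomicInteger();

    RunningFlagSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void enqueue(int value) {
        queue.add(value);
        if (queue.size() > batchSize) {                     // first check: no lock taken
            lock.lock();
            try {
                if (queue.size() > batchSize && !running) { // second check: under the lock
                    running = true;
                    // Real code would submit runConsumer() to an executor here.
                    consumerTasksSubmitted.incrementAndGet();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /** Body of the consumer task: takes batches while the threshold holds, then clears the flag. */
    void runConsumer() {
        try {
            while (queue.size() > batchSize) {
                for (int i = 0; i < batchSize; i++) {
                    queue.poll();
                }
            }
        } finally {
            lock.lock();
            try {
                running = false;
            } finally {
                lock.unlock();
            }
        }
    }

    int pending() {
        return queue.size();
    }
}
```

While the flag is set, further enqueues that cross the threshold do not submit a second consumer task; a new one is submitted only after the running consumer has finished and the threshold is crossed again.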


Originally written by Li Wei (李唯_) and published in Chinese on 后端技术栈全书 (Full-Stack Backend Engineering). Translated and adapted for DriftSeas with permission.
