Class MemoryAwareThreadPoolExecutor

java.lang.Object
java.util.concurrent.AbstractExecutorService
java.util.concurrent.ThreadPoolExecutor
org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
All Implemented Interfaces:
Executor, ExecutorService
Direct Known Subclasses:
FairOrderedMemoryAwareThreadPoolExecutor, OrderedMemoryAwareThreadPoolExecutor

public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor
A ThreadPoolExecutor which blocks task submission when there are too many tasks in the queue. Both per-Channel and per-Executor limits can be applied.

When a task (i.e. Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes to calculate the amount of memory occupied by the unprocessed tasks.

If the total size of the unprocessed tasks exceeds either the per-Channel or the per-Executor threshold, any further execute(Runnable) call will block until enough queued tasks have been processed for the total size to drop below the threshold.
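
The blocking rule described above can be sketched with a small byte counter guarded by a lock. This is an illustration under stated assumptions, not Netty's actual implementation: ByteBudget and its methods are hypothetical names, the sizes passed in stand for the values that ObjectSizeEstimator.estimateSize(Object) would return, and treating a counter equal to the limit as exhausted is a choice made for this sketch.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper (not a Netty class): tracks the estimated bytes of unprocessed tasks. */
final class ByteBudget {
    private final long limit; // 0 disables the limit, mirroring the constructor parameters
    private long used;        // estimated bytes of tasks that are queued or still running
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition belowLimit = lock.newCondition();

    ByteBudget(long limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit: " + limit);
        }
        this.limit = limit;
    }

    /** Called on submission: waits while the budget is exhausted, then records the task size. */
    void acquire(long size) {
        lock.lock();
        try {
            while (isExhausted()) {
                belowLimit.awaitUninterruptibly();
            }
            used += size;
        } finally {
            lock.unlock();
        }
    }

    /** Called after a task has been processed: frees its bytes and wakes blocked submitters. */
    void release(long size) {
        lock.lock();
        try {
            used -= size;
            if (!isExhausted()) {
                belowLimit.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if the next acquire() call would have to wait. */
    boolean wouldBlock() {
        lock.lock();
        try {
            return isExhausted();
        } finally {
            lock.unlock();
        }
    }

    // Assumption of this sketch: reaching the limit exactly already counts as exhausted.
    private boolean isExhausted() {
        return limit != 0 && used >= limit;
    }
}
```

In this sketch an executor would call acquire() inside execute(Runnable) and release() once the task has run. Keeping one ByteBudget per Channel and one for the whole pool would correspond to the two thresholds.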

Using an alternative task size estimation strategy

Although the default implementation does its best to guess the size of an object of unknown type, it is always a good idea to use an alternative ObjectSizeEstimator implementation instead of the DefaultObjectSizeEstimator to avoid incorrect task size calculation. Here is an example that demonstrates how to implement an ObjectSizeEstimator which understands a user-defined object:
 public class MyRunnable implements Runnable {

     // Package-private (not private) so that MyObjectSizeEstimator can read it.
     final byte[] data;

     public MyRunnable(byte[] data) {
         this.data = data;
     }

     public void run() {
         // Process 'data' ..
     }
 }

 public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {

     @Override
     public int estimateSize(Object o) {
         if (o instanceof MyRunnable) {
             return ((MyRunnable) o).data.length + 8;
         }
         return super.estimateSize(o);
     }
 }

 ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
         16, 65536, 1048576, 30, TimeUnit.SECONDS,
         new MyObjectSizeEstimator(),
         Executors.defaultThreadFactory());

 pool.execute(new MyRunnable(data));
 

Event execution order

Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event. The events can be processed as depicted below:
           --------------------------------> Timeline -------------------------------->

 Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->

 Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->

 Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
 
To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.
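
The idea behind per-Channel ordering can be sketched in plain Java by draining one queue per key on top of a shared thread pool. This is a simplified illustration, not the implementation of OrderedMemoryAwareThreadPoolExecutor: KeyedSerialExecutor is a hypothetical name, the key stands in for a Channel, and no memory accounting is performed.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical sketch: runs tasks with the same key one at a time, in submission order. */
final class KeyedSerialExecutor {
    private final Executor delegate;
    // A key is present in this map exactly while a drain loop for it is scheduled or running.
    private final Map<Object, Queue<Runnable>> pending = new HashMap<>();

    KeyedSerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    void execute(Object key, Runnable task) {
        boolean startDrain;
        synchronized (pending) {
            Queue<Runnable> queue = pending.get(key);
            startDrain = (queue == null);
            if (startDrain) {
                queue = new ArrayDeque<>();
                pending.put(key, queue);
            }
            queue.add(task);
        }
        if (startDrain) {
            // Only the submitter that created the queue schedules a drain loop,
            // so at most one thread works on a given key at any time.
            delegate.execute(() -> drain(key));
        }
    }

    private void drain(Object key) {
        for (;;) {
            Runnable next;
            synchronized (pending) {
                next = pending.get(key).poll();
                if (next == null) {
                    // Queue is empty: forget the key so a later submit starts a new drain loop.
                    pending.remove(key);
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // Keep draining so that one failing task does not stall the key.
                e.printStackTrace();
            }
        }
    }
}
```

Tasks for different keys can still run concurrently on the delegate's threads; only tasks that share a key are serialized.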
  • Field Details

  • Constructor Details

    • MemoryAwareThreadPoolExecutor

      public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
      Creates a new instance.
      Parameters:
      corePoolSize - the maximum number of active threads
      maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
      maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
    • MemoryAwareThreadPoolExecutor

      public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
      Creates a new instance.
      Parameters:
      corePoolSize - the maximum number of active threads
      maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
      maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
      keepAliveTime - the amount of time for an inactive thread to shut itself down
      unit - the TimeUnit of keepAliveTime
    • MemoryAwareThreadPoolExecutor

      public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
      Creates a new instance.
      Parameters:
      corePoolSize - the maximum number of active threads
      maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
      maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
      keepAliveTime - the amount of time for an inactive thread to shut itself down
      unit - the TimeUnit of keepAliveTime
      threadFactory - the ThreadFactory of this pool
    • MemoryAwareThreadPoolExecutor

      public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
      Creates a new instance.
      Parameters:
      corePoolSize - the maximum number of active threads
      maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
      maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
      keepAliveTime - the amount of time for an inactive thread to shut itself down
      unit - the TimeUnit of keepAliveTime
      objectSizeEstimator - the ObjectSizeEstimator of this pool
      threadFactory - the ThreadFactory of this pool
  • Method Details