Tuesday, June 4, 2019

How To Create Custom Thread Pool

Q- What is ThreadPool?

Thread Pool is a pool of threads which reuses a fixed number of threads  to execute number of tasks.

Q- How To Create Custom Thread Pool in java? 

  Implementing ThreadPool using custom LinkedBlockingQueue in java  




ThreadPoolCustom.java 

package com.shubh.custom.threadpool;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPoolCustom {
 private final int size;
 private final LinkedBlockingQueue queue;
 /*
  * if shutdown pool will be initiated, poolShutDownInitiated will become true.
  */
 private boolean poolShutDownInitiated = false;

 public ThreadPoolCustom(int size) {
  this.size = size;
  /* create queue of given size */
  queue = new LinkedBlockingQueue(size);

  /* create thread pool of given size */
  for (int i = 0; i < size; i++) {
   WorkerPool workerPool = new WorkerPool(queue, this);
   // workerPool.setName("Thread-" + i);
   System.out.println("Thread-" + i + " created in WorkerPool.");
   workerPool.start();
  }
 }

 /* this method is used to execute Runnable task */
 public void execute(Runnable task) throws Exception {
  if (this.poolShutDownInitiated)
   throw new Exception("ThreadPoolCustom has been shutDown, no further tasks can be added");

  synchronized (queue) {
   queue.add(task);
   queue.notify();
  }
 }

 /* this method is used to check poolShutDownInitiated */
 public boolean isPoolShutDownInitiated() {
  return poolShutDownInitiated;
 }

 /* this method is used to check all thread are Terminated */
 public boolean isTerminated() {
  boolean isTerminated = false;
  if (this.poolShutDownInitiated && this.queue.size() == 0) {
   isTerminated = true;
  }
  return isTerminated;
 }

 /* this method is used to call shutdown executor */
 public synchronized void shutdown() {
  this.poolShutDownInitiated = true;
  System.out.println("ThreadPoolCustom SHUTDOWN initiated.");
 }
}

WorkerPool.java

package com.shubh.custom.threadpool;

import java.util.concurrent.LinkedBlockingQueue;

public class WorkerPool extends Thread {

 private LinkedBlockingQueue queue;
 private ThreadPoolCustom threadPoolCustom;

 public WorkerPool(LinkedBlockingQueue queue, ThreadPoolCustom threadPoolCustom) {
  this.queue = queue;
  this.threadPoolCustom = threadPoolCustom;
 }

 public void run() {
  try {
   while (true) {
    synchronized (queue) {
     while (queue.isEmpty()) {
      try {
       /* wait if queue is empty */
       queue.wait();
      } catch (InterruptedException e) {
       System.out.println("An error occurred while queue is waiting: " + e.getMessage());
      }
     }
     /* take runnable task from queue */
     Runnable task = queue.poll();

     /* run runnable task */
     task.run();

     /* if shutdown is Initiated */
     if (this.threadPoolCustom.isPoolShutDownInitiated() && this.queue.size() == 0) {
      this.interrupt();
      System.out.println("Interrupt call.");
      Thread.sleep(1);
     }
    }
   }
  } catch (RuntimeException | InterruptedException e) {
   System.out.println("WorkerPool is interrupted due to an issue: " + e.getMessage());
  }
 }
}

Task.java

package com.shubh.custom.threadpool;

public class Task implements Runnable {
  
    private String command;
    
    public Task(String s){
        this.command=s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
        processCommand();
        System.out.println(Thread.currentThread().getName()+" End.");
    }

    private void processCommand() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString(){
        return this.command;
    }
}

CustomThreadPoolMain.java

package com.shubh.custom.threadpool;

public class CustomThreadPoolMain {

 public static void main(String[] args) {
  try {
   ThreadPoolCustom executor = new ThreadPoolCustom(5);
   for (int i = 0; i < 10; i++) {
    Runnable task = new Task("" + i);
    executor.execute(task);

    /* use if we want to shutdown before all Runnable task execute */
    /*
     * if (i == 6) { executor.shutdown(); }
     */

   }
   executor.shutdown();
   while (!executor.isTerminated()) {
   }
   System.out.println("Finished all threads");
  } catch (Exception e) {
   System.out.println("ThreadPoolCustom has been shutDown, no further tasks can be added");
  }
 }
}

  Output:  

Thread-0 created in WorkerPool.
Thread-1 created in WorkerPool.
Thread-2 created in WorkerPool.
Thread-3 created in WorkerPool.
Thread-4 created in WorkerPool.
Thread-0 Start. Command = 0
Thread-0 End.
Thread-2 Start. Command = 1
Thread-2 End.
Thread-1 Start. Command = 2
Thread-1 End.
Thread-0 Start. Command = 3
Thread-0 End.
Thread-4 Start. Command = 4
Thread-4 End.
Thread-3 Start. Command = 5
Thread-3 End.
Thread-2 Start. Command = 6
Thread-2 End.
Thread-1 Start. Command = 7
Thread-1 End.
Thread-0 Start. Command = 8
Thread-0 End.
ThreadPoolCustom SHUTDOWN initiated.
Thread-4 Start. Command = 9
Finished all threads
Thread-4 End.
Interrupt call.
WorkerPool is interrupted due to an issue: sleep interrupted


No comments:

Post a Comment