Thursday, June 13, 2019

Custom BlockingQueue implementation In Java

Q- How to create custom BlockingQueue in java? Or How to implement your own BlockingQueue?

There are some steps to implement your own BlockingQueue.
  • I am useing array to store elements in BlockingQueue internally. Size of this array should be maximum number of elements that we want to store in BlockingQueue at a time.
  • Useing lock API and conditions objects to create custom BlockingQueue.
  • Producer, add the element in the queue if size of element in the queqe is less than Max_Size.
  • If size of element in the queqe is equal to Max_Size that means queue is full, than the producer will wait for queue to empty.
  • Consumer, consum element from the queue if size of element in the queqe is greater than zero.
  • If the queue is empty then the consumer will wait for the queue to get filled.

Now creating  MyBlockingQueue class.
  MyBlockingQueue.java  

package com.shubh.blockingqueue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockingQueue {

 // Maximum Capycity of queue
 int max_size;
 
 /*
  * Array to store element for CustomBlockingQueue I have use Object array so
  * that we can use this for any type of object like Sting, Integer etc.
  */
 final Object[] array;

 public MyBlockingQueue(int size) {
  this.max_size = size;
  array = new Object[max_size];
  System.out.println("MyBlockingQueue - Max_Size: " + this.max_size);
 }

 final Lock lock = new ReentrantLock();

 // Conditions for Producers and Consumers
 final Condition producerCond = lock.newCondition();
 final Condition consumerCond = lock.newCondition();

 // Index for put elements in queue-(in Object Array)
 int indexAdd = 0;
 // Index for take elements from queue-(from Object Array)
 int indexGet;

 int count;

 public void put(Object element) throws InterruptedException {

  lock.lock();
  try {
   while (count == array.length) {
    // Queue is full, producers need to wait
    producerCond.await();
   }

   array[indexAdd] = "" + element;
   System.out.println("Producing - " + element);
   indexAdd++;

   if (indexAdd == array.length) {
    indexAdd = 0;
   }

   // Increment the count for the array
   ++count;
   consumerCond.signal();
  } finally {
   lock.unlock();
  }
 }

 public Object take() throws InterruptedException {
  lock.lock();
  try {
   // checking if Queue is empty, consumers will to wait
   while (count == 0) {
    consumerCond.await();
   }
   Object element = array[indexGet];
   System.out.println("Consuming - " + element);
   indexGet++;
   if (indexGet == array.length) {
    indexGet = 0;
   }
   // reduce the count of array
   --count;
   // send signal producer
   producerCond.signal();
   return element;
  } finally {
   lock.unlock();
  }
 }
}

Now Let test our custom BlockingQueue. Create Producer & Consumer 
  Producer.java  

package com.shubh.blockingqueue;

public class Producer implements Runnable {

 private MyBlockingQueue myBlockingQueue;

 public Producer(MyBlockingQueue myBlockingQueue) {
  this.myBlockingQueue = myBlockingQueue;
 }

 @Override
 public void run() {
  for (int i = 0; i <= 10; i++) {
   try {
    myBlockingQueue.put(i);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}

  Consumer.java  
package com.shubh.blockingqueue;

public class Consumer implements Runnable {
 private MyBlockingQueue myBlockingQueue;

 public Consumer(MyBlockingQueue myBlockingQueue) {
  this.myBlockingQueue = myBlockingQueue;
 }

 @Override
 public void run() {
  for (int i = 1; i <= 10; i++) {
   try {
    myBlockingQueue.take();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}

  MyBlockingQueueTest.java  

package com.shubh.blockingqueue;

public class MyBlockingQueueTest {
  
 public static void main(String[] args) {
  MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
  // Creating producer and consumer threads
  Thread producer = new Thread(new Producer(myBlockingQueue));
  Thread consumer = new Thread(new Consumer(myBlockingQueue)); 
  
  producer.start();
  consumer.start();
 }
}

Output: We can see first 0-9 threads are produce and then 0-9 consume after that 10th thread produce.

MyBlockingQueue - Max_Size: 10
Producing - 0
Producing - 1
Producing - 2
Producing - 3
Consuming - 0
Consuming - 1
Consuming - 2
Consuming - 3
Producing - 4
Consuming - 4
Producing - 5
Consuming - 5
Producing - 6
Consuming - 6
Producing - 7
Consuming - 7
Producing - 8
Consuming - 8
Producing - 9
Consuming - 9
Producing - 10

No comments:

Post a Comment