Q- How to create custom BlockingQueue in java? Or How to implement your own BlockingQueue?
There are some steps to implement your own BlockingQueue.
There are some steps to implement your own BlockingQueue.
- I am useing array to store elements in BlockingQueue internally. Size of this array should be maximum number of elements that we want to store in BlockingQueue at a time.
- Useing lock API and conditions objects to create custom BlockingQueue.
- Producer, add the element in the queue if size of element in the queqe is less than Max_Size.
- If size of element in the queqe is equal to Max_Size that means queue is full, than the producer will wait for queue to empty.
- Consumer, consum element from the queue if size of element in the queqe is greater than zero.
- If the queue is empty then the consumer will wait for the queue to get filled.
Now creating MyBlockingQueue class.
package com.shubh.blockingqueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockingQueue {
// Maximum Capycity of queue
int max_size;
* Array to store element for CustomBlockingQueue I have use Object array so
* that we can use this for any type of object like Sting, Integer etc.
final Object[] array;
public MyBlockingQueue(int size) {
this.max_size = size;
array = new Object[max_size];
System.out.println("MyBlockingQueue - Max_Size: " + this.max_size);
final Lock lock = new ReentrantLock();
// Conditions for Producers and Consumers
final Condition producerCond = lock.newCondition();
final Condition consumerCond = lock.newCondition();
// Index for put elements in queue-(in Object Array)
int indexAdd = 0;
// Index for take elements from queue-(from Object Array)
int indexGet;
int count;
public void put(Object element) throws InterruptedException {
try {
while (count == array.length) {
// Queue is full, producers need to wait
array[indexAdd] = "" + element;
System.out.println("Producing - " + element);
if (indexAdd == array.length) {
indexAdd = 0;
// Increment the count for the array
} finally {
public Object take() throws InterruptedException {
try {
// checking if Queue is empty, consumers will to wait
while (count == 0) {
Object element = array[indexGet];
System.out.println("Consuming - " + element);
if (indexGet == array.length) {
indexGet = 0;
// reduce the count of array
// send signal producer
return element;
} finally {
Now Let test our custom BlockingQueue. Create Producer & Consumer
package com.shubh.blockingqueue;
public class Producer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public Producer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
public void run() {
for (int i = 0; i <= 10; i++) {
try {
} catch (InterruptedException e) {
package com.shubh.blockingqueue;
public class Consumer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public Consumer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
public void run() {
for (int i = 1; i <= 10; i++) {
try {
} catch (InterruptedException e) {
package com.shubh.blockingqueue;
public class MyBlockingQueueTest {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
// Creating producer and consumer threads
Thread producer = new Thread(new Producer(myBlockingQueue));
Thread consumer = new Thread(new Consumer(myBlockingQueue));
Output: We can see first 0-9 threads are produce and then 0-9 consume after that 10th thread produce.
MyBlockingQueue - Max_Size: 10
Producing - 0
Producing - 1
Producing - 2
Producing - 3
Consuming - 0
Consuming - 1
Consuming - 2
Consuming - 3
Producing - 4
Consuming - 4
Producing - 5
Consuming - 5
Producing - 6
Consuming - 6
Producing - 7
Consuming - 7
Producing - 8
Consuming - 8
Producing - 9
Consuming - 9
Producing - 10
No comments:
Post a Comment