How do threads communicate?

Threads use shared objects as a mediators for their communication. This article shares different approaches for thread communication. Irrespective of the approach, all the approaches use the shared object technique for the thread communication


wait() and notify()
  1. wait() and notify() methods communicate each other with the help of conditions on a shared object. 
  2. Both the methods should be called from synchronized block to avoid a race condition.
Code Template
producer:

synchronized(connectionPool){
 while(connectionPool >= 1){
  connectionPool.wait();//wait if pool is not empty yet
 }
 connectionPool.add(elem);//produce and notify the consumer that pool is filled
 connectionPool.notify();  
}
  
Consumer:

synchronized(connectionPool){
 while(connectionPool==EMPTY){
  connectionPool.wait();//wait if pool is empty
 }
 connectionPool.consume();//Consume and notity the producer that pool is empty
    connectionPool.notify();   
}

BlockingQueue(Java 1.5)
  1. This is an enhanced and simplest version of Thread communication introduced in Java 1.5 which eliminates the use of synchronized block for thread communication
  2. ArrayBlockingQueue and LinkedBlockingQueue are the 2 implementations for this interface. 
  3. put() and take() methods has an inbuilt locking capability which is required for inter-thread communication.
Code Template
public static void main(String []args){
 
 BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue ();//Sychronized block is not required

 Thread producer = new Thread(()->{//PRODUCER
  try{
   sharedQueue.put(1234);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  }).start();
    
 Thread consumer = new Thread(()->{//CONSUMER
  sharedQueue.take();
 }).start();
}

Lock API(Java 1.5)
  1.  Lock API uses await() and signalAll() methods for thread communication. Conditions are created on Lock object and shared b/w multiple threads. Threads use this Condition object's await-singalAll methods to work collaboratively based on a given problem. 
  2. This approach is relatively more transparent and declarative than wait-notify approach.
  3. In case of Lock API, locking scope has to be set explicitly with lock and unlock methods. (In case of synchronized block locking happens implicitly which means the code that is surrounded by Synchronized block is locked implicitly)  
In the below example, share object queue is protected from multiple access using lock and unlock method. For PRODUCER thread and CONSUMER to communicate, they use the conditions created on the Lock object. 
  1. Consumer waits on the bufferEmpty condition since the queue is not full initially
  2. Later, producer fills the queue and signals the consumer saying bufferEmpty condition is changed now.
  3. Consumer wakes up with the signal and consumes element in the queue and signals the producer saying bufferFull condition is changed now.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ProducerConsumerExample {

    static final int MAX_SIZE =10;
    final Queue<Integer> queue = new LinkedList<>();
    final Random random = new Random();

    Lock lock = new ReentrantLock();
    Condition bufferFull = lock.newCondition();
    Condition bufferEmpty = lock.newCondition();

    public static void main(String[] args) {
        ProducerConsumerExample sharedReference = new ProducerConsumerExample();

        new Thread(()->{
            sharedReference.get();
        }, "CONSUMER").start();
        new Thread(()->{
            sharedReference.put();
        }, "PRODUCER").start();
    }


    public void put(){
        lock.lock();
        try {
            while(queue.size() == MAX_SIZE){
                bufferFull.await();
            }
            int number = random.nextInt();
            queue.add(number);                        
            bufferEmpty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void get(){
        lock.lock();
        try {
            while(queue.size() == 0){
               bufferEmpty.await();
            }
            int number = queue.poll();            
            bufferFull.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

Comments

Popular posts from this blog

Distributed database design using CAP theorem

SQL Analytical Functions - Partition by (to split resultset into groups)

Easy approach to work with files in Java - Java NIO(New input output)