
Java Producer Consumer Issue

  • 10-11-2016 4:51pm
    #1
    Closed Accounts Posts: 6,075 ✭✭✭


    Can anyone explain how consumer-2 below is consuming a 'null'? My code should be preventing this.
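    import com.bounded.queue.BoundedQueue;
    import com.bounded.queue.jobs.Consumer;
    import com.bounded.queue.jobs.Producer;
    
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Test {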
    
        public static void main(String args[]) throws InterruptedException {
    
            BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10);
    
            Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1");
            Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2");
            Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1");
            Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2");
    
            Collection<Callable<Integer>> callables = new HashSet<>();
            callables.add(producer1);
            callables.add(producer2);
            callables.add(consumer1);
            callables.add(consumer2);
    
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            executorService.invokeAll(callables);
        }
    }
    
    package com.bounded.queue;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedQueue<T> {
    
        private int capacity;
        private int head;
        private int tail;
        private int currentSizeOfBuffer;
        private T[] buffer;
    
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
    
        public BoundedQueue(int capacity) {
            this.capacity = capacity;
            this.buffer = (T[]) new Object[capacity];
        }
    
        public void put(T element) throws InterruptedException {
    
            final ReentrantLock lock = this.lock;
            lock.lock();
    
            if(isBufferFull()) {
                waitOnAvailableSlot();
            }
    
            try {
                buffer[tail] = element;
                tail = getNextAvailableSlot(tail);
                currentSizeOfBuffer++;
    
                informConsumerQueueHasElement();
    
            } finally {
                lock.unlock();
            }
        }
    
        private boolean isBufferFull() {
            return capacity == currentSizeOfBuffer;
        }
    
        private void waitOnAvailableSlot() throws InterruptedException {
            notFull.await();
        }
    
        private void informConsumerQueueHasElement() {
            notEmpty.signal();
        }
    
        public T take() throws InterruptedException {
    
            final ReentrantLock lock = this.lock;
            lock.lock();
    
            if(isBufferEmpty()) {
                waitOnAvailableElement();
            }
    
            try {
                T element = buffer[head];
                head = getNextAvailableSlot(head);
                currentSizeOfBuffer--;
    
                informProducerQueueHasSpaceAvailable();
    
                return element;
            } finally {
                lock.unlock();
            }
        }
    
        private boolean isBufferEmpty() {
            return 0 == currentSizeOfBuffer;
        }
    
        private void waitOnAvailableElement() throws InterruptedException {
            notEmpty.await();
        }
    
        private void informProducerQueueHasSpaceAvailable() {
            notFull.signal();
        }
    
        private final int getNextAvailableSlot(int currentSlotPosition) {
            int nextAvailableSlot = ++currentSlotPosition;
            return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot;
        }
    }
    
    package com.bounded.queue.jobs;
    
    import com.bounded.queue.BoundedQueue;
    
    import java.util.concurrent.Callable;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Producer implements Callable<Integer> {
    
        private final BoundedQueue sharedQueue;
        private String name;
    
        @Override
        public Integer call() throws Exception {
    
            for(int i=0; i<10; i++){
                try {
                    sharedQueue.put(i);
                    System.out.println(name + " produced: " + i);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            return null;
        }
    
        public Producer(BoundedQueue sharedQueue, String name) {
            this.sharedQueue = sharedQueue;
            this.name = name;
        }
    }
    
    package com.bounded.queue.jobs;
    
    import com.bounded.queue.BoundedQueue;
    
    import java.util.concurrent.Callable;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Consumer implements Callable<Integer> {
    
        private final BoundedQueue sharedQueue;
        private String name;
    
        @Override
        public Integer call() throws Exception {
    
            while(true){    //what is happening here?
                try {
                    Integer element = (Integer) sharedQueue.take();
                    System.out.println(name + " consumed: "+ element);
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    
        public Consumer(BoundedQueue sharedQueue, String name) {
            this.sharedQueue = sharedQueue;
            this.name = name;
        }
    }
    

    Output:
    producer-2 produced: 0
    consumer-2 consumed: null
    consumer-1 consumed: 0
    producer-2 produced: 1
    producer-2 produced: 2
    consumer-2 consumed: 2
    consumer-1 consumed: 0
    producer-1 produced: 0
    consumer-2 consumed: 3
    etc
    

    Another run did this:
    producer-2 produced: 0
    consumer-1 consumed: 0
    consumer-2 consumed: null
    producer-1 produced: 0
    producer-2 produced: 1
    producer-1 produced: 1
    consumer-2 consumed: 0
    consumer-1 consumed: null
    consumer-2 consumed: 2
    etc
    


Comments

  • Closed Accounts Posts: 6,075 ✭✭✭IamtheWalrus


    I replaced
    if(isBufferFull()) {
        waitOnAvailableSlot();
    }
    

    with
    while(isBufferFull()) {
        waitOnAvailableSlot();
    }
    

    and
    if(isBufferEmpty()) {
        waitOnAvailableElement();
    }
    

    with
    while(isBufferEmpty()) {
        waitOnAvailableElement();
    }
    

    and it fixed the issue.
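
    A likely reason the `if` version hands out null: a consumer that has been signalled still has to reacquire the lock before `await()` returns, and in the meantime another consumer can take the only element. The woken consumer then reads an empty slot unless it re-checks the condition. `Condition.await()` may also return spuriously, which is why its Javadoc recommends always waiting in a loop. Below is a minimal sketch of the queue with both guards written as `while` loops. The class name `GuardedBoundedQueue` is made up for illustration. The sketch also moves the wait inside the `try` so the lock is released if `await()` is interrupted, which goes slightly beyond the change described above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: the bounded queue from the first post,
// with the wait conditions re-checked in a loop.
class GuardedBoundedQueue<T> {

    private final Object[] buffer;
    private int head;
    private int tail;
    private int size;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    GuardedBoundedQueue(int capacity) {
        this.buffer = new Object[capacity];
    }

    public void put(T element) throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wake-up: another producer may have
            // filled the slot before this thread got the lock back.
            while (size == buffer.length) {
                notFull.await();
            }
            buffer[tail] = element;
            tail = (tail + 1) % buffer.length;
            size++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wake-up: another consumer may have
            // taken the element before this thread got the lock back.
            while (size == 0) {
                notEmpty.await();
            }
            T element = (T) buffer[head];
            buffer[head] = null; // clear the slot so stale values cannot be re-read
            head = (head + 1) % buffer.length;
            size--;
            notFull.signal();
            return element;
        } finally {
            lock.unlock();
        }
    }
}
```

    With the loops in place, a consumer that wakes up to an empty buffer simply goes back to waiting instead of reading `buffer[head]`.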


  • Moderators, Technology & Internet Moderators Posts: 1,335 Mod ✭✭✭✭croo


    Your change might have stopped the error occurring, but being lazy and not wanting to debug all your code, I did wonder whether the problem could be caused by your use of a Set of "callables" in the test.
    Collection<Callable<Integer>> callables = new HashSet<>();
    callables.add(producer1);
    callables.add(producer2);
    callables.add(consumer1);
    callables.add(consumer2);
    
    A Set has no defined order, so although you add producer1 and producer2 followed by the consumers, they will not necessarily be retrieved from the set in that order. That might mean a consumer is called before a producer!
    This would explain the non-repeatable nature of your initial error.
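
    On the ordering question: as far as I can tell, a consumer starting before a producer is the normal case for a blocking queue, and the consumer should just block until something arrives rather than return null. Here is a small sketch of that, using the JDK's `ArrayBlockingQueue` as a stand-in for the custom queue. The class `ConsumerFirstDemo` and the method `consumeBeforeProduce` are hypothetical names, and the 100 ms sleep is only a best-effort way of letting the consumer get there first.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: shows a consumer that starts on an empty queue.
class ConsumerFirstDemo {

    // Starts the consumer before anything is produced and returns what it received.
    static Integer consumeBeforeProduce() throws Exception {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> consumer = queue::take;      // blocks while the queue is empty
            Future<Integer> consumed = pool.submit(consumer);

            Thread.sleep(100);                             // best effort: let the consumer block first
            queue.put(42);                                 // the "producer" arrives late

            return consumed.get(5, TimeUnit.SECONDS);      // the consumer sees 42, not null
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("consumed: " + consumeBeforeProduce());
    }
}
```

    If that holds, the iteration order of the HashSet decides which task starts first but would not by itself explain a null. The unguarded `if` before the wait looks like the more probable cause.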

