Продюсер реализации потребительского сценария в Java


Пожалуйста, предложите улучшения в следующих Java-программу я написал для производителей и потребителей сценарию. Программа, кажется, работает нормально. Не страдать от возможных сценариев тупик? Насколько лучше я мог бы сделать это? Поскольку я использую стек чтение/запись (уже пуш/поп) были синхронизированы? Что, если они этого не делают?

import java.util.Stack;

import logger.CustomLogger;

public class TestProducerConsumer {

    private Stack<Integer> buffer;
    public static final int MAX_SIZE = 10;
    public int count;

    public TestProducerConsumer(){

        buffer = new Stack<Integer>();
        count = 0;
    }

    public Stack<Integer> getBuffer(){
        return buffer;
    }

    public void addToBuffer(Integer i) throws StackException{

        if(buffer.size() < MAX_SIZE){
            buffer.push(i);
            CustomLogger.logger.info("pushed "+i);
        }else
            throw new StackException("Stack Over Flow");
    }

    public Integer removeFromBuffer() throws StackException{

        if(buffer.size() == 0)
            throw new StackException("Buffer Empty");
        else 
            return buffer.pop();
    }

    public static void main(String[] args) {

        TestProducerConsumer pd = new TestProducerConsumer();
        Producer p1 = new Producer(pd);
        Producer p2 = new Producer(pd);
        Producer p3 = new Producer(pd);

        Consumer c1 = new Consumer(pd);
        Consumer c2 = new Consumer(pd);
        Consumer c3 = new Consumer(pd);
        Consumer c4 = new Consumer(pd);
        Consumer c5 = new Consumer(pd);

        Thread tp1 = new Thread(p1);
        Thread tp2 = new Thread(p2);
        Thread tp3 = new Thread(p3);

        Thread tc1 = new Thread(c1);
        Thread tc2 = new Thread(c2);
        Thread tc3 = new Thread(c3);
        Thread tc4 = new Thread(c4);
        Thread tc5 = new Thread(c5);

        tp1.start();
        tc1.start();
        tc2.start();
        tc3.start();
        tc4.start();
        tc5.start();
        tp2.start();
        tp3.start();
    }
}

class Producer implements Runnable{

    private TestProducerConsumer pc;

    public Producer(){

    }
    public Producer(TestProducerConsumer pc){
        this.pc = pc;
    }

    public void run() {

        Stack<Integer> buf = pc.getBuffer();


        while(true){    
            synchronized(pc){               
                if(buf.size() < pc.MAX_SIZE){
                    try {
                        pc.addToBuffer(new Integer((pc.count)++));
                        if(buf.size() == 1){
                            CustomLogger.logger.info("Wake up consumer");
                            pc.notifyAll();
                        }
                    } catch (StackException e) {
                        CustomLogger.logger.info(e.getError());
                        break;              
                    }
                } else{
                    try {
                        CustomLogger.logger.info("Producer sleeping");
                        pc.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

      }
    }

}

class Consumer implements Runnable{
    private TestProducerConsumer pc;

    public Consumer(TestProducerConsumer pc){
        this.pc = pc;
    }

    public void run(){

        Stack<Integer> buf = pc.getBuffer();
        int i;

        while(true){

            synchronized(pc){

                if(buf.size() == 0){

                    try {
                        CustomLogger.logger.info("Consumer Sleeping");
                        pc.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else
                {
                    try {
                        i = pc.removeFromBuffer();
                        CustomLogger.logger.info("poped "+i);

                        if(buf.size() == 0){
                            CustomLogger.logger.info("Wake up Producer");
                            pc.notifyAll();
                        }
                    } catch (StackException e) {
                        System.out.println(e.getError());
                        break;
                    }
                }
            }
        }
    }
}

class StackException extends Exception{
    private String reason;

    public StackException(){
        super();
    }
    public StackException(String reason){
        super(reason);
        this.reason = reason;
    }

    public String getError(){
        return reason;
    }

}


4285
5
задан 20 мая 2011 в 01:05 Источник Поделиться
Комментарии
2 ответа

Вы можете попробовать использовать очереди, так как они предназначены для такого рода вещи. Код намного короче.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TestProducerConsumer {
public static final int MAX_SIZE = 10;
private final BlockingQueue<Integer> tasks = new ArrayBlockingQueue<Integer>(MAX_SIZE);
public final ExecutorService executor = Executors.newCachedThreadPool();
public final AtomicInteger count = new AtomicInteger();

public static final int POISON_VALUE = -1;

public void addToBuffer(Integer i) {
try {
tasks.put(i);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}

public Integer removeFromBuffer() {
try {
return tasks.take();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}

public static void main(String... args) {
TestProducerConsumer pd = new TestProducerConsumer();
pd.new Producer();
pd.new Producer();
pd.new Producer();

pd.new Consumer();
pd.new Consumer();
pd.new Consumer();
pd.new Consumer();
pd.new Consumer();
}

class Producer implements Runnable {
public Producer() {
executor.execute(this);
}

public void run() {
while (count.get() >= 0) {
addToBuffer(count.getAndIncrement());
}
addToBuffer(TestProducerConsumer.POISON_VALUE);
}
}

class Consumer implements Runnable {
public Consumer() {
executor.execute(this);
}

public void run() {
Integer num;
while ((num = removeFromBuffer()) != TestProducerConsumer.POISON_VALUE) {
System.out.println("popped " + num);
}
}
}
}

4
ответ дан 20 мая 2011 в 01:05 Источник Поделиться

Ваша программа не является потокобезопасным.

Например, если два потока вызвать addToBuffer(целое число I) в то же время, как может пройти если(буфер.размер() < Аргумент max_size) проверить, прежде чем один из них помещает элемент в стек, таким образом, стек может иметь больше элементов, чем аргумент max_size!

То же самое касается removeFromBuffer(), и комбинации добавить и удалить свой неожиданному поведению.


В общем вместо того, чтобы реализовать это самостоятельно, посмотрите на Java.утиль.параллельный пакет


Очереди

На Java.утиль.параллельного класса двухсторонней поставки эффективную масштабируемую потокобезопасная очередь FIFO неблокирования. Пять реализаций в Java.утиль.одновременная поддержка расширенного интерфейса blockingqueue, что определяет блокирование версии ставить и принимать: LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, PriorityBlockingQueue, и задержки. Различные классы покрывают наиболее распространенные контексты использования для производителя-потребителя, обмен сообщениями, параллельных задач, и связанных параллельных конструкций. Интерфейс BlockingDeque расширяет интерфейса blockingqueue для поддержки FIFO и LIFO (стек операций). Класс LinkedBlockingDeque обеспечивает реализацию.

3
ответ дан 20 мая 2011 в 01:05 Источник Поделиться