Переключение функции включения и выключения в назначенное нить


Чтобы проиллюстрировать, как это сделать лидер выборов с Apache зоопарка я создал простое приложение, и я хотел бы, чтобы продевать нитку части заявление рассмотрено. Я создал простой класс (динамик), который выдает уникальный идентификатор файла на очередной интервал. Я реализовал интерфейс Runnable интерфейс и запустите его с помощью scheduledexecutorservice выполняет с фиксированной задержкой, как это:

scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);

В динамик класс также реализует слушателя интерфейс, так что он может зарегистрировать себя на мониторе (работает в отдельном потоке, общаясь смотрителю сервера), который может включить или выключить вывод в файл.

На мой вопрос, комментарий (помимо обзора общего кода): это лучший способ для запуска и остановки функции в запланированном нить?

Динамик И Динамик Сервера:

Полный проект: https://github.com/cyberroadie/zookeeper-leader

Объяснение кода: http://cyberroadie.wordpress.com/2011/11/24/implementing-leader-election-with-zookeeper/

public class Speaker implements Runnable, NodeMonitor.NodeMonitorListener {

    private String message;
    private String processName;
    private long counter = 0;
    private volatile boolean canSpeak = false;

    public Speaker(String message) throws IOException, InterruptedException, KeeperException {
        this.message = message;
         this.processName = getUniqueIdentifier();
    }

    private static String getUniqueIdentifier() {
        String processName = ManagementFactory.getRuntimeMXBean().getName();
        String processId = processName.substring(0, processName.indexOf("@"));
        return "pid-" + processId + ".";
    }

    public void run() {
        try {
            if (canSpeak) {
                handleTask();
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void handleTask() throws IOException {
        FileWriter fstream = new FileWriter("out.txt");
        BufferedWriter out = new BufferedWriter(fstream);
        out.write(message + ": " + counter++ + " " + processName + "\n");
        out.close();
    }

    @Override
    public void startSpeaking() {
        this.canSpeak = true;
    }

    @Override
    public void stopSpeaking() {
        this.canSpeak = false;
    }

    @Override
    public String getProcessName() {
        return processName;
    }
}

Спикер Сервер:

public class SpeakerServer {

    final static Logger logger = LoggerFactory.getLogger(SpeakerServer.class);

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private static NodeMonitor monitor;

    private static void printUsage() {
        System.out.println("program [message] [wait between messages in millisecond]");
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            printUsage();
            System.exit(1);
        }

        long delay = Long.parseLong(args[1]);

        Speaker speaker = null;

        try {
            speaker = new Speaker(args[0]);
            monitor = new NodeMonitor();
            monitor.setListener(speaker);
            monitor.start();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);
        logger.info("Speaker server started with fixed time delay of " + delay + " milliseconds.");
    }
}


330
3
задан 30 ноября 2011 в 11:11 Источник Поделиться
Комментарии
2 ответа

вы также можете посмотреть на использование фиксированного пула потоков. Таким образом, если один поток блокируется по каким-то причинам, говорят, что диск стал переполнен, потоки будут ждать в очереди, пока блокированный поток прекращается. Таким образом, вы не потеряете какие-либо уникального идентификатора или столкнетесь с проблемами памяти, если слишком много потоков блокируется.

Надеюсь, что помогает,

Росс

1
ответ дан 30 ноября 2011 в 12:11 Источник Поделиться

Я считаю остановка планировщика (или отмена задания) на stopSpeaking вызова и перезапуска на startSpeaking звонок (или повторить выполнимое). Я не знаком с зоопарка, я не знаю ничего об обычном использовании, так что если стоп/startSpeaking вызовы приезжает часто это не лучшее решение, но кажется чище, если есть длительные перерывы между stopSpeaking и startSpeaking звонки.

Несколько заметок о код:


  1. Рекомендуется использовать SLF4J в журнал исключения, а не печатные().

  2. Закройте BufferedWriter в конце концов блок:

    final BufferedWriter out = new BufferedWriter(fstream);
    try {
    ...
    } finally {
    out.close();
    }

  3. Системы.выход не лучший способ остановить все приложение, особенно скрытые в поймать блок запуска метода. Я повторно сгенерировать исключение IOException как к RuntimeException (или к RuntimeException подкласс):

    @Override
    public void run() {
    if (!canSpeak) {
    return;
    }
    try {
    handleTask();
    } catch (final IOException e) {
    throw new RuntimeException(e);
    }
    }

    затем используйте ScheduledFuture из scheduleWithFixedDelay ждать к RuntimeException:

    final ScheduledFuture<?> speakerFuture = 
    scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);
    logger.info("Speaker server started with fixed time delay of " + delay + " milliseconds.");
    try {
    speakerFuture.get();
    } finally {
    scheduler.shutdown();
    }

    Отказ от ответственности: я не люблю использование к RuntimeException здесь, но, к сожалению, этот класс можно только исполняемые объекты (не вызываемыйS, которая может бросить любое исключение).

    Пожалуйста, обратите внимание на пункт охраны также.


  4. Я не совсем уверен, что scheduledexecutorservice выполняет гарантирует ли запланированные задачи всегда выполняются в том же потоке или нет. (Я предполагаю, что это не так.) Кроме того, встречное поле в динамик класса могут быть синхронизированы должным образом, в связи с нестабильной спекуляция, но я бы использовал оборонительную стратегию и использовать AtomicInteger для рецепта, чтобы сделать его менее подверженным ошибкам.

  5. Я проверить значение nullв конструкторе динамик. checkNotNull из гуавы - отличный выбор для этого.

    this.message = checkNotNull(message, "message cannot be null");

    (См. также: Эффективная Java 2-е издание, пункт 38: проверка параметров на валидность)


  6. SLF4J поддерживает {}, используйте это:

    logger.info("Speaker server started with fixed time delay of {} milliseconds.", delay);

0
ответ дан 14 июля 2012 в 10:07 Источник Поделиться