Реализация Java суб паб через брокера


Я разработал простой опционально подписки на события в Java. Что-то похожее на эту ветку

Простой Паб-Саб Событие Излучатель

Таковы цели.

  1. Класс должен уметь "слушать" то, что происходит в модуле.
  2. Он должен быть отключен от излучателя событие. То есть, слушатели не должны знать, какие классы могут испускать событий.
  3. Все абоненты должны быть уведомлены параллельно.

Мой Broker.java

import java.util.*;

public final class  Broker {
    private Object mutex = new Object();
    static class CommonTopics{
        private CommonTopics(){}
        public static final String ON_CACHE_RESET = "onCacheReset";
    }
    private static Broker brokerInstance;
    private Broker(){

    }
    public static Broker getInstance(){
        if(null== brokerInstance){
            brokerInstance = new Broker();
        }
        return brokerInstance;
    }

    private Map<String, Set<Subscriber>> subscribers = new HashMap<>();

    public boolean deregister(String topic, Subscriber subscriber) {
        synchronized (mutex) {
            final Set<Subscriber> subs = this.subscribers.get(topic);
            return subs.remove(subscriber);
        }
    }

    public boolean register(String topic, Subscriber subscriber) {
        boolean returnVal;
        synchronized (mutex) {
            if (subscribers.containsKey(topic)) {
                returnVal = subscribers.get(topic).add(subscriber);
            } else {
                Set<Subscriber> sub = new HashSet<>();
                returnVal = sub.add(subscriber);
                subscribers.put(topic, sub);
            }
        }
        return returnVal;
    }

    public void sendMessage(String topic,Map map){
        synchronized (mutex) {
            final Set<Subscriber> sub = this.subscribers.get(topic);
            sub.parallelStream().forEach(subscriber -> subscriber.update(map));
        }
    }

}

Мой абонентский интерфейс

import java.util.Map;

public interface Subscriber {
    public void update(Map map);
}

И пример абонента

import java.util.Map;

public class ExampleSubscriber implements Subscriber {
    @Override
    public void update(Map map) {
        System.out.println(map.get("oldCachedValue"));
        System.out.println(map.get("newCachedValue"));
    }

    public static void main(String[] args) {
        ExampleSubscriber subscriber = new ExampleSubscriber();
        Broker.getInstance().register(Broker.CommonTopics.ON_CACHE_RESET,subscriber);
    }
}

Пример emmitter событие

Map m = new HashMap();
        m.put("oldCachedValue","Yes");
        m.put("newCachedValue","No");
        Broker.getInstance().sendMessage(Broker.CommonTopics.ON_CACHE_RESET,m);


1224
4
задан 29 марта 2018 в 10:03 Источник Поделиться
Комментарии
1 ответ


  • Во-первых, mutex должно быть finalС синхронизации на это. В текущем состоянии вашего кода, это не имеет большого значения, так как это private в любом случае и никогда не выходит за пределы класса, но все же, если вы сделаете это finalтогда поле mutex защищен от модификации компилятором, снижая риск ошибок.

  • Если вы не намерены менять brokerInstanceт. е. если Broker предназначен для одноэлементный, вы можете также сделать brokerInstance final и инициализировать его сразу с объявлением. Это позволит устранить необходимость для if построить в метод getInstance(). После погуглив, я выяснил, что цель того, что вы делаете, это, наверное, отложенная инициализация. Однако, в этот вопрос на StackOverflow, некоторые моменты сделаны в пользу подхода с final переменная (особенно пункт про потокобезопасность может быть интересно для вас).

  • Вы можете воспользоваться классе ConcurrentHashMap для синхронизации целей. А ConcurrentHashMap оптимизирована для синхронного доступа в том, что она не закрывается полностью во время операции, так что операции, которые не конфликтуют друг с другом могут быть выполнены одновременно (возможно, вы захотите взглянуть на это).

    Например, вы могли бы сделать каждый Set<Subscriber> в Broker.subscribers а ConcurrentHashMap.KeySetView (которая служит Set эквивалент ConcurrentHashMap, потому что нет класса ConcurrentHashSet) вместо обычной HashMap. Это хотя бы сделать synchronized блок Broker.deregister(String, Subscriber) ненужные. Делая Broker.subscribers сама ConcurrentHashMap также может помочь, но вы все равно должны быть осторожны, например, в методе Broker.register(String, Subscriber)потому что без любого другого синхронизации, первый звонок subscribers.containsKey(topic) может дать falseтак else блок вошел, но до звонка subscribers.put(topic, sub) достигается, subscribers возможно, были изменены, и теперь мог уже содержит сопоставления для topic что бы потом быть перезаписаны. Есть специальные методы в классе ConcurrentHashMap для таких случаев (как compute методов), но так как это всего лишь общие предположения, я не буду вдаваться в подробности об этом.


  • Ваш код отсутствует обработка исключительных случаях, например, если методы deregister(String, Subscriber) или sendMessage(String, Map) называются тема, которая не содержится в основной набор subscribers.

  • Как я уже намекнул в комментарии, используя сырье видов-это опасное предприятие. Он не ошибся как таковой, но риск ошибки выше, потому что дженерики оказывают во время компиляции тип безопасности, который вы не воспользовались, если вы используете сырые типы. Я не знаю, что ваш код может делать с картой, так что, возможно, это не будет проблемой, но вы могли бы взглянуть на этот или этот.

  • Также обратите внимание, что map что передается Broker.sendMessage(String, Map) будет осуществляться одновременно в sub.parallelStream().forEach(subscriber -> subscriber.update(map)), что может быть нежелательно , если карта может быть изменен абонентом.

1
ответ дан 29 марта 2018 в 01:03 Источник Поделиться