Простой сервер для посланника


Для гонца программы я пишу сервер класс, который будет выполнять неблокирующий сервер в один поток. Какой-нить мессенджер будет использовать этот сервер для общения с другими клиентами.

Что сервер должен делать

  • Принимать подключения / соединения с другими экземплярами сервера и связать подбора ключей для подключений на карте ключи Вит ID и, следовательно, посланник поток может получить доступ к подключениям по ID
  • Читать из / писать в связи
  • Хранения входящих сообщений в очереди
  • Нить Messenger может
    • Выборка входящих сообщений
    • Очереди сообщений: send_message(int идентификатор, строка MSG)

package snserver;

/* imports */

//class SNServer (Simple non-blocking Server)

public class SNServer extends Thread {
    private int port;
    private Selector selector;
    private ConcurrentMap<Integer, SelectionKey> keys; // ID -> associated key
    private ConcurrentMap<SocketChannel,List<byte[]>> dataMap_out;
    ConcurrentLinkedQueue<String> in_msg; //incoming messages to be fetched by messenger thread

    public SNServer(int port) {
        this.port = port;
        dataMap_out = new ConcurrentHashMap<SocketChannel, List<byte[]>>();
        keys = new ConcurrentHashMap<Integer, SelectionKey>();
    }

    public void start_server() throws IOException {
        // create selector and channel
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // bind to port
        InetSocketAddress listenAddr = new InetSocketAddress((InetAddress)null, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        log("Echo server ready. Ctrl-C to stop.");

        // processing
        while (true) {
            // wait for events
            this.selector.select();

            // wakeup to work on selected keys
            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                // this is necessary to prevent the same key from coming up 
                // again the next time around.
                keys.remove();

                if (! key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    this.accept(key);
                }
                else if (key.isReadable()) {
                    this.read(key);
                }
                else if (key.isWritable()) {
                    this.write(key);
                }
                else if(key.isConnectable()) {
                    this.connect(key);
                }
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);
        send_message(key, "Welcome."); //DEBUG

        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        log("Connected to: " + remoteAddr);

        // register channel with selector for further IO
        dataMap_out.put(channel, new ArrayList<byte[]>());
        channel.register(this.selector, SelectionKey.OP_READ);

        //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID
        keys.put(0, key);
    }

    //TODO verify, test
    public void init_connect(String addr, int port){
        try {
            SocketChannel channel = createSocketChannel(addr, port);
            channel.register(this.selector, channel.validOps()/*, SelectionKey.OP_?*/);
        }
        catch (IOException e) {
            //TODO handle
        }
    }

    //TODO verify, test
    private void connect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            channel.finishConnect(); //try to finish connection - if 'false' is returned keep 'OP_CONNECT' registered
            //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID
            keys.put(0, key);
        }
        catch (IOException e0) {
            try {
                //TODO handle ok?
                channel.close();
            }
            catch (IOException e1) {
                //TODO handle
            }
        }

    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        }
        catch (IOException e) {
            e.printStackTrace();
        }

        if (numRead == -1) {
            this.dataMap_out.remove(channel);
            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();
            log("Connection closed by client: " + remoteAddr); //TODO handle
            channel.close();
            return;
        }

        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        in_msg.add(new String(data, "utf-8"));
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        List<byte[]> pendingData = this.dataMap_out.get(channel);
        Iterator<byte[]> items = pendingData.iterator();
        while (items.hasNext()) {
            byte[] item = items.next();
            items.remove();
            //TODO is this correct? -> re-doing write in loop with same buffer object
            ByteBuffer buffer = ByteBuffer.wrap(item);
            int bytes_to_write = buffer.capacity();
            while (bytes_to_write > 0) {
                bytes_to_write -= channel.write(buffer);
            }
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    public void queue_data(SelectionKey key, byte[] data) {
        SocketChannel channel = (SocketChannel) key.channel();
        List<byte[]> pendingData = this.dataMap_out.get(channel);
        key.interestOps(SelectionKey.OP_WRITE);

        pendingData.add(data);
    }

    public void send_message(int id, String msg) {
        SelectionKey key = keys.get(id);
        if (key != null)
            send_message(key, msg);
        //else
            //TODO handle
    }

    public void send_message(SelectionKey key, String msg) {
        try {
            queue_data(key, msg.getBytes("utf-8"));
        }
        catch (UnsupportedEncodingException ex) {
            //is not thrown: utf-8 is always defined
        }
    }

    public String get_message() {
        return in_msg.poll();
    }

    private static void log(String s) {
        System.out.println(s);
    }

    @Override
    public void run() {
        try {
            start_server();
        }
        catch (IOException e) {
            System.out.println("IOException: " + e);
            //TODO handle exception
        }
    }    


    // Creates a non-blocking socket channel for the specified host name and port.
    // connect() is called on the new channel before it is returned.
    public static SocketChannel createSocketChannel(String hostName, int port) throws IOException {
        // Create a non-blocking socket channel
        SocketChannel sChannel = SocketChannel.open();
        sChannel.configureBlocking(false);

        // Send a connection request to the server; this method is non-blocking
        sChannel.connect(new InetSocketAddress(hostName, port));
        return sChannel;
    }
}

Общие проблемы

Поскольку я новичок в Java и сети там может быть несколько вещей неправильно или не хорошо в этот код. Пожалуйста, помогите мне улучшить этот код, он делает то, что я хотел бы это сделать. Также дать предложения по улучшению концепции!

Текущие проблемы:

  • После вызова init_connect() там, кажется, нет события на селектор, чтобы соединение не построено.


1757
2
задан 27 августа 2011 в 11:08 Источник Поделиться
Комментарии
3 ответа

Недавно я наткнулась на удобную библиотеку для ведения сетевого вещи в Java называется Нетти. Это даже не библиотека - это скорее основа для построения масштабируемых приложений, так что это накладывает некоторые архитектурные решения по архитектуре приложения. Хотя его использование может быть излишним в вашей ситуации я советую вам проверить его документы, как вы могли узнать подходов, реализованных там.

Что я вижу, может быть улучшено:


  • Код отводками. В коде у вас все в одном месте: транспортная обработка сетевых пакетов в очереди и так далее. Так что трудно понять, где, чтобы проверить и отладить его. Я бы попробовал отделить как-то слоями.

  • Разделение сети и бизнес-логики темы. В случае, если ваш бизнес-логики и ввода/вывода получает значительно больше нагрузки, то аналогом вся производительность будет деградировать, поскольку это делается в одном потоке. Это может быть не проблема в данном конкретном случае, поскольку логика здесь довольно простая, но тем не менее. Если перенести бизнес-логику в отдельный поток ввода-вывода не будет блокировать, ждущих его для выполнения своей работы и общая производительность возрастет. Ловушка является правильное взаимодействие/синхронизация между потоками так, чтобы они не лупить данные.

Вышеперечисленные моменты довольно сильно обработаны в том, что либерал, поэтому я предлагаю вам заглянуть в свои документы и примеры, чтобы понять, почему и как они это реализовали. Даже если вы не выбрать его, ты можешь узнать что-то полезное для своего проекта.

С. П.: Я не связан ни с названием JBoss ни с Нетти, но я действительно влюбилась в него.

1
ответ дан 23 октября 2011 в 09:10 Источник Поделиться

У меня мало опыта с сетью, поэтому я не могу критиковать сама логика, увы. У меня есть некоторые незначительные замечания по форме, хотя:


  • Это спорно, потому что вы найдете некоторые люди сторонник этого стиля, но я лично (и его разделяют многие люди!) избежать использования этого.имяполя за пределами конструкторы, потому что она избыточна. Я вижу это как визуальный шум. Это может помочь отличить локальные переменные из полей, я решил использовать префикс, который является слишком спорным...

  • in_msg поле не объявлены как private, не уверен, если это является намеренным.

  • Не глотать исключения в init_connect, вы можете упустить некоторые проблемы...

Вот большинство из того, что я думаю; вам может понадобиться для очистки ресурсов, если вы поймать исключение на уровне запуска().

0
ответ дан 29 августа 2011 в 09:08 Источник Поделиться

Короткий ответ

Использовать государственную машину.

Более подробный ответ

Я не читал ваш код в деталях, но я уже работал с Java не-блокирующих сокетов. Поэтому я могу сказать, это довольно трудно получить это право.

Я когда-то унаследовали кода, который был похож на ваш код. Моей задачей было писать юнит-тесты, чтобы быть близко к 100% покрытия ветвей. Я пытался писать тесты, не касаясь кода, но есть так много государств для отслеживания (селектор, буфера сокета государства), что я не был в состоянии достоверно проверить код.

Итак, я переписал код, используя государство-машина для инкапсуляции этих государств. Я закончил с 7 маленькие классы, каждый из которых представляет состояние розетки, или состояние обработки запроса. Полученный код был более надежный (много пропавших без вести края-дел стало очевидным), легче понять и поддержать. Написание тестов было легче, потом.

Если бы мне пришлось сделать это снова, я бы попытался использовать Нетти (как Игорь предложил). Нетти инкапсулирует государства тоже, и управляет ошибок соединения в единый путь.

0
ответ дан 22 ноября 2011 в 01:11 Источник Поделиться