Поток-безопасный и блокировки - реализация очереди


Я пытался создать блокировку очереди реализация в Java, в основном для личного обучения. Очереди должны быть общими, позволяя любое количество читателей и/или по совместительству писателей.

Пожалуйста, просмотрите его, и предложить какие-либо улучшения/вопросы вы найдете?

(Кросс-пост из StackOverflow)

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueue<T> {
    private static class Node<E> {
        final E value;
        volatile Node<E> next;

        Node(E value) {
            this.value = value;
        }
    }

    private AtomicReference<Node<T>> head, tail;

    public LockFreeQueue() {
        // have both head and tail point to a dummy node
        Node<T> dummyNode = new Node<T>(null);
        head = new AtomicReference<Node<T>>(dummyNode);
        tail = new AtomicReference<Node<T>>(dummyNode);
    }

    /**
     * Puts an object at the end of the queue.
     */
    public void putObject(T value) {
        Node<T> newNode = new Node<T>(value);
        Node<T> prevTailNode = tail.getAndSet(newNode);
        prevTailNode.next = newNode;
    }

    /**
     * Gets an object from the beginning of the queue. The object is removed
     * from the queue. If there are no objects in the queue, returns null.
     */
    public T getObject() {
        Node<T> headNode, valueNode;

        // move head node to the next node using atomic semantics
        // as long as next node is not null
        do {
            headNode = head.get();
            valueNode = headNode.next;
            // try until the whole loop executes pseudo-atomically
            // (i.e. unaffected by modifications done by other threads)
        } while (valueNode != null && !head.compareAndSet(headNode, valueNode));

        T value = (valueNode != null ? valueNode.value : null);

        // release the value pointed to by head, keeping the head node dummy
        if (valueNode != null)
            valueNode.value = null;

        return value;
    }
}


Комментарии
4 ответа

Да.


  • Сочетание летучих и сравнить-и-обменять операций достаточно, чтобы убедиться, что узел объекты благополучно опубликован.

  • Сравнить-и-обменять, должны быть перед назначением на неустойчивой переменной в обоих методах, так тебе и там хорошо. Они не должны происходить атомарно.

Очереди экспонаты странное поведение. Допустим резьбы 1 останавливается в putObject() после КАС, но до последнего задания. Следующий поток 2 выполняет putObject() в полном объеме. Следующий поток три вызова функции getobject(), и он не может видеть, как первые два объекта, хотя резьба 2 выполнена полностью. Есть небольшая цепочка выстраивается в памяти. Только после того, как поток 1 завершает putObject() являются объекты, видимые на очереди, что несколько удивительно, и слабее, чем большинство семантика неблокирующих структур данных.

Другой способ смотреть на странный API в том, что это хорошо иметь неблокируемую поставить() , но это очень странно иметь неблокируемую вам(). Это означает, что очереди должны быть использованы при повторных опросов и спать.

21
ответ дан 27 января 2011 в 05:01 Источник Поделиться

Мне не нравится твое " пут " рутины. В 'странное поведение' другие заметили означает, что алгоритм теряет одно из главных преимуществ 'блокировки': иммунитет от инверсии приоритетов или асинхронно-прекращенные нити. Если поток получает подкараулил в середине очереди-операция записи, очередь будет полностью разбита до тех пор, пока очередь завершит свою работу.

Решение CompareAndSet последнего узла "следующий" указатель перед обновлением "хвост" указателем; если "последний узел"с "следующий" указатель имеет значение null, что означает, что это не последний узел больше, и нужно пройти одной, чтобы найти реальный последний узел, Повторяйте CompareAndSet на это, и надеюсь, что это все-таки последний узел.

4
ответ дан 28 января 2011 в 09:01 Источник Поделиться

В то время как решение ОП выглядит правильным для меня, улучшение Рон создает состояние гонки:

Нить 1 (в функции getobject()):

} while (!refHead.compareAndSet(head, next));

T value = next.value;

Нити 1 подвешен здесь, так что это еще не выполнить

next.value = null;

Мы знаем, что значение!=теперь нуль и refHead рядом, так refHead.получить().значение!=нуль

Резьба 2 (в функции getobject()):

    head = refHead.get();
assert head.value == null;

Здесь утверждают, кусается даже, что все будет ОК после резьбы 1 продолжается.

4
ответ дан 30 января 2011 в 02:01 Источник Поделиться

Я изменил ваш код немного, но я думаю, что ваш подход-это хорошо.

Как я уже упоминал в комментарии, нет никакой гарантии справедливости между потоками compareAndSetting голове, так что действительно повезло-нить может застрять на некоторое время, если есть много потребителей.

Я не думаю, что эта структура данных должна держать значения null, так как нет никакого способа для различения становится нулевым или получения из пустой очереди, так что я бросить НПЭ.

Я рефакторинг логики немного в getobject, чтобы удалить избыточные проверки.

И я переименовал некоторые Варс для hungarianlike.

import java.util.concurrent.atomic.AtomicReference;

public class LockFreeQueue<T> {
private static class Node<E> {
E value;
volatile Node<E> next;

Node(E value) {
this.value = value;
}
}

private final AtomicReference<Node<T>> refHead, refTail;
public LockFreeQueue() {
// have both head and tail point to a dummy node
Node<T> dummy = new Node<T>(null);
refHead = new AtomicReference<Node<T>>(dummy);
refTail = new AtomicReference<Node<T>>(dummy);
}

/**
* Puts an object at the end of the queue.
*/
public void putObject(T value) {
if (value == null) throw new NullPointerException();

Node<T> node = new Node<T>(value);
Node<T> prevTail = refTail.getAndSet(node);
prevTail.next = node;
}

/**
* Gets an object from the beginning of the queue. The object is removed
* from the queue. If there are no objects in the queue, returns null.
*/
public T getObject() {
Node<T> head, next;

// move head node to the next node using atomic semantics
// as long as next node is not null
do {
head = refHead.get();
next = head.next;
if (next == null) {
// empty list
return null;
}
// try until the whole loop executes pseudo-atomically
// (i.e. unaffected by modifications done by other threads)
} while (!refHead.compareAndSet(head, next));

T value = next.value;

// release the value pointed to by head, keeping the head node dummy
next.value = null;

return value;
}
}

3
ответ дан 27 января 2011 в 08:01 Источник Поделиться