Реализация costreams


Я не очень хорошо с безопасностью потоков, и часто становятся жертвами тонкого проблем параллелизма. Поэтому я надеюсь, что кто-то здесь может сказать мне, есть ли это тонкий вопрос, параллелизм (состояние гонки и т. д.) в следующем коде, или это нормально. В частности, я использовал монитор.Ждать и следить.PulseAll правильно?

Конечно, если вы можете рассуждать о коде и пришел к выводу, что это уже исправить, что бы ответ тоже.

Этот код предназначен для реализации costreams шаблон (я придумал это название, поэтому вы не найдете его в Google). Он выполняется двумя методами (принят в качестве делегатов) параллельно. Она обеспечивает один из этих методов только для записи трансляций и прочих с только для чтения поток. Идея для одной из них для создания данных и записать его в поток, а другой для чтения из потока и потребления. (Чтение методы/записи передающийся в может быть все, что читает/пишет из ручья; они не могут быть написаны специально для использования с costreams. Если бы они были, они, вероятно, могут быть переписаны так, что они не должны использовать потоки вообще.)

При рассмотрении читал способ, помните, что договор на трансляцию.Читать это немного нелогично: он может читать и возвратить меньше байтов, чем было запрошено (пока она возвращает число байтов фактически чтение). Таким образом, тот факт, что он иногда возвращает меньше байт, чем подсчет параметров запросов-это не ошибка. Конечно, он не должен возвращать 0, за исключением, если конец потока достигается.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace MyLibrary
{
    public static class Costreams
    {
        /// <summary>Runs the two specified processes in parallel, allowing one to generate data by writing it to a stream, and the other to consume the data by reading it from a stream.</summary>
        /// <param name="writingAction">An action that generates data and writes it to a stream.</param>
        /// <param name="readingAction">An action that will want to read information from a stream.</param>
        public static void RunCostreams(Action<Stream> writingAction, Action<Stream> readingAction)
        {
            // Everything the writingAction writes will be enqueued in here and dequeued by the readingAction
            var queue = new Queue<byteChunk>();

            writingCostream writer = new writingCostream(queue);
            readingCostream reader = new readingCostream(queue);

            // Start reading in a new thread. The first call to reader.Read() will block until there is something in the queue to read.
            var thread = new Thread(() => readingAction(reader));
            thread.Start();

            // Start writing. Calls to writer.Write() will place the data in the queue and signal the reading thread.
            writingAction(writer);

            // Insert a null at the end of the queue to signal to the reader that this is where the data ends.
            queue.Enqueue(null);

            // Wait for the reader to consume all the remaining data.
            thread.Join();
        }

        private sealed class byteChunk
        {
            public byte[] Buffer;
            public int Offset;
            public int Count;
        }

        private sealed class readingCostream : Stream
        {
            private Queue<byteChunk> _queue;
            public readingCostream(Queue<byteChunk> queue) { _queue = queue; }

            public override bool CanRead { get { return true; } }
            public override bool CanSeek { get { return false; } }
            public override bool CanWrite { get { return false; } }
            public override void Flush() { }
            public override long Length { get { throw new NotSupportedException(); } }
            public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
            public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
            public override void SetLength(long value) { throw new NotSupportedException(); }
            public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

            public override int Read(byte[] buffer, int offset, int count)
            {
                lock (_queue)
                {
                    // If there is no data waiting to be read, wait for it.
                    while (_queue.Count == 0)
                        Monitor.Wait(_queue);

                    var peeked = _queue.Peek();

                    // A null element in the queue signals the end of the stream. Don't dequeue this item.
                    if (peeked == null)
                        return 0;

                    if (peeked.Count <= count)
                    {
                        // If we can return the complete item, dequeue it
                        Buffer.BlockCopy(peeked.Buffer, peeked.Offset, buffer, offset, peeked.Count);
                        _queue.Dequeue();
                        return peeked.Count;
                    }
                    else
                    {
                        // If we can only return part of the item, modify it accordingly
                        Buffer.BlockCopy(peeked.Buffer, peeked.Offset, buffer, offset, count);
                        peeked.Offset += count;
                        peeked.Count -= count;
                        return count;
                    }
                }
            }
        }

        private sealed class writingCostream : Stream
        {
            private Queue<byteChunk> _queue;
            public writingCostream(Queue<byteChunk> queue) { _queue = queue; }

            public override bool CanRead { get { return false; } }
            public override bool CanSeek { get { return false; } }
            public override bool CanWrite { get { return true; } }
            public override void Flush() { }
            public override long Length { get { throw new NotSupportedException(); } }
            public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
            public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
            public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
            public override void SetLength(long value) { throw new NotSupportedException(); }

            public override void Write(byte[] buffer, int offset, int count)
            {
                // Ignore zero-length writes
                if (count == 0)
                    return;

                lock (_queue)
                {
                    // We have to take a copy of the data because the calling thread might re-use the same buffer multiple times.
                    var bufferCopy = new byte[count];
                    Buffer.BlockCopy(buffer, offset, bufferCopy, 0, count);

                    // Put the data in the queue
                    _queue.Enqueue(new byteChunk { Buffer = bufferCopy, Offset = 0, Count = count });

                    // Signal the reading thread(s) that the queue has changed (in case it's waiting)
                    Monitor.PulseAll(_queue);
                }
            }
        }
    }
}


508
7
задан 23 февраля 2011 в 06:02 Источник Поделиться
Комментарии
1 ответ

Зависит от того, что ты пытаешься сделать. Неужели это возможно, чтобы иметь несколько потоков чтения, как комментарий намекает? Если так, то ты рискуешь иметь несколько чтений выпущен сразу и вернется в неправильном порядке. Если нет, то зачем использовать PulseAll в отличие от импульса?

Он чувствует себя неправильно для меня, что если бы читали называется и есть по очереди, то дальнейшие вызовы запись заблокирована, но если читать ждет, пока напишет отпускает ее, затем пишите еще, пишите буду в состоянии заблокировать очередь и использовать его. Тем не менее, это не должно быть проблемой, учитывая, что читать будут иметь дело с другого элемента в то время.

Но вот возникает вопрос, зачем ждать? Почему бы не заменить все это с событие manualresetevent? Таким образом, вы только должны зафиксировать очереди, пока вы обновляете его, так что вы можете установить случае, когда вы добавляете данные, сброс нем при удалении последнего элемента.

Я не проверял это, но он будет похож на этот:

public static class Costreams
{
/// <summary>Runs the two specified processes in parallel, allowing one to generate data by writing it to a stream, and the other to consume the data by reading it from a stream.</summary>
/// <param name="writingAction">An action that generates data and writes it to a stream.</param>
/// <param name="readingAction">An action that will want to read information from a stream.</param>
public static void RunCostreams(Action<Stream> writingAction, Action<Stream> readingAction)
{
// Everything the writingAction writes will be enqueued in here and dequeued by the readingAction
var queue = new Queue<byteChunk>();
using (var hasData = new ManualResetEvent(false))
{
writingCostream writer = new writingCostream(queue, hasData);
readingCostream reader = new readingCostream(queue, hasData);

// Start reading in a new thread. The first call to reader.Read() will block until there is something in the queue to read.
var thread = new Thread(() => readingAction(reader));
thread.Start();

// Start writing. Calls to writer.Write() will place the data in the queue and signal the reading thread.
writingAction(writer);

// Insert a null at the end of the queue to signal to the reader that this is where the data ends.
lock(queue)
{
queue.Enqueue(null);
hasData.Set();
}

// Wait for the reader to consume all the remaining data.
thread.Join();
}
}

private sealed class byteChunk
{
public byte[] Buffer;
public int Offset;
public int Count;
}

private sealed class readingCostream : Stream
{
private Queue<byteChunk> _queue;
private ManualResetEvent _hasData;

public readingCostream(Queue<byteChunk> queue, ManualResetEvent hasData)
{
_queue = queue;
_hasData = hasData;
}

public override bool CanRead { get { return true; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return false; } }
public override void Flush() { }
public override long Length { get { throw new NotSupportedException(); } }
public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
public override void SetLength(long value) { throw new NotSupportedException(); }
public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

public override int Read(byte[] buffer, int offset, int count)
{
// If there is no data waiting to be read, wait for it.
_hasData.WaitOne();

byteChunk peeked;
lock(_queue)
peeked = _queue.Peek();

// A null element in the queue signals the end of the stream. Don't dequeue this item.
if (peeked == null)
return 0;

if (peeked.Count <= count)
{
// If we can return the complete item, dequeue it
Buffer.BlockCopy(peeked.Buffer, peeked.Offset, buffer, offset, peeked.Count);
lock(_queue)
{
_queue.Dequeue();
// If this has emptied the queue, tell the next call to read
if (_queue.Count == 0)
_hasData.Reset();
}

return peeked.Count;
}

// If we can only return part of the item, modify it accordingly
Buffer.BlockCopy(peeked.Buffer, peeked.Offset, buffer, offset, count);
peeked.Offset += count;
peeked.Count -= count;
return count;
}
}

private sealed class writingCostream : Stream
{
private Queue<byteChunk> _queue;
private ManualResetEvent _hasData;
public writingCostream(Queue<byteChunk> queue, ManualResetEvent _hasData)
{
_queue = queue;
_hasData = hasData;
}

public override bool CanRead { get { return false; } }
public override bool CanSeek { get { return false; } }
public override bool CanWrite { get { return true; } }
public override void Flush() { }
public override long Length { get { throw new NotSupportedException(); } }
public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } }
public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
public override void SetLength(long value) { throw new NotSupportedException(); }

public override void Write(byte[] buffer, int offset, int count)
{
// Ignore zero-length writes
if (count == 0)
return;

// We have to take a copy of the data because the calling thread might re-use the same buffer multiple times.
var bufferCopy = new byte[count];
Buffer.BlockCopy(buffer, offset, bufferCopy, 0, count);

// Put the data in the queue
lock (_queue)
{
_queue.Enqueue(new byteChunk { Buffer = bufferCopy, Offset = 0, Count = count });

// Inform the reading thread that the queue now has data
_hasData.Set();
}
}
}
}

Еще пара замечаний с точки зрения обзора:


  • Стандарты именования от Microsoft для C# предположить, что имена классов должны быть PascalCase

  • Возможно, вы также захотите взглянуть на еще одно событие manualresetevent для памяти перегрузок - если писать вызывается много раз, и вы бежите из памяти, вы можете подождать, пока читал удалила некоторые данные из очереди, а не с OutOfMemoryException

5
ответ дан 23 февраля 2011 в 10:02 Источник Поделиться