Одновременных событий счетчика служебные классы/усреднитель


Для личного проекта, Я работаю на Java Profiler, которая специально предназначена для ThreadPoolExecutor и его подклассы, а также предоставляет статистику о пропускной способности выполненными заданиями. В поддержку этого я написал следующие служебные классы, которые предназначены, чтобы помочь вычислить сглаженный средний уровень событий с течением времени. Они предназначены для того, чтобы быть ориентирован на многопотоковое исполнение и блокировка-free; я хочу, чтобы как можно меньше вмешиваться в темы, которые рассчитывают (и выполнение) задач.

Первое-это EventMeter, которая используется для подсчета один тип события. Это поддерживает вычисления средней скоростью событий, сохраняя текущую сумму подсчитанных событий и быть в состоянии выбросить старые данные. (Простите мой русский!)

public final class EventMeter {
    private final long bucketLengthNanos;
    private final double[] buckets;
    private volatile double sum;
    private int bucketNum;
    private AtomicLong accumulator;

    public EventMeter(int numBuckets, long bucketLengthNanos) {
        // TODO assert that numBuckets >= 1, bucketLengthNanos >= some small number

        this.bucketLengthNanos = bucketLengthNanos;
        this.buckets = new double[numBuckets];
        this.sum = 0.0;
        this.bucketNum = 0;
        this.accumulator = new AtomicLong(Double.doubleToLongBits(0.0));
    }

    public long getBucketLengthNanos() {
        return bucketLengthNanos;
    }

    public void countEvent() {
        long oldAccumulator;
        long newAccumulator;
        do {
            oldAccumulator = accumulator.get();
            newAccumulator = Double.doubleToLongBits(Double.longBitsToDouble(oldAccumulator) + 1.0);
        } while (!accumulator.compareAndSet(oldAccumulator, newAccumulator));
    }

    public double getRate() {
        return sum / buckets.length;
    }

    public void fillBucketFromAccumulator(double accumulatorFraction) {
        long oldAccumulator;
        long newAccumulator;
        do {
            oldAccumulator = accumulator.get();
            newAccumulator = Double.doubleToLongBits(Double.longBitsToDouble(oldAccumulator) * (1.0 - accumulatorFraction));
        } while (!accumulator.compareAndSet(oldAccumulator, newAccumulator));

        double oldAccumulatorValue = Double.longBitsToDouble(oldAccumulator);

        double oldBucketContents = buckets[bucketNum];
        double newBucketContents = oldAccumulatorValue * accumulatorFraction;

        buckets[bucketNum] = newBucketContents;
        bucketNum = (bucketNum + 1) % buckets.length;

        sum = sum - oldBucketContents + newBucketContents;
    }
}

Далее идет EventMeterManager, который использует рабочий поток для продвижения "ведра" в коллекции EventMeters при постоянной скорости.

public final class EventMeterManager {
    private final DelayQueue<AdvanceMeterTask> meterQueue;
    private final Map<EventMeter, AdvanceMeterTask> tasks;
    private final AtomicInteger activeMeters;
    private AtomicReference<Thread> worker;

    public EventMeterManager() {
        this.meterQueue = new DelayQueue<AdvanceMeterTask>();
        this.worker = new AtomicReference<Thread>();
        this.tasks = new ConcurrentHashMap<EventMeter, AdvanceMeterTask>();
        this.activeMeters = new AtomicInteger(0);
    }

    public EventMeter startEventMeter(int numBuckets, long bucketLengthNanos) {
        final EventMeter meter = new EventMeter(numBuckets, bucketLengthNanos);

        if (activeMeters.incrementAndGet() == 1) {
            final Thread newWorker = new Thread(new DriverWorker(), "EventMeterManager worker");

            // It will either already be null or it will be shortly.
            while (!worker.compareAndSet(null, newWorker)) {}

            newWorker.start();
        }

        final AdvanceMeterTask advanceTask = new AdvanceMeterTask(meter, System.nanoTime());
        tasks.put(meter, advanceTask);
        meterQueue.put(advanceTask);

        return meter;
    }

    public void stopEventMeter(EventMeter meter) {
        final AdvanceMeterTask cancelledTask = tasks.remove(meter);
        if (cancelledTask != null) {
            cancelledTask.cancel();

            if (activeMeters.decrementAndGet() == 0) {
                worker.get().interrupt();
                worker.set(null);
            }
        }
    }

    private class DriverWorker implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    final AdvanceMeterTask task = meterQueue.take();
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private class AdvanceMeterTask implements Delayed, Runnable {
        private final EventMeter meter;
        private long bucketStartTime;
        private long bucketEndTime;
        private volatile boolean cancelled;

        public AdvanceMeterTask(final EventMeter meter, final long firstBucketStartTime) {
            this.meter = meter;
            this.bucketStartTime = firstBucketStartTime;
            this.bucketEndTime = this.bucketStartTime + meter.getBucketLengthNanos();
        }

        @Override
        public int compareTo(Delayed o) {
            final long difference = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            if (difference < 0) {
                return -1;
            } else if (difference > 0) {
                return 1;
            } else {
                return 0;
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(bucketEndTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public void run() {
            if (!cancelled) {
                final long now = System.nanoTime();
                final long bucketLength = meter.getBucketLengthNanos();

                while (bucketEndTime < now) {
                    double bucketFraction = (double) bucketLength / (now - bucketStartTime);
                    meter.fillBucketFromAccumulator(bucketFraction);
                    bucketStartTime = bucketEndTime;
                    bucketEndTime += bucketLength;
                }

                meterQueue.put(this);
            }
        }

        public void cancel() {
            this.cancelled = true;
        }
    }
}

В дополнение к общим комментарием (что всегда приветствуется) хотелось бы конкретные отзывы, если можно на эти вещи:

  • Эти классы как потокобезопасные, как я надеюсь, что они?
  • Эти классы кажутся довольно вкупе, но в интересах не создавать кучу лишних ниток я не хочу EventMeter, чтобы создать свой собственный рабочий поток. Есть ли подход я могу взять EventMeters позволяет разделить поток, не требуя от клиента использовать EventMeterManager?


1310
3
задан 6 августа 2011 в 03:08 Источник Поделиться
Комментарии
1 ответ

fillBucketFromAccumulator() не является потокобезопасным. Два потока могут писать в один и тот же слот в массиве, а потом оба на один.

2
ответ дан 9 августа 2011 в 01:08 Источник Поделиться