Работник, бассейн и рабочих мест


Я создал фрагмент кода, что позволяет мне создавать работнику бассейн, а из рабочего пула я могу выполнить кучу заданий.

Я могу настроить каждого задания, чтобы иметь максимальное число повторов, и устанавливает функцию обратного вызова, которая будет вызвана в конце каждого задания с логическим аргументом, представляет ли работа в конечном счете не удалось или удалось.

Это первый "параллельный" кусок кода я написал, поэтому он не может быть лучшим.

Он отлично работает, как я ожидаю, и проходит все мои тесты, даже когда я запускаю сотни тестов снова и снова.

Я хотел бы кто-то с наметанным глазам в эту область, чтобы посмотреть на мой код и скажите мне, где там могут быть некоторые недостатки и направления совершенствования.

Это как использовать код:

log := logger.New()
w := worker.NewWorker(log)
go w.Run(
    w.NewJob(func() error {
        return errors.New("foo")
    }),
    w.NewJob(func() error {
        return nil
    }).SetFinally(func(success bool) {
        if success {
            // ...
        }
    }),
)

Этот пакет (если вы хотите запустить его самостоятельно, просто снимите logger логика, или создать макет регистратора, и вы можете запустить его просто отлично:

package worker

import (
    "fmt"
    "math"
    "math/rand"
    "sync"
    "time"

    "core/logger"
)

const (
    // MaxConcurrentRunners is the limit of jobs that can be ran concurrently.
    MaxConcurrentRunners = 1000

    // MaxRetryLimit is the maximum amount of retries for a failed job.
    MaxRetryLimit = 10

    // DefaultRetryLimit is the default amount of retries for a failed job.
    DefaultRetryLimit = 3
)

// Worker is just an alias for Pool. It's nice to have so i a type can be used
// like worker.Worker instead of worker.Pool.
type Worker interface {
    Pool
}

// Pool is an interface for a worker pool.
type Pool interface {
    NewJob(handler func() error) Job
    Run(jobs ...Job)
    Flushed() bool
}

// Job is an interface for a job.
type Job interface {
    ID() string
    SetRetryLimit(retryLimit int) Job
    RetryLimit() int
    IncrementAttempts()
    CurrentAttempts() int
    SetHandler(handler func() error)
    Handler() func() error
    SetFinally(finally func(success bool)) Job
    Finally(success bool) Job
    Done() chan bool
    SignalDone()
}

// workerPool represents a worker pool.
type workerPool struct {
    id        string
    log       logger.Logger
    semaphore chan struct{}
}

// NewWorker returns a new pool, which the Worker interface implements.
func NewWorker(log logger.Logger) Worker {
    return NewPool(log)
}

// NewPool returns a new worker instance.
func NewPool(log logger.Logger) Pool {
    w := new(workerPool)
    w.id = randomString()
    w.log = log
    w.semaphore = make(chan struct{}, MaxConcurrentRunners)

    return w
}

// NewJob creates a new job for a worker pool.
func (w *workerPool) NewJob(handler func() error) Job {
    j := new(job)
    j.id = randomString()
    j.retryLimit = DefaultRetryLimit
    j.handler = handler
    j.done = make(chan bool)

    return j
}

// ID will return the ID of a pool.
func (w *workerPool) ID() string {
    return w.id
}

// Flushed checks whether the worker pool is flushed or not (has no active jobs in the buffer).
func (w *workerPool) Flushed() bool {
    return len(w.semaphore) == 0
}

// DoWork will begin processing the jobs.
func (w *workerPool) Run(jobs ...Job) {
    // Cache the count of jobs.
    l := len(jobs)

    // Create a new wait group and set the counter to the count of jobs.
    wg := new(sync.WaitGroup)
    wg.Add(l)

    // Process each job.
    for _, job := range jobs {
        // Block pool buffer is full.
        w.semaphore <- struct{}{}

        go func(job Job) {
            // Log start of job processing.
            w.log.Info(fmt.Sprintf("Worker pool (%s): Started job (%s)", w.ID(), job.ID()))

            // Execute the job.
            go func() {
                w.run(wg, job)
            }()

            // Wait for the job to be signaled as complete.
            <-job.Done()

            // Release a slot in the pool buffer.
            <-w.semaphore

            // Decrement the wait group.
            wg.Done()

            // Log end of job processing.
            w.log.Info(fmt.Sprintf("Worker pool (%s): Completed job (%s)", w.ID(), job.ID()))
        }(job)
    }

    // Wait for the wait group counter to be depleted.
    wg.Wait()
}

// run will process the job until it succeeds or reaches the maximum retries.
func (w *workerPool) run(wg *sync.WaitGroup, job Job) {
    defer func() {
        job.SignalDone()
    }()

    // Execute job.
    if err := job.Handler()(); err != nil {
        for {
            // Increment counter.
            job.IncrementAttempts()

            // Wait retry period.
            timer := time.NewTimer(ExponentialBackoff(job.CurrentAttempts()))
            <-timer.C

            // Execute job.
            if err := job.Handler()(); err != nil {
                // Maximum attempts reached without success.
                if job.CurrentAttempts() >= job.RetryLimit() {
                    job.Finally(false)
                    w.log.Error(err)
                    return
                }

                continue
            } else {
                break
            }
        }
    }

    job.Finally(true)
}

// job represents a job for a worker pool.
type job struct {
    id              string
    retryLimit      int
    currentAttempts int
    handler         func() error
    finally         func(success bool)
    done            chan bool
}

// ID will return the ID of a job.
func (j *job) ID() string {
    return j.id
}

// Done returns a channel that signals when the job is done
func (j *job) Done() chan bool {
    return j.done
}

// SignalDone will signal when a job is done. This can also be used from outside the
// worker to cancel a job, etc.
func (j *job) SignalDone() {
    j.done <- true
}

// SetRetryLimit will set the jobs retry limit.
func (j *job) SetRetryLimit(retryLimit int) Job {
    if retryLimit <= 0 {
        j.retryLimit = DefaultRetryLimit
    } else if retryLimit > MaxRetryLimit {
        j.retryLimit = MaxRetryLimit
    } else {
        j.retryLimit = retryLimit
    }

    return j
}

// RetryLimit will get the jobs retry limit.
func (j *job) RetryLimit() int {
    return j.retryLimit
}

// CurrentAttempts will get the jobs current attempts.
func (j *job) CurrentAttempts() int {
    return j.currentAttempts
}

// IncrementAttempts increments the number of attempts on this job.
func (j *job) IncrementAttempts() {
    j.currentAttempts++
}

// SetHandler will set the jobs handler.
func (j *job) SetHandler(handler func() error) {
    j.handler = handler
}

// Handler will get the jobs handler.
func (j *job) Handler() func() error {
    return j.handler
}

// SetFinally will set the finally function of the job, which will be called upon job completion.
func (j *job) SetFinally(finally func(success bool)) Job {
    j.finally = finally

    return j
}

// Finally will call finally.
func (j *job) Finally(success bool) Job {
    if j.finally != nil {
        j.finally(success)
    }

    return j
}

// ExponentialBackoff will give a duration using an exponential backup.
//
// Example failedAttempts:
// 1: 500ms
// 2: 1s
// 3: 2s
// 4: 4s
// 5: 8s
// 6: 16s
// 7: 32s
// 8: 1m4s
// 9: 2m8s
// 10: 4m16s
func ExponentialBackoff(failedAttempts int) time.Duration {
    return time.Duration(float64(time.Second) * math.Pow(2, float64(failedAttempts)) * .25)
}

// randomString will generate a random string.
func randomString() string {
    const chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

    l := len(chars)
    res := make([]byte, 30)
    for i := range res {
        res[i] = chars[rand.Intn(l)]
    }

    return string(res)
}


244
3
задан 2 марта 2018 в 02:03 Источник Поделиться
Комментарии