Cythonic Лапласа решения на 3D регулярной сетке


Я следовал официальной на Cython для пользователей пакете numpy , чтобы сделать Лапласа решатель на 3D регулярной сетке с условиями Дирихле.

Сейчас я доволен результатами его возвращает, но я интересно, если есть более эффективный способ, чтобы этот код, тем более, что учебник я следовал составляет около 10 лет. У меня очень ограниченные знания в C и я не уверен, если типы данных (ctypedef) правильно выбрали здесь и/или окажет значительное влияние на производительность. Я пробовал видах моем индексов переменных, как unsigned int по-разному, но в итоге я дала, как я не совсем уверен, если это актуально.

Кроме того, я рад любым общий стиль кодирования и хороший совет практики.

Код Python, который вызывает мои функции на Cython внутри класса и выглядит так:

def _solve_laplacian(self):
    init = np.zeros_like(self.domain, float)
    init[np.logical_not(self.border)] = 1
    log.info("Solving Laplacian...")
    laplace_grid, iterations, max_error = laplace_solver(
        self.domain_idx_i,
        self.domain_idx_j,
        self.domain_idx_k,
        init,
        self.laplace_tolerance,
        self.laplace_max_iter,
        self.spacing)
    log.debug(
        f"Laplacian: {iterations} iterations, max_error = {max_error}")
    self.cropped_laplace_grid = laplace_grid
    self.laplacian_iterations = iterations
    self.laplacian_max_error = max_error

а вот саму функцию на Cython:

import numpy as np
cimport numpy as np
cimport cython

ctypedef np.float64_t DTYPE_FLOAT
ctypedef np.int_t DTYPE_INT


# dirty fix to get rid of annoying warnings
np.seterr(divide='ignore', invalid='ignore')

@cython.boundscheck(False)
@cython.wraparound(False)
def solve_3D(np.ndarray[DTYPE_INT, ndim=1] domain_idx_i,
             np.ndarray[DTYPE_INT, ndim=1] domain_idx_j,
             np.ndarray[DTYPE_INT, ndim=1] domain_idx_k,
             np.ndarray[DTYPE_FLOAT, ndim=3] init,
             DTYPE_FLOAT tolerance,
             DTYPE_INT max_iterations,
             np.ndarray[DTYPE_FLOAT, ndim=1] spacing):
    cdef np.ndarray[DTYPE_FLOAT, ndim=3] laplace_grid = np.copy(init)
    cdef np.ndarray[DTYPE_FLOAT, ndim=3] prev_laplace_grid = np.copy(
        laplace_grid)
    cdef DTYPE_INT iteration = 0
    cdef DTYPE_FLOAT max_error = 1.0
    cdef DTYPE_INT n_points = len(domain_idx_i)
    cdef DTYPE_INT i, j, k, n
    cdef DTYPE_FLOAT error
    cdef DTYPE_FLOAT value
    cdef DTYPE_FLOAT hi, hj, hk, hi2, hj2, hk2, factor

    hi, hj, hk = spacing
    hi2, hj2, hk2 = spacing ** 2

    factor = (hi2 * hj2 * hk2) / (2 * (hi2 * hj2 + hi2 * hk2 + hj2 * hk2))

    while max_error > tolerance and iteration < max_iterations:
        iteration += 1
        prev_laplace_grid[:] = laplace_grid
        for n in range(n_points):
            i = domain_idx_i[n]
            j = domain_idx_j[n]
            k = domain_idx_k[n]
            value = \
                (
                    (laplace_grid[i + 1, j, k] +
                     laplace_grid[i - 1, j, k]) / hi2 +
                    (laplace_grid[i, j + 1, k] +
                     laplace_grid[i, j - 1, k]) / hj2 +
                    (laplace_grid[i, j, k - 1] +
                     laplace_grid[i, j, k + 1]) / hk2
                ) * factor
            laplace_grid[i, j, k] = value
        max_error = np.nanmax(
            np.abs(
                (prev_laplace_grid[domain_idx_i, domain_idx_j, domain_idx_k] -
                 laplace_grid[domain_idx_i, domain_idx_j, domain_idx_k]) /
                prev_laplace_grid[domain_idx_i, domain_idx_j, domain_idx_k]))
    return laplace_grid, iteration, max_error

Если вы заинтересованы в полной питона пакет это часть, это здесь и совсем некрасиво сейчас. Я пытаюсь улучшить его, так что я не стыжусь этого.



108
1
задан 9 февраля 2018 в 12:02 Источник Поделиться
Комментарии
1 ответ

Мне помогли на #scipy IRC-канал пользователя @ngoldbaum и может существенно ускорить мой код:


  1. Избегая использовать любые numpy методы в основном цикле

  2. С использованием OpenMP распараллеливание

  3. Используя истинной отдела, так как нет риска, о делении на ноль

  4. Избегая хранить предыдущие значения итерации

...и некоторые другие мелкие вещи.

Вот что мой решатель выглядит сейчас:

import numpy as np
cimport numpy as np
cimport cython
from cython.parallel import prange
from libc.math cimport fabs

ctypedef np.float64_t DTYPE_FLOAT
ctypedef np.int_t DTYPE_INT

# dirty fix to get rid of annoying warnings
np.seterr(divide='ignore', invalid='ignore')

@cython.boundscheck(False)
@cython.wraparound(False)
cdef bint has_converged(np.ndarray[DTYPE_FLOAT, ndim=1] errors,
DTYPE_INT n,
DTYPE_FLOAT tolerance):
cdef bint res = True
cdef DTYPE_INT i
for i in prange(n, nogil=True): # is prange useful here ?
if errors[i] > tolerance:
res = False
break
return res

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
def solve_3D(np.ndarray[DTYPE_INT, ndim=1] domain_idx_i,
np.ndarray[DTYPE_INT, ndim=1] domain_idx_j,
np.ndarray[DTYPE_INT, ndim=1] domain_idx_k,
np.ndarray[DTYPE_FLOAT, ndim=3] init,
DTYPE_FLOAT tolerance,
DTYPE_INT max_iterations,
np.ndarray[DTYPE_FLOAT, ndim=1] spacing):
cdef np.ndarray[DTYPE_FLOAT, ndim=3] laplace_grid = np.copy(init)
cdef DTYPE_INT iteration = 0
cdef DTYPE_INT n_points = len(domain_idx_i)
cdef DTYPE_INT i, j, k, n
cdef DTYPE_FLOAT value
cdef DTYPE_FLOAT hi, hj, hk, hi2, hj2, hk2, factor, prev_value
cdef np.ndarray[DTYPE_FLOAT, ndim=1] errors = np.zeros(n_points,
np.float64)
cdef bint convergence = False

hi, hj, hk = spacing
hi2, hj2, hk2 = spacing ** 2

factor = (hi2 * hj2 * hk2) / (2 * (hi2 * hj2 + hi2 * hk2 + hj2 * hk2))

while not convergence and iteration < max_iterations:
iteration += 1
for n in prange(n_points, nogil=True):
i = domain_idx_i[n]
j = domain_idx_j[n]
k = domain_idx_k[n]
value = ((laplace_grid[i + 1, j, k] +
laplace_grid[i - 1, j, k]) / hi2 +
(laplace_grid[i, j + 1, k] +
laplace_grid[i, j - 1, k]) / hj2 +
(laplace_grid[i, j, k - 1] +
laplace_grid[i, j, k + 1]) / hk2) * factor
prev_value = laplace_grid[i, j, k]
laplace_grid[i, j, k] = value
errors[n] = fabs((prev_value - value) / prev_value)

if iteration == 1:
convergence = False
else:
convergence = has_converged(errors, n, tolerance)

return laplace_grid, iteration, np.nanmax(errors)

Я думаю, что это могло быть немного лучше, если errors были проверены "на лету", убрав вторую for петли каждый while итерации. Однако, я не стал выяснять, чтобы сделать это эффективно в многопоточных prange... может оно того не стоит!

1
ответ дан 15 февраля 2018 в 12:02 Источник Поделиться