为了账号安全,请及时绑定邮箱和手机立即绑定

如何在python中矢量化包含eigvalsh的复杂代码

如何在python中矢量化包含eigvalsh的复杂代码

阿波罗的战车 2021-09-11 16:03:01
我有以下代码(对不起,它不是太小,我已经尝试从原始代码中减少它)。基本上,我在运行以下eval_s()方法/函数时遇到了性能问题:1) 找到 4x4 厄米矩阵的 4 个特征值 eigvalsh()2)将特征值的倒数和成一个变量result3) 我对许多由 参数化的矩阵重复步骤 1 和 2 x,y,z,将累积和存储在 中result。我在第 3 步中重复计算(查找特征值和求和)的次数取决于ksep我的代码中的一个变量,我需要这个数字在我的实际代码中增加(即,ksep必须减少)。但是计算中eval_s()有一个 for 循环x,y,z,我猜这确实会减慢速度。[试着ksep=0.5明白我的意思。]有没有办法向量化我的示例代码中指示的方法(或者一般来说,涉及查找参数化矩阵的特征值的函数)?代码:import numpy as npimport sympy as spimport itertools as itfrom sympy.abc import x, y, zclass Solver:    def __init__(self, vmat):        self._vfunc = sp.lambdify((x, y, z),                                  expr=vmat,                                  modules='numpy')        self._q_count, self._qs = None, []  # these depend on ksep!    ################################################################    # How to vectorize this?    def eval_s(self, stiff):        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"        result = 0        for k in self._qs:            evs = np.linalg.eigvalsh(self._vfunc(*k))            result += np.sum(np.divide(1., (stiff + evs)))        return result.real - 4 * self._q_count    ################################################################    def populate_qs(self, ksep: float = 1.7):        self._qs = [(kx, ky, kz) for kx, ky, kz                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))]        self._q_count = len(self._qs)ps 代码的 sympy 部分可能看起来很奇怪,但它在我的原始代码中起到了作用。
查看完整描述

2 回答

?
三国纷争

TA贡献1804条经验 获得超7个赞

您可以,方法如下:


def eval_s_vectorized(self, stiff):

    assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"

    mats = np.stack([self._vfunc(*k) for k in self._qs], axis=0)

    evs = np.linalg.eigvalsh(mats)

    result = np.sum(np.divide(1., (stiff + evs)))

    return result.real - 4 * self._q_count

这仍然使 Sympy 表达式的评估未向量化。这部分矢量化有点棘手,主要是因为1输入矩阵中的s。您可以通过修改来制作代码的完全矢量化版本,Solver以便用数组常量替换标量常量vmat:


import itertools as it

import numpy as np

import sympy as sp

from sympy.abc import x, y, z

from sympy.core.numbers import Number

from sympy.utilities.lambdify import implemented_function


xones = implemented_function('xones', lambda x: np.ones(len(x)))

lfuncs = {'xones': xones}


def vectorizemat(mat):

    ret = mat.copy()

    # get the first element of the set of symbols that mat uses

    for x in mat.free_symbols: break

    for i,j in it.product(*(range(s) for s in mat.shape)):

        if isinstance(mat[i,j], Number):

            ret[i,j] = xones(x) * mat[i,j]

    return ret


class Solver:

    def __init__(self, vmat):

        self._vfunc = sp.lambdify((x, y, z),

                                  expr=vectorizemat(vmat),

                                  modules=[lfuncs, 'numpy'])

        self._q_count, self._qs = None, []  # these depend on ksep!


    def eval_s_vectorized_completely(self, stiff):

        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"

        evs = np.linalg.eigvalsh(self._vfunc(*self._qs.T).T)

        result = np.sum(np.divide(1., (stiff + evs)))

        return result.real - 4 * self._q_count


    def populate_qs(self, ksep: float = 1.7):

        self._qs = np.array([(kx, ky, kz) for kx, ky, kz

                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))])

        self._q_count = len(self._qs)

测试/时间

对于小型ksep矢量化版本比原始版本快约 2 倍,完全矢量化版本比原始版本快约 20 倍:


# old version for ksep=.3

import timeit

print(timeit.timeit("test()", setup="from __main__ import test", number=10))

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

-85240.46154500882

118.42847006605007


# vectorized version for ksep=.3

import timeit

print(timeit.timeit("test()", setup="from __main__ import test", number=10))

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

64.95763925800566


# completely vectorized version for ksep=.3

import timeit

print(timeit.timeit("test()", setup="from __main__ import test", number=10))

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

5.648927717003971

矢量化版本结果中的舍入误差与原始结果略有不同。这大概是由于result计算总和的方式不同造成的。


查看完整回答
反对 回复 2021-09-11
?
撒科打诨

TA贡献1934条经验 获得超2个赞

已经完成了大部分工作。以下是如何在 20 倍的基础上再获得 2 倍的加速。


手动做线性代数。当我尝试这样做时,我很震惊 numpy 在小矩阵上是多么浪费:


>>> from timeit import timeit


# using eigvalsh

>>> print(timeit("test(False, 0.1)", setup="from __main__ import test", number=3))

-2301206.495955009

-2301206.495955009

-2301206.495955009

55.794611917983275

>>> print(timeit("test(False, 0.3)", setup="from __main__ import test", number=5))

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

-85240.46154498367

3.400342195003759


# by hand

>>> print(timeit("test(True, 0.1)", setup="from __main__ import test", number=3))

-2301206.495955076

-2301206.495955076

-2301206.495955076

26.67294767702697

>>> print(timeit("test(True, 0.3)", setup="from __main__ import test", number=5))

-85240.46154498379

-85240.46154498379

-85240.46154498379

-85240.46154498379

-85240.46154498379

1.5047460949863307

请注意,部分加速可能被共享代码掩盖了,仅在线性代数上似乎更多,尽管我没有仔细检查。


一个警告:我在矩阵的 2by2 分割上使用 Schur 补码来计算逆矩阵的对角元素。如果 Schur 补集不存在,即如果左上角或右下角子矩阵不可逆,这将失败。


这是修改后的代码:


import itertools as it

import numpy as np

import sympy as sp

from sympy.abc import x, y, z

from sympy.core.numbers import Number

from sympy.utilities.lambdify import implemented_function


xones = implemented_function('xones', lambda x: np.ones(len(x)))

lfuncs = {'xones': xones}


def vectorizemat(mat):

    ret = mat.copy()

    for x in mat.free_symbols: break

    for i,j in it.product(*(range(s) for s in mat.shape)):

        if isinstance(mat[i,j], Number):

            ret[i,j] = xones(x) * mat[i,j]

    return ret


class Solver:

    def __init__(self, vmat):

        vmat = vectorizemat(vmat)

        self._vfunc = sp.lambdify((x, y, z),

                                  expr=vmat,

                                  modules=[lfuncs, 'numpy'])

        self._q_count, self._qs = None, []  # these depend on ksep!


    def eval_s_vectorized_completely(self, stiff):

        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"

        mats = self._vfunc(*self._qs.T).T

        evs = np.linalg.eigvalsh(mats)

        result = np.sum(np.divide(1., (stiff + evs)))

        return result.real - 4 * self._q_count


    def eval_s_pp(self, stiff):

        assert len(self._qs) == self._q_count, "Run 'populate_qs' first!"

        mats = self._vfunc(*self._qs.T).T

        np.einsum('...ii->...i', mats)[...] += stiff

        (A, B), (C, D) = mats.reshape(-1, 2, 2, 2, 2).transpose(1, 3, 0, 2, 4)

        res = 0

        for AA, BB, CC, DD in ((A, B, C, D), (D, C, B, A)):

            (a, b), (c, d) = DD.transpose(1, 2, 0)

            rdet = 1 / (a*d - b*b)[:, None]

            iD = DD[..., ::-1, ::-1].copy()

            iD.reshape(-1, 4)[..., 1:3] *= -rdet

            np.einsum('...ii->...i', iD)[...] *= rdet

            (Aa, Ab), (Ac, Ad) = AA.transpose(1, 2, 0)

            (Ba, Bb), (Bc, Bd) = BB.transpose(1, 2, 0)

            (Da, Db), (Dc, Dd) = iD.transpose(1, 2, 0)

            a = Aa - Ba*Ba*Da - 2*Bb*Ba*Db - Bb*Bb*Dd

            d = Ad - Bd*Bd*Dd - 2*Bc*Bd*Db - Bc*Bc*Da

            b = Ab - Ba*Bc*Da - Ba*Bd*Db - Bb*Bd*Dd - Bb*Bc*Dc

            res += ((a + d) / (a*d - b*b)).sum()

        return res - 4 * self._q_count


    def populate_qs(self, ksep: float = 1.7):

        self._qs = np.array([(kx, ky, kz) for kx, ky, kz

                    in it.product(np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep),

                                  np.arange(-3*np.pi, 3.01*np.pi, ksep))])

        self._q_count = len(self._qs)



def test(manual=False, ksep=0.3):

    vmat = sp.Matrix([[1, sp.cos(x/4+y/4), sp.cos(x/4+z/4), sp.cos(y/4+z/4)],

                      [sp.cos(x/4+y/4), 1, sp.cos(y/4-z/4), sp.cos(x/4 - z/4)],

                      [sp.cos(x/4+z/4), sp.cos(y/4-z/4), 1, sp.cos(x/4-y/4)],

                      [sp.cos(y/4+z/4), sp.cos(x/4-z/4), sp.cos(x/4-y/4), 1]]) * 2

    solver = Solver(vmat)

    solver.populate_qs(ksep=ksep)  # <---- Performance starts to worsen (in eval_s) when ksep is reduced!

    if manual:

        print(solver.eval_s_pp(0.65))

    else:

        print(solver.eval_s_vectorized_completely(0.65))


查看完整回答
反对 回复 2021-09-11
  • 2 回答
  • 0 关注
  • 159 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信