为了账号安全,请及时绑定邮箱和手机立即绑定

简单的Python挑战:数据缓冲区上最快的按位异或

简单的Python挑战:数据缓冲区上最快的按位异或

慕莱坞森 2019-11-13 13:15:13
挑战:对两个相等大小的缓冲区执行按位XOR。缓冲区将要求是python str类型,因为传统上这是python中数据缓冲区的类型。将结果值作为返回str。尽快执行此操作。输入是两个1兆字节(2 ** 20字节)字符串。目前的挑战是基本使用python的或现有的第三方Python模块打我的低效算法(放宽的规定:或者创建自己的模块)的边际增加是无用的。from os import urandomfrom numpy import frombuffer,bitwise_xor,bytedef slow_xor(aa,bb):    a=frombuffer(aa,dtype=byte)    b=frombuffer(bb,dtype=byte)    c=bitwise_xor(a,b)    r=c.tostring()    return raa=urandom(2**20)bb=urandom(2**20)def test_it():    for x in xrange(1000):        slow_xor(aa,bb)
查看完整描述

3 回答

?
小怪兽爱吃肉

TA贡献1852条经验 获得超1个赞

第一次尝试

使用scipy.weave和SSE2内在函数会带来一点改进。第一次调用要慢一些,因为需要从磁盘加载代码并进行缓存,随后的调用会更快:


import numpy

import time

from os import urandom

from scipy import weave


SIZE = 2**20


def faster_slow_xor(aa,bb):

    b = numpy.fromstring(bb, dtype=numpy.uint64)

    numpy.bitwise_xor(numpy.frombuffer(aa,dtype=numpy.uint64), b, b)

    return b.tostring()


code = """

const __m128i* pa = (__m128i*)a;

const __m128i* pend = (__m128i*)(a + arr_size);

__m128i* pb = (__m128i*)b;

__m128i xmm1, xmm2;

while (pa < pend) {

  xmm1 = _mm_loadu_si128(pa); // must use unaligned access 

  xmm2 = _mm_load_si128(pb); // numpy will align at 16 byte boundaries

  _mm_store_si128(pb, _mm_xor_si128(xmm1, xmm2));

  ++pa;

  ++pb;

}

"""


def inline_xor(aa, bb):

    a = numpy.frombuffer(aa, dtype=numpy.uint64)

    b = numpy.fromstring(bb, dtype=numpy.uint64)

    arr_size = a.shape[0]

    weave.inline(code, ["a", "b", "arr_size"], headers = ['"emmintrin.h"'])

    return b.tostring()

第二次尝试

考虑到注释,我重新检查了代码,以查明是否可以避免复制。原来我读错了字符串对象的文档,所以这是我的第二次尝试:


support = """

#define ALIGNMENT 16

static void memxor(const char* in1, const char* in2, char* out, ssize_t n) {

    const char* end = in1 + n;

    while (in1 < end) {

       *out = *in1 ^ *in2;

       ++in1; 

       ++in2;

       ++out;

    }

}

"""


code2 = """

PyObject* res = PyString_FromStringAndSize(NULL, real_size);


const ssize_t tail = (ssize_t)PyString_AS_STRING(res) % ALIGNMENT;

const ssize_t head = (ALIGNMENT - tail) % ALIGNMENT;


memxor((const char*)a, (const char*)b, PyString_AS_STRING(res), head);


const __m128i* pa = (__m128i*)((char*)a + head);

const __m128i* pend = (__m128i*)((char*)a + real_size - tail);

const __m128i* pb = (__m128i*)((char*)b + head);

__m128i xmm1, xmm2;

__m128i* pc = (__m128i*)(PyString_AS_STRING(res) + head);

while (pa < pend) {

    xmm1 = _mm_loadu_si128(pa);

    xmm2 = _mm_loadu_si128(pb);

    _mm_stream_si128(pc, _mm_xor_si128(xmm1, xmm2));

    ++pa;

    ++pb;

    ++pc;

}

memxor((const char*)pa, (const char*)pb, (char*)pc, tail);

return_val = res;

Py_DECREF(res);

"""


def inline_xor_nocopy(aa, bb):

    real_size = len(aa)

    a = numpy.frombuffer(aa, dtype=numpy.uint64)

    b = numpy.frombuffer(bb, dtype=numpy.uint64)

    return weave.inline(code2, ["a", "b", "real_size"], 

                        headers = ['"emmintrin.h"'], 

                        support_code = support)

区别在于字符串是在C代码内部分配的。不可能按照SSE2指令的要求将其对齐在16字节的边界上,因此使用字节访问来复制开头和结尾未对齐的内存区域。


无论如何,使用numpy数组传递输入数据,因为weave坚持将Python str对象复制到std::strings。frombuffer不会复制,因此很好,但是内存未对齐16字节,因此我们需要使用_mm_loadu_si128而不是更快的_mm_load_si128。


而不是使用_mm_store_si128,而是使用_mm_stream_si128,它可以确保所有写入均尽快流式传输到主存储器中-这样,输出数组不会耗尽宝贵的缓存行。


时机

至于时间安排,slow_xor第一次编辑中的条目指的是我的改进版本(内联按位xor,uint64),我消除了这种困惑。slow_xor指的是来自原始问题的代码。所有计时都完成了1000次运行。


slow_xor:1.85秒(1倍)

faster_slow_xor:1.25s(1.48x)

inline_xor:0.95秒(1.95倍)

inline_xor_nocopy:0.32s(5.78x)

该代码是使用gcc 4.4.3编译的,我已经验证了编译器实际上使用了SSE指令。


查看完整回答
反对 回复 2019-11-13
?
ibeautiful

TA贡献1993条经验 获得超5个赞

这是我的cython结果


slow_xor   0.456888198853

faster_xor 0.400228977203

cython_xor 0.232881069183

cython_xor_vectorised 0.171468019485

在cython中进行矢量化可将计算机上的for循环节省25%左右的时间,但是花了一半以上的时间来构建python字符串(该return语句)-我认为(合法地)避免多余的拷贝是可能的,因为数组可能包含空字节。


非法的方法是传入Python字符串并对其进行适当的突变,这会使函数的速度加倍。


xor.py


from time import time

from os import urandom

from numpy import frombuffer,bitwise_xor,byte,uint64

import pyximport; pyximport.install()

import xor_


def slow_xor(aa,bb):

    a=frombuffer(aa,dtype=byte)

    b=frombuffer(bb,dtype=byte)

    c=bitwise_xor(a,b)

    r=c.tostring()

    return r


def faster_xor(aa,bb):

    a=frombuffer(aa,dtype=uint64)

    b=frombuffer(bb,dtype=uint64)

    c=bitwise_xor(a,b)

    r=c.tostring()

    return r


aa=urandom(2**20)

bb=urandom(2**20)


def test_it():

    t=time()

    for x in xrange(100):

        slow_xor(aa,bb)

    print "slow_xor  ",time()-t

    t=time()

    for x in xrange(100):

        faster_xor(aa,bb)

    print "faster_xor",time()-t

    t=time()

    for x in xrange(100):

        xor_.cython_xor(aa,bb)

    print "cython_xor",time()-t

    t=time()

    for x in xrange(100):

        xor_.cython_xor_vectorised(aa,bb)

    print "cython_xor_vectorised",time()-t


if __name__=="__main__":

    test_it()

xor_.pyx


cdef char c[1048576]

def cython_xor(char *a,char *b):

    cdef int i

    for i in range(1048576):

        c[i]=a[i]^b[i]

    return c[:1048576]


def cython_xor_vectorised(char *a,char *b):

    cdef int i

    for i in range(131094):

        (<unsigned long long *>c)[i]=(<unsigned long long *>a)[i]^(<unsigned long long *>b)[i]

    return c[:1048576]


查看完整回答
反对 回复 2019-11-13
  • 3 回答
  • 0 关注
  • 796 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信