为了账号安全,请及时绑定邮箱和手机立即绑定

如何避免在 heapq 中使用 _siftup 或 _siftdown

如何避免在 heapq 中使用 _siftup 或 _siftdown

慕桂英3389331 2021-12-17 16:41:05
我不知道如何在不使用_siftupor 的情况下有效地解决以下问题_siftdown:当一个元素无序时,如何恢复堆不变式?换句话说,更新old_value中heap至new_value,并保持heap工作。您可以假设old_value堆中只有一个。功能定义如下:def update_value_in_heap(heap, old_value, new_value):这是我的真实场景,有兴趣的可以看看。你可以想象它是一个小型的自动完成系统。我需要统计单词出现的频率,并保持前k个max-count单词,随时准备输出。所以我heap在这里使用。当一个字数++时,如果它在堆中,我需要更新它。所有的词和计数都存储在 trie-tree 的叶子中,heaps存储在 trie-tree 的中间节点中。如果你关心堆外这个词,别担心,我可以从trie-tree的叶子节点中得到它。当用户输入一个单词时,它将首先从堆中读取然后更新它。为了获得更好的性能,我们可以考虑通过批量更新来降低更新频率。那么当一个特定的字数增加时,如何更新堆呢?这是 _siftup 或 _siftdown 版本的简单示例(不是我的场景):>>> from heapq import _siftup, _siftdown, heapify, heappop>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]>>> heapify(data)>>> old, new = 8, 22              # increase the 8 to 22>>> i = data.index(old)>>> data[i] = new>>> _siftup(data, i)>>> [heappop(data) for i in range(len(data))][1, 2, 3, 5, 7, 10, 18, 19, 22, 37]>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]>>> heapify(data)>>> old, new = 8, 4              # decrease the 8 to 4>>> i = data.index(old)>>> data[i] = new>>> _siftdown(data, 0, i)>>> [heappop(data) for i in range(len(data))][1, 2, 3, 4, 5, 7, 10, 18, 19, 37]索引的成本为 O(n),更新的成本为 O(logn)。heapify是另一种解决方案,但效率低于_siftup或_siftdown。但是_siftupand_siftdown是heapq中的protected成员,所以不建议从外部访问。那么有没有更好更有效的方法来解决这个问题呢?这种情况的最佳实践?感谢您的阅读,我真的很感谢它帮助我。:)
查看完整描述

2 回答

?
慕勒3428872

TA贡献1848条经验 获得超6个赞

使用heapify

您必须记住的一件重要事情是理论复杂性和性能是两个不同的东西(即使它们是相关的)。换句话说,实施也很重要。渐近复杂性为您提供了一些下限,您可以将其视为保证,例如 O(n) 中的算法确保在最坏的情况下,您将执行许多输入大小线性的指令。这里有两件重要的事情:

  1. 常量被忽略,但常量在现实生活中很重要;

  2. 最坏的情况取决于您考虑的算法,而不仅仅是输入。

根据您考虑的主题/问题,第一点可能非常重要。在某些域中,隐藏在渐近复杂性中的常量非常大,以至于您甚至无法构建大于常量的输入(或者考虑该输入是不现实的)。情况并非如此,但这是您始终必须牢记的事情。

给出这两个观察结果,你真的不能说:实现 B 比 A 快,因为 A 派生自 O(n) 算法而 B 派生自 O(log n) 算法。即使这是一个很好的论据,但它并不总是足够的。当所有输入发生的可能性相同时,理论复杂性特别适合比较算法。换句话说,当你的算法非常通用时。

如果您知道您的用例和输入是什么,您可以直接测试性能。使用测试和渐近复杂度将使您对算法的执行方式有一个很好的了解(在极端情况下和任意实际情况下)。

话虽如此,让我们对以下将实施三种不同策略的类运行一些性能测试(这里实际上有四种策略,但Invalidate 和 Reinsert在您的情况下似乎不正确,因为您会使每个项目无效的次数与你看到一个给定的词)。我将包括我的大部分代码,以便您可以仔细检查我没有搞砸(您甚至可以检查完整的笔记本):

from heapq import _siftup, _siftdown, heapify, heappop


class Heap(list):

  def __init__(self, values, sort=False, heap=False):

    super().__init__(values)

    heapify(self)

    self._broken = False

    self.sort = sort

    self.heap = heap or not sort


  # Solution 1) repair using the knowledge we have after every update:        

  def update(self, key, value):

    old, self[key] = self[key], value

    if value > old:

        _siftup(self, key)

    else:

        _siftdown(self, 0, key)

    

  # Solution 2 and 3) repair using sort/heapify in a lazzy way:

  def __setitem__(self, key, value):

    super().__setitem__(key, value)

    self._broken = True

    

  def __getitem__(self, key):

    if self._broken:

        self._repair()

        self._broken = False

    return super().__getitem__(key)


  def _repair(self):  

    if self.sort:

        self.sort()

    elif self.heap:

        heapify(self)


  # … you'll also need to delegate all other heap functions, for example:

  def pop(self):

    self._repair()

    return heappop(self)

我们可以首先检查所有三种方法是否有效:


data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]


heap = Heap(data[:])

heap.update(8, 22)

heap.update(7, 4)

print(heap)


heap = Heap(data[:], sort_fix=True)

heap[8] = 22

heap[7] = 4

print(heap)


heap = Heap(data[:], heap_fix=True)

heap[8] = 22

heap[7] = 4

print(heap)

然后我们可以使用以下函数运行一些性能测试:


import time

import random


def rand_update(heap, lazzy_fix=False, **kwargs):

    index = random.randint(0, len(heap)-1)

    new_value = random.randint(max_int+1, max_int*2)

    if lazzy_fix:

        heap[index] = new_value

    else:

        heap.update(index, new_value)

    

def rand_updates(n, heap, lazzy_fix=False, **kwargs):

    for _ in range(n):

        rand_update(heap, lazzy_fix)

        

def run_perf_test(n, data, **kwargs):

    test_heap = Heap(data[:], **kwargs)

    t0 = time.time()

    rand_updates(n, test_heap, **kwargs)

    test_heap[0]

    return (time.time() - t0)*1e3


results = []

max_int = 500

nb_updates = 1


for i in range(3, 7):

    test_size = 10**i

    test_data = [random.randint(0, max_int) for _ in range(test_size)]


    perf = run_perf_test(nb_updates, test_data)

    results.append((test_size, "update", perf))

    

    perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, heap_fix=True)

    results.append((test_size, "heapify", perf))


    perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, sort_fix=True)

    results.append((test_size, "sort", perf))

结果如下:


import pandas as pd

import seaborn as sns


dtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"])

print(dtf)


sns.lineplot(

    data=dtf, 

    x="heap size", 

    y="duration (ms)", 

    hue="method",

)

//img1.sycdn.imooc.com//61bc4d570001dccf11580867.jpg

从这些测试中我们可以看到,这heapify似乎是最合理的选择,它在最坏的情况下具有相当的复杂性:O(n) 并且在实践中表现更好。另一方面,调查其他选项可能是个好主意(例如拥有专用于该特定问题的数据结构,例如使用垃圾箱将单词放入其中,然后将它们从垃圾箱移动到下一个看起来像可能的轨道调查)。


重要说明:这种情况(更新与读取比率为 1:1)对heapify和sort解决方案都不利。所以,如果你管理有AK:1分的比例,这一结论将更加清晰(你可以替换nb_updates = 1与nb_updates = k上面的代码)。


数据框详细信息:


    heap size   method  duration in ms

0        1000   update        0.435114

1        1000  heapify        0.073195

2        1000     sort        0.101089

3       10000   update        1.668930

4       10000  heapify        0.480175

5       10000     sort        1.151085

6      100000   update       13.194084

7      100000  heapify        4.875898

8      100000     sort       11.922121

9     1000000   update      153.587103

10    1000000  heapify       51.237106

11    1000000     sort      145.306110


查看完整回答
反对 回复 2021-12-17
?
RISEBY

TA贡献1856条经验 获得超5个赞

他提供的代码片段完全损坏了!它也很难阅读。 _siftup()被调用 n//2 次,heapify()所以它不能比_siftup()它自己快。

要回答最初的问题,没有更好的方法。如果您担心方法是私有的,请创建自己的方法来做同样的事情。

我唯一同意的一点是,如果您长时间不需要从堆中读取数据,那么在需要时将其延迟可能会有所帮助heapify()。问题是您是否应该为此使用堆。

让我们来看看他的片段的问题:

heapify()函数被多次调用以进行“更新”运行。导致这种情况的错误链如下:

  • 他通过了heap_fix,但期望heap和同样适用于sort

  • 如果self.sort总是Falseself.heap则总是True

  • 他重新定义__getitem__()__setitem__()被称为每次_siftup()_siftdown()分配或读的东西(注:这两个是不是在C调用,因此他们使用__getitem__()__setitem__()

  • 如果self.heapTrue__getitem__()__setitem__()被调用时,_repair()函数被调用每一次_siftup()siftdown()交换的元素。但是调用heapify()是在 C 中完成的,所以__getitem__()不会被调用,也不会以无限循环结束

  • 他重新定义了self.sort这样的称呼,就像他试图做的那样,会失败

  • 他读了一次,但更新一个项目的nb_updates次数,而不是他声称的 1:1

我修正了这个例子,我尽可能地验证它,但我们都会犯错误。随意检查它自己。

代码

import time

import random


from heapq import _siftup, _siftdown, heapify, heappop


class UpdateHeap(list):

    def __init__(self, values):

        super().__init__(values)

        heapify(self)


    def update(self, index, value):

        old, self[index] = self[index], value

        if value > old:

            _siftup(self, index)

        else:

            _siftdown(self, 0, index)


    def pop(self):

        return heappop(self)


class SlowHeap(list):

    def __init__(self, values):

        super().__init__(values)

        heapify(self)

        self._broken = False

        

    # Solution 2 and 3) repair using sort/heapify in a lazy way:

    def update(self, index, value):

        super().__setitem__(index, value)

        self._broken = True

    

    def __getitem__(self, index):

        if self._broken:

            self._repair()

            self._broken = False

        return super().__getitem__(index)


    def _repair(self):

        ...


    def pop(self):

        if self._broken:

            self._repair()

        return heappop(self)


class HeapifyHeap(SlowHeap):


    def _repair(self):

        heapify(self)



class SortHeap(SlowHeap):


    def _repair(self):

        self.sort()


def rand_update(heap):

    index = random.randint(0, len(heap)-1)

    new_value = random.randint(max_int+1, max_int*2)

    heap.update(index, new_value)

    

def rand_updates(update_count, heap):

    for i in range(update_count):

        rand_update(heap)

        heap[0]

        

def verify(heap):

    last = None

    while heap:

        item = heap.pop()

        if last is not None and item < last:

            raise RuntimeError(f"{item} was smaller than last {last}")

        last = item


def run_perf_test(update_count, data, heap_class):

    test_heap = heap_class(data)

    t0 = time.time()

    rand_updates(update_count, test_heap)

    perf = (time.time() - t0)*1e3

    verify(test_heap)

    return perf



results = []

max_int = 500

update_count = 100


for i in range(2, 7):

    test_size = 10**i

    test_data = [random.randint(0, max_int) for _ in range(test_size)]


    perf = run_perf_test(update_count, test_data, UpdateHeap)

    results.append((test_size, "update", perf))

    

    perf = run_perf_test(update_count, test_data, HeapifyHeap)

    results.append((test_size, "heapify", perf))


    perf = run_perf_test(update_count, test_data, SortHeap)

    results.append((test_size, "sort", perf))


import pandas as pd

import seaborn as sns


dtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"])

print(dtf)


sns.lineplot(

    data=dtf, 

    x="heap size", 

    y="duration (ms)", 

    hue="method",

)


结果

如您所见,使用_siftdown()和的“更新”方法_siftup()渐近地更快。


你应该知道你的代码做了什么,以及运行需要多长时间。如果有疑问,您应该检查一下。@cglaced 检查了执行需要多长时间,但他没有质疑应该需要多长时间。如果他这样做了,他会发现两者不匹配。而其他人则为之倾倒。


    heap size   method  duration (ms)

0         100   update       0.219107

1         100  heapify       0.412703

2         100     sort       0.242710

3        1000   update       0.198841

4        1000  heapify       2.947330

5        1000     sort       0.605345

6       10000   update       0.203848

7       10000  heapify      32.759190

8       10000     sort       4.621506

9      100000   update       0.348568

10     100000  heapify     327.646971

11     100000     sort      49.481153

12    1000000   update       0.256062

13    1000000  heapify    3475.244761

14    1000000     sort    1106.570005

//img1.sycdn.imooc.com//61bc4d7e000114b303940258.jpg

查看完整回答
反对 回复 2021-12-17
  • 2 回答
  • 0 关注
  • 276 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信