早上好,我开发了一个简单的归并排序算法,我想比较它在并行化和非并行化时的性能。首先,我生成一个数字列表来排序并检查合并排序对列表进行排序需要多长时间。我要做的下一件事是将数字列表传递给sc.parallelize()并转换为list,RDD然后将合并排序函数传递给mapPartitions()然后collect()。import randomimport timefrom pyspark import SparkContextdef execute_merge_sort(generated_list): start_time = time.time() sorted_list = merge_sort(generated_list) elapsed = time.time() - start_time print('Simple merge sort: %f sec' % elapsed) return sorted_listdef generate_list(length): N = length generated_list = [random.random() for num in range(N)] return generated_listdef merging(left_side, right_side): result = [] i = j = 0 while i < len(left_side) and j < len(right_side): if left_side[i] <= right_side[j]: result.append(left_side[i]) i += 1 else: result.append(right_side[j]) j += 1 if i == len(left_side): result.extend(right_side[j:]) else: result.extend(left_side[i:]) return resultdef merge_sort(generated_list): if len(generated_list) <= 1: return generated_list middle_value = len(generated_list) // 2 sorted_list = merging(merge_sort(generated_list[:middle_value]), merge_sort(generated_list[middle_value:])) return sorted_listdef is_sorted(num_array): for i in range(1, len(num_array)): if num_array[i] < num_array[i - 1]: return False return Truegenerate_list = generate_list(500000)sorted_list = execute_merge_sort(generate_list)sc = SparkContext()rdd = sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect()当我执行此操作时sc.parallelize(generate_list).mapPartitions(execute_merge_sort).collect(),出现以下错误:File "<ipython-input-15-1b7974b4fa56>", line 7, in execute_merge_sort File "<ipython-input-15-1b7974b4fa56>", line 36, in merge_sortTypeError: object of type 'itertools.chain' has no len()任何帮助,将不胜感激。提前致谢。
1 回答

当年话下
TA贡献1890条经验 获得超9个赞
我想出了如何解决TypeError: 'float' object is not iterable
.
这可以通过使用flatMap(lambda x: x)
和调用扁平化数据glom()
以包装列表并使其可由函数执行来解决execute_merge_sort
。通过执行以下行,返回的结果是一个包含排序列表的列表。
sc.parallelize(random_list_of_lists).flatMap(lambda x: x).glom().mapPartitions(execute_merge_sort_rdd).collect()
添加回答
举报
0/150
提交
取消