为了账号安全,请及时绑定邮箱和手机立即绑定

PySpark 的 DataFrame.show() 运行缓慢

PySpark 的 DataFrame.show() 运行缓慢

烙印99 2022-07-26 20:55:52
新手在这里,我通过 JDBC 从 PySpark 中的 MySQL 读取了一个表(大约 200 万行)作为 Spark 的 DataFrame,并尝试显示前 10 行:from pyspark.sql import SparkSessionspark_session = SparkSession.builder.master("local[4]").appName("test_log_processing").getOrCreate()url = "jdbc:mysql://localhost:3306"table = "test.fakelog"properties = {"user": "myUser", "password": "********"}df = spark_session.read.jdbc(url, table, properties=properties)df.cache()df.show(10)  # can't get the printed results, and runs pretty slow and consumes 90%+ CPU resourcesspark_session.stop()这是控制台日志:Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertiesSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).[Stage 0:>                                                          (0 + 1) / 1]我的教育背景是统计学,最近刚开始学习 Spark,所以我不知道代码背后发生了什么(对于较小的数据集,这很好用),我应该如何解决这个问题?或者我应该了解更多关于 Spark 的知识?
查看完整描述

2 回答

?
慕后森

TA贡献1802条经验 获得超5个赞

由于您spark.read.jdbc为某个表调用,spark 将尝试将整个表从数据库收集到 spark 中。之后,spark 缓存数据并从缓存中打印 10 个结果。如果您运行以下代码,您会注意到一些差异。


spark_session = SparkSession.builder.master("local[4]").appName("test_log_processing").getOrCreate()

url = "jdbc:mysql://localhost:3306"

table = "(SELECT * FROM test.fakelog LIMIT 10) temp"

properties = {"user": "myUser", "password": "********"}

df = spark_session.read.jdbc(url, table, properties=properties)

df.cache()

df.show()

spark_session.stop()


查看完整回答
反对 回复 2022-07-26
?
慕容森

TA贡献1853条经验 获得超18个赞

  • 也许您的内存缓存已被填满,缓存的默认值曾经只是内存(较旧的 spark 版本)。

  • 因此,您可以尝试使用 df.persist(StorageLevel.MEMORY_AND_DISK) 代替缓存。当内存太满时,它会溢出到磁盘。

  • 试试 .take(10),它会给出行的集合,它可能不会更快,但值得一试

  • 尝试 df.coalesce(50).persist(StorageLevel.MEMORY_AND_DISK),如果您有过度分区的数据帧,则无需洗牌即可正常工作

  • 如果这些都不起作用,则可能意味着您的计算集群无法处理此负载,您可能需要向外扩展。


查看完整回答
反对 回复 2022-07-26
  • 2 回答
  • 0 关注
  • 131 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信