2 Answers
Answer 1:
Read the CSV file as plain text, split each line's value on the comma, and count the elements.
df = spark.read.text('test.csv')
df.show(10, False)
+-------------------------------+
|value |
+-------------------------------+
|Col1,Col2,Col3,Col4 |
|Value11,Value12,Value13,Value14|
|Value21,Value22,Value23,Value24|
+-------------------------------+
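For reference, the snippets here assume an active SparkSession named spark, as the pyspark shell provides automatically; a minimal sketch to create one in a standalone script:

from pyspark.sql import SparkSession

# Build or reuse a local SparkSession; the app name is arbitrary
spark = SparkSession.builder.appName('csv-field-count').getOrCreate()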
import pyspark.sql.functions as F
df2 = df.withColumn('count', F.size(F.split('value', ',')))
df2.show(10, False)
+-------------------------------+-----+
|value |count|
+-------------------------------+-----+
|Col1,Col2,Col3,Col4 |4 |
|Value11,Value12,Value13,Value14|4 |
|Value21,Value22,Value23,Value24|4 |
+-------------------------------+-----+
df2.groupBy().agg(F.min('count'), F.max('count')).show(10, False)
+----------+----------+
|min(count)|max(count)|
+----------+----------+
|4 |4 |
+----------+----------+
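Since min and max are both 4, every line here has exactly four fields. If you also want to pull the malformed lines out of Spark rather than just summarize the counts, a minimal sketch building on df2 above (assuming the expected field count is 4; the line numbering relies on zipWithIndex, which numbers rows in partition order and so matches file order only for a single input text file):

import pyspark.sql.functions as F

# Keep only lines whose field count differs from the expected 4
bad = df2.filter(F.col('count') != 4)
bad.show(10, False)

# Optionally attach 1-based line numbers; zipWithIndex numbers rows
# in partition order, which is the file order for a single text file
numbered = df.rdd.zipWithIndex() \
    .map(lambda t: (t[1] + 1, t[0]['value'])) \
    .toDF(['line_num', 'value'])
numbered.filter(F.size(F.split('value', ',')) != 4).show(10, False)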
Answer 2:
Since you want to know which lines are erroneous, the only way is to loop over the file:
In [18]: erroneous_lines = []
In [19]: with open(r'C:\Users\abaskaran\Desktop\mycsv.txt') as fd:
...: for line_num, line in enumerate(fd,1):
...: if len(line.split(',')) != 4:
...: erroneous_lines.append((line_num, line))
In [20]: erroneous_lines
Out[20]:
[(5, 'Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14\n'),
(6, 'Value21,Value22,Value23\n')]
The erroneous_lines list will contain tuples with the line number and the actual content of every line that does not contain all the values.
I modified the CSV content to the below just for testing:
Col1,Col2,Col3,Col4
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14
Value21,Value22,Value23
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
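One caveat: a bare line.split(',') will miscount quoted fields that themselves contain commas. If the file may contain such fields, a variant of the same loop using the standard csv module (same hypothetical path as above):

import csv

erroneous_lines = []
with open(r'C:\Users\abaskaran\Desktop\mycsv.txt', newline='') as fd:
    # csv.reader respects quoting, so commas inside quoted fields
    # are not treated as delimiters
    for line_num, row in enumerate(csv.reader(fd), 1):
        if len(row) != 4:
            erroneous_lines.append((line_num, row))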