2 Answers
Here is an example of how to test a PySpark function that throws an exception. In this example, we verify that an exception is raised if the sort order is "cats":
import pytest
import quinn
from pyspark.sql.types import StringType

def it_throws_an_error_if_the_sort_order_is_invalid(spark):
    # create_df is provided by the quinn library's SparkSession extensions
    source_df = spark.create_df(
        [
            ("jose", "oak", "switch"),
            ("li", "redwood", "xbox"),
            ("luisa", "maple", "ps4"),
        ],
        [
            ("name", StringType(), True),
            ("tree", StringType(), True),
            ("gaming_system", StringType(), True),
        ]
    )
    with pytest.raises(ValueError) as excinfo:
        quinn.sort_columns(source_df, "cats")
    assert excinfo.value.args[0] == "['asc', 'desc'] are the only valid sort orders and you entered a sort order of 'cats'"
Note that the test verifies the specific error message that is raised. You can feed invalid input to your rename_columnsName function and verify that the error message matches what you expect.
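As a minimal sketch of that idea (check_columns_arg and its message are hypothetical stand-ins for the validation inside rename_columnsName, assuming a version that raises rather than swallows the error):

```python
import pytest

# Hypothetical helper mirroring rename_columnsName's argument check:
# raise ValueError unless `columns` is a dict of old -> new names.
def check_columns_arg(columns):
    if not isinstance(columns, dict):
        raise ValueError(
            "'columns' should be a dict, like {'old_name': 'new_name'}"
        )

def test_rejects_non_dict_columns():
    # pytest.raises captures the exception so its message can be asserted
    with pytest.raises(ValueError) as excinfo:
        check_columns_arg(["name", "tree"])
    assert "'columns' should be a dict" in excinfo.value.args[0]

test_rejects_non_dict_columns()
```

The same pattern works against the real function: call it with a non-dict argument inside `pytest.raises(ValueError)` and assert on `excinfo.value.args[0]`.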
I found a solution to this problem: we can handle exceptions in PySpark just as in plain Python. For example:
def rename_columnsName(df, columns):  # provide names in dictionary format
    try:
        if isinstance(columns, dict):
            for old_name, new_name in columns.items():
                df = df.withColumnRenamed(old_name, new_name)
            # note: show() prints the DataFrame and returns None;
            # return df instead if the caller needs the renamed DataFrame
            return df.show()
        else:
            raise ValueError(
                "'columns' should be a dict, like "
                "{'old_name': 'new_name', 'old_name_one more': 'new_name_1'}"
            )
    except Exception as e:
        print(e)
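The same try/except pattern can be exercised without a Spark session; in this sketch, rename_keys is a hypothetical stand-in that renames dict keys instead of DataFrame columns, showing both the success branch and the caught ValueError:

```python
# Spark-free sketch of the same validation-and-catch pattern.
def rename_keys(record, columns):
    try:
        if isinstance(columns, dict):
            for old_name, new_name in columns.items():
                # rename in place, like withColumnRenamed does for columns
                record[new_name] = record.pop(old_name)
            return record
        else:
            raise ValueError("'columns' should be a dict")
    except Exception as e:
        # invalid input lands here: the error is printed, None is returned
        print(e)

row = {"name": "jose", "tree": "oak"}
print(rename_keys(row, {"tree": "plant"}))  # success branch: key renamed
rename_keys(row, ["tree"])                  # error branch: message printed
```

Because the exception is caught and printed rather than re-raised, callers get None back on bad input; re-raise inside the except block if you want pytest.raises-style tests to see the error.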