为了账号安全,请及时绑定邮箱和手机立即绑定

PySpark 模拟:异常测试成功但未处理异常

PySpark 模拟:异常测试成功但未处理异常

梵蒂冈之花 2022-06-22 18:23:53
我正在使用python 2.7(不要问我为什么,我是承包商,我只是使用他们给我的东西)。我正在尝试实现一个pyspark函数,该函数利用spark-bigquery 连接器提交一个利用 Spark SQL 数据源 API 的简单查询。我正在经历最奇怪的事情;我编写了这个函数,并在我实际运行它时确认它确实适用于服务器。我想确保如果用户提供了一个不存在的表名,则会根据处理服务器返回的表名引发异常,我确实做到了(我知道这不是 TDD,但就这样吧)。然后我开始为它编写测试,显然我必须生成一个模拟异常,我做了如下操作:模块/query_bqfrom py4j.protocol import Py4JJavaErrorfrom pyspark.sql import SparkSessiondef submit_bq_query(spark, table, filter_string):    try:        df = spark.read.format('bigquery').option('table', table).option('filter', filter_string).load()        return df    except Py4JJavaError as e:        java_error_msg = str(e).split('\n')[1]        if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:            raise Exception("RuntimeException: Table {} not found!".format(table))正如我所说,这就像一个魅力。现在,它的测试看起来像这样:模块/test_query_bqimport pytestfrom mock import patch, mockfrom py4j.java_gateway import GatewayProperty, GatewayClient, JavaObjectfrom py4j.protocol import Py4JJavaErrorfrom pyspark.sql.dataframe import DataFramefrom pyspark.sql.types import StructTypedef mock_p4j_java_error_generator(msg):    gateway_property = GatewayProperty(auto_field="Mock", pool="Mock")    client = GatewayClient(gateway_property=gateway_property)    java_object = JavaObject("RunTimeError", client)    exception = Py4JJavaError(msg, java_exception=java_object)    return Exception(exception)def test_exception_is_thrown_if_table_not_present():    # Given    mock_table_name = 'spark_bq_test.false_table_name'    mock_filter = "word is 'V'"    mock_errmsg = "Table {} not found".format(mock_table_name)运行测试成功,但是当我尝试调试它时,只是为了跟踪执行,我注意到捕获到异常之后的代码:永远达不到。尽管如此,测试仍然成功。简而言之,在测试中应该模拟和抛出异常。它也被捕获,但未处理。测试的断言通过并且测试成功,就好像它没有被处理一样,但我从来没有检查过模拟异常的内部。再一次,让我注意到module/query_bq在服务器上工作得很好;当表不存在时,返回dataframes并处理异常就好了!这里的重点是测试。我需要对module/query_bq中的异常处理部分做额外的事情,但我不能,因为我不知道发生了什么。谁能解释一下?
查看完整描述

1 回答

?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

经过3天的挣扎,我把它整理出来了。主要问题是:

  • 我没有正确地模拟 spark.read 进程签名,并且;

  • 我没有正确实例化 Py4JJavaError 的模拟实例。

以下是我如何做到这两点:

.../utils/bigquery_util.py

import logging


from py4j.protocol import Py4JJavaError, Py4JNetworkError



def load_bq_table(spark, table, filter_string):

    tries = 3

    for i in range(tries):

        try:

            logging.info("SQL statement being executed...")

            df = get_df(spark, table, filter_string)

            logging.info("Table-ID: {}, Rows:{} x Cols:{}".format(table, df.count(), len(df.columns)))

            logging.debug("Table-ID: {}, Schema: {}".format(table, df.schema))

            return df

        except Py4JJavaError as e:

            java_exception_str = get_exception_msg(e)

            is_runtime_exception = "java.lang.RuntimeException" in java_exception_str

            table_not_found = ("{} not found".format(table)) in java_exception_str

            if is_runtime_exception and table_not_found:

                logging.error(java_exception_str)

                raise RuntimeError("Table {} not found!".format(table))

        except Py4JNetworkError as ne:

            if i is tries-1:

                java_exception_str = ne.cause

                runtime_error_str = "Error while trying to reach server... {}"

                logging.error(java_exception_str)

                raise EnvironmentError(runtime_error_str.format(java_exception_str))

            continue



def get_exception_msg(e):

    return str(e.java_exception)



def get_df(spark, table, filter_string):

    return (spark.read

            .format('bigquery')

            .option('table', table)

            .option('filter', filter_string)

            .load())

至于测试:.../test/utils/test_bigquery_util.py


    import pytest

    from mock import patch, mock


    from <...>.utils.bigquery_util import load_bq_table

    from <...>.test.utils.mock_py4jerror import *


    def test_runtime_error_exception_is_thrown_if_table_not_present():


        # Given

        mock_table_name = 'spark_bq_test.false_table_name'

        mock_filter = "word is 'V'"


        # Mocking

        py4j_error_exception = get_mock_py4j_error_exception(get_mock_gateway_client(), mock_target_id="o123")

        mock_errmsg = "java.lang.RuntimeException: Table {} not found".format(mock_table_name)


        # When

        with mock.patch('red_agent.common.utils.bigquery_util.get_exception_msg', return_value=mock_errmsg):

            with mock.patch('red_agent.common.utils.bigquery_util.get_df', side_effect=py4j_error_exception):

                with pytest.raises(RuntimeError):

                    mock_spark = mock.Mock()

                    df = load_bq_table(mock_spark, mock_table_name, mock_filter)

..最后用于模拟 Py4JJavaError: .../test/utils/mock_py4jerror.py


import mock

from py4j.protocol import Py4JJavaError, Py4JNetworkError



def get_mock_gateway_client():

    mock_client = mock.Mock()

    mock_client.send_command.return_value = "0"

    mock_client.converters = []

    mock_client.is_connected.return_value = True

    mock_client.deque = mock.Mock()

    return mock_client



def get_mock_java_object(mock_client, mock_target_id):

    mock_java_object = mock.Mock()

    mock_java_object._target_id = mock_target_id

    mock_java_object._gateway_client = mock_client

    return mock_java_object



def get_mock_py4j_error_exception(mock_client, mock_target_id):

    mock_java_object = get_mock_java_object(mock_client, mock_target_id)

    mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)

    return Py4JJavaError(mock_errmsg, java_exception=mock_java_object)



def get_mock_py4j_network_exception(mock_target_id):

    mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)

    return Py4JNetworkError(mock_errmsg)

希望这会帮助某人...


查看完整回答
反对 回复 2022-06-22
  • 1 回答
  • 0 关注
  • 118 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号