1 回答

TA贡献2021条经验 获得超8个赞
经过 3 天的折腾，我终于解决了这个问题。主要原因有两个：
我没有正确地模拟 spark.read 的链式调用（即其方法签名）；
我没有正确地构造 Py4JJavaError 的模拟实例。
下面是我分别解决这两点的做法：
.../utils/bigquery_util.py
import logging
from py4j.protocol import Py4JJavaError, Py4JNetworkError
def load_bq_table(spark, table, filter_string):
    """Load a BigQuery table through Spark, retrying failed attempts.

    Up to three attempts are made. A Java-side "table not found" error is
    reported immediately; other Py4J errors are retried and, if they persist
    on the final attempt, propagated instead of silently returning ``None``.

    Args:
        spark: Spark session whose ``read`` API is used by ``get_df``.
        table: BigQuery table identifier passed to the connector.
        filter_string: Value forwarded as the connector's ``filter`` option.

    Returns:
        The loaded DataFrame.

    Raises:
        RuntimeError: if the Java exception text contains
            ``java.lang.RuntimeException`` and ``"<table> not found"``.
        Py4JJavaError: any other Java error still raised on the last attempt.
        EnvironmentError: if a Py4JNetworkError is still raised on the last
            attempt.
    """
    tries = 3
    for attempt in range(tries):
        is_last_attempt = attempt == tries - 1
        try:
            logging.info("SQL statement being executed...")
            df = get_df(spark, table, filter_string)
            # df.count() is inside the try, so errors it raises are handled
            # (and retried) exactly like errors from get_df itself.
            logging.info("Table-ID: %s, Rows:%s x Cols:%s",
                         table, df.count(), len(df.columns))
            logging.debug("Table-ID: %s, Schema: %s", table, df.schema)
            return df
        except Py4JJavaError as e:
            java_exception_str = get_exception_msg(e)
            is_runtime_exception = "java.lang.RuntimeException" in java_exception_str
            table_not_found = "{} not found".format(table) in java_exception_str
            if is_runtime_exception and table_not_found:
                # Retrying cannot make a missing table appear, so fail fast.
                logging.error(java_exception_str)
                raise RuntimeError("Table {} not found!".format(table)) from e
            if is_last_attempt:
                # Re-raise so callers never receive None after exhausting tries.
                raise
        except Py4JNetworkError as ne:
            if is_last_attempt:
                # ne.cause may be None (e.g. an error built without a cause);
                # fall back to the error itself so the message stays useful.
                cause = ne.cause or ne
                logging.error(cause)
                raise EnvironmentError(
                    "Error while trying to reach server... {}".format(cause)
                ) from ne
def get_exception_msg(e):
    """Return the string form of the Java exception carried by ``e``.

    Kept as a standalone function so that tests can patch it
    independently of the Py4J error object.
    """
    java_side = e.java_exception
    return str(java_side)
def get_df(spark, table, filter_string):
    """Read ``table`` through the 'bigquery' data source and return the result.

    The reader is configured with the 'table' option and then the 'filter'
    option (set to ``filter_string``) before ``load()`` is called.
    """
    reader = spark.read.format('bigquery')
    reader = reader.option('table', table).option('filter', filter_string)
    return reader.load()
测试代码如下：.../test/utils/test_bigquery_util.py
import pytest
from mock import patch, mock
from <...>.utils.bigquery_util import load_bq_table
from <...>.test.utils.mock_py4jerror import *
def test_runtime_error_exception_is_thrown_if_table_not_present():
    """load_bq_table raises RuntimeError, without retrying, for a missing table.

    ``get_df`` is patched to raise a mocked Py4JJavaError, and
    ``get_exception_msg`` is patched to return a Java RuntimeException text
    naming the table, so no Spark session or BigQuery access is needed.
    """
    # Given
    mock_table_name = 'spark_bq_test.false_table_name'
    mock_filter = "word is 'V'"
    mock_spark = mock.Mock()
    # Mocking
    py4j_error_exception = get_mock_py4j_error_exception(get_mock_gateway_client(), mock_target_id="o123")
    mock_errmsg = "java.lang.RuntimeException: Table {} not found".format(mock_table_name)
    # When / Then
    with mock.patch('red_agent.common.utils.bigquery_util.get_exception_msg', return_value=mock_errmsg):
        with mock.patch('red_agent.common.utils.bigquery_util.get_df',
                        side_effect=py4j_error_exception) as mock_get_df:
            # Only the call under test sits inside pytest.raises.
            with pytest.raises(RuntimeError, match="not found"):
                load_bq_table(mock_spark, mock_table_name, mock_filter)
    # A missing table must fail fast: exactly one load attempt, no retries.
    mock_get_df.assert_called_once_with(mock_spark, mock_table_name, mock_filter)
最后是用于模拟 Py4JJavaError 的辅助模块：.../test/utils/mock_py4jerror.py
import mock
from py4j.protocol import Py4JJavaError, Py4JNetworkError
def get_mock_gateway_client():
    """Build a mock standing in for a Py4J gateway client.

    ``send_command`` returns "0", ``is_connected`` returns True, and the mock
    exposes an empty ``converters`` list and a mock ``deque`` attribute.
    """
    client = mock.Mock(converters=[], deque=mock.Mock())
    client.send_command.return_value = "0"
    client.is_connected.return_value = True
    return client
def get_mock_java_object(mock_client, mock_target_id):
    """Return a mock Java object bound to ``mock_client`` under ``mock_target_id``.

    Only the private ``_target_id`` and ``_gateway_client`` attributes are set.
    """
    return mock.Mock(_target_id=mock_target_id, _gateway_client=mock_client)
def get_mock_py4j_error_exception(mock_client, mock_target_id):
    """Create a Py4JJavaError that appears to come from ``<target>.load``.

    The wrapped Java exception is a mock from ``get_mock_java_object`` that is
    bound to ``mock_client`` under ``mock_target_id``.
    """
    java_exception = get_mock_java_object(mock_client, mock_target_id)
    message = "An error occurred while calling {}.load.".format(mock_target_id)
    return Py4JJavaError(message, java_exception=java_exception)
def get_mock_py4j_network_exception(mock_target_id):
    """Create a Py4JNetworkError whose message names ``<target>.load``.

    No ``cause`` is supplied, so the resulting error carries only the message.
    """
    return Py4JNetworkError(
        "An error occurred while calling {}.load.".format(mock_target_id)
    )
希望这能帮到遇到同样问题的人。
添加回答
举报