
How do I insert a PySpark dataframe into a database with a snowflake schema?

沧海一幻觉 2023-01-04 10:23:43
I am computing a dataframe with PySpark. If the target database has a snowflake schema, how do I append this dataframe to it? How do I specify the way the dataframe should be split up, so that CSV-like data ends up in multiple joined tables? The question is not specific to PySpark; the same question could be asked about pandas.

2 Answers

有只小跳蛙


To append a dataframe extracted from CSV to a database laid out as a snowflake schema:

  1. Extract the data from the snowflake schema.
  2. Extract the new data from the external source.
  3. Combine the two datasets.
  4. Transform the combination into a set of dimension and fact tables that match the snowflake schema.
  5. Load the transformed dataframes into the database, overwriting the existing data.

For example, given a dataframe with the following schema, extracted from the external source:

StructType([StructField('customer_name', StringType()),
            StructField('campaign_name', StringType())])

from typing import Tuple

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import monotonically_increasing_id


def entrypoint(spark: SparkSession) -> None:
  # extract_from_external_source, extract_from_snowflake and load_snowflake are
  # placeholders for your own extract/load functions.
  extracted_customer_campaigns = extract_from_external_source(spark)

  existing_customers_dim, existing_campaigns_dim, existing_facts = (
    extract_from_snowflake(spark))

  combined_customer_campaigns = combine(existing_campaigns_dim,
                                        existing_customers_dim,
                                        existing_facts,
                                        extracted_customer_campaigns)

  new_campaigns_dim, new_customers_dim, new_facts = transform_to_snowflake(
    combined_customer_campaigns)

  load_snowflake(new_campaigns_dim, new_customers_dim, new_facts)


def combine(campaigns_dimension: DataFrame,
            customers_dimension: DataFrame,
            facts: DataFrame,
            extracted_customer_campaigns: DataFrame) -> DataFrame:
  # Rebuild the customer/campaign pairs already in the warehouse by joining the
  # fact table back to its dimensions, then union with the new extract.
  existing_customer_campaigns = facts.join(
    customers_dimension,
    on=['customer_id']).join(
    campaigns_dimension, on=['campaign_id']).select('customer_name',
                                                    'campaign_name')

  combined_customer_campaigns = extracted_customer_campaigns.union(
    existing_customer_campaigns).distinct()

  return combined_customer_campaigns


def transform_to_snowflake(customer_campaigns: DataFrame) -> Tuple[
    DataFrame, DataFrame, DataFrame]:
  # Regenerate surrogate keys for each dimension, then derive the fact table by
  # joining the combined data back onto the new dimensions.
  customers_dim = customer_campaigns.select(
    'customer_name').distinct().withColumn(
    'customer_id', monotonically_increasing_id())

  campaigns_dim = customer_campaigns.select(
    'campaign_name').distinct().withColumn(
    'campaign_id', monotonically_increasing_id())

  facts = (
    customer_campaigns.join(customers_dim,
                            on=['customer_name']).join(
      campaigns_dim, on=[
        'campaign_name']).select('customer_id', 'campaign_id'))

  return campaigns_dim, customers_dim, facts


This is a simplistic functional approach. It could probably be optimised by writing deltas, rather than regenerating the snowflake keys for every ETL batch; a sketch of that idea follows.
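A minimal sketch of the delta idea, assuming the same dimension and fact dataframes as in the code above (the combine_incremental name is illustrative, not part of the original answer):

from pyspark.sql import DataFrame


def combine_incremental(campaigns_dimension: DataFrame,
                        customers_dimension: DataFrame,
                        facts: DataFrame,
                        extracted_customer_campaigns: DataFrame) -> DataFrame:
  # Re-derive the customer/campaign pairs that are already in the warehouse...
  existing_customer_campaigns = facts.join(
    customers_dimension, on=['customer_id']).join(
    campaigns_dimension, on=['campaign_id']).select('customer_name',
                                                    'campaign_name')

  # ...and keep only the newly extracted pairs that are not there yet, so the
  # load step can append a delta instead of rewriting all rows.
  # (A full incremental load would also need to reuse the existing surrogate
  # keys and only assign new ones to unseen customers/campaigns.)
  return extracted_customer_campaigns.subtract(existing_customer_campaigns)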


Additionally, if a separate external CSV were supplied containing records to be deleted, it could similarly be extracted and then subtracted from the combined dataframe before the transformation, to remove those existing records, for example:
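A minimal sketch of that deletion step, assuming the deletions file shares the customer_name / campaign_name columns (the file path and the apply_deletions name are illustrative, not from the original answer):

from pyspark.sql import DataFrame, SparkSession


def apply_deletions(spark: SparkSession,
                    combined_customer_campaigns: DataFrame) -> DataFrame:
  # Hypothetical extract of the records that should be removed; the path and
  # schema are placeholders and would need to match the real deletions CSV.
  deletions = spark.read.csv('customer_campaign_deletions.csv', header=True) \
    .select('customer_name', 'campaign_name')

  # Subtract those rows from the combined dataframe before it is transformed
  # into dimension and fact tables.
  return combined_customer_campaigns.subtract(deletions)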


Finally, the question only concerned appending to the tables. If merging / upserting is required, extra steps would have to be added manually, since Spark itself does not support it natively; one possible sketch is shown below.
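A rough sketch of such a manual upsert for one dimension table, assuming the target is reachable over JDBC (the function name, table name, URL and credentials are illustrative placeholders, not part of the original answer):

from pyspark.sql import DataFrame, SparkSession


def upsert_customers_dim(spark: SparkSession, new_customers_dim: DataFrame) -> None:
  # Illustrative connection details; replace with your real target.
  jdbc_url = 'jdbc:postgresql://localhost:5432/dw'
  connection_properties = {'user': 'dw_user', 'password': 'secret'}

  # Spark has no built-in MERGE for plain JDBC tables, so emulate one:
  # read the current table, drop the rows being replaced, union in the new
  # versions, and overwrite the table with the result.
  current = spark.read.jdbc(jdbc_url, 'customers_dim',
                            properties=connection_properties)
  keys = new_customers_dim.select('customer_id')
  kept = current.join(keys, on='customer_id', how='left_anti')
  merged = kept.unionByName(new_customers_dim)

  # Caution: because Spark reads lazily, overwriting a table that `merged`
  # still depends on can lose data; in practice, persist the result or write
  # it to a staging table first.
  merged.write.jdbc(jdbc_url, 'customers_dim', mode='overwrite',
                    properties=connection_properties)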


跃然一笑


You can do something like what I describe in the code below. I am assuming your CSV has a structure similar to the one defined on df4, but that you may not yet have the IDs for customer_id, product_id and their groups. If that is the case, you can compute them with the row_number window function (which gives sequential numbers), or use the monotonically_increasing_id function as shown when creating df5.
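As an aside, if you prefer the DataFrame API over SQL for generating those keys, the same sequential surrogate key can be produced with a window function. A minimal sketch, assuming a dataframe shaped like the df5 built in the code further below (customer_df_api is an illustrative name, not part of the original answer):

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Sequential surrogate key per distinct customer, equivalent in spirit to the
# row_number() over (order by ...) used in the SQL version below.
customer_window = Window.orderBy('customer_id', 'customer_name', 'customer_group_id')
customer_df_api = (df5
                   .select('customer_id', 'customer_name', 'customer_group_id')
                   .distinct()
                   .withColumn('surkey_customer', row_number().over(customer_window)))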


This solution is mostly based on PySpark and SQL, so if you are more familiar with traditional data warehousing it will be easier to follow.


from pyspark.sql.functions import monotonically_increasing_id

# Create the input data. Only a few rows, to show how it should work.
# The schema is defined on the single dataframe as:
# customer_id --> business key coming from the transactional system
# customer_name --> just an attribute to show how it should behave
# customer_group_id --> an id that matches the group_id on the snowflake schema, as the idea is to group customers into groups (just as a sample)
# product_id --> another future dimension on the model having a snowflake schema
# product_group_id --> group id for products, to group them into categories
df1 = spark.sql("""select 1 customer_id, 'test1' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,
        987.5 sales
        """)

df2 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        7 product_id, 'product 7' product_name, 1 product_group_id, 'product group 1' product_group_name,
        12345.5 sales
        """)

df3 = spark.sql("""select 2 customer_id, 'test2' customer_name, 1 customer_group_id, 'group 1' customer_group_name,
        1 product_id, 'product 1' product_name, 1 product_group_id, 'product group 1' product_group_name,
        2387.3 sales
        """)

df4 = df1.union(df2).union(df3)

# Add an id on the df to be able to calculate the rest of the surrogate keys for the dimensions
df5 = df4.withColumn("id", monotonically_increasing_id())

# Register the dataframe so it can be queried with SQL
df5.createOrReplaceTempView("df")

# Now create the different dfs that make up the structure of the DW schema
customer_group_df = spark.sql("""select customer_group_id, customer_group_name
            from df group by customer_group_id, customer_group_name""")

# row_number is used here because the monotonically_increasing_id function
# returns non-sequential integers; if you are fine with that, it will be much faster.
# Another option would be to use a uuid as the key (or another unique-identifier provider),
# but that depends on your requirements.
customer_df = spark.sql("""select row_number() over (order by customer_id, customer_name, customer_group_id) as surkey_customer, customer_id customer_bk,
            customer_name, customer_group_id
            from df group by customer_id, customer_name, customer_group_id """)

product_group_df = spark.sql("""select product_group_id, product_group_name
            from df group by product_group_id, product_group_name""")

product_df = spark.sql("""select row_number() over (order by product_id) as surkey_product, product_id product_bk,
            product_name, product_group_id
            from df group by product_id, product_name, product_group_id""")

customer_df.show()
product_df.show()
df5.show()

# You can save those dfs directly onto your model in the RDBMS. As the target DB is not specified, the save code is omitted here,
# but it amounts to calling the dataframe's write/save method pointing at Hive or at a JDBC target where your DW model lives.
# More info at https://stackoverflow.com/questions/30664008/how-to-save-dataframe-directly-to-hive or, if
# the target is an RDBMS, https://stackoverflow.com/questions/46552161/write-dataframe-to-mysql-table-using-pyspark

# Now the tricky part is to calculate the surrogate keys of the fact table. The way to do it is to join those dfs
# back to the original dataframe. That can have performance issues, so please make sure that your data is
# properly distributed (find the best approach to redistribute your dataframes across the nodes so that you reduce shuffling on the joins)
# when you run it.

customer_df.createOrReplaceTempView("customer_df")
product_df.createOrReplaceTempView("product_df")

fact_df = spark.sql("""
    select nvl(c.surkey_customer, -1) sk_customer, nvl(p.surkey_product, -1) sk_product, sales
    from
        df d left outer join customer_df c on d.customer_id = c.customer_bk
            left outer join product_df p on d.product_id = p.product_bk
""")
fact_df.show()

# You can write the fact_df to your target fact table.
# Be aware that nvl is used to populate the surrogate keys with the unknown member of the dimension. If you rely on
# that, the unknown member also has to be present on the dimension tables (customer and product, not the group tables).
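For the actual save into the target database that the comments above mention, a minimal sketch of the JDBC write (the URL, table names and credentials below are placeholders, not part of the original answer; for a Hive target, df.write.saveAsTable would be used instead):

# Illustrative JDBC connection details - replace with your real target DW.
jdbc_url = "jdbc:mysql://localhost:3306/dw"
connection_properties = {"user": "dw_user", "password": "secret",
                         "driver": "com.mysql.cj.jdbc.Driver"}

# Append the dimension and fact dataframes to their (placeholder) target tables.
customer_group_df.write.jdbc(jdbc_url, "dim_customer_group", mode="append", properties=connection_properties)
customer_df.write.jdbc(jdbc_url, "dim_customer", mode="append", properties=connection_properties)
product_group_df.write.jdbc(jdbc_url, "dim_product_group", mode="append", properties=connection_properties)
product_df.write.jdbc(jdbc_url, "dim_product", mode="append", properties=connection_properties)
fact_df.write.jdbc(jdbc_url, "fact_sales", mode="append", properties=connection_properties)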

As you can see, this solution uses a simple snowflake schema. The model can become more complex if you have Slowly Changing Dimensions Type 2 or other kinds of dimensional modelling.


The output of the code is:


+---------------+-----------+-------------+-----------------+
|surkey_customer|customer_bk|customer_name|customer_group_id|
+---------------+-----------+-------------+-----------------+
|              1|          1|        test1|                1|
|              2|          2|        test2|                1|
+---------------+-----------+-------------+-----------------+

+--------------+----------+------------+----------------+
|surkey_product|product_bk|product_name|product_group_id|
+--------------+----------+------------+----------------+
|             1|         1|   product 1|               1|
|             2|         7|   product 7|               1|
+--------------+----------+------------+----------------+

+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|customer_id|customer_name|customer_group_id|customer_group_name|product_id|product_name|product_group_id|product_group_name|  sales|         id|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+
|          1|        test1|                1|            group 1|         1|   product 1|               1|   product group 1|  987.5|          0|
|          2|        test2|                1|            group 1|         7|   product 7|               1|   product group 1|12345.5| 8589934592|
|          2|        test2|                1|            group 1|         1|   product 1|               1|   product group 1| 2387.3|17179869184|
+-----------+-------------+-----------------+-------------------+----------+------------+----------------+------------------+-------+-----------+

+-----------+----------+-------+
|sk_customer|sk_product|  sales|
+-----------+----------+-------+
|          1|         1|  987.5|
|          2|         2|12345.5|
|          2|         1| 2387.3|
+-----------+----------+-------+

Hope this helps.

