为了账号安全,请及时绑定邮箱和手机立即绑定

使用 SQLAlchemy 表达式时出现 Dask read_sql_table 错误

使用 SQLAlchemy 表达式时出现 Dask read_sql_table 错误

江户川乱折腾 2021-11-16 10:58:18
我正在尝试将 SQLAlchemy 表达式与 dask 的 read_sql_table 一起使用,以便关闭通过连接和过滤几个不同表创建的数据集。该文件表明,这应该是可能的。(下面的示例不包括任何连接,因为复制问题不需要它们。)我构建了我的连接字符串,创建了一个 SQLAlchemy 引擎和与我的数据库中的表相对应的表。(我正在使用 PostgreSQL。)import dask.dataframe as ddimport pandas as pdfrom sqlalchemy import create_enginefrom sqlalchemy import Column, MetaData, Tablefrom sqlalchemy.sql import  selectusername = 'username'password = 'password'server = 'prod'database = 'my_db'connection_string = f'postgresql+psycopg2://{username}:{password}@{server}/{database}'engine = create_engine(connection_string)metadata = MetaData()t = Table('my_table', metadata,    Column('id'),    schema='my_schema')我能够构建一个选择并将它与 SQLAlchemy 一起使用,没有问题>>> s = select([t]).limit(5)>>> rp = engine.execute(s)>>> rp.fetchall()[(3140757,), (3118225,), (3156070,), (3193075,), (3114614,)]我还可以将 SQLAlchey 选择提供给熊猫的 read_sql,它工作正常>>> pd.read_sql(s, connection_string)id0   31407571   31182252   31560703   31930754   3114614但是,当我将相同的选择传递给 dask 时,我收到了 ProgrammingError。它表明dask正在转身并调用pandas.read_sql,所以你会认为它应该工作,但显然不是。
查看完整描述

3 回答

?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

正如 Chris 在不同的答案中所说,Dask 以某种形式包装您的查询SELECT columns FROM (yourquery),这对于 PostgreSQL 来说是无效的语法,因为它需要括号表达式的别名。无需重新实现整个read_sql_table方法,表达式可以简单地通过添加.alias('somename')到您的选择中来别名,即

select([t]).limit(5).alias('foo')

该表达式,当被 Dask 包装时,为 Postgres 生成正确的语法

SELECT columns FROM (yourquery) AS foo


查看完整回答
反对 回复 2021-11-16
?
慕婉清6462132

TA贡献1804条经验 获得超2个赞

对于遇到此问题的任何其他人。read_sql_table 似乎不支持这个用例(此时)。如果你传入一个 SQLAlchemy Select 对象,它最终会被包裹在另一个 SQLAlchemy Select 中并且没有别名,这是糟糕的 SQL(至少对于 PostgreSQL)。


从 dask 源查看 read_sql_table,table 是传递给 read_sql_table 的 Select 对象,正如所见,它被包装在另一个选择中。


q = sql.select(columns).where(sql.and_(index >= lower, cond)

                              ).select_from(table)

好消息是 read_sql_table 函数相对简单,而且魔术实际上只有几行从延迟对象创建数据帧。您只需要编写自己的逻辑即可将查询分成块


parts = []

for query_chunk in queries:

    parts.append(delayed(_read_sql_chunk)(q, uri, meta, **kwargs))


return from_delayed(parts, meta, divisions=divisions)



def _read_sql_chunk(q, uri, meta, **kwargs):

    df = pd.read_sql(q, uri, **kwargs)

    if df.empty:

        return meta

    else:

        return df.astype(meta.dtypes.to_dict(), copy=False)


查看完整回答
反对 回复 2021-11-16
?
跃然一笑

TA贡献1826条经验 获得超6个赞

该行发送的查询是由 SQLAlchemy 自动生成的,因此语法应该是正确的。但是,我注意到您的原始查询包含一个.limit()修饰符。该行的目的head =是获取前几行,以推断类型。如果原始查询已经有一个限制子句,我可以看到两者可能会发生冲突。请尝试使用不带 的查询.limit()


查看完整回答
反对 回复 2021-11-16
  • 3 回答
  • 0 关注
  • 250 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信