3 回答

TA贡献1893条经验 获得超10个赞
正如 Chris 在不同的答案中所说,Dask 以某种形式包装您的查询SELECT columns FROM (yourquery)
,这对于 PostgreSQL 来说是无效的语法,因为它需要括号表达式的别名。无需重新实现整个read_sql_table
方法,表达式可以简单地通过添加.alias('somename')
到您的选择中来别名,即
select([t]).limit(5).alias('foo')
该表达式,当被 Dask 包装时,为 Postgres 生成正确的语法
SELECT columns FROM (yourquery) AS foo

TA贡献1804条经验 获得超2个赞
对于遇到此问题的任何其他人。read_sql_table 似乎不支持这个用例(此时)。如果你传入一个 SQLAlchemy Select 对象,它最终会被包裹在另一个 SQLAlchemy Select 中并且没有别名,这是糟糕的 SQL(至少对于 PostgreSQL)。
从 dask 源查看 read_sql_table,table 是传递给 read_sql_table 的 Select 对象,正如所见,它被包装在另一个选择中。
q = sql.select(columns).where(sql.and_(index >= lower, cond)
).select_from(table)
好消息是 read_sql_table 函数相对简单,而且魔术实际上只有几行从延迟对象创建数据帧。您只需要编写自己的逻辑即可将查询分成块
parts = []
for query_chunk in queries:
parts.append(delayed(_read_sql_chunk)(q, uri, meta, **kwargs))
return from_delayed(parts, meta, divisions=divisions)
def _read_sql_chunk(q, uri, meta, **kwargs):
df = pd.read_sql(q, uri, **kwargs)
if df.empty:
return meta
else:
return df.astype(meta.dtypes.to_dict(), copy=False)

TA贡献1826条经验 获得超6个赞
该行发送的查询是由 SQLAlchemy 自动生成的,因此语法应该是正确的。但是,我注意到您的原始查询包含一个.limit()
修饰符。该行的目的head =
是获取前几行,以推断类型。如果原始查询已经有一个限制子句,我可以看到两者可能会发生冲突。请尝试使用不带 的查询.limit()
。
添加回答
举报