为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

Apache Flink:Python 流 API 中的 Kafka 连接器,“无法加载用户类”

青春有我 2021-07-02 16:59:52
我正在尝试 Flink 的新 Python 流 API 并尝试使用./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. python 脚本相当简单,我只是尝试从现有主题中使用并将所有内容发送到 stdout(或日志目录中的 *.out 文件,默认情况下输出方法在该文件中发出数据)。import globimport osimport sysfrom java.util import Propertiesfrom org.apache.flink.streaming.api.functions.source import SourceFunctionfrom org.apache.flink.streaming.api.collector.selector import OutputSelectorfrom org.apache.flink.api.common.serialization import SimpleStringSchemadirectories=['/home/user/flink/flink-1.6.1/lib']for directory in directories:    for jar in glob.glob(os.path.join(directory,'*.jar')):                sys.path.append(jar)from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09props = Properties()config = {"bootstrap_servers": "localhost:9092",          "group_id": "flink_test",          "topics": ["TopicCategory-TopicName"]}props.setProperty("bootstrap.servers", config['bootstrap_servers'])props.setProperty("group_id", config['group_id'])props.setProperty("zookeeper.connect", "localhost:2181")def main(factory):    consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)    env = factory.get_execution_environment()    env.add_java_source(consumer) \        .output()    env.execute()我从 Maven 存储库中抓取了一些 jar 文件,即flink-connector-kafka-0.9_2.11-1.6.1.jar,flink-connector-kafka-base_2.11-1.6.1.jar并将kafka-clients-0.9.0.1.jar它们复制到 Flink 的lib目录中。除非我误解了文档,否则这足以让 Flink 加载 kafka 连接器。确实,如果我删除这些 jar 中的任何一个,导入将失败,但这似乎不足以实际调用该计划。添加一个 for 循环来动态添加这些sys.path也不起作用。
查看完整描述

2 回答

?
九州编程

TA贡献1785条经验 获得超4个赞

您在这里使用了错误的 Kafka 消费者。在您的代码中,它是FlinkKafkaConsumer09,但您使用的库是flink-connector-kafka-0.11_2.11-1.6.1.jar,它用于FlinkKafkaConsumer011. 尝试FlinkKafkaConsumer09用 this替换FlinkKafkaConsumer011,或使用 lib 文件flink-connector-kafka-0.9_2.11-1.6.1.jar而不是当前文件。


查看完整回答
反对 回复 2021-07-13
?
心有法竹

TA贡献1866条经验 获得超5个赞

我guest jar 文件可能有内置的导入或依赖,所以三个jar 文件是不够的。至于如何找出java jar 依赖关系,那就是java maven 所做的。可以查看官网“项目构建设置”获取帮助。在我的例子中,我按照官方 Java 项目设置,使用“from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer”并将依赖项“org.apache.flink
flink-clients_2.11
1.8.0”添加到 pom.xml ,然后我现在可以使用 Python API 将 kafka 记录输出到标准输出。

查看完整回答
反对 回复 2021-07-13
  • 2 回答
  • 0 关注
  • 453 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信