我试图了解 Confluent Kafka 上的 Avro 序列化以及模式注册表的使用。直到最后一切都很顺利,但 AVRO 的最终期望让我感到很困惑。根据我的阅读和理解,Avro Serialization 为我们提供了灵活性,当我们更改架构时,我们可以简单地进行管理,而不会影响旧的生产者/消费者。同样,我开发了一个 python 生成器,它将检查 Schema-Registry 中的 Schema 是否存在,如果不存在,则创建它并开始生成如下所示的 json 消息。当我需要更改架构时,我只需在我的生产者中更新它,这会生成具有新架构的消息。我的旧架构:data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'Producer-1 的示例数据:{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}我的新架构:data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'
1 回答
杨魅力
TA贡献1811条经验 获得超6个赞
目前尚不清楚您在注册表上使用的兼容性设置,但我将向后假设,这意味着您需要添加一个具有默认值的字段。
听起来您正在使用 Python,KeyError因为这些键不存在。
相反msg.value()["non-existing-key"],您可以尝试
选项 1:将其视为 dict()
msg.value().get("non-existing-key", "Default value")
选项 2:单独检查所有可能不存在的键
some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"
否则,您必须在旧数据上“投影”新模式,这是 Java 代码通过使用SpecificRecord子类所做的。这样,较旧的数据将使用较新的架构进行解析,该架构具有带有默认值的较新字段。
如果你GenericRecord在 Java 中使用,你会遇到类似的问题。我不确定在 Python 中是否有与 Java 的SpecificRecord.
顺便说一句,我不认为字符串"None"可以申请logicalType=timestamp
添加回答
举报
0/150
提交
取消