I am using the HBase sink connector for Kafka (https://github.com/mravi/kafka-connect-hbase). I am trying to run this connector with its JsonEventParser as the event parser class and JsonConverter as the value converter, configured as follows:

    {
      "name": "test-hbase",
      "config": {
        "connector.class": "io.svectors.hbase.sink.HBaseSinkConnector",
        "tasks.max": "1",
        "topics": "hbase_connect",
        "zookeeper.quorum": "xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx",
        "event.parser.class": "io.svectors.hbase.parser.JsonEventParser",
        "hbase.hbase_connect.rowkey.columns": "id",
        "hbase.hbase_connect.family": "col1"
      }
    }

These are the Kafka Connect distributed worker properties I am running with:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false

The problem is that when I produce a JSON message without a schema, the connector throws a NullPointerException:

    [2018-12-10 16:45:06,607] ERROR WorkerSinkTask{id=hbase_connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
    java.lang.NullPointerException
        at io.svectors.hbase.util.ToPutFunction.apply(ToPutFunction.java:78)
        at io.svectors.hbase.sink.HBaseSinkTask.lambda$4(HBaseSinkTask.java:105)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

This is the message I am sending:

    {"id": "9","name": "wis"}

Any suggestions about this error?
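One thing that may be worth trying, offered as an unconfirmed guess rather than a diagnosis: with value.converter.schemas.enable=false, JsonConverter hands the sink a record whose value schema is null, and a parser that reads field names from that schema could fail on it. If value.converter.schemas.enable were set to true instead, JsonConverter would expect each message to be wrapped in a schema/payload envelope. The sketch below builds such an envelope for the message above. The helper name wrap_with_schema and the schema name hbase_connect_record are made up for illustration, and it assumes every field is a non-optional string.

```python
import json


def wrap_with_schema(payload, name="hbase_connect_record"):
    """Wrap a flat dict of string values in a JsonConverter schema/payload envelope.

    Assumption: all values are strings and no field is optional.
    The schema name is a hypothetical placeholder.
    """
    fields = [
        {"field": key, "type": "string", "optional": False}
        for key in payload
    ]
    return {
        "schema": {
            "type": "struct",
            "name": name,
            "optional": False,
            "fields": fields,
        },
        "payload": payload,
    }


# The original message from the question, wrapped for schemas.enable=true.
envelope = wrap_with_schema({"id": "9", "name": "wis"})
print(json.dumps(envelope))
```

The printed JSON is what would be produced to the hbase_connect topic in place of the bare message. Whether this removes the exception depends on how the connector's parser handles schemas, which is not verified here.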