Read.javapublic class Read { public static void main(String[] args) { String conn = "db_url"; String username = "*****"; String pwd = "*****"; String sql = "INSERT INTO table (coloumn) values (?)"; Properties props = new Properties(); props.put("bootstrap.servers", "10.247.36.174:3306"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); rops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; try (Connection con = DriverManager.getConnection(conn, username, pwd); PreparedStatement ps = con.prepareStatement(sql); BufferedReader br = new BufferedReader(new FileReader("All.log"));) { String line = null; processMessages(line, br, listparam, ps); break; } } catch (Exception ex) { System.err.println("Error in \n" + ex); } finally { producer = new KafkaProducer<>(props); String msg = "Done"; producer.send(new ProducerRecord<String, String>("HelloKafka", msg)); System.out.println("Sent: " + msg); } producer.close(); }我是卡夫卡的新手。有人可以告诉我哪一部分做错了吗?我不知道如何在java中使用Kafka。在 ReadLg.java 中,我想从日志文件中读取并将其插入到数据库中,然后在完成后我想向 RetrieveData.java 发送一条消息,以便它可以启动。检索数据将运行,但空闲等待来自 ReadLg.java 的消息。这是一个不好的方法吗?还是旧方法?有什么建议或帮助解决这个问题吗?我不断收到错误“无法连接到 IP 地址”,
1 回答

MYYA
TA贡献1868条经验 获得超4个赞
总结:要使用 Kafka Consumer/Producer,首先必须启动 Zookeeper 和 Kafka Broker。
为了测试或开发目的,您可以自己使用以下命令启动它:
如果您的 Kafka 已准备就绪,您就可以开始使用它了。您必须记住设置正确的值bootstrap.server
(对于本地使用通常是localhost:9092
)
添加回答
举报
0/150
提交
取消