我准备了一个 kafka 生产者,它将一个 List 放入 kafka 主题中。它适用于 100 万行/记录。我得到的生产文件包含 1.1 亿多条记录。 在我的 KafkaProducer 处理如此庞大的数据的最佳方法是什么?下面是代码,我曾经处理过 100 万条记录,将其放入 kafka 主题大约需要 4 分钟。import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.io.RandomAccessFile;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.Map;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.SchemaBuilder;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.source.SourceTask;public class KafkaSourceTask extends SourceTask { private String filename; private String topic; private RandomAccessFile raf; private long lastRecordedOffset = 0L; private BufferedReader bufferedReader = null; Schema schema = SchemaBuilder.struct().field("emp_id", Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA) .field("last_name", Schema.STRING_SCHEMA).field("department", Schema.STRING_SCHEMA).build();public void start(Map<String, String> props) { filename = props.get("file"); topic = props.get("topic");}对此的任何帮助或建议将不胜感激。在此先感谢您。
2 回答

慕无忌1623718
TA贡献1744条经验 获得超4个赞
首先,在将 Kafka 生产者批处理记录发送给代理之前,您应该检查并使用这两个配置linger.ms
和batch.record.size
.
现在您可以使用另一个线程来读取文件(我认为它是每行一条记录)并将它们放入 java 队列中,并使用托管 kafka 生产者的线程连续读取该队列。
多个生产者被认为是一种反模式,尤其是在写入 Kafka 主题时,请查看 Single Writer Principle。
好吧,无论哪种方式,您都必须稍微调整一下您的 kafka 生产者,但就像@cricket_007 所说,您应该考虑使用带有文件 csv 连接器的 kafka 连接,至少如果您找不到适合您的连接器,您可以开发一个连接器自己。
希望这会有所帮助。

慕码人8056858
TA贡献1803条经验 获得超6个赞
拥有数十亿条记录的 ArrayList?想想看,如果你有 10 亿条记录,而每条记录的大小只有 1 个字节(一个可笑的低估),你就有 1 个 SI GB 的内存消耗。
根据“大数据”的粗略和现成的定义,作为不适合单个主机上内存的数据,你要么处于边缘,要么超过那个点,你需要开始使用大数据技术。首先你可以尝试多线程,然后你可以在多台机器上尝试多线程,这是使用 Kafka 的优势——客户端 API——无论是从消费端还是生产端,都可以轻松实现。
添加回答
举报
0/150
提交
取消