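2 Answers
Contributor: 1803 experience points, 3+ upvotes
Yes, it is possible to use Spark to listen on a TCP port and process any incoming data. What you are looking for is Spark Streaming.
For convenience, here is the example: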
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with two worker threads and a batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();
        jssc.start(); // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
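To see what each one-second batch computes, the same split, pair, and reduce steps can be sketched on an in-memory list without Spark. This is only an illustration: the class name `BatchWordCount` and the method `countWords` are made up for this sketch and are not part of any Spark API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spark: mimics the per-batch word count.
public class BatchWordCount {

    // Counts words across the lines of one batch: split each line on spaces,
    // treat every word as (word, 1), and sum the 1s per word.
    public static Map<String, Integer> countWords(List<String> batch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batch) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {hello=2, spark=1, world=1}
        System.out.println(countWords(List.of("hello world", "hello spark")));
    }
}
```

In the streaming version, this computation is repeated independently for the lines received during each batch interval.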
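Contributor: 1813 experience points, 2+ upvotes
Spark has no built-in TCP server that waits for producers and buffers their data. Through its API libraries, Spark works on a polling mechanism over TCP, Kafka, and so on. To consume incoming TCP data, you need an external TCP server that Spark can connect to, as Shaido explains in the example.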
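As a rough illustration of such an external server, here is a minimal sketch using only the JDK. The class name `LineServer`, the method `serveOnce`, and the sample lines are assumptions made for this example. Port 9999 is chosen only to match the `socketTextStream("localhost", 9999)` call in the first answer.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for a real data producer.
public class LineServer {

    // Accepts a single client (for example, Spark's socket receiver),
    // writes each line newline-terminated, then closes the connection.
    public static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed port: matches the host and port used by the streaming job.
        try (ServerSocket server = new ServerSocket(9999)) {
            serveOnce(server, List.of("hello world", "hello spark"));
        }
    }
}
```

This sketch serves one connection and exits. A real producer would typically keep the connection open and keep writing lines as data arrives. The server has to be running before the streaming job tries to connect to it.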