测试官方例子

进入 kafka 安装目录, 执行以下命令创建一个 生产者

./bin/kafka-console-producer.sh --broker-list 192.168.0.208:9092 --topic spark-test

同样, 进入 spark 目录, 执行以下命令, 创建消费者

./bin/run-example --master yarn --jars examples/jars/spark-examples_2.11-2.4.0.jar,external_jars/spark-streaming-kafka-0-10-assembly_2.11-2.4.0.jar streaming.JavaDirectKafkaWordCount 192.168.0.208:9092 spark spark-test

这里有几点需要注意的是:

启动之后就可以在生产者出输出字符串, 然后会在消费者处被 wordcount 程序处理.

测试pyspark

官方pyspark 的例子使用的是 0.8 的旧接口, 需要新编译另一个 jar 包

进入 external/kafka-0-8-assembly 编译 jar包,执行如下命令提交任务

./bin/spark-submit --master yarn --jars external_jars/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar examples/src/main/python/streaming/direct_kafka_wordcount.py 192.168.0.208:9092 spark-test

Receiver-based Approach

这个方法使用了 Receivers 来接收数据。Receivers 的实现使用到 Kafka 高级消费者 API。对于所有的 Receivers,接收到的数据将会保存在 Spark executors 中,然后由 SS 启动的 Job 来处理这些数据。

然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,可以在 SS 中使用 WAL 日志,这是在 Spark 1.2.0 才引入的功能,这使得接收到的数据可以保存到 WAL 中(WAL 日志可以存储在 HDFS 上),所以在失败的时候,可以从 WAL 中恢复,而不至于丢失数据。

<aside> 💡 不常用

</aside>