Flume + Spark Integration Example

Spark Streaming + Flume

There are two integration approaches: Push and Pull.

1 Push

In Push mode, Flume pushes data to a Spark Streaming receiver through an avro sink, so the Spark Streaming program must be started first, then Flume.

Flume configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop1
a1.sources.r1.port = 12345
# sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 44444
# channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
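
With the Spark Streaming program already running, start the agent, for example with flume-ng agent --conf conf --conf-file flume-push.conf --name a1 -Dflume.root.logger=INFO,console (flume-push.conf is just an assumed name for the configuration above). The avro sink then pushes whatever the netcat source receives on hadoop1:12345 to the receiver listening on hadoop1:44444.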

Dependencies

FlumeUtils is provided by the separate spark-streaming-flume module, so both of the following artifacts are needed:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

Spark Streaming program

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf()
      .setAppName("sparkStreaming")
      .setMaster("local[2]")
// The console output is too noisy for testing, so lower the log level
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val ssc = new StreamingContext(conf, Seconds(5))

// Push mode: this starts an Avro receiver on hadoop1:44444 for Flume's avro sink to push to
val flumeStream = FlumeUtils.createStream(ssc, "hadoop1", 44444)

// Each event body is a comma-separated line; split it and count the words
flumeStream.map(x => new String(x.event.getBody.array()).trim)
    .flatMap(_.split(",")).map((_, 1))
    .reduceByKey(_+_)
    .print()

ssc.start()
ssc.awaitTermination()
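
To test, connect to the netcat source with nc hadoop1 12345 (or telnet) and type comma-separated lines such as hello,world. The word counts for each 5-second batch are printed to the console.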

2 Pull (recommended)

In Pull mode, Flume pushes events into a custom sink (SparkSink), where they stay buffered in the channel until Spark Streaming pulls them with a reliable receiver. Data is removed only after Spark has received and replicated it within a Flume transaction, which makes this approach more fault-tolerant than Push. Unlike Push, the Flume agent is started first, then the Spark Streaming program.

Flume configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop1
a1.sources.r1.port = 12345
# sinks
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 44444
# channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
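
Start the agent first, for example with flume-ng agent --conf conf --conf-file flume-pull.conf --name a1 (flume-pull.conf is just an assumed name for the configuration above). The SparkSink then listens on hadoop1:44444; events stay in the channel until the Spark Streaming program pulls them.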

Dependencies

The custom SparkSink runs inside the Flume agent, so the following JARs must also be placed on the Flume agent's classpath (for example, in Flume's lib directory):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>

The Spark Streaming application itself still needs the spark-streaming_2.10 and spark-streaming-flume_2.10 artifacts, the same as in the Push example.

Spark Streaming code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf()
      .setAppName("sparkStreaming")
      .setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// Pull mode: a reliable receiver polls the SparkSink on hadoop1:44444
val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop1", 44444)

flumeStream.map(x => new String(x.event.getBody.array()).trim)
    .flatMap(_.split(",")).map((_, 1))
    .reduceByKey(_+_)
    .print()

ssc.start()
ssc.awaitTermination()
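
A single polling receiver can also pull from several SparkSinks. Below is a minimal sketch, assuming a second, identically configured agent on a hypothetical host hadoop2; FlumeUtils provides an overload of createPollingStream that takes a list of addresses, a storage level, a maximum batch size, and a parallelism level:

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Addresses of the SparkSinks to poll; hadoop2 is a hypothetical second agent
val addresses = Seq(
  new InetSocketAddress("hadoop1", 44444),
  new InetSocketAddress("hadoop2", 44444))

val flumeStream = FlumeUtils.createPollingStream(
  ssc,                                // the StreamingContext created above
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2, // replicate received blocks
  1000,                               // maxBatchSize: max events pulled per poll
  5)                                  // parallelism: concurrent connections into the sinks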

Reference:

Spark Streaming + Flume Integration Guide