最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志。

前言

最近工作中是做日志分析的平台,采用了SparkStreaming+Kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志。

实现

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式。

一. 基于Receiver方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。代码如下:

  1. SparkConfsparkConf=newSparkConf().setAppName("log-etl").setMaster("local[4]");
  2. JavaStreamingContextjssc=newJavaStreamingContext(sparkConf,newDuration(2000));
  3. intnumThreads=Integer.parseInt("4");
  4. Map<String,Integer>topicMap=newHashMap<String,Integer>();
  5. topicMap.put("group-45",numThreads);
  6. //接收的参数分别是JavaStreamingConetxt,zookeeper连接地址,groupId,kafak的topic
  7. JavaPairReceiverInputDStream<String,String>messages=
  8. KafkaUtils.createStream(jssc,"172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181","1",topicMap);

刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。官方现在也已经不推荐这种整合方式,官网相关地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下面我们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。

二.基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的***的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

代码如下:

  1. SparkConfsparkConf=newSparkConf().setAppName("log-etl");
  2. JavaStreamingContextjssc=newJavaStreamingContext(sparkConf,Durations.seconds(2));
  3. HashSet<String>topicsSet=newHashSet<String>(Arrays.asList(topics.split(",")));
  4. HashMap<String,String>kafkaParams=newHashMap<String,String>();
  5. kafkaParams.put("metadata.broker.list",brokers);
  6. //Createdirectkafkastreamwithbrokersandtopics
  7. JavaPairInputDStream<String,String>messages=KafkaUtils.createDirectStream(
  8. jssc,
  9. String.class,
  10. String.class,
  11. StringDecoder.class,
  12. StringDecoder.class,
  13. kafkaParams,
  14. topicsSet
  15. );

这种Direct方式的优点如下:

1.简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2.一次且仅一次的事务机制:基于receiver的方式,在spark和zk中通信,很有可能导致数据的不一致。

3.高效率:在receiver的情况下,如果要保证数据的不丢失,需要开启wal机制,这种方式下,为、数据实际上被复制了两份,一份在kafka自身的副本中,另外一份要复制到wal中, direct方式下是不需要副本的。

三.基于Direct方式丢失消息的问题

貌似这种方式很***,但是还是有问题的,当业务需要重启sparkstreaming程序的时候,业务日志依然会打入到kafka中,当job重启后只能从***的offset开始消费消息,造成重启过程中的消息丢失。kafka中的offset如下图(使用kafkaManager实时监控队列中的消息):

 SparkStreaming与Kafka整合遇到的问题及解决方案(kafka对接sparkstreaming) SparkStreaming Kafka Direct 第1张

当停止业务日志的接受后,先重启spark程序,但是发现job并没有将先前打入到kafka中的数据消费掉。这是因为消息没有经过zk,topic的offset也就没有保存

四.解决消息丢失的处理方案

一般有两种方式处理这种问题,可以先spark streaming 保存offset,使用spark checkpoint机制,第二种是程序中自己实现保存offset逻辑,我比较喜欢第二种方式,以为这种方式可控,所有主动权都在自己手中。

先看下大体流程图,

 SparkStreaming与Kafka整合遇到的问题及解决方案(kafka对接sparkstreaming) SparkStreaming Kafka Direct 第2张

  1. SparkConfsparkConf=newSparkConf().setMaster("local[2]").setAppName("log-etl");
  2. Set<String>topicSet=newHashSet<String>();
  3. topicSet.add("group-45");
  4. kafkaParam.put("metadata.broker.list","172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092");
  5. kafkaParam.put("group.id","simple1");
  6. //transformjavaMaptoscalaimmutable.map
  7. scala.collection.mutable.Map<String,String>testMap=JavaConversions.mapAsScalaMap(kafkaParam);
  8. scala.collection.immutable.Map<String,String>scalaKafkaParam=
  9. testMap.toMap(newPredef.$less$colon$less<Tuple2<String,String>,Tuple2<String,String>>(){
  10. publicTuple2<String,String>apply(Tuple2<String,String>v1){
  11. returnv1;
  12. }
  13. });
  14. //initKafkaCluster
  15. kafkaCluster=newKafkaCluster(scalaKafkaParam);
  16. scala.collection.mutable.Set<String>mutableTopics=JavaConversions.asScalaSet(topicSet);
  17. immutableTopics=mutableTopics.toSet();
  18. scala.collection.immutable.Set<TopicAndPartition>topicAndPartitionSet2=kafkaCluster.getPartitions(immutableTopics).right().get();
  19. //kafkadirectstream初始化时使用的offset数据
  20. Map<TopicAndPartition,Long>consumerOffsetsLong=newHashMap<TopicAndPartition,Long>();
  21. //没有保存offset时(该group***消费时),各个partitionoffset默认为0
  22. if(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"),topicAndPartitionSet2).isLeft()){
  23. System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"),topicAndPartitionSet2).left().get());
  24. Set<TopicAndPartition>topicAndPartitionSet1=JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);
  25. for(TopicAndPartitiontopicAndPartition:topicAndPartitionSet1){
  26. consumerOffsetsLong.put(topicAndPartition,0L);
  27. }
  28. }
  29. //offset已存在,使用保存的offset
  30. else{
  31. scala.collection.immutable.Map<TopicAndPartition,Object>consumerOffsetsTemp=kafkaCluster.getConsumerOffsets("simple1",topicAndPartitionSet2).right().get();
  32. Map<TopicAndPartition,Object>consumerOffsets=JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp);
  33. Set<TopicAndPartition>topicAndPartitionSet1=JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);
  34. for(TopicAndPartitiontopicAndPartition:topicAndPartitionSet1){
  35. Longoffset=(Long)consumerOffsets.get(topicAndPartition);
  36. consumerOffsetsLong.put(topicAndPartition,offset);
  37. }
  38. }
  39. JavaStreamingContextjssc=newJavaStreamingContext(sparkConf,newDuration(5000));
  40. kafkaParamBroadcast=jssc.sparkContext().broadcast(kafkaParam);
  41. //createdirectstream
  42. JavaInputDStream<String>message=KafkaUtils.createDirectStream(
  43. jssc,
  44. String.class,
  45. String.class,
  46. StringDecoder.class,
  47. StringDecoder.class,
  48. String.class,
  49. kafkaParam,
  50. consumerOffsetsLong,
  51. newFunction<MessageAndMetadata<String,String>,String>(){
  52. publicStringcall(MessageAndMetadata<String,String>v1)throwsException{
  53. System.out.println("接收到的数据《《==="+v1.message());
  54. returnv1.message();
  55. }
  56. }
  57. );
  58. //得到rdd各个分区对应的offset,并保存在offsetRanges中
  59. finalAtomicReference<OffsetRange[]>offsetRanges=newAtomicReference<OffsetRange[]>();
  60. JavaDStream<String>javaDStream=message.transform(newFunction<JavaRDD<String>,JavaRDD<String>>(){
  61. publicJavaRDD<String>call(JavaRDD<String>rdd)throwsException{
  62. OffsetRange[]offsets=((HasOffsetRanges)rdd.rdd()).offsetRanges();
  63. offsetRanges.set(offsets);
  64. returnrdd;
  65. }
  66. });
  67. //output
  68. javaDStream.foreachRDD(newFunction<JavaRDD<String>,Void>(){
  69. publicVoidcall(JavaRDD<String>v1)throwsException{
  70. if(v1.isEmpty())returnnull;
  71. List<String>list=v1.collect();
  72. for(Strings:list){
  73. System.out.println("数据==="+s);
  74. }
  75. for(OffsetRangeo:offsetRanges.get()){
  76. //封装topic.partition与offset对应关系javaMap
  77. TopicAndPartitiontopicAndPartition=newTopicAndPartition(o.topic(),o.partition());
  78. Map<TopicAndPartition,Object>topicAndPartitionObjectMap=newHashMap<TopicAndPartition,Object>();
  79. topicAndPartitionObjectMap.put(topicAndPartition,o.untilOffset());
  80. //转换javamaptoscalaimmutable.map
  81. scala.collection.mutable.Map<TopicAndPartition,Object>testMap=
  82. JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);
  83. scala.collection.immutable.Map<TopicAndPartition,Object>scalatopicAndPartitionObjectMap=
  84. testMap.toMap(newPredef.$less$colon$less<Tuple2<TopicAndPartition,Object>,Tuple2<TopicAndPartition,Object>>(){
  85. publicTuple2<TopicAndPartition,Object>apply(Tuple2<TopicAndPartition,Object>v1){
  86. returnv1;
  87. }
  88. });
  89. //更新offset到kafkaCluster
  90. kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),scalatopicAndPartitionObjectMap);
  91. System.out.println("原数据====》"+o.topic()+""+o.partition()+""+o.fromOffset()+""+o.untilOffset()
  92. );
  93. }
  94. returnnull;
  95. }
  96. });
  97. jssc.start();
  98. jssc.awaitTermination();
  99. }

基本使用这种方式就可以解决数据丢失的问题。

转载请说明出处
知优网 » SparkStreaming与Kafka整合遇到的问题及解决方案(kafka对接sparkstreaming)

发表评论

您需要后才能发表评论