
Flink Dataflows: DataSource (Data Sources)

Flink has built-in support for many data sources, such as HDFS, Socket, Kafka and Collections. Flink also provides the addSource method for defining custom sources. This section covers all of Flink's built-in data sources as well as the principles and APIs for custom sources.

File Source
Create a data source by reading files from the local filesystem or HDFS.
If the files are read from HDFS, the Hadoop dependency must be added:

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.5</version> 
</dependency>

Code:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// During operator transformations, data is converted into Flink's built-in data types,
// so the implicit conversions must be imported for the type conversion to happen automatically
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc")
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    // the job stops once the file has been read completely
    env.execute()
  }
}

Read the content of newly added files under a given HDFS directory every 10 seconds and run a WordCount on it.
Business scenario: companies usually run real-time ETL. When Flume collects new data, Flink performs the ETL in real time and loads the result into the warehouse.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// During operator transformations, data is converted into Flink's built-in data types,
// so the implicit conversions must be imported for the type conversion to happen automatically
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // read files from HDFS
    val filePath = "hdfs://node01:9000/flink/data/"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // scan the HDFS directory for new file content every 10 s (the interval is given in milliseconds)
    val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
    // val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc")
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}

readTextFile is implemented on top of readFile; readFile is the lower-level API and is therefore more flexible to use.
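
As a rough sketch of that relationship (assuming the same imports and env as in the example above; this is not the exact internal code), a one-shot readTextFile corresponds roughly to calling readFile in PROCESS_ONCE mode:

// roughly equivalent to env.readTextFile("hdfs://node01:9000/flink/data/wc"):
// read the file once with a TextInputFormat and then stop
val format = new TextInputFormat(new Path("hdfs://node01:9000/flink/data/wc"))
val onceStream = env.readFile(format, "hdfs://node01:9000/flink/data/wc",
  FileProcessingMode.PROCESS_ONCE, -1)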

Collection Source
A source based on a local collection; it is generally only used for testing and has little practical value.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object CollectionSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(List("hello flink msb", "hello msb msb"))
    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}

Socket Source
Receives data from a socket server:
val initStream:DataStream[String] = env.socketTextStream("node01",8888)
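
A minimal runnable sketch built around that one call (assuming a socket server such as nc -lk 8888 is running on node01; the object name is illustrative):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object SocketSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // one record per line received from node01:8888
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    initStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute()
  }
}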

Kafka Source
To read data from Kafka with Flink, first configure the Flink-Kafka connector dependency.
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
Maven dependency:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.9.2</version>
</dependency>

Code:

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
prop.setProperty("group.id", "flink-kafka-id001")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
/** earliest: consume from the beginning, so old data will be consumed again
 *  latest: consume only newly arriving data, old data is not consumed
 */
prop.setProperty("auto.offset.reset", "latest")

val kafkaStream = env.addSource(
  new FlinkKafkaConsumer[(String, String)](
    "flink-kafka",
    new KafkaDeserializationSchema[(String, String)] {
      override def isEndOfStream(t: (String, String)): Boolean = false

      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }

      // declare the type of the records produced by this source
      override def getProducedType: TypeInformation[(String, String)] =
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
    },
    prop
  )
)
kafkaStream.print()
env.execute()

Consuming both key and value with the Kafka console consumer:
kafka-console-consumer.sh --zookeeper node01:2181 --topic flink-kafka --property print.key=true
By default only the value is consumed.
KafkaDeserializationSchema: reads both key and value from Kafka
SimpleStringSchema: reads only the value from Kafka
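
A minimal sketch of the value-only variant with SimpleStringSchema (reusing the topic and the prop object from the example above):

import org.apache.flink.api.common.serialization.SimpleStringSchema

// each Kafka record is deserialized to its value only, as a plain String
val valueStream = env.addSource(
  new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), prop))
valueStream.print()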

Custom Source
Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction) . Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.

Implement a single-parallelism source based on the SourceFunction interface:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the parallelism of this source is 1: a single-parallelism source
val stream = env.addSource(new SourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(200)
    }
  }

  // stop producing data
  override def cancel(): Unit = flag = false
})
stream.print()
env.execute()

Implement a multi-parallelism source based on the ParallelSourceFunction interface:

public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}

public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1L;
}

Implementing the ParallelSourceFunction interface is equivalent to extending RichParallelSourceFunction: both produce a parallel source.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sourceStream = env.addSource(new ParallelSourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}).setParallelism(2)

A source implemented this way can be given a parallelism greater than 1.
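
For completeness, a minimal sketch of the RichParallelSourceFunction variant (the class name here is illustrative); the Rich version additionally provides the open/close lifecycle hooks and access to getRuntimeContext:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.util.Random

// illustrative class name; every parallel subtask runs its own copy of run()
class MyRichParallelSource extends RichParallelSourceFunction[String] {
  @volatile private var flag = true

  // lifecycle hook available in the Rich variant, e.g. for opening connections
  override def open(parameters: Configuration): Unit =
    println(s"open subtask ${getRuntimeContext.getIndexOfThisSubtask}")

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = flag = false
}

// usage: env.addSource(new MyRichParallelSource).setParallelism(2)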

