image.png

 

1.1 Data Source数据源

在实时计算DataStream API中,Source是用来获取外部数据源的操作,按照获取数据的方式,可以分为:基于集合的Source、基于Socket网络端口的Source、基于文件的Source、第三方Connector Source和自定义Source五种。前三种Source是Flink已经封装好的方法,这些Source只要调用StreamExecutionEnvironment的对应方法就可以创建DataStream了,使用起来比较简单,我们在学习和测试的时候会经常用到。如果以后生产环境想要从一些分布式、高可用的消息中间件中读取数据,可以使用第三方Connector Source,比如Apache Kafka Source、AWS Kinesis Source、Google Cloud PubSub Source等(国内公司使用比较多的是Kafka这个消息中间件作为数据源),使用这些第三方的Source,需要额外引入对应消息中间件的依赖jar包。于此同时Flink允许开发者根据自己的需求,自定义各种Source,只要实现SourceFunction这个接口,然后将该实现类的实例作为参数传入到StreamExecutionEnvironment的addSource方法就可以了,这样大大的提高了Flink与外部数据源交互的灵活性。

从并行度的角度,Source又可以分为非并行的Source和并行的Source。非并行的Source它的并行度只能为1,即用来读取外部数据源的Source只有一个实例,在读取大量数据时效率比较低,通常是用来做一些实验或测试,例如Flink的Socket网络端口读取数据的Source就是一个非并行的Source;并行的Source它的并行度可以是1到多个,即用来读取外部数据源的Source可以有一个到多个实例(在分布式计算中,并行度是影响吞吐量一个非常重要的因素,在计算资源足够的前提下,并行度越大,效率越高)。例如Kafka Source就是并行的Source。

 

多并行Source:

image.png

 

非并行的Source:

image.png

 

单并行的Source直接实现了SourceFunction接口;

多并行的Source,可以继承RichParallelSourceFunction或实现parallelSourceFunction接口;

 

1.1.1 基于集合的Source

基于集合的Source是将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据集DataStreamSource,它是DataStream的子类,所以也可以使用DataStream类型来引用。得到DataStream后就可以调用Transformation或Sink度数据进行处理了。

1. fromElements

fromElements(T …) 方法是一个非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource;

并行度为1 ,是一个有限的数据流,程序执行完就退出,通常用来作实验;

复制代码

public class FromElementDemo {    public static void main(String[] args) throws Exception {      //创建流计算执行上下文环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();      //指定多个相同类型的数据创建DataStream
      DataStreamSource<String> words = env.fromElements(“flink”, “hadoop”, “flink”);      //调用Sink将数据在控制台打印      words.print();      //执行      env.execute(“FromElementDemo”);
    }
}

复制代码

 

  • fromCollection

fromCollection(Collection) 方法也是一个非并行的Source,可以将一个Collection类型的数据作为参数传入到该方法中,返回一个DataStreamSource;

并行度为1 ,是一个有限的数据流,程序执行完就退出,通常用来作实验;

//创建一个List List<String> wordList = Arrays.asList(“flink”, “spark”, “hadoop”, “flink”);

//将List并行化成DataStream DataStreamSource<String> words = env.fromCollection(wordList);

 

  • fromParallelCollection

fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。

多并行,有限的数据流,程序执行完就退出,通常用来作实验;

复制代码

package cn._51doit.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

//多并行的Source
public class FromParCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> nums = env.fromParallelCollection(new NumberSequenceIterator(1L, 20L), Long.class);

        int parallelism = nums.getParallelism();

        System.out.println("fromParallelCollection创建的DataStream的并行度为:" + parallelism);

        nums.print();

        env.execute();

    }
}

复制代码

 

  • generateSequence

generateSequence(long from, long to) 方法是一个并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)该方法需要传入两个long类型的参数,第一个是起始值,第二个是结束值,返回一个DataStreamSource。

//调用env的generateSequence生成并行的DataSource,输出的数字是1到100DataStreamSource<Long> numbers = env.generateSequence(1L, 100L).setParallelism(3);

 

1.1.2 基于Socket的Source

socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。

//调用env的socketTextStream方法,从指定的Socket地址和端口创建DataStreamDataStreamSource<String> lines = env.socketTextStream(“localhost”, 8888);

 

提示:如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。

 

1.1.3 基于文件的Source

基于文件的Source,本质上就是使用指定的FileInputFormat格式读取数据,可以指定TextInputFormat、CsvInputFormat、BinaryInputFormat等格式,基于文件的Source底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction,它们都是非并行的Source。

  1. readFile

readFile(FileInputFormat inputFormat, String filePath) 方法可以指定读取文件的FileInputFormat 格式,其中一个重载的方法readFile(FileInputFormat inputFormat, String filePath, FileProcessingMode watchType, long interval) 可以指定FileProcessingMode,它有两个枚举类型分别是PROCESS_ONCE和PROCESS_CONTINUOUSLY模式,PROCESS_ONCE模式Source只读取文件中的数据一次,读取完成后,程序退出。PROCESS_CONTINUOUSLY模式Source会一直监听指定的文件,如果使用该模式,需要指定检测该文件是否发生变化的时间间隔,但是使用这种模式,文件的内容发生变化后,会将这个变化的文件以前的内容和新的内容全部都读取出来,进而造成数据重复读取,是一个多并行Source。

String path = “file:///Users/xing/Desktop/a.txt”;//PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path,
        FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);

 

  • readTextFile

readTextFile(String filePath) 可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,还有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码。该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行。该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE

DataStreamSource<String> lines = env.readTextFile(path);

 

1.1.4 第三方Connector Source

在现实生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。

  1. Kafka Consumer

首先在maven项目的pom.xml文件中导入Flink跟Kafka整合的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

 

在代码中,创建一个Properties对象,然后设置Kafka的地址和端口、读取偏移量的策略、消费者组ID、并且设置Source读取完Kafka的数据后,定期更新偏移量。然后new FlinkKafkaConsumer,指定三个参数,第一个参数是topic名称;第二个参数是读取文件的反序列化Schema,SimpleStringSchema指的是读取Kafka中的数据反序列化成String格式;第三个参数是传入事先new 好的Properties实例。然后调用env的addSource方法将FlinkKafkaConsumer的实例传入,这样就创建好了一个DataSteamSource。

复制代码

//设置Kafka相关参数Properties properties = new Properties();//设置Kafka的地址和端口properties.setProperty(“bootstrap.servers”, “node-1.:9092,node-2:9092,node-3:9092”);//读取偏移量策略:如果没有记录偏移量,就从头读,如果记录过偏移量,就接着读properties.setProperty(“auto.offset.reset”, “earliest”);//设置消费者组IDproperties.setProperty(“group.id”, “g1”);//没有开启checkpoint,让flink提交偏移量的消费者定期自动提交偏移量properties.setProperty(“enable.auto.commit”, “true”);//创建FlinkKafkaConsumer并传入相关参数FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
       “test”, //要读取数据的Topic名称new SimpleStringSchema(), //读取文件的反序列化Schemaproperties //传入Kafka的参数);//使用addSource添加kafkaConsumerDataStreamSource<String> lines = env.addSource(kafkaConsumer);

复制代码

 

注意:目前这种方式无法保证Exactly Once,Flink的Source消费完数据后,将偏移量定期的写入到Kafka的一个特殊的topic中,这个topic就是__consumer_offset,这种方式虽然可以记录偏移量,但是无法保证Exactly Once,后面学完了State后,再实现Exactly Once功能。

 

1.1.5 自定义Source

Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction这个接口,实现run方法和cancel方法。run方法中实现的就是获取数据的逻辑,然后调用SourceContext的collect方法,将获取的数据收集起来,这样就返回了一个新的DataStreamSource,但是如果只实现这个接口,该Source只能是一个非并行的Source。在生产环境,通常是希望Source可以并行的读取数据,这样读取数据的速度才更快,所以最好的方式是实现ParallelSourceFunction接口或继承RichParallelSourceFunction这个抽象类,同样实现实现run方法和cancel方法,这样该Source就是一个可以并行的Source了。其实所有的Source底层都是调用该的方法。下面是一个简单的并行的Source,后面学习了State和Checkpoint,再定义一个可以实现Exactly Once的并行的Source。

复制代码

public class MyParallelSource extends RichParallelSourceFunction<String> {    private int i = 1; //定义一个int类型的变量,从1开始
    private boolean flag = true; //定义一个flag标标志    //run方法就是用来读取外部的数据或产生数据的逻辑    @Override    public void run(SourceContext<String> ctx) throws Exception {        //满足while循环的条件,就将数据通过SourceContext收集起来
        while (i <= 10 && flag) {
            Thread.sleep(1000); //为避免太快,睡眠1秒
            ctx.collect(“data:” + i++); //将数据通过SourceContext收集起来        }
    }    //cancel方法就是让Source停止    @Override    public void cancel() {        //将flag设置成false,即停止Source
        flag = false;
    }
}

复制代码

转自 https://www.cnblogs.com/weiyiming007/p/15849875.html