image.png

1.1 Data Sink 数据输出

经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。

1.1.1 print 打印

打印是最简单的一个Sink,通常是用来做实验和测试时使用。如果想让一个DataStream输出打印的结果,直接可以在该DataStream调用print方法。另外,该方法还有一个重载的方法,可以传入一个字符,指定一个Sink的标识名称,如果有多个打印的Sink,用来区分到底是哪一个Sink的输出。

复制代码

package cn._51doit.flink.day02;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.RuntimeContext;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.flink.util.Collector;public class PrintSinkDemo {    public static void main(String[] args) throws Exception {        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        int parallelism0 = env.getParallelism();

        System.out.println("执行环境默认的并行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);        //获取DataStream的并行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的并行度:" + parallelism);

        lines.print();        //lines.addSink(new MyPrintSink()).name("my-print-sink");
        env.execute();


    }    public static class MyPrintSink extends RichSinkFunction<String> {        private int indexOfThisSubtask;
        @Override        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override        public void invoke(String value, Context context) throws Exception {

            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}

复制代码


下面的结果是WordCount例子中调用print Sink输出在控制台的结果,细心的读者会发现,在输出的单词和次数之前,有一个数字前缀,我这里是1~4,这个数字是该Sink所在subtask的Index + 1。有的读者运行的结果数字前缀是1~8,该数字前缀其实是与任务的并行度相关的,由于该任务是以local模式运行,默认的并行度是所在机器可用的逻辑核数即线程数,我的电脑是2核4线程的,所以subtask的Index范围是0~3,将Index + 1,显示的数字前缀就是1~4了。这里在来仔细的观察一下运行的结果发现:相同的单词输出结果的数字前缀一定相同,即经过keyBy之后,相同的单词会被shuffle到同一个subtask中,并且在同一个subtask的同一个组内进行聚合。一个subtask中是可能有零到多个组的,如果是有多个组,每一个组是相互独立的,累加的结果不会相互干扰。

1.1.2 writerAsText 以文本格式输出

已writer开头的sink 现在基本都已过时了;

该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是TextOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以字符的形式写入到文件中,目录中的文件名称是该Sink所在subtask的Index + 1。该方法还有一个重载的方法,可以额外指定一个枚举类型的参数writeMode,默认是WriteMode.NO_OVERWRITE,如果指定相同输出目录下有相同的名称文件存在,就会出现异常。如果是WriteMode.OVERWRITE,会将以前的文件覆盖。

复制代码

package cn._51doit.flink.day02;import org.apache.flink.api.common.functions.RuntimeContext;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class WriteSinkDemo {    public static void main(String[] args) throws Exception {        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        int parallelism0 = env.getParallelism();

        System.out.println("执行环境默认的并行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);        //获取DataStream的并行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的并行度:" + parallelism);

        lines.writeAsText("file:///Users/xing/Desktop/out");

        env.execute();


    }    public static class MyPrintSink extends RichSinkFunction<String> {        private int indexOfThisSubtask;
        @Override        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override        public void invoke(String value, Context context) throws Exception {

            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}

复制代码


1.1.3 writeAsCsv 以csv格式输出

该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以csv的形式(类似Excel的格式,字段和字段之间用逗号分隔)写入到文件中,目录中的文件名称是该Sink所在subtask的Index + 1。需要说明的是,该Sink并不是将数据实时的写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。

DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeAsCsv(path);

1.1.4 writeUsingOutputFormat 以指定的格式输出

该方法是将数据已指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类,该接口有很多已经实现好了的实现类,并且可以根据需求自己实现,所以该方法更加灵活。writeAsText和writeAsCsv方法底层都是调用了writeUsingOutputFormat方法。

DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeUsingOutputFormat(new TextOutputFormat<>(new Path(path));

1.1.5 writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。该方法需要传入三个参数:第一个为ip地址或主机名,第二个为端口号,第三个为数据输出的序列化格式SerializationSchema。输出之前,指定的网络端口服务必须已经启动。

DataStreamSource<String> lines = env.socketTextStream(“localhost”, 8888);
lines.writeToSocket(“localhost”, 9999, new SimpleStringSchema());

1.1.6 RedisSink

该方法是将数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储key-value类型的数据。Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。Flink实时计算出的结果,需要快速的输出存储起来,要求写入的存储系统的速度要快,这个才不会造成数据积压。Redis就是一个非常不错的选择。

首先在maven项目中的pom.xml中添加Redis Sink的依赖。

<!-- redis 依赖--><dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.1-SNAPSHOT</version></dependency>

接下来就是定义一个类(或者静态内部类)实现RedisMapper即可,需要指定一个泛型,这里是Tuple2<String, Integer>,即写入到Redis中的数据的类型,并实现三个方法。第一个方法是getCommandDescription方法,返回RedisCommandDescription实例,在该构造方法中可以指定写入到Redis的方法类型为HSET,和Redis的additionalKey即value为HASH类型外面key的值;第二个方法getKeyFromData是指定value为HASH类型对应key的值;第三个方法geVauleFromData是指定value为HASH类型对应value的值。

在使用之前,先new FlinkJedisPoolConfig,设置Redis的ip地址或主机名、端口号、密码等。然后new RedisSink将准备好的conf和RedisWordCountMapper实例传入到其构造方法中,最后调用DataStream的addSink方法,将new好的RedisSink作为参数传入。

复制代码

package cn._51doit.flink.day02;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import org.apache.flink.util.Collector;/**
 * 从指定的socket读取数据,对单词进行计算,将结果写入到Redis中 */public class RedisSinkDemo {    public static void main(String[] args) throws Exception {        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //创建DataStream        //Source
        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);        //调用Transformation开始        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");                for (String word : words) {                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override            public String getKey(Tuple2<String, Integer> tp) throws Exception {                return tp.f0;
            }
        });        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);        //Transformation结束        //调用Sink        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setDatabase(0).build();

        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));        //启动执行
        env.execute("StreamingWordCount");

    }    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override        public RedisCommandDescription getCommandDescription() {            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override        public String getKeyFromData(Tuple2<String, Integer> data) {            return data.f0;
        }

        @Override        public String getValueFromData(Tuple2<String, Integer> data) {            return data.f1.toString();
        }
    }

}

复制代码

  image.png    

1.1.7 KafkaSink

在实际的生产环境中,经常会有一些场景,需要将Flink处理后的数据快速地写入到一个分布式、高吞吐、高可用、可用保证Exactly Once的消息中间件中,供其他的应用消费处理后的数据。Kafka就是Flink最好的黄金搭档,Flink不但可以从Kafka中消费数据,还可以将处理后的数据写入到Kafka,并且吞吐量高、数据安全、可以保证Exactly Once等。

Flink可以和Kafka多个版本整合,比如0.11.x、1.x、2.x等,从Flink1.9开始,使用的是kafka 2.2的客户端,所以这里使用kafka的版本是2.2.2,并且使用最新的API。

下面的例子就是将数据写入到Kafka中,首先要定义一个类实现KafkaSerializationSchema接口,指定一个泛型,String代表要写入到Kafka的数据为String类型。该类的功能是指定写入到Kafka中数据的序列化Schema,需要重写serialize方法,将要写入的数据转成二进制数组,并封装到一个ProducerRecord中返回。

复制代码

package cn._51doit.flink.day02;import org.apache.flink.api.common.functions.RuntimeContext;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;public class KafkaSinkDemo {    public static void main(String[] args) throws Exception {        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        int parallelism0 = env.getParallelism();

        System.out.println("执行环境默认的并行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);        //获取DataStream的并行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的并行度:" + parallelism);        //lines.writeAsText("file:///Users/xing/Desktop/out");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(                "cs-28-87:9092,cs-28-88:9092,cs-28-89:9092", "wordcount18", new SimpleStringSchema()
        );

        lines.addSink(kafkaProducer);

        env.execute();

    }

}

复制代码

启动nc –lk 8888 ,然后启动上述代码程序;

在nc窗口中输入数据,使用kafka可以消费到;

image.png

kafka消费wordcount18的topic:

[root@cs-28-88 ~]# kafka-console-consumer –zookeeper cs-28-88:2181 –topic wordcount18

然后将Kafka相关的参数设置到Properties中,再new FlinkKafkaProducer,将要写入的topic名称、Kafka序列化Schema、Properties和写入到Kafka的Semantic语义作为FlinkKafkaProducer构造方法参数传入。最好调用addSink方法将FlinkKafkaProducer的引用传入到该方法中。虽然下面的代码指定了EXACTLY_ONCE语义,但是没有开启Checkpointing,是没法实现的。具有怎样实现Exactly Once,会在后面原理深入的章节进行讲解。

1.1.8 StreamFileDataSink

实时处理的数据,有一些场景要输出到其他分布式文件系统中,比如Hadoop HDFS、Amazon S3 (Simple Storage Service)、Aliyun OSS(Object Storage Service)等。因为这些分布式文件系统都具有高可用、可扩展、多副本、存储海量数据等特点。存储到分布式文件系统的数据,就可以做一些离线的数据分析,比如离线的数仓、数据挖掘、机器学习等。

从Flink 1.9开始,原来的Bucketing Sink已经标记为过时,在未来的版本将会被移除。推荐使用StreamFileDataSink,该Sink不但可以将数据写入到各种文件系统中,可以保证Exacly Once语义,还支持以列式存储的格式写入,功能更强大。

下面的例子是将数据写入到HDFS中,首先在maven项目的pom.xml文件引入HDFS文件系统的依赖:

复制代码

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>1.12-SNAPSHOT</version></dependency><dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version></dependency>

复制代码

通过DefaultRollingPolicy这个工具类,指定文件滚动生成的策略。这里设置的文件滚动生成策略有两个,一个是距离上一次生成文件时间超过30秒,另一个是文件大小达到100 mb。这两个条件只要满足其中一个即可滚动生成文件。然后StreamingFileSink.forRowFormat方法将文件输出目录、文件写入的编码传入,再调用withRollingPolicy关联上面的文件滚动生成策略,接着调用build方法构建好StreamingFileSink,最后将其作为参数传入到addSink方法中。

1.1.9 JDBCSink

复制代码

package cn._51doit.flink.day02;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/**
 * 从指定的socket读取数据,对单词进行计算,最后将结果写入到MySQL */public class JDBCSinkDemo {    public static void main(String[] args) throws Exception {        //创建Flink流计算执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000);        //创建DataStream        //Source
        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);        //调用Transformation开始        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");                for (String word : words) {                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override            public String getKey(Tuple2<String, Integer> tp) throws Exception {                return tp.f0;
            }
        });        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        summed.addSink(JdbcSink.sink(                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",
                (ps, t) -> {
                    ps.setString(1, t.f0);
                    ps.setInt(2, t.f1);
                    ps.setInt(3, t.f1);
                },                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()));        //启动执行
        env.execute("StreamingWordCount");

    }
}


转自:https://www.cnblogs.com/weiyiming007/p/15891483.html