Flink编程入门

1.1 初始化Flink项目模板

1.1.1 准备工作

要求安装Maven 3.0.4 及以上版本和JDK 8

1.1.2 使用maven命令创建java项目模板
  • 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络

复制代码

mvn archetype:generate  
-DarchetypeGroupId=org.apache.flink 
-DarchetypeArtifactId=flink-quickstart-java 
-DarchetypeVersion=1.12.0 
-DgroupId=cn._51doit.flink 
-DartifactId=flink-java 
-Dversion=1.0 
-Dpackage=cn._51doit.flink 
-DinteractiveMode=false

复制代码

image.png

命令执行完成后,项目就出现在了 D:\code\java_code

  • 或者在命令行中执行下面的命令,需要有网络

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0

1.1.3 使用maven命令创建scala项目模板
  • 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络

复制代码

mvn archetype:generate 
-DarchetypeGroupId=org.apache.flink 
-DarchetypeArtifactId=flink-quickstart-scala 
-DarchetypeVersion=1.12.0 
-DgroupId=cn._51doit.flink 
-DartifactId=flink-scala 
-Dversion=1.0 -Dpackage=cn._51doit.flink 
-DinteractiveMode=false

复制代码

image.png

  • 或者在命令行中执行下面的命令

curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0

1.1.4 将maven项目导入到IDEA或Eclipse

image.png

image.png

1.2 DataFlow编程模型

image.png

Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,

DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink;

  • Source主要负责数据的读取

  • Transformation主要负责对数据的转换操作

  • Sink负责最终计算好的结果数据输出。


1.3 Flink第一个入门程序

1.3.1 实时WordCount

从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用 nc -lk 8888 启动一个socket用来发送数据

复制代码

package cn._51doit.flink;
package cn._51doit.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 从指定的socket读取数据,对单词进行计算
 *
 */
public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        //创建Flink流式计算的执行环境(ExecutionEnvironment)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //创建DataStream
        //Source
        DataStream<String> lines = env.socketTextStream("cs-28-86",8888);
        //调用Transformation开始
        //调用Transformation
        SingleOutputStreamOperator<String> wordsDataStream = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);

                }
            }
        });

        //将单词和1组合
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordsDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //Transformation结束

        //调用Sinnk
        summed.print();

        //启动执行
        env.execute("StreamingWordCount");

    }
}

复制代码

在IntelliJ IDEA执行自己编写的Flink程序报错Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction

解决办法:

image.png

image.png

本地运行结果:

image.png

前面的4 2 1相当于是Slots, 在本地电脑表现为CPU的逻辑核

image.png

1.3.2 LambdaWordCount

复制代码

package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/**
 * Lambda编写WordCount
 */public class LambdaStreamingWordCount {    public static void main(String[] args) throws Exception {        //LocalStreamEnvironment只能在local模式运行,通常用于本地测试
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);        //使用java8的Lambda表达式
        SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);        //打印输出
        summed.print();        //抛出异常
        env.execute();

    }
}

复制代码

1.3.2 scala WordCount

IDEA添加scala插件:

image.png

然后重启IDEA;

添加相应的SDK,然后OK即可:

image.png

—————-

复制代码

package cn._51doit.flinkimport org.apache.flink.streaming.api.scala._

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStream[String] = env.socketTextStream("cs-28-86", port = 8888)

    val wordAndOne: DataStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    val result = wordAndOne.keyBy(_._1).sum(position = 1)

    result.print()

    env.execute()
  }

}

复制代码

1.3.3 提交集群中运行

主机名和端口改为接收参数的形式:

复制代码

package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/**
 * Lambda编写WordCount */public class LambdaStreamingWordCount {    public static void main(String[] args) throws Exception {        //LocalStreamEnvironment只能在local模式运行,通常用于本地测试        //LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));        //使用java8的Lambda表达式
        SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);        //打印输出        summed.print();        //抛出异常        env.execute();

    }
}

复制代码

打包程序;

  • 上传jar到web

先启动一个nc:

image.png

Submit:

image.png

执行结果:

image.png

image.png

转自:https://www.cnblogs.com/weiyiming007/p/15848998.html