1 Flink编程入门
1.1 初始化Flink项目模板
1.1.1 准备工作
要求安装Maven 3.0.4 及以上版本和JDK 8
1.1.2 使用maven命令创建java项目模板
-
执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false
命令执行完成后,项目就出现在了 D:\code\java_code
-
或者在命令行中执行下面的命令,需要有网络
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0
1.1.3 使用maven命令创建scala项目模板
-
执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-scala -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false
-
或者在命令行中执行下面的命令
curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0
1.1.4 将maven项目导入到IDEA或Eclipse
1.2 DataFlow编程模型
Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,
DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink;
-
Source主要负责数据的读取
-
Transformation主要负责对数据的转换操作
-
Sink负责最终计算好的结果数据输出。
1.3 Flink第一个入门程序
1.3.1 实时WordCount
从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用 nc -lk 8888 启动一个socket用来发送数据
package cn._51doit.flink;
package cn._51doit.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * 从指定的socket读取数据,对单词进行计算 * */ public class StreamingWordCount { public static void main(String[] args) throws Exception { //创建Flink流式计算的执行环境(ExecutionEnvironment) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建DataStream //Source DataStream<String> lines = env.socketTextStream("cs-28-86",8888); //调用Transformation开始 //调用Transformation SingleOutputStreamOperator<String> wordsDataStream = lines.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> collector) throws Exception { String[] words = line.split(" "); for (String word : words) { collector.collect(word); } } }); //将单词和1组合 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordsDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return Tuple2.of(word, 1); } }); //分组 KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> tp) throws Exception { return tp.f0; } }); //聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1); //Transformation结束 //调用Sinnk summed.print(); //启动执行 env.execute("StreamingWordCount"); } }
在IntelliJ IDEA执行自己编写的Flink程序报错Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction
解决办法:
本地运行结果:
前面的4 2 1相当于是Slots, 在本地电脑表现为CPU的逻辑核
1.3.2 LambdaWordCount
package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/** * Lambda编写WordCount */public class LambdaStreamingWordCount { public static void main(String[] args) throws Exception { //LocalStreamEnvironment只能在local模式运行,通常用于本地测试 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8); DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888); //使用java8的Lambda表达式 SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1); //打印输出 summed.print(); //抛出异常 env.execute(); } }
1.3.2 scala WordCount
IDEA添加scala插件:
然后重启IDEA;
添加相应的SDK,然后OK即可:
—————-
package cn._51doit.flinkimport org.apache.flink.streaming.api.scala._ object StreamingWordCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val lines: DataStream[String] = env.socketTextStream("cs-28-86", port = 8888) val wordAndOne: DataStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)) val result = wordAndOne.keyBy(_._1).sum(position = 1) result.print() env.execute() } }
1.3.3 提交集群中运行
主机名和端口改为接收参数的形式:
package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/** * Lambda编写WordCount */public class LambdaStreamingWordCount { public static void main(String[] args) throws Exception { //LocalStreamEnvironment只能在local模式运行,通常用于本地测试 //LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1])); //使用java8的Lambda表达式 SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1); //打印输出 summed.print(); //抛出异常 env.execute(); } }
打包程序;
-
上传jar到web
先启动一个nc:
Submit:
执行结果:
转自:https://www.cnblogs.com/weiyiming007/p/15848998.html