1 Flink编程入门
1.1 初始化Flink项目模板
1.1.1 准备工作
要求安装Maven 3.0.4 及以上版本和JDK 8
1.1.2 使用maven命令创建java项目模板
-
执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false

命令执行完成后,项目就出现在了 D:\code\java_code
-
或者在命令行中执行下面的命令,需要有网络
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0
1.1.3 使用maven命令创建scala项目模板
-
执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-scala -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false

-
或者在命令行中执行下面的命令
curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0
1.1.4 将maven项目导入到IDEA或Eclipse


1.2 DataFlow编程模型

Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,
DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink;
-
Source主要负责数据的读取
-
Transformation主要负责对数据的转换操作
-
Sink负责最终计算好的结果数据输出。
1.3 Flink第一个入门程序
1.3.1 实时WordCount
从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用 nc -lk 8888 启动一个socket用来发送数据
package cn._51doit.flink;
package cn._51doit.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 从指定的socket读取数据,对单词进行计算
*
*/
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
//创建Flink流式计算的执行环境(ExecutionEnvironment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//创建DataStream
//Source
DataStream<String> lines = env.socketTextStream("cs-28-86",8888);
//调用Transformation开始
//调用Transformation
SingleOutputStreamOperator<String> wordsDataStream = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(word);
}
}
});
//将单词和1组合
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordsDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
//分组
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tp) throws Exception {
return tp.f0;
}
});
//聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
//Transformation结束
//调用Sinnk
summed.print();
//启动执行
env.execute("StreamingWordCount");
}
}
在IntelliJ IDEA执行自己编写的Flink程序报错Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction
解决办法:


本地运行结果:

前面的4 2 1相当于是Slots, 在本地电脑表现为CPU的逻辑核

1.3.2 LambdaWordCount
package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/**
* Lambda编写WordCount
*/public class LambdaStreamingWordCount { public static void main(String[] args) throws Exception { //LocalStreamEnvironment只能在local模式运行,通常用于本地测试
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888); //使用java8的Lambda表达式
SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1); //打印输出
summed.print(); //抛出异常
env.execute();
}
}
1.3.2 scala WordCount
IDEA添加scala插件:

然后重启IDEA;
添加相应的SDK,然后OK即可:

—————-
package cn._51doit.flinkimport org.apache.flink.streaming.api.scala._
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.socketTextStream("cs-28-86", port = 8888)
val wordAndOne: DataStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))
val result = wordAndOne.keyBy(_._1).sum(position = 1)
result.print()
env.execute()
}
}
1.3.3 提交集群中运行
主机名和端口改为接收参数的形式:
package cn._51doit.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;/**
* Lambda编写WordCount */public class LambdaStreamingWordCount { public static void main(String[] args) throws Exception { //LocalStreamEnvironment只能在local模式运行,通常用于本地测试 //LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1])); //使用java8的Lambda表达式
SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //使用Lambda表达式,要有return返回信息
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1); //打印输出 summed.print(); //抛出异常 env.execute();
}
}
打包程序;
-
上传jar到web
先启动一个nc:

Submit:

执行结果:


转自:https://www.cnblogs.com/weiyiming007/p/15848998.html
