<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- Required when using HiveContext (Hive support) -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.8</version>
</dependency>
java:
/**
 * Spark Streaming demo: reads access-log lines from Kafka and, for every micro-batch,
 * prints the number of distinct users per date.
 *
 * <p>Each Kafka record value is expected to be a space-separated line whose first field is
 * the user id and whose second field is the date. Records with a {@code null} value or fewer
 * than two fields are skipped instead of failing the whole batch.
 *
 * <p>The job runs with master {@code local[2]}, uses a 30-second batch interval and enables
 * Hive support against the metastore at {@code thrift://hadoop01:9083}.
 */
public class SparkSteamingKafkaTest {

    /** Minimum number of space-separated fields a record needs (user id and date). */
    private static final int REQUIRED_FIELDS = 2;

    /**
     * Builds the Spark session and streaming context, subscribes to Kafka and blocks until
     * the streaming job terminates.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {
        // Silence framework logging so that only the query output is printed.
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);

        String brokers = "hadoop03:9092";
        String topics = "topic1";

        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("streaming word count")
                .set("spark.yarn.app.id", "333");
        SparkSession ss = SparkSession.builder()
                .config(conf)
                .config("hive.metastore.uris", "thrift://hadoop01:9083")
                .enableHiveSupport()
                .getOrCreate();
        // Session-wide setting: apply it once here rather than on every batch.
        ss.conf().set("spark.sql.crossJoin.enabled", "true");

        // One micro-batch every 30 seconds.
        JavaStreamingContext ssc = new JavaStreamingContext(
                JavaSparkContext.fromSparkContext(ss.sparkContext()), Seconds.apply(30));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        // Kafka consumer settings. Only consumer-side keys are supplied; the old-consumer
        // "metadata.broker.list" and the producer-only "key.serializer" are not consumer
        // settings and are therefore omitted.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // auto.offset.reset applies when a partition has no committed offset (or it is invalid):
        //   earliest - resume from the committed offset if present, otherwise start from the beginning
        //   latest   - resume from the committed offset if present, otherwise consume only new data
        //   none     - resume from committed offsets; throw if any partition has none
        kafkaParams.put("auto.offset.reset", "latest");
        // Auto-commit is disabled and this class never commits offsets itself.
        kafkaParams.put("enable.auto.commit", false);

        // To start from explicit offsets instead, pass a Map of TopicPartition -> offset
        // as a third argument to Subscribe(...).
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                ssc,
                PreferConsistent(),
                Subscribe(topicsSet, kafkaParams)
        );

        // Turn every batch into a DataFrame and query it with SQL.
        lines.foreachRDD((rdd, time) -> {
            JavaRDD<UserAccessLog> logs = rdd
                    .map(record -> splitFields(record.value()))
                    .filter(fields -> fields.length >= REQUIRED_FIELDS)
                    .map(SparkSteamingKafkaTest::toUserAccessLog);

            // The schema is derived from the UserAccessLog bean class via reflection.
            Dataset<Row> dataFrame = ss.createDataFrame(logs, UserAccessLog.class);
            dataFrame.createOrReplaceTempView("log");

            // Unique visitors per date: group by date, count distinct user ids.
            Dataset<Row> result =
                    ss.sql("select date, count(distinct userId) as uv from log group by date");
            System.out.println("========= " + time + "=========");
            // show() prints at most the first 20 rows.
            result.show();
        });

        try {
            ssc.start();
            ssc.awaitTermination();
        } finally {
            // Always release the streaming context and the underlying SparkContext,
            // even when start or awaitTermination throws.
            ssc.stop(true);
        }
    }

    /**
     * Splits a raw record value into its space-separated fields.
     *
     * @param value the Kafka record value, may be {@code null}
     * @return the fields, or an empty array when {@code value} is {@code null}
     */
    private static String[] splitFields(String value) {
        return value == null ? new String[0] : value.split(" ");
    }

    /**
     * Builds a {@link UserAccessLog} from already-split fields.
     *
     * @param fields at least {@link #REQUIRED_FIELDS} elements: user id, then date
     * @return the populated bean
     */
    private static UserAccessLog toUserAccessLog(String[] fields) {
        UserAccessLog userLog = new UserAccessLog();
        userLog.setUserId(fields[0]);
        userLog.setDate(fields[1]);
        return userLog;
    }
}
hive-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<!-- Address of the Hive metastore service -->
<name>hive.metastore.uris</name>
<value>thrift://hadoop01:9083</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
<description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
</property>
</configuration>