pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
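<!-- No version given: assumes the version is managed by a parent POM / BOM (e.g. spring-boot-starter-parent) -->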
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.8</version>
</dependency>
<!-- HiveContext / Hive support requires this dependency -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.8</version>
</dependency>
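Note that the spark-streaming-kafka-0-10 artifact above is never exercised by the test class below, which only talks to Hive. As a minimal sketch of what that dependency enables (the broker address hadoop01:9092 and topic name test-topic are assumptions, not values from this setup):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaStreamSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("kafka stream sketch");
        // 5-second micro-batches
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "hadoop01:9092"); // assumed broker address
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-streaming-test");
        kafkaParams.put("auto.offset.reset", "latest");

        // Direct stream against an assumed topic "test-topic"
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("test-topic"), kafkaParams));

        // Print each record's value for every batch
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}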
java:
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkStreamingKafkaTest {
    public static void main(String[] args) {
        // Silence Spark's verbose logging
        Logger.getLogger("org").setLevel(Level.OFF);
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("streaming word count")
                .set("spark.yarn.app.id", "333");
        SparkSession ss = SparkSession.builder()
                .config(conf)
                //.config("hive.metastore.uris", "thrift://hadoop01:9083")
                .enableHiveSupport()
                .getOrCreate();
        // Sanity check: list the Hive databases to confirm the metastore connection.
        // (The real job would group by date, deduplicate userId, and count visitors.)
        Dataset<Row> result = ss.sql("show databases");
        // show() prints the first 20 rows
        result.show();
    }
}
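The comment above hints at the intended query: daily unique visitor counts. A hedged sketch of that query, which would slot into the main method in place of the show databases call; the table access_log and its columns log_date and user_id are assumptions, since nothing in this setup creates them:

// Hypothetical table and columns; adjust to your actual schema.
Dataset<Row> dailyVisitors = ss.sql(
        "SELECT log_date, COUNT(DISTINCT user_id) AS uv " +
        "FROM access_log GROUP BY log_date");
dailyVisitors.show();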
hive-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
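<!-- Default HDFS location for Hive warehouse data (managed tables) -->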
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<!-- Address of the Hive metastore service -->
<name>hive.metastore.uris</name>
<value>thrift://hadoop01:9083</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
<description>Controls whether to connect to a remote metastore server or open a new metastore server in the Hive client JVM</description>
</property>
</configuration>
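Note: for the SparkSession above to pick up this configuration, hive-site.xml usually needs to be on the application classpath (e.g. under src/main/resources in a Maven project, or in $SPARK_HOME/conf when running via spark-submit). Without it, Spark typically falls back to a local embedded Derby metastore instead of the remote one at hadoop01:9083.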