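Abstract:
I recently started using the IoTDB database. After gathering the relevant documentation, I got a rough picture of its main use cases and how to work with it, so this post explains how to integrate IoTDB into a project together with Spring Boot and MyBatis. I searched around and could not find a single complete article on using MyBatis as the persistence framework for this database, so I will take a first shot at it.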
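Overview:
IoTDB official site: http://iotdb.apache.org/
Apache IoTDB (Internet of Things Database) is an IoT-native database with high-performance data management and analysis capabilities that can be deployed on the edge and in the cloud. With its lightweight architecture, high performance, rich feature set, and deep integration with Apache Hadoop, Spark, and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion, and complex data analysis in industrial IoT.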
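Features:
High-throughput reads and writes
Apache IoTDB can support high-speed write access for millions of low-power, intelligent networked devices. It also provides very fast read access for retrieving data.
Efficient directory structure
Apache IoTDB can efficiently organize complex data structures from IoT devices and large time-series datasets, using fuzzy search over complex directories of time-series data.
Rich query semantics
Apache IoTDB supports time alignment of time-series data across devices and sensors, computation in the time-series domain, and rich aggregation functions along the time dimension.
Low hardware cost
Apache IoTDB achieves a high disk compression ratio (storing 1 GB of data on disk costs less than 0.23 USD).
Flexible deployment
Apache IoTDB offers one-click installation in the cloud, a desktop terminal tool, and a bridge tool between cloud platforms and on-premises machines (a data synchronization tool).
Tight integration with the open-source ecosystem
Apache IoTDB works with analytics ecosystems such as Hadoop, Spark, and Flink, and with Grafana (a visualization tool).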
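Note that the database supports only the following data types:
BOOLEAN, INT32 (int), INT64 (long), FLOAT, DOUBLE, TEXT (String)
For detailed usage, see the official quick start guide, which is quite thorough: http://iotdb.apache.org/UserGuide/Master/QuickStart/QuickStart.html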
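To make the type list above concrete, here is a minimal sketch of how the six IoTDB types could be mapped to Java classes. The int, long, and String pairings come from the list above; the Boolean, Float, and Double pairings, as well as the class and method names, are my own assumptions for illustration and are not part of any IoTDB API.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;

public class IoTDBTypeMapping {

    /** The six data types listed in the article. */
    enum IoTDBType { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT }

    // Hypothetical lookup table: IoTDB type -> Java wrapper class.
    private static final Map<IoTDBType, Class<?>> JAVA_TYPES = new EnumMap<>(IoTDBType.class);

    static {
        JAVA_TYPES.put(IoTDBType.BOOLEAN, Boolean.class); // assumption
        JAVA_TYPES.put(IoTDBType.INT32, Integer.class);   // "INT32 (int)"
        JAVA_TYPES.put(IoTDBType.INT64, Long.class);      // "INT64 (long)"
        JAVA_TYPES.put(IoTDBType.FLOAT, Float.class);     // assumption
        JAVA_TYPES.put(IoTDBType.DOUBLE, Double.class);   // assumption
        JAVA_TYPES.put(IoTDBType.TEXT, String.class);     // "TEXT (String)"
    }

    /** Looks up the Java class for a type name such as "INT64" (case-insensitive). */
    public static Class<?> javaTypeOf(String iotdbTypeName) {
        IoTDBType type = IoTDBType.valueOf(iotdbTypeName.trim().toUpperCase(Locale.ROOT));
        return JAVA_TYPES.get(type);
    }

    public static void main(String[] args) {
        for (IoTDBType type : IoTDBType.values()) {
            System.out.println(type + " -> " + JAVA_TYPES.get(type).getSimpleName());
        }
    }
}
```

A table like this is handy when deciding the field types of an entity class that a time series will be mapped to.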
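Hands-on:
Spring Boot project configuration
First create a Spring Boot project (I won't go into the details here), then add the IoTDB, MyBatis, and Druid dependencies to pom.xml: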
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-jdbc</artifactId>
    <version>0.12.1</version>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.9</version>
</dependency>
application.yml configuration
server:
  port: 8080
# iotdb
spring:
  datasource:
    username: root
    password: root
    driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver
    url: jdbc:iotdb://192.168.80.200:6667/
    initial-size: 5
    min-idle: 10
    max-active: 50
    max-wait: 60000
    remove-abandoned: true
    remove-abandoned-timeout: 30
    time-between-eviction-runs-millis: 60000
    min-evictable-idle-time-millis: 300000
    test-while-idle: false
    test-on-borrow: false
    test-on-return: false
# mybatis
mybatis:
  mapper-locations: classpath*:/mappers/*.xml
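IotDbConfig utility class
This class wraps JDBC access to IoTDB. Many articles operate the database through a hand-written utility class like this one and never bring in MyBatis, possibly because the official site advises against this approach. I took someone else's JDBC utility class as a reference and refactored it: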
package com.iotdb.zjc.demo.util;

import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Author: zjc
 * @ClassName IotDBConfig
 * @Description IoTDB JDBC utility class
 * @date 2021/11/24 14:31
 * @Version 1.0
 */
@Component
@Configuration
public class IotDbConfig {
    private static final Logger log = LoggerFactory.getLogger(IotDbConfig.class);

    // Connection settings, read from spring.datasource.* in application.yml
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("${spring.datasource.driver-class-name}")
    private String driverName;
    @Value("${spring.datasource.url}")
    private String url;

    // Pool settings
    @Value("${spring.datasource.initial-size}")
    private int initialSize;
    @Value("${spring.datasource.min-idle}")
    private int minIdle;
    @Value("${spring.datasource.max-active}")
    private int maxActive;
    @Value("${spring.datasource.max-wait}")
    private int maxWait;
    @Value("${spring.datasource.remove-abandoned}")
    private boolean removeAbandoned;
    @Value("${spring.datasource.remove-abandoned-timeout}")
    private int removeAbandonedTimeout;
    @Value("${spring.datasource.time-between-eviction-runs-millis}")
    private int timeBetweenEvictionRunsMillis;
    @Value("${spring.datasource.min-evictable-idle-time-millis}")
    private int minEvictableIdleTimeMillis;
    @Value("${spring.datasource.test-while-idle}")
    private boolean testWhileIdle;
    @Value("${spring.datasource.test-on-borrow}")
    private boolean testOnBorrow;
    @Value("${spring.datasource.test-on-return}")
    private boolean testOnReturn;

    // Shared pool, created lazily in getConnection()
    private static DruidDataSource iotDbDataSource;

    /**
     * Gets a connection from an Alibaba Druid connection pool.
     *
     * @return a pooled connection, or null if one could not be obtained
     */
    public Connection getConnection() {
        // Create the pool on first use. This check is not synchronized, which is acceptable only for a demo.
        if (iotDbDataSource == null) {
            iotDbDataSource = new DruidDataSource();
            // Connection parameters
            iotDbDataSource.setUrl(url);
            iotDbDataSource.setDriverClassName(driverName);
            iotDbDataSource.setUsername(username);
            iotDbDataSource.setPassword(password);
            // Initial, minimum, and maximum pool size
            iotDbDataSource.setInitialSize(initialSize);
            iotDbDataSource.setMinIdle(minIdle);
            iotDbDataSource.setMaxActive(maxActive);
            // Maximum time to wait for a connection, in milliseconds
            iotDbDataSource.setMaxWait(maxWait);
            // Connection leak detection
            iotDbDataSource.setRemoveAbandoned(removeAbandoned);
            iotDbDataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
            // Interval between checks for idle connections that should be closed, in milliseconds
            iotDbDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
            iotDbDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            // Connection validation, used to avoid handing out stale connections
            iotDbDataSource.setTestWhileIdle(testWhileIdle);
            iotDbDataSource.setTestOnBorrow(testOnBorrow);
            iotDbDataSource.setTestOnReturn(testOnReturn);
        }
        Connection connection = null;
        try {
            connection = iotDbDataSource.getConnection();
        } catch (SQLException e) {
            log.error("iotDB getConnection failed: error={}", e.getMessage(), e);
        }
        return connection;
    }

    /**
     * Executes a single statement.
     *
     * @param sql the statement to execute
     * @return the value returned by PreparedStatement.execute(), or false on failure
     */
    public boolean execute(String sql) {
        log.info("iotDB execute sql={}", sql);
        Connection connection = getConnection();
        PreparedStatement preparedStatement = null;
        boolean flag = false;
        try {
            if (connection != null) {
                preparedStatement = connection.prepareStatement(sql);
                long systemTime = System.currentTimeMillis();
                flag = preparedStatement.execute();
                log.info("Executed IoTDB sql [{}], elapsed: [{}]ms", sql, System.currentTimeMillis() - systemTime);
            }
        } catch (SQLException e) {
            log.error("iotDB execute failed: error={}", e.getMessage());
        } finally {
            close(preparedStatement, connection);
        }
        return flag;
    }

    /**
     * Executes a batch of statements.
     *
     * @param sqls the statements to execute
     * @return the number of statements in the executed batch, or 0 on failure
     */
    public Integer executeBatch(List<String> sqls) {
        Connection connection = getConnection();
        Statement statement = null;
        int[] flag = null;
        try {
            if (connection != null) {
                // Use one Statement for the whole batch. Preparing a new statement
                // per SQL string would leak statements and run only the last one.
                statement = connection.createStatement();
                for (String sql : sqls) {
                    log.info("iotDB executeBatch sql={}", sql);
                    statement.addBatch(sql);
                }
                long systemTime = System.currentTimeMillis();
                flag = statement.executeBatch();
                log.info("Executed IoTDB batch of [{}] statements, elapsed: [{}]ms", sqls.size(), System.currentTimeMillis() - systemTime);
            }
        } catch (SQLException e) {
            log.error("iotDB executeBatch failed: error={}", e.getMessage());
        } finally {
            close(statement, connection);
        }
        // flag is still null when no connection was available or the batch failed
        return flag == null ? 0 : flag.length;
    }

    /**
     * Executes a query.
     *
     * @param sql the query to execute
     * @return one map per row, or null if the query could not be executed
     */
    public List<Map<String, Object>> executeQuery(String sql) {
        log.info("iotDB executeQuery sql={}", sql);
        Connection connection = getConnection();
        PreparedStatement preparedStatement = null;
        List<Map<String, Object>> resultList = null;
        ResultSet resultSet = null;
        try {
            if (connection != null) {
                preparedStatement = connection.prepareStatement(sql);
                long systemTime = System.currentTimeMillis();
                resultSet = preparedStatement.executeQuery();
                log.info("Queried IoTDB sql [{}], elapsed: [{}]ms", sql, System.currentTimeMillis() - systemTime);
                resultList = outputResult(resultSet);
            }
        } catch (SQLException e) {
            log.error("iotDB query failed: error={}", e.getMessage(), e);
        } finally {
            try {
                if (resultSet != null) {
                    resultSet.close();
                }
            } catch (SQLException e) {
                log.error("iotDB failed to close resultSet: error={}", e.getMessage());
            }
            close(preparedStatement, connection);
        }
        return resultList;
    }

    /**
     * Converts a result set into a list of row maps.
     *
     * @param resultSet the result set to read
     * @return one map per row, keyed by short column name
     * @throws SQLException if reading the result set fails
     */
    private List<Map<String, Object>> outputResult(ResultSet resultSet) throws SQLException {
        List<Map<String, Object>> resultList = new ArrayList<>();
        if (resultSet != null) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                Map<String, Object> resultMap = new HashMap<>(16);
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnLabel(i);
                    // Keep only the last path segment, e.g. root.order2.orderdetail.order_id -> order_id
                    if (columnName.indexOf('.') >= 0) {
                        columnName = columnName.substring(columnName.lastIndexOf('.') + 1);
                    }
                    // Strip the closing parenthesis left over from function calls such as count(...)
                    if (columnName.indexOf(')') >= 0) {
                        columnName = columnName.substring(0, columnName.lastIndexOf(')'));
                    }
                    // Format the built-in Time column (epoch milliseconds) as a readable string
                    if (columnName.equals("Time")) {
                        long timeStamp = Long.parseLong(resultSet.getString(i));
                        if (timeStamp > 0) {
                            // Date resolves to java.sql.Date here because of the java.sql.* import
                            Date d = new Date(timeStamp);
                            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                            resultMap.put("Time", sf.format(d));
                        }
                    } else {
                        resultMap.put(columnName, resultSet.getString(i));
                    }
                }
                resultList.add(resultMap);
            }
        }
        return resultList;
    }

    /**
     * Closes the statement and returns the connection to the pool.
     *
     * @param statement  the statement to close, may be null
     * @param connection the connection to close, may be null
     */
    private void close(Statement statement, Connection connection) {
        try {
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            log.error("iotDB close failed: error={}", e.getMessage(), e);
        }
    }
}
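The column-name handling in outputResult is easy to get wrong, so here is a small standalone sketch of just that step. It mirrors the logic of the utility class above (keep the last path segment, then drop a trailing parenthesis); the class name ColumnNameDemo and the method name shortName are made up for this example.

```java
public class ColumnNameDemo {

    /**
     * Reduces a full IoTDB column label to a short key:
     * keeps the text after the last '.', then cuts at the last ')'.
     */
    public static String shortName(String label) {
        String name = label;
        int dot = name.lastIndexOf('.');
        if (dot >= 0) {
            name = name.substring(dot + 1);
        }
        int paren = name.lastIndexOf(')');
        if (paren >= 0) {
            name = name.substring(0, paren);
        }
        return name;
    }

    public static void main(String[] args) {
        // Plain series path
        System.out.println(shortName("root.order2.orderdetail.order_id"));
        // Aggregation over a series path
        System.out.println(shortName("count(root.order2.orderdetail.order_num)"));
        // Built-in time column, left unchanged
        System.out.println(shortName("Time"));
    }
}
```

One consequence of this design is that an aggregated column and a plain column over the same series end up with the same key, so a query returning both would overwrite one of them in the row map.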
Integrating MyBatis
Create the controller, service, and mapper files
controller:
package com.iotdb.zjc.demo.controller;

import com.iotdb.zjc.demo.service.OrderService;
import com.iotdb.zjc.demo.constant.MyResponse;
import com.iotdb.zjc.demo.data.OrderInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Date;
import java.util.List;

/**
 * @Author: zjc
 * @ClassName OrderController
 * @Description TODO
 * @date 2021/11/23 17:16
 * @Version 1.0
 */
@RestController
public class OrderController {
    private static final Logger log = LoggerFactory.getLogger(OrderController.class);

    @Autowired
    OrderService orderService;

    @RequestMapping(value = "/createOrder", method = RequestMethod.POST)
    // @Transactional(rollbackFor = Exception.class)
    public MyResponse createOrder(@RequestBody OrderInfo orderInfo) {
        try {
            orderInfo.setTimestamp(System.currentTimeMillis());
            orderInfo.setCreateTime(new Date());
            Integer v = orderService.createOrder(orderInfo);
            // In this demo a return value of -1 is treated as success
            if (v == -1) {
                return MyResponse.ok(v);
            } else {
                return MyResponse.checkForbidden(false);
            }
        } catch (Exception e) {
            log.error("Failed to create order", e);
            return new MyResponse(false);
        }
    }

    /**
     * Update. This is really an insert: a row with the same timestamp is
     * overwritten, so the result depends only on the timestamp.
     *
     * @param orderInfo the order data to write
     * @return the operation result
     */
    @RequestMapping(value = "/updateOrder", method = RequestMethod.POST)
    public MyResponse updateOrder(@RequestBody OrderInfo orderInfo) {
        try {
            orderInfo.setTimestamp(System.currentTimeMillis());
            orderInfo.setCreateTime(new Date());
            Integer v = orderService.updateOrder(orderInfo);
            // In this demo a return value of -1 is treated as success
            if (v == -1) {
                return MyResponse.ok(v);
            } else {
                return MyResponse.checkForbidden(false);
            }
        } catch (Exception e) {
            log.error("Failed to update order", e);
            return new MyResponse(false);
        }
    }

    /**
     * Delete. The plus sign in the timestamp must be URL-encoded as %2B.
     *
     * @param timestamp the timestamp of the row to delete
     * @return the operation result
     */
    @RequestMapping(value = "/deleteOrder", method = RequestMethod.GET)
    public MyResponse deleteOrder(String timestamp) {
        try {
            Integer v = orderService.deleteOrder(timestamp);
            // In this demo a return value of -1 is treated as success
            if (v == -1) {
                return MyResponse.ok(v);
            } else {
                return MyResponse.checkForbidden(false);
            }
        } catch (Exception e) {
            log.error("Failed to delete order", e);
            return new MyResponse(false);
        }
    }

    /**
     * Creates the storage group, which is roughly the equivalent of a database.
     *
     * @return the operation result
     */
    @RequestMapping(value = "/createOrderGroup", method = RequestMethod.POST)
    public MyResponse createOrderGroup() {
        try {
            Integer v = orderService.createOrderGroup();
            if (v > 0) {
                return new MyResponse(v);
            } else {
                return new MyResponse(false);
            }
        } catch (Exception e) {
            log.error("Failed to create order group", e);
            return new MyResponse(false);
        }
    }

    /**
     * Queries all order data.
     *
     * @return the matching orders
     */
    @RequestMapping(value = "/queryOrder", method = RequestMethod.GET)
    public MyResponse queryOrder() {
        try {
            List<OrderInfo> v = orderService.queryOrder();
            if (v.size() > 0) {
                v.forEach(x -> log.info("{}", x));
                return MyResponse.ok(v);
            } else {
                return new MyResponse(false);
            }
        } catch (Exception e) {
            log.error("Failed to query orders", e);
            return new MyResponse(false);
        }
    }

    /**
     * Lists the storage groups in the database.
     *
     * @return the storage group names
     */
    @RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET)
    public MyResponse selectStorageGroup() {
        try {
            List<String> v = orderService.selectStorageGroup();
            if (v.size() > 0) {
                v.forEach(x -> log.info("group: {}", x));
                return MyResponse.ok(v);
            } else {
                return new MyResponse(false);
            }
        } catch (Exception e) {
            log.error("Failed to query storage groups", e);
            return new MyResponse(false);
        }
    }
}
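The deleteOrder endpoint above receives the timestamp as a GET query parameter, which is why the plus sign has to be sent as %2B: in a query string, a raw + is decoded as a space. Here is a minimal sketch using only the JDK (it assumes Java 10 or later for the Charset overloads); the class name TimestampParamDemo and its helper methods are made up for this example.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class TimestampParamDemo {

    /** Encodes a value so that it survives as a query parameter. */
    public static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8);
    }

    /** Decodes a query parameter value the way form decoding does. */
    public static String decode(String value) {
        return URLDecoder.decode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String timestamp = "2021-11-24T18:28:20.689+08:00";

        // Sent unencoded, the '+' turns into a space on the server side
        System.out.println("[" + decode(timestamp) + "]");

        // Sent encoded, the original value is restored
        String encoded = encode(timestamp);
        System.out.println(encoded);
        System.out.println(decode(encoded));
    }
}
```

A client would then call something like /deleteOrder?timestamp= followed by the encoded value.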
serviceImpl file
package com.iotdb.zjc.demo.serviceimpl;

import com.iotdb.zjc.demo.service.OrderService;
import com.iotdb.zjc.demo.data.OrderInfo;
import com.iotdb.zjc.demo.mapper.OrderMapper;
import com.iotdb.zjc.demo.util.IotDbConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @Author: zjc
 * @ClassName OrderServiceImpl
 * @Description TODO
 * @date 2021/11/23 17:32
 * @Version 1.0
 */
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    IotDbConfig iotDbConfig;
    @Autowired
    OrderMapper orderMapper;

    @Override
    public Integer createOrder(OrderInfo orderInfo) {
        return orderMapper.createOrder(orderInfo);
    }

    @Override
    public Integer updateOrder(OrderInfo orderInfo) {
        return orderMapper.updateOrder(orderInfo);
    }

    @Override
    public Integer createOrderGroup() {
        try {
            // Alternative without MyBatis: run the statements through the JDBC utility class
            // List<String> statements = new ArrayList<>();
            // statements.add("set storage group to root.order");
            // statements.add("CREATE TIMESERIES root.order.orderdetail.order_id WITH DATATYPE=INT64, ENCODING=PLAIN, COMPRESSOR=SNAPPY");
            // statements.add("CREATE TIMESERIES root.order.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY");
            // statements.add("CREATE TIMESERIES root.order.orderdetail.order_name WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY");
            // statements.add("CREATE TIMESERIES root.order.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY");
            // Integer flag = iotDbConfig.executeBatch(statements);
            orderMapper.createOrderGroup();
            Integer flagEle = orderMapper.createOrderGroupElement();
            System.out.println("Number of SQL statements executed: " + flagEle);
            return flagEle;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    @Override
    public List<OrderInfo> queryOrder() {
        return orderMapper.queryOrder();
    }

    @Override
    public List<String> selectStorageGroup() {
        return orderMapper.selectStorageGroup();
    }

    @Override
    public Integer deleteOrder(String timestamp) {
        return orderMapper.deleteOrder(timestamp);
    }
}
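The commented-out block in createOrderGroup builds its statement list by hand. As a rough sketch, the same list could be generated from a map of measurement names to data types. The statement syntax is copied from the comments above; the class TimeseriesDdlDemo and its methods are hypothetical helpers, and the fixed PLAIN/SNAPPY settings are simply the ones this article uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimeseriesDdlDemo {

    /**
     * Builds one "set storage group" statement followed by one
     * CREATE TIMESERIES statement per measurement, in map order.
     */
    public static List<String> buildStatements(String storageGroup, String device,
                                               Map<String, String> measurements) {
        List<String> sqls = new ArrayList<>();
        sqls.add("SET STORAGE GROUP TO " + storageGroup);
        for (Map.Entry<String, String> entry : measurements.entrySet()) {
            sqls.add("CREATE TIMESERIES " + storageGroup + "." + device + "." + entry.getKey()
                    + " WITH DATATYPE=" + entry.getValue()
                    + ", ENCODING=PLAIN, COMPRESSOR=SNAPPY");
        }
        return sqls;
    }

    /** The order example from the article: four measurements under root.order.orderdetail. */
    public static List<String> orderStatements() {
        Map<String, String> measurements = new LinkedHashMap<>();
        measurements.put("order_id", "INT64");
        measurements.put("order_num", "INT32");
        measurements.put("order_name", "TEXT");
        measurements.put("create_time", "TEXT");
        return buildStatements("root.order", "orderdetail", measurements);
    }

    public static void main(String[] args) {
        orderStatements().forEach(System.out::println);
    }
}
```

The resulting list is what would be passed to iotDbConfig.executeBatch(...) if you go the plain JDBC route instead of the mapper.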
mapper file
package com.iotdb.zjc.demo.mapper;

import com.iotdb.zjc.demo.data.OrderInfo;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
 * @Author: zjc
 * @ClassName OrderMapper
 * @Description TODO
 * @date 2021/11/24 10:09
 * @Version 1.0
 */
public interface OrderMapper {
    Integer createOrder(OrderInfo orderInfo);

    List<OrderInfo> queryOrder();

    Integer createOrderGroup();

    Integer createOrderGroupElement();

    List<String> selectStorageGroup();

    Integer updateOrder(OrderInfo orderInfo);

    Integer deleteOrder(@Param("timestamp") String timestamp);
}
mapper.xml file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.iotdb.zjc.demo.mapper.OrderMapper">
    <resultMap id="BaseResultMap" type="com.iotdb.zjc.demo.data.OrderInfo">
        <result column="root.order2.orderdetail.order_id" property="orderId"/>
        <result column="root.order2.orderdetail.order_num" property="orderNum"/>
        <result column="root.order2.orderdetail.order_name" property="orderName"/>
        <result column="root.order2.orderdetail.create_time" property="createTime"/>
    </resultMap>
    <insert id="createOrder" parameterType="com.iotdb.zjc.demo.data.OrderInfo">
        insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(#{timestamp},#{orderId},#{orderNum},#{orderName},#{createTime});
    </insert>
    <select id="queryOrder" resultMap="BaseResultMap">
        select * from root.order2.orderdetail limit 3 offset 0
    </select>
    <select id="selectStorageGroup" resultType="java.lang.String">
        show storage group limit 3 offset 0
    </select>
    <delete id="deleteOrder" parameterType="java.lang.String">
        delete from root.order2.orderdetail where timestamp = ${timestamp};
    </delete>
    <!-- The timestamp is hard-coded for this demo: inserting at an existing timestamp overwrites that row. -->
    <insert id="updateOrder">
        insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(2021-11-24T18:28:20.689+08:00,#{orderId},#{orderNum},#{orderName},#{createTime});
    </insert>
    <update id="createOrderGroup">
        SET STORAGE GROUP TO root.order2
    </update>
    <update id="createOrderGroupElement">
        <!-- CREATE TIMESERIES root.order2.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY; -->
        CREATE TIMESERIES root.order2.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY;
    </update>
</mapper>
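The code in this article writes timestamps in two forms: the controller sets epoch milliseconds with System.currentTimeMillis(), while the updateOrder statement uses a date-time literal with a zone offset. Below is a small sketch of converting between the two with java.time, which can help when you need to target an existing row. The class name TimestampFormatDemo and its methods are made up for this example, and it does not touch IoTDB itself.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TimestampFormatDemo {

    /** Parses an ISO-8601 date-time with offset into epoch milliseconds. */
    public static long toEpochMillis(String isoDateTime) {
        return OffsetDateTime.parse(isoDateTime).toInstant().toEpochMilli();
    }

    /** Formats epoch milliseconds as an ISO-8601 date-time at the given offset. */
    public static String toIso(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset).toString();
    }

    public static void main(String[] args) {
        String literal = "2021-11-24T18:28:20.689+08:00";
        long millis = toEpochMillis(literal);
        System.out.println(millis);
        // Round trip back to the same literal at UTC+8
        System.out.println(toIso(millis, ZoneOffset.ofHours(8)));
    }
}
```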
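Notes:
1. The columns in the result mapping must be written with the full path, e.g. root.order2.orderdetail.order_id; otherwise they will not map to the entity's fields.
2. Always supply a timestamp when inserting. The timestamp acts as the primary key, and a delete removes the data at the given timestamp.
3. An update is just an insert: the timestamp stays the same, and every other value may change.
4. Creating a storage group and creating time series: when you create a time series, you should define its data type and encoding scheme.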
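Notes 2 and 3 can be pictured with a toy in-memory model in which rows are keyed by timestamp. This is only an illustration of the semantics described above, not how IoTDB stores data; the class UpsertByTimestampDemo and all of its methods are invented for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class UpsertByTimestampDemo {

    // timestamp -> (measurement name -> value), ordered by time
    private final TreeMap<Long, Map<String, String>> rows = new TreeMap<>();

    /** Insert and update are the same operation: write a value at a timestamp. */
    public void insert(long timestamp, String measurement, String value) {
        rows.computeIfAbsent(timestamp, k -> new HashMap<>()).put(measurement, value);
    }

    /** Deleting removes whatever is stored at that timestamp. */
    public void delete(long timestamp) {
        rows.remove(timestamp);
    }

    /** Returns the stored value, or null if nothing exists at that timestamp. */
    public String get(long timestamp, String measurement) {
        Map<String, String> row = rows.get(timestamp);
        return row == null ? null : row.get(measurement);
    }

    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        UpsertByTimestampDemo table = new UpsertByTimestampDemo();
        table.insert(1000L, "order_name", "first");
        table.insert(1000L, "order_name", "second"); // same timestamp: overwrites
        System.out.println(table.size() + " row(s), order_name=" + table.get(1000L, "order_name"));
        table.delete(1000L);
        System.out.println(table.size() + " row(s) after delete");
    }
}
```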
For common SQL operations, see this link: https://blog.csdn.net/zjy660358/article/details/111315567
For IoTDB installation, see the following link:
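----------------
Copyright notice: This is an original article by CSDN blogger 「阳光非宅猿」, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Original link: https://blog.csdn.net/weixin_42103983/article/details/121580689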