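Summary:

I recently started using the IoTDB database. After collecting the relevant documentation I got a rough picture of its main use cases and how to work with it, so this post explains how to integrate IoTDB into a project together with SpringBoot and Mybatis. I searched around and could not find a single complete article on using Mybatis as the persistence framework for this database, so I'll take a first shot at it here.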

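Overview:

IoTDB official site: http://iotdb.apache.org/

Apache IoTDB (Internet of Things Database) is an IoT-native database with high-performance data management and analysis capabilities that can be deployed at the edge and in the cloud. With its lightweight architecture, high performance and rich feature set, plus deep integration with Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion and complex data analysis in the industrial IoT field.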

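Features:

High-throughput reads and writes

Apache IoTDB can support high-speed write access for millions of low-power, intelligently networked devices. It also provides very fast read access for retrieving data.

Efficient directory structure

Apache IoTDB can efficiently organize complex data structures and large time series from IoT devices, using fuzzy-search strategies over complex time series directories.

Rich query semantics

Apache IoTDB supports time alignment of time series data across devices and sensors, computation in the time series domain, and rich aggregation along the time dimension.

Low hardware cost

Apache IoTDB can reach a high disk compression ratio (storing 1 GB of data on hard disk costs less than $0.23).

Flexible deployment

Apache IoTDB offers one-click installation in the cloud, a desktop terminal tool, and a bridge tool between cloud platforms and on-premises machines (a data synchronization tool).

Tight integration with the open-source ecosystem

Apache IoTDB supports analytics ecosystems such as Hadoop, Spark and Flink, as well as Grafana (a visualization tool).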

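Note that the database supports only the following data types:

BOOLEAN, INT32 (int), INT64 (long), FLOAT, DOUBLE, TEXT (String)

For detailed usage, see the official documentation, which is quite thorough: http://iotdb.apache.org/UserGuide/Master/QuickStart/QuickStart.html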
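To make the six data types listed above concrete, here is a minimal sketch of a lookup from Java value classes to IoTDB type names. The class name `IotdbTypes` and the method `typeOf` are made up for illustration and are not part of IoTDB or its JDBC driver; only the type names themselves come from the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps a Java value to the IoTDB data type name
// that would be used in a CREATE TIMESERIES ... DATATYPE=... clause.
public class IotdbTypes {

    private static final Map<Class<?>, String> TYPES = new LinkedHashMap<>();

    static {
        TYPES.put(Boolean.class, "BOOLEAN");
        TYPES.put(Integer.class, "INT32");
        TYPES.put(Long.class, "INT64");
        TYPES.put(Float.class, "FLOAT");
        TYPES.put(Double.class, "DOUBLE");
        TYPES.put(String.class, "TEXT");
    }

    // Returns the IoTDB type name for the value, or throws if the value
    // is null or its class is not one of the six supported types.
    public static String typeOf(Object value) {
        if (value == null) {
            throw new IllegalArgumentException("value must not be null");
        }
        String type = TYPES.get(value.getClass());
        if (type == null) {
            throw new IllegalArgumentException("unsupported type: " + value.getClass().getName());
        }
        return type;
    }
}
```

A value such as `java.util.Date` has no entry in this table, which is why the demo later stores `create_time` as TEXT.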

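Hands-on:

SpringBoot project configuration

First create a SpringBoot project (I won't go into detail on that here), then add the IoTDB JDBC, Mybatis and Druid dependencies to pom.xml: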

<dependency>

    <groupId>org.apache.iotdb</groupId>

    <artifactId>iotdb-jdbc</artifactId>

    <version>0.12.1</version>

</dependency>

<dependency>

    <groupId>org.mybatis.spring.boot</groupId>

    <artifactId>mybatis-spring-boot-starter</artifactId>

    <version>2.2.0</version>

</dependency>

<dependency>

    <groupId>com.alibaba</groupId>

    <artifactId>druid-spring-boot-starter</artifactId>

    <version>1.1.9</version>

</dependency>

 

application.yml configuration

server:

  port: 8080

#iotdb

spring:

  datasource:

    username: root

    password: root

    driver-class-name: org.apache.iotdb.jdbc.IoTDBDriver

    url: jdbc:iotdb://192.168.80.200:6667/

    initial-size: 5

    min-idle: 10

    max-active: 50

    max-wait: 60000

    remove-abandoned: true

    remove-abandoned-timeout: 30

    time-between-eviction-runs-millis: 60000

    min-evictable-idle-time-millis: 300000

    test-while-idle: false

    test-on-borrow: false

    test-on-return: false

    #mybatis

mybatis:

  mapper-locations: classpath*:/mappers/*.xml

 

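IotDbConfig utility class

This class wraps plain JDBC operations against IoTDB. Many articles work with IoTDB through a hand-written utility class like this one and never bring in Mybatis, which may have to do with the official site advising against that approach. I took someone else's JDBC utility class as a reference and refactored it: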

package com.iotdb.zjc.demo.util;

import com.alibaba.druid.pool.DruidDataSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Configuration;

import org.springframework.stereotype.Component;

import java.sql.*;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

 * @Author: zjc

 * @ClassName IotDBConfig

 * @Description IoTDB JDBC utility

 * @date 2021/11/24 14:31

 * @Version 1.0

 */

@Component

@Configuration

public class IotDbConfig {

    private static final Logger log = LoggerFactory.getLogger(IotDbConfig.class);

    @Value("${spring.datasource.username}")

    private String username;

    @Value("${spring.datasource.password}")

    private String password;

    @Value("${spring.datasource.driver-class-name}")

    private String driverName;

    @Value("${spring.datasource.url}")

    private String url;

    @Value("${spring.datasource.initial-size}")

    private int initialSize;

    @Value("${spring.datasource.min-idle}")

    private int minIdle;

    @Value("${spring.datasource.max-active}")

    private int maxActive;

    @Value("${spring.datasource.max-wait}")

    private int maxWait;

    @Value("${spring.datasource.remove-abandoned}")

    private boolean removeAbandoned;

    @Value("${spring.datasource.remove-abandoned-timeout}")

    private int removeAbandonedTimeout;

    @Value("${spring.datasource.time-between-eviction-runs-millis}")

    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.min-evictable-idle-time-millis}")

    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.test-while-idle}")

    private boolean testWhileIdle;

    @Value("${spring.datasource.test-on-borrow}")

    private boolean testOnBorrow;

    @Value("${spring.datasource.test-on-return}")

    private boolean testOnReturn;

    private static DruidDataSource iotDbDataSource;

    /**

     * Get a connection from Alibaba's Druid connection pool

     *

     * @return

     */

    public Connection getConnection() {

        if (iotDbDataSource == null) {

            iotDbDataSource = new DruidDataSource();

            // connection parameters

            iotDbDataSource.setUrl(url);

            iotDbDataSource.setDriverClassName(driverName);

            iotDbDataSource.setUsername(username);

            iotDbDataSource.setPassword(password);

            // initial, minimum-idle and maximum pool sizes

            iotDbDataSource.setInitialSize(initialSize);

            iotDbDataSource.setMinIdle(minIdle);

            iotDbDataSource.setMaxActive(maxActive);

            // maximum wait time when acquiring a connection

            iotDbDataSource.setMaxWait(maxWait);

            // connection leak detection

            iotDbDataSource.setRemoveAbandoned(removeAbandoned);

            iotDbDataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout);

            // interval between checks for idle connections that should be closed, in milliseconds

            iotDbDataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

            iotDbDataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

            // validation checks that guard against stale connections

            iotDbDataSource.setTestWhileIdle(testWhileIdle);

            iotDbDataSource.setTestOnBorrow(testOnBorrow);

            iotDbDataSource.setTestOnReturn(testOnReturn);

        }

        Connection connection = null;

        try {

            connection = iotDbDataSource.getConnection();

        } catch (SQLException e) {

            log.error("iotDB getConnection failed", e);

        }

        return connection;

    }

    /**

     * Execute a single statement

     *

     * @param sql

     */

    public boolean execute(String sql) {

        log.info("iotDB execute sql={}", sql);

        Connection connection = getConnection();

        PreparedStatement preparedStatement = null;

        boolean flag = false;

        try {

            if (connection != null) {

                preparedStatement = connection.prepareStatement(sql);

                long systemTime = System.currentTimeMillis();

                flag = preparedStatement.execute();

                log.info("iotDB executed sql [{}] in [{}] ms", sql, System.currentTimeMillis() - systemTime);

            }

        } catch (SQLException e) {

            log.error("iotDB execute failed: error={}", e.getMessage());

        } finally {

            close(preparedStatement, connection);

        }

        return flag;

    }

    /**

     * Execute a batch of statements

     *

     * @param sqls statements to run in one batch

     * @return number of statements submitted, or 0 if the batch failed

     */

    public Integer executeBatch(List<String> sqls) {

        Connection connection = getConnection();

        Statement statement = null;

        int[] flag = null;

        try {

            if (connection != null) {

                // a single plain Statement collects every SQL string into one batch

                statement = connection.createStatement();

                for (String sql : sqls) {

                    log.info("iotDB executeBatch sql={}", sql);

                    statement.addBatch(sql);

                }

                long systemTime = System.currentTimeMillis();

                flag = statement.executeBatch();

                log.info("iotDB executed batch of [{}] statements in [{}] ms", sqls.size(), System.currentTimeMillis() - systemTime);

            }

        } catch (SQLException e) {

            log.error("iotDB executeBatch failed: error={}", e.getMessage());

        } finally {

            close(statement, connection);

        }

        // flag stays null when no connection was available or the batch threw

        return flag == null ? 0 : flag.length;

    }

    /**

     * Execute a query

     *

     * @param sql

     * @return

     */

    public List<Map<String, Object>> executeQuery(String sql) {

        log.info("iotDB executeQuery sql={}", sql);

        Connection connection = getConnection();

        PreparedStatement preparedStatement = null;

        List<Map<String, Object>> resultList = null;

        ResultSet resultSet = null;

        try {

            if (connection != null) {

                preparedStatement = connection.prepareStatement(sql);

                long systemTime = System.currentTimeMillis();

                resultSet = preparedStatement.executeQuery();

                log.info("iotDB query sql [{}] took [{}] ms", sql, System.currentTimeMillis() - systemTime);

                resultList = outputResult(resultSet);

            }

        } catch (SQLException e) {

            log.error("iotDB query failed", e);

        } finally {

            try {

                if (resultSet != null) {

                    resultSet.close();

                }

            } catch (SQLException e) {

                log.error("iotDB failed to close resultSet: error={}", e.getMessage());

            }

            close(preparedStatement, connection);

        }

        return resultList;

    }

    /**

     * Convert a result set into a list of column-name/value maps

     *

     * @param resultSet

     * @return

     * @throws SQLException

     */

    private List<Map<String, Object>> outputResult(ResultSet resultSet) throws SQLException {

        List<Map<String, Object>> resultList = new ArrayList<>();

        if (resultSet != null) {

            ResultSetMetaData metaData = resultSet.getMetaData();

            int columnCount = metaData.getColumnCount();

            while (resultSet.next()) {

                Map<String, Object> resultMap = new HashMap<>(16);

                for (int i = 1; i <= columnCount; i++) {

                    String colunmName = metaData.getColumnLabel(i);

                    if (colunmName.indexOf('.') >= 0) {

                        colunmName = colunmName.substring(colunmName.lastIndexOf('.') + 1);

                    }

                    // strip the closing parenthesis left over from function calls such as count(...)

                    if (colunmName.indexOf(')') >= 0) {

                        colunmName = colunmName.substring(0, colunmName.lastIndexOf(')'));

                    }

                    // format the built-in Time column of the time series database

                    if (colunmName.equals("Time")) {

                        Long timeStamp = Long.parseLong(resultSet.getString(i));

                        if (timeStamp > 0) {

                            Date d = new Date(timeStamp);

                            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                            resultMap.put("Time", sf.format(d));

                        }

                    } else {

                        resultMap.put(colunmName, resultSet.getString(i));

                    }

                }

                resultList.add(resultMap);

            }

        }

        return resultList;

    }

    /**

     * Close the statement and the connection

     *

     * @param statement

     * @param connection

     */

    private void close(Statement statement, Connection connection) {

        try {

            if (statement != null) {

                statement.close();

            }

            if (connection != null) {

                connection.close();

            }

        } catch (Exception e) {

            log.error("iotDB close failed", e);

        }

    }

}
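The column handling inside outputResult above is easier to follow in isolation. The sketch below reproduces just that step: it reduces a full column label to its last path segment, drops the trailing parenthesis of an aggregate call, and formats an epoch-millisecond Time value. The class name `IotdbColumns` and both method names are illustrative, and `java.time` is swapped in for `SimpleDateFormat` so that the time zone is passed explicitly instead of taken from the JVM default.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrative helper mirroring the column-name and Time handling in outputResult.
public class IotdbColumns {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // "root.order2.orderdetail.order_id"        -> "order_id"
    // "count(root.order2.orderdetail.order_id)" -> "order_id"
    public static String shortName(String columnLabel) {
        String name = columnLabel;
        int dot = name.lastIndexOf('.');
        if (dot >= 0) {
            name = name.substring(dot + 1);
        }
        int paren = name.lastIndexOf(')');
        if (paren >= 0) {
            name = name.substring(0, paren);
        }
        return name;
    }

    // Formats an epoch-millisecond timestamp in the given zone.
    public static String formatTime(long epochMillis, ZoneId zone) {
        return FORMAT.withZone(zone).format(Instant.ofEpochMilli(epochMillis));
    }
}
```

Note that two columns from different devices with the same measurement name collapse to the same short name, so this shortening only suits queries against a single device.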

 

Integrating Mybatis

Create the controller, service and mapper files.

controller:

package com.iotdb.zjc.demo.controller;

import com.iotdb.zjc.demo.service.OrderService;

import com.iotdb.zjc.demo.constant.MyResponse;

import com.iotdb.zjc.demo.data.OrderInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.*;

import java.util.Date;

import java.util.List;

/**

 * @Author: zjc

 * @ClassName OrderController

 * @Description TODO

 * @date 2021/11/23 17:16

 * @Version 1.0

 */

@RestController

public class OrderController {

    private static final Logger log = LoggerFactory.getLogger(OrderController.class);

    @Autowired

    OrderService orderService;

    @RequestMapping(value = "/createOrder", method = RequestMethod.POST)

//    @Transactional(rollbackFor = Exception.class)

    public MyResponse createOrder(@RequestBody OrderInfo orderInfo) {

        try {

            orderInfo.setTimestamp(System.currentTimeMillis());

            orderInfo.setCreateTime(new Date());

            Integer v = orderService.createOrder(orderInfo);

            if (v == -1) {

                return MyResponse.ok(v);

            } else {

                return MyResponse.checkForbidden(false);

            }

        } catch (Exception e) {

            log.error("Failed to create order", e);

            return new MyResponse(false);

        }

    }

    /**

     * Update. This is really an insert with the same timestamp, because rows are identified by timestamp only

     * @param orderInfo

     * @return

     */

    @RequestMapping(value = "/updateOrder", method = RequestMethod.POST)

    public MyResponse updateOrder(@RequestBody OrderInfo orderInfo) {

        try {

            orderInfo.setTimestamp(System.currentTimeMillis());

            orderInfo.setCreateTime(new Date());

            Integer v = orderService.updateOrder(orderInfo);

            if (v == -1) {

                return MyResponse.ok(v);

            } else {

                return MyResponse.checkForbidden(false);

            }

        } catch (Exception e) {

            log.error("Failed to update order", e);

            return new MyResponse(false);

        }

    }

    /**

     * Delete. The plus sign in the timestamp must be URL-encoded as %2B

     * @param timestamp

     * @return

     */

    @RequestMapping(value = "/deleteOrder", method = RequestMethod.GET)

    public MyResponse deleteOrder(String timestamp) {

        try {

            Integer v = orderService.deleteOrder(timestamp);

            if (v == -1) {

                return MyResponse.ok(v);

            } else {

                return MyResponse.checkForbidden(false);

            }

        } catch (Exception e) {

            log.error("Failed to delete order", e);

            return new MyResponse(false);

        }

    }

    /**

     * Create a storage group, which is roughly the equivalent of a database

     * @return

     */

    @RequestMapping(value = "/createOrderGroup", method = RequestMethod.POST)

    public MyResponse createOrderGroup() {

        try {

            Integer v = orderService.createOrderGroup();

            if (v > 0) {

                return new MyResponse(v);

            } else {

                return new MyResponse(false);

            }

        } catch (Exception e) {

            log.error("Failed to create order storage group", e);

            return new MyResponse(false);

        }

    }

    /**

     * Query all order data

     * @return

     */

    @RequestMapping(value = "/queryOrder", method = RequestMethod.GET)

    public MyResponse queryOrder() {

        try {

            List<OrderInfo> v = orderService.queryOrder();

            if (v.size() > 0) {

                v.forEach(x -> {

                    System.out.println(x.toString());

                });

                return MyResponse.ok(v);

            } else {

                return new MyResponse(false);

            }

        } catch (Exception e) {

            log.error("Failed to query orders", e);

            return new MyResponse(false);

        }

    }

    /**

     * List the storage groups in the database

     *

     * @return

     */

    @RequestMapping(value = "/selectStorageGroup", method = RequestMethod.GET)

    public MyResponse selectStorageGroup() {

        try {

            List<String> v = orderService.selectStorageGroup();

            if (v.size() > 0) {

                v.forEach(x -> {

                    System.out.println("group------" + x);

                });

                return MyResponse.ok(v);

            } else {

                return new MyResponse(false);

            }

        } catch (Exception e) {

            log.error("Failed to query storage groups", e);

            return new MyResponse(false);

        }

    }

}
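The comment on deleteOrder says the plus sign in the timestamp has to be sent as %2B. The reason is that an unencoded `+` in a query string is decoded as a space under form-encoding rules, so a value like 2021-11-24T18:28:20.689+08:00 would reach the controller with the offset sign missing. A minimal sketch of the client side, using only the JDK (the class name `TimestampParam` is made up for illustration):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Illustrative helper: encodes a timestamp for use as a query parameter
// and shows what decoding does to an unencoded '+'.
public class TimestampParam {

    public static String encode(String raw) {
        try {
            return URLEncoder.encode(raw, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM
            throw new IllegalStateException(e);
        }
    }

    public static String decode(String queryValue) {
        try {
            return URLDecoder.decode(queryValue, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String ts = "2021-11-24T18:28:20.689+08:00";
        System.out.println(encode(ts));         // '+' becomes %2B, ':' becomes %3A
        System.out.println(decode(encode(ts))); // round-trips to the original value
        System.out.println(decode(ts));         // unencoded '+' turns into a space
    }
}
```

A request would then look like /deleteOrder?timestamp= followed by the encoded value.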

 

serviceImpl file

package com.iotdb.zjc.demo.serviceimpl;

import com.iotdb.zjc.demo.service.OrderService;

import com.iotdb.zjc.demo.data.OrderInfo;

import com.iotdb.zjc.demo.mapper.OrderMapper;

import com.iotdb.zjc.demo.util.IotDbConfig;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.List;

/**

 * @Author: zjc

 * @ClassName OrderServiceImpl

 * @Description TODO

 * @date 2021/11/23 17:32

 * @Version 1.0

 */

@Service

public class OrderServiceImpl implements OrderService {

    @Autowired

    IotDbConfig iotDbConfig;

    @Autowired

    OrderMapper orderMapper;

    @Override

    public Integer createOrder(OrderInfo orderInfo) {

        return orderMapper.createOrder(orderInfo);

    }

    @Override

    public Integer updateOrder(OrderInfo orderInfo) {

        return orderMapper.updateOrder(orderInfo);

    }

    @Override

    public Integer createOrderGroup() {

        try {

//            List<String> statements = new ArrayList<>();

//            statements.add("set storage group to root.order");

//            statements.add("CREATE TIMESERIES root.order.orderdetail.order_id WITH DATATYPE=INT64, ENCODING=PLAIN, COMPRESSOR=SNAPPY");

//            statements.add("CREATE TIMESERIES root.order.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY");

//            statements.add("CREATE TIMESERIES root.order.orderdetail.order_name WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY");

//            statements.add("CREATE TIMESERIES root.order.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY");

//            Integer flag = iotDbConfig.executeBatch(statements);

            Integer flag = orderMapper.createOrderGroup();

            Integer flagEle = orderMapper.createOrderGroupElement();

            System.out.println("\n\tnumber of sql statements executed: " + flagEle);

            return flagEle;

        } catch (Exception e) {

            e.printStackTrace();

        }

        return 0;

    }

    @Override

    public List<OrderInfo> queryOrder() {

        return orderMapper.queryOrder();

    }

    @Override

    public List<String> selectStorageGroup() {

        return orderMapper.selectStorageGroup();

    }

    @Override

    public Integer deleteOrder(String timestamp) {

        return orderMapper.deleteOrder(timestamp);

    }

}
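The commented-out batch in createOrderGroup above spells out one CREATE TIMESERIES statement per measurement by hand. As a sketch of how that list could be generated instead, the helper below builds the same statement text from a device path and a measurement-to-type map. The class name `TimeseriesDdl` is hypothetical; the statement format, PLAIN encoding and SNAPPY compressor are copied from the commented-out lines, and the resulting list could be handed to IotDbConfig.executeBatch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: generates CREATE TIMESERIES statements in the same
// form as the commented-out batch in createOrderGroup.
public class TimeseriesDdl {

    // measurements maps a measurement name (e.g. "order_id") to an IoTDB
    // data type name (e.g. "INT64"); iteration order is preserved.
    public static List<String> createStatements(String devicePath, Map<String, String> measurements) {
        List<String> statements = new ArrayList<>();
        for (Map.Entry<String, String> entry : measurements.entrySet()) {
            statements.add("CREATE TIMESERIES " + devicePath + "." + entry.getKey()
                    + " WITH DATATYPE=" + entry.getValue()
                    + ", ENCODING=PLAIN, COMPRESSOR=SNAPPY");
        }
        return statements;
    }

    public static void main(String[] args) {
        Map<String, String> measurements = new LinkedHashMap<>();
        measurements.put("order_id", "INT64");
        measurements.put("order_num", "INT32");
        measurements.put("order_name", "TEXT");
        measurements.put("create_time", "TEXT");
        createStatements("root.order.orderdetail", measurements).forEach(System.out::println);
    }
}
```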

 

mapper file

package com.iotdb.zjc.demo.mapper;

import com.iotdb.zjc.demo.data.OrderInfo;

import org.apache.ibatis.annotations.Param;

import java.util.List;

/**

 * @Author: zjc

 * @ClassName OrderMapper

 * @Description TODO

 * @date 2021/11/24 10:09

 * @Version 1.0

 */

public interface OrderMapper {

    Integer createOrder(OrderInfo orderInfo);

    List<OrderInfo> queryOrder();

    Integer createOrderGroup();

    Integer createOrderGroupElement();

    List<String> selectStorageGroup();

    Integer updateOrder(OrderInfo orderInfo);

    Integer deleteOrder(@Param("timestamp") String timestamp);

}

 

mapper.xml file

<?xml version="1.0" encoding="UTF-8"?>

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.iotdb.zjc.demo.mapper.OrderMapper">

    <resultMap id="BaseResultMap" type="com.iotdb.zjc.demo.data.OrderInfo">

        <result column="root.order2.orderdetail.order_id" property="orderId"/>

        <result column="root.order2.orderdetail.order_num" property="orderNum"/>

        <result column="root.order2.orderdetail.order_name" property="orderName"/>

        <result column="root.order2.orderdetail.create_time" property="createTime"/>

    </resultMap>

    <insert id="createOrder" parameterType="com.iotdb.zjc.demo.data.OrderInfo">

  insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(#{timestamp},#{orderId},#{orderNum},#{orderName},#{createTime});

  </insert>

    <select id="queryOrder" resultMap="BaseResultMap">

   select * from root.order2.orderdetail limit 3 offset 0

  </select>

    <select id="selectStorageGroup" resultType="java.lang.String">

        show storage group limit 3 offset 0

    </select>

    <delete id="deleteOrder" parameterType="java.lang.String">

        delete from root.order2.orderdetail where timestamp = ${timestamp};

    </delete>

    <insert id="updateOrder">

    insert into root.order2.orderdetail(timestamp, order_id, order_num, order_name,create_time) values(2021-11-24T18:28:20.689+08:00,#{orderId},#{orderNum},#{orderName},#{createTime});

    </insert>

    <update id="createOrderGroup">

        SET STORAGE GROUP TO root.order2

    </update>

    <update id="createOrderGroupElement">

--         CREATE TIMESERIES root.order2.orderdetail.create_time WITH DATATYPE=TEXT, ENCODING=PLAIN, COMPRESSOR=SNAPPY;

 CREATE TIMESERIES root.order2.orderdetail.order_num WITH DATATYPE=INT32, ENCODING=PLAIN, COMPRESSOR=SNAPPY;

    </update>

</mapper>
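The deleteOrder statement above uses ${timestamp}, which Mybatis substitutes into the SQL text as-is rather than binding it as a parameter, so whatever the caller sends ends up in the statement. One possible guard, sketched here under the assumption that callers send either epoch milliseconds or an ISO-8601 date-time with an offset such as 2021-11-24T18:28:20.689+08:00, is to validate the value in the service layer before it reaches the mapper. The class name `TimestampGuard` is made up for illustration.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;

// Illustrative guard for values that will be substituted via ${timestamp}.
public class TimestampGuard {

    // Returns the trimmed value if it is all digits (epoch milliseconds) or
    // parses as an ISO-8601 offset date-time; otherwise throws.
    public static String requireTimestamp(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("timestamp is required");
        }
        String value = raw.trim();
        if (value.matches("\\d{1,19}")) {
            return value;
        }
        try {
            OffsetDateTime.parse(value);
            return value;
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("not a timestamp: " + raw, e);
        }
    }
}
```

With this in place, OrderServiceImpl.deleteOrder could call requireTimestamp(timestamp) before delegating to orderMapper.deleteOrder.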

 

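Points to note:

1. The column mappings must be written as full paths that include the storage group name, such as root.order2.orderdetail.order_id; otherwise the columns are not mapped to the entity's fields.

2. An insert must include a timestamp. The timestamp acts as the primary key, and a delete also targets the corresponding timestamp.

3. An update is just an insert: the timestamp stays the same and any of the other values can change.

4. On creating storage groups and time series: when creating a time series, its data type and encoding scheme should be defined.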
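Notes 2 and 3 above can be pictured with a small in-memory model in which rows are keyed by timestamp. This is only a toy illustration of the behaviour described in the notes, not how IoTDB stores data, and the class name `TimestampUpsertModel` is made up. It also assumes that an insert at an existing timestamp overwrites only the measurements it supplies.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model: rows keyed by timestamp, where "update" is an insert
// at an existing timestamp and "delete" removes that timestamp.
public class TimestampUpsertModel {

    private final TreeMap<Long, Map<String, Object>> rows = new TreeMap<>();

    // Inserts the values at the timestamp; if a row already exists there,
    // the supplied measurements overwrite the old ones.
    public void insert(long timestamp, Map<String, Object> values) {
        rows.computeIfAbsent(timestamp, k -> new LinkedHashMap<>()).putAll(values);
    }

    // Deletes the row at the timestamp; returns true if one existed.
    public boolean delete(long timestamp) {
        return rows.remove(timestamp) != null;
    }

    public Map<String, Object> get(long timestamp) {
        return rows.get(timestamp);
    }

    public int size() {
        return rows.size();
    }
}
```

Inserting twice at timestamp 100 leaves one row holding the second set of values, which is why the demo's updateOrder is written as an insert statement.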

For common SQL operations, this link is a useful reference: https://blog.csdn.net/zjy660358/article/details/111315567

For IoTDB installation, see the following link:

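----------------

Copyright notice: this is an original article by the CSDN blogger "阳光非宅猿", released under the CC 4.0 BY-SA license. When reposting, please include a link to the original source together with this notice.

Original link: https://blog.csdn.net/weixin_42103983/article/details/121580689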