草庐IT

Java怎么实现几十万条数据插入(30万条数据插入MySQL仅需13秒)

共饮一杯无 2023-04-13 原文

本文主要讲述通过MyBatis、JDBC等做大数据量数据插入的案例和结果。

30万条数据插入插入数据库验证


验证的数据库表结构如下:

CREATE TABLE `t_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
  `username` varchar(64) DEFAULT NULL COMMENT '用户名称',
  `age` int(4) DEFAULT NULL COMMENT '年龄',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户信息表';

话不多说,开整!

实体类、mapper和配置文件定义

User实体

/**
 * <p>用户实体</p>
 *
 * @Author zjq
 * @Date 2021/8/3
 */
@Data
public class User {

    private int id;
    private String username;
    private int age;

}

mapper接口

public interface UserMapper {

    /**
     * 批量插入用户
     * @param userList
     */
    void batchInsertUser(@Param("list") List<User> userList);


}

mapper.xml文件

    <!-- 批量插入用户信息 -->
    <insert id="batchInsertUser" parameterType="java.util.List">
        insert into t_user(username,age) values
        <foreach collection="list" item="item" index="index" separator=",">
            (
            #{item.username},
            #{item.age}
            )
        </foreach>
    </insert>

jdbc.properties

jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/test
jdbc.username=root
jdbc.password=root

sqlMapConfig.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>

    <!--通过properties标签加载外部properties文件-->
    <properties resource="jdbc.properties"></properties>


    <!--自定义别名-->
    <typeAliases>
        <typeAlias type="com.zjq.domain.User" alias="user"></typeAlias>
    </typeAliases>


    <!--数据源环境-->
    <environments default="developement">
        <environment id="developement">
            <transactionManager type="JDBC"></transactionManager>
            <dataSource type="POOLED">
                <property name="driver" value="${jdbc.driver}"/>
                <property name="url" value="${jdbc.url}"/>
                <property name="username" value="${jdbc.username}"/>
                <property name="password" value="${jdbc.password}"/>
            </dataSource>
        </environment>
    </environments>


    <!--加载映射文件-->
    <mappers>
        <mapper resource="com/zjq/mapper/UserMapper.xml"></mapper>
    </mappers>


</configuration>

不分批次直接梭哈

MyBatis直接一次性批量插入30万条,代码如下:

    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        try {
            List<User> userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
            }
            session.insert("batchInsertUser", userList); // 最后插入剩余的数据
            session.commit();

            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } finally {
            session.close();
        }
    }

可以看到控制台输出:

Cause: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (27759038 >yun 4194304). You can change this value on the server by setting the max_allowed_packet’ variable.

超出最大数据包限制了,可以通过调整max_allowed_packet限制来提高可以传输的内容,不过由于30万条数据超出太多,这个不可取,梭哈看来是不行了 😅😅😅
既然梭哈不行那我们就一条一条循环着插入行不行呢

循环逐条插入

mapper接口和mapper文件中新增单个用户新增的内容如下:

    /**
     * 新增单个用户
     * @param user
     */
    void insertUser(User user);
    <!-- 新增用户信息 -->
    <insert id="insertUser" parameterType="user">
        insert into t_user(username,age) values
            (
            #{username},
            #{age}
            )
    </insert>

调整执行代码如下:

    @Test
    public void testCirculateInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        try {
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                // 一条一条新增
                session.insert("insertUser", user);
                session.commit();
            }

            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } finally {
            session.close();
        }
    }

执行后可以发现磁盘IO占比飙升,一直处于高位。

等啊等等啊等,好久还没执行完


先不管他了太慢了先搞其他的,等会再来看看结果吧。
two thousand year later …
控制台输出如下:

总共执行了14909367毫秒,换算出来是4小时八分钟。太慢了。。

👇👇👇还是优化下之前的批处理方案吧

MyBatis实现插入30万条数据

先清理表数据,然后优化批处理执行插入:

-- 清空用户表
TRUNCATE table  t_user;

以下是通过 MyBatis 实现 30 万条数据插入代码实现:

    /**
     * 分批次批量插入
     * @throws IOException
     */
    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        int waitTime = 10;
        try {
            List<User> userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
                if (i % 1000 == 0) {
                    session.insert("batchInsertUser", userList);
                    // 每 1000 条数据提交一次事务
                    session.commit();
                    userList.clear();

                    // 等待一段时间
                    Thread.sleep(waitTime * 1000);
                }
            }
            // 最后插入剩余的数据
            if(!CollectionUtils.isEmpty(userList)) {
                session.insert("batchInsertUser", userList);
                session.commit();
            }

            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.close();
        }
    }

使用了 MyBatis 的批处理操作,将每 1000 条数据放在一个批次中插入,能够较为有效地提高插入速度。同时请注意在循环插入时要带有合适的等待时间和批处理大小,以防止出现内存占用过高等问题。此外,还需要在配置文件中设置合理的连接池和数据库的参数,以获得更好的性能。

在上面的示例中,我们每插入1000行数据就进行一次批处理提交,并等待10秒钟。这有助于控制内存占用,并确保插入操作平稳进行。

五十分钟执行完毕,时间主要用在了等待上。

如果低谷时期执行,CPU和磁盘性能又足够的情况下,直接批处理不等待执行:

    /**
     * 分批次批量插入
     * @throws IOException
     */
    @Test
    public void testBatchInsertUser() throws IOException {
        InputStream resourceAsStream =
                Resources.getResourceAsStream("sqlMapConfig.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
        SqlSession session = sqlSessionFactory.openSession();
        System.out.println("===== 开始插入数据 =====");
        long startTime = System.currentTimeMillis();
        int waitTime = 10;
        try {
            List<User> userList = new ArrayList<>();
            for (int i = 1; i <= 300000; i++) {
                User user = new User();
                user.setId(i);
                user.setUsername("共饮一杯无 " + i);
                user.setAge((int) (Math.random() * 100));
                userList.add(user);
                if (i % 1000 == 0) {
                    session.insert("batchInsertUser", userList);
                    // 每 1000 条数据提交一次事务
                    session.commit();
                    userList.clear();
                }
            }
            // 最后插入剩余的数据
            if(!CollectionUtils.isEmpty(userList)) {
                session.insert("batchInsertUser", userList);
                session.commit();
            }

            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            session.close();
        }
    }

则24秒可以完成数据插入操作:


可以看到短时CPU和磁盘占用会飙高。
把批处理的量再调大一些调到5000,在执行:

13秒插入成功30万条,直接芜湖起飞🛫🛫🛫

JDBC实现插入30万条数据

JDBC循环插入的话跟上面的mybatis逐条插入类似,不再赘述。
以下是 Java 使用 JDBC 批处理实现 30 万条数据插入的示例代码。请注意,该代码仅提供思路,具体实现需根据实际情况进行修改。

    /**
     * JDBC分批次批量插入
     * @throws IOException
     */
    @Test
    public void testJDBCBatchInsertUser() throws IOException {
        Connection connection = null;
        PreparedStatement preparedStatement = null;

        String databaseURL = "jdbc:mysql://localhost:3306/test";
        String user = "root";
        String password = "root";

        try {
            connection = DriverManager.getConnection(databaseURL, user, password);
            // 关闭自动提交事务,改为手动提交
            connection.setAutoCommit(false);
            System.out.println("===== 开始插入数据 =====");
            long startTime = System.currentTimeMillis();
            String sqlInsert = "INSERT INTO t_user ( username, age) VALUES ( ?, ?)";
            preparedStatement = connection.prepareStatement(sqlInsert);

            Random random = new Random();
            for (int i = 1; i <= 300000; i++) {
                preparedStatement.setString(1, "共饮一杯无 " + i);
                preparedStatement.setInt(2, random.nextInt(100));
                // 添加到批处理中
                preparedStatement.addBatch();

                if (i % 1000 == 0) {
                    // 每1000条数据提交一次
                    preparedStatement.executeBatch();
                    connection.commit();
                    System.out.println("成功插入第 "+ i+" 条数据");
                }

            }
            // 处理剩余的数据
            preparedStatement.executeBatch();
            connection.commit();
            long spendTime = System.currentTimeMillis()-startTime;
            System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
        } catch (SQLException e) {
            System.out.println("Error: " + e.getMessage());
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


上述示例代码中,我们通过 JDBC 连接 MySQL 数据库,并执行批处理操作插入数据。具体实现步骤如下:

  1. 获取数据库连接。
  2. 创建 Statement 对象。
  3. 定义 SQL 语句,使用 PreparedStatement 对象预编译 SQL 语句并设置参数。
  4. 执行批处理操作。
  5. 处理剩余的数据。
  6. 关闭 Statement 和 Connection 对象。

使用setAutoCommit(false) 来禁止自动提交事务,然后在每次批量插入之后手动提交事务。每次插入数据时都新建一个 PreparedStatement 对象以避免状态不一致问题。在插入数据的循环中,每 10000 条数据就执行一次 executeBatch() 插入数据。
另外,需要根据实际情况优化连接池和数据库的相关配置,以防止连接超时等问题。

总结

实现高效的大量数据插入需要结合以下优化策略(建议综合使用):

  1. 批处理:批量提交SQL语句可以降低网络传输和处理开销,减少与数据库交互的次数。在Java中可以使用Statement或者PreparedStatement的addBatch()方法来添加多个SQL语句,然后一次性执行executeBatch()方法提交批处理的SQL语句。
  • 在循环插入时带有适当的等待时间批处理大小,从而避免内存占用过高等问题
    • 设置适当的批处理大小:批处理大小指在一次插入操作中插入多少行数据。如果批处理大小太小,插入操作的频率将很高,而如果批处理大小太大,可能会导致内存占用过高。通常,建议将批处理大小设置为1000-5000行,这将减少插入操作的频率并降低内存占用。
    • 采用适当的等待时间:等待时间指在批处理操作之间等待的时间量。等待时间过短可能会导致内存占用过高,而等待时间过长则可能会延迟插入操作的速度。通常,建议将等待时间设置为几秒钟到几十秒钟之间,这将使操作变得平滑且避免出现内存占用过高等问题。
    • 可以考虑使用一些内存优化的技巧,例如使用内存数据库或使用游标方式插入数据,以减少内存占用。
  • 总的来说,选择适当的批处理大小和等待时间可以帮助您平稳地进行插入操作,避免出现内存占用过高等问题。
  1. 索引: 在大量数据插入前暂时去掉索引,最后再打上,这样可以大大减少写入时候的更新索引的时间。
  2. 数据库连接池:使用数据库连接池可以减少数据库连接建立和关闭的开销,提高性能。在没有使用数据库连接池的情况,记得在finally中关闭相关连接。
  3. 数据库参数调整:增加MySQL数据库缓冲区大小、配置高性能的磁盘和I/O等。

本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位指出。
主页共饮一杯无的博客汇总👨‍💻

保持热爱,奔赴下一场山海。🏃🏃🏃

有关Java怎么实现几十万条数据插入(30万条数据插入MySQL仅需13秒)的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  5. ruby - Ruby 中的隐式返回值是怎么回事? - 2

    所以我开始关注ruby​​,很多东西看起来不错,但我对隐式return语句很反感。我理解默认情况下让所有内容返回self或nil但不是语句的最后一个值。对我来说,它看起来非常脆弱(尤其是)如果你正在使用一个不打算返回某些东西的方法(尤其是一个改变状态/破坏性方法的函数!),其他人可能最终依赖于一个返回对方法的目的并不重要,并且有很大的改变机会。隐式返回有什么意义?有没有办法让事情变得更简单?总是有返回以防止隐含返回被认为是好的做法吗?我是不是太担心这个了?附言当人们想要从方法中返回特定的东西时,他们是否经常使用隐式返回,这不是让你组中的其他人更容易破坏彼此的代码吗?当然,记录一切并给出

  6. ruby - 怎么来的(a_method || :other) returns :other only when assigning to a var called a_method? - 2

    给定以下方法:defsome_method:valueend以下语句按我的预期工作:some_method||:other#=>:valuex=some_method||:other#=>:value但是下面语句的行为让我感到困惑:some_method=some_method||:other#=>:other它按预期创建了一个名为some_method的局部变量,随后对some_method的调用返回该局部变量的值。但为什么它分配:other而不是:value呢?我知道这可能不是一件明智的事情,并且可以看出它可能有多么模棱两可,但我认为应该在考虑作业之前评估作业的右侧...我已经在R

  7. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  8. ruby-on-rails - 我该怎么办 :remote location validation with CarrierWave? - 2

    我在我的Rails3示例应用程序上使用CarrierWave。我想验证远程位置上传,因此当用户提交无效URL(空白或非图像)时,我不会收到标准错误异常:CarrierWave::DownloadErrorinImageController#createtryingtodownloadafilewhichisnotservedoverHTTP这是我的模型:classPaintingtrue,:length=>{:minimum=>5,:maximum=>100}validates:image,:presence=>trueend这是我的Controller:classPaintingsC

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐