SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: title: SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式 author: 石沫 背景 很多用户在使用阿里云云数据库SQL Server时,为了加快插入速度,都尝试使用大容量插入的方式,大家都知道,对于完整恢复模式下的数据库,大容量导入执行的所有行插入操作都会完整地记录

title: SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式

author: 石沫

背景

很多用户在使用阿里云云数据库SQL Server时,为了加快插入速度,都尝试使用大容量插入的方式,大家都知道,对于完整恢复模式下的数据库,大容量导入执行的所有行插入操作都会完整地记录在事务日志中。如果使用完整恢复模式,大型数据导入会导致填充事务日志的速度很快。相反,对于简单恢复模式或大容量日志恢复模式,大容量导入操作的按最小方式记录日志减少了大容量导入操作填满日志空间的可能性。另外,按最小方式记录日志的效率也比按完整方式记录日志高 。
但实际上,当大容量导入与数据库镜像共存时,会出现镜像 Suspend的情况,这个情况是由于微软在2008 R2上的BUG导致,详细你可以了解 https://support.microsoft.com/en-us/kb/2700641 ,微软已经明确表示在2008 R2不会FIXED,那么如何正确在RDS使用大容量导入并避免镜像异常,下面介绍几种方式.

实现方法

  • 通过ADO.NET SQLBulkCopy 方式
只需要将SqlBulkCopy 指定SqlBulkCopyOptions.CheckConstraints就好,即:SqlBulkCopy blkcpy = new SqlBulkCopy(desConnString, SqlBulkCopyOptions.CheckConstraints)
例如:将本地的一个大表通过SQLBulkCopy方式导入到RDS的实例中

static void Main()
{

    string srcConnString = "Data Source=(local);Integrated Security=true;
    Initial Catalog=testdb";
    string desConnString = "Data Source=****.sqlserver.rds.aliyuncs.com,3433;
    UserID=**;Password=**;Initial Catalog=testdb";

    SqlConnection srcConnection = new SqlConnection();
    SqlConnection desConnection = new SqlConnection();

    SqlCommand sqlcmd = new SqlCommand();
    SqlDataAdapter da = new SqlDataAdapter();
    DataTable dt = new DataTable();

    srcConnection.ConnectionString = srcConnString;
    desConnection.ConnectionString = desConnString;
    sqlcmd.Connection = srcConnection;

    sqlcmd.CommandText = @"
    SELECT top 1000000 [PersonType],[NameStyle],[Title],[FirstName],[MiddleName],
    [LastName] ,[Suffix],[EmailPromotion],[AdditionalContactInfo],[Demographics],NULL 
    as rowguid,[ModifiedDate] FROM [testdb].[dbo].[Person]";

    sqlcmd.CommandType = CommandType.Text;
    sqlcmd.Connection.Open();
    da.SelectCommand = sqlcmd;
    da.Fill(dt);


    using (SqlBulkCopy blkcpy = 
    new  SqlBulkCopy(desConnString,SqlBulkCopyOptions.CheckConstraints))
    // using (SqlBulkCopy blkcpy = 
    // new SqlBulkCopy(desConnString, SqlBulkCopyOptions.Default))
    {
        blkcpy.BatchSize = 2000;
        blkcpy.BulkCopyTimeout = 5000;
        blkcpy.SqlRowsCopied += new SqlRowsCopiedEventHandler(OnSqlRowsCopied);
        blkcpy.NotifyAfter = 2000;

        foreach (DataColumn dc in dt.Columns)
        {
            blkcpy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
        }

        try
        {
            blkcpy.DestinationTableName = "Person";
            blkcpy.WriteToServer(dt);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            sqlcmd.Clone();
            srcConnection.Close();
            desConnection.Close();

        }
    }

}

private static void OnSqlRowsCopied(
    object sender, SqlRowsCopiedEventArgs e)
{
    Console.WriteLine("Copied {0} so far...", e.RowsCopied);
}
  • 通过JDBC SQLServerBulkCopy 方式
同样的道理,需要在copyOptions指定检查约束性
SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();  
copyOptions.setCheckConstraints(true);
测试时,请用Microsoft JDBC Drivers 6.0 的sqljdbc41.jar,sqljdbc4.jar及更老版本没有SQLServerBulkCopy 实现。

例如: 将本地的一个大表通过SQLServerBulkCopy方式导入到RDS的实例中


import java.sql.*;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;

public class Program {
    public static void main(String[] args)  
    {  
        String sourceConnectionString  = "jdbc:sqlserver://localhost:1433;" +  
                "databaseName=testdb;user=****;password=****";  
        String destConnectionString  = "jdbc:sqlserver://*****.sqlserver.rds.aliyuncs.com:3433;" +  
                "databaseName=testdb;user=****;password=**** ";  
        
        try  
        {  
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");  
            try (Connection sourceConnection =
                 DriverManager.getConnection(sourceConnectionString))  
            {  
                try (Statement stmt = sourceConnection.createStatement())  
                {  
                
  
                    try (ResultSet rsSourceData = stmt.executeQuery(  
                            " SELECT top 1000000 " +
                            "[PersonType],[NameStyle],[Title],[FirstName],[MiddleName],[LastName] ," +
                            "[Suffix],[EmailPromotion],[AdditionalContactInfo]," +
                            "[Demographics],NULL as rowguid,[ModifiedDate] " +
                            "FROM [testdb].[dbo].[Person]"))  
                    {   
                        try (Connection destinationConnection =  DriverManager.getConnection(destConnectionString))  
                        {  
                            
                            Statement stmt1 = destinationConnection.createStatement();
                                    
                            long countStart = 0;  
                            try (ResultSet rsRowCount = stmt1.executeQuery(  
                                    "SELECT COUNT(*) FROM dbo.Person;"))  
                            {  
                                rsRowCount.next();  
                                countStart = rsRowCount.getInt(1);  
                                System.out.println("Starting row count = " + countStart);  
                            }  
                            
                            try (SQLServerBulkCopy bulkCopy =   new SQLServerBulkCopy(destinationConnection))  
                            {  
                                SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();  
                                copyOptions.setKeepIdentity(true);  
                                copyOptions.setBatchSize(2000);
                                copyOptions.setBulkCopyTimeout(5000);
                                //this is importance setting
                                copyOptions.setCheckConstraints(true);
                                 
                                bulkCopy.setBulkCopyOptions(copyOptions);
                                bulkCopy.setDestinationTableName("dbo.Person");  
                                
                                bulkCopy.addColumnMapping("PersonType", "PersonType");  
                                bulkCopy.addColumnMapping("NameStyle", "NameStyle");  
                                bulkCopy.addColumnMapping("Title", "Title");  
                                bulkCopy.addColumnMapping("FirstName", "FirstName");  
                                bulkCopy.addColumnMapping("MiddleName", "MiddleName");  
                                bulkCopy.addColumnMapping("LastName", "LastName");  
                                bulkCopy.addColumnMapping("Suffix", "Suffix");  
                                bulkCopy.addColumnMapping("EmailPromotion", "EmailPromotion");  
                                bulkCopy.addColumnMapping("AdditionalContactInfo", "AdditionalContactInfo");  
                                bulkCopy.addColumnMapping("Demographics", "Demographics");  
                                bulkCopy.addColumnMapping("rowguid", "rowguid");  
                                bulkCopy.addColumnMapping("ModifiedDate", "ModifiedDate");  
  
                                try  
                                {  
                                    bulkCopy.writeToServer(rsSourceData);  
                                }  
                                catch (Exception e)  
                                {  
                                    e.printStackTrace();  
                                }  
                                
                                
                                try (ResultSet rsRowCount = stmt1.executeQuery(  
                                        "SELECT COUNT(*) FROM dbo.Person;"))  
                                {  
                                    rsRowCount.next();  
                                    long countEnd = rsRowCount.getInt(1);  
                                    System.out.println("Ending row count = " + countEnd);  
                                    System.out.println((countEnd - countStart) + " rows were added.");  
                                }  
                                
                            }  

                        }  
                    }  
                }  
            }  
        }  
        catch (Exception e)  
        {  
            e.printStackTrace();  
        }  
    }  
  
 

}
  • 通过BCP方式

第一步:需要将数据BCP到本地

BCP testdb.dbo.person Out "bcp_data" /t  /N /U **** /P *** /S "****.sqlserver.rds.aliyuncs.com,3433"    

第二步:将导出的文件直接导入到RDS的实例中,但需要指定提示:/h "CHECK_CONSTRAINTS"

BCP testdb.dbo.person In "bcp_data" /C /N /q /k /h "CHECK_CONSTRAINTS" /U *** /P *** /b 500 /S  "***.sqlserver.rds.aliyuncs.com,3433"  
  • 通过DTS/SSIS方式

第一种:import/export data方式需要先保存SSIS包,然后修改Connection Manager的属性 ,如下图
1111111

第二种:直接使用SQL Server Business Intelligence Development Stuidio新建 SSIS包:

222222

  • 特别说明

不能在RDS通过下列两种方式进行大容量插入 :原因是基于安全考虑不提供上传文件到RDS 数据库服务器。

第一种:

BULK INSERT testdb.dbo.person_in
FROM N'D:\trace\bcp.txt'
WITH
(
 CHECK_CONSTRAINTS 
);  

第二种:

INSERT ... SELECT * FROM OPENROWSET(BULK...)
  • 总结

大容量导入数据会带来更快的插入,解决了用户在有大量数据导入缓慢困惑,在阿里云数据库中,你可以使用五种方式来实现业务场景,但是基于镜像的主备关系,需要特别加入一个检查约束的选项,这是写这个最佳实践的目的,一旦镜像SUSPEND,不断有DUMP文件产生,一来需要时间来修正,二来DUMP文件也会不断占用空间,但不会影响用户的可用性和可靠性。有两种方式在RDS中不能实现,另外,还可以通过ODBC来实现大容量导入,具体请参见https://msdn.microsoft.com/en-us/library/ms403302.aspx。希望这些对大家有用,特别是阿里云云数据库使用用户。

相关实践学习
使用SQL语句管理索引
本次实验主要介绍如何在RDS-SQLServer数据库中,使用SQL语句管理索引。
SQL Server on Linux入门教程
SQL Server数据库一直只提供Windows下的版本。2016年微软宣布推出可运行在Linux系统下的SQL Server数据库,该版本目前还是早期预览版本。本课程主要介绍SQLServer On Linux的基本知识。 相关的阿里云产品:云数据库RDS SQL Server版 RDS SQL Server不仅拥有高可用架构和任意时间点的数据恢复功能,强力支撑各种企业应用,同时也包含了微软的License费用,减少额外支出。 了解产品详情: https://www.aliyun.com/product/rds/sqlserver
目录
相关文章
|
19天前
|
关系型数据库 MySQL 数据库
《MySQL 简易速速上手小册》第2章:数据库设计最佳实践(2024 最新版)
《MySQL 简易速速上手小册》第2章:数据库设计最佳实践(2024 最新版)
28 2
|
27天前
|
SQL 人工智能 算法
【SQL server】玩转SQL server数据库:第二章 关系数据库
【SQL server】玩转SQL server数据库:第二章 关系数据库
65 10
|
27天前
|
SQL 数据库 数据库管理
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(一)模式、表、索引与视图
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(一)模式、表、索引与视图
59 11
|
27天前
|
SQL 算法 数据库
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(二)数据查询
【SQL server】玩转SQL server数据库:第三章 关系数据库标准语言SQL(二)数据查询
102 6
|
8天前
|
存储 缓存 固态存储
优化矢量数据库性能:技巧与最佳实践
【4月更文挑战第30天】本文探讨了优化矢量数据库性能的技巧和最佳实践,包括硬件(如使用SSD、增加内存和利用多核处理器)、软件(索引优化、查询优化、数据分区和压缩)和架构(读写分离、分布式架构及缓存策略)方面的优化措施。通过这些方法,可以提升系统运行效率,应对大数据量和复杂查询的挑战。
|
13天前
|
SQL 调度 数据库
【Database】Sqlserver如何定时备份数据库和定时清除
【Database】Sqlserver如何定时备份数据库和定时清除
22 2
|
15天前
|
存储 SQL 数据库
数据库库表结构设计:原理、实例与最佳实践
数据库库表结构设计:原理、实例与最佳实践
37 0
|
27天前
|
SQL 存储 数据挖掘
数据库数据恢复—RAID5上层Sql Server数据库数据恢复案例
服务器数据恢复环境: 一台安装windows server操作系统的服务器。一组由8块硬盘组建的RAID5,划分LUN供这台服务器使用。 在windows服务器内装有SqlServer数据库。存储空间LUN划分了两个逻辑分区。 服务器故障&初检: 由于未知原因,Sql Server数据库文件丢失,丢失数据涉及到3个库,表的数量有3000左右。数据库文件丢失原因还没有查清楚,也不能确定数据存储位置。 数据库文件丢失后服务器仍处于开机状态,所幸没有大量数据写入。 将raid5中所有磁盘编号后取出,经过硬件工程师检测,没有发现明显的硬件故障。以只读方式将所有磁盘进行扇区级的全盘镜像,镜像完成后将所
数据库数据恢复—RAID5上层Sql Server数据库数据恢复案例
|
2月前
|
SQL Oracle 关系型数据库
干货!sqlserver数据库所有知识点总结整理,含代码(挺全的)
干货!sqlserver数据库所有知识点总结整理,含代码(挺全的)
13 0
|
7天前
|
关系型数据库 MySQL Linux
【MySQL-10】数据库函数-案例演示【字符串/数值/日期/流程控制函数】(代码演示&可cv代码)
【MySQL-10】数据库函数-案例演示【字符串/数值/日期/流程控制函数】(代码演示&可cv代码)
【MySQL-10】数据库函数-案例演示【字符串/数值/日期/流程控制函数】(代码演示&可cv代码)