Batch Inserts with Large Collections

This scenario demonstrate how to perform insert operations on collections of 1,000 to 10,000 objects. Some ORMs require special handling for collections of this size.

For other batch operations, see CRUD Operations on Multiple Objects.

For better performance, consider using a Bulk Insert instead.

Scenario Prototype

public interface ILargeBatchScenario<TEmployeeSimple>
   where TEmployeeSimple : class, IEmployeeSimple, new()
{
    /// <summary>
    /// Gets a collection of Employee rows by their name. Assume the name is not unique.
    /// </summary>
    int CountByLastName(string lastName);

    /// <summary>
    /// Insert a large collection of Employee rows.
    /// </summary>
    void InsertLargeBatch(IList<TEmployeeSimple> employees);

    /// <summary>
    /// Insert a large collection of Employee rows.
    /// </summary>
    /// <param name="employees">The employees.</param>
    /// <param name="batchSize">Size of the batch.</param>
    void InsertLargeBatch(IList<TEmployeeSimple> employees, int batchSize);

    /// <summary>
    /// Gets the maximum size of a batch.
    /// </summary>
    /// <remarks>Return Int32.MaxValue if not limited.</remarks>
    int MaximumBatchSize { get; }
}

For ORMs that require breaking up the size of the batch, this function is provided.

public static IEnumerable<List<T>> BatchAsLists<T>(this IEnumerable<T> source, int batchSize)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source), $"{nameof(source)} is null.");
    if (batchSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, $"{batchSize} must be greater than 0");

    return BatchAsListsCore();

    IEnumerable<List<T>> BatchAsListsCore()
    {
        int count = 0;
        using (var iter = source.GetEnumerator())
        {
            while (iter.MoveNext())
            {
                var chunk = new List<T>(batchSize);
                count = 1;
                chunk.Add(iter.Current);
                for (int i = 1; i < batchSize && iter.MoveNext(); i++)
                {
                    chunk.Add(iter.Current);
                    count++;
                }
                yield return chunk;
            }
        }
    }
}

ADO.NET

Large collections need to be broken up into batches. For SQL Server, the maximum batch size is approximately 2100/number of columns.

    public class LargeBatchScenario : SqlServerScenarioBase, ILargeBatchScenario<EmployeeSimple>
    {
        public LargeBatchScenario(string connectionString) : base(connectionString)
        { }

        public int CountByLastName(string lastName)
        {
            const string sql = "SELECT Count(*) FROM HR.EmployeeDetail e WHERE e.LastName = @LastName";
            using (var con = OpenConnection())
            using (var cmd = new SqlCommand(sql, con))
            {
                cmd.Parameters.AddWithValue("@LastName", lastName);
                return (int)cmd.ExecuteScalar();
            }
        }

        public int MaximumBatchSize => 2100 / 7;

        public void InsertLargeBatch(IList<EmployeeSimple> employees)
        {
            InsertLargeBatch(employees, 250);
        }

        public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
        {
            if (employees == null || employees.Count == 0)
                throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

            using (var con = OpenConnection())
            using (var trans = con.BeginTransaction())
            {
                foreach (var batch in employees.BatchAsLists(batchSize))
                {
                    var sql = new StringBuilder(@"INSERT INTO HR.Employee
(FirstName, MiddleName, LastName, Title, OfficePhone, CellPhone, EmployeeClassificationKey)
VALUES ");

                    for (var i = 0; i < batch.Count; i++)
                    {
                        if (i != 0)
                            sql.AppendLine(",");
                        sql.Append($"(@FirstName_{i}, @MiddleName_{i}, @LastName_{i}, @Title_{i}, @OfficePhone_{i}, @CellPhone_{i}, @EmployeeClassificationKey_{i})");
                    }
                    sql.AppendLine(";");

                    using (var cmd = new SqlCommand(sql.ToString(), con, trans))
                    {
                        for (var i = 0; i < batch.Count; i++)
                        {
                            cmd.Parameters.AddWithValue($"@FirstName_{i}", batch[i].FirstName);
                            cmd.Parameters.AddWithValue($"@MiddleName_{i}", (object?)batch[i].MiddleName ?? DBNull.Value);
                            cmd.Parameters.AddWithValue($"@LastName_{i}", batch[i].LastName);
                            cmd.Parameters.AddWithValue($"@Title_{i}", (object?)batch[i].Title ?? DBNull.Value);
                            cmd.Parameters.AddWithValue($"@OfficePhone_{i}", (object?)batch[i].OfficePhone ?? DBNull.Value);
                            cmd.Parameters.AddWithValue($"@CellPhone_{i}", (object?)batch[i].CellPhone ?? DBNull.Value);
                            cmd.Parameters.AddWithValue($"@EmployeeClassificationKey_{i}", batch[i].EmployeeClassificationKey);
                        }
                        cmd.ExecuteNonQuery();
                    }
                }
                trans.Commit();
            }
        }
    }

Chain

The InsertMultipleBatch command overcomes the database's limit on parameter counts, but doesn't offer as many features as InsertBatch.

The InsertMultipleBatch command is not atomic and should be used in a transaction.

public class LargeBatchScenario : ILargeBatchScenario<EmployeeSimple>
{
    readonly SqlServerDataSource m_DataSource;

    public LargeBatchScenario(SqlServerDataSource dataSource)
    {
        m_DataSource = dataSource;
    }

    public int MaximumBatchSize => 2100 / 7;

    public int CountByLastName(string lastName)
    {
        return (int)m_DataSource.From<EmployeeSimple>(new { lastName }).AsCount().Execute();
    }

    public void InsertLargeBatch(IList<EmployeeSimple> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        using (var trans = m_DataSource.BeginTransaction())
        {
            trans.InsertMultipleBatch((IReadOnlyList<EmployeeSimple>)employees).Execute();
            trans.Commit();
        }
    }

    public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
    {
        using (var trans = m_DataSource.BeginTransaction())
        {
            //This is essentially what InsertMultipleBatch does
            foreach (var batch in employees.BatchAsLists(batchSize))
                trans.InsertBatch(batch).Execute();

            trans.Commit();
        }
    }
}

Dapper

Large collections need to be broken up into batches. For SQL Server, the maximum batch size is approximately 2100/number of columns.

    public class LargeBatchScenario : ScenarioBase, ILargeBatchScenario<EmployeeSimple>
    {
        public LargeBatchScenario(string connectionString) : base(connectionString)
        { }

        public int CountByLastName(string lastName)
        {
            const string sql = "SELECT Count(*) FROM HR.EmployeeDetail e WHERE e.LastName = @LastName";

            using (var con = OpenConnection())
                return con.ExecuteScalar<int>(sql, new { lastName });
        }

        public int MaximumBatchSize => 2100 / 7;

        virtual public void InsertLargeBatch(IList<EmployeeSimple> employees)
        {
            InsertLargeBatch(employees, 250);
        }

        virtual public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
        {
            if (employees == null || employees.Count == 0)
                throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

            using (var con = OpenConnection())
            using (var trans = con.BeginTransaction())
            {
                foreach (var batch in employees.BatchAsLists(batchSize))
                {
                    var sql = new StringBuilder(@"INSERT INTO HR.Employee
(FirstName, MiddleName, LastName, Title, OfficePhone, CellPhone, EmployeeClassificationKey)
VALUES ");
                    var parameters = new Dictionary<string, object?>();
                    for (var i = 0; i < batch.Count; i++)
                    {
                        if (i != 0)
                            sql.AppendLine(",");
                        sql.Append($"(@FirstName_{i}, @MiddleName_{i}, @LastName_{i}, @Title_{i}, @OfficePhone_{i}, @CellPhone_{i}, @EmployeeClassificationKey_{i})");

                        parameters[$"@FirstName_{i}"] = batch[i].FirstName;
                        parameters[$"@MiddleName_{i}"] = batch[i].MiddleName;
                        parameters[$"@LastName_{i}"] = batch[i].LastName;
                        parameters[$"@Title_{i}"] = batch[i].Title;
                        parameters[$"@OfficePhone_{i}"] = batch[i].OfficePhone;
                        parameters[$"@CellPhone_{i}"] = batch[i].CellPhone;
                        parameters[$"@EmployeeClassificationKey_{i}"] = batch[i].EmployeeClassificationKey;
                    }
                    sql.AppendLine(";");

                    con.Execute(sql.ToString(), parameters, transaction: trans);
                }
                trans.Commit();
            }
        }
    }
Info

The repository methods are not normally virtual. This was done so that they could be overridden with better implementations as shown below.

Dapper.Contrib

The Dapper.Contrib library removes the need to explicilty batch inserts.

public class LargeBatchScenarioContrib : LargeBatchScenario
{
    public LargeBatchScenarioContrib(string connectionString) : base(connectionString)
    { }

    override public void InsertLargeBatch(IList<EmployeeSimple> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        using (var con = OpenConnection())
        using (var trans = con.BeginTransaction())
        {
            con.Insert(employees, trans);
            trans.Commit();
        }
    }

    override public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        using (var con = OpenConnection())
        using (var trans = con.BeginTransaction())
        {
            foreach (var batch in employees.BatchAsLists(batchSize))
                con.Insert(batch, trans);

            trans.Commit();
        }
    }
}

DbConnector

public class LargeBatchScenario : ScenarioBase, ILargeBatchScenario<EmployeeSimple>
{
    public LargeBatchScenario(string connectionString) : base(connectionString)
    { }

    public int CountByLastName(string lastName)
    {
        const string sql = "SELECT Count(*) FROM HR.EmployeeDetail e WHERE e.LastName = @lastName";

        return DbConnector.Scalar<int>(sql, new { lastName }).Execute();
    }

    public int MaximumBatchSize => int.MaxValue;

    virtual public void InsertLargeBatch(IList<EmployeeSimple> employees)
    {
        InsertLargeBatch(employees, 250);
    }

    virtual public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
    {
        if (employees == null || !employees.Any())
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        //Best approach for unlimited inserts since SQL server has parameter amount restrictions
        //https://docs.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?redirectedfrom=MSDN&view=sql-server-ver15
        //Also, notice the "batchSize" argument is not necessary.
        DbConnector.Build<int?>(
                sql: @$"INSERT INTO {EmployeeSimple.TableName}
                (
                    CellPhone,
                    EmployeeClassificationKey,
                    FirstName,
                    LastName,
                    MiddleName,
                    OfficePhone,
                    Title
                ) 
                VALUES (
                    @{nameof(EmployeeSimple.CellPhone)},
                    @{nameof(EmployeeSimple.EmployeeClassificationKey)},
                    @{nameof(EmployeeSimple.FirstName)},
                    @{nameof(EmployeeSimple.LastName)},
                    @{nameof(EmployeeSimple.MiddleName)},
                    @{nameof(EmployeeSimple.OfficePhone)},
                    @{nameof(EmployeeSimple.Title)}
                )",
                param: employees.First(),
                onExecute: (int? result, IDbExecutionModel em) =>
                {
                    //Set the command
                    DbCommand command = em.Command;

                    //Execute first row.
                    em.NumberOfRowsAffected = command.ExecuteNonQuery();

                    //Set and execute remaining rows.
                    foreach (var emp in employees.Skip(1))
                    {
                        command.Parameters[nameof(EmployeeSimple.CellPhone)].Value = emp.CellPhone ?? (object)DBNull.Value;
                        command.Parameters[nameof(EmployeeSimple.EmployeeClassificationKey)].Value = emp.EmployeeClassificationKey;
                        command.Parameters[nameof(EmployeeSimple.FirstName)].Value = emp.FirstName ?? (object)DBNull.Value;
                        command.Parameters[nameof(EmployeeSimple.LastName)].Value = emp.LastName ?? (object)DBNull.Value;
                        command.Parameters[nameof(EmployeeSimple.MiddleName)].Value = emp.MiddleName ?? (object)DBNull.Value;
                        command.Parameters[nameof(EmployeeSimple.OfficePhone)].Value = emp.OfficePhone ?? (object)DBNull.Value;
                        command.Parameters[nameof(EmployeeSimple.Title)].Value = emp.Title ?? (object)DBNull.Value;

                        em.NumberOfRowsAffected += command.ExecuteNonQuery();
                    }

                    return em.NumberOfRowsAffected;
                }
            )
            .WithIsolationLevel(IsolationLevel.ReadCommitted)//Use a transaction
            .Execute();
    }
}

Entity Framework 6

Entity Framework can suffer severe performance degration as the number of objects it tracks increases. To mitigate this effect, a new DBContext after every batch is necessary. Start with a batch size of 100 and adjust as needed.

public class LargeBatchScenario : ILargeBatchScenario<Employee>
{
    private Func<OrmCookbookContext> CreateDbContext;

    public LargeBatchScenario(Func<OrmCookbookContext> dBContextFactory)
    {
        CreateDbContext = dBContextFactory;
    }

    public int MaximumBatchSize => int.MaxValue;

    public int CountByLastName(string lastName)
    {
        using (var context = CreateDbContext())
            return context.Employee.Where(ec => ec.LastName == lastName).Count();
    }

    public void InsertLargeBatch(IList<Employee> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        //Performance can be significantly degraded as the size of the batch increases
        InsertLargeBatch(employees, 50);
    }

    public void InsertLargeBatch(IList<Employee> employees, int batchSize)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        foreach (var batch in employees.BatchAsLists(batchSize))
        {
            using (var context = CreateDbContext())
            {
                context.Employee.AddRange(batch);
                context.SaveChanges();
            }
        }
    }
}

Entity Framework Core

No changes are needed.

public class LargeBatchScenario : ILargeBatchScenario<Employee>
{
    private Func<OrmCookbookContext> CreateDbContext;

    public LargeBatchScenario(Func<OrmCookbookContext> dBContextFactory)
    {
        CreateDbContext = dBContextFactory;
    }

    public int MaximumBatchSize => int.MaxValue;

    public int CountByLastName(string lastName)
    {
        using (var context = CreateDbContext())
            return context.Employees.Where(ec => ec.LastName == lastName).Count();
    }

    public void InsertLargeBatch(IList<Employee> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        using (var context = CreateDbContext())
        {
            context.Employees.AddRange(employees);
            context.SaveChanges();
        }
    }

    public void InsertLargeBatch(IList<Employee> employees, int batchSize)
    {
        foreach (var batch in employees.BatchAsLists(batchSize))
        {
            using (var context = CreateDbContext())
            {
                context.Employees.AddRange(batch);
                context.SaveChanges();
            }
        }
    }
}

LINQ to DB

LinqToDB only supports batch inserts from a collection of objects.

Note the use of BulkCopyType.MultipleRows.

public void BulkInsert(IList<Employee> employees)
{
    var options = new BulkCopyOptions() { BulkCopyType = BulkCopyType.ProviderSpecific };
    using (var db = new OrmCookbook())
        db.BulkCopy(options, employees);
}

public void BulkInsert(DataTable employees)
{
    Assert.Inconclusive("Bulk insert using a DataTable is not supported.");
}

For more information see Bulk Copy (Bulk Insert)

LLBLGen Pro

No changes are needed.

public class LargeBatchScenario : ILargeBatchScenario<EmployeeEntity>
{
    public int CountByLastName(string lastName)
    {
        using (var adapter = new DataAccessAdapter())
        {
            return new LinqMetaData(adapter).Employee.Where(e => e.LastName == lastName).Count();
        }
    }

    public int MaximumBatchSize => 2100 / 7;

    public void InsertLargeBatch(IList<EmployeeEntity> employees)
    {
        InsertLargeBatch(employees, 100);
    }

    public void InsertLargeBatch(IList<EmployeeEntity> employees, int batchSize)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        // The collection is disposed as it assigns event handlers to the entities it contains. Keeping the
        // entities around would keep the collection in memory.
        using (var toInsert = new EntityCollection<EmployeeEntity>(employees))
        {
            using (var adapter = new DataAccessAdapter())
            {
                adapter.BatchSize = batchSize;
                adapter.SaveEntityCollection(toInsert);
            }
        }
    }
}

NHibernate

No changes are needed.

public class LargeBatchScenario : ILargeBatchScenario<Employee>
{
    readonly ISessionFactory m_SessionFactory;

    public LargeBatchScenario(ISessionFactory sessionFactory)
    {
        m_SessionFactory = sessionFactory;
    }

    public int MaximumBatchSize => int.MaxValue;

    public int CountByLastName(string lastName)
    {
        using (var session = m_SessionFactory.OpenSession())
            return session.Query<Employee>().Where(ec => ec.LastName == lastName).Count();
    }

    public void InsertLargeBatch(IList<Employee> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        using (var session = m_SessionFactory.OpenSession())
        {
            foreach (var employee in employees)
                session.Save(employee);
            session.Flush();
        }
    }

    public void InsertLargeBatch(IList<Employee> employees, int batchSize)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        foreach (var batch in employees.BatchAsLists(batchSize))
        {
            using (var session = m_SessionFactory.OpenSession())
            {
                foreach (var employee in batch)
                    session.Save(employee);
                session.Flush();
            }
        }
    }
}

RepoDb

No changes are needed.

public class LargeBatchScenario : BaseRepository<EmployeeSimple, SqlConnection>, ILargeBatchScenario<EmployeeSimple>
{
    public LargeBatchScenario(string connectionString)
        : base(connectionString, RDB.Enumerations.ConnectionPersistency.Instance)
    { }

    public int CountByLastName(string lastName)
    {
        return Query(e => e.LastName == lastName).Count();
    }

    public int MaximumBatchSize => 2100 / 7;

    public void InsertLargeBatch(IList<EmployeeSimple> employees)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        InsertAll(employees);
    }

    public void InsertLargeBatch(IList<EmployeeSimple> employees, int batchSize)
    {
        if (employees == null || employees.Count == 0)
            throw new ArgumentException($"{nameof(employees)} is null or empty.", nameof(employees));

        InsertAll(employees, batchSize: batchSize);
    }
}

ServiceStack

No changes are needed.

public class LargeBatchScenario : ILargeBatchScenario<Employee>
{
    private readonly IDbConnectionFactory _dbConnectionFactory;

    public LargeBatchScenario(IDbConnectionFactory dbConnectionFactory)
    {
        _dbConnectionFactory = dbConnectionFactory;
    }

    public int MaximumBatchSize => int.MaxValue;

    public int CountByLastName(string lastName)
    {
        using (var db = _dbConnectionFactory.OpenDbConnection())
            return (int)db.Count<Employee>(e => e.LastName == lastName);
    }

    public void InsertLargeBatch(IList<Employee> employees)
    {
        using (var db = _dbConnectionFactory.OpenDbConnection())
            db.InsertAll(employees);
    }

    public void InsertLargeBatch(IList<Employee> employees, int batchSize)
    {
        using (var db = _dbConnectionFactory.OpenDbConnection())
        {
            foreach (var batch in employees.BatchAsLists(batchSize))
                db.InsertAll(batch);
        }
    }
}