EfCore의 AddRange와 BulkInsert

최근에 사이드 프로젝트를 진행하면서 겪었던 문제사항과 해결(?)방법에 대해 공유드리고자 합니다.

PostgresSql을 사용하여 EF Core를 사용중에 Transaction과 관련된 문제입니다.
클린아키텍쳐 구조를 사용했습니다.

[Authorize]
public record CreateRecipeByBarCommand : IRequest<BaseResponse<long>>, ITransactional
{
    public string? RecipeName { get; init; }
    
    public string? Description { get; init; }
    
    public bool IsPublic { get; init; }
    
    public bool IsChangeableByOthers { get; init; }
    
    public ICollection<CreateRecipeAlcohol> Alcohols { get; init; } = new List<CreateRecipeAlcohol>();
    
    public ICollection<CreateRecipeIngredient> Ingredients { get; init; } = new List<CreateRecipeIngredient>();
}

public class CreateRecipeByBarCommandHandler : IRequestHandler<CreateRecipeByBarCommand, BaseResponse<long>>
{
    private readonly IApplicationDbContext _context;
    private readonly IMediator _sender;

    public CreateRecipeByBarCommandHandler(IApplicationDbContext context, IMediator sender)
    {
        _context = context;
        _sender = sender;
    }
    
    public async Task<BaseResponse<long>> Handle(CreateRecipeByBarCommand request, CancellationToken cancellationToken)
    {
        var recipe = new Recipe
        {
            Name = request.RecipeName,
            Description = request.Description,
            IsPublic = request.IsPublic,
            IsChangeableByOthers = request.IsChangeableByOthers
        };
        
        recipe.AddDomainEvent(new RecipeCreatedEvent(recipe));
        
        _context.Recipes.Add(recipe);
        
        await _context.SaveChangesAsync(cancellationToken);

        var commandRecipeAlcohols = new CreateRecipeAlcoholsCommand
        {
            RecipeId = recipe.Id,
            Alcohols = request.Alcohols
        };
        
        await _sender.Send(commandRecipeAlcohols, cancellationToken);

        var commandRecipeIngredients = new CreateRecipeIngredientsCommand
        {
            RecipeId = recipe.Id,
            Ingredients = request.Ingredients
        };
        
        await _sender.Send(commandRecipeIngredients, cancellationToken);
        
        return BaseResponse<long>.Success(recipe.Id);
    }
}

다음과 같이 Recipe를 생성하는 메소드가 있습니다. 여기서 ITransactional이라는 마커인터페이스를 상속받아서 트랜잭션을 활성화 시켰습니다.

public record CreateRecipeAlcoholsCommand : IRequest
{
    public ICollection<CreateRecipeAlcohol> Alcohols { get; init; } = new List<CreateRecipeAlcohol>();

    public long RecipeId { get; init; }
}

public record CreateRecipeAlcohol
{
    public long AlcoholId { get; init; }

    public decimal AlcoholCost { get; init; }

    // [9] 액체 단위 타입 코드
    public int SystemUnitTypeCode { get; init; }
}

public class CreateRecipeAlcoholsCommandHandler : IRequestHandler<CreateRecipeAlcoholsCommand>
{
    private readonly IApplicationDbContext _context;

    public CreateRecipeAlcoholsCommandHandler(IApplicationDbContext context)
    {
        _context = context;
    }

    public async Task Handle(CreateRecipeAlcoholsCommand request, CancellationToken cancellationToken)
    {
        {
            var entities = request.Alcohols.Select(alcohol =>
            {
                var entity = new RecipeAlcohol
                {
                    RecipeId = request.RecipeId,
                    AlcoholId = alcohol.AlcoholId,
                    AlcoholCost = alcohol.AlcoholCost.ToMilliLiterDecimal(alcohol.SystemUnitTypeCode),
                    SystemUnitTypeCode = alcohol.SystemUnitTypeCode
                };
                entity.AddDomainEvent(new RecipeAlcoholCreatedEvent(entity));
                //_context.RecipeAlcohols.Add(entity);
                return entity;
            }).ToList();
            
            _context.RecipeAlcohols.AddRange(entities);

            await _context.SaveChangesAsync(cancellationToken);
        }
    }
}

그리고 위와 같이 AlcoholRecipe 리스트를 생성해주는 메서드도 호출했습니다 (IngredientRecipe의 모습은 동일하여, 생략합니다.)

다만,

앗… 여기서 멘붕이 오더군요.
일단 구글을 좀 찾아보니 AddRange를 저장할 경우에는 단건의 쿼리를 여러번 전송한다. 라고 확인을 했습니다.

흠, 그렇다면 여기서 방법을 바꿔서
Add는 어떨까 하는 실험정신으로 단건으로 여러번 추가해본결과, 어래래… 잘되네여…? 다만 더 찾아보니 Add는 벤치마크에서도 확실히 시간차이가 많이벌어지네여…

그래서 제가 선택한방법은

ef core의 extension nuget을 사용했습니다.

    public async Task BulkInsertAuditableEntitiesAsync<T>(IList<T> entities, BulkConfig? bulkConfig = null,
        Action<decimal>? progress = null, Type? type = null, CancellationToken cancellationToken = default)
        where T : BaseAuditableEntity
    {
        entities.UpdateEntities(_userService.Id);

        await this.BulkInsertAsync(entities, bulkConfig, progress, type, cancellationToken);
    }
    
    public async Task BulkInsertEntitiesAsync<T>(IList<T> entities, BulkConfig? bulkConfig = null,
        Action<decimal>? progress = null, Type? type = null, CancellationToken cancellationToken = default)
        where T : class
    {
        await this.BulkInsertAsync(entities, bulkConfig, progress, type, cancellationToken);
    }

위와 같이 진행하였으며,

image

네 테스트가 통과되었습니다.

결과적으로 아래와 같이 유추가 가능한 것 같습니다.

  1. AddRange 는 SaveChangesAsync를 호출하는 순간 transaction을 건다.
  2. Add로 단건씩 추가한경우에는 transaction을 걸지않고, 단건씩 저장한다.

다만, 몇몇부분들은 좀 더 고민이 필요한 것 같지만, EF Core에서의 대량 업데이트가 필요하거나, 상단에서 Transaction이 걸려있다면, BulkInsert Extension을 고려해볼만 한것같습니다.

추가로, 제가 틀린부분이 있거나, 더 좋은 방법이 있다면 언제든 저에게 알려주세요. 감사합니다.

4개의 좋아요

IApplicationDbContext 가 인터페이스인가요?

넵 맞습니다.

DbContext.SaveChanges 는 하나의 트랜잭션입니다.

예외로 보아, CreateRecipeByBarCommandHandler 가 생성한 Transcation 내부에서, CreateRecipeAlcoholsCommandHandler 와 CreateRecipeIngredientsCommandHandler가 SaveChanges 를 호출하면서 발생시킨 Transaction 이 문제가 되는 것 같습니다.

MediatR 은 RequestHandler 를 비동기적으로 호출하기 때문에, 각 Transaction 은 Nested/Concurrent Transaction 이 됩니다.

다만, 같은 transcation 인데, DbSet.Add()는 문제가 없고, DbSet.AddRange 만 문제가 되는 것은 좀 흥미롭습니다.

DbContext 문제를 떠나, CreateRecipeByBarCommandHandler 가 CreateRecipeByBarCommand 를 Unit of Work 로 처리하도록 변경할 필요는 있을 것 같습니다.

즉, CreateRecipeAlcoholsCommandHandler 와 CreateRecipeIngredientsCommandHandler 로직을 모두 포함하게 만드는 것이죠.

이는 CreateRecipeByBarCommand 가 본질적으로 저장소에 대한 Transaction을 나타내기 때문입니다. 덤으로 DbContext Concurrency 문제도 해결될 것입니다.

3개의 좋아요

일단 ITransactional 마커 인터페이스를 상속 받을시

public class TransactionBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    private readonly IUnitOfWork _unitOfWork;
    private readonly ILogger<TransactionBehavior<TRequest, TResponse>> _logger;

    public TransactionBehavior(
        IUnitOfWork unitOfWork, 
        ILogger<TransactionBehavior<TRequest, TResponse>> logger)
    {
        _unitOfWork = unitOfWork;
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        RequestHandlerDelegate<TResponse> next, 
        CancellationToken cancellationToken)
    {
        // ITransactional 인터페이스를 구현한 요청에만 트랜잭션 적용
        if (request is ITransactional)
        {
            var startTime = DateTime.UtcNow;
            
            _logger.LogInformation(
                "Transaction started for {RequestType} at {StartTime}", 
                typeof(TRequest).Name, 
                startTime
            );

            using var transaction = await _unitOfWork.BeginTransactionAsync(cancellationToken);
            try 
            {
                var response = await next();

                _unitOfWork.Commit();

                var endTime = DateTime.UtcNow;
                var duration = endTime - startTime;

                _logger.LogInformation(
                    "Transaction for {RequestType} completed in {Duration}", 
                    typeof(TRequest).Name, 
                    duration
                );

                return response;
            }
            catch (Exception ex)
            {
                _logger.LogError(
                    ex, 
                    "Transaction for {RequestType} failed after {Duration}", 
                    typeof(TRequest).Name, 
                    DateTime.UtcNow - startTime
                );

                _unitOfWork.Rollback();
                throw;
            }
        }

        // 트랜잭션이 필요 없는 요청은 그대로 처리
        return await next();
    }
}
public interface IUnitOfWork
{
    Task<IDbTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default);
    
    void Commit();
    
    void Rollback();
}

public class UnitOfWork : IUnitOfWork
{
    private readonly IApplicationDbContext _context;
    private IDbTransaction _transaction;

    public UnitOfWork(IApplicationDbContext context)
    {
        _context = context;
        _transaction = null!;
    }

    public async Task<IDbTransaction> BeginTransactionAsync(CancellationToken cancellationToken = default)
    {
        var connection = _context.DatabaseConnection;

        if (connection.State != ConnectionState.Open)
        {
            await connection.OpenAsync(cancellationToken);
        }

        _transaction = await connection.BeginTransactionAsync(cancellationToken);
        return _transaction;
    }

    public void Commit()
    {
        _transaction.Commit();
    }

    public void Rollback()
    {
        _transaction.Rollback();
    }
}

해당 형식으로 하나의 단위를 잡고 실행하게 됩니다.
말씀하신 부분이 이런형식이 맞으실까요?

그런 것 같습니다.

UnitOfWork 의 Transcation 내부에서 IMediator.Send 가 또 다른 Task 를 유발하고, 거기에서 다시 DbContext 가 사용되기 때문입니다.

DbContext 는 이렇게 동시에 여러 쓰레드에서 사용될 때 매우 취약합니다.

개발 단계에서 발현되지 않더라도, 안심할 수 없습니다.
운용 단계에서 발현될 수도 있기 때문입니다.

그리고, DbContext 를 IRepository, IApplicationDbContext 등 추가적인 Abstraction으로 감싸지 말라고 하는데는 이런 Concurrency 이슈도 한 몫합니다.

특히, MeditaR과 함께 사용할 때는 바로 사용하는 것이 더 좋다는 것이 개인적인 생각입니다.

예를 들면, 아래는 커맨드, 커맨드 Response입니다.

Application.csproj
/Recipe/
Create.cs, CreateResponse.cs
Update.cs, UpdateResponse.cs
Query.cs, QueryResponse.cs
QueryById.cs, QueryByIdResponse.cs

아래는 핸들러 공급자들입니다.

DatabaseHandler.EFcore.csproj.
abstract AppDbContext.cs
/Configurations/
…테이블 설정들
/Handlers/Recipes/
RecipeCreateHandler(AppDbContext db) : IRequestHandler<Create, CreateResponse>,

services.AddHandlers();

DataBase.SqlServer.csproj → DatabaseHandler.EFcore.csproj.
SqlServerDbContext : AppDbContext.cs (Sql server 용 디테일 설정)

services.AddScoped<AppDbContext, SqlDbContext>();

DataBase.Postgresql.csproj → DatabaseHandler.EFcore.csproj.
PgsqlDbContext : AppDbContext.cs (Pgsql 용 디테일 설정.)

services.AddScoped<AppDbContext, PgsqlDbContext>();

ClientHandlers.csproj
/Recipes/
RecipeCreateHandler(HttpClient proxy) : IRequestHandler<Create, CreateResponse>,

services.AddHandlers()

아래는 커맨드 전송자들입니다.

Api.csproj
builder.Services.AddEFcoreHandlers();
builder.Services.AddSqlServerServices();
// builder.Services.AddPgsqlServices();

Was.csproj
builder.Services.AddEFcoreHandlers();
builder.Services.AddSqlServerServices();
// builder.Services.AddPgsqlServices();

Client.BlazorWasm.csproj → ClientHanders.csproj
builder.Services.AddClientHandlers();

Client.WPF.csproj → ClientHandlers.csproj
builder.Services.AddClientHandlers();

WPFStandalone.csproj
builder.Services.AddEFcoreHandlers();
builder.Services.AddSqlServerServices();
// builder.Services.AddPgsqlServices();

1개의 좋아요

앗, 이런부분에 대해서 놓쳤네요. 감사합니다

1개의 좋아요