BlockingCollection 컬렉션 소개

BlockingCollection은 다음의 기능을 제공합니다.

  • 생산자-소비자 패턴 구현
  • 여러 스레드에서 항목을 동시에 추가하거나 가져올 수 있음
  • 최대 용량 지정
  • 컬렉션이 비었거나 찼을 경우 차단할 작업 삽입 및 제거
  • IProducerConsumerCollection를 구현하는 모든 컬렉션 형식 사용
  • 취소 토근으로 취소 가능
  • 항목을 소비하면서 열거할 수 있는 GetConsumingEnumerable() 제공
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

using BlockingCollection<int> queue = new();
int count = 0;

var cts = new CancellationTokenSource();
var producerTask = Task.Run(() =>
{
    while (cts.Token.IsCancellationRequested == false)
    {
        queue.Add(count, cts.Token);
        Console.WriteLine($"IN: {count}");
        count++;
        queue.Add(count, cts.Token);
        Console.WriteLine($"IN: {count}");
        count++;
        queue.Add(count, cts.Token);
        Console.WriteLine($"IN: {count}");
        count++;

        Thread.Sleep(1000);
    }

    Console.WriteLine("ProducerTask END");
}, cts.Token);

var consumerTask = Task.Run(() =>
{
    while (cts.Token.IsCancellationRequested == false)
    {
        try
        {
            var result = queue.Take(cts.Token);
            Console.WriteLine($"OUT: {result}");
        }
        catch (OperationCanceledException)
        {
            break;
        }
    }

    Console.WriteLine("ConsumerTask END");
}, cts.Token);

Console.WriteLine("Press Enter to exit.");
Console.ReadLine();

cts.Cancel();

Console.ReadLine();

GetConsumingEnumerable를 이용해 foreach로 목록을 소비할 수 있습니다. 특히, foreach문을 벗어나기 위해 CompleteAdding
를 호출해야 함을 알수 있습니다.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

using (BlockingCollection<int> bc = new BlockingCollection<int>())
{
    // Kick off a producer task
    var producerTask = Task.Run(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            bc.Add(i);
            Console.WriteLine($"Producing: {i}");

            await Task.Delay(100); // sleep 100 ms between adds
        }

        // Need to do this to keep foreach below from hanging
        bc.CompleteAdding();
    });

    // Now consume the blocking collection with foreach.
    // Use bc.GetConsumingEnumerable() instead of just bc because the
    // former will block waiting for completion and the latter will
    // simply take a snapshot of the current state of the underlying collection.
    foreach (var item in bc.GetConsumingEnumerable())
    {
        Console.WriteLine($"Consuming: {item}");
    }
    await producerTask; // Allow task to complete cleanup
}