방프리

24.05.21 C# 동시성 프로그래밍 (컬렉션) 본문

C#/동시성 처리

24.05.21 C# 동시성 프로그래밍 (컬렉션)

방프리 2024. 5. 21. 23:04

1. 불변 스택과 불변 큐

기본 스택과 큐와 성능을 동일하나 컬렉션을 자주 업데이트를 해야한다면 기본 스택과 큐가 더 빠르다. 
단, 사용 시 하단의 내용을 알고 있어야 한다.

  • 불변 컬렉션의 인스턴스는 절대 바뀌지 않는다.
  • 스레드로부터 안전하다
  • 불변 컬렉션을 변경하는 메서드를 호출하면 변경한 내용을 반영한 새로운 컬렉션을 반환한다.
  • 스렏드 간 통신에 사용해서는 안된다. 생산자/소비자 큐가 훨씬 좋다.

 

2. 불변 리스트

불변 리스트는 일반적인 List<T>와 달리 불변 리스트의 인스턴스끼리 최대한 많은 메모리를 공유할 수 있게 내부적으로 이진트리로 구현되어 있다. 그러기에 대부분의 소요시간이 O(log N)이기에 foreach를 적극 활용해야 한다.
ImmutableList<T>는 그다지 선호되지 않는다. 사용 시 잘 확인해볼 것

3. 불변 집합

ImmutableHashSet<T>는 고유 항목의 컬렉션이고, ImmutableSortedSet<T>는 정렬된 고유 항목의 컬렉션이다. 만약 정렬이 정말 필요하지 않는다면 ImmutableHashSet<T>를 사용하는 것이 좋다. 정렬된 컨테이너라 하더라도 인덱싱에서는
O(1)이 아닌 O(log N)이기 때문이다. 또한 ImmutableSortedSet<T>도 가능한 foreach를 사용해야 한다.

4. 불변 딕셔너리

ImmutableDictionary<TKey, TValue>와 ImmutableSortedDictionary<TKey, TValue> 두 가지가 있다. 정렬되지 않는 ImmutableDictionary가 좀 더 빠르게 동작하기에 정렬이 필요없다면 ImmutableDictionary 사용이 좀 더 효율적이다.

5. 스레드로부터 안전한 딕셔너리

ConcurrentDictionary는 정교한 잠금과 무잠금 기법을 함께 사용해서 대부분의 상황에서의 빠른 접근을 보장하며 스레드로부터 안전하다. ConcurrentDictionary를 대리자와 함께 사용할 땐 대리자는 삽입할 값만 생성해야하지 대리자에서 AddOrUdpate를 진행하게 되면 부작용이 있을 수 있다. ConcurrentDictionary가 만능은 아니기에 사용 시에도 주의가 필요하다.

6. 블로킹 큐

BlockingCollection<T>은 여러 스레드가 공유하면서 선입선출로 동작하는 큐이다. 사용 시 private 접근 제한자에 읽기 전용 필드로 정의한 후 사용한다.
블로킹 큐는 스레드를 나누어 각각 생산자 스레드와 소비자 스레드라 지정한 후 역할을 나눈다. 생산자 스레드에서는 Add를 통해 항목을 추가하고 CompleteAdding을 호출하여 컬렉션을 완료 상태로 표시할 수 있다.
소비자 스레드는 대개 루프 안에서 실행하며 다음 항목을 기다렸다가 처리한다. Task.Run 등을 통해 생산자 코드를 별도의 스레드에 넣으면 다음과 같이 항목을 소비할 수 있다.

7. 블로킹 스택과 블로킹 백

BlockingCollection<T>로 후입선출 방식의 스택 또는 순서가 없는 백을 만들 수 있다.

BlockingCollection<int> _blockingStack = new BlockingCollection<int>(
    new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>(
    new ConcurrentBag<int>());
    
// 생산자 코드
_blockingStack.Add(7);
_blockingStack.Add(13);
_blockingStack.CompleteAdding();

// 소비자 코드
foreach (int item in _blockingStack.GetConsumingEnumerable())
    Trace.WriteLine(item);

 

8. 비동기 큐

비동기 API를 지니는 큐가 필요하다. 닷넷에는 없지만 Nuget을 통해 몇 가지 선택을 할 수 있다.


첫 번째로는 채널인데 비동기적 생산자/소비자 컬렉션을 지원하는 최신 라이브러리로 큰 규모에서 높은 성능에 중점을 두고 있다. 생산자는 WriteAsync로 채널에 항목을 작성하며 모든 항목의 생산이 끝나면 생산자 중 하나가 Complete를 호출해서 완료를 채널에 알린다.

Channel<int> queue = Channle.CreateUnbounded<int>();

// 생산자 코드
ChannelWriter<int> writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
writer.Complete();

ChannelReader<int> reader = queue.Reader;
await foreach (int value in reader.ReadAllAsync())
    Trace.WriteLine(value);

두 번째로는 BufferBlock<T>를 사용하는 것이다.

var _asyncQueue = new BufferBlock<int>();

// 생산자 코드
await _asyncQueue.SendAsync(7);
await _asyncQueue.SendAsync(13);
_asyncQueue.Complete();

while (await _asyncQueue.OutputAvailableAsync())
    Trace.WriteLine(await _asyncQueue.ReceiveAsync());

세 번째로는 Nito.AsyncEx의 AsyncProducerComsumerQueue<T> 형식이다.

var _asyncQueue = new AsyncProducerComsumerQueue<int>();

// 생산자 코드
await _asyncQueue.EnqueueAsync(7);
await _asyncQueue.EnqueueAsync(13);
_asyncQueue.CompleteAdding();

// 소비자 코드
while (await _asyncQueue.OutputAvailableAsync())
    Trace.WriteLine(await _asyncQueue.DequeueAsync());

 

9. 큐 조절

소비 속도가 빠르다고 확신할 수 없다면 생산 속도가 소비 속도보다 빠를 때를 대비해야한다.
큐의 최대 요소 수를 지정하여 큐를 조절해야 한다.

// 채널 생성 제한
Channel<int> queue = Channel.CreateBounded<int>(1);
ChannelWriter<int> writer = queue.Writer;

await writer.WriteAsync(7);

await writer.WriteAsync(13);

write.Complete();

// BufferBlock<T> 생성 제한
var queue = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 1 });
    
await queue.SendAsync(7);

await queue.SendAsync(13);

queue.Complete();

// Nito.AsyncEx
var queue = new AsyncProducerConsumerQueue<int>(maxCount: 1);

await queue.EnqueueSync(7);

await queue.EnqueueAsync(13);
queue.CompleteAdding();

// BlockingCollection<T>
var queue = new BlockingCollection<int>(boundedCapacity: 1);
queue.Add(7);
queue.Add(13);

queue.CompleteAdding();

 

10. 큐 샘플링

최신 항목을 유지하면서 오래된 항목을 삭제하는 것이 일반적

Channel<int> queue = Channel.CreateBounded<int>(
    new BoundedChannelOptions(1)
    {
        FullMode = BoundedChannelFullMode.DropOldest,
    });
ChannelWriter<int> writer = queue.Writer;

await writer.WriteAsync(7);
await writer.WriteAsync(13);

//오래된 항목을 보존하고 최신 작업을 삭제하려고 할 때
Channel<int> queue = Channel.CreateBounded<int>(
    new BoundedChannelOptions(1)
    {
        FullMode = BoundedChannelFullMode.DropWrite,
    });
ChannelWriter<int> writer = queue.Writer;

await writer.WriteAsync(7);
await writer.WriteAsync(13);

 

11. 비동기 스택과 비동기 백

Nito.AsyncEx의 AsyncCollection<T>가 비동기 큐의 역할을 하지만 모든 종류의 생산자/소비자 컬렉션의 역할도 할 수 있다.

var _asyncStack = new AsyncCollection<int>(
    new ConcurrentStack<int>());
var _asyncBag = new AsyncCollection<int>(
    new ConcurrentBag<int>());
    
// 생산자 코드
await _asyncStack.AddAsync(7);
await _asyncStack.AddAsync(13);
_asyncStack.CompleteAdding();

// 소비자 코드
while (await _asyncStack.OutputAvailableAsync())
    Trace.WriteLine(await _asyncStack.TakeAsync());
    
// 컬렉션 크기를 지정하여 대기
var _asyncStack = new AsyncCollection<int>(
    new ConcurrentStack<int>(), maxCount: 1);

// 이 AddAsync는 즉시 완료
await _asyncStack.AddAsync(7);

// 13을 추가하기 전 7의 삭제를 대기
await _asyncStack.AddAsync(13);

_asyncStack.CompleteAdding();

 

12. 블로킹 큐와 비동기 큐

백그라운드 스레드에서 데이터를 로드하면서 로드한 데이터를 전달 경로에 밀어넣다가 경로가 꽉차면 동기적으로 차단하면서 데이터를 수신하는 UI 스레드의 응답성을 유지할 수 있게 비동기적으로 경로에서 데이터를 로드하려고 할 때

// BufferBloc<T>를 사용한 생산자/소비자 큐
var queue = new BufferBlock<int>();

// 생산자 코드
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();

// 단일 소비자용 소비자 코드
while (await queue.OutputAvailableAsync())
    Trace.WriteLine(await queue.ReceiveAsync());

// 복수 소비자용 소비자 코드
while (true)
{
    int item;
    try
    {
        item = await queue.ReceiveAsync();
    }
    catch (InvalidOperationException)
    {
        break;
    }
    
    Trace.WriteLine(item);
}

// ActionBlock<T>를 사용한 생산자/소비자 큐
ActionBlock<int> queue = new ActionBlock<int>(item => Trace.WriteLine(item));

// 비동기적 생산자 코드
await queue.SendAsync(7);
await queue.Sendsync(13);

// 동기적 생산자 코드
queue.Post(7);
queue.Post(13);
queue.Complete();

// Nito.AsyncEx의 AsyncProducerConsumerQueue<T>
var queue = new AsyncProducerConsumerQueue<int>();

// 비동기적 생산자 코드
await queue.EnqueueAsync(7);
await queue.EnqueueAsync(13);

// 동기적 생산자 코드
queue.Enqueue(7);
queue.Enqueue(13);

queue.CompleteAdding();

// 비동기적 단일 소비자 코드
while (await queue.OutputAvailableAsync())
    Trace.WriteLine(await queue.DequeueAsync());
    
// 비동기적 복수 소비자 코드
while (true)
{
    int item;
    try
    {
        item = await queue.DequeueAsync();
    }
    catch (InvalidOperationException)
    {
        break;
    }
    Trace.WriteLine(item);
}

// 동기 소비자 코드
foreach (int item in queue.GetConsumingEnumerable())
    Trace.WriteLine(item);

 

Comments