방프리
24.05.21 C# 동시성 프로그래밍 (컬렉션) 본문
1. 불변 스택과 불변 큐
기본 스택과 큐와 성능을 동일하나 컬렉션을 자주 업데이트를 해야한다면 기본 스택과 큐가 더 빠르다.
단, 사용 시 하단의 내용을 알고 있어야 한다.
- 불변 컬렉션의 인스턴스는 절대 바뀌지 않는다.
- 스레드로부터 안전하다
- 불변 컬렉션을 변경하는 메서드를 호출하면 변경한 내용을 반영한 새로운 컬렉션을 반환한다.
- 스렏드 간 통신에 사용해서는 안된다. 생산자/소비자 큐가 훨씬 좋다.
2. 불변 리스트
불변 리스트는 일반적인 List<T>와 달리 불변 리스트의 인스턴스끼리 최대한 많은 메모리를 공유할 수 있게 내부적으로 이진트리로 구현되어 있다. 그러기에 대부분의 소요시간이 O(log N)이기에 foreach를 적극 활용해야 한다.
ImmutableList<T>는 그다지 선호되지 않는다. 사용 시 잘 확인해볼 것
3. 불변 집합
ImmutableHashSet<T>는 고유 항목의 컬렉션이고, ImmutableSortedSet<T>는 정렬된 고유 항목의 컬렉션이다. 만약 정렬이 정말 필요하지 않는다면 ImmutableHashSet<T>를 사용하는 것이 좋다. 정렬된 컨테이너라 하더라도 인덱싱에서는
O(1)이 아닌 O(log N)이기 때문이다. 또한 ImmutableSortedSet<T>도 가능한 foreach를 사용해야 한다.
4. 불변 딕셔너리
ImmutableDictionary<TKey, TValue>와 ImmutableSortedDictionary<TKey, TValue> 두 가지가 있다. 정렬되지 않는 ImmutableDictionary가 좀 더 빠르게 동작하기에 정렬이 필요없다면 ImmutableDictionary 사용이 좀 더 효율적이다.
5. 스레드로부터 안전한 딕셔너리
ConcurrentDictionary는 정교한 잠금과 무잠금 기법을 함께 사용해서 대부분의 상황에서의 빠른 접근을 보장하며 스레드로부터 안전하다. ConcurrentDictionary를 대리자와 함께 사용할 땐 대리자는 삽입할 값만 생성해야하지 대리자에서 AddOrUdpate를 진행하게 되면 부작용이 있을 수 있다. ConcurrentDictionary가 만능은 아니기에 사용 시에도 주의가 필요하다.
6. 블로킹 큐
BlockingCollection<T>은 여러 스레드가 공유하면서 선입선출로 동작하는 큐이다. 사용 시 private 접근 제한자에 읽기 전용 필드로 정의한 후 사용한다.
블로킹 큐는 스레드를 나누어 각각 생산자 스레드와 소비자 스레드라 지정한 후 역할을 나눈다. 생산자 스레드에서는 Add를 통해 항목을 추가하고 CompleteAdding을 호출하여 컬렉션을 완료 상태로 표시할 수 있다.
소비자 스레드는 대개 루프 안에서 실행하며 다음 항목을 기다렸다가 처리한다. Task.Run 등을 통해 생산자 코드를 별도의 스레드에 넣으면 다음과 같이 항목을 소비할 수 있다.
7. 블로킹 스택과 블로킹 백
BlockingCollection<T>로 후입선출 방식의 스택 또는 순서가 없는 백을 만들 수 있다.
BlockingCollection<int> _blockingStack = new BlockingCollection<int>(
new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>(
new ConcurrentBag<int>());
// 생산자 코드
_blockingStack.Add(7);
_blockingStack.Add(13);
_blockingStack.CompleteAdding();
// 소비자 코드
foreach (int item in _blockingStack.GetConsumingEnumerable())
Trace.WriteLine(item);
8. 비동기 큐
비동기 API를 지니는 큐가 필요하다. 닷넷에는 없지만 Nuget을 통해 몇 가지 선택을 할 수 있다.
첫 번째로는 채널인데 비동기적 생산자/소비자 컬렉션을 지원하는 최신 라이브러리로 큰 규모에서 높은 성능에 중점을 두고 있다. 생산자는 WriteAsync로 채널에 항목을 작성하며 모든 항목의 생산이 끝나면 생산자 중 하나가 Complete를 호출해서 완료를 채널에 알린다.
Channel<int> queue = Channle.CreateUnbounded<int>();
// 생산자 코드
ChannelWriter<int> writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
writer.Complete();
ChannelReader<int> reader = queue.Reader;
await foreach (int value in reader.ReadAllAsync())
Trace.WriteLine(value);
두 번째로는 BufferBlock<T>를 사용하는 것이다.
var _asyncQueue = new BufferBlock<int>();
// 생산자 코드
await _asyncQueue.SendAsync(7);
await _asyncQueue.SendAsync(13);
_asyncQueue.Complete();
while (await _asyncQueue.OutputAvailableAsync())
Trace.WriteLine(await _asyncQueue.ReceiveAsync());
세 번째로는 Nito.AsyncEx의 AsyncProducerComsumerQueue<T> 형식이다.
var _asyncQueue = new AsyncProducerComsumerQueue<int>();
// 생산자 코드
await _asyncQueue.EnqueueAsync(7);
await _asyncQueue.EnqueueAsync(13);
_asyncQueue.CompleteAdding();
// 소비자 코드
while (await _asyncQueue.OutputAvailableAsync())
Trace.WriteLine(await _asyncQueue.DequeueAsync());
9. 큐 조절
소비 속도가 빠르다고 확신할 수 없다면 생산 속도가 소비 속도보다 빠를 때를 대비해야한다.
큐의 최대 요소 수를 지정하여 큐를 조절해야 한다.
// 채널 생성 제한
Channel<int> queue = Channel.CreateBounded<int>(1);
ChannelWriter<int> writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
write.Complete();
// BufferBlock<T> 생성 제한
var queue = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 1 });
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();
// Nito.AsyncEx
var queue = new AsyncProducerConsumerQueue<int>(maxCount: 1);
await queue.EnqueueSync(7);
await queue.EnqueueAsync(13);
queue.CompleteAdding();
// BlockingCollection<T>
var queue = new BlockingCollection<int>(boundedCapacity: 1);
queue.Add(7);
queue.Add(13);
queue.CompleteAdding();
10. 큐 샘플링
최신 항목을 유지하면서 오래된 항목을 삭제하는 것이 일반적
Channel<int> queue = Channel.CreateBounded<int>(
new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropOldest,
});
ChannelWriter<int> writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
//오래된 항목을 보존하고 최신 작업을 삭제하려고 할 때
Channel<int> queue = Channel.CreateBounded<int>(
new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropWrite,
});
ChannelWriter<int> writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
11. 비동기 스택과 비동기 백
Nito.AsyncEx의 AsyncCollection<T>가 비동기 큐의 역할을 하지만 모든 종류의 생산자/소비자 컬렉션의 역할도 할 수 있다.
var _asyncStack = new AsyncCollection<int>(
new ConcurrentStack<int>());
var _asyncBag = new AsyncCollection<int>(
new ConcurrentBag<int>());
// 생산자 코드
await _asyncStack.AddAsync(7);
await _asyncStack.AddAsync(13);
_asyncStack.CompleteAdding();
// 소비자 코드
while (await _asyncStack.OutputAvailableAsync())
Trace.WriteLine(await _asyncStack.TakeAsync());
// 컬렉션 크기를 지정하여 대기
var _asyncStack = new AsyncCollection<int>(
new ConcurrentStack<int>(), maxCount: 1);
// 이 AddAsync는 즉시 완료
await _asyncStack.AddAsync(7);
// 13을 추가하기 전 7의 삭제를 대기
await _asyncStack.AddAsync(13);
_asyncStack.CompleteAdding();
12. 블로킹 큐와 비동기 큐
백그라운드 스레드에서 데이터를 로드하면서 로드한 데이터를 전달 경로에 밀어넣다가 경로가 꽉차면 동기적으로 차단하면서 데이터를 수신하는 UI 스레드의 응답성을 유지할 수 있게 비동기적으로 경로에서 데이터를 로드하려고 할 때
// BufferBloc<T>를 사용한 생산자/소비자 큐
var queue = new BufferBlock<int>();
// 생산자 코드
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();
// 단일 소비자용 소비자 코드
while (await queue.OutputAvailableAsync())
Trace.WriteLine(await queue.ReceiveAsync());
// 복수 소비자용 소비자 코드
while (true)
{
int item;
try
{
item = await queue.ReceiveAsync();
}
catch (InvalidOperationException)
{
break;
}
Trace.WriteLine(item);
}
// ActionBlock<T>를 사용한 생산자/소비자 큐
ActionBlock<int> queue = new ActionBlock<int>(item => Trace.WriteLine(item));
// 비동기적 생산자 코드
await queue.SendAsync(7);
await queue.Sendsync(13);
// 동기적 생산자 코드
queue.Post(7);
queue.Post(13);
queue.Complete();
// Nito.AsyncEx의 AsyncProducerConsumerQueue<T>
var queue = new AsyncProducerConsumerQueue<int>();
// 비동기적 생산자 코드
await queue.EnqueueAsync(7);
await queue.EnqueueAsync(13);
// 동기적 생산자 코드
queue.Enqueue(7);
queue.Enqueue(13);
queue.CompleteAdding();
// 비동기적 단일 소비자 코드
while (await queue.OutputAvailableAsync())
Trace.WriteLine(await queue.DequeueAsync());
// 비동기적 복수 소비자 코드
while (true)
{
int item;
try
{
item = await queue.DequeueAsync();
}
catch (InvalidOperationException)
{
break;
}
Trace.WriteLine(item);
}
// 동기 소비자 코드
foreach (int item in queue.GetConsumingEnumerable())
Trace.WriteLine(item);
'C# > 동시성 처리' 카테고리의 다른 글
24.05.24 C# 동시성 프로그래밍 (함수형 친화적 OOP) (0) | 2024.05.24 |
---|---|
24.05.22 C# 동시성 프로그래밍 (취소) (0) | 2024.05.22 |
24.05.09 C# 동시성 프로그래밍 (상호운용) (0) | 2024.05.09 |
23.11.19 C# 동시성 프로그래밍 (테스트) (0) | 2023.11.19 |
23.11.12 C# 동시성 프로그래밍 (Reactive) (1) | 2023.11.12 |