방프리

23.11.11 C# 동시성 프로그래밍 (TPL) 본문

C#/동시성 처리

23.11.11 C# 동시성 프로그래밍 (TPL)

방프리 2023. 11. 11. 17:34

1. 블록 연결

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);

//PropagateCompletion 옵션을 통해 연결 대상의 완료 및 오류 알림 여부 확인
var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);

...

//첫 번째 블록의 완료를 자동으로 두 번째 블록에 전파한다.
multiplyBlock.Complete();
await subtractBlock.Completion;

 

2. 오류 전파

- 반드시 Completion의  await을 통해 대기해야 한다.

- PropagateCompletion 옵션을 통해 예외를 전파받을 수 있도록 해야한다.

- 메시 (연결된 블록의 단위)가 복잡하다면 각 블록별로 확인이 필요할 수 있다.

try
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
    	if (item == 1)
        {
        	throw new InvalidOperationException("Blech.");
        }
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item => item - 2);
    multiplyBlock.LinkTo(subtractBlock,
    	new DataflowLinkOptions { PropagateCompletion = true });
    multiplyBlock.Post(1);
    await subtractBlock.Completion;
}
catch (AggregateException)
{
}

 

3. 블록의 연결 해제

- 연결된 블록을 동적으로 변경

- 연결 및 연결 해제는 Thread-Safe하다. 

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);

IDispoable link = multiplyBlock.LinkTo(subtractBlock);
multiplyBlock.Post(1);
multiplyBlock.Post(2);

link.Dispose();

 

4. 블록의 흐름 조절

- 각 블록의 작업의 분기를 태우거나 혹은 블록의 작업이 무거워 한번에 많은 양을 처리하지 못할 때 사용

var sourceBlock = new BufferBlock<int>();
//입력데이터의 개수 조절, DataflowBlockOptions.UnBounded(무제한)으로 설정가능 상수로는 -1
var options = new DataflowBlockOptions { BoundedCapacity = 1 };
var targetBlockA = new BufferBlock<int>(options);
var targetBlockB = new BufferBlock<int>(options);

sourceBlock.LinkTo(targetBlockA);
sourceBlock.LinkTo(targetBlockB);

 

5. 데이터 흐름 블록으로 병렬 처리

- 각 블록은 독립적이기 때문에 연결하여도 수행은 독립적으로 수행한다. 이로 인해 병렬 작업이 가능

var multiplyBlock = new TransformBlock<int, int>(
    item => item * 2,
    new ExecutionDataflowBlockOptions
    {
    	MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock);

 

6. 사용자 지정 데이터 흐름 블록 생성

- 재사용이 가능한 블록을 생성

IPropagatorBlock<int, int> CreateMyCustomBlock()
{
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    var addBlock = new TransformBlock<int, int>(item => item + 2);
    var divideBlock = new TransformBlock<int, int>(item => item / 2);
    
    var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    multiplyBlock.LinkTo(addBlock, flowCompletion);
    addBlock.LinkTo(divideBlock.flowCompletion);
    
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}
Comments