방프리

24.05.22 C# 동시성 프로그래밍 (취소) 본문

C#/동시성 처리

24.05.22 C# 동시성 프로그래밍 (취소)

방프리 2024. 5. 22. 23:03

1. 취소 요청 실행

CancellationTokenSource 형식은 CancellationToken의 공급원이며 이 형식만 취소 요청에 대응이 가능하다.
또한 다른 CancellationTokenSource와 독립적이다. 코드를 취소할 때 거의 항상 경합 조건이 발생한다.

void IssueCancelRequest()
{
    using var cts = new CancellationTokenSource();
    var task = CancelableMethodAsync(cts.Token);
    
    cts.Cancel();
}

 

2. 폴링으로 취소 요청에 대응

함수 안에 처리 루프가 있을 때는 취소 여부를 주기적으로 확인할 수 밖에 없기에 실행속도가 빠르다면 속도를 어느 정도 조절해가며 성능을 측정해야 한다.

public int CancelableMethod(CancellationToken cancellationToken)
{
    for (int i = 0; i != 100000; ++i)
    {
        Thread.Sleep(1);
        if (i % 1000 == 0)
        {
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    return 42;
}

 

3. 타임아웃으로 취소

취소 요청의 한 형태로 일정 시간 뒤에 취소 요청을 한다.

async Task IssueTimeoutAsync()
{
    using var cts = new CancellationTokenSource();
    CancellationTOken token = cts.Token;
    cts.CancelAfter(TimeSpan.FromSeconds(5));
    await Task.Delay(TimeSpan.FromSeconds(10), token);
}

 

4. 비동기 코드의 취소

CancellationToken을 다음 계층으로 넘겨 처리. 취소를 지원하지 않는 메서드도 있다. 이럴 땐 방법이 없기에 결과를 무시하는 등의 자체 처리로 넘겨야 한다.

public async Task<int> CancelableMethodAsync(CancellationToken cancellationToken)
{
    await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
    return 42;
}

 

5. 병렬 코드의 취소

CancellationToken을 병렬 코드에 전달하여 취소를 지원한다. 

void RotateMatrics(IEnumerable<Matrix> matrices, float degrees,
    CancellationTOken token)
{
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationTOken = token },
        matrix => matrix.Rotate(degrees));
}

// 추천하지 않는 방식
void RotateMatrics2(IEnumerable<Matrix> matrices, float degrees,
    CancellationToken token)
{
    Paralle.ForEach(matrices, matrix =>
    {
        matrix.Rotate(degrees);
        token.ThrowIfCancellationRequeested();
    }
}

 

6. System.Reactive 코드의 취소

모든 작업을 리액티브 연산자로 수행하고 ToTask를 호출해서 마지막 요소를 대기할 수 있는 작업으로 변환하여 취소 기능을 지원하게 한다.

CancellationToken cancellationToken = ...
IObservable<int> observable = ...
int lastElement = await observable.TaskLast(1).ToTask(cancellationToken);

구독의 삭제에 대응하여 취소를 요청하는 방법도 있다.

using (var cancellation = new CancellationDisposable())
{
    CancellationToken token = cancellation.Token;
    // 취소에 대응하는 메서드에 토큰을 전달한다.
}
// 이 시점에서 토큰은 취소된 상태

 

7. 데이터 흐름 메시의 취소

CancellationToken을 취소 가능한 API로 전달하는 것이다. 데이터 흐름 블록의 각 블록은 DataflowBlockOptions를 통해 취소를 지원한다. 작성 중인 데이터 흐름 블록이 취소를 지원하게 하려면 블록 옵션에 CancellationToken을 설정한다.

IPropagatorBlock<int, int> CreateMyCustomBlock(
    CancellationToken cancellationToken)
{
    var blockOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = cancellationToken,
    };
    
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2,
        blockOptions);
    var addBlock = new TransformBlock<int, int>(item => item + 2,
        blockOptions);
    var divideBlock = new TransformBlock<int, int>(item => item / 2,
        blockOptions);
        
    var flowCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true,
    };
    multiplyBlock.LinkTo(addBloc, flowCompletion);
    addBlock.LinkTo(divideBlock, flowCompletion);
    
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}

각 블록에 CancellationToken을 적용해야 하며, 취소 시 중간에 데이터 손실이 일어날 수 있음을 알고 있어야 한다.

 

8. 취소 토큰 소스의 연결

두 개 이상의 토큰을 연결하여 전달한 여러 개의 토큰을 일괄적으로 처리할 때 사용

async Task<HttpResponseMessage> GetWithTimeoutAsync(HttpClient client,
    string url, CancellationToken cancellationToken)
{
    using CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromSeconds(2));
    CancellationToken combinedToken = cts.Token;
    
    return await client.GetAsync(url, combinedToken);
}

 

9. 다른 취소 방식과 상호운용

자체적인 취소의 개념이 있는 외부 코드 또는 예전 코드를 취소하고 싶을 때 CancellationToken.Register 메서드를 통해 토큰에 콜백을 등록하여 사용하면 된다.

async Task<PingReply> PingAsync(string hostNameOrAddress,
    CancellationToken cancellationToken)
{
    using var ping = new Ping();
    Task<PingReply> task = ping.SendPingAsync(hostNameOrAddress);
    using CancelltinoTokenRegistration _ = cancellationToken
        .Register(() => pingSendAsyncCancel());
    return await task.
}
Comments