방프리

24.05.09 C# 동시성 프로그래밍 (상호운용) 본문

C#/동시성 처리

24.05.09 C# 동시성 프로그래밍 (상호운용)

방프리 2024. 5. 9. 21:38

1. 'Async' 메서드와 'Completed' 이벤트용 비동기 래퍼

Async로 끝나는 메서드와 Completed로 끝나는 이벤트를 사용하는 패턴을 이벤트 기반 비동기 패턴 (EAP, Event-based Asynchronous Pattern)이라고 한다. 이 EAP가 작업 기반 비동기 패턴 (TAP, Task-based Asynchronuous Pattern)을 따를 수 있게 Task를 반환하는 메서드로 감싸려 한다.

public static Task<string> DownloadStringTaskAsync(this WebClient client,
    Uri address)
{
    var tcs = new TaskCompletionSource<string>();
    
    DownloadStringCompletedEventHandler handler = null;
    handler = (_, e) =>
    {
        client.DownloadStringCompleted -= handler;
        if (e.Cancelled)
            tcs.TrySetCancled();
        else if (e.Error != null)
            tcs.TrySetException(e.Error);
        else 
            tcs.TrySetResult(e.Result);
    };
    
    client.DownloadStringCompleted += handler;
    client.DownloadStringAsync(address);
    
    return tcs.Task;
}

 

2. 'Begin'과 'End' 메서드용 비동기 래퍼

APM을 감싸는 가장 좋은 방법은 TaskFactory 형식의 FromAsync 메서드 중 하나를 사용하는 것이다. FromAsync 또한 내부적으로 TaskCompletionSource<TResult>를 사용하지만, FromAsync가 사용면에서 좀 더 쉽다.

public static Task<WebResponse> GetResponseAsync(this WebRequest client)
{
    return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse,
        client.EndGetResponse, null);
}

FromAsync의 마지막 인자를 null로 해줄 때 이점은 C# 4.0 버전 이전에는 async/await이 존재하지 않아 state 개체를 사용하는 것이 일반적이였기에 신규 버전에서는 필요가 없어 null로 인자를 넘기는 것이다. 최근 state 개체는 메모리 사용을 최적화할 때 클로저 인스턴스를 피하는 용도로만 쓰인다.

3. 범용 비동기 래퍼

async가 등장하기 전 MS에서 추천하는 방식은 1번의 EAP, 2번의 APM이였다. 하지만 현재는 비공식적으로 다음과 같이 콜백형식으로 많이 사용한다.

public interface IMyAsyncHttpService
{
    void DownloadString(Uri address, Action<string, Exception> callback);
}

//실제 사용
public static Task<string> DownloadStringAsync(
    this IMyAsyncHttpService httpService, Uri address)
{
    var tcs = new TaskCompletionSource<string>();
    httpService.DownloadString(address, (result, exception) =>
    {
        if (exception != null)
            tcs.TrySetException(exception);
        else
            tcs.TrySetResult(result);
    });
    return tcs.Task;
}

 

4. 병렬 처리 코드용 래퍼

Parallel 형식과 PLINQ는 스레드 풀을 사용해서 병렬 처리를 수행한다. 또 호출 스레드를 병렬 처리 스레드의 하나로 합류시키기도 한다. UI의 응답성을 유지하려면 다음과 같이 하자

await Task.Run(() => Parallel.ForEach(...));

 

5. System.Reactive 옵저버블용 비동기 래퍼

이벤트 스트림에서 관심이 있는 옵저버블 이벤트를 선택해야한다.

1. 스트림이 끝나기 전 마지막 이벤트 => await으로 LastAsync의 결과를 기다리거나 옵저버블을 기다림

IObservable<int> observable = ...;
int lastElement = await observable.LastAsync();
// 또는 int lastElement = await observable;

2. 다음이벤트 => FirstAsync를 사용 스트림을 구독한 뒤에 첫 번째 이벤트가 도착하자마자 완료하면서 구독 해지

IObservable<int> observable = ...;
int nextElement = await observable.FirstAsync();

3. 모든 이벤트 => ToList를 사용

IObservable<int> observable = ...;
IList<int> allElements = await observable.ToList();

 

6. async 코드용 System.Reactive 옵저버블 래퍼

모든 비동기 작업은 두 가지 옵저버블 스트림 중 하나로 다룰 수 있다.

  • 하나의 요소를 만들어 낸 뒤에 완료
  • 요소를 만들어 내지 못하고 실패
// ToObservable을 사용
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
    Task<HttpResponseMessage> task = 
        client.GetAsync("http://www.example.com/");
    return task.ToObservable();
}

// StartAsync 사용 StartAsync는 즉시 async를 호출하지만 구독을 삭제하면 비동기 메서드를 취소
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
     return Observable.StartAsync(
         token => client.GetAsync("http://www.example.com/", token));
}

// 콜드 옵저버블, 구독 후에만 사용 가능 그리고 취소가 가능
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
    return Observable.FromAsync(
        token => client.GetAsync("http://www.example.com/", token));
}

// 소스 스트림의 각 이벤트가 도착하는 대로 비동기 작업 시작
IObservable<HttpResponseMessage> GetPages(
    IObservable<string> urls, HttpClient client)
{
    return urls.SelectMany(
        (url, token) => client.GetAsync(url, token));
}

 

7. 비동기 스트림과 데이터 흐름 메시

비동기 스트림을 사용하는 부분과 데이터 흐름 메시를 사용하는 부분끼리 데이터를 전달하게 하려면 하단의 코드를 참고해보자

public static class DataflowExtensions
{
    public static bool TryReceiveItem<T>(this ISourceBlock<T> block, out T value)
    {
        if (block is IReceivableSourceBlock<T> receivableSourceBlock)
            return receivableSourceBlock.TryReceive(out value);
        
        try
        {
            value = block.Receive(TimeSpan.Zero);
            return true;
        }
        catch (TimeoutException)
        {
            value = default;
            return false;
        }
        catch (InvalidOperationException)
        {
            value = default;
            return false;
        }
    }
    
    public static async IAsyncEnumerable<T> ReceiveAllAsync<T>(
        this ISourceBlock<T> block,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await block
            .OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceiveItem(out var value))
            {
                yield return value;
            }
        }
    }
}

//실제 사용
var multiplyBlock = new TransformBlock<int, int>(value => value * 2);

multiplyBlock.Post(5);
multiplyBlock.Post(2);
multiplyBlock.Complete();

await foreach (int item in multiplyBlock.ReceiveAllAsync())
{
    Console.WriteLine(item);
}

 

8. System.Reactive 옵저버블과 데이터 흐름 메시

System.Reactive 옵저버블과 데이터 흐름 메시를 사용하는 기능을 연결하고자 할 때 하단의 코드와 같이 진행한다.

// 데이터 흐름 블록을 옵저버블 스트림의 입력으로 사용
var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();
integers.Subscribe(data => Trace.WriteLine(data),
    ex => Trace.WriteLine(ex),
    () => Trace.WriteLine("Done"));

buffer.Post(13);

// 옵저버블 스트림을 메시의 입력으로 사용
IObservable<DateTimeOffset> ticks = 
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Select(x => x.TimeStamp)
        .Take(5);
        
var display = new ActionBlock<DateTimeOffset>(x => Trace.WriteLine(x));
ticks.Subscribe(display.AsObserver());

try
{
    display.Completion.Wait();
    Trace.WriteLine("Done.");
}
catch (Exception ex)
{
    Trace.WriteLine(ex);
}

 

9. System.Reactive 옵저버블을 비동기 스트림으로 변환

System.Reactive 옵저버블은 푸시 기반이고, 비동기 스트림은 풀 기반이다. 두 기능은 개념적으로 다르기에 옵저버블 스트림의 응답성을 유지하려면 소비하는 코드의 요청이 있을 때까지 알림을 저장할 방법이 필요하다. 
단, 약간의 간격 차가 있기에 메모리 문제가 발생할 수 있다. 메모리 문제는 큐를 제한함으로써 어느 정도 해결이 될 순 있으나 버려지는 알람이 생길 수 있기에 후처리가 필요하다.

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IObservable<T> observable, int bufferSize)
{
    var bufferOptions = new BoundedChannelOptions(bufferSize)
    {
        FullMode = BoundedChannelFullMode.DropOldest,
    };
    Channel<T> buffer = Channel.CreateBounded<T>(bufferOptions);
    using (observable.Subscribe(
        value => buffer.Writer.TryWrite(value),
        error => buffer.Writer.Complete(error),
        () => buffer.Writer.Complete()))
    {
        await foreach (T item in buffer.Reader.ReadAllAsync())
            yield return item;
    }
}
Comments