방프리
24.05.09 C# 동시성 프로그래밍 (상호운용) 본문
1. 'Async' 메서드와 'Completed' 이벤트용 비동기 래퍼
Async로 끝나는 메서드와 Completed로 끝나는 이벤트를 사용하는 패턴을 이벤트 기반 비동기 패턴 (EAP, Event-based Asynchronous Pattern)이라고 한다. 이 EAP가 작업 기반 비동기 패턴 (TAP, Task-based Asynchronuous Pattern)을 따를 수 있게 Task를 반환하는 메서드로 감싸려 한다.
public static Task<string> DownloadStringTaskAsync(this WebClient client,
Uri address)
{
var tcs = new TaskCompletionSource<string>();
DownloadStringCompletedEventHandler handler = null;
handler = (_, e) =>
{
client.DownloadStringCompleted -= handler;
if (e.Cancelled)
tcs.TrySetCancled();
else if (e.Error != null)
tcs.TrySetException(e.Error);
else
tcs.TrySetResult(e.Result);
};
client.DownloadStringCompleted += handler;
client.DownloadStringAsync(address);
return tcs.Task;
}
2. 'Begin'과 'End' 메서드용 비동기 래퍼
APM을 감싸는 가장 좋은 방법은 TaskFactory 형식의 FromAsync 메서드 중 하나를 사용하는 것이다. FromAsync 또한 내부적으로 TaskCompletionSource<TResult>를 사용하지만, FromAsync가 사용면에서 좀 더 쉽다.
public static Task<WebResponse> GetResponseAsync(this WebRequest client)
{
return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse,
client.EndGetResponse, null);
}
FromAsync의 마지막 인자를 null로 해줄 때 이점은 C# 4.0 버전 이전에는 async/await이 존재하지 않아 state 개체를 사용하는 것이 일반적이였기에 신규 버전에서는 필요가 없어 null로 인자를 넘기는 것이다. 최근 state 개체는 메모리 사용을 최적화할 때 클로저 인스턴스를 피하는 용도로만 쓰인다.
3. 범용 비동기 래퍼
async가 등장하기 전 MS에서 추천하는 방식은 1번의 EAP, 2번의 APM이였다. 하지만 현재는 비공식적으로 다음과 같이 콜백형식으로 많이 사용한다.
public interface IMyAsyncHttpService
{
void DownloadString(Uri address, Action<string, Exception> callback);
}
//실제 사용
public static Task<string> DownloadStringAsync(
this IMyAsyncHttpService httpService, Uri address)
{
var tcs = new TaskCompletionSource<string>();
httpService.DownloadString(address, (result, exception) =>
{
if (exception != null)
tcs.TrySetException(exception);
else
tcs.TrySetResult(result);
});
return tcs.Task;
}
4. 병렬 처리 코드용 래퍼
Parallel 형식과 PLINQ는 스레드 풀을 사용해서 병렬 처리를 수행한다. 또 호출 스레드를 병렬 처리 스레드의 하나로 합류시키기도 한다. UI의 응답성을 유지하려면 다음과 같이 하자
await Task.Run(() => Parallel.ForEach(...));
5. System.Reactive 옵저버블용 비동기 래퍼
이벤트 스트림에서 관심이 있는 옵저버블 이벤트를 선택해야한다.
1. 스트림이 끝나기 전 마지막 이벤트 => await으로 LastAsync의 결과를 기다리거나 옵저버블을 기다림
IObservable<int> observable = ...;
int lastElement = await observable.LastAsync();
// 또는 int lastElement = await observable;
2. 다음이벤트 => FirstAsync를 사용 스트림을 구독한 뒤에 첫 번째 이벤트가 도착하자마자 완료하면서 구독 해지
IObservable<int> observable = ...;
int nextElement = await observable.FirstAsync();
3. 모든 이벤트 => ToList를 사용
IObservable<int> observable = ...;
IList<int> allElements = await observable.ToList();
6. async 코드용 System.Reactive 옵저버블 래퍼
모든 비동기 작업은 두 가지 옵저버블 스트림 중 하나로 다룰 수 있다.
- 하나의 요소를 만들어 낸 뒤에 완료
- 요소를 만들어 내지 못하고 실패
// ToObservable을 사용
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
Task<HttpResponseMessage> task =
client.GetAsync("http://www.example.com/");
return task.ToObservable();
}
// StartAsync 사용 StartAsync는 즉시 async를 호출하지만 구독을 삭제하면 비동기 메서드를 취소
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
return Observable.StartAsync(
token => client.GetAsync("http://www.example.com/", token));
}
// 콜드 옵저버블, 구독 후에만 사용 가능 그리고 취소가 가능
IObservable<HttpResponseMessage> GetPage(HttpClient client)
{
return Observable.FromAsync(
token => client.GetAsync("http://www.example.com/", token));
}
// 소스 스트림의 각 이벤트가 도착하는 대로 비동기 작업 시작
IObservable<HttpResponseMessage> GetPages(
IObservable<string> urls, HttpClient client)
{
return urls.SelectMany(
(url, token) => client.GetAsync(url, token));
}
7. 비동기 스트림과 데이터 흐름 메시
비동기 스트림을 사용하는 부분과 데이터 흐름 메시를 사용하는 부분끼리 데이터를 전달하게 하려면 하단의 코드를 참고해보자
public static class DataflowExtensions
{
public static bool TryReceiveItem<T>(this ISourceBlock<T> block, out T value)
{
if (block is IReceivableSourceBlock<T> receivableSourceBlock)
return receivableSourceBlock.TryReceive(out value);
try
{
value = block.Receive(TimeSpan.Zero);
return true;
}
catch (TimeoutException)
{
value = default;
return false;
}
catch (InvalidOperationException)
{
value = default;
return false;
}
}
public static async IAsyncEnumerable<T> ReceiveAllAsync<T>(
this ISourceBlock<T> block,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (await block
.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
{
while (block.TryReceiveItem(out var value))
{
yield return value;
}
}
}
}
//실제 사용
var multiplyBlock = new TransformBlock<int, int>(value => value * 2);
multiplyBlock.Post(5);
multiplyBlock.Post(2);
multiplyBlock.Complete();
await foreach (int item in multiplyBlock.ReceiveAllAsync())
{
Console.WriteLine(item);
}
8. System.Reactive 옵저버블과 데이터 흐름 메시
System.Reactive 옵저버블과 데이터 흐름 메시를 사용하는 기능을 연결하고자 할 때 하단의 코드와 같이 진행한다.
// 데이터 흐름 블록을 옵저버블 스트림의 입력으로 사용
var buffer = new BufferBlock<int>();
IObservable<int> integers = buffer.AsObservable();
integers.Subscribe(data => Trace.WriteLine(data),
ex => Trace.WriteLine(ex),
() => Trace.WriteLine("Done"));
buffer.Post(13);
// 옵저버블 스트림을 메시의 입력으로 사용
IObservable<DateTimeOffset> ticks =
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Select(x => x.TimeStamp)
.Take(5);
var display = new ActionBlock<DateTimeOffset>(x => Trace.WriteLine(x));
ticks.Subscribe(display.AsObserver());
try
{
display.Completion.Wait();
Trace.WriteLine("Done.");
}
catch (Exception ex)
{
Trace.WriteLine(ex);
}
9. System.Reactive 옵저버블을 비동기 스트림으로 변환
System.Reactive 옵저버블은 푸시 기반이고, 비동기 스트림은 풀 기반이다. 두 기능은 개념적으로 다르기에 옵저버블 스트림의 응답성을 유지하려면 소비하는 코드의 요청이 있을 때까지 알림을 저장할 방법이 필요하다.
단, 약간의 간격 차가 있기에 메모리 문제가 발생할 수 있다. 메모리 문제는 큐를 제한함으로써 어느 정도 해결이 될 순 있으나 버려지는 알람이 생길 수 있기에 후처리가 필요하다.
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
this IObservable<T> observable, int bufferSize)
{
var bufferOptions = new BoundedChannelOptions(bufferSize)
{
FullMode = BoundedChannelFullMode.DropOldest,
};
Channel<T> buffer = Channel.CreateBounded<T>(bufferOptions);
using (observable.Subscribe(
value => buffer.Writer.TryWrite(value),
error => buffer.Writer.Complete(error),
() => buffer.Writer.Complete()))
{
await foreach (T item in buffer.Reader.ReadAllAsync())
yield return item;
}
}
'C# > 동시성 처리' 카테고리의 다른 글
24.05.22 C# 동시성 프로그래밍 (취소) (0) | 2024.05.22 |
---|---|
24.05.21 C# 동시성 프로그래밍 (컬렉션) (0) | 2024.05.21 |
23.11.19 C# 동시성 프로그래밍 (테스트) (0) | 2023.11.19 |
23.11.12 C# 동시성 프로그래밍 (Reactive) (1) | 2023.11.12 |
23.11.11 C# 동시성 프로그래밍 (TPL) (0) | 2023.11.11 |