방프리
23.11.07 C# 동시성 프로그래밍 (비동기 스트림) 본문
1. 비동기 스트림 생성
- yield return을 사용해서 반환
- 비동기 스트림은 내부적으로 ValueTask를 사용, 취소해야할 경우 CancellationToken을 사용하여 취소
async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
var offset = 0;
const int limit = 10;
while (true)
{
var result = await client.GetStringAsync(
$"https://example.com/api/values?offset={offset}&limit={limit}");
string[] valuesOnThisPage = result.Split('\n');
foreach (string value in valuesOnThisPage)
{
yield rturn value;
}
if (valuesOnThisPage.Length != limit)
{
break;
}
offset += limit;
}
}
2. 비동기 스트림 사용
- await foreach를 통해서 사용
- ConfigureAwait(false) 지원
- 동작이 동기일 수도, 비동기일 수도 있다.
IAsyncEnumerable<string> GetValuesAsync(HttpClient client);
public async Task ProcessValueAsync(HttpClient client)
{
await foreach (var value in GetValuesAsync(client).ConfigureAwait(false))
{
await Task.Delay(100).ConfigureAwait(false);
Console.WriteLine(value);
}
}
3. 비동기 스트림과 LINQ를 함께 사용
- System.Linq.Async Nuget을 통해 설치 가능
- 일반적인 Where은 비동기를 지원하지 않기 때문에 WhereAwait으로 사용
(이로 미루어보아 대부분의 linq 쿼리가 ~Await인 것 같음)
- 비동기 스트림 (IEnumerableAsync)는 기본적으로 Linq도 비동기를 지원함
IAsyncEnumerable<int> values = SlowRange().WhereAwait(
async value =>
{
await Task.Delay(10);
return value % 2 = 0;
});
await foreach (int result in values)
{
Console.WriteLine(result);
}
async IAsyncEnumerable<int> SlowRange()
{
for (var i = 0; i != 10; ++i)
{
await Task.Delay(i * 100);
yield return i;
}
}
만약 조건이 비동기라면 ~AsyncAwait을 활용하는 것도 좋다.
int count = await SlowRange().CountAwaitAsync(
async value =>
{
await Task.Delay(10);
return value % 2 == 0;
});
3.4 비동기 스트림의 취소
- Cancellation Token 활용해서 취소하기
async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
using var cts = new CancellationTOkenSource(500);
CancellationToken token = cts.Token;
await foreach (int result in items.WithCancellation(token))
{
Console.WriteLine(result);
}
}
async IAsyncEnumerable<int> SlowRange(
[EnumeratorCancelltion] CancellationToken token = default)
{
for (var i = 0; i != 10; ++i)
{
await Task.Delay(i * 100, token);
yield return i;
}
}
await ConsumeSequence(SlowRange());
//WithCancellation 확장 메서드는 ConfigureAwait(false)를 방해하지 않음
async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
using var cts = new CancellationTokenSource(500);
CancellationToken token = cts.Token;
await foreach (int result in items
.WithCancellation(token).ConfigureAwait(false))
{
Console.WriteLine(result);
}
}
'C# > 동시성 처리' 카테고리의 다른 글
23.11.19 C# 동시성 프로그래밍 (테스트) (0) | 2023.11.19 |
---|---|
23.11.12 C# 동시성 프로그래밍 (Reactive) (1) | 2023.11.12 |
23.11.11 C# 동시성 프로그래밍 (TPL) (0) | 2023.11.11 |
23.11.08 C# 동시성 프로그래밍 (병렬 처리) (0) | 2023.11.08 |
23.11.05 C# 동시성 프로그래밍 (기초 기능) (0) | 2023.11.05 |
Comments