방프리

23.11.07 C# 동시성 프로그래밍 (비동기 스트림) 본문

C#/동시성 처리

23.11.07 C# 동시성 프로그래밍 (비동기 스트림)

방프리 2023. 11. 7. 00:03

1. 비동기 스트림 생성

 

- yield return을 사용해서 반환

- 비동기 스트림은 내부적으로 ValueTask를 사용, 취소해야할 경우 CancellationToken을 사용하여 취소

 

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
    var offset = 0;
    const int limit = 10;
    while (true)
    {
        var result = await client.GetStringAsync(
        	$"https://example.com/api/values?offset={offset}&limit={limit}");
        string[] valuesOnThisPage = result.Split('\n');
        
        foreach (string value in valuesOnThisPage)
        {
            yield rturn value;
        }
        
        if (valuesOnThisPage.Length != limit)
        {
            break;
        }
        
        offset += limit;
    }
}

 

2. 비동기 스트림 사용

 

- await foreach를 통해서 사용

- ConfigureAwait(false) 지원

- 동작이 동기일 수도, 비동기일 수도 있다.

IAsyncEnumerable<string> GetValuesAsync(HttpClient client);

public async Task ProcessValueAsync(HttpClient client)
{
    await foreach (var value in GetValuesAsync(client).ConfigureAwait(false))
    {
    	await Task.Delay(100).ConfigureAwait(false);
        Console.WriteLine(value);
    }
}

 

3. 비동기 스트림과 LINQ를 함께 사용

 

- System.Linq.Async Nuget을 통해 설치 가능

- 일반적인 Where은 비동기를 지원하지 않기 때문에 WhereAwait으로 사용

(이로 미루어보아 대부분의 linq 쿼리가 ~Await인 것 같음)

- 비동기 스트림 (IEnumerableAsync)는 기본적으로 Linq도 비동기를 지원함

 

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
    async value =>
    {
    	await Task.Delay(10);
        return value % 2 = 0;
    });

await foreach (int result in values)
{
	Console.WriteLine(result);
}

async IAsyncEnumerable<int> SlowRange()
{
    for (var i = 0; i != 10; ++i)
    {
    	await Task.Delay(i * 100);
        yield return i;
    }
}

만약 조건이 비동기라면 ~AsyncAwait을 활용하는 것도 좋다.

int count = await SlowRange().CountAwaitAsync(
    async value =>
    {
    	await Task.Delay(10);
        return value % 2 == 0;
    });

 

3.4 비동기 스트림의 취소

 

- Cancellation Token 활용해서 취소하기

 

async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
    using var cts = new CancellationTOkenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items.WithCancellation(token))
    {
    	Console.WriteLine(result);
    }
}

async IAsyncEnumerable<int> SlowRange(
	[EnumeratorCancelltion] CancellationToken token = default)
{
    for (var i = 0; i != 10; ++i)
    {
    	await Task.Delay(i * 100, token);
        yield return i;
    }
}

await ConsumeSequence(SlowRange());

//WithCancellation 확장 메서드는 ConfigureAwait(false)를 방해하지 않음
async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
    using var cts = new CancellationTokenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items
    	.WithCancellation(token).ConfigureAwait(false))
    {
    	Console.WriteLine(result);
    }
}
Comments