방프리

23.11.12 C# 동시성 프로그래밍 (Reactive) 본문

C#/동시성 처리

23.11.12 C# 동시성 프로그래밍 (Reactive)

방프리 2023. 11. 12. 20:02

1. 닷넷 이벤트 변환

- FromEventPattern을 통해 범용적인 이벤트를 사용

- EventPattern을 선언할 때 강력하게 할 것인가? 혹은 강력한 데이터를 포기할 것인가

//강력한 데이터를 가져올 때
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks = 
    Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
        handler => (s, a) => handler(s, a),
        handler => timer.Elapsed += handler,
        handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Console.WriteLine("OnNext: " + data.EventArgs.SignalTime));

//리플렉션 방식
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<object>> ticks = 
    Observable.FromEventPattern(timer, nameof(Timer.Elapsed));
ticks.Subscribe(data => Console.WriteLine("OnNext: "
	+ ((ElapsedEventArgs)data.EventArgs).SignalTime));

 

2. 컨텍스트로 알림 전달

- System.Reactive는 최대한 Thread-Safe 하도록 동작하려고 한다. 하지만 특정 케이스의 경우 컨텍스트가 
스위치된 상태 (Backgroudn Thread의 결과를 UI Thread에서 표현)에서 동작할 때 알림 전달 방식을 사용

private void Button_Click(object sender, RoutedEventArgs e)
{
    SynchronizationContext uiContext = SynchronizationContext.Current;
    Trace.WriteLine($"UI Thread is {Environment.CurrentManagedThreadId}");
    Observable.Interval(TimeSpan.FromSeconds(1))
    	.ObserveOn(uiContext)
        .Subscribe(x => Trace.WriteLine(
        	$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}

 

3. Window와 Buffer로 이벤트 데이터 그룹화

- Buffer는 그룹화할 이벤트를 모두 기다린 후 하나의 컬렉션을 내보내고, Window는 Buffer와 동일하게 그룹화 하지
이벤트가 도착하는 대로 내보낸다.

//Buffer
Observable.Interval(TimeSpan.FronSeconds(1))
    .Buffer(2)
    .Subscribe(x => Console.WriteLine(
        $"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));
        
//Window
Observable.Interval(TimeSpan.FromSeconds(1))
    .Window(2)
    .Subscribe(group =>
    {
        Console.WriteLine($"{DateTtime.Now.Second}: Starting new group");
        group.Subscribe(
        	x => Console.WriteLine($"{DateTime.Now.Second}: Saw {x}"),
            () => Console.WriteLine($"{DateTime.Now.Second}: Ending group"));
    }

 

4. Throttle과 Sample로 이벤트 스트림 조절

- Throttle은 타임아웃 값을 통해 이벤트의 만료 등을 제어한다.

- Sample은 빠르게 전개하는 시퀸스를 제어한다.

//using throttle
private void Button_Click(object sender, RoutedEventArgs e)
{
    Observable.FromEventPattern<MouseeventHandler, MouseEventArgs>(
    	handler => (s, a) => handler(s, a),
        handler => MouseMove += handler,
        handler => MouseMove -= handler)
      .Select(x => x.EventArgs.GetPosition(this))
      .Throttle(TimeSpan.FronSeconds(1))
      .Subscribe(x => Console.WriteLine(
      	$"{DateTime.Now.Second}: Saw {x.X + x.Y}"));
}

//using sample
private void Button_Click(object sender, RoutedEventArgs e)
{
    Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
        handler => (s, a) => handler(s, a),
        handler => MouseMove += handler,
        handler => MouseMove -= handler,
      .Select(x => x.EventArgs.GetPosition(this))
      .Sample(TimeSpan.FromSecond(1))
      .Subscribe(x => Console.WriteLine(
       	$"{DateTime.Now.Second}: Saw {x.X + x.Y}"));
}

 

5. 타임아웃

- 입력 스트림에 슬라이딩 타임아웃 윈도우 설정, 새로운 이벤트 도착할 때마다 리셋

private void Button_Click(object sender, RoutedEventArgs e)
{
    Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
        handler => (s, a) => handler(s, a),
        handler => MouseMove += handler,
        handler => MouseMove -= handler,
      .Select(x => x.EventArgs.GetPosition(this))
      .Timeout(TimeSpan.FromSeconds(1))
      .Subscribe(
        x => Console.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),
        ex => Console.WriteLine(ex));
}
Comments