다음을 통해 공유


LINQ 연산자를 사용하여 관찰 가능한 시퀀스 쿼리

기존 .NET 이벤트와의 브리징에서 기존 .NET 이벤트를 관찰 가능한 시퀀스로 변환하여 구독했습니다. 이 항목에서는 이러한 개체를 조작하기 위해 Rx 어셈블리에서 일반 LINQ 연산자를 제공하는 IObservable T> 개체로 관찰 가능한<시퀀스의 첫 번째 클래스 특성을 살펴봅니다. 대부분의 연산자는 관찰 가능한 시퀀스를 사용하고 해당 시퀀스에 대해 일부 논리를 수행하고 다른 관찰 가능한 시퀀스를 출력합니다. 또한 코드 샘플에서 볼 수 있듯이 소스 시퀀스에 여러 연산자를 연결하여 결과 시퀀스를 정확한 요구 사항으로 조정할 수도 있습니다.

다른 연산자 사용

이전 topics 만들기 및 생성 연산자를 사용하여 간단한 시퀀스를 만들고 반환했습니다. 또한 FromEventPattern 연산자를 사용하여 기존 .NET 이벤트를 관찰 가능한 시퀀스로 변환했습니다. 이 항목에서는 데이터를 필터링, 그룹화 및 변환할 수 있도록 관찰 가능 형식의 다른 정적 LINQ 연산자를 사용합니다. 이러한 연산자는 관찰 가능한 시퀀스를 입력으로 사용하고 관찰 가능한 시퀀스를 출력으로 생성합니다.

다른 시퀀스 결합

이 섹션에서는 관찰 가능한 다양한 시퀀스를 단일 관찰 가능한 시퀀스로 결합하는 연산자 중 일부를 살펴봅니다. 시퀀스를 결합하면 데이터가 변환되지 않습니다.

다음 샘플에서는 Concat 연산자를 사용하여 두 시퀀스를 단일 시퀀스로 결합하고 구독합니다. 설명을 위해 매우 간단한 Range(x, y) 연산자를 사용하여 x로 시작하고 나중에 y 순차적 숫자를 생성하는 정수 시퀀스를 만듭니다.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

결과 시퀀스는 입니다 1,2,3,1,2,3. Concat 연산자를 사용하는 경우 첫 번째 시퀀스()가 모든 값의 푸시를 완료할 때까지 두 번째 시퀀스(source1source2)가 활성화되지 않기 때문입니다. 가 완료된 source2 후에 source1 만 결과 시퀀스에 값을 푸시하기 시작합니다. 그러면 구독자는 결과 시퀀스에서 모든 값을 가져옵니다.

이를 Merge 연산자와 비교합니다. 다음 샘플 코드를 실행하면 가 표시됩니다 1,1,2,2,3,3. 이는 두 시퀀스가 동시에 활성화되고 값이 원본에서 발생할 때 푸시되기 때문입니다. 결과 시퀀스는 마지막 소스 시퀀스가 값 푸시를 완료한 경우에만 완료됩니다.

병합이 작동하려면 관찰 가능한 모든 원본 시퀀스가 동일한 형식의 IObservable<T>여야 합니다. 결과 시퀀스는 IObservable<T> 형식입니다. 시퀀스의 중간에 OnError를 생성하면 source1 결과 시퀀스가 즉시 완료됩니다.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Catch 연산자를 사용하여 다른 비교를 수행할 수 있습니다. 이 경우 source1 오류 source2 없이 완료되면 가 시작되지 않습니다. 따라서 다음 샘플 코드를 실행하면 (를 생성하는 4,5,6)이 무시되기 때문에 만 source2 가져옵니다1,2,3.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

마지막으로 OnErrorResumeNext를 살펴보겠습니다. 오류로 source2 인해 완료할 수 없는 경우에도 source1 이 연산자는 로 이동합니다. 다음 예제에서는 Throw 연산자를 사용하여 예외로 종료되는 시퀀스를 나타내더라도 source1 구독자는 에서 게시한 값(1,2,3)을 source2받습니다. 따라서 두 원본 시퀀스에서 오류가 발생할 것으로 예상되는 경우 구독자가 여전히 일부 값을 받을 수 있도록 OnErrorResumeNext를 사용하는 것이 더 안전합니다.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

이러한 모든 조합 연산자가 작동하려면 관찰 가능한 모든 시퀀스가 동일한 형식의 T여야 합니다.

프로젝션

Select 연산자는 관찰 가능한 시퀀스의 각 요소를 다른 형식으로 변환할 수 있습니다.

다음 예제에서는 각각 길이 n의 문자열로 정수 시퀀스를 프로젝트합니다.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

기존 .NET 이벤트와의 브리징 항목에서 확인한 .NET 이벤트 변환 예제의 확장인 다음 샘플에서는 Select 연산자를 사용하여 IEventPattern<MouseEventArgs> 데이터 형식을 Point 형식으로 프로젝션합니다. 이러한 방식으로 마우스 이동 이벤트 시퀀스를 다음 "필터링" 섹션에서 볼 수 있듯이 추가로 구문 분석하고 조작할 수 있는 데이터 형식으로 변환합니다.

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

마지막으로 SelectMany 연산자를 살펴보겠습니다. SelectMany 연산자에는 많은 오버로드가 있으며, 그 중 하나는 선택기 함수 인수를 사용합니다. 이 선택기 함수는 관찰 가능한 원본에 의해 푸시된 모든 값에 대해 호출됩니다. 이러한 각 값에 대해 선택기는 이를 관찰 가능한 미니 시퀀스로 투영합니다. 결국 SelectMany 연산자는 이러한 모든 미니 시퀀스를 단일 결과 시퀀스로 평면화한 다음 구독자에게 푸시합니다.

SelectMany에서 반환된 관찰 가능 항목은 원본 시퀀스와 선택기에서 생성된 모든 미니 관찰 가능한 시퀀스가 완료된 후 OnCompleted를 게시합니다. 원본 스트림에서 오류가 발생하거나, 선택기 함수에서 예외가 throw되었거나, 관찰 가능한 미니 시퀀스에서 오류가 발생했을 때 OnError가 발생합니다.

다음 예제에서는 먼저 5초마다 정수 를 생성하는 원본 시퀀스를 만들고 Take 연산자를 사용하여 생성된 처음 2개 값만 사용하기로 결정합니다. 그런 다음 를 사용하여 SelectMany 의 다른 시퀀스를 사용하여 이러한 정수 각각을 프로젝트 {100, 101, 102}합니다. 이렇게 하면 관찰 가능한 두 개의 미니 시퀀스 및 {100, 101, 102}가 생성 {100, 101, 102} 됩니다. 이들은 마침내 의 정수 {100, 101, 102, 100, 101, 102} 의 단일 스트림으로 평면화하고 관찰자에게 밀어 넣습니다.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

필터링

다음 예제에서는 Generate 연산자를 사용하여 관찰 가능한 간단한 숫자 시퀀스를 만듭니다. Generate 연산자에는 여러 오버로드가 있습니다. 이 예제에서는 초기 상태(예제에서는 0), 종료할 조건부 함수(10회 미만), 반복기(+1), 결과 선택기(현재 값의 정사각형 함수)를 사용합니다. 및 는 Where 및 Select 연산자를 사용하여 15보다 작은 항목만 출력합니다.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

다음 예제는 이 항목의 앞부분에서 본 프로젝션 예제의 확장입니다. 이 샘플에서는 Select 연산자를 사용하여 IEventPattern<MouseEventArgs> 데이터 형식을 Point 형식으로 프로젝터했습니다. 다음 예제에서는 Where 및 Select 연산자를 사용하여 관심 있는 마우스 이동만 선택합니다. 이 경우 마우스 이동이 첫 번째 bisector(x 및 y 좌표가 같음)를 통해 마우스 이동을 필터링합니다.

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

시간 기반 작업

버퍼 연산자를 사용하여 시간 기반 작업을 수행할 수 있습니다.

관찰 가능한 시퀀스를 버퍼링하면 관찰 가능한 시퀀스의 값이 지정된 시간 범위 또는 개수 임계값에 따라 버퍼에 배치됩니다. 이는 시퀀스에 의해 엄청난 양의 데이터가 푸시될 것으로 예상되고 구독자에게 이러한 값을 처리할 리소스가 없는 경우에 특히 유용합니다. 시간 또는 개수에 따라 결과를 버퍼링하고 조건이 초과되거나 원본 시퀀스가 완료된 경우에만 값 시퀀스를 반환하면 구독자는 자체 속도로 OnNext 호출을 처리할 수 있습니다. 

다음 예제에서는 먼저 1초마다 간단한 정수 시퀀스를 만듭니다. 그런 다음, Buffer 연산자를 사용하고 각 버퍼에 시퀀스의 5개 항목이 저장되도록 지정합니다. 버퍼가 가득 차면 OnNext가 호출됩니다. 그런 다음 Sum 연산자를 사용하여 버퍼의 합계를 집계합니다. 버퍼가 자동으로 플러시되고 다른 주기가 시작됩니다. 인쇄물 10, 35, 60… 은 10=0+1+2+3+4, 35=5+6+7+8+9 등입니다.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

지정된 시간 범위의 버퍼를 만들 수도 있습니다. 다음 예제에서 버퍼는 3초 동안 누적된 항목을 보유합니다. 인쇄물은 3, 12, 21... 3=0+1+2, 12=3+4+5 등입니다.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

버퍼 또는 창을 사용하는 경우 필터링하기 전에 시퀀스가 비어 있지 않은지 확인해야 합니다.

범주별 LINQ 연산자

범주별 LINQ 연산자 항목에는 해당 범주별로 관찰 가능한 형식으로 구현된 모든 주요 LINQ 연산자의 목록이 나와 있습니다. 특히 생성, 변환, 결합, 기능, 수학, 시간, 예외, 기타, 선택 및 기본 형식.

참고 항목

참조

Observable

개념

범주별 LINQ 연산자