ReactiveX


http://reactivex.io/


Observable 스트림을 이용한 비동기 이벤트 기반 프로그래밍 프레임워크


행위(Behavioral) 디자인 패턴의 종류 중 하나인 Observer pattern을 확장


기존의 Future를 이용한 비동기 구문은 하나의 구문을 이용해 서로 다른 이벤트를 여러 차례 받는 것이 불가능하다.

이러한 Future의 단점을 극복한 것이 ReactiveX(이하 RX)



RxScala


RxJava 의 adapter

https://github.com/ReactiveX/RxScala


RxScala는 기본 라이브러리가 아니므로, 아래 Maven 경로를 이용하여 다운

https://mvnrepository.com/artifact/io.reactivex/rxscala_2.11/0.26.4



Observable[T] - T 타입의 이벤트를 만들어내는 이벤트 스트림


Observable 인스턴스는 subscribe 메소드를 이용해 observer(관찰자)를 받음


val o = Observable just (1,2,3)
o subscribe (n => println(s"first subscribe : $n"))
o subscribe (n => println(s"second subscribe : $n"))


위의 코드는 아래와 같은 결과를 비동기적으로 호출한다.


first subscribe : 1

first subscribe : 2

first subscribe : 3

second subscribe : 1

second subscribe : 2

second subscribe : 3 



아래와 같이 예외가 발생했을 때, 동작하는 코드를 작성하면 다음과 같이 동작한다.


val exception = new Exception
val o = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)

o.subscribe(
  n => println(s"number $n"),
  e => println(s"error : $e")
)

number 1

number 2

error : java.lang.Exception


첫 번째로 넘긴 observer에서 1,2를 정상 출력 하지만,

그 다음 exception에서 예외가 발생하여 해당 이벤트를 추가로 실행시키지 않고 두 번째 observer로 넘긴다.


그리고 예외가 발생하여 해당 인스턴스가  오류 상태에 진입하여 3,4에 대하여 이벤트를 발생시키지 않는다.



Observable 객체의 상태(state)는 uncompleted, error, completed 3 가지가 있다.


이 상태들을 이용해 이벤트를 발생시키는 트레이트(trait) 는 다음과 같다.


//rx.lang.scala.Observer.scala
trait Observer[-T] extends scala.AnyRef {
  private[scala] val asJavaObserver : rx.Observer[_ >: T] = { /* compiled code */ }
  def onNext(value : T) : scala.Unit = { /* compiled code */ }
  def onError(error : scala.Throwable) : scala.Unit = { /* compiled code */ }
  def onCompleted() : scala.Unit = { /* compiled code */ }
}
object Observer extends scala.AnyRef with rx.lang.scala.ObserverFactoryMethods[rx.lang.scala.Observer] {
  private[scala] def apply[T](observer : rx.Observer[T]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
  def apply[T](onNext : scala.Function1[T, scala.Unit], onError : scala.Function1[scala.Throwable, scala.Unit], onCompleted : scala.Function0[scala.Unit]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
}



다음 예제는 Observer 인스턴스를 생성하고 Obervable 인스턴스에 직접 대입한다.



val exception = new Exception
val o1 = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)
val o2 = Observable.just(1,2) ++ Observable.just(3,4)

val observer = new Observer[Int]{
  override def onNext(n: Int) = println(s"number $n")
  override def onError(error: Throwable) = println(s"error : $error")
  override def onCompleted() = println("The End!!")
}

o1.subscribe(observer)
println("------------------------------------------")
o2.subscribe(observer)


number 1

number 2

error : java.lang.Exception

------------------------------------------

number 1

number 2

number 3

number 4

The End!!


uncompleted 상태에서 error, completed 상태 두 가지 중 하나의 상태로만 이동할 수 있다.

따라서, 예외가 발생하면 오버라이드한 onError 메소드의 이벤트만 생성시킨다.




Future & Observable


Future를 이용하여 Observable 인스턴스를 생성 가능


Future 성공 => Future의 결과를 이벤트로 반환 및 onCompleted 메소드 호출

Future 실패 => onError 메소드 호출



아래 예제 코드와 같이 Future를 이용하여 Observable 인스턴스를 생성할 수 있다.

또, 주석과 같이 from 메소드를 이용하여 생성할 수도 있다.

import scala.concurrent._
import ExecutionContext.Implicits.global

val future = Future {"Back to the Future !!"}
val observable = Observable[String]{ observer =>
  future foreach {case string => observer.onNext(string)}
  future.failed foreach {case exception => observer.onError(exception)}
  Subscription()
}
observable.subscribe(println(_))
//val observable2 = Observable.from(Future {"Back to the Future 22 !!"})



Observable Combinator


Observable 객체의 장점은 바로 여러 Combinator를 이용하여 조합이 가능하다는 것이다.


아래 예제는 함수 Combinator를 이용하여 0.1초 간격으로 짝수를 생성하는 이벤트이다.



import scala.concurrent.duration._

val even = Observable.interval(0.1.seconds).filter(_%2==0).map(n => s"number : $n").take(7)
even.subscribe(println(_), exception => println(s"exception : $exception"), ()=>println("no more even number"))
Thread sleep 2000


number : 0

number : 2

number : 4

number : 6

number : 8

number : 10

number : 12

no more even number




스칼라의 for 내장 기법을 이용하면 더 짧은 코드로 프로그래밍이 가능하다.


val even2 = for(n <- Observable.from(0 until 12); if n%2==0)
      yield s"number : $n"



내포된 Observable(고차 이벤트 스트림 Higher-order event stream)


고차함수의 예

foreach : (T => Unit) => Unit 처럼 함수 타입 안에 함수가 내포되어 있음


고차 이벤트 스트림

Observable[ Observable[ T ] ] 와 같이 이벤트가 내포


오래 걸리는 연산을 하는 Future 를 바탕으로 생성한 Observable 객체가 있다고 가정



import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def longObservable : Observable[String] = Observable.from(Future{
  val random = scala.util.Random.nextInt(10)
  Thread sleep 100 * random
  "Long Long Exec : " + random
})

def observable : Observable[Observable[String]] = Observable.interval(0.1.seconds).take(4).map {
  n => longObservable.map(exec => s"exec $n : " + exec)
}


내포된 Observable[ Observable[String] ] 객체에서 String이벤트를 처리하기 위해 두 가지 방법이 있음


먼저 concat 함수는 이벤트의 실행 순서를 유지하면서 이벤트 결과를 반환한다

observable.concat.subscribe(println(_))
Thread sleep 2000


exec 0 : Long Long Exec : 2

exec 1 : Long Long Exec : 2

exec 2 : Long Long Exec : 1

exec 3 : Long Long Exec : 0 



두 번째로, flatten 함수는 이벤트의 실행 순서를 보장하지 않고

먼저 완료되는 대로 결과를 반환한다


flatten 은 특정 이벤트가 완료하지 않거나, 무한정 이벤트를 발생시킬 수 있을 때 사용

observable.flatten.subscribe(println(_))
Thread sleep 2000



exec 3 : Long Long Exec : 4

exec 1 : Long Long Exec : 7

exec 0 : Long Long Exec : 9

exec 2 : Long Long Exec : 8



flatten + map 조합의 경우

flatMap 으로 대치하여 사용할 수 있다.


def observable : Observable[String] =
Observable.interval(0.1.seconds).take(4).flatMap{ n => longObservable.map(exec => s"exec $n : " + exec)}

observable.subscribe(println(_))
Thread sleep 2000




flatMap을 사용한 경우는

for 내장식으로 바꿔서 코드를 더 간결하게 사용 가능하다.


def observable = for {
  n <- Observable.interval(0.1.seconds).take(4)
  exec <- longObservable
} yield s"exec $n : " + exec

observable.subscribe(println(_))
Thread sleep 2000


Ref) 스칼라 동시성 프로그래밍

+ Recent posts