ReactiveX
http://reactivex.io/
Observable 스트림을 이용한 비동기 이벤트 기반 프로그래밍 프레임워크
행위(Behavioral) 디자인 패턴의 종류 중 하나인 Observer pattern을 확장
기존의 Future를 이용한 비동기 구문은 하나의 구문을 이용해 서로 다른 이벤트를 여러 차례 받는 것이 불가능하다.
이러한 Future의 단점을 극복한 것이 ReactiveX(이하 RX)
RxScala
RxJava 의 adapter
https://github.com/ReactiveX/RxScala
RxScala는 기본 라이브러리가 아니므로, 아래 Maven 경로를 이용하여 다운
https://mvnrepository.com/artifact/io.reactivex/rxscala_2.11/0.26.4
Observable[T] - T 타입의 이벤트를 만들어내는 이벤트 스트림
Observable 인스턴스는 subscribe 메소드를 이용해 observer(관찰자)를 받음
val o = Observable just (1,2,3)
o subscribe (n => println(s"first subscribe : $n"))
o subscribe (n => println(s"second subscribe : $n"))
위의 코드는 아래와 같은 결과를 비동기적으로 호출한다.
first subscribe : 1 first subscribe : 2 first subscribe : 3 second subscribe : 1 second subscribe : 2 second subscribe : 3 |
아래와 같이 예외가 발생했을 때, 동작하는 코드를 작성하면 다음과 같이 동작한다.
val exception = new Exception
val o = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)
o.subscribe(
n => println(s"number $n"),
e => println(s"error : $e")
)
number 1 number 2 error : java.lang.Exception |
첫 번째로 넘긴 observer에서 1,2를 정상 출력 하지만,
그 다음 exception에서 예외가 발생하여 해당 이벤트를 추가로 실행시키지 않고 두 번째 observer로 넘긴다.
그리고 예외가 발생하여 해당 인스턴스가 오류 상태에 진입하여 3,4에 대하여 이벤트를 발생시키지 않는다.
Observable 객체의 상태(state)는 uncompleted, error, completed 3 가지가 있다.
이 상태들을 이용해 이벤트를 발생시키는 트레이트(trait) 는 다음과 같다.
//rx.lang.scala.Observer.scala
trait Observer[-T] extends scala.AnyRef {
private[scala] val asJavaObserver : rx.Observer[_ >: T] = { /* compiled code */ }
def onNext(value : T) : scala.Unit = { /* compiled code */ }
def onError(error : scala.Throwable) : scala.Unit = { /* compiled code */ }
def onCompleted() : scala.Unit = { /* compiled code */ }
}
object Observer extends scala.AnyRef with rx.lang.scala.ObserverFactoryMethods[rx.lang.scala.Observer] {
private[scala] def apply[T](observer : rx.Observer[T]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
def apply[T](onNext : scala.Function1[T, scala.Unit], onError : scala.Function1[scala.Throwable, scala.Unit], onCompleted : scala.Function0[scala.Unit]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
}
다음 예제는 Observer 인스턴스를 생성하고 Obervable 인스턴스에 직접 대입한다.
val exception = new Exception
val o1 = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)
val o2 = Observable.just(1,2) ++ Observable.just(3,4)
val observer = new Observer[Int]{
override def onNext(n: Int) = println(s"number $n")
override def onError(error: Throwable) = println(s"error : $error")
override def onCompleted() = println("The End!!")
}
o1.subscribe(observer)
println("------------------------------------------")
o2.subscribe(observer)
number 1 number 2 error : java.lang.Exception ------------------------------------------ number 1 number 2 number 3 number 4 The End!! |
uncompleted 상태에서 error, completed 상태 두 가지 중 하나의 상태로만 이동할 수 있다.
따라서, 예외가 발생하면 오버라이드한 onError 메소드의 이벤트만 생성시킨다.
Future & Observable
Future를 이용하여 Observable 인스턴스를 생성 가능
Future 성공 => Future의 결과를 이벤트로 반환 및 onCompleted 메소드 호출
Future 실패 => onError 메소드 호출
아래 예제 코드와 같이 Future를 이용하여 Observable 인스턴스를 생성할 수 있다.
또, 주석과 같이 from 메소드를 이용하여 생성할 수도 있다.
import scala.concurrent._
import ExecutionContext.Implicits.global
val future = Future {"Back to the Future !!"}
val observable = Observable[String]{ observer =>
future foreach {case string => observer.onNext(string)}
future.failed foreach {case exception => observer.onError(exception)}
Subscription()
}
observable.subscribe(println(_))
//val observable2 = Observable.from(Future {"Back to the Future 22 !!"})
Observable Combinator
Observable 객체의 장점은 바로 여러 Combinator를 이용하여 조합이 가능하다는 것이다.
아래 예제는 함수 Combinator를 이용하여 0.1초 간격으로 짝수를 생성하는 이벤트이다.
import scala.concurrent.duration._
val even = Observable.interval(0.1.seconds).filter(_%2==0).map(n => s"number : $n").take(7)
even.subscribe(println(_), exception => println(s"exception : $exception"), ()=>println("no more even number"))
Thread sleep 2000
number : 0 number : 2 number : 4 number : 6 number : 8 number : 10 number : 12 no more even number |
스칼라의 for 내장 기법을 이용하면 더 짧은 코드로 프로그래밍이 가능하다.
val even2 = for(n <- Observable.from(0 until 12); if n%2==0)
yield s"number : $n"
내포된 Observable(고차 이벤트 스트림 Higher-order event stream)
고차함수의 예
foreach : (T => Unit) => Unit 처럼 함수 타입 안에 함수가 내포되어 있음
고차 이벤트 스트림
Observable[ Observable[ T ] ] 와 같이 이벤트가 내포
오래 걸리는 연산을 하는 Future 를 바탕으로 생성한 Observable 객체가 있다고 가정
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
def longObservable : Observable[String] = Observable.from(Future{
val random = scala.util.Random.nextInt(10)
Thread sleep 100 * random
"Long Long Exec : " + random
})
def observable : Observable[Observable[String]] = Observable.interval(0.1.seconds).take(4).map {
n => longObservable.map(exec => s"exec $n : " + exec)
}
내포된 Observable[ Observable[String] ] 객체에서 String이벤트를 처리하기 위해 두 가지 방법이 있음
먼저 concat 함수는 이벤트의 실행 순서를 유지하면서 이벤트 결과를 반환한다
observable.concat.subscribe(println(_))
Thread sleep 2000
exec 0 : Long Long Exec : 2 exec 1 : Long Long Exec : 2 exec 2 : Long Long Exec : 1 exec 3 : Long Long Exec : 0 |
두 번째로, flatten 함수는 이벤트의 실행 순서를 보장하지 않고
먼저 완료되는 대로 결과를 반환한다
flatten 은 특정 이벤트가 완료하지 않거나, 무한정 이벤트를 발생시킬 수 있을 때 사용
observable.flatten.subscribe(println(_))
Thread sleep 2000
exec 3 : Long Long Exec : 4 exec 1 : Long Long Exec : 7 exec 0 : Long Long Exec : 9 exec 2 : Long Long Exec : 8 |
flatten + map 조합의 경우
flatMap 으로 대치하여 사용할 수 있다.
def observable : Observable[String] =
Observable.interval(0.1.seconds).take(4).flatMap{ n => longObservable.map(exec => s"exec $n : " + exec)}
observable.subscribe(println(_))
Thread sleep 2000
flatMap을 사용한 경우는
for 내장식으로 바꿔서 코드를 더 간결하게 사용 가능하다.
def observable = for {
n <- Observable.interval(0.1.seconds).take(4)
exec <- longObservable
} yield s"exec $n : " + exec
observable.subscribe(println(_))
Thread sleep 2000
Ref) 스칼라 동시성 프로그래밍