Functional Program Design in Scala


  Scala 언어를 만든 마틴 오더스키 교수의 Coursera MOOC 2탄이다. 1탄에 대한 소감은 이 링크에 있다. 1탄에서는 Scala와 Functional Programming의 기본 과정을 다뤘다면, 이번 강의는 심화 과정을 다룬다.


  처음에는 1장에서 배웠던 for 구문에 대한 심화 과정을 알려준다. for와 yield를 조합한 코드는  Scala 컴파일러가 map, flatMap 과 같은 Higher-order Function으로 변경시킨 후, 해당 작업을 처리한다. 이러한 변환은 Higher-order Function을 직접 사용하는 것보다 code에 대한 추상화를 더 강력하게 지원한다. 그리고 난수를 생성하는 방법을 함수형 스럽게 지원한다. 가장 작은 기본 단위인 Int형 난수 생성기를 이용해, 단계적으로 아주 큰 범위의 난수까지 생성하는 과정을 보여준다. 이 장의 마지막에는 함수형 프로그래밍의 끝판왕 중 하나인 Monad에 대해 설명하고 있다. Scala에서 Monad는 flatMap() 함수로 나타난다. Monad는 카테고리 이론에서 나온 것으로써, 단어 자체의 어원과는 전혀 상관없는 이론을 나타낸다. Monad에 대해서 이 강의에서는 기본적인 개념만 설명하고 있다. 이 강의를 가지고 Monad를 완벽하게 이해하는 것은 어려운 것 같다. Scalaz 라이브러리나 스칼라로 배우는 함수형 프로그래밍 책, 또는 Haskell 프로그래밍 언어나 Category Theory 책 같은 것을 이용하여 향우에 깊기 파야지 완벽하게 이해할 수 있을 것 같다.


  2장에선는 Lazy Evaluation 에 관한 설명을 한다. Lazy Evaluation을 위한 lazy 키워드나 Stream 클래스를 설명하고, 이를 이용하여 Infinite Sequences를 생성하는 방법을 알려준다. Lazy Evaluation을 이용하면 런타임에서 특정 값을 사용할 때, 필요한 경우에만 값을 계산하여 준비를 한 후 사용하기 때문에, 프로그램의 최적화가 가능하다.


 3장에서는 실무에서 쓰일법 한 프로그래밍인 상태(state)를 주제로 다루고 있다. 논리회로 시간에 배웠던 가산기를 만드는 방법을 Scala 코드로 알려준다. And gate나 Or gate 등을 이용해 Half Adder, Full Adder 등을 조합해서 만드는 방법을 State를 가지고 만든다. 역시 작은 단위부터 큰 단위까지 만들기 위한 점진적인 방법을 사용하는데, 이것이 함수형 프로그래밍의 장점 중 하나인 것 같다.


  마지막 장에서는, 얼마 전에 한참 유행하던 FRP(Functional Reactive Programming)을 우선 다룬다. 기존 Imperative Programming에서 다루던 MVC와 같은 구조는 Muti-Threading 환경에서 동기화 처리를 하기 힘든데, FRP를 사용하면 이런 작업이 아주 편하다는 장점을 알려준다. FRP 의 기본 개념 및 간단한 FRP를 구현하는 방법을 알려준다. 그 뒤에 Future를 중점적으로 다룬다. Future를 설명할 때 에릭 매이어가 오더스키 교수를 대신해 강의를 한다. 마이어는 Microsoft 재직 당시 C#의 RINQ를 만들고, Async이나 Await 같은 동시성 프로그래밍 개념을 만들어 다른 프로그래밍 언어로까지 전파시킨 네임드 개발자이다. 강의가 오더스키 교수 때와는 다르게, 대화를 하는 것 같은 재미있는 상황이 펼쳐졌지만, 오더스키 교수가 구사하던 깔끔한 영어와는 다르기 때문에 약간 알아듣기가 힘들었다. 마이어가 C9에서 Haskell 을 가지고 진행한 함수형 프로그래밍 강의가 있는데, 이 것도 시간이 되면 수강할 예정이다. 강의 마지막에는 Future의 Monad(flatMap) 을 구현하는 방법을 알려주는데, 아직 내공이 부족해서 고개만 끄덕거리고 넘어갔다.


  두 개의 Scala를 이용한 Functional Programming 강의를 들었는데, 책으로 보던 것보다는 내공이 많이 쌓인 것 같다. 좀 더 학문적으로 접근하기 위해 Category Theory는 Functional Programming에서는 필수인 것 같다. 좀 더 내공이 쌓이면 Monad나 Category Theory에 대한 포스팅을 할 예정이다. 또, Functional Programming에 관해 얘기하는 사람들을의 대다수는 SICP에 대해서도 언급을 많이 하고 있다. 봐야할 책과 자료들이 산처럼 쌓이고 있다. 이런 것들을 보다 보면, 좀 더 나은 개발자가 될 수 있다고 믿기 때문에 재미 없을 때까지 계속 산더미를 치워야 겠다.

ReactiveX


http://reactivex.io/


Observable 스트림을 이용한 비동기 이벤트 기반 프로그래밍 프레임워크


행위(Behavioral) 디자인 패턴의 종류 중 하나인 Observer pattern을 확장


기존의 Future를 이용한 비동기 구문은 하나의 구문을 이용해 서로 다른 이벤트를 여러 차례 받는 것이 불가능하다.

이러한 Future의 단점을 극복한 것이 ReactiveX(이하 RX)



RxScala


RxJava 의 adapter

https://github.com/ReactiveX/RxScala


RxScala는 기본 라이브러리가 아니므로, 아래 Maven 경로를 이용하여 다운

https://mvnrepository.com/artifact/io.reactivex/rxscala_2.11/0.26.4



Observable[T] - T 타입의 이벤트를 만들어내는 이벤트 스트림


Observable 인스턴스는 subscribe 메소드를 이용해 observer(관찰자)를 받음


val o = Observable just (1,2,3)
o subscribe (n => println(s"first subscribe : $n"))
o subscribe (n => println(s"second subscribe : $n"))


위의 코드는 아래와 같은 결과를 비동기적으로 호출한다.


first subscribe : 1

first subscribe : 2

first subscribe : 3

second subscribe : 1

second subscribe : 2

second subscribe : 3 



아래와 같이 예외가 발생했을 때, 동작하는 코드를 작성하면 다음과 같이 동작한다.


val exception = new Exception
val o = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)

o.subscribe(
  n => println(s"number $n"),
  e => println(s"error : $e")
)

number 1

number 2

error : java.lang.Exception


첫 번째로 넘긴 observer에서 1,2를 정상 출력 하지만,

그 다음 exception에서 예외가 발생하여 해당 이벤트를 추가로 실행시키지 않고 두 번째 observer로 넘긴다.


그리고 예외가 발생하여 해당 인스턴스가  오류 상태에 진입하여 3,4에 대하여 이벤트를 발생시키지 않는다.



Observable 객체의 상태(state)는 uncompleted, error, completed 3 가지가 있다.


이 상태들을 이용해 이벤트를 발생시키는 트레이트(trait) 는 다음과 같다.


//rx.lang.scala.Observer.scala
trait Observer[-T] extends scala.AnyRef {
  private[scala] val asJavaObserver : rx.Observer[_ >: T] = { /* compiled code */ }
  def onNext(value : T) : scala.Unit = { /* compiled code */ }
  def onError(error : scala.Throwable) : scala.Unit = { /* compiled code */ }
  def onCompleted() : scala.Unit = { /* compiled code */ }
}
object Observer extends scala.AnyRef with rx.lang.scala.ObserverFactoryMethods[rx.lang.scala.Observer] {
  private[scala] def apply[T](observer : rx.Observer[T]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
  def apply[T](onNext : scala.Function1[T, scala.Unit], onError : scala.Function1[scala.Throwable, scala.Unit], onCompleted : scala.Function0[scala.Unit]) : rx.lang.scala.Observer[T] = { /* compiled code */ }
}



다음 예제는 Observer 인스턴스를 생성하고 Obervable 인스턴스에 직접 대입한다.



val exception = new Exception
val o1 = Observable.just(1,2) ++ Observable.error(exception) ++ Observable.just(3,4)
val o2 = Observable.just(1,2) ++ Observable.just(3,4)

val observer = new Observer[Int]{
  override def onNext(n: Int) = println(s"number $n")
  override def onError(error: Throwable) = println(s"error : $error")
  override def onCompleted() = println("The End!!")
}

o1.subscribe(observer)
println("------------------------------------------")
o2.subscribe(observer)


number 1

number 2

error : java.lang.Exception

------------------------------------------

number 1

number 2

number 3

number 4

The End!!


uncompleted 상태에서 error, completed 상태 두 가지 중 하나의 상태로만 이동할 수 있다.

따라서, 예외가 발생하면 오버라이드한 onError 메소드의 이벤트만 생성시킨다.




Future & Observable


Future를 이용하여 Observable 인스턴스를 생성 가능


Future 성공 => Future의 결과를 이벤트로 반환 및 onCompleted 메소드 호출

Future 실패 => onError 메소드 호출



아래 예제 코드와 같이 Future를 이용하여 Observable 인스턴스를 생성할 수 있다.

또, 주석과 같이 from 메소드를 이용하여 생성할 수도 있다.

import scala.concurrent._
import ExecutionContext.Implicits.global

val future = Future {"Back to the Future !!"}
val observable = Observable[String]{ observer =>
  future foreach {case string => observer.onNext(string)}
  future.failed foreach {case exception => observer.onError(exception)}
  Subscription()
}
observable.subscribe(println(_))
//val observable2 = Observable.from(Future {"Back to the Future 22 !!"})



Observable Combinator


Observable 객체의 장점은 바로 여러 Combinator를 이용하여 조합이 가능하다는 것이다.


아래 예제는 함수 Combinator를 이용하여 0.1초 간격으로 짝수를 생성하는 이벤트이다.



import scala.concurrent.duration._

val even = Observable.interval(0.1.seconds).filter(_%2==0).map(n => s"number : $n").take(7)
even.subscribe(println(_), exception => println(s"exception : $exception"), ()=>println("no more even number"))
Thread sleep 2000


number : 0

number : 2

number : 4

number : 6

number : 8

number : 10

number : 12

no more even number




스칼라의 for 내장 기법을 이용하면 더 짧은 코드로 프로그래밍이 가능하다.


val even2 = for(n <- Observable.from(0 until 12); if n%2==0)
      yield s"number : $n"



내포된 Observable(고차 이벤트 스트림 Higher-order event stream)


고차함수의 예

foreach : (T => Unit) => Unit 처럼 함수 타입 안에 함수가 내포되어 있음


고차 이벤트 스트림

Observable[ Observable[ T ] ] 와 같이 이벤트가 내포


오래 걸리는 연산을 하는 Future 를 바탕으로 생성한 Observable 객체가 있다고 가정



import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def longObservable : Observable[String] = Observable.from(Future{
  val random = scala.util.Random.nextInt(10)
  Thread sleep 100 * random
  "Long Long Exec : " + random
})

def observable : Observable[Observable[String]] = Observable.interval(0.1.seconds).take(4).map {
  n => longObservable.map(exec => s"exec $n : " + exec)
}


내포된 Observable[ Observable[String] ] 객체에서 String이벤트를 처리하기 위해 두 가지 방법이 있음


먼저 concat 함수는 이벤트의 실행 순서를 유지하면서 이벤트 결과를 반환한다

observable.concat.subscribe(println(_))
Thread sleep 2000


exec 0 : Long Long Exec : 2

exec 1 : Long Long Exec : 2

exec 2 : Long Long Exec : 1

exec 3 : Long Long Exec : 0 



두 번째로, flatten 함수는 이벤트의 실행 순서를 보장하지 않고

먼저 완료되는 대로 결과를 반환한다


flatten 은 특정 이벤트가 완료하지 않거나, 무한정 이벤트를 발생시킬 수 있을 때 사용

observable.flatten.subscribe(println(_))
Thread sleep 2000



exec 3 : Long Long Exec : 4

exec 1 : Long Long Exec : 7

exec 0 : Long Long Exec : 9

exec 2 : Long Long Exec : 8



flatten + map 조합의 경우

flatMap 으로 대치하여 사용할 수 있다.


def observable : Observable[String] =
Observable.interval(0.1.seconds).take(4).flatMap{ n => longObservable.map(exec => s"exec $n : " + exec)}

observable.subscribe(println(_))
Thread sleep 2000




flatMap을 사용한 경우는

for 내장식으로 바꿔서 코드를 더 간결하게 사용 가능하다.


def observable = for {
  n <- Observable.interval(0.1.seconds).take(4)
  exec <- longObservable
} yield s"exec $n : " + exec

observable.subscribe(println(_))
Thread sleep 2000


Ref) 스칼라 동시성 프로그래밍

+ Recent posts