본문 바로가기
Android.Kotlin

[Kotlin] Channel, ReceiveChannel, Pipeline

by 동동하다 2023. 10. 20.
반응형

Kotlin Channel 은 둘 이상의 코루틴 간의 동시성 통신을 가능하게 하는 강력한 구조입니다. Channel 은 서로를 간섭하지 않고 데이터를 공유하고 함께 작업할 수 있도록 합니다. Kotlin의 Channel에 대해서 알아보고 사용하는 팁과 예시를 알아보고 Channel을 이용하여 Pipeline을 어떻게 효율적으로 사용하는지 알아보겠습니다.

Channel 은 무엇인가?

Channel 은 한 코루틴에서 다른 코루틴으로 데이터가 흐를 수 있도록 파이프라인을 연결합니다. 본질적으로 코루틴의 메세지를 보내고 받을 수 있는 버퍼 또는 큐 입니다. 한 코루틴은 Channel에 데이터를 전송할 수 있고, 다른 코루틴 Channel을 통해 해당 데이터를 검색하고 받을 수 있습니다. 이는 코루틴을 위한 pub-sub 구조라고 할 수 있습니다.

데이터를 보내는 코루틴을 producer 라고 하고, 데이터를 받는 코루틴을 consumer라고 합니다. 하나 이상의 코루틴이 producer, consumer 가 될 수 있습니다.

여러 개의 코루틴이 동일한 Channel에서 데이터를 수신할 경우 데이터는 하나의 consumer 코루틴에서만 처리됩니다. 수신이 된 데이터는 즉시 Channel에서 제거됩니다.

 

아래 간단한 예시를 보도록 하겠습니다.

fun channelTest (
    coroutineScope:CoroutineScope
){
    val channel:Channel<String> = Channel<String>()

    coroutineScope.launch {
        channel.send("Data 1")
        channel.send("Data 2")
        channel.send("Data 3")
        channel.send("Data 4")
        channel.send("Data 5")
        channel.send("Data 6")
        channel.send("Data 7")
        channel.send("Data 8")
        channel.send("Data 9")
        channel.send("Data 10")
    }

    coroutineScope.launch {
        channel.consumeEach {
            Log.i("LOG", "#### coroutine 111 [${this}] received : $it ")
        }
    }

    coroutineScope.launch {
        channel.consumeEach {
            Log.i("LOG", "#### coroutine 222 [${this}] received : $it ")
        }
    }

    Log.i("LOG", ""#### this is coroutine channel test end")
}

첫번째 Channel에서 send 함수를 이용하여 데이터를 produce 하게 되면 두 번째, 세 번째 Channel에서 해당 데이터를 consumeEach 함수를 통해 receive 하게 됩니다. 

 

close 함수를 호출하면 Channel 에 token을 전달해서 Channel 이 더 이상 보낼 데이터가 없다는 것을 인지 시킵니다. token을 받은 Channel 은 지금까지 보냈던 elements 가 수신 되기를 기다렸다가 모두 수신되면 종료됩니다. close는 Channel 사용을 효율적으로 관리해 주기에 잘 사용해야 합니다.

 

순서의 차이가 있기는 하겠지만, 위 예제의 결과는 다음과 같습니다.

I  #### this is coroutine channel test end
I  #### coroutine 222 [StandaloneCoroutine{Active}@9a8d955] received : Data 2 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 1 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 3 
I  #### coroutine 222 [StandaloneCoroutine{Active}@9a8d955] received : Data 4 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 5 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 7 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 8 
I  #### coroutine 222 [StandaloneCoroutine{Active}@9a8d955] received : Data 6 
I  #### coroutine 111 [StandaloneCoroutine{Active}@bda9f6a] received : Data 9 
I  #### coroutine 222 [StandaloneCoroutine{Active}@9a8d955] received : Data 10

ReceiveChannel

ReceiveChannel 은 데이터를 수신하는 Channel 로 주로 sender에게 다시 데이터를 전달할 필요가 없을 때 사용합니다. ReceiveChannel을 이용할 경우 해당 Channel 은 오직 수신만 하는 역할을 한다는 것을 보장받을 수 있기에 유지보수에 유리합니다. 

fun receiveChannelTest (
    coroutineScope: CoroutineScope
) {
    var channel :ReceiveChannel<String> = Channel()

    coroutineScope.launch {
        channel = produce {
            send("A")
            send("B")
            send("C")
            send("D")
            send("E")
            send("F")
        }
    }

    coroutineScope.launch {
        channel.consumeEach {
            Log.i("LOG", "Received data : $it")
        }

        Log.i("LOG", "is producer closed : ${channel.isClosedForReceive}")
    }
}

위 예제에서 produce 를 사용하여 ReceiveChannel을 구현하였다.

produce를 사용하면 따로 Channel을 close 해줄 필요 없이 알아서 정상적으로 send 가 완료 되든, 예외가 발생하여 비정상적으로 완료가 되든 Channel 은 close 됩니다.

 

위 예제의 결과는 다음과 같습니다.

I  Received data : A
I  Received data : B
I  Received data : C
I  Received data : D
I  Received data : E
I  Received data : F
I  is producer closed : true

Pipeline

Pipeline 은 특정 로직을 처리하기 위해 필요한 일련의 과정을 스탭으로파 만들어서 모아둔 형태를 말합니다. 코틀린에서 스탭은 Channel 형태로 Input을 받아서 데이터를 처리하고 결괏값을 다음 스탭에서 사용할 수 있는 Channel 형태로 반환하는 코루틴입니다.

 

Pipeline 스탭에서 Input/Output 으로 말하는 Channel 은 데이터를 동기적으로 처리되고, 각 스탭 간 Channel의 의존도가 낮습니다. 이는 Pipeline에서 처리하는 데이터의 관리를 효율적으로 합니다.

 

fun CoroutineScope.squares(
    num:ReceiveChannel<Int>
) :ReceiveChannel<Int> = produce {
    for (x in num) send( x * x )
}

fun CoroutineScope.getEven(
    num:ReceiveChannel<Int>
):ReceiveChannel<Int> = produce {
    for (x in num) if (x % 2 == 0) send(x)
}

fun CoroutineScope.produceNum() = produce {
    var x = 1
    while (true) send(x++)
}

fun pipelineTest (
    coroutineScope: CoroutineScope
) {
    coroutineScope.launch {
        val nums = produceNum()
        val even = getEven(nums)
        val squared = squares(even)

        for (i in 1..10) {
            Log.i("LOG", "the result is : ${squared.receive()}")
        }
        
        println("Done!")
        coroutineContext.cancelChildren()
    }
}

위 예제는 임의의 전달 받은 자연수에서 짝수만을 선별하여 제곱 처리 하도록 하는 Pipeline입니다. 

숫자를 생성하고, 짝수를 선별하고, 제곱하는 로직을 Channel 로 만들어서 Pipeline의 각 스탭으로 처리하였습니다. 만약 로직이 변경되어 짝수가 아닌 홀수를 선별해야 한다면 getEven 스탭을 getOdd 로 변경하여 쉽게 처리할 수 있습니다. 

 

Pipeline의 각 스탭은 무한 반복 될 수 있기 때문에 모든 작업이 종료된 이후에 명시적으로 종료할 필요가 있습니다. 

 

예제의 결과는 아래와 같습니다.

I  the result is : 4
I  the result is : 16
I  the result is : 36
I  the result is : 64
I  the result is : 100
I  the result is : 144
I  the result is : 196
I  the result is : 256
I  the result is : 324
I  the result is : 400
반응형