일반적인 비동기 함수를 co_await 과 함께 사용하는 방법


결론부터 말하자면 비동기 API가 co_await의 피연산자가 될 수 있도록 디자인된 게 아니라면

코루틴에 사용하지 않는 것이 좋다.

co_await의 피연산자가 되려면 해당 api가 코루틴 그 자체여야 한다.

그게 아니라면 사용자가 직접 해당 api를 awaitable객체로 wrapping해야 한다.


그런데 이 방식의 문제점은 단순히 복잡성의 증가가 아니다.

예를 들어, 비동기 api가 완료 콜백 핸들러를 인자로 갖는다고 가정해보자.

void AsyncApi(
		int param1, 
    	int param2, 
		std::function<void(bool)>&& completion_callback);

이 경우 completion_callback또는 exception_handler에 코루틴 핸들을 넘겨 코루틴을 제어해야 한다.

그런데 그걸려면 비동기 API가 호출되는 시점에 이미 콜백에 코루틴 핸들을 넘겨야 한다.

// Awaitable 객체
template<typeanme... Args>
class AsyncAwaiter {
public:
    bool await_ready() const noexcept { return false; }

    void await_suspend(std::coroutine_handle<> h) {
        // 비동기 작업 시작 및 완료 콜백 설정
        AsyncApi(input_, [this, h](int result) {
            result_ = result;
            h.resume();  // 비동기 작업 완료 시 코루틴 재개
        });
    }

    int await_resume() const { return result_; }

    explicit AsyncAwaiter(int input) : input_(input) {}

private:
    int ;
    int result_;
};









이 콜백 함수가 호출되거나 성능 오버헤드


애초에 해당 API가 코루틴을 지원하는 방식으로 디자인 되어야 한다.


그렇지 않으면 결국 awaitable 객체로 직접 래핑 클래스를 만들어야 한다.

'awaitable 템플릿을 만들어두고 활용하면 괜찮지 않냐?' 싶겠지만 제약이 존재한다.

비동기 함수가 std::future를 반환하도록 디자인되있다면 쉽게 래핑할 수 있겠지만

사실상 이런 디자인이라면 굳이 코루틴을 사용할 필요도 거의 없다.


그러나 만약 비동기 API가 콜백 핸들러를 받는 디자인이라면 얘기가 달라진다.

이런 디자인이라면 콜백 핸들러에 코루틴을 제어할 수 있는 무언가를 전달해야만 한다.

굉장히 복잡하고 별로인 구조가 된다.

#include <coroutine>
#include <future>
#include <functional>
#include <iostream>
using namespace std::chrono_literals;

// 코루틴 객체
template <typename T>
struct AsyncResult
{
    struct promise_type
    {
        T result;
        std::exception_ptr exception;

        AsyncResult get_return_object()
        {
            return AsyncResult(std::coroutine_handle<promise_type>::from_promise(*this));
        }

        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }

        void unhandled_exception()
        {
            exception = std::current_exception();
        }

        void return_value(T value) { result = std::move(value); }
    };

    // 코루틴 핸들의 promise()메서드를 통해 비동기 작업의 결과를 반환받기 위해 필요
    // promise()메서드가 반환하는 것은 promise_type이다. 
    std::coroutine_handle<promise_type> coro;

    AsyncResult(std::coroutine_handle<promise_type> h) : coro(h)
    {
    }

    AsyncResult(AsyncResult&& other) noexcept : coro(other.coro) { other.coro = nullptr; }

    ~AsyncResult()
    {
        if (coro)
        {
            if (!coro.done())
            {
                std::cerr << "Warning: Destroying incomplete coroutine\n";
                throw std::runtime_error("coro is undone but destroyed");
            }
            coro.destroy();
        }
    }

    T get_result()
    {
        if (coro.promise().exception)
        {
            std::rethrow_exception(coro.promise().exception);
        }
        return std::move(coro.promise().result);
    }
};

template <typename T>
struct awaitable_wrapper
{
    std::future<T> future;

    bool await_ready() const { return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }

    void await_suspend(std::coroutine_handle<> h)
    {
        std::thread([this, h]()
        {
            future.wait();
            h.resume();
        }).detach();
    }

    T await_resume() { return future.get(); }
};

template <typename Func, typename... Args>
auto make_awaitable(Func&& func, Args&&... args)
{
    using result_type = decltype(func(std::forward<Args>(args)...));
    // 이 구조는 결국 비동기 작업이 future를 반환해야 한다.
    // 혹은 비동기 작업에 대한 핸들을 반환하는 경우에는 이 핸들로 작업이 완료되었는지를 모니터링해야 한다.
    // 만약 콜백 핸들러를 넘기는 방식의 디자인이라면 결국 co_await의 피연산자로 사용할 수 없게 된다.
    auto future = std::async(std::launch::async, std::forward<Func>(func), std::forward<Args>(args)...);
    return awaitable_wrapper<result_type>{std::move(future)};
}

template <typename Func, typename... Args>
auto coroutine_wrapper(Func&& func, Args&&... args)
    -> AsyncResult<decltype(std::declval<Func>()(std::declval<Args>()...))>
{
    using result_type = decltype(std::declval<Func>()(std::declval<Args>()...));
    result_type result = co_await make_awaitable(std::forward<Func>(func), std::forward<Args>(args)...);
    co_return result;
}

// 일반적인 비동기 함수(std::future같은 것을 지원하지 않음)
int async_function(int x, int y)
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return x + y;
}

// 메인 함수에서의 사용
int main()
{
    auto result = coroutine_wrapper(async_function, 10, 32);
    std::cout << "Result: " << result.get_result() << std::endl;
    std::this_thread::sleep_for(3s);
    return 0;
}




1.   해당 비동기 함수를 awaitable object로 래핑하기


안되는 예시

// 이렇게 쉽게 반환 타입을 설정하면 작동하지 않는다
std::future<int> async_function();
int result = co_await async_function();  // 오류!

래핑한 예시

// 이렇게 래핑해야 한다
template <typename T>
auto await_future(std::future<T>&& f) {
    struct Awaiter {
        std::future<T> fut;
        bool await_ready() { return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready; }
        // await_suspend에서 코루틴 핸들을 통해 비동기 작업에 대한 처리 결과를 기다린다
        void await_suspend(std::coroutine_handle<> h) {
            std::thread([this, h] {
                fut.wait();
                h.resume();
            }).detach();
        }
        T await_resume() { return fut.get(); }
    };
    return Awaiter{std::move(f)};
}

// 이제 이렇게 사용할 수 있다
int result = co_await await_future(some_async_work());




비동기 작업 래핑 예시

#include <iostream>
#include <coroutine>
#include <functional>
#include <thread>
#include <chrono>
#include <future>
#include <vector>
#include <memory>
#include <stdexcept>
using namespace std::chrono_literals;

class AsyncJob
{
public:
    using Callback = std::function<void(int)>;

    void DoAsyncWork(int id, Callback callback)
    {
        std::cout << "DoAsyncWork called for id: " << id << std::endl;
        std::async(std::launch::async, [id,callback]()
        {
            std::this_thread::sleep_for(5000ms);
            callback(id * 10);
        });
    }
};

// 코루틴 객체
template <typename T>
struct CoroResult
{
    struct promise_type
    {
        std::promise<T> promise;

        CoroResult get_return_object()
        {
            std::cout << "CoroResult::promise_type::get_return_object()" << std::endl;
            return CoroResult(promise.get_future());
        }
        std::suspend_never initial_suspend()
        {
            std::cout << "CoroResult::promise_type::initial_suspend()" << std::endl;
            return {};
        }
        std::suspend_never final_suspend() noexcept
        {
            std::cout << "CoroResult::promise_type::final_suspend()" << std::endl;
            return {};
        }
        void return_value(T value)
        {
            std::cout << "CoroResult::promise_type::return_value()" << std::endl;
            promise.set_value(std::move(value));
        }
        void unhandled_exception()
        {
            std::cout << "CoroResult::promise_type::unhandled_exception()" << std::endl;
            promise.set_exception(std::current_exception());
        }
    };


    std::future<T> future;

    explicit CoroResult(std::future<T>&& f) : future(std::move(f))
    {
        std::cout << "CoroResult(std::future<T>&& f)" << std::endl;
    }

    T get()
    {
        std::cout << "CoroResult::get()" << std::endl;
        return future.get();
    }

    // co_await 수행을 위한 메서드 정의
    bool await_ready()
    {
        std::cout << "CoroResult::await_ready()" << std::endl;
        /* 아래 future.wait_for을 하는건 실제로는 non-blocking이며 즉시 상태를 확인한다.
         * 이때 true를 반환하면 await_suspend()를 호출하지 않고 즉시, await_resume()을 호출한다.
         * 하지만 false를 반환한다면 await_suspend()를 호출하게 되고 코루틴은 일시 중지된다.
         * await_suspend()내부에서 비동기로 future에 결과가 나올때 까지 대기하다가
         * future에 값이 들어오면 coroutine_handle.resume()을 하게 되어 await_resume()을 호출하고
         * 코루틴을 재개함*/
        return future.wait_for(0s) == std::future_status::ready;
    }
    void await_suspend(std::coroutine_handle<> _handle)
    {
        std::cout << "CoroResult::await_suspend(std::coroutine_handle<>)" << std::endl;
        std::async(std::launch::async, [this, _handle]()
        {
            future.wait();
            _handle.resume(); // future가 결과를 받으면 코루틴을 재개한다. 코루틴 핸들을 resume()하면 await_resume()메서드가 호출 됨
        });
    }
    T await_resume()
    {
        std::cout << "CoroResult::await_resume()" << std::endl;
        return future.get();
    }
};


CoroResult<int> fetch_data_coroutine(AsyncJob& api, int id)
{
    struct Awaiter
    {
        AsyncJob& api;
        int id;
        std::promise<int> promise;

        bool await_ready()
        {
            std::cout << "Awaiter::await_ready()" << std::endl;
            return false;
        }
        void await_suspend(std::coroutine_handle<> h)
        {
            std::cout << "Awaiter::await_suspend() called for id: " << id << std::endl;

            api.DoAsyncWork(id, [this, h](int value) mutable
            {
                std::cout << "Callback executed for id: " << id << " with value: " << value << std::endl;
                promise.set_value(value);
                h.resume(); // 이부분에서 await_resume() 메서드가 호출됨
            });
        }
        int await_resume()
        {
            int result = promise.get_future().get();
            std::cout << "Awaiter::await_resume() called, returning: " << result << std::endl;
            return result;
        }
    };

    std::cout << "fetch_data_coroutine called for id: " << id << std::endl;
    int result = co_await Awaiter{api, id};
    std::cout << "fetch_data_coroutine completed for id: " << id << " with result: " << result << std::endl;
    co_return result;
}


CoroResult<std::vector<int>> process_data(AsyncJob& api)
{
    std::vector<int> results;

    std::cout << "process_data started" << std::endl;
    for (int i = 0; i < 3; i++)
        results.emplace_back(co_await fetch_data_coroutine(api, i));

    std::cout << "process_data completed" << std::endl;

    co_return results;
}

int main()
{
    AsyncJob api;
    std::cout << "Starting async operations...\n";

    auto result = process_data(api);
    std::cout << "process_data coroutine created, calling get()" << std::endl;

    auto results = result.get();

    std::cout << "All operations completed. Results:\n";
    for (size_t i = 0; i < results.size(); ++i)
    {
        std::cout << "CoroResult " << (i + 1) << ": " << results[i] << '\n';
    }

    return 0;
}

/* 출력 :
 Starting async operations...
CoroResult::promise_type::get_return_object()
CoroResult(std::future<T>&& f)
CoroResult::promise_type::initial_suspend()
process_data started
CoroResult::promise_type::get_return_object()
CoroResult(std::future<T>&& f)
CoroResult::promise_type::initial_suspend()
fetch_data_coroutine called for id: 0
Awaiter::await_ready()
Awaiter::await_suspend() called for id: 0
DoAsyncWork called for id: 0
Callback executed for id: 0 with value: 0
Awaiter::await_resume() called, returning: 0
fetch_data_coroutine completed for id: 0 with result: 0
CoroResult::promise_type::return_value()
CoroResult::promise_type::final_suspend()
CoroResult::await_ready()
CoroResult::await_resume()
CoroResult::promise_type::get_return_object()
CoroResult(std::future<T>&& f)
CoroResult::promise_type::initial_suspend()
fetch_data_coroutine called for id: 1
Awaiter::await_ready()
Awaiter::await_suspend() called for id: 1
DoAsyncWork called for id: 1
Callback executed for id: 1 with value: 10
Awaiter::await_resume() called, returning: 10
fetch_data_coroutine completed for id: 1 with result: 10
CoroResult::promise_type::return_value()
CoroResult::promise_type::final_suspend()
CoroResult::await_ready()
CoroResult::await_resume()
CoroResult::promise_type::get_return_object()
CoroResult(std::future<T>&& f)
CoroResult::promise_type::initial_suspend()
fetch_data_coroutine called for id: 2
Awaiter::await_ready()
Awaiter::await_suspend() called for id: 2
DoAsyncWork called for id: 2
Callback executed for id: 2 with value: 20
Awaiter::await_resume() called, returning: 20
fetch_data_coroutine completed for id: 2 with result: 20
CoroResult::promise_type::return_value()
CoroResult::promise_type::final_suspend()
CoroResult::await_ready()
CoroResult::await_resume()
process_data completed
CoroResult::promise_type::return_value()
CoroResult::promise_type::final_suspend()
process_data coroutine created, calling get()
CoroResult::get()
All operations completed. Results:
CoroResult 1: 0
CoroResult 2: 10
CoroResult 3: 20

 */


위 비동기 작업 래핑 방식을 실행하여

디버깅을 통해 어떤 순서로 흐름이 흘러가는지 잘 익혀두자.

Q2. 코루틴은 성능을 크게 신경쓰지 않아도 될까?
A2. 코루틴 프레임 할당 시 관련 최적화 기술 HALO(Heap Allocation eLision Optimization)의 맹점에 주의해야 한다.

 

Posted by Elan
: