folly/folly/python/test/simplegenerator.pyx

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from cpython.ref cimport PyObject

from folly cimport cFollyTry
from folly.coro cimport bridgeCoroTaskWith
from folly.executor cimport cAsyncioExecutor, get_executor
from folly.async_generator cimport cAsyncGenerator, cAsyncGeneratorWrapper, cNextResult


cdef extern from "folly/python/test/simplegenerator.h" namespace "folly::python::test":
    cAsyncGenerator[int] make_generator_empty()
    cAsyncGenerator[int] make_generator()
    cAsyncGenerator[int] make_generator_error()


ctypedef cAsyncGeneratorWrapper[int] cIntGenerator


cdef extern from "<utility>" namespace "std" nogil:
    cdef cIntGenerator move "std::move"(cIntGenerator)


cdef class SimpleGenerator:

    cdef cIntGenerator generator
    cdef cAsyncioExecutor* executor

    def __cinit__(self, flavor):
        if flavor == "normal":
            self.generator = move(cIntGenerator(make_generator()))
        elif flavor == "empty":
            self.generator = move(cIntGenerator(make_generator_empty()))
        elif flavor == "error":
            self.generator = move(cIntGenerator(make_generator_error()))


    @staticmethod
    cdef void callback(
        cFollyTry[cNextResult[int]]&& res,
        PyObject* py_future,
    ):
        future = <object> py_future
        if res.hasException():
            try:
                res.exception().throw_exception()
            except Exception as ex:
                future.set_exception(ex)
        else:
            if res.value().has_value():
                future.set_result(res.value().value())
            else:
                future.set_exception(StopAsyncIteration())

    def __aiter__(self):
        return self

    def __anext__(self):
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        bridgeCoroTaskWith[cNextResult[int]](
            get_executor(),
            self.generator.getNext(),
            SimpleGenerator.callback,
            <PyObject *> future,
        )
        return future