php 实现 Promise.all 和 Promise.race

  • 测试

    $promise1 = function () {
     msleep(500);
     return 'one';
    };
    $promise2 = function () {
     msleep(100);
     return 'two';
    };
    $promise3 = function () {
     msleep(50);
     throw new \Exception('Reject');
    };
    var_dump(promise_all([$promise1, $promise2]));
    var_dump(promise_race([$promise1, $promise2]));
    var_dump(promise_race([$promise1, $promise2, $promise3]));
  • 结果

    # php promise.php
    array(2) {
    [1]=>
    string(3) "two"
    [0]=>
    string(3) "one"
    }
    string(3) "two"
    object(Exception)#15 (7) {}
  • 实现

    <?php
    declare(strict_types=1);
    use Swow\Coroutine;
    use Swow\Channel;
    use Swow\Selector;
    use Swow\Sync\WaitGroup;
    use function Swow\defer;
    /**
     * @param array $callbacks 
     * @param int $parallel 并发数量
     * @return array 
     */
    function promise_all(array $callbacks, int $parallel = -1)
    {
     $wg = new WaitGroup();
     $channel = new Channel($parallel);
     $results = [];
     foreach ($callbacks as $key => $callback) {
     $wg->add();
     $channel->push(true);
     Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results) {
     try {
     $results[$key] = $callback();
     } catch (\Throwable) {
     } finally {
     $channel->pop();
     $wg->done();
     }
     });
     }
     $wg->wait();
     return $results;
    }
    /**
     * @param array $callbacks 
     * @param int $timeout 超时
     * @param bool $throw 是否抛出异常
     * @return mixed 
     */
    function promise_race(array $callbacks, int $timeout = -1, bool $throw = true)
    {
     $coroutines = [];
     defer(static function () use (&$coroutines) {
     Coroutine::run(static function () use (&$coroutines) {
     foreach ($coroutines as $coroutine) {
     if ($coroutine && $coroutine->isAlive()) {
     $coroutine->kill();
     }
     }
     });
     });
     $selector = new Selector();
     foreach ($callbacks as $callback) {
     $channel = new Channel();
     $coroutines[] = Coroutine::run(static function () use ($channel, $callback) {
     try {
     $channel->push($callback());
     } catch (\Throwable $e) {
     $channel->push($e);
     }
     });
     $selector->pop($channel);
     }
     try {
     $selector->commit($timeout);
     return $selector->fetch();
     } catch (\Throwable $e) {
     if ($throw) {
     throw $e;
     }
     }
     return false;
    }
作者:何一鸣原文地址:https://segmentfault.com/a/1190000043336191

%s 个评论

要回复文章请先登录注册