setName('queue:listen') ->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start') ->addOption('mode', 'm', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.') ->setDescription('基于Redis的消息队列,支持消息延迟处理。'); } /** * 执行任务 * @return void */ protected function execute(Input $input, Output $output) { $this->resetCli($input, $output); Worker::$pidFile = runtime_path() .'workerman_queue.pid'; Worker::$logFile = runtime_path().'workerman.log'; $worker = new Worker(); $worker->name = 'queue_work'; // $worker->count = 3; $worker->onWorkerStart = function () use($output){ // 定时,每10秒一次 Timer::add(30, function()use($output) { (new SysSchedule())->select(); }); $redis_option = [ 'connect_timeout' => 10, 'max_attempts' => 3, 'retry_seconds' => 5, ]; if(!empty(env('redis.redis_password'))){ $redis_option['auth'] = env('redis.redis_password'); } $redis_option['db'] = env('redis.select'); $client = new Client('redis://'.env('redis.redis_hostname').':'.env('redis.port'), $redis_option); $queue_list = $this->getAllQueue(); foreach ($queue_list as $queue_class_name){ $queue_class_name = str_replace('.php', '', $queue_class_name); // 订阅 $client->subscribe($queue_class_name, function($data) use($queue_class_name, $output){ echo "\n".'['.date('Y-m-d H:i:s').']'." Processing:" . $queue_class_name; try{ $class_name = '\\' .$queue_class_name; $class = new $class_name(); $class->fire($data); } catch (\Throwable $e) { Log::write(date('Y-m-d H:i:s').',队列有错误:'.$queue_class_name.'_'.$e->getMessage().'_'.$e->getFile().'_'.$e->getLine()); } echo "\n".'['.date('Y-m-d H:i:s').']'." Processed:" . $queue_class_name; }); } // 消费失败触发的回调(可选) $client->onConsumeFailure(function (\Throwable $exception, $package) use($output){ echo "\n"."队列 " . $package['queue'] . " 消费失败,".$exception->getMessage(); }); }; Worker::runAll(); } /** * 捕获所有队列任务 * @return array */ public function getAllQueue(){ $class_list = []; $system_dir = root_path() . 'app' . DIRECTORY_SEPARATOR . 'job'; $addon_dir = root_path() . 'addon' . DIRECTORY_SEPARATOR; if(is_dir($system_dir)){ search_dir($system_dir, $app_data, root_path()); $class_list = array_merge($class_list, $app_data); } $addons = (new CoreAddonService())->getInstallAddonList(); foreach ($addons as $v) { $addon_path = $addon_dir .$v['key']. DIRECTORY_SEPARATOR . 'app' . DIRECTORY_SEPARATOR. 'job'; if (is_dir($addon_path)) { search_dir($addon_path, $addon_data, root_path()); $class_list = array_merge($class_list, $addon_data); } } foreach($class_list as &$v){ $v = str_replace('.php', '', $v); $v = str_replace('/', '\\', $v); } return $class_list; } }