From 42c77db1d4dda6797c021bb0915720473424d11e Mon Sep 17 00:00:00 2001 From: kuaifan Date: Mon, 18 Nov 2024 13:30:45 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/Events/WorkerStartEvent.php | 7 -- app/Models/User.php | 3 +- app/Module/Table/OnlineData.php | 53 +++++++++++-- app/Services/WebSocketService.php | 124 +++++++++--------------------- 4 files changed, 87 insertions(+), 100 deletions(-) diff --git a/app/Events/WorkerStartEvent.php b/app/Events/WorkerStartEvent.php index feb4dddb4..405e70293 100644 --- a/app/Events/WorkerStartEvent.php +++ b/app/Events/WorkerStartEvent.php @@ -3,8 +3,6 @@ namespace App\Events; use App\Models\WebSocket; -use App\Module\Base; -use Cache; use Hhxsv5\LaravelS\Swoole\Events\WorkerStartInterface; use Swoole\Http\Server; @@ -27,10 +25,5 @@ class WorkerStartEvent implements WorkerStartInterface private function handleFirstWorkerTasks() { WebSocket::query()->delete(); - // - $all = Base::json2array(Cache::get("User::online:all")); - foreach ($all as $userid) { - Cache::forget("User::online:" . $userid); - } } } diff --git a/app/Models/User.php b/app/Models/User.php index b0f41d6f2..dfdb0e868 100644 --- a/app/Models/User.php +++ b/app/Models/User.php @@ -6,6 +6,7 @@ namespace App\Models; use App\Exceptions\ApiException; use App\Module\Base; use App\Module\Doo; +use App\Module\Table\OnlineData; use App\Services\RequestContext; use Cache; use Carbon\Carbon; @@ -195,7 +196,7 @@ class User extends AbstractModel */ public function getOnlineStatus() { - $online = $this->bot || Cache::get("User::online:" . $this->userid) === "on"; + $online = $this->bot || OnlineData::live($this->userid) > 0; if ($online) { return true; } diff --git a/app/Module/Table/OnlineData.php b/app/Module/Table/OnlineData.php index ff1818011..930c39329 100644 --- a/app/Module/Table/OnlineData.php +++ b/app/Module/Table/OnlineData.php @@ -2,21 +2,62 @@ namespace App\Module\Table; +use App\Models\User; +use App\Tasks\LineTask; +use App\Tasks\PushTask; +use Carbon\Carbon; +use Hhxsv5\LaravelS\Swoole\Task\Task; + class OnlineData extends AbstractData { - public static function incr($userid) + /** + * 上线 + * @param $userid + * @return float|int|mixed + */ + public static function online($userid) { $key = "online::" . $userid; - $value = intval(self::get($key, 0)); - self::set($key, ++$value); + $value = self::instance()->getTable()->incr($key, 'value'); + if ($value === 1) { + // 通知上线 + Task::deliver(new LineTask($userid, true)); + // 推送离线时收到的消息 + Task::deliver(new PushTask("RETRY::" . $userid)); + } return $value; } - public static function decr($userid) + /** + * 离线 + * @param $userid + * @return float|int|mixed + */ + public static function offline($userid) { $key = "online::" . $userid; - $value = intval(self::get($key, 0)); - self::set($key, --$value); + $value = self::instance()->getTable()->decr($key, 'value'); + if ($value === 0) { + // 更新最后在线时间 + User::whereUserid($userid)->update([ + 'line_at' => Carbon::now() + ]); + // 通知下线 + Task::deliver(new LineTask($userid, false)); + // 清除在线状态 + self::instance()->getTable()->del($key); + } return $value; } + + /** + * 获取在线状态 + * @param $userid + * @return int + */ + public static function live($userid) + { + $key = "online::" . $userid; + return intval(self::instance()->getTable()->get($key)); + } } diff --git a/app/Services/WebSocketService.php b/app/Services/WebSocketService.php index bfb48309c..c3c727042 100644 --- a/app/Services/WebSocketService.php +++ b/app/Services/WebSocketService.php @@ -6,10 +6,9 @@ namespace App\Services; use App\Models\User; use App\Models\WebSocket; -use App\Models\WebSocketDialogMsg; use App\Module\Base; use App\Module\Doo; -use App\Tasks\LineTask; +use App\Module\Table\OnlineData; use App\Tasks\PushTask; use Cache; use Carbon\Carbon; @@ -25,7 +24,6 @@ use Swoole\WebSocket\Server; class WebSocketService implements WebSocketHandlerInterface { /** - * 声明没有参数的构造函数 * WebSocketService constructor. */ public function __construct() @@ -37,14 +35,14 @@ class WebSocketService implements WebSocketHandlerInterface * 连接建立时触发 * @param Server $server * @param Request $request + * @return void */ public function onOpen(Server $server, Request $request) { $fd = $request->fd; $get = Base::newTrim($request->get); - $action = $get['action']; Cache::forget("User::encrypt:" . $fd); - switch ($action) { + switch ($get['action']) { /** * 网页访问 */ @@ -52,23 +50,21 @@ class WebSocketService implements WebSocketHandlerInterface { Doo::load($get['token'], $get['language']); // - if (Doo::userId() > 0 - && !Doo::userExpired() - && $user = User::whereUserid(Doo::userId())->whereEmail(Doo::userEmail())->whereEncrypt(Doo::userEncrypt())->first()) { - // 保存用户 - $this->saveUser($fd, $user->userid); - // 发送open事件 + $count = 0; + $userid = Doo::userId(); + if ($userid > 0 && !Doo::userExpired()) { + $count = User::whereUserid($userid)->whereEmail(Doo::userEmail())->whereEncrypt(Doo::userEncrypt())->count(); + } + if ($count) { + // 用户正常 $server->push($fd, Base::array2json([ 'type' => 'open', 'data' => [ 'fd' => $fd, - 'ud' => $user->userid, + 'ud' => $userid, ], ])); - // 通知上线 - Task::deliver(new LineTask($user->userid, true)); - // 推送离线时收到的消息 - Task::deliver(new PushTask("RETRY::" . $user->userid)); + $this->userOn($fd, $userid); } else { // 用户不存在 $server->push($fd, Base::array2json([ @@ -78,7 +74,6 @@ class WebSocketService implements WebSocketHandlerInterface ], ])); $server->close($fd); - $this->deleteUser($fd); } } break; @@ -92,39 +87,23 @@ class WebSocketService implements WebSocketHandlerInterface * 收到消息时触发 * @param Server $server * @param Frame $frame + * @return void */ public function onMessage(Server $server, Frame $frame) { $msg = Base::json2array($frame->data); - $type = $msg['type']; // 消息类型 - $msgId = $msg['msgId']; // 消息ID(用于回调) - $data = $msg['data']; // 消息详情 - // + $type = $msg['type']; // 消息类型 + $data = $msg['data']; // 消息详情 + $msgId = $msg['msgId'] ?? $msg['msg_id']; // 消息ID(用于回调) + + // 处理消息 $reData = []; switch ($type) { - /** - * 收到回执 - */ + // 收到回执 case 'receipt': return; - /** - * 已阅消息 - */ - case 'readMsg': - $ids = is_array($data['id']) ? $data['id'] : [$data['id']]; - $userid = $this->getUserid($frame->fd); - WebSocketDialogMsg::whereIn('id', $ids)->chunkById(20, function($list) use ($userid) { - /** @var WebSocketDialogMsg $item */ - foreach ($list as $item) { - $item->readSuccess($userid); - } - }); - return; - - /** - * 访问状态 - */ + // 访问状态 case 'path': $row = WebSocket::whereFd($frame->fd)->first(); if ($row) { @@ -141,9 +120,7 @@ class WebSocketService implements WebSocketHandlerInterface } return; - /** - * 加密参数 - */ + // 加密参数 case 'encrypt': if ($data['type'] === 'pgp') { $data['key'] = Doo::pgpPublicFormat($data['key']); @@ -151,7 +128,8 @@ class WebSocketService implements WebSocketHandlerInterface Cache::put("User::encrypt:" . $frame->fd, Base::array2json($data), Carbon::now()->addDay()); return; } - // + + // 返回消息 if ($msgId) { PushTask::push([ 'fd' => $frame->fd, @@ -169,15 +147,11 @@ class WebSocketService implements WebSocketHandlerInterface * @param Server $server * @param $fd * @param $reactorId - * @throws \Exception + * @return void */ public function onClose(Server $server, $fd, $reactorId) { - $userid = $this->getUserid($fd); - if($userid){ - Task::deliver(new LineTask($userid, false)); // 通知离线 - $this->deleteUser($fd); - } + $this->userOff($fd); } /** ****************************************************************************** */ @@ -185,71 +159,49 @@ class WebSocketService implements WebSocketHandlerInterface /** ****************************************************************************** */ /** - * 保存用户 + * 用户上线 * @param $fd * @param $userid + * @return void */ - private function saveUser($fd, $userid) + private function userOn($fd, $userid) { - Cache::put("User::fd:" . $fd, "on", Carbon::now()->addDay()); - Cache::put("User::online:" . $userid, "on", Carbon::now()->addDay()); - // - $all = Base::json2array(Cache::get("User::online:all")); - if (!isset($all[$userid])) { - $all[$userid] = $userid; - Cache::forever("User::online:all", Base::array2json($all)); - } - // WebSocket::updateInsert([ 'key' => md5($fd . '@' . $userid) ], [ 'fd' => $fd, 'userid' => $userid, ]); + OnlineData::online($userid); } /** - * 清除用户 + * 用户下线 * @param $fd + * @return void */ - private function deleteUser($fd) + private function userOff($fd) { - Cache::forget("User::fd:" . $fd); - // - $array = []; - WebSocket::whereFd($fd)->chunk(10, function($list) use (&$array) { + $paths = []; + WebSocket::whereFd($fd)->chunk(10, function($list) use (&$paths) { /** @var WebSocket $item */ foreach ($list as $item) { $item->delete(); if ($item->userid) { - // 离线时更新会员最后在线时间 - User::whereUserid($item->userid)->update([ - 'line_at' => Carbon::now() - ]); - Cache::forget("User::online:" . $item->userid); + OnlineData::offline($item->userid); } if ($item->path && str_starts_with($item->path, "/single/file/")) { - $array[$item->path] = $item->path; + $paths[$item->path] = $item->path; } } }); - foreach ($array as $path) { + foreach ($paths as $path) { $this->pushPath($path); } } /** - * 根据fd获取会员ID - * @param $fd - * @return int - */ - private function getUserid($fd) - { - return intval(WebSocket::whereFd($fd)->value('userid')); - } - - /** - * 发送给相同访问状态的会员 + * 通知相同访问路径的用户 * @param $path */ private function pushPath($path)