perf: 优化用户在线状态

This commit is contained in:
kuaifan 2024-11-18 13:30:45 +08:00
parent 11ea2d3697
commit 42c77db1d4
4 changed files with 87 additions and 100 deletions

View File

@ -3,8 +3,6 @@
namespace App\Events; namespace App\Events;
use App\Models\WebSocket; use App\Models\WebSocket;
use App\Module\Base;
use Cache;
use Hhxsv5\LaravelS\Swoole\Events\WorkerStartInterface; use Hhxsv5\LaravelS\Swoole\Events\WorkerStartInterface;
use Swoole\Http\Server; use Swoole\Http\Server;
@ -27,10 +25,5 @@ class WorkerStartEvent implements WorkerStartInterface
private function handleFirstWorkerTasks() private function handleFirstWorkerTasks()
{ {
WebSocket::query()->delete(); WebSocket::query()->delete();
//
$all = Base::json2array(Cache::get("User::online:all"));
foreach ($all as $userid) {
Cache::forget("User::online:" . $userid);
}
} }
} }

View File

@ -6,6 +6,7 @@ namespace App\Models;
use App\Exceptions\ApiException; use App\Exceptions\ApiException;
use App\Module\Base; use App\Module\Base;
use App\Module\Doo; use App\Module\Doo;
use App\Module\Table\OnlineData;
use App\Services\RequestContext; use App\Services\RequestContext;
use Cache; use Cache;
use Carbon\Carbon; use Carbon\Carbon;
@ -195,7 +196,7 @@ class User extends AbstractModel
*/ */
public function getOnlineStatus() public function getOnlineStatus()
{ {
$online = $this->bot || Cache::get("User::online:" . $this->userid) === "on"; $online = $this->bot || OnlineData::live($this->userid) > 0;
if ($online) { if ($online) {
return true; return true;
} }

View File

@ -2,21 +2,62 @@
namespace App\Module\Table; namespace App\Module\Table;
use App\Models\User;
use App\Tasks\LineTask;
use App\Tasks\PushTask;
use Carbon\Carbon;
use Hhxsv5\LaravelS\Swoole\Task\Task;
class OnlineData extends AbstractData class OnlineData extends AbstractData
{ {
public static function incr($userid) /**
* 上线
* @param $userid
* @return float|int|mixed
*/
public static function online($userid)
{ {
$key = "online::" . $userid; $key = "online::" . $userid;
$value = intval(self::get($key, 0)); $value = self::instance()->getTable()->incr($key, 'value');
self::set($key, ++$value); if ($value === 1) {
// 通知上线
Task::deliver(new LineTask($userid, true));
// 推送离线时收到的消息
Task::deliver(new PushTask("RETRY::" . $userid));
}
return $value; return $value;
} }
public static function decr($userid) /**
* 离线
* @param $userid
* @return float|int|mixed
*/
public static function offline($userid)
{ {
$key = "online::" . $userid; $key = "online::" . $userid;
$value = intval(self::get($key, 0)); $value = self::instance()->getTable()->decr($key, 'value');
self::set($key, --$value); if ($value === 0) {
// 更新最后在线时间
User::whereUserid($userid)->update([
'line_at' => Carbon::now()
]);
// 通知下线
Task::deliver(new LineTask($userid, false));
// 清除在线状态
self::instance()->getTable()->del($key);
}
return $value; return $value;
} }
/**
* 获取在线状态
* @param $userid
* @return int
*/
public static function live($userid)
{
$key = "online::" . $userid;
return intval(self::instance()->getTable()->get($key));
}
} }

View File

@ -6,10 +6,9 @@ namespace App\Services;
use App\Models\User; use App\Models\User;
use App\Models\WebSocket; use App\Models\WebSocket;
use App\Models\WebSocketDialogMsg;
use App\Module\Base; use App\Module\Base;
use App\Module\Doo; use App\Module\Doo;
use App\Tasks\LineTask; use App\Module\Table\OnlineData;
use App\Tasks\PushTask; use App\Tasks\PushTask;
use Cache; use Cache;
use Carbon\Carbon; use Carbon\Carbon;
@ -25,7 +24,6 @@ use Swoole\WebSocket\Server;
class WebSocketService implements WebSocketHandlerInterface class WebSocketService implements WebSocketHandlerInterface
{ {
/** /**
* 声明没有参数的构造函数
* WebSocketService constructor. * WebSocketService constructor.
*/ */
public function __construct() public function __construct()
@ -37,14 +35,14 @@ class WebSocketService implements WebSocketHandlerInterface
* 连接建立时触发 * 连接建立时触发
* @param Server $server * @param Server $server
* @param Request $request * @param Request $request
* @return void
*/ */
public function onOpen(Server $server, Request $request) public function onOpen(Server $server, Request $request)
{ {
$fd = $request->fd; $fd = $request->fd;
$get = Base::newTrim($request->get); $get = Base::newTrim($request->get);
$action = $get['action'];
Cache::forget("User::encrypt:" . $fd); Cache::forget("User::encrypt:" . $fd);
switch ($action) { switch ($get['action']) {
/** /**
* 网页访问 * 网页访问
*/ */
@ -52,23 +50,21 @@ class WebSocketService implements WebSocketHandlerInterface
{ {
Doo::load($get['token'], $get['language']); Doo::load($get['token'], $get['language']);
// //
if (Doo::userId() > 0 $count = 0;
&& !Doo::userExpired() $userid = Doo::userId();
&& $user = User::whereUserid(Doo::userId())->whereEmail(Doo::userEmail())->whereEncrypt(Doo::userEncrypt())->first()) { if ($userid > 0 && !Doo::userExpired()) {
// 保存用户 $count = User::whereUserid($userid)->whereEmail(Doo::userEmail())->whereEncrypt(Doo::userEncrypt())->count();
$this->saveUser($fd, $user->userid); }
// 发送open事件 if ($count) {
// 用户正常
$server->push($fd, Base::array2json([ $server->push($fd, Base::array2json([
'type' => 'open', 'type' => 'open',
'data' => [ 'data' => [
'fd' => $fd, 'fd' => $fd,
'ud' => $user->userid, 'ud' => $userid,
], ],
])); ]));
// 通知上线 $this->userOn($fd, $userid);
Task::deliver(new LineTask($user->userid, true));
// 推送离线时收到的消息
Task::deliver(new PushTask("RETRY::" . $user->userid));
} else { } else {
// 用户不存在 // 用户不存在
$server->push($fd, Base::array2json([ $server->push($fd, Base::array2json([
@ -78,7 +74,6 @@ class WebSocketService implements WebSocketHandlerInterface
], ],
])); ]));
$server->close($fd); $server->close($fd);
$this->deleteUser($fd);
} }
} }
break; break;
@ -92,39 +87,23 @@ class WebSocketService implements WebSocketHandlerInterface
* 收到消息时触发 * 收到消息时触发
* @param Server $server * @param Server $server
* @param Frame $frame * @param Frame $frame
* @return void
*/ */
public function onMessage(Server $server, Frame $frame) public function onMessage(Server $server, Frame $frame)
{ {
$msg = Base::json2array($frame->data); $msg = Base::json2array($frame->data);
$type = $msg['type']; // 消息类型 $type = $msg['type']; // 消息类型
$msgId = $msg['msgId']; // 消息ID用于回调
$data = $msg['data']; // 消息详情 $data = $msg['data']; // 消息详情
// $msgId = $msg['msgId'] ?? $msg['msg_id']; // 消息ID用于回调
// 处理消息
$reData = []; $reData = [];
switch ($type) { switch ($type) {
/** // 收到回执
* 收到回执
*/
case 'receipt': case 'receipt':
return; return;
/** // 访问状态
* 已阅消息
*/
case 'readMsg':
$ids = is_array($data['id']) ? $data['id'] : [$data['id']];
$userid = $this->getUserid($frame->fd);
WebSocketDialogMsg::whereIn('id', $ids)->chunkById(20, function($list) use ($userid) {
/** @var WebSocketDialogMsg $item */
foreach ($list as $item) {
$item->readSuccess($userid);
}
});
return;
/**
* 访问状态
*/
case 'path': case 'path':
$row = WebSocket::whereFd($frame->fd)->first(); $row = WebSocket::whereFd($frame->fd)->first();
if ($row) { if ($row) {
@ -141,9 +120,7 @@ class WebSocketService implements WebSocketHandlerInterface
} }
return; return;
/** // 加密参数
* 加密参数
*/
case 'encrypt': case 'encrypt':
if ($data['type'] === 'pgp') { if ($data['type'] === 'pgp') {
$data['key'] = Doo::pgpPublicFormat($data['key']); $data['key'] = Doo::pgpPublicFormat($data['key']);
@ -151,7 +128,8 @@ class WebSocketService implements WebSocketHandlerInterface
Cache::put("User::encrypt:" . $frame->fd, Base::array2json($data), Carbon::now()->addDay()); Cache::put("User::encrypt:" . $frame->fd, Base::array2json($data), Carbon::now()->addDay());
return; return;
} }
//
// 返回消息
if ($msgId) { if ($msgId) {
PushTask::push([ PushTask::push([
'fd' => $frame->fd, 'fd' => $frame->fd,
@ -169,15 +147,11 @@ class WebSocketService implements WebSocketHandlerInterface
* @param Server $server * @param Server $server
* @param $fd * @param $fd
* @param $reactorId * @param $reactorId
* @throws \Exception * @return void
*/ */
public function onClose(Server $server, $fd, $reactorId) public function onClose(Server $server, $fd, $reactorId)
{ {
$userid = $this->getUserid($fd); $this->userOff($fd);
if($userid){
Task::deliver(new LineTask($userid, false)); // 通知离线
$this->deleteUser($fd);
}
} }
/** ****************************************************************************** */ /** ****************************************************************************** */
@ -185,71 +159,49 @@ class WebSocketService implements WebSocketHandlerInterface
/** ****************************************************************************** */ /** ****************************************************************************** */
/** /**
* 保存用户 * 用户上线
* @param $fd * @param $fd
* @param $userid * @param $userid
* @return void
*/ */
private function saveUser($fd, $userid) private function userOn($fd, $userid)
{ {
Cache::put("User::fd:" . $fd, "on", Carbon::now()->addDay());
Cache::put("User::online:" . $userid, "on", Carbon::now()->addDay());
//
$all = Base::json2array(Cache::get("User::online:all"));
if (!isset($all[$userid])) {
$all[$userid] = $userid;
Cache::forever("User::online:all", Base::array2json($all));
}
//
WebSocket::updateInsert([ WebSocket::updateInsert([
'key' => md5($fd . '@' . $userid) 'key' => md5($fd . '@' . $userid)
], [ ], [
'fd' => $fd, 'fd' => $fd,
'userid' => $userid, 'userid' => $userid,
]); ]);
OnlineData::online($userid);
} }
/** /**
* 清除用户 * 用户下线
* @param $fd * @param $fd
* @return void
*/ */
private function deleteUser($fd) private function userOff($fd)
{ {
Cache::forget("User::fd:" . $fd); $paths = [];
// WebSocket::whereFd($fd)->chunk(10, function($list) use (&$paths) {
$array = [];
WebSocket::whereFd($fd)->chunk(10, function($list) use (&$array) {
/** @var WebSocket $item */ /** @var WebSocket $item */
foreach ($list as $item) { foreach ($list as $item) {
$item->delete(); $item->delete();
if ($item->userid) { if ($item->userid) {
// 离线时更新会员最后在线时间 OnlineData::offline($item->userid);
User::whereUserid($item->userid)->update([
'line_at' => Carbon::now()
]);
Cache::forget("User::online:" . $item->userid);
} }
if ($item->path && str_starts_with($item->path, "/single/file/")) { if ($item->path && str_starts_with($item->path, "/single/file/")) {
$array[$item->path] = $item->path; $paths[$item->path] = $item->path;
} }
} }
}); });
foreach ($array as $path) { foreach ($paths as $path) {
$this->pushPath($path); $this->pushPath($path);
} }
} }
/** /**
* 根据fd获取会员ID * 通知相同访问路径的用户
* @param $fd
* @return int
*/
private function getUserid($fd)
{
return intval(WebSocket::whereFd($fd)->value('userid'));
}
/**
* 发送给相同访问状态的会员
* @param $path * @param $path
*/ */
private function pushPath($path) private function pushPath($path)