perf: 优化消息搜索

This commit is contained in:
kuaifan 2025-03-01 23:52:39 +08:00
parent 32c232a0b5
commit ed064a825a
4 changed files with 378 additions and 51 deletions

View File

@ -0,0 +1,247 @@
<?php
namespace App\Console\Commands;
use App\Module\Base;
use App\Module\ElasticSearch;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
class SyncDialogUserMsgToElasticsearch extends Command
{
/**
* 更新数据
* --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上
*
* 清理数据
* --c: 清除索引
*/
protected $signature = 'elasticsearch:sync-dialog-user-msg {--f} {--i} {--c} {--batch=500}';
protected $description = '同步聊天会话用户和消息到Elasticsearch';
protected $elasticsearch;
/**
* SyncDialogUserMsgToElasticsearch constructor.
*/
public function __construct()
{
parent::__construct();
$this->elasticsearch = new ElasticSearch(ElasticSearch::DUM);
}
/**
* @return int
* @throws \Exception
*/
public function handle()
{
$this->info('开始同步聊天数据...');
// 清除索引
if ($this->option('c')) {
$this->info('清除索引...');
if (!$this->elasticsearch->indexExists()) {
$this->info('索引不存在');
return 0;
}
$result = $this->elasticsearch->deleteIndex();
if (isset($result['error'])) {
$this->error('删除索引失败: ' . $result['error']);
return 1;
}
$this->saveLastId(true);
$this->info('索引删除成功');
return 0;
}
// 判断创建索引
if (!$this->elasticsearch->indexExists()) {
$this->info('创建索引...');
$result = $this->elasticsearch->createDialogUserMsgIndex();
if (isset($result['error'])) {
$this->error('创建索引失败: ' . $result['error']);
return 1;
}
$this->info('索引创建成功');
}
// 同步用户-会话数据
$this->syncDialogUsers($this->option('batch'));
// 同步消息数据
$this->syncDialogMsgs($this->option('batch'));
// 完成
$this->info("\n同步完成");
return 0;
}
/**
* 保存最后一个ID
* @param string|true $type
* @param integer $lastId
*/
private function saveLastId($type, $lastId = 0)
{
if ($type === true) {
$setting = [];
} else {
$setting = Base::setting('elasticSearch:sync');
$setting[$type] = $lastId;
}
Base::setting('elasticSearch:sync', $setting);
}
/**
* 获取最后一个ID
* @param $type
* @return int
*/
private function getLastId($type)
{
if ($this->option('i')) {
$setting = Base::setting('elasticSearch:sync');
return intval($setting[$type] ?? 0);
}
return 0;
}
/**
* 同步用户-会话数据(父文档)
* @param $batchSize
* @return void
*/
private function syncDialogUsers($batchSize)
{
$this->info("\n同步用户数据...");
$lastId = $this->getLastId('dialog_user');
$num = 0;
$count = WebSocketDialogUser::where('id', '>', $lastId)->count();
do {
// 获取一批用户-会话关系
$dialogUsers = WebSocketDialogUser::where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($dialogUsers->isEmpty()) {
break;
}
$num += count($dialogUsers);
$progress = round($num / $count * 100, 2);
$this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}");
// 批量索引数据
$params = ['body' => []];
foreach ($dialogUsers as $dialogUser) {
$params['body'][] = [
'index' => [
'_index' => ElasticSearch::DUM,
'_id' => ElasticSearch::generateDialogUserDicId($dialogUser),
]
];
$params['body'][] = ElasticSearch::generateDialogUserFormat($dialogUser);
}
if ($params['body']) {
$result = $this->elasticsearch->bulk($params);
if (isset($result['errors']) && $result['errors']) {
$this->error('批量索引用户数据部分失败');
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
}
}
$lastId = $dialogUsers->last()->id;
$this->saveLastId('dialog_user', $lastId);
} while (count($dialogUsers) == $batchSize);
$this->info("同步用户数据结束 - 最后ID {$lastId}");
}
/**
* 同步消息数据(子文档)
*/
private function syncDialogMsgs($batchSize)
{
$this->info("\n同步消息数据...");
$lastId = $this->getLastId('dialog_msg');
$num = 0;
$count = WebSocketDialogMsg::where('id', '>', $lastId)->count();
do {
// 获取一批消息
$dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($dialogMsgs->isEmpty()) {
break;
}
$num += count($dialogMsgs);
$progress = round($num / $count * 100, 2);
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}");
// 获取这些消息所属的会话对应的所有用户
$dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray();
$userDialogMap = [];
if (!empty($dialogIds)) {
$dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
foreach ($dialogUsers as $dialogUser) {
$userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid;
}
}
// 批量索引消息数据
$params = ['body' => []];
foreach ($dialogMsgs as $dialogMsg) {
// 如果该会话没有用户,跳过
if (empty($userDialogMap[$dialogMsg->dialog_id])) {
continue;
}
// 为每个用户-会话关系创建子文档
foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) {
$params['body'][] = [
'index' => [
'_index' => ElasticSearch::DUM,
'_id' => ElasticSearch::generateDialogMsgDicId($dialogMsg, $userid),
'routing' => ElasticSearch::generateDialogMsgParentId($dialogMsg, $userid) // 路由到父文档
]
];
$params['body'][] = ElasticSearch::generateDialogMsgFormat($dialogMsg, $userid);
}
}
if (!empty($params['body'])) {
// 分批处理
$chunks = array_chunk($params['body'], 1000);
foreach ($chunks as $chunk) {
$chunkParams = ['body' => $chunk];
$result = $this->elasticsearch->bulk($chunkParams);
if (isset($result['errors']) && $result['errors']) {
$this->error('批量索引消息数据部分失败');
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
}
}
}
$lastId = $dialogMsgs->last()->id;
$this->saveLastId('dialog_msg', $lastId);
} while (count($dialogMsgs) == $batchSize);
$this->info("同步消息结束 - 最后ID {$lastId}");
}
}

View File

@ -23,6 +23,7 @@ use App\Tasks\AutoArchivedTask;
use App\Tasks\DeleteBotMsgTask;
use App\Tasks\CheckinRemindTask;
use App\Tasks\CloseMeetingRoomTask;
use App\Tasks\ElasticSearchSyncTask;
use App\Tasks\UnclaimedTaskRemindTask;
use Hhxsv5\LaravelS\Swoole\Task\Task;
use Laravolt\Avatar\Avatar;
@ -258,6 +259,8 @@ class IndexController extends InvokeController
Task::deliver(new UnclaimedTaskRemindTask());
// 关闭会议室
Task::deliver(new CloseMeetingRoomTask());
// ElasticSearch 同步
Task::deliver(new ElasticSearchSyncTask());
return "success";
}

View File

@ -77,7 +77,7 @@ class ElasticSearch
public function indexExists()
{
$params = ['index' => $this->index];
return (bool)$this->client->indices()->exists($params);
return $this->client->indices()->exists($params)->asBool();
}
/**
@ -404,36 +404,53 @@ class ElasticSearch
const DUM = "dialog_user_msg";
/**
* 会话用户 - 生成文档ID
* @param WebSocketDialogUser $dialogUser
* @return string
*/
public static function generateDialogUserDicId(WebSocketDialogUser $dialogUser)
{
return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
}
/**
* 会话用户 - 生成文档格式
* @param WebSocketDialogUser $dialogUser
* @return array
*/
public static function generateDialogUserFormat(WebSocketDialogUser $dialogUser)
{
return [
'dialog_id' => $dialogUser->dialog_id,
'created_at' => $dialogUser->created_at,
'updated_at' => $dialogUser->updated_at,
'userid' => $dialogUser->userid,
'top_at' => $dialogUser->top_at,
'last_at' => $dialogUser->last_at,
'mark_unread' => $dialogUser->mark_unread ? 1 : 0,
'silence' => $dialogUser->silence ? 1 : 0,
'hide' => $dialogUser->hide ? 1 : 0,
'color' => $dialogUser->color,
'relationship' => [
'name' => 'dialog_user'
]
];
}
/**
* 会话用户 - 同步到Elasticsearch
* @param WebSocketDialogUser $dialogUser
* @return void
*/
public static function syncDialogUserToElasticSearch(WebSocketDialogUser $dialogUser)
{
try {
$es = new self(self::DUM);
$docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
$document = [
'dialog_id' => $dialogUser->dialog_id,
'created_at' => $dialogUser->created_at,
'updated_at' => $dialogUser->updated_at,
'userid' => $dialogUser->userid,
'top_at' => $dialogUser->top_at,
'last_at' => $dialogUser->last_at,
'mark_unread' => $dialogUser->mark_unread ? 1 : 0,
'silence' => $dialogUser->silence ? 1 : 0,
'hide' => $dialogUser->hide ? 1 : 0,
'color' => $dialogUser->color,
'relationship' => [
'name' => 'dialog_user'
]
];
$es->indexDocument($document, $docId);
$es->indexDocument(self::generateDialogUserFormat($dialogUser), self::generateDialogUserDicId($dialogUser));
} catch (\Exception $e) {
Log::error('syncDialogUserToElasticSearch: ' . $e->getMessage());
}
@ -461,6 +478,58 @@ class ElasticSearch
}
}
/** ******************************************************************************************************** */
/** ******************************************************************************************************** */
/** ******************************************************************************************************** */
/**
* 会话消息 - 生成父文档ID
* @param WebSocketDialogMsg $dialogMsg
* @param $userid
* @return string
*/
public static function generateDialogMsgParentId(WebSocketDialogMsg $dialogMsg, $userid)
{
return "user_{$userid}_dialog_{$dialogMsg->dialog_id}";
}
/**
* 会话消息 - 生成文档ID
* @param WebSocketDialogMsg $dialogMsg
* @param $userid
* @return string
*/
public static function generateDialogMsgDicId(WebSocketDialogMsg $dialogMsg, $userid)
{
return "msg_{$dialogMsg->id}_user_{$userid}";
}
/**
* 会话消息 - 生成文档格式
* @param WebSocketDialogMsg $dialogMsg
* @param $userid
* @return array
*/
public static function generateDialogMsgFormat(WebSocketDialogMsg $dialogMsg, $userid)
{
return [
'dialog_id' => $dialogMsg->dialog_id,
'created_at' => $dialogMsg->created_at,
'updated_at' => $dialogMsg->updated_at,
'msg_id' => $dialogMsg->id,
'sender_userid' => $dialogMsg->userid,
'msg_type' => $dialogMsg->type,
'key' => $dialogMsg->key,
'bot' => $dialogMsg->bot ? 1 : 0,
'relationship' => [
'name' => 'dialog_msg',
'parent' => self::generateDialogMsgParentId($dialogMsg, $userid)
]
];
}
/**
* 会话消息 - 同步到Elasticsearch
*/
@ -479,33 +548,14 @@ class ElasticSearch
$params = ['body' => []];
foreach ($dialogUsers as $dialogUser) {
$parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}";
$docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}";
$params['body'][] = [
'index' => [
'_index' => self::DUM,
'_id' => $docId,
'routing' => $parentId
]
];
$params['body'][] = [
'dialog_id' => $dialogMsg->dialog_id,
'created_at' => $dialogMsg->created_at,
'updated_at' => $dialogMsg->updated_at,
'msg_id' => $dialogMsg->id,
'sender_userid' => $dialogMsg->userid,
'msg_type' => $dialogMsg->type,
'key' => $dialogMsg->key,
'bot' => $dialogMsg->bot ? 1 : 0,
'relationship' => [
'name' => 'dialog_msg',
'parent' => $parentId
'_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid),
'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid)
]
];
$params['body'][] = self::generateDialogMsgFormat($dialogMsg, $dialogUser->userid);
}
if (!empty($params['body'])) {
@ -534,14 +584,11 @@ class ElasticSearch
$params = ['body' => []];
foreach ($dialogUsers as $dialogUser) {
$docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}";
$parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}";
$params['body'][] = [
'delete' => [
'_index' => self::DUM,
'_id' => $docId,
'routing' => $parentId
'_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid),
'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid)
]
];
}

View File

@ -0,0 +1,30 @@
<?php
namespace App\Tasks;
use Carbon\Carbon;
use Illuminate\Support\Facades\Cache;
class ElasticSearchSyncTask extends AbstractTask
{
public function __construct()
{
parent::__construct();
}
public function start()
{
// 30分钟执行一次
$time = intval(Cache::get("ElasticSearchSyncTask:Time"));
if (time() - $time < 1800) {
return;
}
Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(10));
// 判断参数
@shell_exec("php /var/www/artisan elasticsearch:sync-dialog-user-msg --i");
}
public function end()
{
}
}