perf: 优化全文搜索

This commit is contained in:
kuaifan 2025-04-18 12:56:21 +08:00
parent 924f0a9f7c
commit 5aed9ce29e
5 changed files with 195 additions and 58 deletions

View File

@ -4,10 +4,11 @@ namespace App\Console\Commands;
use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsg;
use App\Module\ZincSearch\ZincSearchKeyValue; use App\Module\ZincSearch\ZincSearchKeyValue;
use App\Module\ZincSearch\ZincSearchDialogUserMsg; use App\Module\ZincSearch\ZincSearchDialogMsg;
use Cache;
use Illuminate\Console\Command; use Illuminate\Console\Command;
class SyncUserMsgToSearch extends Command class SyncUserMsgToZincSearch extends Command
{ {
/** /**
* 更新数据 * 更新数据
@ -18,7 +19,7 @@ class SyncUserMsgToSearch extends Command
* --c: 清除索引 * --c: 清除索引
*/ */
protected $signature = 'search:sync-user-msg {--f} {--i} {--c} {--batch=1000}'; protected $signature = 'zinc:sync-user-msg {--f} {--i} {--c} {--batch=1000}';
protected $description = '同步聊天会话用户和消息到 ZincSearch'; protected $description = '同步聊天会话用户和消息到 ZincSearch';
/** /**
@ -26,23 +27,35 @@ class SyncUserMsgToSearch extends Command
*/ */
public function handle(): int public function handle(): int
{ {
// 清除索引 // 使用缓存锁确保一次只能运行一个实例
if ($this->option('c')) { $lock = Cache::lock('zinc:sync-user-msg', 3600 * 6); // 锁定6小时
$this->info('清除索引...'); if (!$lock->get()) {
ZincSearchKeyValue::clear(); $this->error('命令已在运行中,请等待当前实例完成');
ZincSearchDialogUserMsg::clear(); return 1;
$this->info("索引删除成功");
return 0;
} }
$this->info('开始同步聊天数据...'); try {
// 清除索引
if ($this->option('c')) {
$this->info('清除索引...');
ZincSearchKeyValue::clear();
ZincSearchDialogMsg::clear();
$this->info("索引删除成功");
return 0;
}
// 同步消息数据 $this->info('开始同步聊天数据...');
$this->syncDialogMsgs();
// 完成 // 同步消息数据
$this->info("\n同步完成"); $this->syncDialogMsgs();
return 0;
// 完成
$this->info("\n同步完成");
return 0;
} finally {
// 确保无论如何都会释放锁
$lock->release();
}
} }
/** /**
@ -78,7 +91,7 @@ class SyncUserMsgToSearch extends Command
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}"); $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}");
// 同步数据 // 同步数据
ZincSearchDialogUserMsg::batchSync($dialogMsgs); ZincSearchDialogMsg::batchSync($dialogMsgs);
// 更新最后ID // 更新最后ID
$lastId = $dialogMsgs->last()->id; $lastId = $dialogMsgs->last()->id;

View File

@ -28,7 +28,7 @@ use App\Models\WebSocketDialogMsgTodo;
use App\Models\WebSocketDialogMsgTranslate; use App\Models\WebSocketDialogMsgTranslate;
use App\Models\WebSocketDialogSession; use App\Models\WebSocketDialogSession;
use App\Module\Table\OnlineData; use App\Module\Table\OnlineData;
use App\Module\ZincSearch\ZincSearchDialogUserMsg; use App\Module\ZincSearch\ZincSearchDialogMsg;
use Hhxsv5\LaravelS\Swoole\Task\Task; use Hhxsv5\LaravelS\Swoole\Task\Task;
/** /**
@ -174,7 +174,7 @@ class DialogController extends AbstractController
} }
// 搜索消息会话 // 搜索消息会话
if (count($list) < 20) { if (count($list) < 20) {
$searchResults = ZincSearchDialogUserMsg::searchByKeyword($user->userid, $key, 0, 20 - count($list)); $searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, 20 - count($list));
if ($searchResults) { if ($searchResults) {
foreach ($searchResults as $item) { foreach ($searchResults as $item) {
if ($dialog = WebSocketDialog::find($item['id'])) { if ($dialog = WebSocketDialog::find($item['id'])) {
@ -728,7 +728,7 @@ class DialogController extends AbstractController
$key = trim(Request::input('key')); $key = trim(Request::input('key'));
$list = []; $list = [];
// //
$searchResults = ZincSearchDialogUserMsg::searchByKeyword($user->userid, $key, 0, Base::getPaginate(50, 20)); $searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, Base::getPaginate(50, 20));
if ($searchResults) { if ($searchResults) {
foreach ($searchResults as $item) { foreach ($searchResults as $item) {
if ($dialog = WebSocketDialog::find($item['id'])) { if ($dialog = WebSocketDialog::find($item['id'])) {

View File

@ -4,7 +4,9 @@ namespace App\Module\ZincSearch;
use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser; use App\Models\WebSocketDialogUser;
use Carbon\Carbon;
use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\Log;
use Swoole\Coroutine;
/** /**
* ZincSearch 会话消息类 * ZincSearch 会话消息类
@ -21,9 +23,9 @@ use Illuminate\Support\Facades\Log;
* - 单个同步: sync(WebSocketDialogMsg $dialogMsg); * - 单个同步: sync(WebSocketDialogMsg $dialogMsg);
* - 批量同步: batchSync(WebSocketDialogMsg[] $dialogMsgs); * - 批量同步: batchSync(WebSocketDialogMsg[] $dialogMsgs);
* - 用户同步: userSync(WebSocketDialogUser $dialogUser); * - 用户同步: userSync(WebSocketDialogUser $dialogUser);
* - 删除消息: delete(WebSocketDialogMsg|WebSocketDialogUser $data); * - 删除消息: delete(WebSocketDialogMsg|WebSocketDialogUser|int $data);
*/ */
class ZincSearchDialogUserMsg class ZincSearchDialogMsg
{ {
/** /**
* 索引名称 * 索引名称
@ -45,6 +47,7 @@ class ZincSearchDialogUserMsg
'properties' => [ 'properties' => [
// 拓展数据 // 拓展数据
'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID 'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID
'to_userid' => ['type' => 'numeric', 'index' => true], // 此消息发给的用户ID
// 消息数据 // 消息数据
'id' => ['type' => 'numeric', 'index' => true], 'id' => ['type' => 'numeric', 'index' => true],
@ -130,8 +133,7 @@ class ZincSearchDialogUserMsg
'query' => [ 'query' => [
'bool' => [ 'bool' => [
'must' => [ 'must' => [
['term' => ['userid' => $userid]], ['term' => ['to_userid' => $userid]],
['term' => ['bot' => 0]],
['match_phrase' => ['key' => $keyword]] ['match_phrase' => ['key' => $keyword]]
] ]
] ]
@ -145,18 +147,84 @@ class ZincSearchDialogUserMsg
try { try {
$result = ZincSearchBase::elasticSearch(self::$indexNameMsg, $searchParams); $result = ZincSearchBase::elasticSearch(self::$indexNameMsg, $searchParams);
return array_map(function ($hit) { $hits = $result['data']['hits']['hits'] ?? [];
// todo 格式化消息
return $hit['_source']; // 收集所有的用户信息
}, $result['data']['hits']['hits'] ?? []); $dialogUserids = [];
foreach ($hits as $hit) {
$source = $hit['_source'];
$dialogUserids[] = $source['dialog_userid'];
}
$userInfos = self::searchUser(array_unique($dialogUserids));
// 组合返回结果,将用户信息合并到消息中
$msgs = [];
foreach ($hits as $hit) {
$msgInfo = $hit['_source'];
$userInfo = $userInfos[$msgInfo['dialog_userid']] ?? [];
if ($userInfo) {
$msgs[] = [
'id' => $msgInfo['dialog_id'],
'search_msg_id' => $msgInfo['id'],
'user_at' => Carbon::parse($msgInfo['updated_at'])->format('Y-m-d H:i:s'),
'mark_unread' => $userInfo['mark_unread'],
'silence' => $userInfo['silence'],
'hide' => $userInfo['hide'],
'color' => $userInfo['color'],
'top_at' => Carbon::parse($userInfo['top_at'])->format('Y-m-d H:i:s'),
'last_at' => Carbon::parse($userInfo['last_at'])->format('Y-m-d H:i:s'),
];
}
}
return $msgs;
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error('搜索对话消息失败: ' . $e->getMessage()); Log::error('search: ' . $e->getMessage());
return [ return [];
'success' => false, }
'error' => $e->getMessage(), }
'hits' => ['total' => ['value' => 0], 'hits' => []]
/**
* 根据对话用户ID搜索用户信息
* @param array $dialogUserids
* @return array
*/
private static function searchUser(array $dialogUserids): array
{
if (empty($dialogUserids)) {
return [];
}
$userInfos = [];
// 构建用户查询条件
$userSearchParams = [
'query' => [
'bool' => [
'should' => []
]
],
'size' => count($dialogUserids) // 确保取到所有符合条件的记录
];
// 添加所有 dialog_userid 到查询条件
foreach ($dialogUserids as $dialogUserid) {
$userSearchParams['query']['bool']['should'][] = [
'term' => ['dialog_userid' => $dialogUserid]
]; ];
} }
// 查询用户信息
$userResult = ZincSearchBase::elasticSearch(self::$indexNameUser, $userSearchParams);
$userHits = $userResult['data']['hits']['hits'] ?? [];
// 以 dialog_userid 为键保存用户信息
foreach ($userHits as $userHit) {
$userSource = $userHit['_source'];
$userInfos[$userSource['dialog_userid']] = $userSource;
}
return $userInfos;
} }
// ============================== // ==============================
@ -164,16 +232,11 @@ class ZincSearchDialogUserMsg
// ============================== // ==============================
/** /**
* 生成文档ID * 生成 dialog_userid
* *
* @param WebSocketDialogMsg $dialogMsg * @param WebSocketDialogUser $dialogUser
* @param int $userid
* @return string * @return string
*/ */
private static function generateDocId(WebSocketDialogMsg $dialogMsg, int $userid): string
{
return "{$dialogMsg->id}_{$userid}";
}
private static function generateDialogUserid(WebSocketDialogUser $dialogUser): string private static function generateDialogUserid(WebSocketDialogUser $dialogUser): string
{ {
return "{$dialogUser->dialog_id}_{$dialogUser->userid}"; return "{$dialogUser->dialog_id}_{$dialogUser->userid}";
@ -189,8 +252,9 @@ class ZincSearchDialogUserMsg
private static function generateMsgData(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array private static function generateMsgData(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array
{ {
return [ return [
'_id' => self::generateDocId($dialogMsg, $dialogUser->userid), '_id' => self::$indexNameMsg . "_" . $dialogMsg->id . "_" . $dialogUser->userid,
'dialog_userid' => self::generateDialogUserid($dialogUser), 'dialog_userid' => self::generateDialogUserid($dialogUser),
'to_userid' => $dialogUser->userid,
'id' => $dialogMsg->id, 'id' => $dialogMsg->id,
'dialog_id' => $dialogMsg->dialog_id, 'dialog_id' => $dialogMsg->dialog_id,
@ -206,7 +270,7 @@ class ZincSearchDialogUserMsg
private static function generateUserData(WebSocketDialogUser $dialogUser): array private static function generateUserData(WebSocketDialogUser $dialogUser): array
{ {
return [ return [
'_id' => $dialogUser->id, '_id' => self::$indexNameUser . "_" . $dialogUser->id,
'dialog_userid' => self::generateDialogUserid($dialogUser), 'dialog_userid' => self::generateDialogUserid($dialogUser),
'id' => $dialogUser->id, 'id' => $dialogUser->id,
@ -275,9 +339,9 @@ class ZincSearchDialogUserMsg
return true; return true;
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error('syncMsg: ' . $e->getMessage()); Log::error('sync: ' . $e->getMessage());
return false;
} }
return false;
} }
/** /**
@ -352,7 +416,7 @@ class ZincSearchDialogUserMsg
} }
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error('batchSyncMsgs: ' . $e->getMessage()); Log::error('batchSync: ' . $e->getMessage());
} }
return $count; return $count;
@ -369,14 +433,71 @@ class ZincSearchDialogUserMsg
return false; return false;
} }
$data = self::generateUserData($dialogUser); $data = self::generateUserData($dialogUser);
$result = ZincSearchBase::addDoc(self::$indexNameUser, $data);
return $result['success'] ?? false; // 生成查询用户条件
$searchParams = [
'query' => [
'bool' => [
'must' => [
['term' => ['dialog_userid' => $data['dialog_userid']]]
]
]
],
'size' => 1
];
try {
// 查询用户是否存在
$result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams);
$hits = $result['data']['hits']['hits'] ?? [];
// 同步用户(存在更新、不存在添加)
$result = ZincSearchBase::addDoc(self::$indexNameUser, $data);
if (!isset($result['success'])) {
return false;
}
// 用户不存在,同步消息
if (empty($hits)) {
go(function () use ($dialogUser) {
Coroutine::sleep(0.1);
$lastId = 0; // 上次同步的最后ID
$batchSize = 500; // 每批处理的消息数量
// 分批同步消息
do {
// 获取一批
$dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id)
->where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($dialogMsgs->isEmpty()) {
break;
}
// 同步数据
ZincSearchDialogMsg::batchSync($dialogMsgs);
// 更新最后ID
$lastId = $dialogMsgs->last()->id;
} while (count($dialogMsgs) == $batchSize);
});
}
return true;
} catch (\Exception $e) {
Log::error('userSync: ' . $e->getMessage());
return false;
}
} }
/** /**
* 删除 * 删除
* *
* @param WebSocketDialogMsg|WebSocketDialogUser $data * @param WebSocketDialogMsg|WebSocketDialogUser|int $data
* @return int * @return int
*/ */
public static function delete(mixed $data): int public static function delete(mixed $data): int
@ -397,7 +518,10 @@ class ZincSearchDialogUserMsg
'term' => self::generateDialogUserid($data), 'term' => self::generateDialogUserid($data),
]; ];
} else { } else {
return 0; $query = [
'field' => 'id',
'term' => (string) $data
];
} }
try { try {
@ -433,7 +557,7 @@ class ZincSearchDialogUserMsg
$from += $batchSize; $from += $batchSize;
} }
} catch (\Exception $e) { } catch (\Exception $e) {
Log::error('deleteMsg: ' . $e->getMessage()); Log::error('delete: ' . $e->getMessage());
} }
return $totalDeleted; return $totalDeleted;

View File

@ -3,7 +3,7 @@
namespace App\Observers; namespace App\Observers;
use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsg;
use App\Module\ZincSearch\ZincSearchDialogUserMsg; use App\Module\ZincSearch\ZincSearchDialogMsg;
class WebSocketDialogMsgObserver class WebSocketDialogMsgObserver
{ {
@ -15,7 +15,7 @@ class WebSocketDialogMsgObserver
*/ */
public function created(WebSocketDialogMsg $webSocketDialogMsg) public function created(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogUserMsg::syncMsg($webSocketDialogMsg); ZincSearchDialogMsg::sync($webSocketDialogMsg);
} }
/** /**
@ -26,7 +26,7 @@ class WebSocketDialogMsgObserver
*/ */
public function updated(WebSocketDialogMsg $webSocketDialogMsg) public function updated(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogUserMsg::syncMsg($webSocketDialogMsg); ZincSearchDialogMsg::sync($webSocketDialogMsg);
} }
/** /**
@ -37,7 +37,7 @@ class WebSocketDialogMsgObserver
*/ */
public function deleted(WebSocketDialogMsg $webSocketDialogMsg) public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogUserMsg::deleteMsg($webSocketDialogMsg); ZincSearchDialogMsg::delete($webSocketDialogMsg);
} }
/** /**

View File

@ -4,7 +4,7 @@ namespace App\Observers;
use App\Models\Deleted; use App\Models\Deleted;
use App\Models\WebSocketDialogUser; use App\Models\WebSocketDialogUser;
use App\Module\ZincSearch\ZincSearchDialogUserMsg; use App\Module\ZincSearch\ZincSearchDialogMsg;
use Carbon\Carbon; use Carbon\Carbon;
class WebSocketDialogUserObserver class WebSocketDialogUserObserver
@ -30,7 +30,7 @@ class WebSocketDialogUserObserver
} }
} }
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ZincSearchDialogUserMsg::syncUser($webSocketDialogUser); ZincSearchDialogMsg::userSync($webSocketDialogUser);
} }
/** /**
@ -41,7 +41,7 @@ class WebSocketDialogUserObserver
*/ */
public function updated(WebSocketDialogUser $webSocketDialogUser) public function updated(WebSocketDialogUser $webSocketDialogUser)
{ {
ZincSearchDialogUserMsg::syncUser($webSocketDialogUser); ZincSearchDialogMsg::userSync($webSocketDialogUser);
} }
/** /**
@ -53,7 +53,7 @@ class WebSocketDialogUserObserver
public function deleted(WebSocketDialogUser $webSocketDialogUser) public function deleted(WebSocketDialogUser $webSocketDialogUser)
{ {
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ZincSearchDialogUserMsg::deleteUser($webSocketDialogUser); ZincSearchDialogMsg::delete($webSocketDialogUser);
} }
/** /**