perf: 优化全文搜索

This commit is contained in:
kuaifan 2025-04-17 22:02:34 +08:00
parent 9eba376976
commit fe84f812e7
3 changed files with 25 additions and 97 deletions

View File

@ -3,7 +3,6 @@
namespace App\Console\Commands; namespace App\Console\Commands;
use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use App\Module\ZincSearch\ZincSearchKeyValue; use App\Module\ZincSearch\ZincSearchKeyValue;
use App\Module\ZincSearch\ZincSearchUserMsg; use App\Module\ZincSearch\ZincSearchUserMsg;
use Illuminate\Console\Command; use Illuminate\Console\Command;
@ -22,36 +21,24 @@ class SyncDialogUserMsgToZincSearch extends Command
protected $signature = 'zinc:sync-dialog-user-msg {--f} {--i} {--c} {--batch=1000}'; protected $signature = 'zinc:sync-dialog-user-msg {--f} {--i} {--c} {--batch=1000}';
protected $description = '同步聊天会话用户和消息到 ZincSearch'; protected $description = '同步聊天会话用户和消息到 ZincSearch';
/**
* SyncDialogUserMsgToElasticsearch constructor.
*/
public function __construct()
{
parent::__construct();
}
/** /**
* @return int * @return int
* @throws \Exception
*/ */
public function handle() public function handle(): int
{ {
// 清除索引 // 清除索引
if ($this->option('c')) { if ($this->option('c')) {
$this->info('清除索引...'); $this->info('清除索引...');
ZincSearchUserMsg::clear();
ZincSearchKeyValue::clear(); ZincSearchKeyValue::clear();
ZincSearchUserMsg::clear();
$this->info("索引删除成功"); $this->info("索引删除成功");
return 0; return 0;
} }
$this->info('开始同步聊天数据...'); $this->info('开始同步聊天数据...');
// 同步用户-会话数据
$this->syncDialogUsers($this->option('batch'));
// 同步消息数据 // 同步消息数据
$this->syncDialogMsgs($this->option('batch')); $this->syncDialogMsgs();
// 完成 // 完成
$this->info("\n同步完成"); $this->info("\n同步完成");
@ -59,77 +46,21 @@ class SyncDialogUserMsgToZincSearch extends Command
} }
/** /**
* 保存最后一个ID * 同步消息数据
* @param string $type *
* @param int $lastId
*/
private function saveLastId(string $type, int $lastId = 0): void
{
ZincSearchKeyValue::set("sync", ["{$type}" => $lastId], true);
}
/**
* 获取最后一个ID
* @param $type
* @return int
*/
private function getLastId($type): int
{
if ($this->option("i")) {
$array = ZincSearchKeyValue::getArray("sync");
return intval($array[$type] ?? 0);
}
return 0;
}
/**
* 同步用户-会话数据(父文档)
* @param $batchSize
* @return void * @return void
*/ */
private function syncDialogUsers($batchSize) private function syncDialogMsgs(): void
{
$this->info("\n同步用户数据...");
$lastId = $this->getLastId('dialog_user');
$num = 0;
$count = WebSocketDialogUser::where('id', '>', $lastId)->count();
do {
// 获取一批用户-会话关系
$dialogUsers = WebSocketDialogUser::where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($dialogUsers->isEmpty()) {
break;
}
$num += count($dialogUsers);
$progress = round($num / $count * 100, 2);
$this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}");
// 批量索引数据
ZincSearchUserMsg::batchSyncUsers($dialogUsers);
$lastId = $dialogUsers->last()->id;
$this->saveLastId('dialog_user', $lastId);
} while (count($dialogUsers) == $batchSize);
$this->info("同步用户数据结束 - 最后ID {$lastId}");
}
/**
* 同步消息数据(子文档)
*/
private function syncDialogMsgs($batchSize)
{ {
$this->info("\n同步消息数据..."); $this->info("\n同步消息数据...");
$lastId = $this->getLastId('dialog_msg');
// 获取上次同步的最后ID
$lastKey = "sync:userMsgLastId";
$lastId = $this->option('i') ? intval(ZincSearchKeyValue::get($lastKey, 0)) : 0;
$num = 0; $num = 0;
$count = WebSocketDialogMsg::where('id', '>', $lastId)->count(); $count = WebSocketDialogMsg::where('id', '>', $lastId)->count();
$batchSize = $this->option('batch');
do { do {
// 获取一批消息 // 获取一批消息
@ -149,8 +80,9 @@ class SyncDialogUserMsgToZincSearch extends Command
// 批量索引数据 // 批量索引数据
ZincSearchUserMsg::batchSyncMsgs($dialogMsgs); ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
// 更新最后ID
$lastId = $dialogMsgs->last()->id; $lastId = $dialogMsgs->last()->id;
$this->saveLastId('dialog_msg', $lastId); ZincSearchKeyValue::set($lastKey, $lastId);
} while (count($dialogMsgs) == $batchSize); } while (count($dialogMsgs) == $batchSize);
$this->info("同步消息结束 - 最后ID {$lastId}"); $this->info("同步消息结束 - 最后ID {$lastId}");

View File

@ -180,17 +180,6 @@ class ZincSearchKeyValue
return $hits[0]['_source']['value'] ?? $default; return $hits[0]['_source']['value'] ?? $default;
} }
/**
* 获取键值,返回数组
* @param string $key 键名
* @param array $default 默认值,当键不存在时返回
* @return array
*/
public static function getArray(string $key, array $default = []): array
{
return Base::string2array(self::get($key, $default));
}
/** /**
* 删除键值 * 删除键值
* *

View File

@ -69,8 +69,8 @@ class ZincSearchUserMsg
'bot' => ['type' => 'numeric', 'index' => true], 'bot' => ['type' => 'numeric', 'index' => true],
// 关联字段 // 关联字段
'_userid_msg_id_' => ['type' => 'keyword', 'index' => true], 'userid_msg_id' => ['type' => 'keyword', 'index' => true],
'_userid_dialog_id_' => ['type' => 'keyword', 'index' => true], 'userid_dialog_id' => ['type' => 'keyword', 'index' => true],
] ]
]; ];
$result = ZincSearchBase::createIndex(self::$indexName, $mappings); $result = ZincSearchBase::createIndex(self::$indexName, $mappings);
@ -193,8 +193,8 @@ class ZincSearchUserMsg
'key' => $dialogMsg->key, 'key' => $dialogMsg->key,
'bot' => $dialogMsg->bot ? 1 : 0, 'bot' => $dialogMsg->bot ? 1 : 0,
'_userid_msg_id_' => self::generateUseridMsgId($dialogMsg, $dialogUser), 'userid_msg_id' => self::generateUseridMsgId($dialogMsg, $dialogUser),
'_userid_dialog_id_' => self::generateUseridDialogId($dialogUser), 'userid_dialog_id' => self::generateUseridDialogId($dialogUser),
]; ];
} }
@ -356,8 +356,11 @@ class ZincSearchUserMsg
public static function syncUser(WebSocketDialogUser $dialogUser): void public static function syncUser(WebSocketDialogUser $dialogUser): void
{ {
$batchSize = 1000; // 每批处理的文档数量 $batchSize = 1000; // 每批处理的文档数量
$lastId = 0; // 上次处理的最后ID
do { do {
$dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id) $dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id)
->where('id', '>', $lastId)
->orderBy('id') ->orderBy('id')
->limit($batchSize) ->limit($batchSize)
->get(); ->get();
@ -367,6 +370,10 @@ class ZincSearchUserMsg
} }
ZincSearchUserMsg::batchSyncMsgs($dialogMsgs); ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
// 记录最后处理的ID
$lastId = $dialogMsgs->last()->id;
} while (count($dialogMsgs) == $batchSize); } while (count($dialogMsgs) == $batchSize);
} }
@ -401,7 +408,7 @@ class ZincSearchUserMsg
$result = ZincSearchBase::advancedSearch(self::$indexName, [ $result = ZincSearchBase::advancedSearch(self::$indexName, [
'search_type' => 'term', 'search_type' => 'term',
'query' => [ 'query' => [
'field' => '_userid_dialog_id_', 'field' => 'userid_dialog_id',
'term' => self::generateUseridDialogId($dialogUser), 'term' => self::generateUseridDialogId($dialogUser),
], ],
'from' => $from, 'from' => $from,