diff --git a/app/Console/Commands/SyncDialogUserMsgToZincSearch.php b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php index acef0e30b..62f375056 100644 --- a/app/Console/Commands/SyncDialogUserMsgToZincSearch.php +++ b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php @@ -3,7 +3,6 @@ namespace App\Console\Commands; use App\Models\WebSocketDialogMsg; -use App\Models\WebSocketDialogUser; use App\Module\ZincSearch\ZincSearchKeyValue; use App\Module\ZincSearch\ZincSearchUserMsg; use Illuminate\Console\Command; @@ -22,36 +21,24 @@ class SyncDialogUserMsgToZincSearch extends Command protected $signature = 'zinc:sync-dialog-user-msg {--f} {--i} {--c} {--batch=1000}'; protected $description = '同步聊天会话用户和消息到 ZincSearch'; - /** - * SyncDialogUserMsgToElasticsearch constructor. - */ - public function __construct() - { - parent::__construct(); - } - /** * @return int - * @throws \Exception */ - public function handle() + public function handle(): int { // 清除索引 if ($this->option('c')) { $this->info('清除索引...'); - ZincSearchUserMsg::clear(); ZincSearchKeyValue::clear(); + ZincSearchUserMsg::clear(); $this->info("索引删除成功"); return 0; } $this->info('开始同步聊天数据...'); - // 同步用户-会话数据 - $this->syncDialogUsers($this->option('batch')); - // 同步消息数据 - $this->syncDialogMsgs($this->option('batch')); + $this->syncDialogMsgs(); // 完成 $this->info("\n同步完成"); @@ -59,77 +46,21 @@ class SyncDialogUserMsgToZincSearch extends Command } /** - * 保存最后一个ID - * @param string $type - * @param int $lastId - */ - private function saveLastId(string $type, int $lastId = 0): void - { - ZincSearchKeyValue::set("sync", ["{$type}" => $lastId], true); - } - - /** - * 获取最后一个ID - * @param $type - * @return int - */ - private function getLastId($type): int - { - if ($this->option("i")) { - $array = ZincSearchKeyValue::getArray("sync"); - return intval($array[$type] ?? 0); - } - return 0; - } - - /** - * 同步用户-会话数据(父文档) - * @param $batchSize + * 同步消息数据 + * * @return void */ - private function syncDialogUsers($batchSize) - { - $this->info("\n同步用户数据..."); - $lastId = $this->getLastId('dialog_user'); - - $num = 0; - $count = WebSocketDialogUser::where('id', '>', $lastId)->count(); - - do { - // 获取一批用户-会话关系 - $dialogUsers = WebSocketDialogUser::where('id', '>', $lastId) - ->orderBy('id') - ->limit($batchSize) - ->get(); - - if ($dialogUsers->isEmpty()) { - break; - } - - $num += count($dialogUsers); - $progress = round($num / $count * 100, 2); - $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}"); - - // 批量索引数据 - ZincSearchUserMsg::batchSyncUsers($dialogUsers); - - $lastId = $dialogUsers->last()->id; - $this->saveLastId('dialog_user', $lastId); - } while (count($dialogUsers) == $batchSize); - - $this->info("同步用户数据结束 - 最后ID {$lastId}"); - } - - /** - * 同步消息数据(子文档) - */ - private function syncDialogMsgs($batchSize) + private function syncDialogMsgs(): void { $this->info("\n同步消息数据..."); - $lastId = $this->getLastId('dialog_msg'); + + // 获取上次同步的最后ID + $lastKey = "sync:userMsgLastId"; + $lastId = $this->option('i') ? intval(ZincSearchKeyValue::get($lastKey, 0)) : 0; $num = 0; $count = WebSocketDialogMsg::where('id', '>', $lastId)->count(); + $batchSize = $this->option('batch'); do { // 获取一批消息 @@ -149,8 +80,9 @@ class SyncDialogUserMsgToZincSearch extends Command // 批量索引数据 ZincSearchUserMsg::batchSyncMsgs($dialogMsgs); + // 更新最后ID $lastId = $dialogMsgs->last()->id; - $this->saveLastId('dialog_msg', $lastId); + ZincSearchKeyValue::set($lastKey, $lastId); } while (count($dialogMsgs) == $batchSize); $this->info("同步消息结束 - 最后ID {$lastId}"); diff --git a/app/Module/ZincSearch/ZincSearchKeyValue.php b/app/Module/ZincSearch/ZincSearchKeyValue.php index 7c446d7e0..cd2570bda 100644 --- a/app/Module/ZincSearch/ZincSearchKeyValue.php +++ b/app/Module/ZincSearch/ZincSearchKeyValue.php @@ -180,17 +180,6 @@ class ZincSearchKeyValue return $hits[0]['_source']['value'] ?? $default; } - /** - * 获取键值,返回数组 - * @param string $key 键名 - * @param array $default 默认值,当键不存在时返回 - * @return array - */ - public static function getArray(string $key, array $default = []): array - { - return Base::string2array(self::get($key, $default)); - } - /** * 删除键值 * diff --git a/app/Module/ZincSearch/ZincSearchUserMsg.php b/app/Module/ZincSearch/ZincSearchUserMsg.php index 5dfcc76bb..556045adc 100644 --- a/app/Module/ZincSearch/ZincSearchUserMsg.php +++ b/app/Module/ZincSearch/ZincSearchUserMsg.php @@ -69,8 +69,8 @@ class ZincSearchUserMsg 'bot' => ['type' => 'numeric', 'index' => true], // 关联字段 - '_userid_msg_id_' => ['type' => 'keyword', 'index' => true], - '_userid_dialog_id_' => ['type' => 'keyword', 'index' => true], + 'userid_msg_id' => ['type' => 'keyword', 'index' => true], + 'userid_dialog_id' => ['type' => 'keyword', 'index' => true], ] ]; $result = ZincSearchBase::createIndex(self::$indexName, $mappings); @@ -193,8 +193,8 @@ class ZincSearchUserMsg 'key' => $dialogMsg->key, 'bot' => $dialogMsg->bot ? 1 : 0, - '_userid_msg_id_' => self::generateUseridMsgId($dialogMsg, $dialogUser), - '_userid_dialog_id_' => self::generateUseridDialogId($dialogUser), + 'userid_msg_id' => self::generateUseridMsgId($dialogMsg, $dialogUser), + 'userid_dialog_id' => self::generateUseridDialogId($dialogUser), ]; } @@ -356,8 +356,11 @@ class ZincSearchUserMsg public static function syncUser(WebSocketDialogUser $dialogUser): void { $batchSize = 1000; // 每批处理的文档数量 + $lastId = 0; // 上次处理的最后ID + do { $dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id) + ->where('id', '>', $lastId) ->orderBy('id') ->limit($batchSize) ->get(); @@ -367,6 +370,10 @@ class ZincSearchUserMsg } ZincSearchUserMsg::batchSyncMsgs($dialogMsgs); + + // 记录最后处理的ID + $lastId = $dialogMsgs->last()->id; + } while (count($dialogMsgs) == $batchSize); } @@ -401,7 +408,7 @@ class ZincSearchUserMsg $result = ZincSearchBase::advancedSearch(self::$indexName, [ 'search_type' => 'term', 'query' => [ - 'field' => '_userid_dialog_id_', + 'field' => 'userid_dialog_id', 'term' => self::generateUseridDialogId($dialogUser), ], 'from' => $from,