dootask/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php
2025-03-08 10:15:58 +08:00

255 lines
8.2 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Console\Commands;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use App\Module\ElasticSearch\ElasticSearchKeyValue;
use App\Module\ElasticSearch\ElasticSearchUserMsg;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Log;
/**
 * Artisan command that copies dialog/user relations and dialog messages from
 * the database into the Elasticsearch index managed by ElasticSearchUserMsg.
 *
 * Dialog/user relations are indexed first, then messages. Each message is
 * indexed once per member of its dialog and routed with the value returned by
 * ElasticSearchUserMsg::generateMsgParentId().
 */
class SyncDialogUserMsgToElasticsearch extends Command
{
    /**
     * Console signature.
     *
     * Update modes:
     *   --f      full sync (default): start again from ID 0
     *   --i      incremental sync: resume after the last ID saved by a previous run
     *
     * Maintenance:
     *   --c      delete the index and reset the saved sync positions
     *
     * Tuning:
     *   --batch  number of database rows read per iteration (default 500)
     */
    protected $signature = 'elasticsearch:sync-dialog-user-msg {--f} {--i} {--c} {--batch=500}';

    protected $description = '同步聊天会话用户和消息到Elasticsearch';

    /**
     * Elasticsearch wrapper. Created at the start of handle(), not in the constructor.
     *
     * @var ElasticSearchUserMsg|null
     */
    protected $es;

    /**
     * SyncDialogUserMsgToElasticsearch constructor.
     *
     * The Elasticsearch client is deliberately NOT created here. Console output
     * is not attached to a command while it is being constructed, so
     * $this->error() cannot be used yet, and calling exit() from a constructor
     * would terminate whatever process happened to instantiate the command.
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Entry point of the command.
     *
     * @return int 0 on success, 1 when Elasticsearch is unavailable or an
     *             index operation (create/delete) reports an error
     * @throws \Exception
     */
    public function handle()
    {
        if (!$this->connect()) {
            return 1;
        }

        $this->info('开始同步聊天数据...');

        // --c: drop the index and forget the saved sync positions, then stop.
        if ($this->option('c')) {
            return $this->clearIndex();
        }

        // Create the index on first use; a fresh index invalidates saved positions.
        if (!$this->es->indexExists()) {
            $this->info('创建索引...');
            $result = ElasticSearchUserMsg::generateIndex();
            if (isset($result['error'])) {
                $this->error('创建索引失败: ' . $result['error']);
                return 1;
            }
            $this->saveLastId(true);
            $this->info('索引创建成功');
        }

        $batchSize = $this->resolveBatchSize();

        // Dialog/user relations first, then the messages.
        $this->syncDialogUsers($batchSize);
        $this->syncDialogMsgs($batchSize);

        $this->info("\n同步完成");
        return 0;
    }

    /**
     * Create the Elasticsearch wrapper if it does not exist yet.
     *
     * @return bool false when construction threw; the error has then already
     *              been written to the console
     */
    private function connect()
    {
        if ($this->es !== null) {
            return true;
        }
        try {
            $this->es = new ElasticSearchUserMsg();
        } catch (\Exception $e) {
            $this->error('Elasticsearch连接失败: ' . $e->getMessage());
            return false;
        }
        return true;
    }

    /**
     * Delete the index (if present) and reset the saved sync positions.
     *
     * @return int command exit code: 0 on success, 1 when deletion reported an error
     */
    private function clearIndex()
    {
        $this->info('清除索引...');

        if (!$this->es->indexExists()) {
            $this->saveLastId(true);
            $this->info('索引不存在');
            return 0;
        }

        $result = $this->es->deleteIndex();
        if (isset($result['error'])) {
            $this->error('删除索引失败: ' . $result['error']);
            return 1;
        }

        $this->saveLastId(true);
        $this->info('索引删除成功');
        return 0;
    }

    /**
     * Read and validate the --batch option.
     *
     * Console options arrive as strings. A value below 1 would either sync
     * nothing (0) or disable the query LIMIT entirely (negative), so such
     * values fall back to the documented default.
     *
     * @return int batch size, always >= 1
     */
    private function resolveBatchSize()
    {
        $default = 500;
        $batchSize = (int)$this->option('batch');
        if ($batchSize < 1) {
            $this->warn("无效的 batch 参数, 使用默认值 {$default}");
            return $default;
        }
        return $batchSize;
    }

    /**
     * Persist the last synced ID for one data type, or reset all positions.
     *
     * @param string|true $type   key of the position to store ('dialog_user',
     *                            'dialog_msg'); pass true to clear every position
     * @param integer     $lastId last ID that has been processed
     */
    private function saveLastId($type, $lastId = 0)
    {
        if ($type === true) {
            $setting = [];
        } else {
            $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
            $setting[$type] = $lastId;
        }
        ElasticSearchKeyValue::save('elasticSearch:sync', $setting);
    }

    /**
     * Get the ID to resume from.
     *
     * @param string $type key of the stored position
     * @return int the stored ID when --i is given (0 if none is stored);
     *             always 0 otherwise, which makes the run a full sync
     */
    private function getLastId($type)
    {
        if (!$this->option('i')) {
            return 0;
        }
        $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
        return intval($setting[$type] ?? 0);
    }

    /**
     * Percentage of rows processed, for progress output.
     *
     * The total is counted once before the loop starts, so rows inserted while
     * the sync runs can leave it at 0 or below the processed count; both cases
     * are handled instead of dividing by zero or reporting more than 100%.
     *
     * @param int $done  rows processed so far
     * @param int $total rows counted before the loop started
     * @return float percentage in the range 0-100, rounded to 2 decimals
     */
    private function progressPercent($done, $total)
    {
        if ($total <= 0) {
            return 100.0;
        }
        return min(100.0, round($done / $total * 100, 2));
    }

    /**
     * Report a bulk response whose "errors" flag is set.
     *
     * @param mixed  $result  value returned by the bulk call
     * @param string $message console message shown when the flag is set
     * @return void
     */
    private function reportBulkErrors($result, $message)
    {
        if (empty($result['errors'])) {
            return;
        }
        $this->error($message);
        Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'] ?? null));
    }

    /**
     * Sync dialog/user relations (parent documents).
     *
     * @param int $batchSize rows read from the database per iteration
     * @return void
     */
    private function syncDialogUsers($batchSize)
    {
        $batchSize = max(1, (int)$batchSize);

        $this->info("\n同步用户数据...");
        $lastId = $this->getLastId('dialog_user');
        $num = 0;
        $count = WebSocketDialogUser::where('id', '>', $lastId)->count();

        do {
            // Keyset pagination: fetch the next batch of relations after $lastId.
            $dialogUsers = WebSocketDialogUser::where('id', '>', $lastId)
                ->orderBy('id')
                ->limit($batchSize)
                ->get();
            if ($dialogUsers->isEmpty()) {
                break;
            }

            $batchLastId = $dialogUsers->last()->id;
            $num += count($dialogUsers);
            $progress = $this->progressPercent($num, $count);
            $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$batchLastId}");

            // Bulk body: one action line followed by one source line per document.
            $params = ['body' => []];
            foreach ($dialogUsers as $dialogUser) {
                $params['body'][] = [
                    'index' => [
                        '_index' => ElasticSearchUserMsg::indexName(),
                        '_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser),
                    ]
                ];
                $params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser);
            }
            $this->reportBulkErrors($this->es->bulk($params), '批量索引用户数据部分失败');

            // NOTE: the position advances even when the bulk call reported errors,
            // so failed documents are not retried by a later incremental run.
            $lastId = $batchLastId;
            $this->saveLastId('dialog_user', $lastId);
        } while (count($dialogUsers) === $batchSize);

        $this->info("同步用户数据结束 - 最后ID {$lastId}");
    }

    /**
     * Sync dialog messages (child documents).
     *
     * Every message is indexed once for each member of its dialog; messages of
     * dialogs without members are skipped.
     *
     * @param int $batchSize rows read from the database per iteration
     * @return void
     */
    private function syncDialogMsgs($batchSize)
    {
        $batchSize = max(1, (int)$batchSize);

        // Bulk body lines sent per request. Must stay even: the body alternates
        // action and source lines, and an odd size would split a pair.
        $linesPerRequest = 1000;

        $this->info("\n同步消息数据...");
        $lastId = $this->getLastId('dialog_msg');
        $num = 0;
        $count = WebSocketDialogMsg::where('id', '>', $lastId)->count();

        do {
            // Keyset pagination: fetch the next batch of messages after $lastId.
            $dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId)
                ->orderBy('id')
                ->limit($batchSize)
                ->get();
            if ($dialogMsgs->isEmpty()) {
                break;
            }

            $batchLastId = $dialogMsgs->last()->id;
            $num += count($dialogMsgs);
            $progress = $this->progressPercent($num, $count);
            $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$batchLastId}");

            // Map dialog_id => member user IDs for every dialog in this batch.
            // The batch is non-empty here, so the ID list is non-empty too.
            $dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->values()->all();
            $userDialogMap = [];
            foreach (WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get() as $dialogUser) {
                $userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid;
            }

            // One child document per (message, dialog member).
            $body = [];
            foreach ($dialogMsgs as $dialogMsg) {
                // An absent map entry means the dialog has no members: skip the message.
                foreach ($userDialogMap[$dialogMsg->dialog_id] ?? [] as $userid) {
                    $body[] = [
                        'index' => [
                            '_index' => ElasticSearchUserMsg::indexName(),
                            '_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid),
                            // Route the child document with its parent's ID.
                            'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid)
                        ]
                    ];
                    $body[] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid);
                }
            }

            // Fan-out can make the body much larger than the batch; send it in slices.
            foreach (array_chunk($body, $linesPerRequest) as $chunk) {
                $this->reportBulkErrors($this->es->bulk(['body' => $chunk]), '批量索引消息数据部分失败');
            }

            // NOTE: the position advances even when a bulk call reported errors,
            // so failed documents are not retried by a later incremental run.
            $lastId = $batchLastId;
            $this->saveLastId('dialog_msg', $lastId);
        } while (count($dialogMsgs) === $batchSize);

        $this->info("同步消息结束 - 最后ID {$lastId}");
    }
}