mirror of
https://github.com/kuaifan/dootask.git
synced 2025-12-12 19:35:50 +00:00
perf: 优化全文搜索
This commit is contained in:
parent
462705c4ed
commit
9eba376976
@ -6,11 +6,14 @@ use App\Module\Base;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* ZincSearch 键值存储类
|
* ZincSearch 键值存储类
|
||||||
* 基于ZincSearch实现简单的键值存储功能
|
|
||||||
*
|
*
|
||||||
* 使用方法:
|
* 使用方法:
|
||||||
*
|
*
|
||||||
* 1. 基本操作
|
* 1. 基础方法
|
||||||
|
* - 确保索引存在: ZincSearchKeyValue::ensureIndex();
|
||||||
|
* - 清空所有数据: ZincSearchKeyValue::clear();
|
||||||
|
*
|
||||||
|
* 2. 基本操作
|
||||||
* - 设置键值: ZincSearchKeyValue::set('site_name', '我的网站');
|
* - 设置键值: ZincSearchKeyValue::set('site_name', '我的网站');
|
||||||
* - 设置复杂数据: ZincSearchKeyValue::set('site_config', ['logo' => 'logo.png', 'theme' => 'dark']);
|
* - 设置复杂数据: ZincSearchKeyValue::set('site_config', ['logo' => 'logo.png', 'theme' => 'dark']);
|
||||||
* - 合并现有数据: ZincSearchKeyValue::set('site_config', ['footer' => '版权所有'], true);
|
* - 合并现有数据: ZincSearchKeyValue::set('site_config', ['footer' => '版权所有'], true);
|
||||||
@ -18,12 +21,9 @@ use App\Module\Base;
|
|||||||
* - 获取键值带默认值: $theme = ZincSearchKeyValue::get('theme', 'light');
|
* - 获取键值带默认值: $theme = ZincSearchKeyValue::get('theme', 'light');
|
||||||
* - 删除键值: ZincSearchKeyValue::delete('temporary_data');
|
* - 删除键值: ZincSearchKeyValue::delete('temporary_data');
|
||||||
*
|
*
|
||||||
* 2. 批量操作
|
* 3. 批量操作
|
||||||
* - 批量设置: ZincSearchKeyValue::batchSet(['user_count' => 100, 'active_users' => 50]);
|
* - 批量设置: ZincSearchKeyValue::batchSet(['user_count' => 100, 'active_users' => 50]);
|
||||||
* - 批量获取: $stats = ZincSearchKeyValue::batchGet(['user_count', 'active_users']);
|
* - 批量获取: $stats = ZincSearchKeyValue::batchGet(['user_count', 'active_users']);
|
||||||
*
|
|
||||||
* 3. 其他操作
|
|
||||||
* - 清空所有数据: ZincSearchKeyValue::clear();
|
|
||||||
*/
|
*/
|
||||||
class ZincSearchKeyValue
|
class ZincSearchKeyValue
|
||||||
{
|
{
|
||||||
@ -39,7 +39,7 @@ class ZincSearchKeyValue
|
|||||||
/**
|
/**
|
||||||
* 确保索引存在
|
* 确保索引存在
|
||||||
*/
|
*/
|
||||||
private static function ensureIndex(): bool
|
public static function ensureIndex(): bool
|
||||||
{
|
{
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||||
$mappings = [
|
$mappings = [
|
||||||
@ -56,6 +56,27 @@ class ZincSearchKeyValue
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清空所有键值
|
||||||
|
*
|
||||||
|
* @return bool 是否成功
|
||||||
|
*/
|
||||||
|
public static function clear(): bool
|
||||||
|
{
|
||||||
|
// 检查索引是否存在
|
||||||
|
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除再重建索引
|
||||||
|
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
||||||
|
if (!($deleteResult['success'] ?? false)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return self::ensureIndex();
|
||||||
|
}
|
||||||
|
|
||||||
// ==============================
|
// ==============================
|
||||||
// 基本操作
|
// 基本操作
|
||||||
// ==============================
|
// ==============================
|
||||||
@ -265,29 +286,4 @@ class ZincSearchKeyValue
|
|||||||
|
|
||||||
return $results;
|
return $results;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 其他操作
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 清空所有键值
|
|
||||||
*
|
|
||||||
* @return bool 是否成功
|
|
||||||
*/
|
|
||||||
public static function clear(): bool
|
|
||||||
{
|
|
||||||
// 检查索引是否存在
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除再重建索引
|
|
||||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
|
||||||
if (!($deleteResult['success'] ?? false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return self::ensureIndex();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,30 +7,26 @@ use App\Models\WebSocketDialogUser;
|
|||||||
use Illuminate\Support\Facades\Log;
|
use Illuminate\Support\Facades\Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 对话系统消息索引
|
* ZincSearch 会话消息类
|
||||||
*
|
*
|
||||||
* 使用方法:
|
* 使用方法:
|
||||||
*
|
*
|
||||||
* 1. 索引管理
|
* 1. 基础方法
|
||||||
* - 创建索引: ZincSearchUserMsg::generateIndex();
|
* - 确保索引存在: ZincSearchKeyValue::ensureIndex();
|
||||||
* - 检查索引: ZincSearchBase::indexExists(ZincSearchUserMsg::$indexName);
|
* - 清空所有数据: ZincSearchKeyValue::clear();
|
||||||
* - 清空索引: ZincSearchUserMsg::clear();
|
|
||||||
*
|
*
|
||||||
* 2. 会话用户操作
|
* 2. 搜索方法
|
||||||
* - 单个同步: ZincSearchUserMsg::syncUser($dialogUser);
|
* - 关键词搜索: ZincSearchUserMsg::searchByKeyword('用户ID', '关键词');
|
||||||
* - 批量同步: ZincSearchUserMsg::batchSyncUsers($dialogUsers);
|
|
||||||
* - 删除用户: ZincSearchUserMsg::deleteUser($dialogUser);
|
|
||||||
*
|
*
|
||||||
* 3. 会话消息操作
|
* 3. 基本方法
|
||||||
* - 单个同步: ZincSearchUserMsg::syncMsg($dialogMsg);
|
* - 单个同步: ZincSearchUserMsg::syncMsg($dialogMsg);
|
||||||
* - 批量同步: ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
|
* - 批量同步: ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
|
||||||
* - 删除消息: ZincSearchUserMsg::deleteMsg($dialogMsg);
|
* - 删除消息: ZincSearchUserMsg::deleteMsg($dialogMsg);
|
||||||
*
|
*
|
||||||
* 4. 搜索功能
|
* 4. 用户方法
|
||||||
* - 关键词搜索: ZincSearchUserMsg::searchByKeyword('用户ID', '关键词');
|
* - 单个同步: ZincSearchUserMsg::syncUser($dialogUser);
|
||||||
*
|
* - 批量同步: ZincSearchUserMsg::batchSyncUsers($dialogUsers);
|
||||||
* Class ZincSearchUserMsg
|
* - 删除消息: ZincSearchUserMsg::deleteUser($dialogUser);
|
||||||
* @package App\Module\ZincSearch
|
|
||||||
*/
|
*/
|
||||||
class ZincSearchUserMsg
|
class ZincSearchUserMsg
|
||||||
{
|
{
|
||||||
@ -40,86 +36,72 @@ class ZincSearchUserMsg
|
|||||||
protected static string $indexName = 'userMsg';
|
protected static string $indexName = 'userMsg';
|
||||||
|
|
||||||
// ==============================
|
// ==============================
|
||||||
// 索引管理相关方法
|
// 基础方法
|
||||||
// ==============================
|
// ==============================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建聊天系统索引
|
* 确保索引存在
|
||||||
*
|
|
||||||
* @return array
|
|
||||||
*/
|
*/
|
||||||
public static function generateIndex(): array
|
public static function ensureIndex(): bool
|
||||||
{
|
{
|
||||||
// 定义映射
|
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||||
$mappings = [
|
$mappings = [
|
||||||
'properties' => [
|
'properties' => [
|
||||||
// 共用字段
|
// 共用字段
|
||||||
'dialog_id' => ['type' => 'keyword', 'index' => true],
|
'dialog_id' => ['type' => 'keyword', 'index' => true],
|
||||||
'created_at' => ['type' => 'date', 'index' => true],
|
'created_at' => ['type' => 'date', 'index' => true],
|
||||||
'updated_at' => ['type' => 'date', 'index' => true],
|
'updated_at' => ['type' => 'date', 'index' => true],
|
||||||
|
|
||||||
// dialog_users 字段
|
// dialog_users 字段
|
||||||
'userid' => ['type' => 'keyword', 'index' => true],
|
'userid' => ['type' => 'keyword', 'index' => true],
|
||||||
'top_at' => ['type' => 'date', 'index' => true],
|
'top_at' => ['type' => 'date', 'index' => true],
|
||||||
'last_at' => ['type' => 'date', 'index' => true],
|
'last_at' => ['type' => 'date', 'index' => true],
|
||||||
'mark_unread' => ['type' => 'numeric', 'index' => true],
|
'mark_unread' => ['type' => 'numeric', 'index' => true],
|
||||||
'silence' => ['type' => 'numeric', 'index' => true],
|
'silence' => ['type' => 'numeric', 'index' => true],
|
||||||
'hide' => ['type' => 'numeric', 'index' => true],
|
'hide' => ['type' => 'numeric', 'index' => true],
|
||||||
'color' => ['type' => 'keyword', 'index' => true],
|
'color' => ['type' => 'keyword', 'index' => true],
|
||||||
|
|
||||||
// dialog_msgs 字段
|
// dialog_msgs 字段
|
||||||
'msg_id' => ['type' => 'keyword', 'index' => true],
|
'msg_id' => ['type' => 'keyword', 'index' => true],
|
||||||
'sender_userid' => ['type' => 'keyword', 'index' => true],
|
'sender_userid' => ['type' => 'keyword', 'index' => true],
|
||||||
'msg_type' => ['type' => 'keyword', 'index' => true],
|
'msg_type' => ['type' => 'keyword', 'index' => true],
|
||||||
'key' => ['type' => 'text', 'index' => true],
|
'key' => ['type' => 'text', 'index' => true],
|
||||||
'bot' => ['type' => 'numeric', 'index' => true],
|
'bot' => ['type' => 'numeric', 'index' => true],
|
||||||
|
|
||||||
// 关联字段
|
// 关联字段
|
||||||
'_join_type' => ['type' => 'keyword', 'index' => true],
|
'_userid_msg_id_' => ['type' => 'keyword', 'index' => true],
|
||||||
'_join_key' => ['type' => 'keyword', 'index' => true],
|
'_userid_dialog_id_' => ['type' => 'keyword', 'index' => true],
|
||||||
]
|
]
|
||||||
];
|
];
|
||||||
|
$result = ZincSearchBase::createIndex(self::$indexName, $mappings);
|
||||||
try {
|
return $result['success'] ?? false;
|
||||||
return ZincSearchBase::createIndex(self::$indexName, $mappings);
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('创建聊天系统索引失败: ' . $e->getMessage());
|
|
||||||
return ['success' => false, 'error' => $e->getMessage()];
|
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 清空对话系统索引
|
* 清空所有键值
|
||||||
* 删除索引并重新创建一个空索引
|
|
||||||
*
|
*
|
||||||
* @return bool 是否清空成功
|
* @return bool 是否成功
|
||||||
*/
|
*/
|
||||||
public static function clear(): bool
|
public static function clear(): bool
|
||||||
{
|
{
|
||||||
try {
|
// 检查索引是否存在
|
||||||
// 检查索引是否存在
|
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
return true;
|
||||||
return true; // 索引不存在视为已清空
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// 删除索引
|
// 删除再重建索引
|
||||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
||||||
if (!($deleteResult['success'] ?? false)) {
|
if (!($deleteResult['success'] ?? false)) {
|
||||||
Log::error('清空对话系统索引失败: ' . ($deleteResult['error'] ?? '未知错误'));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 重新创建索引
|
|
||||||
$createResult = self::generateIndex();
|
|
||||||
return $createResult['success'] ?? false;
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('清空对话系统索引异常: ' . $e->getMessage());
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return self::ensureIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==============================
|
// ==============================
|
||||||
// 搜索相关方法
|
// 搜索方法
|
||||||
// ==============================
|
// ==============================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -157,217 +139,113 @@ class ZincSearchUserMsg
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ==============================
|
// ==============================
|
||||||
// 会话用户相关方法
|
// 基本方法
|
||||||
// ==============================
|
// ==============================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 会话用户 - 生成文档ID
|
* 生成文档ID(消息)
|
||||||
|
*
|
||||||
|
* @param WebSocketDialogMsg $dialogMsg
|
||||||
|
* @param WebSocketDialogUser $dialogUser
|
||||||
|
* @return string
|
||||||
|
*/
|
||||||
|
private static function generateUseridMsgId(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): string
|
||||||
|
{
|
||||||
|
return "{$dialogUser->userid}_{$dialogMsg->id}";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 生成文档ID(会话)
|
||||||
*
|
*
|
||||||
* @param WebSocketDialogUser $dialogUser
|
* @param WebSocketDialogUser $dialogUser
|
||||||
* @return string
|
* @return string
|
||||||
*/
|
*/
|
||||||
public static function generateJoinKeyFromUser(WebSocketDialogUser $dialogUser): string
|
private static function generateUseridDialogId(WebSocketDialogUser $dialogUser): string
|
||||||
{
|
{
|
||||||
return "dialog_{$dialogUser->dialog_id}_user_{$dialogUser->userid}";
|
return "{$dialogUser->userid}_{$dialogUser->dialog_id}";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 会话用户 - 生成文档格式
|
* 生成文档内容
|
||||||
*
|
*
|
||||||
|
* @param WebSocketDialogMsg $dialogMsg
|
||||||
* @param WebSocketDialogUser $dialogUser
|
* @param WebSocketDialogUser $dialogUser
|
||||||
* @return array
|
* @return array
|
||||||
*/
|
*/
|
||||||
public static function generateUserFormat(WebSocketDialogUser $dialogUser): array
|
private static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array
|
||||||
{
|
|
||||||
return [
|
|
||||||
'dialog_id' => $dialogUser->dialog_id,
|
|
||||||
'created_at' => $dialogUser->created_at,
|
|
||||||
'updated_at' => $dialogUser->updated_at,
|
|
||||||
|
|
||||||
'userid' => $dialogUser->userid,
|
|
||||||
'top_at' => $dialogUser->top_at,
|
|
||||||
'last_at' => $dialogUser->last_at,
|
|
||||||
'mark_unread' => $dialogUser->mark_unread ?: 0,
|
|
||||||
'silence' => $dialogUser->silence ?: 0,
|
|
||||||
'hide' => $dialogUser->hide ?: 0,
|
|
||||||
'color' => $dialogUser->color,
|
|
||||||
|
|
||||||
'_join_type' => 'dialog_user',
|
|
||||||
'_join_key' => '' // 用户文档没有父文档
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会话用户 - 同步到ZincSearch
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogUser $dialogUser
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
public static function syncUser(WebSocketDialogUser $dialogUser): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
self::generateIndex();
|
|
||||||
}
|
|
||||||
$docFormat = self::generateUserFormat($dialogUser);
|
|
||||||
ZincSearchBase::addDoc(self::$indexName, $docFormat);
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('syncUser: ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量同步会话用户
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogUser[] $dialogUsers WebSocketDialogUser对象集合
|
|
||||||
* @return int 成功同步的用户数
|
|
||||||
*/
|
|
||||||
public static function batchSyncUsers($dialogUsers): int
|
|
||||||
{
|
|
||||||
$count = 0;
|
|
||||||
try {
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
self::generateIndex();
|
|
||||||
}
|
|
||||||
|
|
||||||
$docs = [];
|
|
||||||
foreach ($dialogUsers as $dialogUser) {
|
|
||||||
$docs[] = self::generateUserFormat($dialogUser);
|
|
||||||
$count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!empty($docs)) {
|
|
||||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
|
||||||
}
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('batchSyncUsers: ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
return $count;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会话用户 - 从ZincSearch删除
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogUser $dialogUser
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
public static function deleteUser(WebSocketDialogUser $dialogUser): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
$docId = self::generateJoinKeyFromUser($dialogUser);
|
|
||||||
|
|
||||||
// 首先查询相关消息
|
|
||||||
$searchParams = [
|
|
||||||
'search_type' => 'term',
|
|
||||||
'query' => [
|
|
||||||
'field' => '_join_key',
|
|
||||||
'term' => $docId
|
|
||||||
],
|
|
||||||
'from' => 0,
|
|
||||||
'max_results' => 1000 // 限制一次查询返回的文档数
|
|
||||||
];
|
|
||||||
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
|
|
||||||
// 批量删除子文档
|
|
||||||
$batch = [];
|
|
||||||
foreach ($hits as $hit) {
|
|
||||||
if (isset($hit['_id'])) {
|
|
||||||
ZincSearchBase::deleteDoc(self::$indexName, $hit['_id']);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除用户文档
|
|
||||||
ZincSearchBase::deleteDoc(self::$indexName, $docId);
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('deleteUser: ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 会话消息相关方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会话消息 - 生成文档ID
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
|
||||||
* @param string $userid
|
|
||||||
* @return string
|
|
||||||
*/
|
|
||||||
public static function generateJoinKeyFromMsg(WebSocketDialogMsg $dialogMsg, string $userid): string
|
|
||||||
{
|
|
||||||
return "dialog_{$dialogMsg->dialog_id}_user_{$userid}";
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 会话消息 - 生成文档格式
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
|
||||||
* @param string $userid
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, string $userid): array
|
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'dialog_id' => $dialogMsg->dialog_id,
|
'dialog_id' => $dialogMsg->dialog_id,
|
||||||
'created_at' => $dialogMsg->created_at,
|
'created_at' => $dialogMsg->created_at,
|
||||||
'updated_at' => $dialogMsg->updated_at,
|
'updated_at' => $dialogMsg->updated_at,
|
||||||
|
|
||||||
|
'userid' => $dialogUser->userid,
|
||||||
|
'top_at' => $dialogUser->top_at,
|
||||||
|
'last_at' => $dialogUser->last_at,
|
||||||
|
'mark_unread' => $dialogUser->mark_unread ? 1 : 0,
|
||||||
|
'silence' => $dialogUser->silence ? 1 : 0,
|
||||||
|
'hide' => $dialogUser->hide ? 1 : 0,
|
||||||
|
'color' => $dialogUser->color,
|
||||||
|
|
||||||
'msg_id' => $dialogMsg->id,
|
'msg_id' => $dialogMsg->id,
|
||||||
'sender_userid' => $dialogMsg->userid,
|
'sender_userid' => $dialogMsg->userid,
|
||||||
'msg_type' => $dialogMsg->type,
|
'msg_type' => $dialogMsg->type,
|
||||||
'key' => $dialogMsg->key,
|
'key' => $dialogMsg->key,
|
||||||
'bot' => $dialogMsg->bot ? 1 : 0,
|
'bot' => $dialogMsg->bot ? 1 : 0,
|
||||||
|
|
||||||
'_join_type' => 'dialog_msg',
|
'_userid_msg_id_' => self::generateUseridMsgId($dialogMsg, $dialogUser),
|
||||||
'_join_key' => self::generateJoinKeyFromMsg($dialogMsg, $userid)
|
'_userid_dialog_id_' => self::generateUseridDialogId($dialogUser),
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 会话消息 - 同步到ZincSearch
|
* 同步消息
|
||||||
*
|
*
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
* @param WebSocketDialogMsg $dialogMsg
|
||||||
* @return void
|
* @return bool
|
||||||
*/
|
*/
|
||||||
public static function syncMsg(WebSocketDialogMsg $dialogMsg): void
|
public static function syncMsg(WebSocketDialogMsg $dialogMsg): bool
|
||||||
{
|
{
|
||||||
|
if (!self::ensureIndex()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 获取此会话的所有用户
|
// 获取此会话的所有用户
|
||||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
||||||
|
|
||||||
if ($dialogUsers->isEmpty()) {
|
if ($dialogUsers->isEmpty()) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$docs = [];
|
$docs = [];
|
||||||
foreach ($dialogUsers as $dialogUser) {
|
foreach ($dialogUsers as $dialogUser) {
|
||||||
$docFormat = self::generateMsgFormat($dialogMsg, $dialogUser->userid);
|
$docs[] = self::generateMsgFormat($dialogMsg, $dialogUser);
|
||||||
$docs[] = $docFormat;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($docs)) {
|
if (!empty($docs)) {
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
self::generateIndex();
|
|
||||||
}
|
|
||||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
Log::error('syncMsg: ' . $e->getMessage());
|
Log::error('syncMsg: ' . $e->getMessage());
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 批量同步会话消息
|
* 批量同步消息
|
||||||
*
|
*
|
||||||
* @param WebSocketDialogMsg[] $dialogMsgs WebSocketDialogMsg对象集合
|
* @param WebSocketDialogMsg[] $dialogMsgs
|
||||||
* @return int 成功同步的消息数
|
* @return int 成功同步的消息数
|
||||||
*/
|
*/
|
||||||
public static function batchSyncMsgs($dialogMsgs): int
|
public static function batchSyncMsgs($dialogMsgs): int
|
||||||
{
|
{
|
||||||
|
if (!self::ensureIndex()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
$count = 0;
|
$count = 0;
|
||||||
try {
|
try {
|
||||||
$docs = [];
|
$docs = [];
|
||||||
@ -394,8 +272,7 @@ class ZincSearchUserMsg
|
|||||||
foreach ($dialogMsgs as $dialogMsg) {
|
foreach ($dialogMsgs as $dialogMsg) {
|
||||||
if (isset($userDialogs[$dialogMsg->dialog_id])) {
|
if (isset($userDialogs[$dialogMsg->dialog_id])) {
|
||||||
foreach ($userDialogs[$dialogMsg->dialog_id] as $dialogUser) {
|
foreach ($userDialogs[$dialogMsg->dialog_id] as $dialogUser) {
|
||||||
$docFormat = self::generateMsgFormat($dialogMsg, $dialogUser->userid);
|
$docs[] = self::generateMsgFormat($dialogMsg, $dialogUser);
|
||||||
$docs[] = $docFormat;
|
|
||||||
$count++;
|
$count++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -403,9 +280,6 @@ class ZincSearchUserMsg
|
|||||||
|
|
||||||
// 批量写入
|
// 批量写入
|
||||||
if (!empty($docs)) {
|
if (!empty($docs)) {
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
self::generateIndex();
|
|
||||||
}
|
|
||||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||||
}
|
}
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
@ -416,21 +290,21 @@ class ZincSearchUserMsg
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 会话消息 - 从ZincSearch删除
|
* 删除消息
|
||||||
*
|
*
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
* @param WebSocketDialogMsg $dialogMsg
|
||||||
* @return void
|
* @return int
|
||||||
*/
|
*/
|
||||||
public static function deleteMsg(WebSocketDialogMsg $dialogMsg): void
|
public static function deleteMsg(WebSocketDialogMsg $dialogMsg): int
|
||||||
{
|
{
|
||||||
try {
|
$batchSize = 1000; // 每批处理的文档数量
|
||||||
$batchSize = 1000; // 每批处理的文档数量
|
$totalDeleted = 0; // 总共删除的文档数量
|
||||||
$from = 0;
|
$from = 0;
|
||||||
$totalDeleted = 0;
|
|
||||||
|
|
||||||
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
// 根据消息ID查找相关文档,使用分页
|
// 根据消息ID查找相关文档
|
||||||
$searchParams = [
|
$result = ZincSearchBase::advancedSearch(self::$indexName, [
|
||||||
'search_type' => 'term',
|
'search_type' => 'term',
|
||||||
'query' => [
|
'query' => [
|
||||||
'field' => 'msg_id',
|
'field' => 'msg_id',
|
||||||
@ -438,9 +312,7 @@ class ZincSearchUserMsg
|
|||||||
],
|
],
|
||||||
'from' => $from,
|
'from' => $from,
|
||||||
'max_results' => $batchSize
|
'max_results' => $batchSize
|
||||||
];
|
]);
|
||||||
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
$hits = $result['data']['hits']['hits'] ?? [];
|
||||||
|
|
||||||
// 如果没有更多文档,退出循环
|
// 如果没有更多文档,退出循环
|
||||||
@ -464,10 +336,104 @@ class ZincSearchUserMsg
|
|||||||
// 移动到下一批
|
// 移动到下一批
|
||||||
$from += $batchSize;
|
$from += $batchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
Log::info("deleteMsg: 已删除消息ID {$dialogMsg->id} 的 {$totalDeleted} 条相关文档");
|
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
Log::error('deleteMsg: ' . $e->getMessage());
|
Log::error('deleteMsg: ' . $e->getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return $totalDeleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==============================
|
||||||
|
// 用户方法
|
||||||
|
// ==============================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步用户
|
||||||
|
*
|
||||||
|
* @param WebSocketDialogUser $dialogUser
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public static function syncUser(WebSocketDialogUser $dialogUser): void
|
||||||
|
{
|
||||||
|
$batchSize = 1000; // 每批处理的文档数量
|
||||||
|
do {
|
||||||
|
$dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id)
|
||||||
|
->orderBy('id')
|
||||||
|
->limit($batchSize)
|
||||||
|
->get();
|
||||||
|
|
||||||
|
if ($dialogMsgs->isEmpty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
|
||||||
|
} while (count($dialogMsgs) == $batchSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量同步用户
|
||||||
|
*
|
||||||
|
* @param WebSocketDialogUser[] $dialogUsers
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public static function batchSyncUsers($dialogUsers): void
|
||||||
|
{
|
||||||
|
foreach ($dialogUsers as $dialogUser) {
|
||||||
|
self::syncUser($dialogUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除用户
|
||||||
|
*
|
||||||
|
* @param WebSocketDialogUser $dialogUser
|
||||||
|
* @return int
|
||||||
|
*/
|
||||||
|
public static function deleteUser(WebSocketDialogUser $dialogUser): int
|
||||||
|
{
|
||||||
|
$batchSize = 1000; // 每批处理的文档数量
|
||||||
|
$totalDeleted = 0; // 总共删除的文档数量
|
||||||
|
$from = 0;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
// 根据消息ID查找相关文档
|
||||||
|
$result = ZincSearchBase::advancedSearch(self::$indexName, [
|
||||||
|
'search_type' => 'term',
|
||||||
|
'query' => [
|
||||||
|
'field' => '_userid_dialog_id_',
|
||||||
|
'term' => self::generateUseridDialogId($dialogUser),
|
||||||
|
],
|
||||||
|
'from' => $from,
|
||||||
|
'max_results' => $batchSize
|
||||||
|
]);
|
||||||
|
$hits = $result['data']['hits']['hits'] ?? [];
|
||||||
|
|
||||||
|
// 如果没有更多文档,退出循环
|
||||||
|
if (empty($hits)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除本批次找到的所有文档
|
||||||
|
foreach ($hits as $hit) {
|
||||||
|
if (isset($hit['_id'])) {
|
||||||
|
ZincSearchBase::deleteDoc(self::$indexName, $hit['_id']);
|
||||||
|
$totalDeleted++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果返回的文档数少于批次大小,说明已经没有更多文档了
|
||||||
|
if (count($hits) < $batchSize) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 移动到下一批
|
||||||
|
$from += $batchSize;
|
||||||
|
}
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
Log::error('deleteUser: ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
return $totalDeleted;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user