mirror of
https://github.com/kuaifan/dootask.git
synced 2025-12-12 11:19:56 +00:00
perf: 优化全文搜索
This commit is contained in:
parent
679c2070c1
commit
f61e7caf2b
@ -4,10 +4,9 @@ namespace App\Console\Commands;
|
||||
|
||||
use App\Models\WebSocketDialogMsg;
|
||||
use App\Models\WebSocketDialogUser;
|
||||
use App\Module\ElasticSearch\ElasticSearchKeyValue;
|
||||
use App\Module\ElasticSearch\ElasticSearchUserMsg;
|
||||
use App\Module\ZincSearch\ZincSearchKeyValue;
|
||||
use App\Module\ZincSearch\ZincSearchUserMsg;
|
||||
use Illuminate\Console\Command;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
class SyncDialogUserMsgToZincSearch extends Command
|
||||
{
|
||||
@ -22,7 +21,6 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
|
||||
protected $signature = 'zinc:sync-dialog-user-msg {--f} {--i} {--c} {--batch=500}';
|
||||
protected $description = '同步聊天会话用户和消息到 ZincSearch';
|
||||
protected $client = null;
|
||||
|
||||
/**
|
||||
* SyncDialogUserMsgToElasticsearch constructor.
|
||||
@ -30,12 +28,6 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct();
|
||||
try {
|
||||
$this->es = new ElasticSearchUserMsg();
|
||||
} catch (\Exception $e) {
|
||||
$this->error('Elasticsearch连接失败: ' . $e->getMessage());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -44,37 +36,16 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
*/
|
||||
public function handle()
|
||||
{
|
||||
$this->info('开始同步聊天数据...');
|
||||
|
||||
// 清除索引
|
||||
if ($this->option('c')) {
|
||||
$this->info('清除索引...');
|
||||
if (!$this->es->indexExists()) {
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引不存在');
|
||||
return 0;
|
||||
}
|
||||
$result = $this->es->deleteIndex();
|
||||
if (isset($result['error'])) {
|
||||
$this->error('删除索引失败: ' . $result['error']);
|
||||
return 1;
|
||||
}
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引删除成功');
|
||||
ZincSearchUserMsg::clear();
|
||||
ZincSearchKeyValue::clear();
|
||||
$this->info("索引删除成功");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 判断创建索引
|
||||
if (!$this->es->indexExists()) {
|
||||
$this->info('创建索引...');
|
||||
$result = ElasticSearchUserMsg::generateIndex();
|
||||
if (isset($result['error'])) {
|
||||
$this->error('创建索引失败: ' . $result['error']);
|
||||
return 1;
|
||||
}
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引创建成功');
|
||||
}
|
||||
$this->info('开始同步聊天数据...');
|
||||
|
||||
// 同步用户-会话数据
|
||||
$this->syncDialogUsers($this->option('batch'));
|
||||
@ -89,18 +60,12 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
|
||||
/**
|
||||
* 保存最后一个ID
|
||||
* @param string|true $type
|
||||
* @param integer $lastId
|
||||
* @param string $type
|
||||
* @param int $lastId
|
||||
*/
|
||||
private function saveLastId($type, $lastId = 0)
|
||||
private function saveLastId(string $type, int $lastId = 0): void
|
||||
{
|
||||
if ($type === true) {
|
||||
$setting = [];
|
||||
} else {
|
||||
$setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
|
||||
$setting[$type] = $lastId;
|
||||
}
|
||||
ElasticSearchKeyValue::save('elasticSearch:sync', $setting);
|
||||
ZincSearchKeyValue::set("sync", ["{$type}" => $lastId], true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -108,11 +73,11 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
* @param $type
|
||||
* @return int
|
||||
*/
|
||||
private function getLastId($type)
|
||||
private function getLastId($type): int
|
||||
{
|
||||
if ($this->option('i')) {
|
||||
$setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
|
||||
return intval($setting[$type] ?? 0);
|
||||
if ($this->option("i")) {
|
||||
$array = ZincSearchKeyValue::getArray("sync");
|
||||
return intval($array[$type] ?? 0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -146,24 +111,7 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
$this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}");
|
||||
|
||||
// 批量索引数据
|
||||
$params = ['body' => []];
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$params['body'][] = [
|
||||
'index' => [
|
||||
'_index' => ElasticSearchUserMsg::indexName(),
|
||||
'_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser),
|
||||
]
|
||||
];
|
||||
$params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser);
|
||||
}
|
||||
|
||||
if ($params['body']) {
|
||||
$result = $this->es->bulk($params);
|
||||
if (isset($result['errors']) && $result['errors']) {
|
||||
$this->error('批量索引用户数据部分失败');
|
||||
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
|
||||
}
|
||||
}
|
||||
ZincSearchUserMsg::batchSyncUsers($dialogUsers);
|
||||
|
||||
$lastId = $dialogUsers->last()->id;
|
||||
$this->saveLastId('dialog_user', $lastId);
|
||||
@ -198,52 +146,8 @@ class SyncDialogUserMsgToZincSearch extends Command
|
||||
$progress = round($num / $count * 100, 2);
|
||||
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}");
|
||||
|
||||
// 获取这些消息所属的会话对应的所有用户
|
||||
$dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray();
|
||||
$userDialogMap = [];
|
||||
|
||||
if (!empty($dialogIds)) {
|
||||
$dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
|
||||
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid;
|
||||
}
|
||||
}
|
||||
|
||||
// 批量索引消息数据
|
||||
$params = ['body' => []];
|
||||
foreach ($dialogMsgs as $dialogMsg) {
|
||||
// 如果该会话没有用户,跳过
|
||||
if (empty($userDialogMap[$dialogMsg->dialog_id])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 为每个用户-会话关系创建子文档
|
||||
foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) {
|
||||
$params['body'][] = [
|
||||
'index' => [
|
||||
'_index' => ElasticSearchUserMsg::indexName(),
|
||||
'_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid),
|
||||
'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // 路由到父文档
|
||||
]
|
||||
];
|
||||
|
||||
$params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid);
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($params['body'])) {
|
||||
// 分批处理
|
||||
$chunks = array_chunk($params['body'], 1000);
|
||||
foreach ($chunks as $chunk) {
|
||||
$chunkParams = ['body' => $chunk];
|
||||
$result = $this->es->bulk($chunkParams);
|
||||
if (isset($result['errors']) && $result['errors']) {
|
||||
$this->error('批量索引消息数据部分失败');
|
||||
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
|
||||
}
|
||||
}
|
||||
}
|
||||
// 批量索引数据
|
||||
ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
|
||||
|
||||
$lastId = $dialogMsgs->last()->id;
|
||||
$this->saveLastId('dialog_msg', $lastId);
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module;
|
||||
namespace App\Module\ZincSearch;
|
||||
|
||||
/**
|
||||
* ZincSearch 公共类
|
||||
*/
|
||||
class ZincSearch
|
||||
class ZincSearchBase
|
||||
{
|
||||
private mixed $host;
|
||||
private mixed $port;
|
||||
@ -97,6 +97,15 @@ class ZincSearch
|
||||
return (new self())->request("/api/index/{$index}", null, 'GET');
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断索引是否存在
|
||||
*/
|
||||
public static function indexExists($index): bool
|
||||
{
|
||||
$result = self::getIndex($index);
|
||||
return $result['success'] && isset($result['data']['name']);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有索引
|
||||
*/
|
||||
@ -113,6 +122,41 @@ class ZincSearch
|
||||
return (new self())->request("/api/index/{$index}", null, 'DELETE');
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除所有索引
|
||||
*/
|
||||
public static function deleteAllIndices(): array
|
||||
{
|
||||
$instance = new self();
|
||||
$result = $instance->request("/api/index", null, 'GET');
|
||||
|
||||
if (!$result['success']) {
|
||||
return $result;
|
||||
}
|
||||
|
||||
$indices = $result['data'] ?? [];
|
||||
$deleteResults = [];
|
||||
$success = true;
|
||||
|
||||
foreach ($indices as $index) {
|
||||
$indexName = $index['name'] ?? '';
|
||||
if (!empty($indexName)) {
|
||||
$deleteResult = $instance->request("/api/index/{$indexName}", null, 'DELETE');
|
||||
$deleteResults[$indexName] = $deleteResult;
|
||||
|
||||
if (!$deleteResult['success']) {
|
||||
$success = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'success' => $success,
|
||||
'message' => $success ? '所有索引删除成功' : '部分索引删除失败',
|
||||
'details' => $deleteResults
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* 分析文本
|
||||
*/
|
||||
295
app/Module/ZincSearch/ZincSearchKeyValue.php
Normal file
295
app/Module/ZincSearch/ZincSearchKeyValue.php
Normal file
@ -0,0 +1,295 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module\ZincSearch;
|
||||
|
||||
use App\Module\Base;
|
||||
|
||||
/**
|
||||
* ZincSearch 键值存储类
|
||||
* 基于ZincSearch实现简单的键值存储功能
|
||||
*
|
||||
* 使用方法:
|
||||
*
|
||||
* 1. 基本操作
|
||||
* - 设置键值: ZincSearchValue::set('site_name', '我的网站');
|
||||
* - 设置复杂数据: ZincSearchValue::set('site_config', ['logo' => 'logo.png', 'theme' => 'dark']);
|
||||
* - 合并现有数据: ZincSearchValue::set('site_config', ['footer' => '版权所有'], true);
|
||||
* - 获取键值: $siteName = ZincSearchValue::get('site_name');
|
||||
* - 获取键值带默认值: $theme = ZincSearchValue::get('theme', 'light');
|
||||
* - 删除键值: ZincSearchValue::delete('temporary_data');
|
||||
*
|
||||
* 2. 批量操作
|
||||
* - 批量设置: ZincSearchValue::batchSet(['user_count' => 100, 'active_users' => 50]);
|
||||
* - 批量获取: $stats = ZincSearchValue::batchGet(['user_count', 'active_users']);
|
||||
*
|
||||
* 3. 其他操作
|
||||
* - 清空所有数据: ZincSearchValue::clear();
|
||||
*/
|
||||
class ZincSearchKeyValue
|
||||
{
|
||||
/**
|
||||
* 索引名称
|
||||
*/
|
||||
protected static string $indexName = 'keyValue';
|
||||
|
||||
// ==============================
|
||||
// 基础方法
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 确保索引存在
|
||||
*/
|
||||
private static function ensureIndex(): bool
|
||||
{
|
||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||
$mappings = [
|
||||
'properties' => [
|
||||
'key' => [
|
||||
'type' => 'keyword',
|
||||
'index' => true
|
||||
],
|
||||
'value' => [
|
||||
'type' => 'text',
|
||||
'index' => true
|
||||
],
|
||||
'created_at' => [
|
||||
'type' => 'date',
|
||||
'index' => true
|
||||
],
|
||||
'updated_at' => [
|
||||
'type' => 'date',
|
||||
'index' => true
|
||||
]
|
||||
]
|
||||
];
|
||||
$result = ZincSearchBase::createIndex(self::$indexName, $mappings);
|
||||
return $result['success'] ?? false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 基本操作
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 设置键值
|
||||
*
|
||||
* @param string $key 键名
|
||||
* @param mixed $value 值
|
||||
* @param bool $merge 是否合并现有数据(如果值是数组)
|
||||
* @return bool 是否成功
|
||||
*/
|
||||
public static function set(string $key, mixed $value, bool $merge = false): bool
|
||||
{
|
||||
if (!self::ensureIndex()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查键是否已存在
|
||||
if ($merge && is_array($value)) {
|
||||
$existingData = self::get($key);
|
||||
if (is_array($existingData)) {
|
||||
$value = array_merge($existingData, $value);
|
||||
}
|
||||
}
|
||||
|
||||
// 检查是否存在相同键的文档
|
||||
$searchResult = ZincSearchBase::search(self::$indexName, $key, 0, 1);
|
||||
$docs = $searchResult['data']['hits']['hits'] ?? [];
|
||||
$now = date('Y-m-d H:i:s');
|
||||
|
||||
if (!empty($docs)) {
|
||||
$docId = $docs[0]['_id'] ?? null;
|
||||
if ($docId) {
|
||||
// 更新现有文档
|
||||
$docData = [
|
||||
'key' => $key,
|
||||
'value' => $value,
|
||||
'updated_at' => $now
|
||||
];
|
||||
$updateResult = ZincSearchBase::updateDoc(self::$indexName, $docId, $docData);
|
||||
return $updateResult['success'] ?? false;
|
||||
}
|
||||
}
|
||||
|
||||
// 创建新文档
|
||||
$docData = [
|
||||
'key' => $key,
|
||||
'value' => $value,
|
||||
'created_at' => $now,
|
||||
'updated_at' => $now
|
||||
];
|
||||
$addResult = ZincSearchBase::addDoc(self::$indexName, $docData);
|
||||
return $addResult['success'] ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取键值
|
||||
*
|
||||
* @param string $key 键名
|
||||
* @param mixed $default 默认值
|
||||
* @return mixed 值或默认值
|
||||
*/
|
||||
public static function get(string $key, mixed $default = null): mixed
|
||||
{
|
||||
if (!self::ensureIndex() || empty($key)) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
// 精确匹配键名
|
||||
$searchParams = [
|
||||
'search_type' => 'term',
|
||||
'query' => [
|
||||
'field' => 'key',
|
||||
'term' => $key
|
||||
],
|
||||
'from' => 0,
|
||||
'max_results' => 1
|
||||
];
|
||||
|
||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
||||
if (!($result['success'] ?? false)) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
$hits = $result['data']['hits']['hits'] ?? [];
|
||||
if (empty($hits)) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
return $hits[0]['_source']['value'] ?? $default;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取键值,返回数组
|
||||
* @param string $key 键名
|
||||
* @param array $default 默认值,当键不存在时返回
|
||||
* @return array
|
||||
*/
|
||||
public static function getArray(string $key, array $default = []): array
|
||||
{
|
||||
return Base::string2array(self::get($key, $default));
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除键值
|
||||
*
|
||||
* @param string $key 键名
|
||||
* @return bool 是否成功
|
||||
*/
|
||||
public static function delete(string $key): bool
|
||||
{
|
||||
if (!self::ensureIndex() || empty($key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 查找文档ID
|
||||
$searchParams = [
|
||||
'search_type' => 'term',
|
||||
'query' => [
|
||||
'field' => 'key',
|
||||
'term' => $key
|
||||
],
|
||||
'from' => 0,
|
||||
'max_results' => 1
|
||||
];
|
||||
|
||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
||||
if (!($result['success'] ?? false)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$hits = $result['data']['hits']['hits'] ?? [];
|
||||
if (empty($hits)) {
|
||||
return true; // 不存在视为删除成功
|
||||
}
|
||||
|
||||
$docId = $hits[0]['_id'] ?? null;
|
||||
if (empty($docId)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$deleteResult = ZincSearchBase::deleteDoc(self::$indexName, $docId);
|
||||
return $deleteResult['success'] ?? false;
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 批量操作
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 批量设置键值对
|
||||
*
|
||||
* @param array $keyValues 键值对数组
|
||||
* @return bool 是否全部成功
|
||||
*/
|
||||
public static function batchSet(array $keyValues): bool
|
||||
{
|
||||
if (!self::ensureIndex() || empty($keyValues)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$docs = [];
|
||||
$now = date('Y-m-d H:i:s');
|
||||
|
||||
foreach ($keyValues as $key => $value) {
|
||||
$docs[] = [
|
||||
'key' => $key,
|
||||
'value' => $value,
|
||||
'created_at' => $now,
|
||||
'updated_at' => $now
|
||||
];
|
||||
}
|
||||
|
||||
$result = ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
return $result['success'] ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量获取键值
|
||||
*
|
||||
* @param array $keys 键名数组
|
||||
* @return array 键值对数组
|
||||
*/
|
||||
public static function batchGet(array $keys): array
|
||||
{
|
||||
if (!self::ensureIndex() || empty($keys)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$results = [];
|
||||
|
||||
// 遍历查询每个键
|
||||
foreach ($keys as $key) {
|
||||
$results[$key] = self::get($key);
|
||||
}
|
||||
|
||||
return $results;
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 其他操作
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 清空所有键值
|
||||
*
|
||||
* @return bool 是否成功
|
||||
*/
|
||||
public static function clear(): bool
|
||||
{
|
||||
// 检查索引是否存在
|
||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// 删除再重建索引
|
||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
||||
if (!($deleteResult['success'] ?? false)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return self::ensureIndex();
|
||||
}
|
||||
}
|
||||
459
app/Module/ZincSearch/ZincSearchUserMsg.php
Normal file
459
app/Module/ZincSearch/ZincSearchUserMsg.php
Normal file
@ -0,0 +1,459 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module\ZincSearch;
|
||||
|
||||
use App\Models\WebSocketDialogMsg;
|
||||
use App\Models\WebSocketDialogUser;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
/**
|
||||
* 对话系统消息索引
|
||||
*
|
||||
* 使用方法:
|
||||
*
|
||||
* 1. 索引管理
|
||||
* - 创建索引: ZincSearchUserMsg::generateIndex();
|
||||
* - 检查索引: ZincSearchBase::indexExists(ZincSearchUserMsg::$indexName);
|
||||
* - 清空索引: ZincSearchUserMsg::clear();
|
||||
*
|
||||
* 2. 会话用户操作
|
||||
* - 单个同步: ZincSearchUserMsg::syncUser($dialogUser);
|
||||
* - 批量同步: ZincSearchUserMsg::batchSyncUsers($dialogUsers);
|
||||
* - 删除用户: ZincSearchUserMsg::deleteUser($dialogUser);
|
||||
*
|
||||
* 3. 会话消息操作
|
||||
* - 单个同步: ZincSearchUserMsg::syncMsg($dialogMsg);
|
||||
* - 批量同步: ZincSearchUserMsg::batchSyncMsgs($dialogMsgs);
|
||||
* - 删除消息: ZincSearchUserMsg::deleteMsg($dialogMsg);
|
||||
*
|
||||
* 4. 搜索功能
|
||||
* - 关键词搜索: ZincSearchUserMsg::searchByKeyword('用户ID', '关键词');
|
||||
*
|
||||
* Class ZincSearchUserMsg
|
||||
* @package App\Module\ZincSearch
|
||||
*/
|
||||
class ZincSearchUserMsg
|
||||
{
|
||||
/**
|
||||
* 索引名称
|
||||
*/
|
||||
protected static string $indexName = 'userMsg';
|
||||
|
||||
// ==============================
|
||||
// 索引管理相关方法
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 创建聊天系统索引
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public static function generateIndex(): array
|
||||
{
|
||||
// 定义映射
|
||||
$mappings = [
|
||||
'properties' => [
|
||||
// 共用字段
|
||||
'dialog_id' => ['type' => 'keyword', 'index' => true],
|
||||
'created_at' => ['type' => 'date', 'index' => true],
|
||||
'updated_at' => ['type' => 'date', 'index' => true],
|
||||
|
||||
// dialog_users 字段
|
||||
'userid' => ['type' => 'keyword', 'index' => true],
|
||||
'top_at' => ['type' => 'date', 'index' => true],
|
||||
'last_at' => ['type' => 'date', 'index' => true],
|
||||
'mark_unread' => ['type' => 'numeric', 'index' => true],
|
||||
'silence' => ['type' => 'numeric', 'index' => true],
|
||||
'hide' => ['type' => 'numeric', 'index' => true],
|
||||
'color' => ['type' => 'keyword', 'index' => true],
|
||||
|
||||
// dialog_msgs 字段
|
||||
'msg_id' => ['type' => 'keyword', 'index' => true],
|
||||
'sender_userid' => ['type' => 'keyword', 'index' => true],
|
||||
'msg_type' => ['type' => 'keyword', 'index' => true],
|
||||
'key' => ['type' => 'text', 'index' => true],
|
||||
'bot' => ['type' => 'numeric', 'index' => true],
|
||||
|
||||
// 关联字段 - ZincSearch不支持父子文档,使用引用字段代替
|
||||
'parent_id' => ['type' => 'keyword', 'index' => true],
|
||||
'doc_type' => ['type' => 'keyword', 'index' => true] // dialog_user 或 dialog_msg
|
||||
]
|
||||
];
|
||||
|
||||
try {
|
||||
return ZincSearchBase::createIndex(self::$indexName, $mappings);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('创建聊天系统索引失败: ' . $e->getMessage());
|
||||
return ['success' => false, 'error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空对话系统索引
|
||||
* 删除索引并重新创建一个空索引
|
||||
*
|
||||
* @return bool 是否清空成功
|
||||
*/
|
||||
public static function clear(): bool
|
||||
{
|
||||
try {
|
||||
// 检查索引是否存在
|
||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||
return true; // 索引不存在视为已清空
|
||||
}
|
||||
|
||||
// 删除索引
|
||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
||||
if (!($deleteResult['success'] ?? false)) {
|
||||
Log::error('清空对话系统索引失败: ' . ($deleteResult['error'] ?? '未知错误'));
|
||||
return false;
|
||||
}
|
||||
|
||||
// 重新创建索引
|
||||
$createResult = self::generateIndex();
|
||||
return $createResult['success'] ?? false;
|
||||
} catch (\Exception $e) {
|
||||
Log::error('清空对话系统索引异常: ' . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 搜索相关方法
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话
|
||||
*
|
||||
* @param string $userid 用户ID
|
||||
* @param string $keyword 消息关键词
|
||||
* @param int $from 起始位置
|
||||
* @param int $size 返回结果数量
|
||||
* @return array
|
||||
*/
|
||||
public static function searchByKeyword(string $userid, string $keyword, int $from = 0, int $size = 20): array
|
||||
{
|
||||
// 构建复杂的搜索查询
|
||||
$searchParams = [
|
||||
'search_type' => 'querystring',
|
||||
'query' => [
|
||||
'term' => "+userid:{$userid} +key:*{$keyword}*"
|
||||
],
|
||||
'from' => $from,
|
||||
'max_results' => $size,
|
||||
'sort_fields' => ["updated_at:desc"]
|
||||
];
|
||||
|
||||
try {
|
||||
return ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('搜索对话消息失败: ' . $e->getMessage());
|
||||
return [
|
||||
'success' => false,
|
||||
'error' => $e->getMessage(),
|
||||
'hits' => ['total' => ['value' => 0], 'hits' => []]
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 会话用户相关方法
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 会话用户 - 生成文档ID
|
||||
*
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return string
|
||||
*/
|
||||
public static function generateUserDocId(WebSocketDialogUser $dialogUser): string
|
||||
{
|
||||
return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 生成文档格式
|
||||
*
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return array
|
||||
*/
|
||||
public static function generateUserFormat(WebSocketDialogUser $dialogUser): array
|
||||
{
|
||||
return [
|
||||
'dialog_id' => $dialogUser->dialog_id,
|
||||
'created_at' => $dialogUser->created_at,
|
||||
'updated_at' => $dialogUser->updated_at,
|
||||
|
||||
'userid' => $dialogUser->userid,
|
||||
'top_at' => $dialogUser->top_at,
|
||||
'last_at' => $dialogUser->last_at,
|
||||
'mark_unread' => $dialogUser->mark_unread ?: 0,
|
||||
'silence' => $dialogUser->silence ?: 0,
|
||||
'hide' => $dialogUser->hide ?: 0,
|
||||
'color' => $dialogUser->color,
|
||||
|
||||
'doc_type' => 'dialog_user',
|
||||
'parent_id' => '' // 用户文档没有父文档
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 同步到ZincSearch
|
||||
*
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return void
|
||||
*/
|
||||
public static function syncUser(WebSocketDialogUser $dialogUser): void
|
||||
{
|
||||
try {
|
||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||
self::generateIndex();
|
||||
}
|
||||
$docFormat = self::generateUserFormat($dialogUser);
|
||||
ZincSearchBase::addDoc(self::$indexName, $docFormat);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('syncUser: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量同步会话用户
|
||||
*
|
||||
* @param array|iterable $dialogUsers WebSocketDialogUser对象集合
|
||||
* @return int 成功同步的用户数
|
||||
*/
|
||||
public static function batchSyncUsers($dialogUsers): int
|
||||
{
|
||||
$count = 0;
|
||||
try {
|
||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
||||
self::generateIndex();
|
||||
}
|
||||
|
||||
$docs = [];
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$docs[] = self::generateUserFormat($dialogUser);
|
||||
$count++;
|
||||
}
|
||||
|
||||
if (!empty($docs)) {
|
||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('batchSyncUsers: ' . $e->getMessage());
|
||||
}
|
||||
|
||||
return $count;
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 从ZincSearch删除
|
||||
*
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return void
|
||||
*/
|
||||
public static function deleteUser(WebSocketDialogUser $dialogUser): void
|
||||
{
|
||||
try {
|
||||
$docId = self::generateUserDocId($dialogUser);
|
||||
|
||||
// 首先查询相关消息
|
||||
$searchParams = [
|
||||
'search_type' => 'term',
|
||||
'query' => [
|
||||
'field' => 'parent_id',
|
||||
'term' => $docId
|
||||
],
|
||||
'from' => 0,
|
||||
'max_results' => 1000 // 限制一次查询返回的文档数
|
||||
];
|
||||
|
||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
||||
$hits = $result['data']['hits']['hits'] ?? [];
|
||||
|
||||
// 批量删除子文档
|
||||
$batch = [];
|
||||
foreach ($hits as $hit) {
|
||||
if (isset($hit['_id'])) {
|
||||
ZincSearchBase::deleteDoc(self::$indexName, $hit['_id']);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除用户文档
|
||||
ZincSearchBase::deleteDoc(self::$indexName, $docId);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('deleteUser: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// ==============================
|
||||
// 会话消息相关方法
|
||||
// ==============================
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成父文档ID
|
||||
*
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param string $userid
|
||||
* @return string
|
||||
*/
|
||||
public static function generateMsgParentId(WebSocketDialogMsg $dialogMsg, string $userid): string
|
||||
{
|
||||
return "user_{$userid}_dialog_{$dialogMsg->dialog_id}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成文档ID
|
||||
*
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param string $userid
|
||||
* @return string
|
||||
*/
|
||||
public static function generateMsgDocId(WebSocketDialogMsg $dialogMsg, string $userid): string
|
||||
{
|
||||
return "msg_{$dialogMsg->id}_user_{$userid}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成文档格式
|
||||
*
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param string $userid
|
||||
* @return array
|
||||
*/
|
||||
public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, string $userid): array
|
||||
{
|
||||
return [
|
||||
'dialog_id' => $dialogMsg->dialog_id,
|
||||
'created_at' => $dialogMsg->created_at,
|
||||
'updated_at' => $dialogMsg->updated_at,
|
||||
|
||||
'msg_id' => $dialogMsg->id,
|
||||
'sender_userid' => $dialogMsg->userid,
|
||||
'msg_type' => $dialogMsg->type,
|
||||
'key' => $dialogMsg->key,
|
||||
'bot' => $dialogMsg->bot ? 1 : 0,
|
||||
|
||||
'doc_type' => 'dialog_msg',
|
||||
'parent_id' => self::generateMsgParentId($dialogMsg, $userid)
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 同步到ZincSearch
|
||||
*
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @return void
|
||||
*/
|
||||
public static function syncMsg(WebSocketDialogMsg $dialogMsg): void
|
||||
{
|
||||
try {
|
||||
// 获取此会话的所有用户
|
||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
||||
|
||||
if ($dialogUsers->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$docs = [];
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$docId = self::generateMsgDocId($dialogMsg, $dialogUser->userid);
|
||||
$docFormat = self::generateMsgFormat($dialogMsg, $dialogUser->userid);
|
||||
$docs[] = $docFormat;
|
||||
}
|
||||
|
||||
if (!empty($docs)) {
|
||||
if (ZincSearchBase::indexExists(self::$indexName)) {
|
||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
} else {
|
||||
self::generateIndex();
|
||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('syncMsg: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量同步会话消息
|
||||
*
|
||||
* @param array|iterable $dialogMsgs WebSocketDialogMsg对象集合
|
||||
* @return int 成功同步的消息数
|
||||
*/
|
||||
public static function batchSyncMsgs($dialogMsgs): int
|
||||
{
|
||||
$count = 0;
|
||||
try {
|
||||
$docs = [];
|
||||
$userDialogs = [];
|
||||
|
||||
// 预处理:收集所有涉及的对话ID
|
||||
$dialogIds = [];
|
||||
foreach ($dialogMsgs as $message) {
|
||||
$dialogIds[] = $message->dialog_id;
|
||||
}
|
||||
$dialogIds = array_unique($dialogIds);
|
||||
|
||||
// 获取所有相关的用户-对话关系
|
||||
if (!empty($dialogIds)) {
|
||||
$dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
|
||||
|
||||
// 按对话ID组织用户
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$userDialogs[$dialogUser->dialog_id][] = $dialogUser;
|
||||
}
|
||||
}
|
||||
|
||||
// 为每条消息准备所有相关用户的文档
|
||||
foreach ($dialogMsgs as $message) {
|
||||
if (isset($userDialogs[$message->dialog_id])) {
|
||||
foreach ($userDialogs[$message->dialog_id] as $dialogUser) {
|
||||
$docFormat = self::generateMsgFormat($message, $dialogUser->userid);
|
||||
$docs[] = $docFormat;
|
||||
$count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 批量写入
|
||||
if (!empty($docs)) {
|
||||
if (ZincSearchBase::indexExists(self::$indexName)) {
|
||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
} else {
|
||||
self::generateIndex();
|
||||
ZincSearchBase::addDocs(self::$indexName, $docs);
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('batchSyncMsgs: ' . $e->getMessage());
|
||||
}
|
||||
|
||||
return $count;
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 从ZincSearch删除
|
||||
*
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @return void
|
||||
*/
|
||||
public static function deleteMsg(WebSocketDialogMsg $dialogMsg): void
|
||||
{
|
||||
try {
|
||||
// 获取此会话的所有用户
|
||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
||||
|
||||
if ($dialogUsers->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$docId = self::generateMsgDocId($dialogMsg, $dialogUser->userid);
|
||||
ZincSearchBase::deleteDoc(self::$indexName, $docId);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('deleteMsg: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user