diff --git a/AGENTS.md b/AGENTS.md index 2b8f98404..0934fad9f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -39,7 +39,7 @@ - **Module(`app/Module`)** - 承载跨控制器 / 跨模型的业务逻辑与独立功能子域,例如: - - 外部服务集成:`AgoraIO/*`、`ZincSearch/*` 等; + - 外部服务集成:`AgoraIO/*`、`Manticore/*` 等; - 通用工具:`Lock.php`、`TextExtractor.php`、`Image.php` 等; - 项目 / 任务 / 对话等领域里的复杂协作逻辑。 - 原则: @@ -88,7 +88,7 @@ - **偏好(Preferences)**:用户表达持续性偏好时(语言、输出格式、技术选型等),应尽快写入; - **流程 / 习惯(Procedures)**:形成「以后都按这个流程来」的稳定开发 / 发布 / 调试流程时,应记录为可复用步骤; - **约束 / 决策(Requirements)**:项目长期有效的决策,如不再支持某版本、某模块的架构约定等; - - **事实 / 关系(Facts)**:模块边界约定、服务之间的调用关系、与外部系统(如 AgoraIO、ZincSearch)集成方式等。 + - **事实 / 关系(Facts)**:模块边界约定、服务之间的调用关系、与外部系统(如 AgoraIO、Manticore Search)集成方式等。 - 写入建议: - 默认使用 `source: "text"`,在 `episode_body` 中用简洁结构化自然语言描述背景、类型、范围、具体内容; - 需要结构化数据时可用 `source: "json"`,保证 `episode_body` 是合法 JSON 字符串; diff --git a/app/Console/Commands/SyncUserMsgToZincSearch.php b/app/Console/Commands/SyncUserMsgToZincSearch.php deleted file mode 100644 index 411a0546a..000000000 --- a/app/Console/Commands/SyncUserMsgToZincSearch.php +++ /dev/null @@ -1,175 +0,0 @@ -error("应用「ZincSearch」未安装"); - return 1; - } - - // 注册信号处理器(仅在支持pcntl扩展的环境下) - if (extension_loaded('pcntl')) { - pcntl_async_signals(true); // 启用异步信号处理 - pcntl_signal(SIGINT, [$this, 'handleSignal']); // Ctrl+C - pcntl_signal(SIGTERM, [$this, 'handleSignal']); // kill - } - - // 检查锁,如果已被占用则退出 - $lockInfo = $this->getLock(); - if ($lockInfo) { - $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); - return 1; - } - - // 设置锁 - $this->setLock(); - - // 清除索引 - if ($this->option('c')) { - $this->info('清除索引...'); - ZincSearchKeyValue::clear(); - ZincSearchDialogMsg::clear(); - $this->info("索引删除成功"); - $this->releaseLock(); - return 0; - } - - $this->info('开始同步聊天数据...'); - - // 同步消息数据 - $this->syncDialogMsgs(); - - // 完成 - $this->info("\n同步完成"); - $this->releaseLock(); - return 0; - } - - /** - * 获取锁信息 - * - * @return array|null 如果锁存在返回锁信息,否则返回null - */ - private function getLock(): ?array - { - $lockKey = md5($this->signature); - return Cache::has($lockKey) ? Cache::get($lockKey) : null; - } - - /** - * 设置锁 - */ - private function setLock(): void - { - $lockKey = md5($this->signature); - $lockInfo = [ - 'started_at' => date('Y-m-d H:i:s') - ]; - Cache::put($lockKey, $lockInfo, 300); // 5分钟 - } - - /** - * 释放锁 - */ - private function releaseLock(): void - { - $lockKey = md5($this->signature); - Cache::forget($lockKey); - } - - /** - * 处理终端信号 - * - * @param int $signal - * @return void - */ - public function handleSignal(int $signal): void - { - // 释放锁 - $this->releaseLock(); - exit(0); - } - - /** - * 同步消息数据 - * - * @return void - */ - private function syncDialogMsgs(): void - { - // 获取上次同步的最后ID - $lastKey = "sync:dialogUserMsgLastId"; - $lastId = $this->option('i') ? intval(ZincSearchKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n同步消息数据({$lastId})..."); - } else { - $this->info("\n同步消息数据..."); - } - - $num = 0; - $count = WebSocketDialogMsg::where('id', '>', $lastId)->count(); - $batchSize = $this->option('batch'); - - $total = 0; - $lastNum = 0; - - do { - // 获取一批 - $dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId) - ->orderBy('id') - ->limit($batchSize) - ->get(); - - if ($dialogMsgs->isEmpty()) { - break; - } - - $num += count($dialogMsgs); - $progress = round($num / $count * 100, 2); - if ($progress < 100) { - $progress = number_format($progress, 2); - } - $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$dialogMsgs->first()->id} ~ {$dialogMsgs->last()->id} ({$total}|{$lastNum})"); - - // 刷新锁 - $this->setLock(); - - // 同步数据 - $lastNum = ZincSearchDialogMsg::batchSync($dialogMsgs); - $total += $lastNum; - - // 更新最后ID - $lastId = $dialogMsgs->last()->id; - ZincSearchKeyValue::set($lastKey, $lastId); - } while (count($dialogMsgs) == $batchSize); - - $this->info("同步消息结束 - 最后ID {$lastId}"); - } -} diff --git a/app/Http/Controllers/Api/DialogController.php b/app/Http/Controllers/Api/DialogController.php index 51004964b..40ba86e7f 100755 --- a/app/Http/Controllers/Api/DialogController.php +++ b/app/Http/Controllers/Api/DialogController.php @@ -32,7 +32,7 @@ use App\Models\WebSocketDialogMsgTranslate; use App\Models\WebSocketDialogSession; use App\Models\UserRecentItem; use App\Module\Table\OnlineData; -use App\Module\ZincSearch\ZincSearchDialogMsg; +use App\Module\Manticore\ManticoreMsg; use Hhxsv5\LaravelS\Swoole\Task\Task; /** @@ -155,7 +155,7 @@ class DialogController extends AbstractController } // 搜索消息会话 if (count($list) < $take) { - $searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, $take - count($list)); + $searchResults = ManticoreMsg::searchDialogs($user->userid, $key, 0, $take - count($list)); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { @@ -726,7 +726,7 @@ class DialogController extends AbstractController } else { // 搜索消息 $list = []; - $searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, Base::getPaginate(50, 20, 'take')); + $searchResults = ManticoreMsg::searchDialogs($user->userid, $key, 0, Base::getPaginate(50, 20, 'take')); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { diff --git a/app/Http/Controllers/IndexController.php b/app/Http/Controllers/IndexController.php index 9393e89f8..49526ba68 100755 --- a/app/Http/Controllers/IndexController.php +++ b/app/Http/Controllers/IndexController.php @@ -21,7 +21,6 @@ use App\Tasks\AutoArchivedTask; use App\Tasks\DeleteBotMsgTask; use App\Tasks\CheckinRemindTask; use App\Tasks\CloseMeetingRoomTask; -use App\Tasks\ZincSearchSyncTask; use App\Tasks\ManticoreSyncTask; use App\Tasks\UnclaimedTaskRemindTask; use Hhxsv5\LaravelS\Swoole\Task\Task; @@ -272,8 +271,6 @@ class IndexController extends InvokeController Task::deliver(new UnclaimedTaskRemindTask()); // 关闭会议室 Task::deliver(new CloseMeetingRoomTask()); - // ZincSearch 同步 - Task::deliver(new ZincSearchSyncTask()); // Manticore Search 同步 Task::deliver(new ManticoreSyncTask()); diff --git a/app/Module/Apps.php b/app/Module/Apps.php index 75bb1e46c..e3c8da237 100644 --- a/app/Module/Apps.php +++ b/app/Module/Apps.php @@ -54,7 +54,6 @@ class Apps 'office' => 'OnlyOffice', 'drawio' => 'Drawio', 'minder' => 'Minder', - 'search' => 'ZincSearch', 'manticore' => 'Manticore Search', default => $appId, }; diff --git a/app/Module/Manticore/ManticoreMsg.php b/app/Module/Manticore/ManticoreMsg.php index e7c9ecbe5..e2c9c3fb9 100644 --- a/app/Module/Manticore/ManticoreMsg.php +++ b/app/Module/Manticore/ManticoreMsg.php @@ -7,6 +7,8 @@ use App\Models\WebSocketDialogUser; use App\Module\Apps; use App\Module\Base; use App\Module\AI; +use Carbon\Carbon; +use DB; use Illuminate\Support\Facades\Log; /** @@ -171,6 +173,145 @@ class ManticoreMsg return $formatted; } + /** + * 按对话搜索消息(用于对话列表搜索) + * + * 返回包含匹配消息的对话列表,每个对话只返回一次 + * 当 Manticore 未安装时,回退到 MySQL LIKE 搜索 + * + * @param int $userid 用户ID + * @param string $keyword 搜索关键词 + * @param int $from 起始位置 + * @param int $size 返回数量 + * @return array 对话列表 + */ + public static function searchDialogs(int $userid, string $keyword, int $from = 0, int $size = 20): array + { + if (empty($keyword)) { + return []; + } + + // 未安装 Manticore 时使用 MySQL 回退搜索 + if (!Apps::isInstalled("manticore")) { + return self::searchDialogsByMysql($userid, $keyword, $from, $size); + } + + try { + // 使用全文搜索获取更多结果,然后按对话分组 + $results = ManticoreBase::msgFullTextSearch($keyword, $userid, 100, 0); + + if (empty($results)) { + return []; + } + + // 收集所有对话ID + $dialogIds = array_unique(array_column($results, 'dialog_id')); + + // 获取用户在这些对话中的信息 + $dialogUsers = WebSocketDialogUser::where('userid', $userid) + ->whereIn('dialog_id', $dialogIds) + ->get() + ->keyBy('dialog_id'); + + // 按对话分组,每个对话只保留最相关的消息 + $msgs = []; + $seenDialogs = []; + foreach ($results as $item) { + $dialogId = $item['dialog_id']; + + // 每个对话只取第一条(最相关的) + if (isset($seenDialogs[$dialogId])) { + continue; + } + $seenDialogs[$dialogId] = true; + + // 获取用户在该对话的信息 + $dialogUser = $dialogUsers->get($dialogId); + if (!$dialogUser) { + continue; + } + + $msgs[] = [ + 'id' => $dialogId, + 'search_msg_id' => $item['msg_id'], + 'user_at' => $dialogUser->updated_at ? Carbon::parse($dialogUser->updated_at)->format('Y-m-d H:i:s') : null, + 'mark_unread' => $dialogUser->mark_unread, + 'silence' => $dialogUser->silence, + 'hide' => $dialogUser->hide, + 'color' => $dialogUser->color, + 'top_at' => $dialogUser->top_at ? Carbon::parse($dialogUser->top_at)->format('Y-m-d H:i:s') : null, + 'last_at' => $dialogUser->last_at ? Carbon::parse($dialogUser->last_at)->format('Y-m-d H:i:s') : null, + ]; + + // 已达到需要的数量 + if (count($msgs) >= $from + $size) { + break; + } + } + + // 应用分页 + return array_slice($msgs, $from, $size); + } catch (\Exception $e) { + Log::error('Manticore searchDialogs error: ' . $e->getMessage()); + // 出错时回退到 MySQL 搜索 + return self::searchDialogsByMysql($userid, $keyword, $from, $size); + } + } + + /** + * MySQL 回退搜索(按对话搜索消息) + * + * 通过联表查询获取用户有权限的对话中匹配的消息 + * + * @param int $userid 用户ID + * @param string $keyword 搜索关键词 + * @param int $from 起始位置 + * @param int $size 返回数量 + * @return array 对话列表 + */ + private static function searchDialogsByMysql(int $userid, string $keyword, int $from = 0, int $size = 20): array + { + $items = DB::table('web_socket_dialog_users as u') + ->select([ + 'd.*', + 'u.top_at', + 'u.last_at', + 'u.mark_unread', + 'u.silence', + 'u.hide', + 'u.color', + 'u.updated_at as user_at', + 'm.id as search_msg_id' + ]) + ->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id') + ->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id') + ->where('u.userid', $userid) + ->where('m.bot', 0) + ->whereNull('d.deleted_at') + ->where('m.key', 'like', "%{$keyword}%") + ->orderByDesc('m.id') + ->offset($from) + ->limit($size) + ->get() + ->all(); + + $msgs = []; + foreach ($items as $item) { + $msgs[] = [ + 'id' => $item->id, + 'search_msg_id' => $item->search_msg_id, + 'user_at' => Carbon::parse($item->user_at)->format('Y-m-d H:i:s'), + 'mark_unread' => $item->mark_unread, + 'silence' => $item->silence, + 'hide' => $item->hide, + 'color' => $item->color, + 'top_at' => Carbon::parse($item->top_at)->format('Y-m-d H:i:s'), + 'last_at' => Carbon::parse($item->last_at)->format('Y-m-d H:i:s'), + ]; + } + return $msgs; + } + // ============================== // 权限计算方法(MVA 方案核心) // ============================== diff --git a/app/Module/ZincSearch/ZincSearchBase.php b/app/Module/ZincSearch/ZincSearchBase.php deleted file mode 100644 index 60e4d5976..000000000 --- a/app/Module/ZincSearch/ZincSearchBase.php +++ /dev/null @@ -1,267 +0,0 @@ -host = env('ZINCSEARCH_HOST', 'search'); - $this->port = env('ZINCSEARCH_PORT', '4080'); - $this->user = env('DB_USERNAME', ''); - $this->pass = env('DB_PASSWORD', ''); - } - - /** - * 通用请求方法 - */ - private function request($path, $body = null, $method = 'POST') - { - if (!Apps::isInstalled("search")) { - return [ - 'success' => false, - 'error' => Doo::translate("应用「ZincSearch」未安装") - ]; - } - - $ch = curl_init(); - curl_setopt($ch, CURLOPT_URL, "http://{$this->host}:{$this->port}{$path}"); - curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); - curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - curl_setopt($ch, CURLOPT_USERPWD, $this->user . ':' . $this->pass); - curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); - - $headers = ['Content-Type: application/json']; - if ($method === 'BULK') { - $headers = ['Content-Type: text/plain']; - } - - curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); - if ($body !== null) { - curl_setopt($ch, CURLOPT_POSTFIELDS, $body); - } - $result = curl_exec($ch); - $error = curl_error($ch); - $status = curl_getinfo($ch, CURLINFO_HTTP_CODE); - curl_close($ch); - if ($error) { - return ['success' => false, 'error' => $error]; - } - $data = json_decode($result, true); - return [ - 'success' => $status >= 200 && $status < 300, - 'status' => $status, - 'data' => $data - ]; - } - - // ============================== - // 索引管理相关方法 - // ============================== - - /** - * 创建索引 - */ - public static function createIndex($index, $mappings = []): array - { - $body = json_encode([ - 'name' => $index, - 'mappings' => $mappings - ]); - return (new self())->request("/api/index", $body); - } - - /** - * 获取索引信息 - */ - public static function getIndex($index): array - { - return (new self())->request("/api/index/{$index}", null, 'GET'); - } - - /** - * 判断索引是否存在 - */ - public static function indexExists($index): bool - { - $result = self::getIndex($index); - return $result['success'] && isset($result['data']['name']); - } - - /** - * 获取所有索引 - */ - public static function listIndices(): array - { - return (new self())->request("/api/index", null, 'GET'); - } - - /** - * 删除索引 - */ - public static function deleteIndex($index): array - { - return (new self())->request("/api/index/{$index}", null, 'DELETE'); - } - - /** - * 删除所有索引 - */ - public static function deleteAllIndices(): array - { - $instance = new self(); - $result = $instance->request("/api/index", null, 'GET'); - - if (!$result['success']) { - return $result; - } - - $indices = $result['data'] ?? []; - $deleteResults = []; - $success = true; - - foreach ($indices as $index) { - $indexName = $index['name'] ?? ''; - if (!empty($indexName)) { - $deleteResult = $instance->request("/api/index/{$indexName}", null, 'DELETE'); - $deleteResults[$indexName] = $deleteResult; - - if (!$deleteResult['success']) { - $success = false; - } - } - } - - return [ - 'success' => $success, - 'message' => $success ? '所有索引删除成功' : '部分索引删除失败', - 'details' => $deleteResults - ]; - } - - /** - * 分析文本 - */ - public static function analyze($analyzer, $text): array - { - $body = json_encode([ - 'analyzer' => $analyzer, - 'text' => $text - ]); - return (new self())->request("/api/_analyze", $body); - } - - // ============================== - // 文档管理相关方法 - // ============================== - - /** - * 写入单条文档 - */ - public static function addDoc($index, $doc): array - { - $body = json_encode($doc); - return (new self())->request("/api/{$index}/_doc", $body); - } - - /** - * 更新文档 - */ - public static function updateDoc($index, $id, $doc): array - { - $body = json_encode($doc); - return (new self())->request("/api/{$index}/_update/{$id}", $body); - } - - /** - * 删除文档 - */ - public static function deleteDoc($index, $id): array - { - return (new self())->request("/api/{$index}/_doc/{$id}", null, 'DELETE'); - } - - /** - * 批量写入文档 - */ - public static function addDocs($index, $docs): array - { - $body = json_encode([ - 'index' => $index, - 'records' => $docs - ]); - return (new self())->request("/api/_bulkv2", $body); - } - - /** - * 使用原始BULK API批量写入文档 - * 请求格式为Elasticsearch兼容格式 - */ - public static function bulkDocs($data): array - { - return (new self())->request("/api/_bulk", $data, 'BULK'); - } - - // ============================== - // 搜索相关方法 - // ============================== - - /** - * 查询文档 - */ - public static function search($index, $query, $from = 0, $size = 10): array - { - $searchParams = [ - 'search_type' => 'match', - 'query' => [ - 'term' => $query - ], - 'from' => $from, - 'max_results' => $size - ]; - - $body = json_encode($searchParams); - return (new self())->request("/api/{$index}/_search", $body); - } - - /** - * 高级查询文档 - */ - public static function advancedSearch($index, $searchParams): array - { - $body = json_encode($searchParams); - return (new self())->request("/api/{$index}/_search", $body); - } - - /** - * 兼容ES查询文档 - */ - public static function elasticSearch($index, $searchParams): array - { - $body = json_encode($searchParams); - return (new self())->request("/es/{$index}/_search", $body); - } - - /** - * 多索引查询 - */ - public static function multiSearch($queries): array - { - $body = json_encode($queries); - return (new self())->request("/api/_msearch", $body); - } -} diff --git a/app/Module/ZincSearch/ZincSearchDialogMsg.php b/app/Module/ZincSearch/ZincSearchDialogMsg.php deleted file mode 100644 index 3e7363c8b..000000000 --- a/app/Module/ZincSearch/ZincSearchDialogMsg.php +++ /dev/null @@ -1,612 +0,0 @@ - [ - // 拓展数据 - 'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID - 'to_userid' => ['type' => 'numeric', 'index' => true], // 此消息发给的用户ID - - // 消息数据 - 'id' => ['type' => 'numeric', 'index' => true], - 'dialog_id' => ['type' => 'numeric', 'index' => true], - 'dialog_type' => ['type' => 'keyword', 'index' => true], - 'session_id' => ['type' => 'numeric', 'index' => true], - 'userid' => ['type' => 'numeric', 'index' => true], - 'type' => ['type' => 'keyword', 'index' => true], - 'key' => ['type' => 'text', 'index' => true], - 'created_at' => ['type' => 'date', 'index' => true], - 'updated_at' => ['type' => 'date', 'index' => true], - ] - ]; - $result = ZincSearchBase::createIndex(self::$indexNameMsg, $mappings); - return $result['success'] ?? false; - } - if (!ZincSearchBase::indexExists(self::$indexNameUser)) { - $mappings = [ - 'properties' => [ - // 拓展数据 - 'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID - - // 用户数据 - 'id' => ['type' => 'numeric', 'index' => true], - 'dialog_id' => ['type' => 'numeric', 'index' => true], - 'userid' => ['type' => 'numeric', 'index' => true], - 'top_at' => ['type' => 'date', 'index' => true], - 'last_at' => ['type' => 'date', 'index' => true], - 'mark_unread' => ['type' => 'numeric', 'index' => true], - 'silence' => ['type' => 'numeric', 'index' => true], - 'hide' => ['type' => 'numeric', 'index' => true], - 'color' => ['type' => 'keyword', 'index' => true], - 'created_at' => ['type' => 'date', 'index' => true], - 'updated_at' => ['type' => 'date', 'index' => true], - ] - ]; - $result = ZincSearchBase::createIndex(self::$indexNameUser, $mappings); - return $result['success'] ?? false; - } - return true; - } - - /** - * 清空所有键值 - * - * @return bool 是否成功 - */ - public static function clear(): bool - { - // 检查索引是否存在然后删除 - if (ZincSearchBase::indexExists(self::$indexNameMsg)) { - $deleteResult = ZincSearchBase::deleteIndex(self::$indexNameMsg); - if (!($deleteResult['success'] ?? false)) { - return false; - } - } - if (ZincSearchBase::indexExists(self::$indexNameUser)) { - $deleteResult = ZincSearchBase::deleteIndex(self::$indexNameUser); - if (!($deleteResult['success'] ?? false)) { - return false; - } - } - - return self::ensureIndex(); - } - - // ============================== - // 搜索方法 - // ============================== - - /** - * 根据用户ID和消息关键词搜索会话 - * - * @param string $userid 用户ID - * @param string $keyword 消息关键词 - * @param int $from 起始位置 - * @param int $size 返回结果数量 - * @return array - */ - public static function search(string $userid, string $keyword, int $from = 0, int $size = 20): array - { - if (!Apps::isInstalled("search")) { - // 如果搜索功能未安装,使用数据库查询 - return self::searchByMysql($userid, $keyword, $from, $size); - } - - $searchParams = [ - 'query' => [ - 'bool' => [ - 'must' => [ - ['term' => ['to_userid' => $userid]], - ['match_phrase' => ['key' => $keyword]] - ] - ] - ], - 'from' => $from, - 'size' => $size, - 'sort' => [ - ['updated_at' => 'desc'] - ] - ]; - try { - $result = ZincSearchBase::elasticSearch(self::$indexNameMsg, $searchParams); - $hits = $result['data']['hits']['hits'] ?? []; - - // 收集所有的用户信息 - $dialogUserids = []; - foreach ($hits as $hit) { - $source = $hit['_source']; - $dialogUserids[] = $source['dialog_userid']; - } - $userInfos = self::searchUser(array_unique($dialogUserids)); - - // 组合返回结果,将用户信息合并到消息中 - $msgs = []; - foreach ($hits as $hit) { - $msgInfo = $hit['_source']; - $userInfo = $userInfos[$msgInfo['dialog_userid']] ?? []; - if ($userInfo) { - $msgs[] = [ - 'id' => $msgInfo['dialog_id'], - 'search_msg_id' => $msgInfo['id'], - 'user_at' => Carbon::parse($msgInfo['updated_at'])->format('Y-m-d H:i:s'), - - 'mark_unread' => $userInfo['mark_unread'], - 'silence' => $userInfo['silence'], - 'hide' => $userInfo['hide'], - 'color' => $userInfo['color'], - 'top_at' => Carbon::parse($userInfo['top_at'])->format('Y-m-d H:i:s'), - 'last_at' => Carbon::parse($userInfo['last_at'])->format('Y-m-d H:i:s'), - ]; - } - } - return $msgs; - } catch (\Exception $e) { - Log::error('search: ' . $e->getMessage()); - return []; - } - } - - /** - * 根据用户ID和消息关键词搜索会话(MySQL 版本,主要用于未安装ZincSearch的情况) - * - * @param string $userid 用户ID - * @param string $keyword 消息关键词 - * @param int $from 起始位置 - * @param int $size 返回结果数量 - * @return array - */ - private static function searchByMysql(string $userid, string $keyword, int $from = 0, int $size = 20): array - { - $items = DB::table('web_socket_dialog_users as u') - ->select(['d.*', 'u.top_at', 'u.last_at', 'u.mark_unread', 'u.silence', 'u.hide', 'u.color', 'u.updated_at as user_at', 'm.id as search_msg_id']) - ->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id') - ->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id') - ->where('u.userid', $userid) - ->where('m.bot', 0) - ->whereNull('d.deleted_at') - ->where('m.key', 'like', "%{$keyword}%") - ->orderByDesc('m.id') - ->offset($from) - ->limit($size) - ->get() - ->all(); - $msgs = []; - foreach ($items as $item) { - $msgs[] = [ - 'id' => $item->id, - 'search_msg_id' => $item->search_msg_id, - 'user_at' => Carbon::parse($item->user_at)->format('Y-m-d H:i:s'), - - 'mark_unread' => $item->mark_unread, - 'silence' => $item->silence, - 'hide' => $item->hide, - 'color' => $item->color, - 'top_at' => Carbon::parse($item->top_at)->format('Y-m-d H:i:s'), - 'last_at' => Carbon::parse($item->last_at)->format('Y-m-d H:i:s'), - ]; - } - return $msgs; - } - - /** - * 根据对话用户ID搜索用户信息 - * @param array $dialogUserids - * @return array - */ - private static function searchUser(array $dialogUserids): array - { - if (empty($dialogUserids)) { - return []; - } - - $userInfos = []; - - // 构建用户查询条件 - $userSearchParams = [ - 'query' => [ - 'bool' => [ - 'should' => [] - ] - ], - 'size' => count($dialogUserids) // 确保取到所有符合条件的记录 - ]; - - // 添加所有 dialog_userid 到查询条件 - foreach ($dialogUserids as $dialogUserid) { - $userSearchParams['query']['bool']['should'][] = [ - 'term' => ['dialog_userid' => $dialogUserid] - ]; - } - - // 查询用户信息 - $userResult = ZincSearchBase::elasticSearch(self::$indexNameUser, $userSearchParams); - $userHits = $userResult['data']['hits']['hits'] ?? []; - - // 以 dialog_userid 为键保存用户信息 - foreach ($userHits as $userHit) { - $userSource = $userHit['_source']; - $userInfos[$userSource['dialog_userid']] = $userSource; - } - - return $userInfos; - } - - // ============================== - // 生成内容 - // ============================== - - /** - * 生成 dialog_userid - * - * @param WebSocketDialogUser $dialogUser - * @return string - */ - private static function generateDialogUserid(WebSocketDialogUser $dialogUser): string - { - return "{$dialogUser->dialog_id}_{$dialogUser->userid}"; - } - - /** - * 生成文档内容 - * - * @param WebSocketDialogMsg $dialogMsg - * @param WebSocketDialogUser $dialogUser - * @return array - */ - private static function generateMsgData(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array - { - return [ - '_id' => self::$indexNameMsg . "_" . $dialogMsg->id . "_" . $dialogUser->userid, - 'dialog_userid' => self::generateDialogUserid($dialogUser), - 'to_userid' => $dialogUser->userid, - - 'id' => $dialogMsg->id, - 'dialog_id' => $dialogMsg->dialog_id, - 'dialog_type' => $dialogMsg->dialog_type, - 'session_id' => $dialogMsg->session_id, - 'userid' => $dialogMsg->userid, - 'type' => $dialogMsg->type, - 'key' => $dialogMsg->key, - 'created_at' => $dialogMsg->created_at, - 'updated_at' => $dialogMsg->updated_at, - ]; - } - private static function generateUserData(WebSocketDialogUser $dialogUser): array - { - return [ - '_id' => self::$indexNameUser . "_" . $dialogUser->id, - 'dialog_userid' => self::generateDialogUserid($dialogUser), - - 'id' => $dialogUser->id, - 'dialog_id' => $dialogUser->dialog_id, - 'userid' => $dialogUser->userid, - 'top_at' => $dialogUser->top_at, - 'last_at' => $dialogUser->last_at, - 'mark_unread' => $dialogUser->mark_unread, - 'silence' => $dialogUser->silence, - 'hide' => $dialogUser->hide, - 'color' => $dialogUser->color, - 'created_at' => $dialogUser->created_at, - 'updated_at' => $dialogUser->updated_at, - ]; - } - - // ============================== - // 基本方法 - // ============================== - - /** - * 同步消息(建议在异步进程中使用) - * - * @param WebSocketDialogMsg $dialogMsg - * @return bool - */ - public static function sync(WebSocketDialogMsg $dialogMsg): bool - { - if (!self::ensureIndex()) { - return false; - } - - if ($dialogMsg->bot) { - // 如果是机器人消息,跳过 - return true; - } - - try { - // 获取此会话的所有用户 - $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); - - if ($dialogUsers->isEmpty()) { - return true; - } - - $msgs = []; - $users = []; - foreach ($dialogUsers as $dialogUser) { - if (empty($dialogMsg->key)) { - // 如果消息没有关键词,跳过 - continue; - } - if ($dialogUser->userid == 0) { - // 跳过系统用户 - continue; - } - $msgs[] = self::generateMsgData($dialogMsg, $dialogUser); - $users[$dialogUser->id] = self::generateUserData($dialogUser); - } - - if ($msgs) { - // 批量写入消息 - ZincSearchBase::addDocs(self::$indexNameMsg, $msgs); - } - - if ($users) { - // 批量写入用户 - ZincSearchBase::addDocs(self::$indexNameUser, array_values($users)); - } - - return true; - } catch (\Exception $e) { - Log::error('sync: ' . $e->getMessage()); - return false; - } - } - - /** - * 批量同步消息(建议在异步进程中使用) - * - * @param WebSocketDialogMsg[] $dialogMsgs - * @return int 成功同步的消息数 - */ - public static function batchSync($dialogMsgs): int - { - if (!self::ensureIndex()) { - return 0; - } - - $count = 0; - try { - $msgs = []; - $users = []; - $userDialogs = []; - - // 预处理:收集所有涉及的对话ID - $dialogIds = []; - foreach ($dialogMsgs as $dialogMsg) { - $dialogIds[] = $dialogMsg->dialog_id; - } - $dialogIds = array_unique($dialogIds); - - // 获取所有相关的用户-对话关系 - if (!empty($dialogIds)) { - $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); - - // 按对话ID组织用户 - foreach ($dialogUsers as $dialogUser) { - $userDialogs[$dialogUser->dialog_id][] = $dialogUser; - } - } - - // 为每条消息准备所有相关用户的文档 - foreach ($dialogMsgs as $dialogMsg) { - if (!isset($userDialogs[$dialogMsg->dialog_id])) { - // 如果该会话没有用户,跳过 - continue; - } - if ($dialogMsg->bot) { - // 如果是机器人消息,跳过 - continue; - } - /** @var WebSocketDialogUser $dialogUser */ - foreach ($userDialogs[$dialogMsg->dialog_id] as $dialogUser) { - if (empty($dialogMsg->key)) { - // 如果消息没有关键词,跳过 - continue; - } - if ($dialogUser->userid == 0) { - // 跳过系统用户 - continue; - } - $msgs[] = self::generateMsgData($dialogMsg, $dialogUser); - $users[$dialogUser->id] = self::generateUserData($dialogUser); - $count++; - } - } - - if ($msgs) { - // 批量写入消息 - ZincSearchBase::addDocs(self::$indexNameMsg, $msgs); - } - - if ($users) { - // 批量写入用户 - ZincSearchBase::addDocs(self::$indexNameUser, array_values($users)); - } - - } catch (\Exception $e) { - Log::error('batchSync: ' . $e->getMessage()); - } - - return $count; - } - - /** - * 同步用户(建议在异步进程中使用) - * @param WebSocketDialogUser $dialogUser - * @return bool - */ - public static function userSync(WebSocketDialogUser $dialogUser): bool - { - if (!self::ensureIndex()) { - return false; - } - $data = self::generateUserData($dialogUser); - - // 生成查询用户条件 - $searchParams = [ - 'query' => [ - 'bool' => [ - 'must' => [ - ['term' => ['dialog_userid' => $data['dialog_userid']]] - ] - ] - ], - 'size' => 1 - ]; - - try { - // 查询用户是否存在 - $result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams); - $hits = $result['data']['hits']['hits'] ?? []; - - // 同步用户(存在更新、不存在添加) - $result = ZincSearchBase::addDoc(self::$indexNameUser, $data); - if (!isset($result['success'])) { - return false; - } - - // 用户不存在,同步消息 - if (empty($hits)) { - $lastId = 0; // 上次同步的最后ID - $batchSize = 500; // 每批处理的消息数量 - - // 分批同步消息 - do { - // 获取一批 - $dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id) - ->where('id', '>', $lastId) - ->orderBy('id') - ->limit($batchSize) - ->get(); - - if ($dialogMsgs->isEmpty()) { - break; - } - - // 同步数据 - ZincSearchDialogMsg::batchSync($dialogMsgs); - - // 更新最后ID - $lastId = $dialogMsgs->last()->id; - } while (count($dialogMsgs) == $batchSize); - } - - return true; - } catch (\Exception $e) { - Log::error('userSync: ' . $e->getMessage()); - return false; - } - } - - /** - * 删除(建议在异步进程中使用) - * - * @param WebSocketDialogMsg|WebSocketDialogUser|int $data - * @return int - */ - public static function delete(mixed $data): int - { - $batchSize = 500; // 每批处理的文档数量 - $totalDeleted = 0; // 总共删除的文档数量 - $from = 0; - - // 根据数据类型生成查询条件 - if ($data instanceof WebSocketDialogMsg) { - $query = [ - 'field' => 'id', - 'term' => (string) $data->id - ]; - } elseif ($data instanceof WebSocketDialogUser) { - $query = [ - 'field' => 'dialog_userid', - 'term' => self::generateDialogUserid($data), - ]; - } else { - $query = [ - 'field' => 'id', - 'term' => (string) $data - ]; - } - - try { - while (true) { - // 根据消息ID查找相关文档 - $result = ZincSearchBase::advancedSearch(self::$indexNameMsg, [ - 'search_type' => 'term', - 'query' => $query, - 'from' => $from, - 'max_results' => $batchSize - ]); - $hits = $result['data']['hits']['hits'] ?? []; - - // 如果没有更多文档,退出循环 - if (empty($hits)) { - break; - } - - // 删除本批次找到的所有文档 - foreach ($hits as $hit) { - if (isset($hit['_id'])) { - ZincSearchBase::deleteDoc(self::$indexNameMsg, $hit['_id']); - $totalDeleted++; - } - } - - // 如果返回的文档数少于批次大小,说明已经没有更多文档了 - if (count($hits) < $batchSize) { - break; - } - - // 移动到下一批 - $from += $batchSize; - } - } catch (\Exception $e) { - Log::error('delete: ' . $e->getMessage()); - } - - return $totalDeleted; - } -} diff --git a/app/Module/ZincSearch/ZincSearchKeyValue.php b/app/Module/ZincSearch/ZincSearchKeyValue.php deleted file mode 100644 index f9b9e44d5..000000000 --- a/app/Module/ZincSearch/ZincSearchKeyValue.php +++ /dev/null @@ -1,276 +0,0 @@ - 'logo.png', 'theme' => 'dark']); - * - 合并现有数据: set('site_config', ['footer' => '版权所有'], true); - * - 获取键值: $siteName = get('site_name'); - * - 获取键值带默认值: $theme = get('theme', 'light'); - * - 删除键值: delete('temporary_data'); - * - * 3. 批量操作 - * - 批量设置: batchSet(['user_count' => 100, 'active_users' => 50]); - * - 批量获取: $stats = batchGet(['user_count', 'active_users']); - */ -class ZincSearchKeyValue -{ - /** - * 索引名称 - */ - protected static string $indexName = 'keyValue'; - - // ============================== - // 基础方法 - // ============================== - - /** - * 确保索引存在 - */ - public static function ensureIndex(): bool - { - if (!ZincSearchBase::indexExists(self::$indexName)) { - $mappings = [ - 'properties' => [ - 'key' => ['type' => 'keyword', 'index' => true], - 'value' => ['type' => 'text', 'index' => true], - 'created_at' => ['type' => 'date', 'index' => true], - 'updated_at' => ['type' => 'date', 'index' => true] - ] - ]; - $result = ZincSearchBase::createIndex(self::$indexName, $mappings); - return $result['success'] ?? false; - } - return true; - } - - /** - * 清空所有键值 - * - * @return bool 是否成功 - */ - public static function clear(): bool - { - // 检查索引是否存在 - if (!ZincSearchBase::indexExists(self::$indexName)) { - return true; - } - - // 删除再重建索引 - $deleteResult = ZincSearchBase::deleteIndex(self::$indexName); - if (!($deleteResult['success'] ?? false)) { - return false; - } - - return self::ensureIndex(); - } - - // ============================== - // 基本操作 - // ============================== - - /** - * 设置键值 - * - * @param string $key 键名 - * @param mixed $value 值 - * @param bool $merge 是否合并现有数据(如果值是数组) - * @return bool 是否成功 - */ - public static function set(string $key, mixed $value, bool $merge = false): bool - { - if (!self::ensureIndex()) { - return false; - } - - // 检查键是否已存在 - if ($merge && is_array($value)) { - $existingData = self::get($key); - if (is_array($existingData)) { - $value = array_merge($existingData, $value); - } - } - - // 检查是否存在相同键的文档 - 使用精确查询而不是普通搜索 - $searchParams = [ - 'search_type' => 'term', - 'query' => [ - 'field' => 'key', - 'term' => $key - ], - 'from' => 0, - 'max_results' => 1 - ]; - - $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); - $docs = $result['data']['hits']['hits'] ?? []; - $now = date('c'); - - if (!empty($docs)) { - $docId = $docs[0]['_id'] ?? null; - if ($docId) { - // 更新现有文档 - $docData = [ - 'key' => $key, - 'value' => $value, - 'updated_at' => $now - ]; - $updateResult = ZincSearchBase::updateDoc(self::$indexName, $docId, $docData); - return $updateResult['success'] ?? false; - } - } - - // 创建新文档 - $docData = [ - 'key' => $key, - 'value' => $value, - 'created_at' => $now, - 'updated_at' => $now - ]; - $addResult = ZincSearchBase::addDoc(self::$indexName, $docData); - return $addResult['success'] ?? false; - } - - /** - * 获取键值 - * - * @param string $key 键名 - * @param mixed $default 默认值 - * @return mixed 值或默认值 - */ - public static function get(string $key, mixed $default = null): mixed - { - if (!self::ensureIndex() || empty($key)) { - return $default; - } - - // 精确匹配键名 - $searchParams = [ - 'search_type' => 'term', - 'query' => [ - 'field' => 'key', - 'term' => $key - ], - 'from' => 0, - 'max_results' => 1 - ]; - - $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); - if (!($result['success'] ?? false)) { - return $default; - } - - $hits = $result['data']['hits']['hits'] ?? []; - if (empty($hits)) { - return $default; - } - - return $hits[0]['_source']['value'] ?? $default; - } - - /** - * 删除键值 - * - * @param string $key 键名 - * @return bool 是否成功 - */ - public static function delete(string $key): bool - { - if (!self::ensureIndex() || empty($key)) { - return false; - } - - // 查找文档ID - $searchParams = [ - 'search_type' => 'term', - 'query' => [ - 'field' => 'key', - 'term' => $key - ], - 'from' => 0, - 'max_results' => 1 - ]; - - $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); - if (!($result['success'] ?? false)) { - return false; - } - - $hits = $result['data']['hits']['hits'] ?? []; - if (empty($hits)) { - return true; // 不存在视为删除成功 - } - - $docId = $hits[0]['_id'] ?? null; - if (empty($docId)) { - return false; - } - - $deleteResult = ZincSearchBase::deleteDoc(self::$indexName, $docId); - return $deleteResult['success'] ?? false; - } - - // ============================== - // 批量操作 - // ============================== - - /** - * 批量设置键值对 - * - * @param array $keyValues 键值对数组 - * @return bool 是否全部成功 - */ - public static function batchSet(array $keyValues): bool - { - if (!self::ensureIndex() || empty($keyValues)) { - return false; - } - - $docs = []; - $now = date('c'); - - foreach ($keyValues as $key => $value) { - $docs[] = [ - 'key' => $key, - 'value' => $value, - 'created_at' => $now, - 'updated_at' => $now - ]; - } - - $result = ZincSearchBase::addDocs(self::$indexName, $docs); - return $result['success'] ?? false; - } - - /** - * 批量获取键值 - * - * @param array $keys 键名数组 - * @return array 键值对数组 - */ - public static function batchGet(array $keys): array - { - if (!self::ensureIndex() || empty($keys)) { - return []; - } - - $results = []; - - // 遍历查询每个键 - foreach ($keys as $key) { - $results[$key] = self::get($key); - } - - return $results; - } -} diff --git a/app/Observers/WebSocketDialogMsgObserver.php b/app/Observers/WebSocketDialogMsgObserver.php index 244fd27e6..10c5c06e0 100644 --- a/app/Observers/WebSocketDialogMsgObserver.php +++ b/app/Observers/WebSocketDialogMsgObserver.php @@ -6,7 +6,6 @@ use App\Models\WebSocketDialogMsg; use App\Module\Apps; use App\Module\Manticore\ManticoreMsg; use App\Tasks\ManticoreSyncTask; -use App\Tasks\ZincSearchSyncTask; class WebSocketDialogMsgObserver extends AbstractObserver { @@ -18,9 +17,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function created(WebSocketDialogMsg $webSocketDialogMsg) { - // ZincSearch 同步 - self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray())); - // Manticore 同步(仅在安装 Manticore 且符合索引条件时) if (Apps::isInstalled('manticore') && ManticoreMsg::shouldIndex($webSocketDialogMsg)) { self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id])); @@ -35,9 +31,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function updated(WebSocketDialogMsg $webSocketDialogMsg) { - // ZincSearch 同步 - self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray())); - // Manticore 同步(更新可能使消息符合或不再符合索引条件,由 sync 方法处理) if (Apps::isInstalled('manticore')) { self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id])); @@ -52,9 +45,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function deleted(WebSocketDialogMsg $webSocketDialogMsg) { - // ZincSearch 删除 - self::taskDeliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg->toArray())); - // Manticore 删除 if (Apps::isInstalled('manticore')) { self::taskDeliver(new ManticoreSyncTask('msg_delete', ['msg_id' => $webSocketDialogMsg->id])); diff --git a/app/Observers/WebSocketDialogUserObserver.php b/app/Observers/WebSocketDialogUserObserver.php index 7ed29705d..4c42a54b8 100644 --- a/app/Observers/WebSocketDialogUserObserver.php +++ b/app/Observers/WebSocketDialogUserObserver.php @@ -7,7 +7,6 @@ use App\Models\UserBot; use App\Models\WebSocketDialogUser; use App\Module\Apps; use App\Tasks\ManticoreSyncTask; -use App\Tasks\ZincSearchSyncTask; use Carbon\Carbon; class WebSocketDialogUserObserver extends AbstractObserver @@ -33,7 +32,6 @@ class WebSocketDialogUserObserver extends AbstractObserver } } Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray())); // Manticore: 更新对话下所有消息的 allowed_users if (Apps::isInstalled('manticore')) { @@ -57,7 +55,7 @@ class WebSocketDialogUserObserver extends AbstractObserver */ public function updated(WebSocketDialogUser $webSocketDialogUser) { - self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray())); + // } /** @@ -69,7 +67,6 @@ class WebSocketDialogUserObserver extends AbstractObserver public function deleted(WebSocketDialogUser $webSocketDialogUser) { Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - self::taskDeliver(new ZincSearchSyncTask('deleteUser', $webSocketDialogUser->toArray())); // Manticore: 更新对话下所有消息的 allowed_users if (Apps::isInstalled('manticore')) { diff --git a/app/Tasks/ZincSearchSyncTask.php b/app/Tasks/ZincSearchSyncTask.php deleted file mode 100644 index 904f2f829..000000000 --- a/app/Tasks/ZincSearchSyncTask.php +++ /dev/null @@ -1,88 +0,0 @@ -action = $action; - $this->data = $data; - } - - public function start() - { - if (!Apps::isInstalled("search")) { - // 如果没有安装搜索模块,则不执行 - return; - } - - switch ($this->action) { - case 'sync': - // 同步消息数据 - ZincSearchDialogMsg::sync(WebSocketDialogMsg::fillInstance($this->data)); - break; - - case 'delete': - // 删除消息数据 - ZincSearchDialogMsg::delete(WebSocketDialogMsg::fillInstance($this->data)); - break; - - case 'userSync': - // 同步用户数据 - ZincSearchDialogMsg::userSync(WebSocketDialogUser::fillInstance($this->data)); - break; - - case 'deleteUser': - // 删除用户数据 - ZincSearchDialogMsg::delete(WebSocketDialogUser::fillInstance($this->data)); - break; - - default: - // 增量更新 - $this->incrementalUpdate(); - break; - } - } - - /** - * 增量更新 - * @return void - */ - private function incrementalUpdate() - { - // 120分钟执行一次 - $time = intval(Cache::get("ZincSearchSyncTask:Time")); - if (time() - $time < 120 * 60) { - return; - } - - // 执行开始,120分钟后缓存标记失效 - Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120)); - - // 开始执行同步 - @shell_exec("php /var/www/artisan zinc:sync-user-msg --i"); - - // 执行完成,5分钟后缓存标记失效(5分钟任务可重复执行) - Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5)); - } - - public function end() - { - } -}