[ // 拓展数据 'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID 'to_userid' => ['type' => 'numeric', 'index' => true], // 此消息发给的用户ID // 消息数据 'id' => ['type' => 'numeric', 'index' => true], 'dialog_id' => ['type' => 'numeric', 'index' => true], 'dialog_type' => ['type' => 'keyword', 'index' => true], 'session_id' => ['type' => 'numeric', 'index' => true], 'userid' => ['type' => 'numeric', 'index' => true], 'type' => ['type' => 'keyword', 'index' => true], 'key' => ['type' => 'text', 'index' => true], 'created_at' => ['type' => 'date', 'index' => true], 'updated_at' => ['type' => 'date', 'index' => true], ] ]; $result = ZincSearchBase::createIndex(self::$indexNameMsg, $mappings); return $result['success'] ?? false; } if (!ZincSearchBase::indexExists(self::$indexNameUser)) { $mappings = [ 'properties' => [ // 拓展数据 'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID // 用户数据 'id' => ['type' => 'numeric', 'index' => true], 'dialog_id' => ['type' => 'numeric', 'index' => true], 'userid' => ['type' => 'numeric', 'index' => true], 'top_at' => ['type' => 'date', 'index' => true], 'last_at' => ['type' => 'date', 'index' => true], 'mark_unread' => ['type' => 'numeric', 'index' => true], 'silence' => ['type' => 'numeric', 'index' => true], 'hide' => ['type' => 'numeric', 'index' => true], 'color' => ['type' => 'keyword', 'index' => true], 'created_at' => ['type' => 'date', 'index' => true], 'updated_at' => ['type' => 'date', 'index' => true], ] ]; $result = ZincSearchBase::createIndex(self::$indexNameUser, $mappings); return $result['success'] ?? false; } return true; } /** * 清空所有键值 * * @return bool 是否成功 */ public static function clear(): bool { // 检查索引是否存在然后删除 if (ZincSearchBase::indexExists(self::$indexNameMsg)) { $deleteResult = ZincSearchBase::deleteIndex(self::$indexNameMsg); if (!($deleteResult['success'] ?? false)) { return false; } } if (ZincSearchBase::indexExists(self::$indexNameUser)) { $deleteResult = ZincSearchBase::deleteIndex(self::$indexNameUser); if (!($deleteResult['success'] ?? false)) { return false; } } return self::ensureIndex(); } // ============================== // 搜索方法 // ============================== /** * 根据用户ID和消息关键词搜索会话 * * @param string $userid 用户ID * @param string $keyword 消息关键词 * @param int $from 起始位置 * @param int $size 返回结果数量 * @return array */ public static function search(string $userid, string $keyword, int $from = 0, int $size = 20): array { if (!Apps::isInstalled("search")) { // 如果搜索功能未安装,使用数据库查询 return self::searchByMysql($userid, $keyword, $from, $size); } $searchParams = [ 'query' => [ 'bool' => [ 'must' => [ ['term' => ['to_userid' => $userid]], ['match_phrase' => ['key' => $keyword]] ] ] ], 'from' => $from, 'size' => $size, 'sort' => [ ['updated_at' => 'desc'] ] ]; try { $result = ZincSearchBase::elasticSearch(self::$indexNameMsg, $searchParams); $hits = $result['data']['hits']['hits'] ?? []; // 收集所有的用户信息 $dialogUserids = []; foreach ($hits as $hit) { $source = $hit['_source']; $dialogUserids[] = $source['dialog_userid']; } $userInfos = self::searchUser(array_unique($dialogUserids)); // 组合返回结果,将用户信息合并到消息中 $msgs = []; foreach ($hits as $hit) { $msgInfo = $hit['_source']; $userInfo = $userInfos[$msgInfo['dialog_userid']] ?? []; if ($userInfo) { $msgs[] = [ 'id' => $msgInfo['dialog_id'], 'search_msg_id' => $msgInfo['id'], 'user_at' => Carbon::parse($msgInfo['updated_at'])->format('Y-m-d H:i:s'), 'mark_unread' => $userInfo['mark_unread'], 'silence' => $userInfo['silence'], 'hide' => $userInfo['hide'], 'color' => $userInfo['color'], 'top_at' => Carbon::parse($userInfo['top_at'])->format('Y-m-d H:i:s'), 'last_at' => Carbon::parse($userInfo['last_at'])->format('Y-m-d H:i:s'), ]; } } return $msgs; } catch (\Exception $e) { Log::error('search: ' . $e->getMessage()); return []; } } /** * 根据用户ID和消息关键词搜索会话(MySQL 版本,主要用于未安装ZincSearch的情况) * * @param string $userid 用户ID * @param string $keyword 消息关键词 * @param int $from 起始位置 * @param int $size 返回结果数量 * @return array */ private static function searchByMysql(string $userid, string $keyword, int $from = 0, int $size = 20): array { $items = DB::table('web_socket_dialog_users as u') ->select(['d.*', 'u.top_at', 'u.last_at', 'u.mark_unread', 'u.silence', 'u.hide', 'u.color', 'u.updated_at as user_at', 'm.id as search_msg_id']) ->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id') ->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id') ->where('u.userid', $userid) ->where('m.bot', 0) ->whereNull('d.deleted_at') ->where('m.key', 'like', "%{$keyword}%") ->orderByDesc('m.id') ->offset($from) ->limit($size) ->get() ->all(); $msgs = []; foreach ($items as $item) { $msgs[] = [ 'id' => $item->id, 'search_msg_id' => $item->search_msg_id, 'user_at' => Carbon::parse($item->user_at)->format('Y-m-d H:i:s'), 'mark_unread' => $item->mark_unread, 'silence' => $item->silence, 'hide' => $item->hide, 'color' => $item->color, 'top_at' => Carbon::parse($item->top_at)->format('Y-m-d H:i:s'), 'last_at' => Carbon::parse($item->last_at)->format('Y-m-d H:i:s'), ]; } return $msgs; } /** * 根据对话用户ID搜索用户信息 * @param array $dialogUserids * @return array */ private static function searchUser(array $dialogUserids): array { if (empty($dialogUserids)) { return []; } $userInfos = []; // 构建用户查询条件 $userSearchParams = [ 'query' => [ 'bool' => [ 'should' => [] ] ], 'size' => count($dialogUserids) // 确保取到所有符合条件的记录 ]; // 添加所有 dialog_userid 到查询条件 foreach ($dialogUserids as $dialogUserid) { $userSearchParams['query']['bool']['should'][] = [ 'term' => ['dialog_userid' => $dialogUserid] ]; } // 查询用户信息 $userResult = ZincSearchBase::elasticSearch(self::$indexNameUser, $userSearchParams); $userHits = $userResult['data']['hits']['hits'] ?? []; // 以 dialog_userid 为键保存用户信息 foreach ($userHits as $userHit) { $userSource = $userHit['_source']; $userInfos[$userSource['dialog_userid']] = $userSource; } return $userInfos; } // ============================== // 生成内容 // ============================== /** * 生成 dialog_userid * * @param WebSocketDialogUser $dialogUser * @return string */ private static function generateDialogUserid(WebSocketDialogUser $dialogUser): string { return "{$dialogUser->dialog_id}_{$dialogUser->userid}"; } /** * 生成文档内容 * * @param WebSocketDialogMsg $dialogMsg * @param WebSocketDialogUser $dialogUser * @return array */ private static function generateMsgData(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array { return [ '_id' => self::$indexNameMsg . "_" . $dialogMsg->id . "_" . $dialogUser->userid, 'dialog_userid' => self::generateDialogUserid($dialogUser), 'to_userid' => $dialogUser->userid, 'id' => $dialogMsg->id, 'dialog_id' => $dialogMsg->dialog_id, 'dialog_type' => $dialogMsg->dialog_type, 'session_id' => $dialogMsg->session_id, 'userid' => $dialogMsg->userid, 'type' => $dialogMsg->type, 'key' => $dialogMsg->key, 'created_at' => $dialogMsg->created_at, 'updated_at' => $dialogMsg->updated_at, ]; } private static function generateUserData(WebSocketDialogUser $dialogUser): array { return [ '_id' => self::$indexNameUser . "_" . $dialogUser->id, 'dialog_userid' => self::generateDialogUserid($dialogUser), 'id' => $dialogUser->id, 'dialog_id' => $dialogUser->dialog_id, 'userid' => $dialogUser->userid, 'top_at' => $dialogUser->top_at, 'last_at' => $dialogUser->last_at, 'mark_unread' => $dialogUser->mark_unread, 'silence' => $dialogUser->silence, 'hide' => $dialogUser->hide, 'color' => $dialogUser->color, 'created_at' => $dialogUser->created_at, 'updated_at' => $dialogUser->updated_at, ]; } // ============================== // 基本方法 // ============================== /** * 同步消息(建议在异步进程中使用) * * @param WebSocketDialogMsg $dialogMsg * @return bool */ public static function sync(WebSocketDialogMsg $dialogMsg): bool { if (!self::ensureIndex()) { return false; } if ($dialogMsg->bot) { // 如果是机器人消息,跳过 return true; } try { // 获取此会话的所有用户 $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); if ($dialogUsers->isEmpty()) { return true; } $msgs = []; $users = []; foreach ($dialogUsers as $dialogUser) { if (empty($dialogMsg->key)) { // 如果消息没有关键词,跳过 continue; } if ($dialogUser->userid == 0) { // 跳过系统用户 continue; } $msgs[] = self::generateMsgData($dialogMsg, $dialogUser); $users[$dialogUser->id] = self::generateUserData($dialogUser); } if ($msgs) { // 批量写入消息 ZincSearchBase::addDocs(self::$indexNameMsg, $msgs); } if ($users) { // 批量写入用户 ZincSearchBase::addDocs(self::$indexNameUser, array_values($users)); } return true; } catch (\Exception $e) { Log::error('sync: ' . $e->getMessage()); return false; } } /** * 批量同步消息(建议在异步进程中使用) * * @param WebSocketDialogMsg[] $dialogMsgs * @return int 成功同步的消息数 */ public static function batchSync($dialogMsgs): int { if (!self::ensureIndex()) { return 0; } $count = 0; try { $msgs = []; $users = []; $userDialogs = []; // 预处理:收集所有涉及的对话ID $dialogIds = []; foreach ($dialogMsgs as $dialogMsg) { $dialogIds[] = $dialogMsg->dialog_id; } $dialogIds = array_unique($dialogIds); // 获取所有相关的用户-对话关系 if (!empty($dialogIds)) { $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); // 按对话ID组织用户 foreach ($dialogUsers as $dialogUser) { $userDialogs[$dialogUser->dialog_id][] = $dialogUser; } } // 为每条消息准备所有相关用户的文档 foreach ($dialogMsgs as $dialogMsg) { if (!isset($userDialogs[$dialogMsg->dialog_id])) { // 如果该会话没有用户,跳过 continue; } if ($dialogMsg->bot) { // 如果是机器人消息,跳过 continue; } /** @var WebSocketDialogUser $dialogUser */ foreach ($userDialogs[$dialogMsg->dialog_id] as $dialogUser) { if (empty($dialogMsg->key)) { // 如果消息没有关键词,跳过 continue; } if ($dialogUser->userid == 0) { // 跳过系统用户 continue; } $msgs[] = self::generateMsgData($dialogMsg, $dialogUser); $users[$dialogUser->id] = self::generateUserData($dialogUser); $count++; } } if ($msgs) { // 批量写入消息 ZincSearchBase::addDocs(self::$indexNameMsg, $msgs); } if ($users) { // 批量写入用户 ZincSearchBase::addDocs(self::$indexNameUser, array_values($users)); } } catch (\Exception $e) { Log::error('batchSync: ' . $e->getMessage()); } return $count; } /** * 同步用户(建议在异步进程中使用) * @param WebSocketDialogUser $dialogUser * @return bool */ public static function userSync(WebSocketDialogUser $dialogUser): bool { if (!self::ensureIndex()) { return false; } $data = self::generateUserData($dialogUser); // 生成查询用户条件 $searchParams = [ 'query' => [ 'bool' => [ 'must' => [ ['term' => ['dialog_userid' => $data['dialog_userid']]] ] ] ], 'size' => 1 ]; try { // 查询用户是否存在 $result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams); $hits = $result['data']['hits']['hits'] ?? []; // 同步用户(存在更新、不存在添加) $result = ZincSearchBase::addDoc(self::$indexNameUser, $data); if (!isset($result['success'])) { return false; } // 用户不存在,同步消息 if (empty($hits)) { $lastId = 0; // 上次同步的最后ID $batchSize = 500; // 每批处理的消息数量 // 分批同步消息 do { // 获取一批 $dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id) ->where('id', '>', $lastId) ->orderBy('id') ->limit($batchSize) ->get(); if ($dialogMsgs->isEmpty()) { break; } // 同步数据 ZincSearchDialogMsg::batchSync($dialogMsgs); // 更新最后ID $lastId = $dialogMsgs->last()->id; } while (count($dialogMsgs) == $batchSize); } return true; } catch (\Exception $e) { Log::error('userSync: ' . $e->getMessage()); return false; } } /** * 删除(建议在异步进程中使用) * * @param WebSocketDialogMsg|WebSocketDialogUser|int $data * @return int */ public static function delete(mixed $data): int { $batchSize = 500; // 每批处理的文档数量 $totalDeleted = 0; // 总共删除的文档数量 $from = 0; // 根据数据类型生成查询条件 if ($data instanceof WebSocketDialogMsg) { $query = [ 'field' => 'id', 'term' => (string) $data->id ]; } elseif ($data instanceof WebSocketDialogUser) { $query = [ 'field' => 'dialog_userid', 'term' => self::generateDialogUserid($data), ]; } else { $query = [ 'field' => 'id', 'term' => (string) $data ]; } try { while (true) { // 根据消息ID查找相关文档 $result = ZincSearchBase::advancedSearch(self::$indexNameMsg, [ 'search_type' => 'term', 'query' => $query, 'from' => $from, 'max_results' => $batchSize ]); $hits = $result['data']['hits']['hits'] ?? []; // 如果没有更多文档,退出循环 if (empty($hits)) { break; } // 删除本批次找到的所有文档 foreach ($hits as $hit) { if (isset($hit['_id'])) { ZincSearchBase::deleteDoc(self::$indexNameMsg, $hit['_id']); $totalDeleted++; } } // 如果返回的文档数少于批次大小,说明已经没有更多文档了 if (count($hits) < $batchSize) { break; } // 移动到下一批 $from += $batchSize; } } catch (\Exception $e) { Log::error('delete: ' . $e->getMessage()); } return $totalDeleted; } }