feat: 新增消息搜索功能

- 新增 msg_vectors 表,支持消息全文/向量/混合搜索
- 采用 MVA 权限方案,allowed_users 内联存储
- 新增 /api/search/message API
- 新增 manticore:sync-msgs 同步命令
- Observer 触发消息创建/更新/删除同步
- Observer 触发对话成员变更时更新 allowed_users
This commit is contained in:
kuaifan 2026-01-02 06:46:18 +00:00
parent c08323e1ea
commit 7a5ef3a491
7 changed files with 1072 additions and 0 deletions

View File

@ -0,0 +1,227 @@
<?php
namespace App\Console\Commands;
use App\Models\WebSocketDialogMsg;
use App\Module\Apps;
use App\Module\Manticore\ManticoreMsg;
use App\Module\Manticore\ManticoreKeyValue;
use Cache;
use Illuminate\Console\Command;
class SyncMsgToManticore extends Command
{
/**
* 更新数据MVA 方案allowed_users 在同步时自动写入)
* --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上
*
* 清理数据
* --c: 清除索引
*
* 其他选项
* --dialog: 指定对话ID仅同步该对话的消息
*/
protected $signature = 'manticore:sync-msgs {--f} {--i} {--c} {--batch=100} {--dialog=}';
protected $description = '同步消息数据到 Manticore SearchMVA 权限方案)';
/**
* @return int
*/
public function handle(): int
{
if (!Apps::isInstalled("manticore")) {
$this->error("应用「Manticore Search」未安装");
return 1;
}
// 注册信号处理器
if (extension_loaded('pcntl')) {
pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
// 检查锁
$lockInfo = $this->getLock();
if ($lockInfo) {
$this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}");
return 1;
}
$this->setLock();
// 清除索引
if ($this->option('c')) {
$this->info('清除索引...');
ManticoreMsg::clear();
$this->info("索引删除成功");
$this->releaseLock();
return 0;
}
$dialogId = $this->option('dialog') ? intval($this->option('dialog')) : 0;
if ($dialogId > 0) {
$this->info("开始同步对话 {$dialogId} 的消息数据MVA 方案allowed_users 自动内联)...");
$this->syncDialogMsgs($dialogId);
} else {
$this->info('开始同步消息数据MVA 方案allowed_users 自动内联)...');
$this->syncMsgs();
}
$this->info("\n同步完成");
$this->releaseLock();
return 0;
}
private function getLock(): ?array
{
$lockKey = md5($this->signature);
return Cache::has($lockKey) ? Cache::get($lockKey) : null;
}
private function setLock(): void
{
$lockKey = md5($this->signature);
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600);
}
private function releaseLock(): void
{
$lockKey = md5($this->signature);
Cache::forget($lockKey);
}
public function handleSignal(int $signal): void
{
$this->releaseLock();
exit(0);
}
/**
* 同步所有消息
*/
private function syncMsgs(): void
{
$lastKey = "sync:manticoreMsgLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($lastId > 0) {
$this->info("\n增量同步消息数据从ID {$lastId} 开始)...");
} else {
$this->info("\n全量同步消息数据...");
}
// 构建基础查询条件
// 排除:软删除、机器人消息、空 key 消息
// 只包含:可索引的消息类型
$baseQuery = WebSocketDialogMsg::where('id', '>', $lastId)
->whereNull('deleted_at')
->where('bot', '!=', 1)
->whereNotNull('key')
->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES);
$num = 0;
$count = $baseQuery->count();
$batchSize = $this->option('batch');
$total = 0;
$lastNum = 0;
do {
$msgs = WebSocketDialogMsg::where('id', '>', $lastId)
->whereNull('deleted_at')
->where('bot', '!=', 1)
->whereNotNull('key')
->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES)
->orderBy('id')
->limit($batchSize)
->get();
if ($msgs->isEmpty()) {
break;
}
$num += count($msgs);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) {
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})");
$this->setLock();
$lastNum = ManticoreMsg::batchSync($msgs);
$total += $lastNum;
$lastId = $msgs->last()->id;
ManticoreKeyValue::set($lastKey, $lastId);
} while (count($msgs) == $batchSize);
$this->info("同步消息结束 - 最后ID {$lastId}");
$this->info("已索引消息数量: " . ManticoreMsg::getIndexedCount());
}
/**
* 同步指定对话的消息
*
* @param int $dialogId 对话ID
*/
private function syncDialogMsgs(int $dialogId): void
{
$this->info("\n同步对话 {$dialogId} 的消息数据...");
$baseQuery = WebSocketDialogMsg::where('dialog_id', $dialogId)
->whereNull('deleted_at')
->where('bot', '!=', 1)
->whereNotNull('key')
->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES);
$num = 0;
$count = $baseQuery->count();
$batchSize = $this->option('batch');
$lastId = 0;
$total = 0;
$lastNum = 0;
do {
$msgs = WebSocketDialogMsg::where('dialog_id', $dialogId)
->where('id', '>', $lastId)
->whereNull('deleted_at')
->where('bot', '!=', 1)
->whereNotNull('key')
->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES)
->orderBy('id')
->limit($batchSize)
->get();
if ($msgs->isEmpty()) {
break;
}
$num += count($msgs);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) {
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})");
$this->setLock();
$lastNum = ManticoreMsg::batchSync($msgs);
$total += $lastNum;
$lastId = $msgs->last()->id;
} while (count($msgs) == $batchSize);
$this->info("同步对话 {$dialogId} 消息结束");
$this->info("该对话已索引消息数量: " . \App\Module\Manticore\ManticoreBase::getDialogIndexedMsgCount($dialogId));
}
}

View File

@ -5,12 +5,14 @@ namespace App\Http\Controllers\Api;
use Request;
use App\Models\File;
use App\Models\User;
use App\Models\WebSocketDialogMsg;
use App\Module\Base;
use App\Module\Apps;
use App\Module\Manticore\ManticoreFile;
use App\Module\Manticore\ManticoreUser;
use App\Module\Manticore\ManticoreProject;
use App\Module\Manticore\ManticoreTask;
use App\Module\Manticore\ManticoreMsg;
/**
* @apiDefine search
@ -242,5 +244,73 @@ class SearchController extends AbstractController
return Base::retSuccess('success', []);
}
/**
* @api {get} api/search/message AI 搜索消息
*
* @apiDescription 需要token身份需要安装 Manticore Search 应用
* @apiVersion 1.0.0
* @apiGroup search
* @apiName message
*
* @apiParam {String} key 搜索关键词
* @apiParam {String} [search_type] 搜索类型text/vector/hybrid默认hybrid
* @apiParam {Number} [take] 获取数量默认20最大50
*
* @apiSuccess {Number} ret 返回状态码1正确、0错误
* @apiSuccess {String} msg 返回信息(错误描述)
* @apiSuccess {Object} data 返回数据
*/
public function message()
{
$user = User::auth();
if (!Apps::isInstalled('manticore')) {
return Base::retError('Manticore Search 应用未安装');
}
$key = trim(Request::input('key'));
$searchType = Request::input('search_type', 'hybrid');
$take = min(50, max(1, intval(Request::input('take', 20))));
if (empty($key)) {
return Base::retSuccess('success', []);
}
$results = ManticoreMsg::search($user->userid, $key, $searchType, 0, $take);
// 补充消息完整信息
$msgIds = array_column($results, 'msg_id');
if (!empty($msgIds)) {
$msgs = WebSocketDialogMsg::whereIn('id', $msgIds)
->with(['user' => function ($query) {
$query->select(User::$basicField);
}])
->get()
->keyBy('id');
$formattedResults = [];
foreach ($results as $item) {
$msgData = $msgs->get($item['msg_id']);
if ($msgData) {
$formattedResults[] = [
'id' => $msgData->id,
'msg_id' => $msgData->id,
'dialog_id' => $msgData->dialog_id,
'userid' => $msgData->userid,
'type' => $msgData->type,
'msg' => $msgData->msg,
'created_at' => $msgData->created_at,
'user' => $msgData->user,
'relevance' => $item['relevance'] ?? 0,
'content_preview' => $item['content_preview'] ?? null,
];
}
}
return Base::retSuccess('success', $formattedResults);
}
return Base::retSuccess('success', []);
}
}

View File

@ -143,6 +143,21 @@ class ManticoreBase
) charset_table='chinese' morphology='icu_chinese'
");
// 创建消息向量表(含 allowed_users MVA 权限字段)
$pdo->exec("
CREATE TABLE IF NOT EXISTS msg_vectors (
id BIGINT,
msg_id BIGINT,
dialog_id BIGINT,
userid BIGINT,
msg_type STRING,
content TEXT,
allowed_users MULTI,
created_at BIGINT,
content_vector float_vector knn_type='hnsw' knn_dims='1536' hnsw_similarity='cosine'
) charset_table='chinese' morphology='icu_chinese'
");
Log::info('Manticore tables initialized successfully');
} catch (PDOException $e) {
Log::warning('Manticore initialization warning: ' . $e->getMessage());
@ -1493,5 +1508,338 @@ class ManticoreBase
return $result ? (int) $result['cnt'] : 0;
}
// ==============================
// 消息向量方法
// ==============================
/**
* 消息全文搜索(使用 MVA allowed_users 权限过滤)
*
* @param string $keyword 关键词
* @param int $userid 用户ID权限过滤
* @param int $limit 返回数量
* @param int $offset 偏移量
* @return array 搜索结果
*/
public static function msgFullTextSearch(string $keyword, int $userid = 0, int $limit = 20, int $offset = 0): array
{
if (empty($keyword)) {
return [];
}
$instance = new self();
$escapedKeyword = self::escapeMatch($keyword);
if ($userid > 0) {
// 使用 MVA 权限过滤
$sql = "
SELECT
id,
msg_id,
dialog_id,
userid,
msg_type,
content,
created_at,
WEIGHT() as relevance
FROM msg_vectors
WHERE MATCH('@content {$escapedKeyword}')
AND allowed_users = " . (int)$userid . "
ORDER BY relevance DESC
LIMIT " . (int)$limit . " OFFSET " . (int)$offset;
} else {
$sql = "
SELECT
id,
msg_id,
dialog_id,
userid,
msg_type,
content,
created_at,
WEIGHT() as relevance
FROM msg_vectors
WHERE MATCH('@content {$escapedKeyword}')
ORDER BY relevance DESC
LIMIT " . (int)$limit . " OFFSET " . (int)$offset;
}
return $instance->query($sql);
}
/**
* 消息向量搜索(使用 MVA allowed_users 权限过滤)
*
* @param array $queryVector 查询向量
* @param int $userid 用户ID权限过滤
* @param int $limit 返回数量
* @return array 搜索结果
*/
public static function msgVectorSearch(array $queryVector, int $userid = 0, int $limit = 20): array
{
if (empty($queryVector)) {
return [];
}
$instance = new self();
$vectorStr = '(' . implode(',', $queryVector) . ')';
// KNN 搜索需要先获取更多结果,再在应用层过滤权限
$fetchLimit = $userid > 0 ? $limit * 5 : $limit;
$sql = "
SELECT
id,
msg_id,
dialog_id,
userid,
msg_type,
content,
created_at,
KNN_DIST() as distance
FROM msg_vectors
WHERE KNN(content_vector, " . (int)$fetchLimit . ", {$vectorStr})
ORDER BY distance ASC
";
$results = $instance->query($sql);
foreach ($results as &$item) {
$item['similarity'] = 1 - ($item['distance'] ?? 0);
}
// MVA 权限过滤
if ($userid > 0 && !empty($results)) {
$allowedMsgIds = $instance->query(
"SELECT msg_id FROM msg_vectors WHERE allowed_users = ? LIMIT 100000",
[$userid]
);
$allowedIds = array_column($allowedMsgIds, 'msg_id');
$results = array_filter($results, function ($item) use ($allowedIds) {
return in_array($item['msg_id'], $allowedIds);
});
$results = array_values($results);
}
return array_slice($results, 0, $limit);
}
/**
* 消息混合搜索
*
* @param string $keyword 关键词
* @param array $queryVector 查询向量
* @param int $userid 用户ID权限过滤
* @param int $limit 返回数量
* @return array 搜索结果
*/
public static function msgHybridSearch(string $keyword, array $queryVector, int $userid = 0, int $limit = 20): array
{
$textResults = self::msgFullTextSearch($keyword, $userid, 50, 0);
$vectorResults = !empty($queryVector) ? self::msgVectorSearch($queryVector, $userid, 50) : [];
$scores = [];
$items = [];
$k = 60;
foreach ($textResults as $rank => $item) {
$id = $item['msg_id'];
$scores[$id] = ($scores[$id] ?? 0) + 0.5 / ($k + $rank + 1);
$items[$id] = $item;
}
foreach ($vectorResults as $rank => $item) {
$id = $item['msg_id'];
$scores[$id] = ($scores[$id] ?? 0) + 0.5 / ($k + $rank + 1);
if (!isset($items[$id])) {
$items[$id] = $item;
}
}
arsort($scores);
$results = [];
$count = 0;
foreach ($scores as $id => $score) {
if ($count >= $limit) break;
$item = $items[$id];
$item['rrf_score'] = $score;
$results[] = $item;
$count++;
}
return $results;
}
/**
* 插入或更新消息向量(含 allowed_users MVA 权限字段)
*
* @param array $data 消息数据,包含:
* - msg_id: 消息ID
* - dialog_id: 对话ID
* - userid: 发送者ID
* - msg_type: 消息类型
* - content: 消息内容
* - content_vector: 向量值
* - allowed_users: 有权限的用户ID数组
* - created_at: 创建时间戳
* @return bool 是否成功
*/
public static function upsertMsgVector(array $data): bool
{
$instance = new self();
$msgId = $data['msg_id'] ?? 0;
if ($msgId <= 0) {
return false;
}
// 先删除已存在的记录
$instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]);
// 构建 allowed_users MVA 值
$allowedUsers = $data['allowed_users'] ?? [];
$allowedUsersStr = !empty($allowedUsers) ? '(' . implode(',', array_map('intval', $allowedUsers)) . ')' : '()';
// 插入新记录
$vectorValue = $data['content_vector'] ?? null;
if ($vectorValue) {
$vectorValue = str_replace(['[', ']'], ['(', ')'], $vectorValue);
$sql = "INSERT INTO msg_vectors
(id, msg_id, dialog_id, userid, msg_type, content, allowed_users, created_at, content_vector)
VALUES (?, ?, ?, ?, ?, ?, {$allowedUsersStr}, ?, {$vectorValue})";
} else {
$sql = "INSERT INTO msg_vectors
(id, msg_id, dialog_id, userid, msg_type, content, allowed_users, created_at)
VALUES (?, ?, ?, ?, ?, ?, {$allowedUsersStr}, ?)";
}
$params = [
$msgId,
$msgId,
$data['dialog_id'] ?? 0,
$data['userid'] ?? 0,
$data['msg_type'] ?? 'text',
$data['content'] ?? '',
$data['created_at'] ?? time()
];
return $instance->execute($sql, $params);
}
/**
* 更新对话的 allowed_users 权限列表(批量更新该对话下所有消息)
*
* @param int $dialogId 对话ID
* @param array $userids 有权限的用户ID数组
* @return int 更新的消息数量
*/
public static function updateDialogAllowedUsers(int $dialogId, array $userids): int
{
if ($dialogId <= 0) {
return 0;
}
$instance = new self();
$allowedUsersStr = !empty($userids) ? '(' . implode(',', array_map('intval', $userids)) . ')' : '()';
// Manticore 支持按条件批量更新
return $instance->executeWithRowCount(
"UPDATE msg_vectors SET allowed_users = {$allowedUsersStr} WHERE dialog_id = ?",
[$dialogId]
);
}
/**
* 删除消息向量
*
* @param int $msgId 消息ID
* @return bool 是否成功
*/
public static function deleteMsgVector(int $msgId): bool
{
if ($msgId <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]);
}
/**
* 批量删除对话下的所有消息向量
*
* @param int $dialogId 对话ID
* @return int 删除数量
*/
public static function deleteDialogMsgVectors(int $dialogId): int
{
if ($dialogId <= 0) {
return 0;
}
$instance = new self();
return $instance->executeWithRowCount(
"DELETE FROM msg_vectors WHERE dialog_id = ?",
[$dialogId]
);
}
/**
* 清空所有消息向量
*
* @return bool 是否成功
*/
public static function clearAllMsgVectors(): bool
{
$instance = new self();
return $instance->execute("TRUNCATE TABLE msg_vectors");
}
/**
* 获取已索引的消息数量
*
* @return int 消息数量
*/
public static function getIndexedMsgCount(): int
{
$instance = new self();
$result = $instance->queryOne("SELECT COUNT(*) as cnt FROM msg_vectors");
return $result ? (int) $result['cnt'] : 0;
}
/**
* 获取对话的已索引消息数量
*
* @param int $dialogId 对话ID
* @return int 消息数量
*/
public static function getDialogIndexedMsgCount(int $dialogId): int
{
if ($dialogId <= 0) {
return 0;
}
$instance = new self();
$result = $instance->queryOne(
"SELECT COUNT(*) as cnt FROM msg_vectors WHERE dialog_id = ?",
[$dialogId]
);
return $result ? (int) $result['cnt'] : 0;
}
/**
* 获取最后索引的消息ID
*
* @return int 消息ID
*/
public static function getLastIndexedMsgId(): int
{
$instance = new self();
$result = $instance->queryOne("SELECT MAX(msg_id) as max_id FROM msg_vectors");
return $result ? (int) ($result['max_id'] ?? 0) : 0;
}
}

View File

@ -0,0 +1,360 @@
<?php
namespace App\Module\Manticore;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use App\Module\Apps;
use App\Module\Base;
use App\Module\AI;
use Illuminate\Support\Facades\Log;
/**
* Manticore Search 消息搜索类MVA 权限方案)
*
* 使用方法:
*
* 1. 搜索方法
* - 搜索消息: search($userid, $keyword, $searchType, $from, $size);
*
* 2. 同步方法
* - 单个同步: sync(WebSocketDialogMsg $msg);
* - 批量同步: batchSync($msgs);
* - 删除索引: delete($msgId);
*
* 3. 权限更新方法
* - 更新对话权限: updateDialogAllowedUsers($dialogId);
*
* 4. 工具方法
* - 清空索引: clear();
* - 判断是否索引: shouldIndex($msg);
*/
class ManticoreMsg
{
/**
* 可索引的消息类型
*/
public const INDEXABLE_TYPES = ['text', 'file', 'record', 'meeting', 'vote'];
/**
* 最大内容长度(字符)
*/
public const MAX_CONTENT_LENGTH = 50000; // 50K 字符
/**
* 判断消息是否应该被索引
*
* @param WebSocketDialogMsg $msg 消息模型
* @return bool 是否应该索引
*/
public static function shouldIndex(WebSocketDialogMsg $msg): bool
{
// 1. 排除机器人消息
if ($msg->bot === 1) {
return false;
}
// 2. 检查消息类型
if (!in_array($msg->type, self::INDEXABLE_TYPES)) {
return false;
}
// 3. 排除 key 为空的消息
if (empty($msg->key)) {
return false;
}
return true;
}
/**
* 搜索消息(支持全文、向量、混合搜索)
*
* @param int $userid 用户ID
* @param string $keyword 搜索关键词
* @param string $searchType 搜索类型: text/vector/hybrid
* @param int $from 起始位置
* @param int $size 返回数量
* @return array 搜索结果
*/
public static function search(int $userid, string $keyword, string $searchType = 'hybrid', int $from = 0, int $size = 20): array
{
if (empty($keyword)) {
return [];
}
if (!Apps::isInstalled("manticore")) {
return [];
}
try {
switch ($searchType) {
case 'text':
// 纯全文搜索
return self::formatSearchResults(
ManticoreBase::msgFullTextSearch($keyword, $userid, $size, $from)
);
case 'vector':
// 纯向量搜索(需要先获取 embedding
$embedding = self::getEmbedding($keyword);
if (empty($embedding)) {
// embedding 获取失败,降级到全文搜索
return self::formatSearchResults(
ManticoreBase::msgFullTextSearch($keyword, $userid, $size, $from)
);
}
return self::formatSearchResults(
ManticoreBase::msgVectorSearch($embedding, $userid, $size)
);
case 'hybrid':
default:
// 混合搜索
$embedding = self::getEmbedding($keyword);
return self::formatSearchResults(
ManticoreBase::msgHybridSearch($keyword, $embedding, $userid, $size)
);
}
} catch (\Exception $e) {
Log::error('Manticore msg search error: ' . $e->getMessage());
return [];
}
}
/**
* 获取文本的 Embedding 向量
*
* @param string $text 文本
* @return array 向量数组(空数组表示失败)
*/
private static function getEmbedding(string $text): array
{
if (empty($text)) {
return [];
}
try {
// 调用 AI 模块获取 embedding
$result = AI::getEmbedding($text);
if (Base::isSuccess($result)) {
return $result['data'] ?? [];
}
} catch (\Exception $e) {
Log::warning('Get embedding error: ' . $e->getMessage());
}
return [];
}
/**
* 格式化搜索结果
*
* @param array $results Manticore 返回的结果
* @return array 格式化后的结果
*/
private static function formatSearchResults(array $results): array
{
$formatted = [];
foreach ($results as $item) {
$formatted[] = [
'id' => $item['msg_id'],
'msg_id' => $item['msg_id'],
'dialog_id' => $item['dialog_id'],
'userid' => $item['userid'],
'msg_type' => $item['msg_type'],
'content_preview' => isset($item['content']) ? mb_substr($item['content'], 0, 200) : null,
'created_at' => $item['created_at'] ?? null,
'relevance' => $item['relevance'] ?? $item['similarity'] ?? $item['rrf_score'] ?? 0,
];
}
return $formatted;
}
// ==============================
// 权限计算方法MVA 方案核心)
// ==============================
/**
* 获取消息的 allowed_users 列表
*
* 对话的所有成员都有权限查看该对话的消息
*
* @param WebSocketDialogMsg $msg 消息模型
* @return array 有权限的用户ID数组
*/
public static function getAllowedUsers(WebSocketDialogMsg $msg): array
{
return self::getDialogUserIds($msg->dialog_id);
}
/**
* 获取对话的所有成员ID
*
* @param int $dialogId 对话ID
* @return array 成员用户ID数组
*/
public static function getDialogUserIds(int $dialogId): array
{
if ($dialogId <= 0) {
return [];
}
return WebSocketDialogUser::where('dialog_id', $dialogId)
->pluck('userid')
->toArray();
}
// ==============================
// 同步方法
// ==============================
/**
* 同步单个消息到 Manticore allowed_users
*
* @param WebSocketDialogMsg $msg 消息模型
* @return bool 是否成功
*/
public static function sync(WebSocketDialogMsg $msg): bool
{
if (!Apps::isInstalled("manticore")) {
return false;
}
// 检查是否应该索引
if (!self::shouldIndex($msg)) {
// 不符合索引条件,尝试删除已存在的索引
return ManticoreBase::deleteMsgVector($msg->id);
}
try {
// 提取消息内容(使用 key 字段)
$content = $msg->key ?? '';
// 限制内容长度
$content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH);
// 获取 embedding如果有内容且 AI 可用)
$embedding = null;
if (!empty($content) && Apps::isInstalled('ai')) {
$embeddingResult = self::getEmbedding($content);
if (!empty($embeddingResult)) {
$embedding = '[' . implode(',', $embeddingResult) . ']';
}
}
// 获取消息的 allowed_users
$allowedUsers = self::getAllowedUsers($msg);
// 写入 Manticore含 allowed_users
$result = ManticoreBase::upsertMsgVector([
'msg_id' => $msg->id,
'dialog_id' => $msg->dialog_id,
'userid' => $msg->userid,
'msg_type' => $msg->type,
'content' => $content,
'content_vector' => $embedding,
'allowed_users' => $allowedUsers,
'created_at' => $msg->created_at ? $msg->created_at->timestamp : time(),
]);
return $result;
} catch (\Exception $e) {
Log::error('Manticore msg sync error: ' . $e->getMessage(), [
'msg_id' => $msg->id,
'dialog_id' => $msg->dialog_id,
]);
return false;
}
}
/**
* 批量同步消息
*
* @param iterable $msgs 消息列表
* @return int 成功同步的数量
*/
public static function batchSync(iterable $msgs): int
{
if (!Apps::isInstalled("manticore")) {
return 0;
}
$count = 0;
foreach ($msgs as $msg) {
if (self::sync($msg)) {
$count++;
}
}
return $count;
}
/**
* 删除消息索引
*
* @param int $msgId 消息ID
* @return bool 是否成功
*/
public static function delete(int $msgId): bool
{
if (!Apps::isInstalled("manticore")) {
return false;
}
return ManticoreBase::deleteMsgVector($msgId);
}
/**
* 清空所有索引
*
* @return bool 是否成功
*/
public static function clear(): bool
{
if (!Apps::isInstalled("manticore")) {
return false;
}
return ManticoreBase::clearAllMsgVectors();
}
/**
* 获取已索引消息数量
*
* @return int 数量
*/
public static function getIndexedCount(): int
{
if (!Apps::isInstalled("manticore")) {
return 0;
}
return ManticoreBase::getIndexedMsgCount();
}
// ==============================
// 权限更新方法MVA 方案)
// ==============================
/**
* 更新对话下所有消息的 allowed_users 权限列表
* MySQL 获取最新的对话成员并更新到 Manticore
*
* @param int $dialogId 对话ID
* @return int 更新的消息数量
*/
public static function updateDialogAllowedUsers(int $dialogId): int
{
if (!Apps::isInstalled("manticore") || $dialogId <= 0) {
return 0;
}
try {
$userids = self::getDialogUserIds($dialogId);
return ManticoreBase::updateDialogAllowedUsers($dialogId, $userids);
} catch (\Exception $e) {
Log::error('Manticore updateDialogAllowedUsers error: ' . $e->getMessage(), ['dialog_id' => $dialogId]);
return 0;
}
}
}

View File

@ -3,6 +3,9 @@
namespace App\Observers;
use App\Models\WebSocketDialogMsg;
use App\Module\Apps;
use App\Module\Manticore\ManticoreMsg;
use App\Tasks\ManticoreSyncTask;
use App\Tasks\ZincSearchSyncTask;
class WebSocketDialogMsgObserver extends AbstractObserver
@ -15,7 +18,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver
*/
public function created(WebSocketDialogMsg $webSocketDialogMsg)
{
// ZincSearch 同步
self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
// Manticore 同步(仅在安装 Manticore 且符合索引条件时)
if (Apps::isInstalled('manticore') && ManticoreMsg::shouldIndex($webSocketDialogMsg)) {
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
}
}
/**
@ -26,7 +35,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver
*/
public function updated(WebSocketDialogMsg $webSocketDialogMsg)
{
// ZincSearch 同步
self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
// Manticore 同步(更新可能使消息符合或不再符合索引条件,由 sync 方法处理)
if (Apps::isInstalled('manticore')) {
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
}
}
/**
@ -37,7 +52,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver
*/
public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
{
// ZincSearch 删除
self::taskDeliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg->toArray()));
// Manticore 删除
if (Apps::isInstalled('manticore')) {
self::taskDeliver(new ManticoreSyncTask('msg_delete', ['msg_id' => $webSocketDialogMsg->id]));
}
}
/**

View File

@ -5,6 +5,8 @@ namespace App\Observers;
use App\Models\Deleted;
use App\Models\UserBot;
use App\Models\WebSocketDialogUser;
use App\Module\Apps;
use App\Tasks\ManticoreSyncTask;
use App\Tasks\ZincSearchSyncTask;
use Carbon\Carbon;
@ -32,6 +34,14 @@ class WebSocketDialogUserObserver extends AbstractObserver
}
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray()));
// Manticore: 更新对话下所有消息的 allowed_users
if (Apps::isInstalled('manticore')) {
self::taskDeliver(new ManticoreSyncTask('update_dialog_allowed_users', [
'dialog_id' => $webSocketDialogUser->dialog_id
]));
}
//
$dialog = $webSocketDialogUser->webSocketDialog;
if ($dialog) {
@ -60,6 +70,14 @@ class WebSocketDialogUserObserver extends AbstractObserver
{
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
self::taskDeliver(new ZincSearchSyncTask('deleteUser', $webSocketDialogUser->toArray()));
// Manticore: 更新对话下所有消息的 allowed_users
if (Apps::isInstalled('manticore')) {
self::taskDeliver(new ManticoreSyncTask('update_dialog_allowed_users', [
'dialog_id' => $webSocketDialogUser->dialog_id
]));
}
//
$dialog = $webSocketDialogUser->webSocketDialog;
if ($dialog) {

View File

@ -6,12 +6,14 @@ use App\Models\File;
use App\Models\User;
use App\Models\Project;
use App\Models\ProjectTask;
use App\Models\WebSocketDialogMsg;
use App\Module\Apps;
use App\Module\Manticore\ManticoreBase;
use App\Module\Manticore\ManticoreFile;
use App\Module\Manticore\ManticoreUser;
use App\Module\Manticore\ManticoreProject;
use App\Module\Manticore\ManticoreTask;
use App\Module\Manticore\ManticoreMsg;
use Carbon\Carbon;
use Illuminate\Support\Facades\Cache;
@ -152,6 +154,31 @@ class ManticoreSyncTask extends AbstractTask
}
break;
// ==============================
// 消息同步动作
// ==============================
case 'msg_sync':
$msg = WebSocketDialogMsg::find($this->data['msg_id'] ?? 0);
if ($msg) {
ManticoreMsg::sync($msg);
}
break;
case 'msg_delete':
$msgId = $this->data['msg_id'] ?? 0;
if ($msgId > 0) {
ManticoreMsg::delete($msgId);
}
break;
case 'update_dialog_allowed_users':
// 更新对话下所有消息的 allowed_users成员变更时调用
$dialogId = $this->data['dialog_id'] ?? 0;
if ($dialogId > 0) {
ManticoreMsg::updateDialogAllowedUsers($dialogId);
}
break;
default:
// 增量更新(定时任务调用)
$this->incrementalUpdate();
@ -181,6 +208,7 @@ class ManticoreSyncTask extends AbstractTask
@shell_exec("php /var/www/artisan manticore:sync-users --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-tasks --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-msgs --i 2>&1 &");
// 执行完成
Cache::put("ManticoreSyncTask:Time", time(), Carbon::now()->addMinutes(5));