mirror of
https://github.com/kuaifan/dootask.git
synced 2026-01-21 16:48:13 +00:00
refactor: 移除 ZincSearch,统一使用 Manticore Search
- 删除 ZincSearch 模块、任务、命令 - 对话消息搜索改用 ManticoreMsg::searchDialogs - 移除 Observer 中的 ZincSearch 同步 - 移除定时任务中的 ZincSearch 同步 - 更新项目文档
This commit is contained in:
parent
7a5ef3a491
commit
1e94ce501e
@ -39,7 +39,7 @@
|
|||||||
|
|
||||||
- **Module(`app/Module`)**
|
- **Module(`app/Module`)**
|
||||||
- 承载跨控制器 / 跨模型的业务逻辑与独立功能子域,例如:
|
- 承载跨控制器 / 跨模型的业务逻辑与独立功能子域,例如:
|
||||||
- 外部服务集成:`AgoraIO/*`、`ZincSearch/*` 等;
|
- 外部服务集成:`AgoraIO/*`、`Manticore/*` 等;
|
||||||
- 通用工具:`Lock.php`、`TextExtractor.php`、`Image.php` 等;
|
- 通用工具:`Lock.php`、`TextExtractor.php`、`Image.php` 等;
|
||||||
- 项目 / 任务 / 对话等领域里的复杂协作逻辑。
|
- 项目 / 任务 / 对话等领域里的复杂协作逻辑。
|
||||||
- 原则:
|
- 原则:
|
||||||
@ -88,7 +88,7 @@
|
|||||||
- **偏好(Preferences)**:用户表达持续性偏好时(语言、输出格式、技术选型等),应尽快写入;
|
- **偏好(Preferences)**:用户表达持续性偏好时(语言、输出格式、技术选型等),应尽快写入;
|
||||||
- **流程 / 习惯(Procedures)**:形成「以后都按这个流程来」的稳定开发 / 发布 / 调试流程时,应记录为可复用步骤;
|
- **流程 / 习惯(Procedures)**:形成「以后都按这个流程来」的稳定开发 / 发布 / 调试流程时,应记录为可复用步骤;
|
||||||
- **约束 / 决策(Requirements)**:项目长期有效的决策,如不再支持某版本、某模块的架构约定等;
|
- **约束 / 决策(Requirements)**:项目长期有效的决策,如不再支持某版本、某模块的架构约定等;
|
||||||
- **事实 / 关系(Facts)**:模块边界约定、服务之间的调用关系、与外部系统(如 AgoraIO、ZincSearch)集成方式等。
|
- **事实 / 关系(Facts)**:模块边界约定、服务之间的调用关系、与外部系统(如 AgoraIO、Manticore Search)集成方式等。
|
||||||
- 写入建议:
|
- 写入建议:
|
||||||
- 默认使用 `source: "text"`,在 `episode_body` 中用简洁结构化自然语言描述背景、类型、范围、具体内容;
|
- 默认使用 `source: "text"`,在 `episode_body` 中用简洁结构化自然语言描述背景、类型、范围、具体内容;
|
||||||
- 需要结构化数据时可用 `source: "json"`,保证 `episode_body` 是合法 JSON 字符串;
|
- 需要结构化数据时可用 `source: "json"`,保证 `episode_body` 是合法 JSON 字符串;
|
||||||
|
|||||||
@ -1,175 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace App\Console\Commands;
|
|
||||||
|
|
||||||
use App\Models\WebSocketDialogMsg;
|
|
||||||
use App\Module\Apps;
|
|
||||||
use App\Module\ZincSearch\ZincSearchKeyValue;
|
|
||||||
use App\Module\ZincSearch\ZincSearchDialogMsg;
|
|
||||||
use Cache;
|
|
||||||
use Illuminate\Console\Command;
|
|
||||||
|
|
||||||
class SyncUserMsgToZincSearch extends Command
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* 更新数据
|
|
||||||
* --f: 全量更新 (默认)
|
|
||||||
* --i: 增量更新(从上次更新的最后一个ID接上)
|
|
||||||
*
|
|
||||||
* 清理数据
|
|
||||||
* --c: 清除索引
|
|
||||||
*/
|
|
||||||
|
|
||||||
protected $signature = 'zinc:sync-user-msg {--f} {--i} {--c} {--batch=1000}';
|
|
||||||
protected $description = '同步聊天会话用户和消息到 ZincSearch';
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
public function handle(): int
|
|
||||||
{
|
|
||||||
if (!Apps::isInstalled("search")) {
|
|
||||||
$this->error("应用「ZincSearch」未安装");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 注册信号处理器(仅在支持pcntl扩展的环境下)
|
|
||||||
if (extension_loaded('pcntl')) {
|
|
||||||
pcntl_async_signals(true); // 启用异步信号处理
|
|
||||||
pcntl_signal(SIGINT, [$this, 'handleSignal']); // Ctrl+C
|
|
||||||
pcntl_signal(SIGTERM, [$this, 'handleSignal']); // kill
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查锁,如果已被占用则退出
|
|
||||||
$lockInfo = $this->getLock();
|
|
||||||
if ($lockInfo) {
|
|
||||||
$this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 设置锁
|
|
||||||
$this->setLock();
|
|
||||||
|
|
||||||
// 清除索引
|
|
||||||
if ($this->option('c')) {
|
|
||||||
$this->info('清除索引...');
|
|
||||||
ZincSearchKeyValue::clear();
|
|
||||||
ZincSearchDialogMsg::clear();
|
|
||||||
$this->info("索引删除成功");
|
|
||||||
$this->releaseLock();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->info('开始同步聊天数据...');
|
|
||||||
|
|
||||||
// 同步消息数据
|
|
||||||
$this->syncDialogMsgs();
|
|
||||||
|
|
||||||
// 完成
|
|
||||||
$this->info("\n同步完成");
|
|
||||||
$this->releaseLock();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取锁信息
|
|
||||||
*
|
|
||||||
* @return array|null 如果锁存在返回锁信息,否则返回null
|
|
||||||
*/
|
|
||||||
private function getLock(): ?array
|
|
||||||
{
|
|
||||||
$lockKey = md5($this->signature);
|
|
||||||
return Cache::has($lockKey) ? Cache::get($lockKey) : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 设置锁
|
|
||||||
*/
|
|
||||||
private function setLock(): void
|
|
||||||
{
|
|
||||||
$lockKey = md5($this->signature);
|
|
||||||
$lockInfo = [
|
|
||||||
'started_at' => date('Y-m-d H:i:s')
|
|
||||||
];
|
|
||||||
Cache::put($lockKey, $lockInfo, 300); // 5分钟
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 释放锁
|
|
||||||
*/
|
|
||||||
private function releaseLock(): void
|
|
||||||
{
|
|
||||||
$lockKey = md5($this->signature);
|
|
||||||
Cache::forget($lockKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理终端信号
|
|
||||||
*
|
|
||||||
* @param int $signal
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
public function handleSignal(int $signal): void
|
|
||||||
{
|
|
||||||
// 释放锁
|
|
||||||
$this->releaseLock();
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步消息数据
|
|
||||||
*
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
private function syncDialogMsgs(): void
|
|
||||||
{
|
|
||||||
// 获取上次同步的最后ID
|
|
||||||
$lastKey = "sync:dialogUserMsgLastId";
|
|
||||||
$lastId = $this->option('i') ? intval(ZincSearchKeyValue::get($lastKey, 0)) : 0;
|
|
||||||
|
|
||||||
if ($lastId > 0) {
|
|
||||||
$this->info("\n同步消息数据({$lastId})...");
|
|
||||||
} else {
|
|
||||||
$this->info("\n同步消息数据...");
|
|
||||||
}
|
|
||||||
|
|
||||||
$num = 0;
|
|
||||||
$count = WebSocketDialogMsg::where('id', '>', $lastId)->count();
|
|
||||||
$batchSize = $this->option('batch');
|
|
||||||
|
|
||||||
$total = 0;
|
|
||||||
$lastNum = 0;
|
|
||||||
|
|
||||||
do {
|
|
||||||
// 获取一批
|
|
||||||
$dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId)
|
|
||||||
->orderBy('id')
|
|
||||||
->limit($batchSize)
|
|
||||||
->get();
|
|
||||||
|
|
||||||
if ($dialogMsgs->isEmpty()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
$num += count($dialogMsgs);
|
|
||||||
$progress = round($num / $count * 100, 2);
|
|
||||||
if ($progress < 100) {
|
|
||||||
$progress = number_format($progress, 2);
|
|
||||||
}
|
|
||||||
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$dialogMsgs->first()->id} ~ {$dialogMsgs->last()->id} ({$total}|{$lastNum})");
|
|
||||||
|
|
||||||
// 刷新锁
|
|
||||||
$this->setLock();
|
|
||||||
|
|
||||||
// 同步数据
|
|
||||||
$lastNum = ZincSearchDialogMsg::batchSync($dialogMsgs);
|
|
||||||
$total += $lastNum;
|
|
||||||
|
|
||||||
// 更新最后ID
|
|
||||||
$lastId = $dialogMsgs->last()->id;
|
|
||||||
ZincSearchKeyValue::set($lastKey, $lastId);
|
|
||||||
} while (count($dialogMsgs) == $batchSize);
|
|
||||||
|
|
||||||
$this->info("同步消息结束 - 最后ID {$lastId}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -32,7 +32,7 @@ use App\Models\WebSocketDialogMsgTranslate;
|
|||||||
use App\Models\WebSocketDialogSession;
|
use App\Models\WebSocketDialogSession;
|
||||||
use App\Models\UserRecentItem;
|
use App\Models\UserRecentItem;
|
||||||
use App\Module\Table\OnlineData;
|
use App\Module\Table\OnlineData;
|
||||||
use App\Module\ZincSearch\ZincSearchDialogMsg;
|
use App\Module\Manticore\ManticoreMsg;
|
||||||
use Hhxsv5\LaravelS\Swoole\Task\Task;
|
use Hhxsv5\LaravelS\Swoole\Task\Task;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -155,7 +155,7 @@ class DialogController extends AbstractController
|
|||||||
}
|
}
|
||||||
// 搜索消息会话
|
// 搜索消息会话
|
||||||
if (count($list) < $take) {
|
if (count($list) < $take) {
|
||||||
$searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, $take - count($list));
|
$searchResults = ManticoreMsg::searchDialogs($user->userid, $key, 0, $take - count($list));
|
||||||
if ($searchResults) {
|
if ($searchResults) {
|
||||||
foreach ($searchResults as $item) {
|
foreach ($searchResults as $item) {
|
||||||
if ($dialog = WebSocketDialog::find($item['id'])) {
|
if ($dialog = WebSocketDialog::find($item['id'])) {
|
||||||
@ -726,7 +726,7 @@ class DialogController extends AbstractController
|
|||||||
} else {
|
} else {
|
||||||
// 搜索消息
|
// 搜索消息
|
||||||
$list = [];
|
$list = [];
|
||||||
$searchResults = ZincSearchDialogMsg::search($user->userid, $key, 0, Base::getPaginate(50, 20, 'take'));
|
$searchResults = ManticoreMsg::searchDialogs($user->userid, $key, 0, Base::getPaginate(50, 20, 'take'));
|
||||||
if ($searchResults) {
|
if ($searchResults) {
|
||||||
foreach ($searchResults as $item) {
|
foreach ($searchResults as $item) {
|
||||||
if ($dialog = WebSocketDialog::find($item['id'])) {
|
if ($dialog = WebSocketDialog::find($item['id'])) {
|
||||||
|
|||||||
@ -21,7 +21,6 @@ use App\Tasks\AutoArchivedTask;
|
|||||||
use App\Tasks\DeleteBotMsgTask;
|
use App\Tasks\DeleteBotMsgTask;
|
||||||
use App\Tasks\CheckinRemindTask;
|
use App\Tasks\CheckinRemindTask;
|
||||||
use App\Tasks\CloseMeetingRoomTask;
|
use App\Tasks\CloseMeetingRoomTask;
|
||||||
use App\Tasks\ZincSearchSyncTask;
|
|
||||||
use App\Tasks\ManticoreSyncTask;
|
use App\Tasks\ManticoreSyncTask;
|
||||||
use App\Tasks\UnclaimedTaskRemindTask;
|
use App\Tasks\UnclaimedTaskRemindTask;
|
||||||
use Hhxsv5\LaravelS\Swoole\Task\Task;
|
use Hhxsv5\LaravelS\Swoole\Task\Task;
|
||||||
@ -272,8 +271,6 @@ class IndexController extends InvokeController
|
|||||||
Task::deliver(new UnclaimedTaskRemindTask());
|
Task::deliver(new UnclaimedTaskRemindTask());
|
||||||
// 关闭会议室
|
// 关闭会议室
|
||||||
Task::deliver(new CloseMeetingRoomTask());
|
Task::deliver(new CloseMeetingRoomTask());
|
||||||
// ZincSearch 同步
|
|
||||||
Task::deliver(new ZincSearchSyncTask());
|
|
||||||
// Manticore Search 同步
|
// Manticore Search 同步
|
||||||
Task::deliver(new ManticoreSyncTask());
|
Task::deliver(new ManticoreSyncTask());
|
||||||
|
|
||||||
|
|||||||
@ -54,7 +54,6 @@ class Apps
|
|||||||
'office' => 'OnlyOffice',
|
'office' => 'OnlyOffice',
|
||||||
'drawio' => 'Drawio',
|
'drawio' => 'Drawio',
|
||||||
'minder' => 'Minder',
|
'minder' => 'Minder',
|
||||||
'search' => 'ZincSearch',
|
|
||||||
'manticore' => 'Manticore Search',
|
'manticore' => 'Manticore Search',
|
||||||
default => $appId,
|
default => $appId,
|
||||||
};
|
};
|
||||||
|
|||||||
@ -7,6 +7,8 @@ use App\Models\WebSocketDialogUser;
|
|||||||
use App\Module\Apps;
|
use App\Module\Apps;
|
||||||
use App\Module\Base;
|
use App\Module\Base;
|
||||||
use App\Module\AI;
|
use App\Module\AI;
|
||||||
|
use Carbon\Carbon;
|
||||||
|
use DB;
|
||||||
use Illuminate\Support\Facades\Log;
|
use Illuminate\Support\Facades\Log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -171,6 +173,145 @@ class ManticoreMsg
|
|||||||
return $formatted;
|
return $formatted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 按对话搜索消息(用于对话列表搜索)
|
||||||
|
*
|
||||||
|
* 返回包含匹配消息的对话列表,每个对话只返回一次
|
||||||
|
* 当 Manticore 未安装时,回退到 MySQL LIKE 搜索
|
||||||
|
*
|
||||||
|
* @param int $userid 用户ID
|
||||||
|
* @param string $keyword 搜索关键词
|
||||||
|
* @param int $from 起始位置
|
||||||
|
* @param int $size 返回数量
|
||||||
|
* @return array 对话列表
|
||||||
|
*/
|
||||||
|
public static function searchDialogs(int $userid, string $keyword, int $from = 0, int $size = 20): array
|
||||||
|
{
|
||||||
|
if (empty($keyword)) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// 未安装 Manticore 时使用 MySQL 回退搜索
|
||||||
|
if (!Apps::isInstalled("manticore")) {
|
||||||
|
return self::searchDialogsByMysql($userid, $keyword, $from, $size);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 使用全文搜索获取更多结果,然后按对话分组
|
||||||
|
$results = ManticoreBase::msgFullTextSearch($keyword, $userid, 100, 0);
|
||||||
|
|
||||||
|
if (empty($results)) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// 收集所有对话ID
|
||||||
|
$dialogIds = array_unique(array_column($results, 'dialog_id'));
|
||||||
|
|
||||||
|
// 获取用户在这些对话中的信息
|
||||||
|
$dialogUsers = WebSocketDialogUser::where('userid', $userid)
|
||||||
|
->whereIn('dialog_id', $dialogIds)
|
||||||
|
->get()
|
||||||
|
->keyBy('dialog_id');
|
||||||
|
|
||||||
|
// 按对话分组,每个对话只保留最相关的消息
|
||||||
|
$msgs = [];
|
||||||
|
$seenDialogs = [];
|
||||||
|
foreach ($results as $item) {
|
||||||
|
$dialogId = $item['dialog_id'];
|
||||||
|
|
||||||
|
// 每个对话只取第一条(最相关的)
|
||||||
|
if (isset($seenDialogs[$dialogId])) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
$seenDialogs[$dialogId] = true;
|
||||||
|
|
||||||
|
// 获取用户在该对话的信息
|
||||||
|
$dialogUser = $dialogUsers->get($dialogId);
|
||||||
|
if (!$dialogUser) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$msgs[] = [
|
||||||
|
'id' => $dialogId,
|
||||||
|
'search_msg_id' => $item['msg_id'],
|
||||||
|
'user_at' => $dialogUser->updated_at ? Carbon::parse($dialogUser->updated_at)->format('Y-m-d H:i:s') : null,
|
||||||
|
'mark_unread' => $dialogUser->mark_unread,
|
||||||
|
'silence' => $dialogUser->silence,
|
||||||
|
'hide' => $dialogUser->hide,
|
||||||
|
'color' => $dialogUser->color,
|
||||||
|
'top_at' => $dialogUser->top_at ? Carbon::parse($dialogUser->top_at)->format('Y-m-d H:i:s') : null,
|
||||||
|
'last_at' => $dialogUser->last_at ? Carbon::parse($dialogUser->last_at)->format('Y-m-d H:i:s') : null,
|
||||||
|
];
|
||||||
|
|
||||||
|
// 已达到需要的数量
|
||||||
|
if (count($msgs) >= $from + $size) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 应用分页
|
||||||
|
return array_slice($msgs, $from, $size);
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
Log::error('Manticore searchDialogs error: ' . $e->getMessage());
|
||||||
|
// 出错时回退到 MySQL 搜索
|
||||||
|
return self::searchDialogsByMysql($userid, $keyword, $from, $size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MySQL 回退搜索(按对话搜索消息)
|
||||||
|
*
|
||||||
|
* 通过联表查询获取用户有权限的对话中匹配的消息
|
||||||
|
*
|
||||||
|
* @param int $userid 用户ID
|
||||||
|
* @param string $keyword 搜索关键词
|
||||||
|
* @param int $from 起始位置
|
||||||
|
* @param int $size 返回数量
|
||||||
|
* @return array 对话列表
|
||||||
|
*/
|
||||||
|
private static function searchDialogsByMysql(int $userid, string $keyword, int $from = 0, int $size = 20): array
|
||||||
|
{
|
||||||
|
$items = DB::table('web_socket_dialog_users as u')
|
||||||
|
->select([
|
||||||
|
'd.*',
|
||||||
|
'u.top_at',
|
||||||
|
'u.last_at',
|
||||||
|
'u.mark_unread',
|
||||||
|
'u.silence',
|
||||||
|
'u.hide',
|
||||||
|
'u.color',
|
||||||
|
'u.updated_at as user_at',
|
||||||
|
'm.id as search_msg_id'
|
||||||
|
])
|
||||||
|
->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id')
|
||||||
|
->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id')
|
||||||
|
->where('u.userid', $userid)
|
||||||
|
->where('m.bot', 0)
|
||||||
|
->whereNull('d.deleted_at')
|
||||||
|
->where('m.key', 'like', "%{$keyword}%")
|
||||||
|
->orderByDesc('m.id')
|
||||||
|
->offset($from)
|
||||||
|
->limit($size)
|
||||||
|
->get()
|
||||||
|
->all();
|
||||||
|
|
||||||
|
$msgs = [];
|
||||||
|
foreach ($items as $item) {
|
||||||
|
$msgs[] = [
|
||||||
|
'id' => $item->id,
|
||||||
|
'search_msg_id' => $item->search_msg_id,
|
||||||
|
'user_at' => Carbon::parse($item->user_at)->format('Y-m-d H:i:s'),
|
||||||
|
'mark_unread' => $item->mark_unread,
|
||||||
|
'silence' => $item->silence,
|
||||||
|
'hide' => $item->hide,
|
||||||
|
'color' => $item->color,
|
||||||
|
'top_at' => Carbon::parse($item->top_at)->format('Y-m-d H:i:s'),
|
||||||
|
'last_at' => Carbon::parse($item->last_at)->format('Y-m-d H:i:s'),
|
||||||
|
];
|
||||||
|
}
|
||||||
|
return $msgs;
|
||||||
|
}
|
||||||
|
|
||||||
// ==============================
|
// ==============================
|
||||||
// 权限计算方法(MVA 方案核心)
|
// 权限计算方法(MVA 方案核心)
|
||||||
// ==============================
|
// ==============================
|
||||||
|
|||||||
@ -1,267 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace App\Module\ZincSearch;
|
|
||||||
|
|
||||||
use App\Module\Apps;
|
|
||||||
use App\Module\Doo;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ZincSearch 公共类
|
|
||||||
*/
|
|
||||||
class ZincSearchBase
|
|
||||||
{
|
|
||||||
private mixed $host;
|
|
||||||
private mixed $port;
|
|
||||||
private mixed $user;
|
|
||||||
private mixed $pass;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 构造函数
|
|
||||||
*/
|
|
||||||
public function __construct()
|
|
||||||
{
|
|
||||||
$this->host = env('ZINCSEARCH_HOST', 'search');
|
|
||||||
$this->port = env('ZINCSEARCH_PORT', '4080');
|
|
||||||
$this->user = env('DB_USERNAME', '');
|
|
||||||
$this->pass = env('DB_PASSWORD', '');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 通用请求方法
|
|
||||||
*/
|
|
||||||
private function request($path, $body = null, $method = 'POST')
|
|
||||||
{
|
|
||||||
if (!Apps::isInstalled("search")) {
|
|
||||||
return [
|
|
||||||
'success' => false,
|
|
||||||
'error' => Doo::translate("应用「ZincSearch」未安装")
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
$ch = curl_init();
|
|
||||||
curl_setopt($ch, CURLOPT_URL, "http://{$this->host}:{$this->port}{$path}");
|
|
||||||
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
|
|
||||||
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
|
||||||
curl_setopt($ch, CURLOPT_USERPWD, $this->user . ':' . $this->pass);
|
|
||||||
curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
|
|
||||||
|
|
||||||
$headers = ['Content-Type: application/json'];
|
|
||||||
if ($method === 'BULK') {
|
|
||||||
$headers = ['Content-Type: text/plain'];
|
|
||||||
}
|
|
||||||
|
|
||||||
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
|
|
||||||
if ($body !== null) {
|
|
||||||
curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
|
|
||||||
}
|
|
||||||
$result = curl_exec($ch);
|
|
||||||
$error = curl_error($ch);
|
|
||||||
$status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
|
||||||
curl_close($ch);
|
|
||||||
if ($error) {
|
|
||||||
return ['success' => false, 'error' => $error];
|
|
||||||
}
|
|
||||||
$data = json_decode($result, true);
|
|
||||||
return [
|
|
||||||
'success' => $status >= 200 && $status < 300,
|
|
||||||
'status' => $status,
|
|
||||||
'data' => $data
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 索引管理相关方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建索引
|
|
||||||
*/
|
|
||||||
public static function createIndex($index, $mappings = []): array
|
|
||||||
{
|
|
||||||
$body = json_encode([
|
|
||||||
'name' => $index,
|
|
||||||
'mappings' => $mappings
|
|
||||||
]);
|
|
||||||
return (new self())->request("/api/index", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取索引信息
|
|
||||||
*/
|
|
||||||
public static function getIndex($index): array
|
|
||||||
{
|
|
||||||
return (new self())->request("/api/index/{$index}", null, 'GET');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 判断索引是否存在
|
|
||||||
*/
|
|
||||||
public static function indexExists($index): bool
|
|
||||||
{
|
|
||||||
$result = self::getIndex($index);
|
|
||||||
return $result['success'] && isset($result['data']['name']);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取所有索引
|
|
||||||
*/
|
|
||||||
public static function listIndices(): array
|
|
||||||
{
|
|
||||||
return (new self())->request("/api/index", null, 'GET');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除索引
|
|
||||||
*/
|
|
||||||
public static function deleteIndex($index): array
|
|
||||||
{
|
|
||||||
return (new self())->request("/api/index/{$index}", null, 'DELETE');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除所有索引
|
|
||||||
*/
|
|
||||||
public static function deleteAllIndices(): array
|
|
||||||
{
|
|
||||||
$instance = new self();
|
|
||||||
$result = $instance->request("/api/index", null, 'GET');
|
|
||||||
|
|
||||||
if (!$result['success']) {
|
|
||||||
return $result;
|
|
||||||
}
|
|
||||||
|
|
||||||
$indices = $result['data'] ?? [];
|
|
||||||
$deleteResults = [];
|
|
||||||
$success = true;
|
|
||||||
|
|
||||||
foreach ($indices as $index) {
|
|
||||||
$indexName = $index['name'] ?? '';
|
|
||||||
if (!empty($indexName)) {
|
|
||||||
$deleteResult = $instance->request("/api/index/{$indexName}", null, 'DELETE');
|
|
||||||
$deleteResults[$indexName] = $deleteResult;
|
|
||||||
|
|
||||||
if (!$deleteResult['success']) {
|
|
||||||
$success = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return [
|
|
||||||
'success' => $success,
|
|
||||||
'message' => $success ? '所有索引删除成功' : '部分索引删除失败',
|
|
||||||
'details' => $deleteResults
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 分析文本
|
|
||||||
*/
|
|
||||||
public static function analyze($analyzer, $text): array
|
|
||||||
{
|
|
||||||
$body = json_encode([
|
|
||||||
'analyzer' => $analyzer,
|
|
||||||
'text' => $text
|
|
||||||
]);
|
|
||||||
return (new self())->request("/api/_analyze", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 文档管理相关方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 写入单条文档
|
|
||||||
*/
|
|
||||||
public static function addDoc($index, $doc): array
|
|
||||||
{
|
|
||||||
$body = json_encode($doc);
|
|
||||||
return (new self())->request("/api/{$index}/_doc", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 更新文档
|
|
||||||
*/
|
|
||||||
public static function updateDoc($index, $id, $doc): array
|
|
||||||
{
|
|
||||||
$body = json_encode($doc);
|
|
||||||
return (new self())->request("/api/{$index}/_update/{$id}", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除文档
|
|
||||||
*/
|
|
||||||
public static function deleteDoc($index, $id): array
|
|
||||||
{
|
|
||||||
return (new self())->request("/api/{$index}/_doc/{$id}", null, 'DELETE');
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量写入文档
|
|
||||||
*/
|
|
||||||
public static function addDocs($index, $docs): array
|
|
||||||
{
|
|
||||||
$body = json_encode([
|
|
||||||
'index' => $index,
|
|
||||||
'records' => $docs
|
|
||||||
]);
|
|
||||||
return (new self())->request("/api/_bulkv2", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 使用原始BULK API批量写入文档
|
|
||||||
* 请求格式为Elasticsearch兼容格式
|
|
||||||
*/
|
|
||||||
public static function bulkDocs($data): array
|
|
||||||
{
|
|
||||||
return (new self())->request("/api/_bulk", $data, 'BULK');
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 搜索相关方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 查询文档
|
|
||||||
*/
|
|
||||||
public static function search($index, $query, $from = 0, $size = 10): array
|
|
||||||
{
|
|
||||||
$searchParams = [
|
|
||||||
'search_type' => 'match',
|
|
||||||
'query' => [
|
|
||||||
'term' => $query
|
|
||||||
],
|
|
||||||
'from' => $from,
|
|
||||||
'max_results' => $size
|
|
||||||
];
|
|
||||||
|
|
||||||
$body = json_encode($searchParams);
|
|
||||||
return (new self())->request("/api/{$index}/_search", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 高级查询文档
|
|
||||||
*/
|
|
||||||
public static function advancedSearch($index, $searchParams): array
|
|
||||||
{
|
|
||||||
$body = json_encode($searchParams);
|
|
||||||
return (new self())->request("/api/{$index}/_search", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 兼容ES查询文档
|
|
||||||
*/
|
|
||||||
public static function elasticSearch($index, $searchParams): array
|
|
||||||
{
|
|
||||||
$body = json_encode($searchParams);
|
|
||||||
return (new self())->request("/es/{$index}/_search", $body);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 多索引查询
|
|
||||||
*/
|
|
||||||
public static function multiSearch($queries): array
|
|
||||||
{
|
|
||||||
$body = json_encode($queries);
|
|
||||||
return (new self())->request("/api/_msearch", $body);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,612 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace App\Module\ZincSearch;
|
|
||||||
|
|
||||||
use App\Models\WebSocketDialogMsg;
|
|
||||||
use App\Models\WebSocketDialogUser;
|
|
||||||
use App\Module\Apps;
|
|
||||||
use Carbon\Carbon;
|
|
||||||
use DB;
|
|
||||||
use Illuminate\Support\Facades\Log;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ZincSearch 会话消息类
|
|
||||||
*
|
|
||||||
* 使用方法:
|
|
||||||
*
|
|
||||||
* 1. 基础方法
|
|
||||||
* - 清空所有数据: clear();
|
|
||||||
*
|
|
||||||
* 2. 搜索方法
|
|
||||||
* - 关键词搜索: search('用户ID', '关键词');
|
|
||||||
*
|
|
||||||
* 3. 基本方法
|
|
||||||
* - 单个同步: sync(WebSocketDialogMsg $dialogMsg);
|
|
||||||
* - 批量同步: batchSync(WebSocketDialogMsg[] $dialogMsgs);
|
|
||||||
* - 用户同步: userSync(WebSocketDialogUser $dialogUser);
|
|
||||||
* - 删除消息: delete(WebSocketDialogMsg|WebSocketDialogUser|int $data);
|
|
||||||
*/
|
|
||||||
class ZincSearchDialogMsg
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* 索引名称
|
|
||||||
*/
|
|
||||||
protected static string $indexNameMsg = 'dialogMsg';
|
|
||||||
protected static string $indexNameUser = 'dialogUser';
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 基础方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 确保索引存在
|
|
||||||
*/
|
|
||||||
private static function ensureIndex(): bool
|
|
||||||
{
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexNameMsg)) {
|
|
||||||
$mappings = [
|
|
||||||
'properties' => [
|
|
||||||
// 拓展数据
|
|
||||||
'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID
|
|
||||||
'to_userid' => ['type' => 'numeric', 'index' => true], // 此消息发给的用户ID
|
|
||||||
|
|
||||||
// 消息数据
|
|
||||||
'id' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'dialog_id' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'dialog_type' => ['type' => 'keyword', 'index' => true],
|
|
||||||
'session_id' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'userid' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'type' => ['type' => 'keyword', 'index' => true],
|
|
||||||
'key' => ['type' => 'text', 'index' => true],
|
|
||||||
'created_at' => ['type' => 'date', 'index' => true],
|
|
||||||
'updated_at' => ['type' => 'date', 'index' => true],
|
|
||||||
]
|
|
||||||
];
|
|
||||||
$result = ZincSearchBase::createIndex(self::$indexNameMsg, $mappings);
|
|
||||||
return $result['success'] ?? false;
|
|
||||||
}
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexNameUser)) {
|
|
||||||
$mappings = [
|
|
||||||
'properties' => [
|
|
||||||
// 拓展数据
|
|
||||||
'dialog_userid' => ['type' => 'keyword', 'index' => true], // 对话ID+用户ID
|
|
||||||
|
|
||||||
// 用户数据
|
|
||||||
'id' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'dialog_id' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'userid' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'top_at' => ['type' => 'date', 'index' => true],
|
|
||||||
'last_at' => ['type' => 'date', 'index' => true],
|
|
||||||
'mark_unread' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'silence' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'hide' => ['type' => 'numeric', 'index' => true],
|
|
||||||
'color' => ['type' => 'keyword', 'index' => true],
|
|
||||||
'created_at' => ['type' => 'date', 'index' => true],
|
|
||||||
'updated_at' => ['type' => 'date', 'index' => true],
|
|
||||||
]
|
|
||||||
];
|
|
||||||
$result = ZincSearchBase::createIndex(self::$indexNameUser, $mappings);
|
|
||||||
return $result['success'] ?? false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 清空所有键值
|
|
||||||
*
|
|
||||||
* @return bool 是否成功
|
|
||||||
*/
|
|
||||||
public static function clear(): bool
|
|
||||||
{
|
|
||||||
// 检查索引是否存在然后删除
|
|
||||||
if (ZincSearchBase::indexExists(self::$indexNameMsg)) {
|
|
||||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexNameMsg);
|
|
||||||
if (!($deleteResult['success'] ?? false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (ZincSearchBase::indexExists(self::$indexNameUser)) {
|
|
||||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexNameUser);
|
|
||||||
if (!($deleteResult['success'] ?? false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return self::ensureIndex();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 搜索方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据用户ID和消息关键词搜索会话
|
|
||||||
*
|
|
||||||
* @param string $userid 用户ID
|
|
||||||
* @param string $keyword 消息关键词
|
|
||||||
* @param int $from 起始位置
|
|
||||||
* @param int $size 返回结果数量
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
public static function search(string $userid, string $keyword, int $from = 0, int $size = 20): array
|
|
||||||
{
|
|
||||||
if (!Apps::isInstalled("search")) {
|
|
||||||
// 如果搜索功能未安装,使用数据库查询
|
|
||||||
return self::searchByMysql($userid, $keyword, $from, $size);
|
|
||||||
}
|
|
||||||
|
|
||||||
$searchParams = [
|
|
||||||
'query' => [
|
|
||||||
'bool' => [
|
|
||||||
'must' => [
|
|
||||||
['term' => ['to_userid' => $userid]],
|
|
||||||
['match_phrase' => ['key' => $keyword]]
|
|
||||||
]
|
|
||||||
]
|
|
||||||
],
|
|
||||||
'from' => $from,
|
|
||||||
'size' => $size,
|
|
||||||
'sort' => [
|
|
||||||
['updated_at' => 'desc']
|
|
||||||
]
|
|
||||||
];
|
|
||||||
try {
|
|
||||||
$result = ZincSearchBase::elasticSearch(self::$indexNameMsg, $searchParams);
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
|
|
||||||
// 收集所有的用户信息
|
|
||||||
$dialogUserids = [];
|
|
||||||
foreach ($hits as $hit) {
|
|
||||||
$source = $hit['_source'];
|
|
||||||
$dialogUserids[] = $source['dialog_userid'];
|
|
||||||
}
|
|
||||||
$userInfos = self::searchUser(array_unique($dialogUserids));
|
|
||||||
|
|
||||||
// 组合返回结果,将用户信息合并到消息中
|
|
||||||
$msgs = [];
|
|
||||||
foreach ($hits as $hit) {
|
|
||||||
$msgInfo = $hit['_source'];
|
|
||||||
$userInfo = $userInfos[$msgInfo['dialog_userid']] ?? [];
|
|
||||||
if ($userInfo) {
|
|
||||||
$msgs[] = [
|
|
||||||
'id' => $msgInfo['dialog_id'],
|
|
||||||
'search_msg_id' => $msgInfo['id'],
|
|
||||||
'user_at' => Carbon::parse($msgInfo['updated_at'])->format('Y-m-d H:i:s'),
|
|
||||||
|
|
||||||
'mark_unread' => $userInfo['mark_unread'],
|
|
||||||
'silence' => $userInfo['silence'],
|
|
||||||
'hide' => $userInfo['hide'],
|
|
||||||
'color' => $userInfo['color'],
|
|
||||||
'top_at' => Carbon::parse($userInfo['top_at'])->format('Y-m-d H:i:s'),
|
|
||||||
'last_at' => Carbon::parse($userInfo['last_at'])->format('Y-m-d H:i:s'),
|
|
||||||
];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return $msgs;
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('search: ' . $e->getMessage());
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据用户ID和消息关键词搜索会话(MySQL 版本,主要用于未安装ZincSearch的情况)
|
|
||||||
*
|
|
||||||
* @param string $userid 用户ID
|
|
||||||
* @param string $keyword 消息关键词
|
|
||||||
* @param int $from 起始位置
|
|
||||||
* @param int $size 返回结果数量
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
private static function searchByMysql(string $userid, string $keyword, int $from = 0, int $size = 20): array
|
|
||||||
{
|
|
||||||
$items = DB::table('web_socket_dialog_users as u')
|
|
||||||
->select(['d.*', 'u.top_at', 'u.last_at', 'u.mark_unread', 'u.silence', 'u.hide', 'u.color', 'u.updated_at as user_at', 'm.id as search_msg_id'])
|
|
||||||
->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id')
|
|
||||||
->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id')
|
|
||||||
->where('u.userid', $userid)
|
|
||||||
->where('m.bot', 0)
|
|
||||||
->whereNull('d.deleted_at')
|
|
||||||
->where('m.key', 'like', "%{$keyword}%")
|
|
||||||
->orderByDesc('m.id')
|
|
||||||
->offset($from)
|
|
||||||
->limit($size)
|
|
||||||
->get()
|
|
||||||
->all();
|
|
||||||
$msgs = [];
|
|
||||||
foreach ($items as $item) {
|
|
||||||
$msgs[] = [
|
|
||||||
'id' => $item->id,
|
|
||||||
'search_msg_id' => $item->search_msg_id,
|
|
||||||
'user_at' => Carbon::parse($item->user_at)->format('Y-m-d H:i:s'),
|
|
||||||
|
|
||||||
'mark_unread' => $item->mark_unread,
|
|
||||||
'silence' => $item->silence,
|
|
||||||
'hide' => $item->hide,
|
|
||||||
'color' => $item->color,
|
|
||||||
'top_at' => Carbon::parse($item->top_at)->format('Y-m-d H:i:s'),
|
|
||||||
'last_at' => Carbon::parse($item->last_at)->format('Y-m-d H:i:s'),
|
|
||||||
];
|
|
||||||
}
|
|
||||||
return $msgs;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据对话用户ID搜索用户信息
|
|
||||||
* @param array $dialogUserids
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
private static function searchUser(array $dialogUserids): array
|
|
||||||
{
|
|
||||||
if (empty($dialogUserids)) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
$userInfos = [];
|
|
||||||
|
|
||||||
// 构建用户查询条件
|
|
||||||
$userSearchParams = [
|
|
||||||
'query' => [
|
|
||||||
'bool' => [
|
|
||||||
'should' => []
|
|
||||||
]
|
|
||||||
],
|
|
||||||
'size' => count($dialogUserids) // 确保取到所有符合条件的记录
|
|
||||||
];
|
|
||||||
|
|
||||||
// 添加所有 dialog_userid 到查询条件
|
|
||||||
foreach ($dialogUserids as $dialogUserid) {
|
|
||||||
$userSearchParams['query']['bool']['should'][] = [
|
|
||||||
'term' => ['dialog_userid' => $dialogUserid]
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// 查询用户信息
|
|
||||||
$userResult = ZincSearchBase::elasticSearch(self::$indexNameUser, $userSearchParams);
|
|
||||||
$userHits = $userResult['data']['hits']['hits'] ?? [];
|
|
||||||
|
|
||||||
// 以 dialog_userid 为键保存用户信息
|
|
||||||
foreach ($userHits as $userHit) {
|
|
||||||
$userSource = $userHit['_source'];
|
|
||||||
$userInfos[$userSource['dialog_userid']] = $userSource;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $userInfos;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 生成内容
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 生成 dialog_userid
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogUser $dialogUser
|
|
||||||
* @return string
|
|
||||||
*/
|
|
||||||
private static function generateDialogUserid(WebSocketDialogUser $dialogUser): string
|
|
||||||
{
|
|
||||||
return "{$dialogUser->dialog_id}_{$dialogUser->userid}";
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 生成文档内容
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
|
||||||
* @param WebSocketDialogUser $dialogUser
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
private static function generateMsgData(WebSocketDialogMsg $dialogMsg, WebSocketDialogUser $dialogUser): array
|
|
||||||
{
|
|
||||||
return [
|
|
||||||
'_id' => self::$indexNameMsg . "_" . $dialogMsg->id . "_" . $dialogUser->userid,
|
|
||||||
'dialog_userid' => self::generateDialogUserid($dialogUser),
|
|
||||||
'to_userid' => $dialogUser->userid,
|
|
||||||
|
|
||||||
'id' => $dialogMsg->id,
|
|
||||||
'dialog_id' => $dialogMsg->dialog_id,
|
|
||||||
'dialog_type' => $dialogMsg->dialog_type,
|
|
||||||
'session_id' => $dialogMsg->session_id,
|
|
||||||
'userid' => $dialogMsg->userid,
|
|
||||||
'type' => $dialogMsg->type,
|
|
||||||
'key' => $dialogMsg->key,
|
|
||||||
'created_at' => $dialogMsg->created_at,
|
|
||||||
'updated_at' => $dialogMsg->updated_at,
|
|
||||||
];
|
|
||||||
}
|
|
||||||
private static function generateUserData(WebSocketDialogUser $dialogUser): array
|
|
||||||
{
|
|
||||||
return [
|
|
||||||
'_id' => self::$indexNameUser . "_" . $dialogUser->id,
|
|
||||||
'dialog_userid' => self::generateDialogUserid($dialogUser),
|
|
||||||
|
|
||||||
'id' => $dialogUser->id,
|
|
||||||
'dialog_id' => $dialogUser->dialog_id,
|
|
||||||
'userid' => $dialogUser->userid,
|
|
||||||
'top_at' => $dialogUser->top_at,
|
|
||||||
'last_at' => $dialogUser->last_at,
|
|
||||||
'mark_unread' => $dialogUser->mark_unread,
|
|
||||||
'silence' => $dialogUser->silence,
|
|
||||||
'hide' => $dialogUser->hide,
|
|
||||||
'color' => $dialogUser->color,
|
|
||||||
'created_at' => $dialogUser->created_at,
|
|
||||||
'updated_at' => $dialogUser->updated_at,
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 基本方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步消息(建议在异步进程中使用)
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg $dialogMsg
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
public static function sync(WebSocketDialogMsg $dialogMsg): bool
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($dialogMsg->bot) {
|
|
||||||
// 如果是机器人消息,跳过
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 获取此会话的所有用户
|
|
||||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
|
||||||
|
|
||||||
if ($dialogUsers->isEmpty()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
$msgs = [];
|
|
||||||
$users = [];
|
|
||||||
foreach ($dialogUsers as $dialogUser) {
|
|
||||||
if (empty($dialogMsg->key)) {
|
|
||||||
// 如果消息没有关键词,跳过
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if ($dialogUser->userid == 0) {
|
|
||||||
// 跳过系统用户
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
$msgs[] = self::generateMsgData($dialogMsg, $dialogUser);
|
|
||||||
$users[$dialogUser->id] = self::generateUserData($dialogUser);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($msgs) {
|
|
||||||
// 批量写入消息
|
|
||||||
ZincSearchBase::addDocs(self::$indexNameMsg, $msgs);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($users) {
|
|
||||||
// 批量写入用户
|
|
||||||
ZincSearchBase::addDocs(self::$indexNameUser, array_values($users));
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('sync: ' . $e->getMessage());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量同步消息(建议在异步进程中使用)
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg[] $dialogMsgs
|
|
||||||
* @return int 成功同步的消息数
|
|
||||||
*/
|
|
||||||
public static function batchSync($dialogMsgs): int
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex()) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
$count = 0;
|
|
||||||
try {
|
|
||||||
$msgs = [];
|
|
||||||
$users = [];
|
|
||||||
$userDialogs = [];
|
|
||||||
|
|
||||||
// 预处理:收集所有涉及的对话ID
|
|
||||||
$dialogIds = [];
|
|
||||||
foreach ($dialogMsgs as $dialogMsg) {
|
|
||||||
$dialogIds[] = $dialogMsg->dialog_id;
|
|
||||||
}
|
|
||||||
$dialogIds = array_unique($dialogIds);
|
|
||||||
|
|
||||||
// 获取所有相关的用户-对话关系
|
|
||||||
if (!empty($dialogIds)) {
|
|
||||||
$dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
|
|
||||||
|
|
||||||
// 按对话ID组织用户
|
|
||||||
foreach ($dialogUsers as $dialogUser) {
|
|
||||||
$userDialogs[$dialogUser->dialog_id][] = $dialogUser;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 为每条消息准备所有相关用户的文档
|
|
||||||
foreach ($dialogMsgs as $dialogMsg) {
|
|
||||||
if (!isset($userDialogs[$dialogMsg->dialog_id])) {
|
|
||||||
// 如果该会话没有用户,跳过
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if ($dialogMsg->bot) {
|
|
||||||
// 如果是机器人消息,跳过
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
/** @var WebSocketDialogUser $dialogUser */
|
|
||||||
foreach ($userDialogs[$dialogMsg->dialog_id] as $dialogUser) {
|
|
||||||
if (empty($dialogMsg->key)) {
|
|
||||||
// 如果消息没有关键词,跳过
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if ($dialogUser->userid == 0) {
|
|
||||||
// 跳过系统用户
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
$msgs[] = self::generateMsgData($dialogMsg, $dialogUser);
|
|
||||||
$users[$dialogUser->id] = self::generateUserData($dialogUser);
|
|
||||||
$count++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($msgs) {
|
|
||||||
// 批量写入消息
|
|
||||||
ZincSearchBase::addDocs(self::$indexNameMsg, $msgs);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($users) {
|
|
||||||
// 批量写入用户
|
|
||||||
ZincSearchBase::addDocs(self::$indexNameUser, array_values($users));
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('batchSync: ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
return $count;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步用户(建议在异步进程中使用)
|
|
||||||
* @param WebSocketDialogUser $dialogUser
|
|
||||||
* @return bool
|
|
||||||
*/
|
|
||||||
public static function userSync(WebSocketDialogUser $dialogUser): bool
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
$data = self::generateUserData($dialogUser);
|
|
||||||
|
|
||||||
// 生成查询用户条件
|
|
||||||
$searchParams = [
|
|
||||||
'query' => [
|
|
||||||
'bool' => [
|
|
||||||
'must' => [
|
|
||||||
['term' => ['dialog_userid' => $data['dialog_userid']]]
|
|
||||||
]
|
|
||||||
]
|
|
||||||
],
|
|
||||||
'size' => 1
|
|
||||||
];
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 查询用户是否存在
|
|
||||||
$result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams);
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
|
|
||||||
// 同步用户(存在更新、不存在添加)
|
|
||||||
$result = ZincSearchBase::addDoc(self::$indexNameUser, $data);
|
|
||||||
if (!isset($result['success'])) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 用户不存在,同步消息
|
|
||||||
if (empty($hits)) {
|
|
||||||
$lastId = 0; // 上次同步的最后ID
|
|
||||||
$batchSize = 500; // 每批处理的消息数量
|
|
||||||
|
|
||||||
// 分批同步消息
|
|
||||||
do {
|
|
||||||
// 获取一批
|
|
||||||
$dialogMsgs = WebSocketDialogMsg::whereDialogId($dialogUser->dialog_id)
|
|
||||||
->where('id', '>', $lastId)
|
|
||||||
->orderBy('id')
|
|
||||||
->limit($batchSize)
|
|
||||||
->get();
|
|
||||||
|
|
||||||
if ($dialogMsgs->isEmpty()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 同步数据
|
|
||||||
ZincSearchDialogMsg::batchSync($dialogMsgs);
|
|
||||||
|
|
||||||
// 更新最后ID
|
|
||||||
$lastId = $dialogMsgs->last()->id;
|
|
||||||
} while (count($dialogMsgs) == $batchSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('userSync: ' . $e->getMessage());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除(建议在异步进程中使用)
|
|
||||||
*
|
|
||||||
* @param WebSocketDialogMsg|WebSocketDialogUser|int $data
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
public static function delete(mixed $data): int
|
|
||||||
{
|
|
||||||
$batchSize = 500; // 每批处理的文档数量
|
|
||||||
$totalDeleted = 0; // 总共删除的文档数量
|
|
||||||
$from = 0;
|
|
||||||
|
|
||||||
// 根据数据类型生成查询条件
|
|
||||||
if ($data instanceof WebSocketDialogMsg) {
|
|
||||||
$query = [
|
|
||||||
'field' => 'id',
|
|
||||||
'term' => (string) $data->id
|
|
||||||
];
|
|
||||||
} elseif ($data instanceof WebSocketDialogUser) {
|
|
||||||
$query = [
|
|
||||||
'field' => 'dialog_userid',
|
|
||||||
'term' => self::generateDialogUserid($data),
|
|
||||||
];
|
|
||||||
} else {
|
|
||||||
$query = [
|
|
||||||
'field' => 'id',
|
|
||||||
'term' => (string) $data
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
// 根据消息ID查找相关文档
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexNameMsg, [
|
|
||||||
'search_type' => 'term',
|
|
||||||
'query' => $query,
|
|
||||||
'from' => $from,
|
|
||||||
'max_results' => $batchSize
|
|
||||||
]);
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
|
|
||||||
// 如果没有更多文档,退出循环
|
|
||||||
if (empty($hits)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除本批次找到的所有文档
|
|
||||||
foreach ($hits as $hit) {
|
|
||||||
if (isset($hit['_id'])) {
|
|
||||||
ZincSearchBase::deleteDoc(self::$indexNameMsg, $hit['_id']);
|
|
||||||
$totalDeleted++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果返回的文档数少于批次大小,说明已经没有更多文档了
|
|
||||||
if (count($hits) < $batchSize) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 移动到下一批
|
|
||||||
$from += $batchSize;
|
|
||||||
}
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
Log::error('delete: ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
return $totalDeleted;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,276 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace App\Module\ZincSearch;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ZincSearch 键值存储类
|
|
||||||
*
|
|
||||||
* 使用方法:
|
|
||||||
*
|
|
||||||
* 1. 基础方法
|
|
||||||
* - 确保索引存在: ensureIndex();
|
|
||||||
* - 清空所有数据: clear();
|
|
||||||
*
|
|
||||||
* 2. 基本操作
|
|
||||||
* - 设置键值: set('site_name', '我的网站');
|
|
||||||
* - 设置复杂数据: set('site_config', ['logo' => 'logo.png', 'theme' => 'dark']);
|
|
||||||
* - 合并现有数据: set('site_config', ['footer' => '版权所有'], true);
|
|
||||||
* - 获取键值: $siteName = get('site_name');
|
|
||||||
* - 获取键值带默认值: $theme = get('theme', 'light');
|
|
||||||
* - 删除键值: delete('temporary_data');
|
|
||||||
*
|
|
||||||
* 3. 批量操作
|
|
||||||
* - 批量设置: batchSet(['user_count' => 100, 'active_users' => 50]);
|
|
||||||
* - 批量获取: $stats = batchGet(['user_count', 'active_users']);
|
|
||||||
*/
|
|
||||||
class ZincSearchKeyValue
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* 索引名称
|
|
||||||
*/
|
|
||||||
protected static string $indexName = 'keyValue';
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 基础方法
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 确保索引存在
|
|
||||||
*/
|
|
||||||
public static function ensureIndex(): bool
|
|
||||||
{
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
$mappings = [
|
|
||||||
'properties' => [
|
|
||||||
'key' => ['type' => 'keyword', 'index' => true],
|
|
||||||
'value' => ['type' => 'text', 'index' => true],
|
|
||||||
'created_at' => ['type' => 'date', 'index' => true],
|
|
||||||
'updated_at' => ['type' => 'date', 'index' => true]
|
|
||||||
]
|
|
||||||
];
|
|
||||||
$result = ZincSearchBase::createIndex(self::$indexName, $mappings);
|
|
||||||
return $result['success'] ?? false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 清空所有键值
|
|
||||||
*
|
|
||||||
* @return bool 是否成功
|
|
||||||
*/
|
|
||||||
public static function clear(): bool
|
|
||||||
{
|
|
||||||
// 检查索引是否存在
|
|
||||||
if (!ZincSearchBase::indexExists(self::$indexName)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 删除再重建索引
|
|
||||||
$deleteResult = ZincSearchBase::deleteIndex(self::$indexName);
|
|
||||||
if (!($deleteResult['success'] ?? false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return self::ensureIndex();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 基本操作
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 设置键值
|
|
||||||
*
|
|
||||||
* @param string $key 键名
|
|
||||||
* @param mixed $value 值
|
|
||||||
* @param bool $merge 是否合并现有数据(如果值是数组)
|
|
||||||
* @return bool 是否成功
|
|
||||||
*/
|
|
||||||
public static function set(string $key, mixed $value, bool $merge = false): bool
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查键是否已存在
|
|
||||||
if ($merge && is_array($value)) {
|
|
||||||
$existingData = self::get($key);
|
|
||||||
if (is_array($existingData)) {
|
|
||||||
$value = array_merge($existingData, $value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查是否存在相同键的文档 - 使用精确查询而不是普通搜索
|
|
||||||
$searchParams = [
|
|
||||||
'search_type' => 'term',
|
|
||||||
'query' => [
|
|
||||||
'field' => 'key',
|
|
||||||
'term' => $key
|
|
||||||
],
|
|
||||||
'from' => 0,
|
|
||||||
'max_results' => 1
|
|
||||||
];
|
|
||||||
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
|
||||||
$docs = $result['data']['hits']['hits'] ?? [];
|
|
||||||
$now = date('c');
|
|
||||||
|
|
||||||
if (!empty($docs)) {
|
|
||||||
$docId = $docs[0]['_id'] ?? null;
|
|
||||||
if ($docId) {
|
|
||||||
// 更新现有文档
|
|
||||||
$docData = [
|
|
||||||
'key' => $key,
|
|
||||||
'value' => $value,
|
|
||||||
'updated_at' => $now
|
|
||||||
];
|
|
||||||
$updateResult = ZincSearchBase::updateDoc(self::$indexName, $docId, $docData);
|
|
||||||
return $updateResult['success'] ?? false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 创建新文档
|
|
||||||
$docData = [
|
|
||||||
'key' => $key,
|
|
||||||
'value' => $value,
|
|
||||||
'created_at' => $now,
|
|
||||||
'updated_at' => $now
|
|
||||||
];
|
|
||||||
$addResult = ZincSearchBase::addDoc(self::$indexName, $docData);
|
|
||||||
return $addResult['success'] ?? false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取键值
|
|
||||||
*
|
|
||||||
* @param string $key 键名
|
|
||||||
* @param mixed $default 默认值
|
|
||||||
* @return mixed 值或默认值
|
|
||||||
*/
|
|
||||||
public static function get(string $key, mixed $default = null): mixed
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex() || empty($key)) {
|
|
||||||
return $default;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 精确匹配键名
|
|
||||||
$searchParams = [
|
|
||||||
'search_type' => 'term',
|
|
||||||
'query' => [
|
|
||||||
'field' => 'key',
|
|
||||||
'term' => $key
|
|
||||||
],
|
|
||||||
'from' => 0,
|
|
||||||
'max_results' => 1
|
|
||||||
];
|
|
||||||
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
|
||||||
if (!($result['success'] ?? false)) {
|
|
||||||
return $default;
|
|
||||||
}
|
|
||||||
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
if (empty($hits)) {
|
|
||||||
return $default;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $hits[0]['_source']['value'] ?? $default;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除键值
|
|
||||||
*
|
|
||||||
* @param string $key 键名
|
|
||||||
* @return bool 是否成功
|
|
||||||
*/
|
|
||||||
public static function delete(string $key): bool
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex() || empty($key)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 查找文档ID
|
|
||||||
$searchParams = [
|
|
||||||
'search_type' => 'term',
|
|
||||||
'query' => [
|
|
||||||
'field' => 'key',
|
|
||||||
'term' => $key
|
|
||||||
],
|
|
||||||
'from' => 0,
|
|
||||||
'max_results' => 1
|
|
||||||
];
|
|
||||||
|
|
||||||
$result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams);
|
|
||||||
if (!($result['success'] ?? false)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$hits = $result['data']['hits']['hits'] ?? [];
|
|
||||||
if (empty($hits)) {
|
|
||||||
return true; // 不存在视为删除成功
|
|
||||||
}
|
|
||||||
|
|
||||||
$docId = $hits[0]['_id'] ?? null;
|
|
||||||
if (empty($docId)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$deleteResult = ZincSearchBase::deleteDoc(self::$indexName, $docId);
|
|
||||||
return $deleteResult['success'] ?? false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ==============================
|
|
||||||
// 批量操作
|
|
||||||
// ==============================
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量设置键值对
|
|
||||||
*
|
|
||||||
* @param array $keyValues 键值对数组
|
|
||||||
* @return bool 是否全部成功
|
|
||||||
*/
|
|
||||||
public static function batchSet(array $keyValues): bool
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex() || empty($keyValues)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$docs = [];
|
|
||||||
$now = date('c');
|
|
||||||
|
|
||||||
foreach ($keyValues as $key => $value) {
|
|
||||||
$docs[] = [
|
|
||||||
'key' => $key,
|
|
||||||
'value' => $value,
|
|
||||||
'created_at' => $now,
|
|
||||||
'updated_at' => $now
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
$result = ZincSearchBase::addDocs(self::$indexName, $docs);
|
|
||||||
return $result['success'] ?? false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 批量获取键值
|
|
||||||
*
|
|
||||||
* @param array $keys 键名数组
|
|
||||||
* @return array 键值对数组
|
|
||||||
*/
|
|
||||||
public static function batchGet(array $keys): array
|
|
||||||
{
|
|
||||||
if (!self::ensureIndex() || empty($keys)) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
$results = [];
|
|
||||||
|
|
||||||
// 遍历查询每个键
|
|
||||||
foreach ($keys as $key) {
|
|
||||||
$results[$key] = self::get($key);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $results;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -6,7 +6,6 @@ use App\Models\WebSocketDialogMsg;
|
|||||||
use App\Module\Apps;
|
use App\Module\Apps;
|
||||||
use App\Module\Manticore\ManticoreMsg;
|
use App\Module\Manticore\ManticoreMsg;
|
||||||
use App\Tasks\ManticoreSyncTask;
|
use App\Tasks\ManticoreSyncTask;
|
||||||
use App\Tasks\ZincSearchSyncTask;
|
|
||||||
|
|
||||||
class WebSocketDialogMsgObserver extends AbstractObserver
|
class WebSocketDialogMsgObserver extends AbstractObserver
|
||||||
{
|
{
|
||||||
@ -18,9 +17,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver
|
|||||||
*/
|
*/
|
||||||
public function created(WebSocketDialogMsg $webSocketDialogMsg)
|
public function created(WebSocketDialogMsg $webSocketDialogMsg)
|
||||||
{
|
{
|
||||||
// ZincSearch 同步
|
|
||||||
self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
|
|
||||||
|
|
||||||
// Manticore 同步(仅在安装 Manticore 且符合索引条件时)
|
// Manticore 同步(仅在安装 Manticore 且符合索引条件时)
|
||||||
if (Apps::isInstalled('manticore') && ManticoreMsg::shouldIndex($webSocketDialogMsg)) {
|
if (Apps::isInstalled('manticore') && ManticoreMsg::shouldIndex($webSocketDialogMsg)) {
|
||||||
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
|
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
|
||||||
@ -35,9 +31,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver
|
|||||||
*/
|
*/
|
||||||
public function updated(WebSocketDialogMsg $webSocketDialogMsg)
|
public function updated(WebSocketDialogMsg $webSocketDialogMsg)
|
||||||
{
|
{
|
||||||
// ZincSearch 同步
|
|
||||||
self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
|
|
||||||
|
|
||||||
// Manticore 同步(更新可能使消息符合或不再符合索引条件,由 sync 方法处理)
|
// Manticore 同步(更新可能使消息符合或不再符合索引条件,由 sync 方法处理)
|
||||||
if (Apps::isInstalled('manticore')) {
|
if (Apps::isInstalled('manticore')) {
|
||||||
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
|
self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id]));
|
||||||
@ -52,9 +45,6 @@ class WebSocketDialogMsgObserver extends AbstractObserver
|
|||||||
*/
|
*/
|
||||||
public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
|
public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
|
||||||
{
|
{
|
||||||
// ZincSearch 删除
|
|
||||||
self::taskDeliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg->toArray()));
|
|
||||||
|
|
||||||
// Manticore 删除
|
// Manticore 删除
|
||||||
if (Apps::isInstalled('manticore')) {
|
if (Apps::isInstalled('manticore')) {
|
||||||
self::taskDeliver(new ManticoreSyncTask('msg_delete', ['msg_id' => $webSocketDialogMsg->id]));
|
self::taskDeliver(new ManticoreSyncTask('msg_delete', ['msg_id' => $webSocketDialogMsg->id]));
|
||||||
|
|||||||
@ -7,7 +7,6 @@ use App\Models\UserBot;
|
|||||||
use App\Models\WebSocketDialogUser;
|
use App\Models\WebSocketDialogUser;
|
||||||
use App\Module\Apps;
|
use App\Module\Apps;
|
||||||
use App\Tasks\ManticoreSyncTask;
|
use App\Tasks\ManticoreSyncTask;
|
||||||
use App\Tasks\ZincSearchSyncTask;
|
|
||||||
use Carbon\Carbon;
|
use Carbon\Carbon;
|
||||||
|
|
||||||
class WebSocketDialogUserObserver extends AbstractObserver
|
class WebSocketDialogUserObserver extends AbstractObserver
|
||||||
@ -33,7 +32,6 @@ class WebSocketDialogUserObserver extends AbstractObserver
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
||||||
self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray()));
|
|
||||||
|
|
||||||
// Manticore: 更新对话下所有消息的 allowed_users
|
// Manticore: 更新对话下所有消息的 allowed_users
|
||||||
if (Apps::isInstalled('manticore')) {
|
if (Apps::isInstalled('manticore')) {
|
||||||
@ -57,7 +55,7 @@ class WebSocketDialogUserObserver extends AbstractObserver
|
|||||||
*/
|
*/
|
||||||
public function updated(WebSocketDialogUser $webSocketDialogUser)
|
public function updated(WebSocketDialogUser $webSocketDialogUser)
|
||||||
{
|
{
|
||||||
self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray()));
|
//
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -69,7 +67,6 @@ class WebSocketDialogUserObserver extends AbstractObserver
|
|||||||
public function deleted(WebSocketDialogUser $webSocketDialogUser)
|
public function deleted(WebSocketDialogUser $webSocketDialogUser)
|
||||||
{
|
{
|
||||||
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
||||||
self::taskDeliver(new ZincSearchSyncTask('deleteUser', $webSocketDialogUser->toArray()));
|
|
||||||
|
|
||||||
// Manticore: 更新对话下所有消息的 allowed_users
|
// Manticore: 更新对话下所有消息的 allowed_users
|
||||||
if (Apps::isInstalled('manticore')) {
|
if (Apps::isInstalled('manticore')) {
|
||||||
|
|||||||
@ -1,88 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace App\Tasks;
|
|
||||||
|
|
||||||
use App\Models\WebSocketDialogMsg;
|
|
||||||
use App\Models\WebSocketDialogUser;
|
|
||||||
use App\Module\Apps;
|
|
||||||
use App\Module\ZincSearch\ZincSearchDialogMsg;
|
|
||||||
use Carbon\Carbon;
|
|
||||||
use Illuminate\Support\Facades\Cache;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步聊天数据到ZincSearch
|
|
||||||
*/
|
|
||||||
class ZincSearchSyncTask extends AbstractTask
|
|
||||||
{
|
|
||||||
private $action;
|
|
||||||
|
|
||||||
private $data;
|
|
||||||
|
|
||||||
public function __construct($action = null, $data = null)
|
|
||||||
{
|
|
||||||
parent::__construct(...func_get_args());
|
|
||||||
$this->action = $action;
|
|
||||||
$this->data = $data;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function start()
|
|
||||||
{
|
|
||||||
if (!Apps::isInstalled("search")) {
|
|
||||||
// 如果没有安装搜索模块,则不执行
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ($this->action) {
|
|
||||||
case 'sync':
|
|
||||||
// 同步消息数据
|
|
||||||
ZincSearchDialogMsg::sync(WebSocketDialogMsg::fillInstance($this->data));
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'delete':
|
|
||||||
// 删除消息数据
|
|
||||||
ZincSearchDialogMsg::delete(WebSocketDialogMsg::fillInstance($this->data));
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'userSync':
|
|
||||||
// 同步用户数据
|
|
||||||
ZincSearchDialogMsg::userSync(WebSocketDialogUser::fillInstance($this->data));
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'deleteUser':
|
|
||||||
// 删除用户数据
|
|
||||||
ZincSearchDialogMsg::delete(WebSocketDialogUser::fillInstance($this->data));
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
// 增量更新
|
|
||||||
$this->incrementalUpdate();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 增量更新
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
private function incrementalUpdate()
|
|
||||||
{
|
|
||||||
// 120分钟执行一次
|
|
||||||
$time = intval(Cache::get("ZincSearchSyncTask:Time"));
|
|
||||||
if (time() - $time < 120 * 60) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 执行开始,120分钟后缓存标记失效
|
|
||||||
Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120));
|
|
||||||
|
|
||||||
// 开始执行同步
|
|
||||||
@shell_exec("php /var/www/artisan zinc:sync-user-msg --i");
|
|
||||||
|
|
||||||
// 执行完成,5分钟后缓存标记失效(5分钟任务可重复执行)
|
|
||||||
Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5));
|
|
||||||
}
|
|
||||||
|
|
||||||
public function end()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user