perf: 优化消息搜索

This commit is contained in:
kuaifan 2025-03-01 20:39:28 +08:00
parent c2fd747c45
commit 32c232a0b5
8 changed files with 644 additions and 22 deletions

View File

@ -13,6 +13,7 @@ use App\Models\User;
use App\Module\Base;
use App\Module\Timer;
use App\Module\Extranet;
use App\Module\ElasticSearch;
use App\Module\TimeRange;
use App\Module\MsgTool;
use App\Module\Table\OnlineData;
@ -171,28 +172,16 @@ class DialogController extends AbstractController
}
// 搜索消息会话
if (count($list) < 20) {
$prefix = DB::getTablePrefix();
if (preg_match('/[+\-><()~*"@]/', $key)) {
$against = "\"{$key}\"";
} else {
$against = "*{$key}*";
$es = new ElasticSearch(ElasticSearch::DUM);
$searchResults = $es->searchDialogsByUserAndKeyword($user->userid, $key, 20 - count($list));
if ($searchResults) {
foreach ($searchResults as $item) {
if ($dialog = WebSocketDialog::find($item['id'])) {
$dialog = array_merge($dialog->toArray(), $item);
$list[] = WebSocketDialog::synthesizeData($dialog, $user->userid);
}
}
}
$msgs = DB::table('web_socket_dialog_users as u')
->select(['d.*', 'u.top_at', 'u.last_at', 'u.mark_unread', 'u.silence', 'u.hide', 'u.color', 'u.updated_at as user_at', 'm.id as search_msg_id'])
->join('web_socket_dialogs as d', 'u.dialog_id', '=', 'd.id')
->join('web_socket_dialog_msgs as m', 'm.dialog_id', '=', 'd.id')
->where('u.userid', $user->userid)
->where('m.bot', 0)
->whereNull('d.deleted_at')
->whereRaw("MATCH({$prefix}m.key) AGAINST('{$against}' IN BOOLEAN MODE)")
->orderByDesc('m.id')
->take(20 - count($list))
->get()
->map(function($item) use ($user) {
return WebSocketDialog::synthesizeData($item, $user->userid);
})
->all();
$list = array_merge($list, $msgs);
}
//
return Base::retSuccess('success', $list);

View File

@ -0,0 +1,556 @@
<?php
namespace App\Module;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use Elastic\Elasticsearch\ClientBuilder;
use Elastic\Elasticsearch\Exception\MissingParameterException;
use Illuminate\Support\Facades\Log;
class ElasticSearch
{
/**
* Elasticsearch客户端实例
*
* @var \Elastic\Elasticsearch\Client
*/
public $client;
/**
* 当前操作的索引名称
*
* @var string
*/
protected $index;
/**
* 构造函数
*
* @param null $index 默认索引名称
* @throws \Elastic\Elasticsearch\Exception\ConfigException
*/
public function __construct($index = null)
{
$host = env('ELASTICSEARCH_HOST', env('APP_IPPR') . '.15');
$port = env('ELASTICSEARCH_PORT', '9200');
$scheme = env('ELASTICSEARCH_SCHEME', 'http');
$user = env('ELASTICSEARCH_USER', '');
$pass = env('ELASTICSEARCH_PASS', '');
// 为8.x版本客户端配置连接
$config = [
'hosts' => ["{$scheme}://{$host}:{$port}"]
];
// 如果设置了用户名和密码
if (!empty($user)) {
$config['basicAuthentication'] = [$user, $pass];
}
// 8.x版本使用ClientBuilder::fromConfig创建客户端
$this->client = ClientBuilder::fromConfig($config);
if ($index) {
$this->index = $index;
}
}
/**
* 设置索引名称
*
* @param string $index
* @return $this
*/
public function setIndex($index)
{
$this->index = $index;
return $this;
}
/**
* 检查索引是否存在
*
* @return bool
* @throws \Exception
*/
public function indexExists()
{
$params = ['index' => $this->index];
return (bool)$this->client->indices()->exists($params);
}
/**
* 创建索引
*
* @param array $settings 索引设置
* @param array $mappings 字段映射
* @return array
*/
public function createIndex($settings = [], $mappings = [])
{
$params = [
'index' => $this->index
];
$body = [];
if (!empty($settings)) {
$body['settings'] = $settings;
}
if (!empty($mappings)) {
$body['mappings'] = $mappings;
}
if (!empty($body)) {
$params['body'] = $body;
}
try {
// 在8.x中索引操作位于indices()命名空间
return $this->client->indices()->create($params)->asArray();
} catch (\Exception $e) {
Log::error('创建Elasticsearch索引失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 删除索引
* @return array
*/
public function deleteIndex()
{
try {
$params = ['index' => $this->index];
return $this->client->indices()->delete($params)->asArray();
} catch (\Exception $e) {
Log::error('删除Elasticsearch索引失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 批量操作(批量添加/更新/删除文档)
*
* @param array $operations 批量操作的数据
* @return array
*/
public function bulk($operations)
{
try {
// 在8.x中批量操作API签名相同但内部实现有所变化
return $this->client->bulk($operations)->asArray();
} catch (\Exception $e) {
Log::error('批量操作失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 索引单个文档
*
* @param array $document 文档数据
* @param string $id 文档ID
* @param string|null $routing 路由值,用于父子文档
* @return array
*/
public function indexDocument($document, $id, $routing = null)
{
$params = [
'index' => $this->index,
'id' => $id,
'body' => $document
];
if ($routing) {
$params['routing'] = $routing;
}
try {
return $this->client->index($params)->asArray();
} catch (\Exception $e) {
Log::error('索引文档失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 删除文档
*
* @param string $id 文档ID
* @param string|null $routing 路由值,用于父子文档
* @return array
*/
public function deleteDocument($id, $routing = null)
{
$params = [
'index' => $this->index,
'id' => $id
];
if ($routing) {
$params['routing'] = $routing;
}
try {
return $this->client->delete($params)->asArray();
} catch (MissingParameterException $e) {
// 文档不存在时返回成功
return ['result' => 'not_found', 'error' => $e->getMessage()];
} catch (\Exception $e) {
Log::error('删除文档失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 通用搜索方法
*
* @param array $query 搜索查询
* @param int $from 起始位置
* @param int $size 返回结果数量
* @param array $sort 排序规则
* @return array
*/
public function search($query, $from = 0, $size = 10, $sort = [])
{
$params = [
'index' => $this->index,
'body' => [
'query' => $query,
'from' => $from,
'size' => $size
]
];
if (!empty($sort)) {
$params['body']['sort'] = $sort;
}
try {
return $this->client->search($params)->asArray();
} catch (\Exception $e) {
Log::error('搜索失败: ' . $e->getMessage());
return ['error' => $e->getMessage(), 'hits' => ['total' => ['value' => 0], 'hits' => []]];
}
}
/**
* 刷新索引
* @return array
*/
public function refreshIndex()
{
$params = [
'index' => $this->index
];
try {
return $this->client->indices()->refresh($params)->asArray();
} catch (\Exception $e) {
Log::error('刷新索引失败: ' . $e->getMessage());
return ['error' => $e->getMessage()];
}
}
/**
* 检查索引映射
* @return array
*/
public function checkIndexMapping()
{
try {
return $this->client->indices()->getMapping(['index' => $this->index])->asArray();
} catch (\Exception $e) {
return ['error' => $e->getMessage()];
}
}
/**
* 创建聊天系统索引 - 使用父子关系
* @return array
*/
public function createDialogUserMsgIndex()
{
// 定义映射
$mappings = [
'properties' => [
// 共用字段
'dialog_id' => ['type' => 'keyword'],
'created_at' => ['type' => 'date'],
'updated_at' => ['type' => 'date'],
// dialog_users 字段
'userid' => ['type' => 'keyword'],
'top_at' => ['type' => 'date'],
'last_at' => ['type' => 'date'],
'mark_unread' => ['type' => 'integer'],
'silence' => ['type' => 'integer'],
'hide' => ['type' => 'integer'],
'color' => ['type' => 'keyword'],
// dialog_msgs 字段
'msg_id' => ['type' => 'keyword'],
'sender_userid' => ['type' => 'keyword'],
'msg_type' => ['type' => 'keyword'],
'key' => ['type' => 'text'],
'bot' => ['type' => 'integer'],
// Join字段定义父子关系
'relationship' => [
'type' => 'join',
'relations' => [
'dialog_user' => 'dialog_msg' // dialog_user是父文档dialog_msg是子文档
]
],
]
];
// 索引设置
$settings = [
'number_of_shards' => 5,
'number_of_replicas' => 1,
'refresh_interval' => '5s'
];
return $this->createIndex($settings, $mappings);
}
/**
* 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话
*
* @param string $userid 用户ID
* @param string $keyword 消息关键词
* @param int $size 返回结果数量
* @return array
*/
public function searchDialogsByUserAndKeyword($userid, $keyword, $size = 20)
{
// 注意这里的类型名称要与创建索引时的一致
$query = [
'bool' => [
'must' => [
[
'term' => [
'userid' => $userid
]
],
[
'has_child' => [
'type' => 'dialog_msg',
'query' => [
'bool' => [
'must' => [
[
'match' => [
'key' => $keyword
]
],
[
'term' => [
'bot' => 0
]
]
]
]
],
'inner_hits' => [
'size' => 1,
'sort' => [
'msg_id' => 'desc'
]
]
]
]
]
]
];
// 开始搜索
$results = $this->search($query, 0, $size, ['last_at' => 'desc']);
// 处理搜索结果
$searchMap = [];
$hits = $results['hits']['hits'] ?? [];
foreach ($hits as $hit) {
if (isset($hit['inner_hits']['dialog_msg']['hits']['hits'][0])) {
$msgHit = $hit['inner_hits']['dialog_msg']['hits']['hits'][0];
$source = $hit['_source'];
$msgSource = $msgHit['_source'];
$searchMap[] = [
'id' => $source['dialog_id'],
'top_at' => $source['top_at'],
'last_at' => $source['last_at'],
'mark_unread' => $source['mark_unread'],
'silence' => $source['silence'],
'hide' => $source['hide'],
'color' => $source['color'],
'user_at' => $source['updated_at'],
'search_msg_id' => $msgSource['msg_id'],
];
}
}
// 返回搜索结果
return $searchMap;
}
/** ******************************************************************************************************** */
/** ******************************************************************************************************** */
/** ******************************************************************************************************** */
const DUM = "dialog_user_msg";
/**
* 会话用户 - 同步到Elasticsearch
*/
public static function syncDialogUserToElasticSearch(WebSocketDialogUser $dialogUser)
{
try {
$es = new self(self::DUM);
$docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
$document = [
'dialog_id' => $dialogUser->dialog_id,
'created_at' => $dialogUser->created_at,
'updated_at' => $dialogUser->updated_at,
'userid' => $dialogUser->userid,
'top_at' => $dialogUser->top_at,
'last_at' => $dialogUser->last_at,
'mark_unread' => $dialogUser->mark_unread ? 1 : 0,
'silence' => $dialogUser->silence ? 1 : 0,
'hide' => $dialogUser->hide ? 1 : 0,
'color' => $dialogUser->color,
'relationship' => [
'name' => 'dialog_user'
]
];
$es->indexDocument($document, $docId);
} catch (\Exception $e) {
Log::error('syncDialogUserToElasticSearch: ' . $e->getMessage());
}
}
/**
* 会话用户 - 从Elasticsearch删除
*/
public static function deleteDialogUserFromElasticSearch(WebSocketDialogUser $dialogUser)
{
try {
$es = new self(self::DUM);
$docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
// 删除用户-会话文档
$es->deleteDocument($docId);
// 注意:这里可能还需要删除所有关联的消息文档
// 但由于父子关系,可以通过查询找到所有子文档并删除
// 这里为简化可以选择在后台任务中处理或者直接依赖ES的级联删除功能
} catch (\Exception $e) {
Log::error('deleteDialogUserFromElasticSearch: ' . $e->getMessage());
}
}
/**
* 会话消息 - 同步到Elasticsearch
*/
public static function syncDialogToElasticSearch(WebSocketDialogMsg $dialogMsg)
{
try {
$es = new self(self::DUM);
// 获取此会话的所有用户
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
if ($dialogUsers->isEmpty()) {
return;
}
$params = ['body' => []];
foreach ($dialogUsers as $dialogUser) {
$parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}";
$docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}";
$params['body'][] = [
'index' => [
'_index' => self::DUM,
'_id' => $docId,
'routing' => $parentId
]
];
$params['body'][] = [
'dialog_id' => $dialogMsg->dialog_id,
'created_at' => $dialogMsg->created_at,
'updated_at' => $dialogMsg->updated_at,
'msg_id' => $dialogMsg->id,
'sender_userid' => $dialogMsg->userid,
'msg_type' => $dialogMsg->type,
'key' => $dialogMsg->key,
'bot' => $dialogMsg->bot ? 1 : 0,
'relationship' => [
'name' => 'dialog_msg',
'parent' => $parentId
]
];
}
if (!empty($params['body'])) {
$es->bulk($params);
}
} catch (\Exception $e) {
Log::error('syncDialogToElasticSearch: ' . $e->getMessage());
}
}
/**
* 会话消息 - 从Elasticsearch删除
*/
public static function deleteDialogFromElasticSearch(WebSocketDialogMsg $dialogMsg)
{
try {
$es = new self(self::DUM);
// 获取此会话的所有用户
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
if ($dialogUsers->isEmpty()) {
return;
}
$params = ['body' => []];
foreach ($dialogUsers as $dialogUser) {
$docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}";
$parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}";
$params['body'][] = [
'delete' => [
'_index' => self::DUM,
'_id' => $docId,
'routing' => $parentId
]
];
}
if (!empty($params['body'])) {
$es->bulk($params);
}
} catch (\Exception $e) {
Log::error('deleteDialogFromElasticSearch: ' . $e->getMessage());
}
}
}

View File

@ -0,0 +1,64 @@
<?php
namespace App\Observers;
use App\Models\WebSocketDialogMsg;
use App\Module\ElasticSearch;
class WebSocketDialogMsgObserver
{
/**
* Handle the WebSocketDialogMsg "created" event.
*
* @param \App\Models\WebSocketDialogMsg $webSocketDialogMsg
* @return void
*/
public function created(WebSocketDialogMsg $webSocketDialogMsg)
{
ElasticSearch::syncDialogToElasticSearch($webSocketDialogMsg);
}
/**
* Handle the WebSocketDialogMsg "updated" event.
*
* @param \App\Models\WebSocketDialogMsg $webSocketDialogMsg
* @return void
*/
public function updated(WebSocketDialogMsg $webSocketDialogMsg)
{
ElasticSearch::syncDialogToElasticSearch($webSocketDialogMsg);
}
/**
* Handle the WebSocketDialogMsg "deleted" event.
*
* @param \App\Models\WebSocketDialogMsg $webSocketDialogMsg
* @return void
*/
public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
{
ElasticSearch::deleteDialogFromElasticSearch($webSocketDialogMsg);
}
/**
* Handle the WebSocketDialogMsg "restored" event.
*
* @param \App\Models\WebSocketDialogMsg $webSocketDialogMsg
* @return void
*/
public function restored(WebSocketDialogMsg $webSocketDialogMsg)
{
//
}
/**
* Handle the WebSocketDialogMsg "force deleted" event.
*
* @param \App\Models\WebSocketDialogMsg $webSocketDialogMsg
* @return void
*/
public function forceDeleted(WebSocketDialogMsg $webSocketDialogMsg)
{
//
}
}

View File

@ -4,6 +4,7 @@ namespace App\Observers;
use App\Models\Deleted;
use App\Models\WebSocketDialogUser;
use App\Module\ElasticSearch;
use Carbon\Carbon;
class WebSocketDialogUserObserver
@ -29,6 +30,7 @@ class WebSocketDialogUserObserver
}
}
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ElasticSearch::syncDialogUserToElasticSearch($webSocketDialogUser);
}
/**
@ -39,7 +41,7 @@ class WebSocketDialogUserObserver
*/
public function updated(WebSocketDialogUser $webSocketDialogUser)
{
//
ElasticSearch::syncDialogUserToElasticSearch($webSocketDialogUser);
}
/**
@ -51,6 +53,7 @@ class WebSocketDialogUserObserver
public function deleted(WebSocketDialogUser $webSocketDialogUser)
{
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ElasticSearch::deleteDialogUserFromElasticSearch($webSocketDialogUser);
}
/**

View File

@ -7,11 +7,13 @@ use App\Models\ProjectTask;
use App\Models\ProjectTaskUser;
use App\Models\ProjectUser;
use App\Models\WebSocketDialog;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use App\Observers\ProjectObserver;
use App\Observers\ProjectTaskObserver;
use App\Observers\ProjectTaskUserObserver;
use App\Observers\ProjectUserObserver;
use App\Observers\WebSocketDialogMsgObserver;
use App\Observers\WebSocketDialogObserver;
use App\Observers\WebSocketDialogUserObserver;
use Illuminate\Auth\Events\Registered;
@ -43,6 +45,7 @@ class EventServiceProvider extends ServiceProvider
ProjectTaskUser::observe(ProjectTaskUserObserver::class);
ProjectUser::observe(ProjectUserObserver::class);
WebSocketDialog::observe(WebSocketDialogObserver::class);
WebSocketDialogMsg::observe(WebSocketDialogMsgObserver::class);
WebSocketDialogUser::observe(WebSocketDialogUserObserver::class);
}
}

View File

@ -228,6 +228,9 @@ services:
es:
container_name: "dootask-es-${APP_ID}"
image: "elasticsearch:8.17.2"
volumes:
- ./docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- ./docker/es/data:/usr/share/elasticsearch/data
environment:
discovery.type: single-node
xpack.security.enabled: false

View File

@ -0,0 +1,2 @@
cluster.name: "docker-cluster"
network.host: 0.0.0.0

2
docker/es/data/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*
!.gitignore