mirror of
https://github.com/kuaifan/dootask.git
synced 2025-12-11 18:42:54 +00:00
perf: 优化全文搜索
This commit is contained in:
parent
f258dcfca2
commit
e9e9bab479
@ -1,254 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Console\Commands;
|
||||
|
||||
use App\Models\WebSocketDialogMsg;
|
||||
use App\Models\WebSocketDialogUser;
|
||||
use App\Module\ElasticSearch\ElasticSearchKeyValue;
|
||||
use App\Module\ElasticSearch\ElasticSearchUserMsg;
|
||||
use Illuminate\Console\Command;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
class SyncDialogUserMsgToElasticsearch extends Command
|
||||
{
|
||||
/**
|
||||
* 更新数据
|
||||
* --f: 全量更新 (默认)
|
||||
* --i: 增量更新(从上次更新的最后一个ID接上)
|
||||
*
|
||||
* 清理数据
|
||||
* --c: 清除索引
|
||||
*/
|
||||
|
||||
protected $signature = 'elasticsearch:sync-dialog-user-msg {--f} {--i} {--c} {--batch=500}';
|
||||
protected $description = '同步聊天会话用户和消息到Elasticsearch';
|
||||
protected $es;
|
||||
|
||||
/**
|
||||
* SyncDialogUserMsgToElasticsearch constructor.
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
parent::__construct();
|
||||
try {
|
||||
$this->es = new ElasticSearchUserMsg();
|
||||
} catch (\Exception $e) {
|
||||
$this->error('Elasticsearch连接失败: ' . $e->getMessage());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function handle()
|
||||
{
|
||||
$this->info('开始同步聊天数据...');
|
||||
|
||||
// 清除索引
|
||||
if ($this->option('c')) {
|
||||
$this->info('清除索引...');
|
||||
if (!$this->es->indexExists()) {
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引不存在');
|
||||
return 0;
|
||||
}
|
||||
$result = $this->es->deleteIndex();
|
||||
if (isset($result['error'])) {
|
||||
$this->error('删除索引失败: ' . $result['error']);
|
||||
return 1;
|
||||
}
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引删除成功');
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 判断创建索引
|
||||
if (!$this->es->indexExists()) {
|
||||
$this->info('创建索引...');
|
||||
$result = ElasticSearchUserMsg::generateIndex();
|
||||
if (isset($result['error'])) {
|
||||
$this->error('创建索引失败: ' . $result['error']);
|
||||
return 1;
|
||||
}
|
||||
$this->saveLastId(true);
|
||||
$this->info('索引创建成功');
|
||||
}
|
||||
|
||||
// 同步用户-会话数据
|
||||
$this->syncDialogUsers($this->option('batch'));
|
||||
|
||||
// 同步消息数据
|
||||
$this->syncDialogMsgs($this->option('batch'));
|
||||
|
||||
// 完成
|
||||
$this->info("\n同步完成");
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存最后一个ID
|
||||
* @param string|true $type
|
||||
* @param integer $lastId
|
||||
*/
|
||||
private function saveLastId($type, $lastId = 0)
|
||||
{
|
||||
if ($type === true) {
|
||||
$setting = [];
|
||||
} else {
|
||||
$setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
|
||||
$setting[$type] = $lastId;
|
||||
}
|
||||
ElasticSearchKeyValue::save('elasticSearch:sync', $setting);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一个ID
|
||||
* @param $type
|
||||
* @return int
|
||||
*/
|
||||
private function getLastId($type)
|
||||
{
|
||||
if ($this->option('i')) {
|
||||
$setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
|
||||
return intval($setting[$type] ?? 0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步用户-会话数据(父文档)
|
||||
* @param $batchSize
|
||||
* @return void
|
||||
*/
|
||||
private function syncDialogUsers($batchSize)
|
||||
{
|
||||
$this->info("\n同步用户数据...");
|
||||
$lastId = $this->getLastId('dialog_user');
|
||||
|
||||
$num = 0;
|
||||
$count = WebSocketDialogUser::where('id', '>', $lastId)->count();
|
||||
|
||||
do {
|
||||
// 获取一批用户-会话关系
|
||||
$dialogUsers = WebSocketDialogUser::where('id', '>', $lastId)
|
||||
->orderBy('id')
|
||||
->limit($batchSize)
|
||||
->get();
|
||||
|
||||
if ($dialogUsers->isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
$num += count($dialogUsers);
|
||||
$progress = round($num / $count * 100, 2);
|
||||
$this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}");
|
||||
|
||||
// 批量索引数据
|
||||
$params = ['body' => []];
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$params['body'][] = [
|
||||
'index' => [
|
||||
'_index' => ElasticSearchUserMsg::indexName(),
|
||||
'_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser),
|
||||
]
|
||||
];
|
||||
$params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser);
|
||||
}
|
||||
|
||||
if ($params['body']) {
|
||||
$result = $this->es->bulk($params);
|
||||
if (isset($result['errors']) && $result['errors']) {
|
||||
$this->error('批量索引用户数据部分失败');
|
||||
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
|
||||
}
|
||||
}
|
||||
|
||||
$lastId = $dialogUsers->last()->id;
|
||||
$this->saveLastId('dialog_user', $lastId);
|
||||
} while (count($dialogUsers) == $batchSize);
|
||||
|
||||
$this->info("同步用户数据结束 - 最后ID {$lastId}");
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步消息数据(子文档)
|
||||
*/
|
||||
private function syncDialogMsgs($batchSize)
|
||||
{
|
||||
$this->info("\n同步消息数据...");
|
||||
$lastId = $this->getLastId('dialog_msg');
|
||||
|
||||
$num = 0;
|
||||
$count = WebSocketDialogMsg::where('id', '>', $lastId)->count();
|
||||
|
||||
do {
|
||||
// 获取一批消息
|
||||
$dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId)
|
||||
->orderBy('id')
|
||||
->limit($batchSize)
|
||||
->get();
|
||||
|
||||
if ($dialogMsgs->isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
$num += count($dialogMsgs);
|
||||
$progress = round($num / $count * 100, 2);
|
||||
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}");
|
||||
|
||||
// 获取这些消息所属的会话对应的所有用户
|
||||
$dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray();
|
||||
$userDialogMap = [];
|
||||
|
||||
if (!empty($dialogIds)) {
|
||||
$dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
|
||||
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid;
|
||||
}
|
||||
}
|
||||
|
||||
// 批量索引消息数据
|
||||
$params = ['body' => []];
|
||||
foreach ($dialogMsgs as $dialogMsg) {
|
||||
// 如果该会话没有用户,跳过
|
||||
if (empty($userDialogMap[$dialogMsg->dialog_id])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 为每个用户-会话关系创建子文档
|
||||
foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) {
|
||||
$params['body'][] = [
|
||||
'index' => [
|
||||
'_index' => ElasticSearchUserMsg::indexName(),
|
||||
'_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid),
|
||||
'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // 路由到父文档
|
||||
]
|
||||
];
|
||||
|
||||
$params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid);
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($params['body'])) {
|
||||
// 分批处理
|
||||
$chunks = array_chunk($params['body'], 1000);
|
||||
foreach ($chunks as $chunk) {
|
||||
$chunkParams = ['body' => $chunk];
|
||||
$result = $this->es->bulk($chunkParams);
|
||||
if (isset($result['errors']) && $result['errors']) {
|
||||
$this->error('批量索引消息数据部分失败');
|
||||
Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$lastId = $dialogMsgs->last()->id;
|
||||
$this->saveLastId('dialog_msg', $lastId);
|
||||
} while (count($dialogMsgs) == $batchSize);
|
||||
|
||||
$this->info("同步消息结束 - 最后ID {$lastId}");
|
||||
}
|
||||
}
|
||||
@ -14,10 +14,8 @@ use App\Module\Base;
|
||||
use App\Module\Timer;
|
||||
use App\Models\Setting;
|
||||
use App\Module\Extranet;
|
||||
use App\Module\ElasticSearch\ElasticSearchUserMsg;
|
||||
use App\Module\TimeRange;
|
||||
use App\Module\MsgTool;
|
||||
use App\Module\Table\OnlineData;
|
||||
use App\Models\FileContent;
|
||||
use App\Models\ProjectTask;
|
||||
use App\Models\AbstractModel;
|
||||
@ -29,6 +27,8 @@ use App\Models\WebSocketDialogMsgRead;
|
||||
use App\Models\WebSocketDialogMsgTodo;
|
||||
use App\Models\WebSocketDialogMsgTranslate;
|
||||
use App\Models\WebSocketDialogSession;
|
||||
use App\Module\Table\OnlineData;
|
||||
use App\Module\ZincSearch\ZincSearchUserMsg;
|
||||
use Hhxsv5\LaravelS\Swoole\Task\Task;
|
||||
|
||||
/**
|
||||
@ -174,7 +174,7 @@ class DialogController extends AbstractController
|
||||
}
|
||||
// 搜索消息会话
|
||||
if (count($list) < 20) {
|
||||
$searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, 20 - count($list));
|
||||
$searchResults = ZincSearchUserMsg::searchByKeyword($user->userid, $key, 0, 20 - count($list));
|
||||
if ($searchResults) {
|
||||
foreach ($searchResults as $item) {
|
||||
if ($dialog = WebSocketDialog::find($item['id'])) {
|
||||
@ -728,7 +728,7 @@ class DialogController extends AbstractController
|
||||
$key = trim(Request::input('key'));
|
||||
$list = [];
|
||||
//
|
||||
$searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, Base::getPaginate(50, 20));
|
||||
$searchResults = ZincSearchUserMsg::searchByKeyword($user->userid, $key, 0, Base::getPaginate(50, 20));
|
||||
if ($searchResults) {
|
||||
foreach ($searchResults as $item) {
|
||||
if ($dialog = WebSocketDialog::find($item['id'])) {
|
||||
|
||||
@ -1,308 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module\ElasticSearch;
|
||||
|
||||
use Elastic\Elasticsearch\ClientBuilder;
|
||||
use Elastic\Elasticsearch\Exception\MissingParameterException;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
/**
|
||||
* Elasticsearch基础类
|
||||
*
|
||||
* Class ElasticSearchBase
|
||||
* @package App\Module\ElasticSearch
|
||||
*/
|
||||
class ElasticSearchBase
|
||||
{
|
||||
/**
|
||||
* Elasticsearch客户端实例
|
||||
*
|
||||
* @var \Elastic\Elasticsearch\Client
|
||||
*/
|
||||
protected $client;
|
||||
|
||||
/**
|
||||
* 当前操作的索引名称
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $index;
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*
|
||||
* @param null $index 默认索引名称
|
||||
* @throws \Elastic\Elasticsearch\Exception\ConfigException
|
||||
*/
|
||||
public function __construct($index = null)
|
||||
{
|
||||
$host = env('ELASTICSEARCH_HOST', 'es');
|
||||
$port = env('ELASTICSEARCH_PORT', '9200');
|
||||
$scheme = env('ELASTICSEARCH_SCHEME', 'http');
|
||||
$user = env('ELASTICSEARCH_USER', '');
|
||||
$pass = env('ELASTICSEARCH_PASS', '');
|
||||
$verifi = env('ELASTICSEARCH_VERIFI', false);
|
||||
$ca = env('ELASTICSEARCH_CA', '');
|
||||
$key = env('ELASTICSEARCH_KEY', '');
|
||||
$cert = env('ELASTICSEARCH_CERT', '');
|
||||
// 为8.x版本客户端配置连接
|
||||
$config = [
|
||||
'hosts' => ["{$scheme}://{$host}:{$port}"]
|
||||
];
|
||||
|
||||
// 如果设置了用户名和密码
|
||||
if (!empty($user)) {
|
||||
$config['basicAuthentication'] = [$user, $pass];
|
||||
}
|
||||
|
||||
$config['SSLVerification'] = $verifi;
|
||||
if ($verifi) {
|
||||
$config['SSLCert'] = $cert;
|
||||
$config['CABundle'] = $ca;
|
||||
$config['SSLKey'] = $key;
|
||||
}
|
||||
// 8.x版本使用ClientBuilder::fromConfig创建客户端
|
||||
$this->client = ClientBuilder::fromConfig($config);
|
||||
|
||||
if ($index) {
|
||||
$this->index = $index;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置索引名称
|
||||
*
|
||||
* @param string $index
|
||||
* @return $this
|
||||
*/
|
||||
public function setIndex($index)
|
||||
{
|
||||
$this->index = $index;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查索引是否存在
|
||||
*
|
||||
* @return bool
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function indexExists()
|
||||
{
|
||||
$params = ['index' => $this->index];
|
||||
return $this->client->indices()->exists($params)->asBool();
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建索引
|
||||
*
|
||||
* @param array $settings 索引设置
|
||||
* @param array $mappings 字段映射
|
||||
* @return array
|
||||
*/
|
||||
public function createIndex($settings = [], $mappings = [])
|
||||
{
|
||||
$params = [
|
||||
'index' => $this->index
|
||||
];
|
||||
|
||||
$body = [];
|
||||
if (!empty($settings)) {
|
||||
$body['settings'] = $settings;
|
||||
}
|
||||
|
||||
if (!empty($mappings)) {
|
||||
$body['mappings'] = $mappings;
|
||||
}
|
||||
|
||||
if (!empty($body)) {
|
||||
$params['body'] = $body;
|
||||
}
|
||||
|
||||
try {
|
||||
// 在8.x中,索引操作位于indices()命名空间
|
||||
return $this->client->indices()->create($params)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('创建Elasticsearch索引失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除索引
|
||||
* @return array
|
||||
*/
|
||||
public function deleteIndex()
|
||||
{
|
||||
try {
|
||||
$params = ['index' => $this->index];
|
||||
return $this->client->indices()->delete($params)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('删除Elasticsearch索引失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量操作(批量添加/更新/删除文档)
|
||||
*
|
||||
* @param array $operations 批量操作的数据
|
||||
* @return array
|
||||
*/
|
||||
public function bulk($operations)
|
||||
{
|
||||
try {
|
||||
// 在8.x中,批量操作API签名相同,但内部实现有所变化
|
||||
return $this->client->bulk($operations)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('批量操作失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 索引单个文档
|
||||
*
|
||||
* @param array $document 文档数据
|
||||
* @param string $id 文档ID
|
||||
* @param string|null $routing 路由值,用于父子文档
|
||||
* @return array
|
||||
*/
|
||||
public function indexDocument($document, $id, $routing = null)
|
||||
{
|
||||
$params = [
|
||||
'index' => $this->index,
|
||||
'id' => $id,
|
||||
'body' => $document
|
||||
];
|
||||
|
||||
if ($routing) {
|
||||
$params['routing'] = $routing;
|
||||
}
|
||||
|
||||
try {
|
||||
return $this->client->index($params)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('索引文档失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除文档
|
||||
*
|
||||
* @param string $id 文档ID
|
||||
* @param string|null $routing 路由值,用于父子文档
|
||||
* @return array
|
||||
*/
|
||||
public function deleteDocument($id, $routing = null)
|
||||
{
|
||||
$params = [
|
||||
'index' => $this->index,
|
||||
'id' => $id
|
||||
];
|
||||
|
||||
if ($routing) {
|
||||
$params['routing'] = $routing;
|
||||
}
|
||||
|
||||
try {
|
||||
return $this->client->delete($params)->asArray();
|
||||
} catch (MissingParameterException $e) {
|
||||
// 文档不存在时返回成功
|
||||
return ['result' => 'not_found', 'error' => $e->getMessage()];
|
||||
} catch (\Exception $e) {
|
||||
Log::error('删除文档失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 刷新索引
|
||||
* @return array
|
||||
*/
|
||||
public function refreshIndex()
|
||||
{
|
||||
$params = [
|
||||
'index' => $this->index
|
||||
];
|
||||
|
||||
try {
|
||||
return $this->client->indices()->refresh($params)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('刷新索引失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查索引映射
|
||||
* @return array
|
||||
*/
|
||||
public function checkIndexMapping()
|
||||
{
|
||||
try {
|
||||
return $this->client->indices()->getMapping(['index' => $this->index])->asArray();
|
||||
} catch (\Exception $e) {
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通用搜索方法
|
||||
*
|
||||
* @param array $query 搜索查询
|
||||
* @param int $from 起始位置
|
||||
* @param int $size 返回结果数量
|
||||
* @param array $sort 排序规则
|
||||
* @return array
|
||||
*/
|
||||
public function search($query, $from = 0, $size = 10, $sort = [])
|
||||
{
|
||||
$params = [
|
||||
'index' => $this->index,
|
||||
'body' => [
|
||||
'query' => $query,
|
||||
'from' => $from,
|
||||
'size' => $size
|
||||
]
|
||||
];
|
||||
|
||||
if (!empty($sort)) {
|
||||
$params['body']['sort'] = $sort;
|
||||
}
|
||||
|
||||
try {
|
||||
return $this->client->search($params)->asArray();
|
||||
} catch (\Exception $e) {
|
||||
Log::error('搜索失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage(), 'hits' => ['total' => ['value' => 0], 'hits' => []]];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 索引名称
|
||||
*/
|
||||
const indexName = 'default';
|
||||
|
||||
/**
|
||||
* 获取索引名称
|
||||
* @param string $index 索引名称
|
||||
* @param string|null $prefix 索引前缀
|
||||
* @param string|null $subfix 索引后缀
|
||||
* @return string
|
||||
*/
|
||||
public static function indexName($index = '', $prefix = '', $subfix = '')
|
||||
{
|
||||
$index = $index ?: static::indexName;
|
||||
$prefix = $prefix ?: env('ES_INDEX_PREFIX', '');
|
||||
$subfix = $subfix ?: env('ES_INDEX_SUFFIX', '');
|
||||
if ($prefix) {
|
||||
$index = rtrim($prefix, '_') . '_' . $index;
|
||||
}
|
||||
if ($subfix) {
|
||||
$index = $index . '_' . ltrim($subfix, '_');
|
||||
}
|
||||
return $index;
|
||||
}
|
||||
}
|
||||
@ -1,204 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module\ElasticSearch;
|
||||
|
||||
use App\Module\Base;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
/**
|
||||
* Elasticsearch键值存储
|
||||
*
|
||||
* Class ElasticSearchKeyValue
|
||||
* @package App\Module\ElasticSearch
|
||||
*/
|
||||
class ElasticSearchKeyValue extends ElasticSearchBase
|
||||
{
|
||||
const indexName = 'key_value_store';
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
* @return ElasticSearchBase
|
||||
* @throws \Elastic\Elasticsearch\Exception\ConfigException
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
return parent::__construct(self::indexName());
|
||||
}
|
||||
|
||||
/** ******************************************************************************************************** */
|
||||
/** *********************************** 键值存储方法 ******************************************************** */
|
||||
/** ******************************************************************************************************** */
|
||||
|
||||
/**
|
||||
* 创建键值存储索引
|
||||
* @return array
|
||||
*/
|
||||
public static function generateIndex()
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
// 如果索引已存在,则直接返回
|
||||
if ($es->indexExists()) {
|
||||
return ['acknowledged' => true, 'message' => '索引已存在'];
|
||||
}
|
||||
|
||||
// 定义映射
|
||||
$mappings = [
|
||||
'properties' => [
|
||||
'key' => ['type' => 'keyword'],
|
||||
'value' => ['type' => 'text', 'fields' => ['keyword' => ['type' => 'keyword']]],
|
||||
'created_at' => ['type' => 'integer'],
|
||||
'updated_at' => ['type' => 'integer']
|
||||
]
|
||||
];
|
||||
|
||||
// 索引设置
|
||||
$settings = [
|
||||
'number_of_shards' => 1,
|
||||
'number_of_replicas' => 1,
|
||||
'refresh_interval' => '1s'
|
||||
];
|
||||
|
||||
return $es->createIndex($settings, $mappings);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('创建键值存储索引失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存键值对
|
||||
* @param string $key 键名
|
||||
* @param mixed $value 键值
|
||||
* @param string $namespace 命名空间,用于区分不同的键值存储场景
|
||||
* @return array
|
||||
*/
|
||||
public static function save($key, $value, $namespace = 'default')
|
||||
{
|
||||
try {
|
||||
// 确保索引存在
|
||||
self::generateIndex();
|
||||
|
||||
$es = new self();
|
||||
|
||||
// 生成文档ID
|
||||
$docId = "{$namespace}:{$key}";
|
||||
|
||||
// 准备文档数据
|
||||
$document = [
|
||||
'key' => $key,
|
||||
'value' => is_array($value) ? json_encode($value, JSON_UNESCAPED_UNICODE) : $value,
|
||||
'namespace' => $namespace,
|
||||
'created_at' => time(),
|
||||
'updated_at' => time()
|
||||
];
|
||||
|
||||
// 索引文档
|
||||
$result = $es->indexDocument($document, $docId);
|
||||
|
||||
// 刷新索引以确保立即可见
|
||||
$es->refreshIndex();
|
||||
|
||||
return $result;
|
||||
} catch (\Exception $e) {
|
||||
Log::error('保存键值对失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取键值
|
||||
* @param string $key 键名
|
||||
* @param mixed $default 默认值,当键不存在时返回
|
||||
* @param string $namespace 命名空间,用于区分不同的键值存储场景
|
||||
* @return mixed
|
||||
*/
|
||||
public static function get($key, $default = null, $namespace = 'default')
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
// 如果索引不存在,直接返回默认值
|
||||
if (!$es->indexExists()) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
// 生成文档ID
|
||||
$docId = "{$namespace}:{$key}";
|
||||
|
||||
// 查询参数
|
||||
$params = [
|
||||
'index' => self::indexName(),
|
||||
'id' => $docId
|
||||
];
|
||||
|
||||
try {
|
||||
// 获取文档
|
||||
$response = $es->client->get($params)->asArray();
|
||||
|
||||
// 获取值
|
||||
$value = $response['_source']['value'] ?? $default;
|
||||
|
||||
// 如果值是JSON字符串,尝试解码
|
||||
if (is_string($value) && $decoded = json_decode($value, true)) {
|
||||
if (json_last_error() === JSON_ERROR_NONE) {
|
||||
return $decoded;
|
||||
}
|
||||
}
|
||||
|
||||
return $value;
|
||||
} catch (\Exception $e) {
|
||||
// 文档不存在或其他错误,返回默认值
|
||||
return $default;
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('获取键值对失败: ' . $e->getMessage());
|
||||
return $default;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取键值,返回数组
|
||||
* @param string $key 键名
|
||||
* @param array $default 默认值,当键不存在时返回
|
||||
* @param string $namespace 命名空间,用于区分不同的键值存储场景
|
||||
* @return array
|
||||
*/
|
||||
public static function getArray($key, $default = [], $namespace = 'default')
|
||||
{
|
||||
return Base::string2array(self::get($key, $default, $namespace));
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除键值对
|
||||
* @param string $key 键名
|
||||
* @param string $namespace 命名空间
|
||||
* @return array
|
||||
*/
|
||||
public static function delete($key, $namespace = 'default')
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
// 如果索引不存在,直接返回成功
|
||||
if (!$es->indexExists()) {
|
||||
return ['result' => 'not_found'];
|
||||
}
|
||||
|
||||
// 生成文档ID
|
||||
$docId = "{$namespace}:{$key}";
|
||||
|
||||
// 删除文档
|
||||
$result = $es->deleteDocument($docId);
|
||||
|
||||
// 刷新索引以确保立即生效
|
||||
$es->refreshIndex();
|
||||
|
||||
return $result;
|
||||
} catch (\Exception $e) {
|
||||
Log::error('删除键值对失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,375 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Module\ElasticSearch;
|
||||
|
||||
use App\Models\WebSocketDialogMsg;
|
||||
use App\Models\WebSocketDialogUser;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
/**
|
||||
* 对话系统消息索引
|
||||
*
|
||||
* Class ElasticSearchUserMsg
|
||||
* @package App\Module\ElasticSearch
|
||||
*/
|
||||
class ElasticSearchUserMsg extends ElasticSearchBase
|
||||
{
|
||||
const indexName = 'dialog_user_msg';
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
* @return ElasticSearchBase
|
||||
* @throws \Elastic\Elasticsearch\Exception\ConfigException
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
return parent::__construct(self::indexName());
|
||||
}
|
||||
|
||||
/** ******************************************************************************************************** */
|
||||
/** *********************************************** 基础 ************************************************** */
|
||||
/** ******************************************************************************************************** */
|
||||
|
||||
/**
|
||||
* 创建聊天系统索引 - 使用父子关系
|
||||
* @return array
|
||||
*/
|
||||
public static function generateIndex()
|
||||
{
|
||||
// 定义映射
|
||||
$mappings = [
|
||||
'properties' => [
|
||||
// 共用字段
|
||||
'dialog_id' => ['type' => 'keyword'],
|
||||
'created_at' => ['type' => 'date'],
|
||||
'updated_at' => ['type' => 'date'],
|
||||
|
||||
// dialog_users 字段
|
||||
'userid' => ['type' => 'keyword'],
|
||||
'top_at' => ['type' => 'date'],
|
||||
'last_at' => ['type' => 'date'],
|
||||
'mark_unread' => ['type' => 'integer'],
|
||||
'silence' => ['type' => 'integer'],
|
||||
'hide' => ['type' => 'integer'],
|
||||
'color' => ['type' => 'keyword'],
|
||||
|
||||
// dialog_msgs 字段
|
||||
'msg_id' => ['type' => 'keyword'],
|
||||
'sender_userid' => ['type' => 'keyword'],
|
||||
'msg_type' => ['type' => 'keyword'],
|
||||
'key' => ['type' => 'text'],
|
||||
'bot' => ['type' => 'integer'],
|
||||
|
||||
// Join字段定义父子关系
|
||||
'relationship' => [
|
||||
'type' => 'join',
|
||||
'relations' => [
|
||||
'dialog_user' => 'dialog_msg' // dialog_user是父文档,dialog_msg是子文档
|
||||
]
|
||||
],
|
||||
]
|
||||
];
|
||||
|
||||
// 索引设置
|
||||
$settings = [
|
||||
'number_of_shards' => 5,
|
||||
'number_of_replicas' => 1,
|
||||
'refresh_interval' => '5s'
|
||||
];
|
||||
|
||||
try {
|
||||
$es = new self();
|
||||
return $es->createIndex($settings, $mappings);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('创建聊天系统索引失败: ' . $e->getMessage());
|
||||
return ['error' => $e->getMessage()];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话
|
||||
* @param string $userid 用户ID
|
||||
* @param string $keyword 消息关键词
|
||||
* @param int $size 返回结果数量
|
||||
* @return array
|
||||
*/
|
||||
public static function searchByKeyword($userid, $keyword, $size = 20)
|
||||
{
|
||||
// 注意这里的类型名称要与创建索引时的一致
|
||||
$query = [
|
||||
'bool' => [
|
||||
'must' => [
|
||||
[
|
||||
'term' => [
|
||||
'userid' => $userid
|
||||
]
|
||||
],
|
||||
[
|
||||
'has_child' => [
|
||||
'type' => 'dialog_msg',
|
||||
'query' => [
|
||||
'bool' => [
|
||||
'must' => [
|
||||
[
|
||||
'match_phrase' => [
|
||||
'key' => $keyword
|
||||
]
|
||||
],
|
||||
[
|
||||
'term' => [
|
||||
'bot' => 0
|
||||
]
|
||||
]
|
||||
]
|
||||
]
|
||||
],
|
||||
'inner_hits' => [
|
||||
'size' => 1,
|
||||
'sort' => [
|
||||
'msg_id' => 'desc'
|
||||
]
|
||||
]
|
||||
]
|
||||
]
|
||||
]
|
||||
]
|
||||
];
|
||||
|
||||
// 结果集合
|
||||
$searchMap = [];
|
||||
|
||||
try {
|
||||
// 开始搜索
|
||||
$es = new self();
|
||||
$results = $es->search($query, 0, $size, ['last_at' => 'desc']);
|
||||
|
||||
// 处理搜索结果
|
||||
$hits = $results['hits']['hits'] ?? [];
|
||||
|
||||
foreach ($hits as $hit) {
|
||||
if (isset($hit['inner_hits']['dialog_msg']['hits']['hits'][0])) {
|
||||
$msgHit = $hit['inner_hits']['dialog_msg']['hits']['hits'][0];
|
||||
$source = $hit['_source'];
|
||||
$msgSource = $msgHit['_source'];
|
||||
|
||||
$searchMap[] = [
|
||||
'id' => $source['dialog_id'],
|
||||
'top_at' => $source['top_at'],
|
||||
'last_at' => $source['last_at'],
|
||||
'mark_unread' => $source['mark_unread'],
|
||||
'silence' => $source['silence'],
|
||||
'hide' => $source['hide'],
|
||||
'color' => $source['color'],
|
||||
'user_at' => $source['updated_at'],
|
||||
'search_msg_id' => $msgSource['msg_id'],
|
||||
];
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('searchByKeyword: ' . $e->getMessage());
|
||||
}
|
||||
|
||||
// 返回搜索结果
|
||||
return $searchMap;
|
||||
}
|
||||
|
||||
/** ******************************************************************************************************** */
|
||||
/** *********************************************** 用户 ************************************************** */
|
||||
/** ******************************************************************************************************** */
|
||||
|
||||
/**
|
||||
* 会话用户 - 生成文档ID
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return string
|
||||
*/
|
||||
public static function generateUserDicId(WebSocketDialogUser $dialogUser)
|
||||
{
|
||||
return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 生成文档格式
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return array
|
||||
*/
|
||||
public static function generateUserFormat(WebSocketDialogUser $dialogUser)
|
||||
{
|
||||
return [
|
||||
'dialog_id' => $dialogUser->dialog_id,
|
||||
'created_at' => $dialogUser->created_at,
|
||||
'updated_at' => $dialogUser->updated_at,
|
||||
|
||||
'userid' => $dialogUser->userid,
|
||||
'top_at' => $dialogUser->top_at,
|
||||
'last_at' => $dialogUser->last_at,
|
||||
'mark_unread' => $dialogUser->mark_unread ? 1 : 0,
|
||||
'silence' => $dialogUser->silence ? 1 : 0,
|
||||
'hide' => $dialogUser->hide ? 1 : 0,
|
||||
'color' => $dialogUser->color,
|
||||
|
||||
'relationship' => [
|
||||
'name' => 'dialog_user'
|
||||
]
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 同步到Elasticsearch
|
||||
* @param WebSocketDialogUser $dialogUser
|
||||
* @return void
|
||||
*/
|
||||
public static function syncUser(WebSocketDialogUser $dialogUser)
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
$es->indexDocument(self::generateUserFormat($dialogUser), self::generateUserDicId($dialogUser));
|
||||
} catch (\Exception $e) {
|
||||
Log::error('syncUser: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话用户 - 从Elasticsearch删除
|
||||
*/
|
||||
public static function deleteUser(WebSocketDialogUser $dialogUser)
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
$docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}";
|
||||
|
||||
// 删除用户-会话文档
|
||||
$es->deleteDocument($docId);
|
||||
|
||||
// 注意:这里可能还需要删除所有关联的消息文档
|
||||
// 但由于父子关系,可以通过查询找到所有子文档并删除
|
||||
// 这里为简化,可以选择在后台任务中处理,或者直接依赖ES的级联删除功能
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error('deleteUser: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/** ******************************************************************************************************** */
|
||||
/** *********************************************** 消息 ************************************************** */
|
||||
/** ******************************************************************************************************** */
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成父文档ID
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param $userid
|
||||
* @return string
|
||||
*/
|
||||
public static function generateMsgParentId(WebSocketDialogMsg $dialogMsg, $userid)
|
||||
{
|
||||
return "user_{$userid}_dialog_{$dialogMsg->dialog_id}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成文档ID
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param $userid
|
||||
* @return string
|
||||
*/
|
||||
public static function generateMsgDicId(WebSocketDialogMsg $dialogMsg, $userid)
|
||||
{
|
||||
return "msg_{$dialogMsg->id}_user_{$userid}";
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 生成文档格式
|
||||
* @param WebSocketDialogMsg $dialogMsg
|
||||
* @param $userid
|
||||
* @return array
|
||||
*/
|
||||
public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, $userid)
|
||||
{
|
||||
return [
|
||||
'dialog_id' => $dialogMsg->dialog_id,
|
||||
'created_at' => $dialogMsg->created_at,
|
||||
'updated_at' => $dialogMsg->updated_at,
|
||||
|
||||
'msg_id' => $dialogMsg->id,
|
||||
'sender_userid' => $dialogMsg->userid,
|
||||
'msg_type' => $dialogMsg->type,
|
||||
'key' => $dialogMsg->key,
|
||||
'bot' => $dialogMsg->bot ? 1 : 0,
|
||||
|
||||
'relationship' => [
|
||||
'name' => 'dialog_msg',
|
||||
'parent' => self::generateMsgParentId($dialogMsg, $userid)
|
||||
]
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 同步到Elasticsearch
|
||||
*/
|
||||
public static function syncMsg(WebSocketDialogMsg $dialogMsg)
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
// 获取此会话的所有用户
|
||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
||||
|
||||
if ($dialogUsers->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$params = ['body' => []];
|
||||
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$params['body'][] = [
|
||||
'index' => [
|
||||
'_index' => self::indexName(),
|
||||
'_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid),
|
||||
'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid)
|
||||
]
|
||||
];
|
||||
$params['body'][] = self::generateMsgFormat($dialogMsg, $dialogUser->userid);
|
||||
}
|
||||
|
||||
if (!empty($params['body'])) {
|
||||
$es->bulk($params);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('syncMsg: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话消息 - 从Elasticsearch删除
|
||||
*/
|
||||
public static function deleteMsg(WebSocketDialogMsg $dialogMsg)
|
||||
{
|
||||
try {
|
||||
$es = new self();
|
||||
|
||||
// 获取此会话的所有用户
|
||||
$dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get();
|
||||
|
||||
if ($dialogUsers->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$params = ['body' => []];
|
||||
|
||||
foreach ($dialogUsers as $dialogUser) {
|
||||
$params['body'][] = [
|
||||
'delete' => [
|
||||
'_index' => self::indexName(),
|
||||
'_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid),
|
||||
'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid)
|
||||
]
|
||||
];
|
||||
}
|
||||
|
||||
if (!empty($params['body'])) {
|
||||
$es->bulk($params);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::error('deleteMsg: ' . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -131,7 +131,21 @@ class ZincSearchUserMsg
|
||||
];
|
||||
|
||||
try {
|
||||
return ZincSearchBase::elasticSearch(self::$indexName, $searchParams);
|
||||
$result = ZincSearchBase::elasticSearch(self::$indexName, $searchParams);
|
||||
return array_map(function ($hit) {
|
||||
$source = $hit['_source'];
|
||||
return [
|
||||
'id' => $source['dialog_id'],
|
||||
'top_at' => $source['top_at'],
|
||||
'last_at' => $source['last_at'],
|
||||
'mark_unread' => $source['mark_unread'],
|
||||
'silence' => $source['silence'],
|
||||
'hide' => $source['hide'],
|
||||
'color' => $source['color'],
|
||||
'user_at' => $source['updated_at'],
|
||||
'search_msg_id' => $source['msg_id'],
|
||||
];
|
||||
}, $result['data']['hits']['hits'] ?? []);
|
||||
} catch (\Exception $e) {
|
||||
Log::error('搜索对话消息失败: ' . $e->getMessage());
|
||||
return [
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
namespace App\Observers;
|
||||
|
||||
use App\Models\WebSocketDialogMsg;
|
||||
use App\Module\ElasticSearch\ElasticSearchUserMsg;
|
||||
use App\Module\ZincSearch\ZincSearchUserMsg;
|
||||
|
||||
class WebSocketDialogMsgObserver
|
||||
{
|
||||
@ -15,7 +15,7 @@ class WebSocketDialogMsgObserver
|
||||
*/
|
||||
public function created(WebSocketDialogMsg $webSocketDialogMsg)
|
||||
{
|
||||
ElasticSearchUserMsg::syncMsg($webSocketDialogMsg);
|
||||
ZincSearchUserMsg::syncMsg($webSocketDialogMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -26,7 +26,7 @@ class WebSocketDialogMsgObserver
|
||||
*/
|
||||
public function updated(WebSocketDialogMsg $webSocketDialogMsg)
|
||||
{
|
||||
ElasticSearchUserMsg::syncMsg($webSocketDialogMsg);
|
||||
ZincSearchUserMsg::syncMsg($webSocketDialogMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -37,7 +37,7 @@ class WebSocketDialogMsgObserver
|
||||
*/
|
||||
public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
|
||||
{
|
||||
ElasticSearchUserMsg::deleteMsg($webSocketDialogMsg);
|
||||
ZincSearchUserMsg::deleteMsg($webSocketDialogMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -4,7 +4,7 @@ namespace App\Observers;
|
||||
|
||||
use App\Models\Deleted;
|
||||
use App\Models\WebSocketDialogUser;
|
||||
use App\Module\ElasticSearch\ElasticSearchUserMsg;
|
||||
use App\Module\ZincSearch\ZincSearchUserMsg;
|
||||
use Carbon\Carbon;
|
||||
|
||||
class WebSocketDialogUserObserver
|
||||
@ -30,7 +30,7 @@ class WebSocketDialogUserObserver
|
||||
}
|
||||
}
|
||||
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
||||
ElasticSearchUserMsg::syncUser($webSocketDialogUser);
|
||||
ZincSearchUserMsg::syncUser($webSocketDialogUser);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -41,7 +41,7 @@ class WebSocketDialogUserObserver
|
||||
*/
|
||||
public function updated(WebSocketDialogUser $webSocketDialogUser)
|
||||
{
|
||||
ElasticSearchUserMsg::syncUser($webSocketDialogUser);
|
||||
ZincSearchUserMsg::syncUser($webSocketDialogUser);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -53,7 +53,7 @@ class WebSocketDialogUserObserver
|
||||
public function deleted(WebSocketDialogUser $webSocketDialogUser)
|
||||
{
|
||||
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
|
||||
ElasticSearchUserMsg::deleteUser($webSocketDialogUser);
|
||||
ZincSearchUserMsg::deleteUser($webSocketDialogUser);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -6,9 +6,9 @@ use Carbon\Carbon;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
|
||||
/**
|
||||
* 同步聊天数据到Elasticsearch
|
||||
* 同步聊天数据到ZincSearch
|
||||
*/
|
||||
class ElasticSearchSyncTask extends AbstractTask
|
||||
class ZincSearchSyncTask extends AbstractTask
|
||||
{
|
||||
public function __construct()
|
||||
{
|
||||
@ -18,19 +18,19 @@ class ElasticSearchSyncTask extends AbstractTask
|
||||
public function start()
|
||||
{
|
||||
// 120分钟执行一次
|
||||
$time = intval(Cache::get("ElasticSearchSyncTask:Time"));
|
||||
$time = intval(Cache::get("ZincSearchSyncTask:Time"));
|
||||
if (time() - $time < 120 * 60) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 执行开始,120分钟后缓存标记失效
|
||||
Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120));
|
||||
Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120));
|
||||
|
||||
// 开始执行同步
|
||||
@shell_exec("php /var/www/artisan elasticsearch:sync-dialog-user-msg --i");
|
||||
@shell_exec("php /var/www/artisan zinc:sync-user-msg --i");
|
||||
|
||||
// 执行完成,5分钟后缓存标记失效(5分钟任务可重复执行)
|
||||
Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5));
|
||||
Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5));
|
||||
}
|
||||
|
||||
public function end()
|
||||
@ -20,7 +20,6 @@
|
||||
"ext-simplexml": "*",
|
||||
"ext-zip": "*",
|
||||
"directorytree/ldaprecord-laravel": "^2.7",
|
||||
"elasticsearch/elasticsearch": "^8.17",
|
||||
"fideloper/proxy": "^4.4.1",
|
||||
"firebase/php-jwt": "^6.9",
|
||||
"fruitcake/laravel-cors": "^2.0.4",
|
||||
|
||||
@ -217,21 +217,6 @@ services:
|
||||
ipv4_address: "${APP_IPPR}.14"
|
||||
restart: unless-stopped
|
||||
|
||||
es:
|
||||
container_name: "dootask-es-${APP_ID}"
|
||||
image: "elasticsearch:8.17.2"
|
||||
volumes:
|
||||
- ./docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
|
||||
- ./docker/es/data:/usr/share/elasticsearch/data
|
||||
environment:
|
||||
discovery.type: single-node
|
||||
xpack.security.enabled: false
|
||||
ES_JAVA_OPTS: "-Xms1g -Xmx1g"
|
||||
networks:
|
||||
extnetwork:
|
||||
ipv4_address: "${APP_IPPR}.15"
|
||||
restart: unless-stopped
|
||||
|
||||
search:
|
||||
container_name: "dootask-search-${APP_ID}"
|
||||
image: "public.ecr.aws/zinclabs/zincsearch:0.4.10"
|
||||
@ -246,7 +231,7 @@ services:
|
||||
ZINC_FIRST_ADMIN_PASSWORD: "${DB_PASSWORD}"
|
||||
networks:
|
||||
extnetwork:
|
||||
ipv4_address: "${APP_IPPR}.16"
|
||||
ipv4_address: "${APP_IPPR}.15"
|
||||
restart: unless-stopped
|
||||
|
||||
networks:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user