no message

This commit is contained in:
kuaifan 2025-04-24 08:37:35 +08:00
parent f5a343f358
commit eccb3e2825
6 changed files with 60 additions and 33 deletions

View File

@ -151,6 +151,25 @@ class AbstractModel extends Model
return $date->format($this->dateFormat ?: 'Y-m-d H:i:s'); return $date->format($this->dateFormat ?: 'Y-m-d H:i:s');
} }
/**
* 通过模型创建实例
* @param array $param
* @param bool $force
* @return static
*/
public static function fillInstance(array $param = [], bool $force = true)
{
$instance = new static;
if ($param) {
if ($force) {
$instance->forceFill($param);
} else {
$instance->fill($param);
}
}
return $instance;
}
/** /**
* 创建/更新数据 * 创建/更新数据
* @param array $param * @param array $param
@ -209,15 +228,23 @@ class AbstractModel extends Model
/** /**
* 数据库更新或插入 * 数据库更新或插入
* @param $where * @param array $where 查询条件
* @param array|\Closure $update 存在时更新的内容 * @param array|\Closure $update 存在时更新的内容
* @param array|\Closure $insert 不存在时插入的内容,如果没有则插入更新内容 * @param array|\Closure $insert 不存在时插入的内容,如果没有则插入更新内容
* @param bool $isInsert 是否是插入数据 * @param bool $isInsert 是否是插入数据
* @param bool|null $lockForUpdate 是否加锁true:加锁false:不加锁null:在事务中会自动加锁)
* @return AbstractModel|\Illuminate\Database\Eloquent\Builder|Model|object|static|null * @return AbstractModel|\Illuminate\Database\Eloquent\Builder|Model|object|static|null
*/ */
public static function updateInsert($where, $update = [], $insert = [], &$isInsert = true) public static function updateInsert($where, $update = [], $insert = [], &$isInsert = true, $lockForUpdate = null)
{ {
$row = static::where($where)->first(); $query = static::where($where);
if ($lockForUpdate === null) {
$lockForUpdate = \DB::transactionLevel() > 0;
}
if ($lockForUpdate) {
$query->lockForUpdate();
}
$row = $query->first();
if (empty($row)) { if (empty($row)) {
$row = new static; $row = new static;
if ($insert instanceof \Closure) { if ($insert instanceof \Closure) {

View File

@ -429,10 +429,9 @@ class ZincSearchDialogMsg
/** /**
* 同步用户(建议在异步进程中使用) * 同步用户(建议在异步进程中使用)
* @param WebSocketDialogUser $dialogUser * @param WebSocketDialogUser $dialogUser
* @param bool $full 跳过判断是否已经存在(全量更新)
* @return bool * @return bool
*/ */
public static function userSync(WebSocketDialogUser $dialogUser, bool $full = false): bool public static function userSync(WebSocketDialogUser $dialogUser): bool
{ {
if (!self::ensureIndex()) { if (!self::ensureIndex()) {
return false; return false;
@ -453,13 +452,8 @@ class ZincSearchDialogMsg
try { try {
// 查询用户是否存在 // 查询用户是否存在
if ($full) { $result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams);
$hits = null; $hits = $result['data']['hits']['hits'] ?? [];
} else {
$result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams);
$hits = $result['data']['hits']['hits'] ?? [];
}
// 同步用户(存在更新、不存在添加) // 同步用户(存在更新、不存在添加)
$result = ZincSearchBase::addDoc(self::$indexNameUser, $data); $result = ZincSearchBase::addDoc(self::$indexNameUser, $data);

View File

@ -16,7 +16,7 @@ class WebSocketDialogMsgObserver
*/ */
public function created(WebSocketDialogMsg $webSocketDialogMsg) public function created(WebSocketDialogMsg $webSocketDialogMsg)
{ {
Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg)); Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
} }
/** /**
@ -27,7 +27,7 @@ class WebSocketDialogMsgObserver
*/ */
public function updated(WebSocketDialogMsg $webSocketDialogMsg) public function updated(WebSocketDialogMsg $webSocketDialogMsg)
{ {
Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg)); Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray()));
} }
/** /**
@ -38,7 +38,7 @@ class WebSocketDialogMsgObserver
*/ */
public function deleted(WebSocketDialogMsg $webSocketDialogMsg) public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
{ {
Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg)); Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg->toArray()));
} }
/** /**

View File

@ -31,7 +31,7 @@ class WebSocketDialogUserObserver
} }
} }
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
Task::deliver(new ZincSearchSyncTask('userCreated', $webSocketDialogUser)); Task::deliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray()));
} }
/** /**
@ -42,7 +42,7 @@ class WebSocketDialogUserObserver
*/ */
public function updated(WebSocketDialogUser $webSocketDialogUser) public function updated(WebSocketDialogUser $webSocketDialogUser)
{ {
Task::deliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser)); Task::deliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray()));
} }
/** /**
@ -54,7 +54,7 @@ class WebSocketDialogUserObserver
public function deleted(WebSocketDialogUser $webSocketDialogUser) public function deleted(WebSocketDialogUser $webSocketDialogUser)
{ {
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogUser)); Task::deliver(new ZincSearchSyncTask('deleteUser', $webSocketDialogUser->toArray()));
} }
/** /**

View File

@ -2,6 +2,8 @@
namespace App\Tasks; namespace App\Tasks;
use App\Models\WebSocketDialogMsg;
use App\Models\WebSocketDialogUser;
use App\Module\ZincSearch\ZincSearchDialogMsg; use App\Module\ZincSearch\ZincSearchDialogMsg;
use Carbon\Carbon; use Carbon\Carbon;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Cache;
@ -26,19 +28,23 @@ class ZincSearchSyncTask extends AbstractTask
{ {
switch ($this->action) { switch ($this->action) {
case 'sync': case 'sync':
// 同步聊天数据 // 同步消息数据
ZincSearchDialogMsg::sync($this->data); ZincSearchDialogMsg::sync(WebSocketDialogMsg::fillInstance($this->data));
break;
case 'userCreated':
case 'userSync':
// 同步用户数据
ZincSearchDialogMsg::userSync($this->data, $this->action === 'userCreated');
break; break;
case 'delete': case 'delete':
// 删除消息数据
ZincSearchDialogMsg::delete(WebSocketDialogMsg::fillInstance($this->data));
break;
case 'userSync':
// 同步用户数据
ZincSearchDialogMsg::userSync(WebSocketDialogUser::fillInstance($this->data));
break;
case 'deleteUser':
// 删除用户数据 // 删除用户数据
ZincSearchDialogMsg::delete($this->data); ZincSearchDialogMsg::delete(WebSocketDialogUser::fillInstance($this->data));
break; break;
default: default:

View File

@ -24,14 +24,14 @@
<Select v-model="keys.status" :placeholder="$L('全部')"> <Select v-model="keys.status" :placeholder="$L('全部')">
<Option value="">{{$L('全部')}}</Option> <Option value="">{{$L('全部')}}</Option>
<template v-if="flows.type==='group'"> <template v-if="flows.type==='group'">
<OptionGroup v-for="group in flows.groups" :label="group.label"> <OptionGroup v-for="(group, index) in flows.groups" :key="index" :label="group.label">
<Option v-for="item in group.items" :value="item.id" :label="item.name"> <Option v-for="(item, key) in group.items" :key="key" :value="item.id" :label="item.name">
<div class="tag-dot" :class="item.status">{{item.name}}</div> <div class="tag-dot" :class="item.status">{{item.name}}</div>
</Option> </Option>
</OptionGroup> </OptionGroup>
</template> </template>
<template v-else> <template v-else>
<Option v-for="item in flows.items" :value="item.id" :label="item.name"> <Option v-for="(item, key) in flows.items" :key="key" :value="item.id" :label="item.name">
<div class="tag-dot" :class="item.status">{{item.name}}</div> <div class="tag-dot" :class="item.status">{{item.name}}</div>
</Option> </Option>
</template> </template>