先安装composer包
composer require workerman/crontab
composer require webman/channel
我使用的表结构 (这里可以使用任意表结构)
CREATE TABLE IF NOT EXISTS __PREFIX__system_task
(
id
int(11) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
method
varchar(100) NOT NULL DEFAULT '' COMMENT '任务方法',
title
varchar(255) NOT NULL DEFAULT '' COMMENT '任务描述',
content
text DEFAULT NULL COMMENT '事件内容',
memo
text DEFAULT NULL COMMENT '参数描述',
schedule
varchar(100) NOT NULL DEFAULT '' COMMENT 'Crontab格式',
maximums
int(10) UNSIGNED NOT NULL DEFAULT '0' COMMENT '最大执行次数 0为不限',
executes
int(10) UNSIGNED NOT NULL DEFAULT '0' COMMENT '已经执行的次数',
begin_at
datetime DEFAULT NULL COMMENT '开始时间',
end_at
datetime DEFAULT NULL COMMENT '结束时间',
execute_at
datetime DEFAULT NULL COMMENT '最后执行时间',
is_only_ones
tinyint(4) DEFAULT 0 COMMENT '同一时刻是否只执行一次(0-否,1-是)',
is_queue_run
tinyint(4) DEFAULT 0 COMMENT '是否队列运行(0-否,1-是)',
weigh
int(11) DEFAULT 1 COMMENT '排序',
status
tinyint(4) NOT NULL DEFAULT 1 COMMENT '状态:1-启用,0-禁用',
created_by
int(11) DEFAULT NULL COMMENT '创建者',
updated_by
int(11) DEFAULT NULL COMMENT '更新者',
created_at
datetime NOT NULL COMMENT '创建时间',
updated_at
datetime NOT NULL COMMENT '更新时间',
deleted_at
datetime DEFAULT NULL COMMENT '删除时间',
PRIMARY KEY (id
)
) ENGINE=InnoDB AUTO_INCREMENT=1 CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='定时任务表' ROW_FORMAT=DYNAMIC;
model类
<?php
declare(strict_types=1);
/**
* @opLog description Model
* @Datetime: 2025-08-11 11:43:21
* @Username: admin
*/
namespace app\api\model\system;
use app\api\base\BaseModel;
class Task extends BaseModel
{
//ID
const ID = 'id'; // integer;
//任务方法
const METHOD = 'method'; // string;
//任务描述
const TITLE = 'title'; // string;
//事件内容
const CONTENT = 'content'; // string;
//参数描述
const MEMO = 'memo'; // string;
//Crontab格式
const SCHEDULE = 'schedule'; // string;
//最大执行次数 0为不限
const MAXIMUMS = 'maximums'; // integer;
//已经执行的次数
const EXECUTES = 'executes'; // integer;
//开始时间
const BEGIN_AT = 'begin_at'; // dateFormat:Y-m-d H:i:s;
//结束时间
const END_AT = 'end_at'; // dateFormat:Y-m-d H:i:s;
//最后执行时间
const EXECUTE_AT = 'execute_at'; // dateFormat:Y-m-d H:i:s;
//同一时刻是否只执行一次(0-否,1-是)
const IS_ONLY_ONES = 'is_only_ones'; // integer;
//是否队列运行
const IS_QUEUE_RUN = 'is_queue_run'; // integer;
//排序
const WEIGH = 'weigh'; // integer;
//状态:1-启用,0-禁用
const STATUS = 'status'; // integer;
//创建者
const CREATED_BY = 'created_by'; // integer;
//更新者
const UPDATED_BY = 'updated_by'; // integer;
/**
* The table associated with the model.
* @var string
*/
protected $table = 'system_task';
/**
* 自定义每个属性的类型
* @return void
*/
protected $casts = [
self::ID => 'integer',
self::METHOD => 'string',
self::TITLE => 'string',
self::CONTENT => 'json',
self::MEMO => 'json',
self::SCHEDULE => 'string',
self::MAXIMUMS => 'integer',
self::EXECUTES => 'integer',
self::BEGIN_AT => 'datetime:Y-m-d H:i:s',
self::END_AT => 'datetime:Y-m-d H:i:s',
self::EXECUTE_AT => 'datetime:Y-m-d H:i:s',
self::IS_ONLY_ONES => 'integer',
self::IS_QUEUE_RUN => 'integer',
self::WEIGH => 'integer',
self::STATUS => 'integer',
self::CREATED_BY => 'integer',
self::UPDATED_BY => 'integer',
self::CREATED_AT => 'datetime:Y-m-d H:i:s',
self::UPDATED_AT => 'datetime:Y-m-d H:i:s',
self::DELETED_AT => 'datetime:Y-m-d H:i:s',
];
protected $eventName = 'task_update'; // Channel事件名称
/**
* 模型数据转换成 JSON
* @param mixed $value
* @return string
*/
protected function asJson($value)
{
return json_encode($value, JSON_UNESCAPED_UNICODE);
}
protected static function boot()
{
parent::boot();
// 监听新增后事件
static::created(function ($model) {
\app\lib\ChannelClient::publish($model->eventName, [$model->primaryKey => $model->{self::ID}]);
});
// 监听删除事件
static::deleted(function ($model) {
\app\lib\ChannelClient::publish($model->eventName, [$model->primaryKey => $model->{self::ID}]);
});
// 监听更新后事件(检查schedule是否变更)
static::updated(function ($model) {
// 1. 获取所有修改过的字段(新值)
$changedFields = $model->getChanges();
// 2. 检查schedule是否被修改
if (isset($changedFields[self::SCHEDULE])) {
\app\lib\ChannelClient::publish($model->eventName, [$model->primaryKey => $model->{self::ID}]);
}
});
}
}
自定义进程
<?php
declare(strict_types=1);
/**
* 处理定时任务进程(定时任务更改时间间隔下一分钟生效)
*/
namespace app\process;
use support\exception\BusinessException;
use Workerman\Crontab\Crontab;
use app\api\model\system\Task;
class CronJob
{
protected $logName = 'task'; // 日志名称
private $eventName = 'task_update'; // 订阅Channel事件名称
protected $task = \app\lib\Task::class; // 执行定时任务类
protected $taskIds = []; //定时任务表id => 定时任务对象id 集合
protected $model; // 任务数据访问对象
public function __construct()
{
$this->model = \support\Container::get(Task::class);
}
/**
* @opLog description 进程启动
* @Datetime: 2025/01/10
* @return void
* @Username: admin
*/
public function onWorkerStart()
{
try {
$taskList = $this->model->all(); // 获取任务列表
foreach ($taskList as $task) {
$this->addCrontab($task);
}
\app\lib\ChannelClient::subscribe($this->eventName, function($data){ // 订阅Channel事件(定时任务更新、添加、删除)触发
$this->taskUpdate($data['data']);
});
} catch (\Throwable $exception) {
call_user_func([\support\Log::channel(), 'ERROR'], json_encode(['errMessage'=>$exception->getMessage(),'errCode'=>$exception->getCode(),'errFile'=>$exception->getFile(),'errLine'=>$exception->getLine()], JSON_UNESCAPED_UNICODE));
}
}
/**
* @opLog description 定时任务更新
* @param array $data 定时任务数据['id' => 1]
* @Datetime: 2025/01/10
* @return void
* @Username: admin
*/
protected function taskUpdate(array $data) : void
{
try {
$pk = intval($data[Task::ID] ?? 0); // 定时任务表主键值
if(isset($this->taskIds[$pk])){ // 如果任务已存在 删除
Crontab::remove($this->taskIds[$pk]);
unset($this->taskIds[$pk]);
}
$task = $this->model->find($pk);// 查询定时任务表数据
if ($task) { // 任务存在 添加
$this->addCrontab($task);
}
} catch (\Throwable $exception) {
call_user_func([\support\Log::channel(), 'ERROR'], json_encode(['errMessage'=>$exception->getMessage(),'errCode'=>$exception->getCode(),'errFile'=>$exception->getFile(),'errLine'=>$exception->getLine()], JSON_UNESCAPED_UNICODE));
}
}
/**
* @opLog description 添加定时任务
* @param object $task 定时任务对象
* @Datetime: 2025/01/10
* @return void
* @Username: admin
*/
private function addCrontab(object $task) : void
{
$crontab = new Crontab($task->{Task::SCHEDULE}, function () use ($task) { // 添加定时任务
$this->run($task);
});
$this->taskIds[intval($task->{Task::ID})] = $crontab->getId(); // 定时任务表id => 定时任务对象id
}
/**
* @opLog description 运行任务
* @Datetime: 2025/01/10
* @return object
* @Username: admin
*/
protected function run(object $task) : bool
{
$task = $this->model->where(Task::ID, $task->{Task::ID})->first(); // 获取任务信息
if(!$task){ // 没有任务
return false;
}
$task = $task->toArray(); // 转换成数组
if(!class_exists($this->task) || !method_exists($this->task, $task[Task::METHOD])){ //方法不存在
return false;
}
if(!$task[Task::STATUS]){ //如果任务不是启用状态
return false;
}
if($task[Task::MAXIMUMS] && $task[Task::EXECUTES] > $task[Task::MAXIMUMS]){ //判断任务执行次数是否超过最大执行次数
return false;
}
if((!empty($task[Task::BEGIN_AT]) && time() < strtotime($task[Task::BEGIN_AT])) || (!empty($task[Task::END_AT]) && time() > strtotime($task[Task::END_AT]))){ //判断任务是否在执行时间内
return false;
}
if($task[Task::IS_ONLY_ONES] && !\app\lib\RateLimiting::getLock('task_' . $task[Task::ID], 500)){ //判断任务是否多进程只执行一次
return false;
}
if(!$task[Task::IS_QUEUE_RUN]){ //不是队列方式执行
$this->taskRun($task); // 直接执行任务
}else{
queue_send('task_run', $task); // 用队列执行任务
}
return true;
}
/**
* @opLog description 执行具体任务
* @Datetime: 2025/01/10
* @return void
* @Username: admin
*/
public function taskRun(array $task): void
{
$start_time = microtime(true); // 开始时间
$start_memory = memory_get_usage(); // 开始内存使用量
$method = 'INFO'; // 默认日志级别
$content = $task[Task::CONTENT] ? json_decode($task[Task::CONTENT], true) : []; // 获取执行任务的参数
try {
$res = call_user_func([$this->task, $task[Task::METHOD]], $content); // 执行任务
$this->model->where(Task::ID, $task[Task::ID])->increment(Task::EXECUTES, 1, [Task::EXECUTE_AT => date('Y-m-d H:i:s')]); // 执行次数加1 更新最后执行时间
} catch (BusinessException $exception) {
$res = ['code' => $exception->getCode(), 'message' => $exception->getMessage()];
} catch (\Throwable $exception) {
$method = 'ERROR';
$res = ['code' => $exception->getCode(), 'errMessage'=>$exception->getMessage(), 'errCode'=>$exception->getCode(), 'errFile'=>$exception->getFile(), 'errLine'=>$exception->getLine()];
//任务异常发钉钉报警
$ding_msg = ['TYPE' => 'CronJob', 'CLASS' => $this->task, 'METHOD' => $task[Task::METHOD], 'TITLE' => $task[Task::TITLE], 'SCHEDULE' => $task[Task::SCHEDULE], 'CONTENT' => $content, 'ERROR' => $res];
queue_send_async(env('EXCEPTION_SEND_DINGDING', 'exception_send_dingding'), $ding_msg);
}
$task[Task::EXECUTES]++; // 执行次数加1
$task[Task::EXECUTE_AT] = date('Y-m-d H:i:s'); // 最后执行时间
$time_diff = substr(strval((microtime(true) - $start_time) * 1000), 0, 7); // 计算使用时间
$mtime_diff = substr(strval((memory_get_usage() - $start_memory)/1024/1024), 0, 7); // 计算内存使用量
$mtime_diff = abs(floatval($mtime_diff)); // 绝对值
$memory_get_peak_usage = substr(strval(memory_get_peak_usage()/1024/1024), 0, 7); // 最大内存使用量
$logs = "[" . getHostByName(getHostName()) . "] [" . $this->task . "] [" . $task[Task::METHOD] . "] [{$time_diff}ms] [{$mtime_diff}mb] [{$memory_get_peak_usage}mb] [task/log] [0]" . PHP_EOL; //写日志第一行
$logs .= "[DATA] " . json_encode($content, JSON_UNESCAPED_UNICODE) . PHP_EOL; // 写日志消费的数据
$logs .= "[REN] " . json_encode($res, JSON_UNESCAPED_UNICODE); // 写日志消费完 返回的数据
call_user_func([\support\Log::channel($this->logName), $method], $logs, $task); // 写日志
}
}
在进程管理文件添加
// 定时任务进程
'cronJob' => [
'handler' => app\process\CronJob::class,
],
进程间通信封装类(这里可以不用封装自己实现即可)
<?php
declare(strict_types=1);
/**
* 跨进程通信客户端
* thb
* 2025-06-27
*/
namespace app\lib;
use Webman\Channel\Client;
class ChannelClient
{
static protected function init() : array
{
// Channel服务ip地址
$ip = env('CHANNEL_HOST','127.0.0.1');
// Channel服务端口
$port = env('CHANNEL_PORT','2206');
// 信息加密秘钥
$password = env('CHANNEL_PASSWORD','abc123');
return [$ip, $port, $password];
}
/**
* 推送信息
* @param string $eventName 订阅事件
* @param array $eventData 事件数据
* @param array $userIds 给用户ID列表发送信息
* @return array
*/
static public function publish(string $eventName = 'event_websocket', array $eventData = [], array $userIds = []) : array
{
if($eventName === ''){
stop(500, '事件名称不能为空');
}
if(empty($eventData)){
stop(500, '事件数据不能为空');
}
list($ip, $port, $password) = self::init();
// 发送的数据
$sendData = [
'data' => $eventData, //事件数据
'sign' => signature($eventData, $password), //签名
'userIds' => $userIds //发给用户
];
// 连接到Channel服务
Client::connect($ip, $port);
// 发布某个自定义事件,订阅这个事件的客户端会收到事件数据,并触发客户端对应的事件回调
Client::publish($eventName, $sendData);
return ['code' => 200, 'message' => '事件发布成功'];
}
/**
* 接受信息
* @param string $eventName 订阅事件
* @param callable $callback 回调方法
* @return void
*/
static public function subscribe(string $eventName = 'event_websocket', callable $callback = null) : void
{
list($ip, $port, $password) = self::init();
// 连接到Channel服务
Client::connect($ip, $port);
// 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
Client::on($eventName, function($data) use($password, $callback) {
// 签名
$sign = $data['sign'] ?? '';
unset($data['sign']);
if($sign === signature($data['data'], $password)){ // 签名验证通过,触发事件
$callback($data);
}
});
}
}
原理就是自定义进程处理定时任务,然后通过进程间通信实现定时任务的添加和删除修改等操作
及供参考(队列消费可以不用,但是建议耗时任务使用队列消费)
收藏了,感谢分享~
\app\lib\Task::class
怎么缺少一个这个类的实现