Worker 不间断执行任务最佳实践

aidong_wang

项目整体是一个 Gateway 网关,保持跟物联网设备双向通讯。现在因为业务需要,我们启动了 10 个 Worker 不间断的从腾讯云的消息队列获取消息,并且转发给对应的物联网设备。先上代码:

[attach]2022[/attach]
 

如截图所示,当 onWorkerStart 之后,通过 while(true) 不间断从腾讯云消费消息,针对此实现方案,想请教大家两个问题:
 
1、Gateway 中 除了使用 worker 做这种任务外,是否还有其他的更优方案
2、onWorkerStart 的回调函数中,使用 while(true) 有什么风险?是否有更好的建议?
3、这个 10 个 worker 的生命周期如何管理?如何实现 worker 卡死或者挂掉后能自动重新启动?
 

3826 4 0
4个回答

walkor

1、这种情况使用单独的worker基本是最优的方案
 
2、while(true) 会让进程一直运行在while(true) 的循环中,进程无法处理其它业务逻辑,比如无法处理onMessage onClose等,无法响应php start.php status命令(对应进程显示busy状态),无法响应stop命令,stop后会有退出码为9的日志。如果确定只需要运行while(true) 里面的代码即可,则除了上面列出的影响外其它没有影响,问题不大
 
3、 worker退出后会自动重启。但是worker卡死看是怎样算卡死,比如CmqApi::receiveMsg()本身就是阻塞的,无超时时间,如果队列里1天的没有消息发过来,进程就一直阻塞在这里不动,那么怎么判定是卡死还是没有消息?
 
检测业务进程是否卡死可以采用类似心跳的方式,比如每个进程在一定时间间隔内向磁盘更新自己的一个时间戳文件(内容就是当前时间戳),这样可以通过单独开启一个worker进程定时检查时间戳文件里的时间是否及时更新来判定进程是否有卡死。如果卡死通过发送SIGKILL信号posix_kill(pid, SIGKILL)将对应进程杀死重启即可。
 
如果CmqApi::receiveMsg()是阻塞的,但是有超时时间,比如即使没有消息CmqApi::receiveMsg()会在1分钟后返回null,那么就可以CmqApi::receiveMsg()下面写一个时间戳日志。启动一个worker进程每2分钟检查一次,如果对应进程在x分钟内没有更新时间戳就认为进程卡死了。 如果CmqApi::receiveMsg()是无限阻塞的没有超时时间就没有办法判断是真的卡死还是消息队列没有消息了。

  • aidong_wang 2019-05-14

    非常感谢您的回复
    CmqApi::receiveMsg()是阻塞的,但是有超时时间,我们将采用您建议的方案:
    “检测业务进程是否卡死可以采用类似心跳的方式,比如每个进程在一定时间间隔内向磁盘更新自己的一个时间戳文件(内容就是当前时间戳),这样可以通过单独开启一个worker进程定时检查时间戳文件里的时间是否及时更新来判定进程是否有卡死。如果卡死通过发送SIGKILL信号posix_kill(pid, SIGKILL)将对应进程杀死重启即可”

    如果我们的 Worker 希望能响应php start.php status命令,Worker 需要做哪些操作才能支持?

aidong_wang

@https://wenda.workerman.net/people/walkor 
我记得哪里有提到可以相应 status 命令的方法,可以再发一下吗?

  • 暂无评论
walkor

while 循环改成定时器。但是你的情况不一定有效,因为进程可能长时间阻塞在CmqApi::receiveMsg(),无法在1秒内响应status命令

  • aidong_wang 2019-05-16

    是的,改成定时器会降低性能,CmqApi::receiveMsg() 函数,发送请求达到腾讯云如果没有消息,会被 hold 一定时间才返回。

    如果没有其他实现方案,那感谢您的支持和解答了

wadeYang

截图
请教大佬,你这个不加usleep(n)或者sleep()函数,CPU不会爆满吗?
我之前做一个类似demo,不加的话CPU一直100%。
我在我标记那里(两个地方任意一个地方都可以,目的就是想干完一段活儿后稍微休息下)加上sleep或者usleep,不想延迟太多就usleep(单位微妙),世界瞬间平静了,CPU降下来了。

我也不懂是为什么,在此也想请教各位大佬,是所谓的暂时让出cpu的意思吗

  • 暂无评论
年代过于久远,无法发表回答
🔝