PHP操作Beanstalkd队列

内容目录

介绍

Beanstalkd 是一款高性能、轻量级的分布式内存消息队列。
轻量是通过对比而来,相比RabbitMQ和Kafka, Beanstalkd显得更加简单易用,同时可以满足小系统的应用,可以实现生产者和消费者分离模式。
Beanstalkd是一个高性能,轻量级的分布式内存队列,C 代码,典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。
beanstalkd 的最初设计意图是在高并发的网络请求下,通过异步执行耗时较多的请求,及时返回结果,减少请求的响应延迟。

基本概念

一、概念

  • job:一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中
  • tube:一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象

  • producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中

  • consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态

生命周期

producer(生产者)put一个新的job到tube时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。

consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。

当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。

READY - 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务

DELAYED - 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行

RESERVED - 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run)
内完成

BURIED - 保留的任务: 任务不会被执行,也不会消失,除非有人把它 “踢” 回队列

DELETED - 消息被彻底删除。Beanstalkd 不再维持这些消息

状态 说明
delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态

web管理工具

有一个开源的web 界面:

https://github.com/ptrofimov/beanstalk_console

特性

  • 优先级(priority)

对于很紧急的消费者可以让他在队列中提前,实现插队行为

  • 延迟(delay)

设置的任务在多少秒后才能被消费者读取,可以实现定时任务,如定时点赞等

  • 持久化(persistent data)

出现服务器宕机等情况,数据依然存在

  • 预留(buried)

消费者将无法取出任务,在其他合适的时机在拿出消费

  • 任务超时重发(time-to-run)

消费者必须在指定的时间内处理完任务,否则这个任务将重新被放入队列

安装

下载软件-编译安装

  1. git clone git://github.com/kr/beanstalkd.gitcd beanstalkd

编译

  1. make#安装完成

-默认安装在

  1. /usr/local/bin/

yum安装

  1. yum -y install beanstalkd --enablerepo=epel

配置文件

  1. /etc/sysconfig/beanstalkd

启动并绑定端口

  1. beanstalkd -l 127.0.0.1 -p 11300 -b /home/software/binstalkd/binlogs

-b开启日志备份,重启恢复数据
安装php操作类

  1. composer require pda/pheanstalk

demo.php 文件

  1. <?php
  2. require './vendor/autoload.php';
  3. $p = new \Pheanstalk\Pheanstalk('127.0.0.1',11300);
  4. print_r($p->stats());// 打印出队列所有信息

输出信息

  1. 'current-jobs-urgent' => '0', // 优先级小于1024状态为ready的job数量
  2. 'current-jobs-ready' => '0', // 状态为ready的job数量
  3. 'current-jobs-reserved' => '0', // 状态为reserved的job数量
  4. 'current-jobs-delayed' => '0', // 状态为delayed的job数量
  5. 'current-jobs-buried' => '0', // 状态为buried的job数量
  6. 'cmd-put' => '0', // 总共执行put指令的次数
  7. 'cmd-peek' => '0', // 总共执行peek指令的次数
  8. 'cmd-peek-ready' => '0', // 总共执行peek-ready指令的次数
  9. 'cmd-peek-delayed' => '0', // 总共执行peek-delayed指令的次数
  10. 'cmd-peek-buried' => '0', // 总共执行peek-buried指令的次数
  11. 'cmd-reserve' => '0', // 总共执行reserve指令的次数
  12. 'cmd-reserve-with-timeout' => '0', 'cmd-delete' => '0', 'cmd-release' => '0', 'cmd-use' => '0', // 总共执行use指令的次数
  13. 'cmd-watch' => '0', // 总共执行watch指令的次数
  14. 'cmd-ignore' => '0', 'cmd-bury' => '0', 'cmd-kick' => '0', 'cmd-touch' => '0', 'cmd-stats' => '2', 'cmd-stats-job' => '0', 'cmd-stats-tube' => '0', 'cmd-list-tubes' => '0', 'cmd-list-tube-used' => '0', 'cmd-list-tubes-watched' => '0', 'cmd-pause-tube' => '0', 'job-timeouts' => '0', // 所有超时的job的总共数量
  15. 'total-jobs' => '0', // 创建的所有job数量
  16. 'max-job-size' => '65535', // job的数据部分最大长度
  17. 'current-tubes' => '1', // 当前存在的tube数量
  18. 'current-connections' => '1', // 当前打开的连接数
  19. 'current-producers' => '0', // 当前所有的打开的连接中至少执行一次put指令的连接数量
  20. 'current-workers' => '0', // 当前所有的打开的连接中至少执行一次reserve指令的连接数量
  21. 'current-waiting' => '0', // 当前所有的打开的连接中执行reserve指令但是未响应的连接数量
  22. 'total-connections' => '2', // 总共处理的连接数
  23. 'pid' => '3609', // 服务器进程的id
  24. 'version' => '1.10', // 服务器版本号
  25. 'rusage-utime' => '0.000000', // 进程总共占用的用户CPU时间
  26. 'rusage-stime' => '0.001478', // 进程总共占用的系统CPU时间
  27. 'uptime' => '12031', // 服务器进程运行的秒数
  28. 'binlog-oldest-index' => '2', // 开始储存jobs的binlog索引号
  29. 'binlog-current-index' => '2', // 当前储存jobs的binlog索引号
  30. 'binlog-records-migrated' => '0', 'binlog-records-written' => '0', // 累积写入的记录数
  31. 'binlog-max-size' => '10485760', // binlog的最大容量
  32. 'id' => '37604ac4305d3b16', // 一个随机字符串,在beanstalkd进程启动时产生
  33. 'hostname' => 'localhost.localdomain',

更多其他方法

  1. #查看当前存在的所有管道
  2. $p->listTubes();
  3. #查看管道信息,例如newUsers管道
  4. $p->statusTubes('newUsers');
  5. #指定要使用的管道信息,添加任务
  6. $p->useTube('newUsers')->put('test');
  7. #监听管道 取出任务
  8. $job = $p->watch('newUsers')->reserve();
  9. #查看被取出任务的信息
  10. $state = $p->statsJob($job);
  11. print_r($state);
  12. #根据任务id获取任务参数
  13. $job = $p->peek(id);
  14. $state = $p->statsJob($job);

返回结果

  1. 'id' => '1', // job id
  2. 'tube' => 'test', // job 所在的管道
  3. 'state' => 'reserved', // job 当前的状态
  4. 'pri' => '1024', // job 的优先级
  5. 'age' => '5222', // 自 job 创建时间为止 单位:秒
  6. 'delay' => '0', 'ttr' => '60', // time to run
  7. 'time-left' => '58', // 仅在job状态为reserved或者delayed时有意义,当job状态为reserved时表示剩余的超时时间
  8. 'file' => '2', // 表示包含此job的binlog序号,如果没有开启它将为0
  9. 'reserves' => '10', // 表示job被reserved的次数
  10. 'timeouts' => '0', // 表示job处理的超时时间
  11. 'releases' => '1', // 表示job被released的次数
  12. 'buries' => '0', // 表示job被buried的次数
  13. 'kicks' => '0', // 表示job被kiced的次数

添加和处理任务

添加任务

  1. #参数顺序为1.管道名 2.内容 3.优先级 4.延迟秒数 5.超时时间秒速 超时重发(默认60s)
  2. #向 newUsers 管道 添加内容为 text 优先级为1000 延迟20s 操作时间为30s的任务
  3. $p->putInTube('newUsers','text',1000,20,30);

更多例子

  1. <?php
  2. //创建队列消息
  3. require_once('./vendor/autoload.php');
  4. use Pheanstalk\Pheanstalk;
  5. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  6. $tubeName = 'email_list';
  7. $jobData = [
  8. 'email' => '123456@gmail.com',
  9. 'message' => 'Hello World !!',
  10. 'dtime' => date('Y-m-d H:i:s'),
  11. ];
  12. $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

使用生产者 put 方法向队列添加任务

  1. #与 putInTube 方法不同,先选择了管道,在添加任务,参数与 putInTube 相同
  2. $tube = $p->useTube('newUsers');
  3. $tube->put('text1',1000);
  4. $tube->put('text2',1000);

使用消费者方法获取管道信息

  1. #watch()监听管道#reserve()获取任务,无法读取bury状态任务,是阻塞方法,无任务时程序会一直阻塞,直到有新任务出现,可以添加阻塞时间,单位s,如reserve(2)
  2. $job = $p->watch('newUsers')->reserve();
  3. $job->getData();
  4. #获取任务内容~
  5. #执行处理逻辑
  6. $p->delete();
  7. #删除任务

处理任务

  1. <?php
  2. ini_set('default_socket_timeout', 86400*7);
  3. ini_set( 'memory_limit', '256M' );
  4. // 消费队列消息
  5. require_once('./vendor/autoload.php');
  6. use Pheanstalk\Pheanstalk;
  7. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  8. $tubeName = 'email_list';
  9. while ( true )
  10. {
  11. // 获取队列信息, reserve 阻塞获取
  12. $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
  13. if ( $job !== false )
  14. {
  15. $data = $job->getData();
  16. /* TODO 逻辑操作 */
  17. /* 处理完成,删除 job */
  18. $pheanstalk->delete( $job );
  19. }
  20. }

另外一种方法

  1. public function watchJob()
  2. {
  3. $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
  4. if ( $job !== false )
  5. {
  6. $job_data = $job->getData();
  7. $this->subscribe( $job_data );
  8. $this->pheanstalk->delete( $job );
  9. /* 继续 Watch 下一个 job */
  10. $this->watchJob();
  11. }
  12. else
  13. {
  14. $this->log->error( 'reserve false', 'reserve false' );
  15. }
  16. }

监控 beanstalkd 状态

  1. <?php
  2. //监控服务状态
  3. require_once('./vendor/autoload.php');
  4. use Pheanstalk\Pheanstalk;
  5. $pheanstalk = new Pheanstalk('127.0.0.1',11300);
  6. $isAlive = $pheanstalk->getConnection()->isServiceListening();
  7. var_dump( $isAlive );

一些命令

查看 beanstalkd 服务内存占用

  1. top -u beanstalkd

后台运行 consumer 脚本

  1. nohup php consumer_subscribe.php &

查看 consumer 脚本运行时间

  1. ps -A -opid,stime,etime,args | grep consumer_subscribe.php

手工重启 consumer 脚本

  1. ps auxf|grep 'consumer_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9
  2. nohup php consumer_subscribe.php &

错误排查

php 要把错误日志打开,方便收集 consumer 脚本 crash 的 log,脚本跑出一些致命的 error 一定要及时修复,因为一旦有错就会挂掉,这会影响你脚本的可用性,后期稳定之后可以上 supervisor 这种进程管理程序来管控脚本生命周期。
  一些网络请求操作,一定要 try catch 到所有错误,一旦没有 catch 到,脚本就崩。我用的是 Guzzle 去做的网络请求,下面是我 catch 的一些错误,代码片段供参考

  1. try
  2. {
  3. /* TODO: 逻辑操作 */
  4. }
  5. catch ( ClientException $e )
  6. {
  7. $results['mid'] = $this->mid;
  8. $results['code'] = $e->getResponse()->getStatusCode();
  9. $results['reason'] = $e->getResponse()->getReasonPhrase();
  10. $this->log->error( 'properties-changed ClientException', $results );
  11. }
  12. catch ( ServerException $e )
  13. {
  14. $results['mid'] = $this->mid;
  15. $results['code'] = $e->getResponse()->getStatusCode();
  16. $results['reason'] = $e->getResponse()->getReasonPhrase();
  17. $this->log->error( 'properties-changed ServerException', $results );
  18. }
  19. catch ( ConnectException $e )
  20. {
  21. $results['mid'] = $this->mid;
  22. $this->log->error( 'properties-changed ConnectException', $results );
  23. }

 job 消费之后一定要删除掉,如果长时间不删除,php 客户端会有 false 返回,是因为有 DEADLINE_SOON 这个超时错误产生,所以处理完任务,一定要记得删除,这一点跟 kafka 不一样,beanstalkd 需要开发者自己去删除 job。

若处理逻辑时间过长,任务将会被重新放回管道中,可以用 touch 方法延迟处理时间

  1. $job = $p->watch('newUsers')->reserve();
  2. $p->touch($job);

当这个任务不符合处理要求时,可以用 release 方法将它放回管道,重设为 ready 状态

  1. $job = $p->watch('newUsers')->reserve();
  2. $p->release($job);

如果此任务是当前不处理任务,可以使用 bury 方法将它改为预留任务,放置一旁

  1. $job = $p->watch('newUsers')->reserve();
  2. $p->bury($job);
  3. #读取预留任务
  4. $job = $p->peekBuried('newUsers');
  5. #将任务变回ready状态
  6. $p->kickJob($job);
  7. #批量处理预留任务 将id小于999的任务都变成ready状态
  8. $p->useTube('newUsers')->kick(999);

只获取对应状态的任务 peek

  1. #获取ready状态任务
  2. $job = $p->peekReady('newUsers');
  3. #读取Delayed状态任务
  4. $p->peekDelayed('newUsers');

reserve是阻塞的 peek是没阻塞,当获取不到任务将抛出异常

  1. #同时监听多个管道
  2. $p->watch('newUsers')->watch('default');
  3. #被监听的管道
  4. $p->listTubesWatched();
  5. #不监听此管道
  6. $p->ignore('default')
  7. #给管道设置延迟
  8. $p->pauseTube('newUsers',100);
  9. #取消延迟
  10. $p->resumeTube('newUsers');