• 你好!欢迎你的到来
  • 关于我们
  • 首页 博客 学习笔记 技术导航 工具
  • 博文分类
    • PHP(43)
    • MySQL(11)
    • Linux(28)
    • html(3)
    • JQuery(4)
    • JavaScript(9)
    • svn(2)
    • CSS(2)
    • seajs(1)
    • go(44)
    • redis(1)
    • nginx(8)
    • mongo(0)
    • java(0)
    • 算法(0)
    • 其他(26)
    • 生活(1)
    专栏
    • Jquery基础教程
      • 文章:(15)篇
      • 阅读:33316
    • shell命令
      • 文章:(42)篇
      • 阅读:108621
    • Git教程
      • 文章:(36)篇
      • 阅读:179635
    • leetCode刷题
      • 文章:(76)篇
      • 阅读:64008
    • 摘要视图
    • 目录视图
    PHP中zeromq使用
    标签: ZMQSocketzeromq消息队列
    2017-09-09 21:42 阅读(8252) 评论(1)

        在前一篇(PHP的ZMQ扩展zeromq源码安装)文章中,我们配置好了zeromq,现在到了使用的时候。ZMQ 提供了三个基本的通信模型,即Request-Reply Publisher-Subscriber Parallel Pipeline,下面将针对每一种模式,从两个方面去探究,怎么向消息队列中写消息,怎么从消息队列中读取消息。

    一、zemq的使用步骤

    1、使用ZMQContext创建一个上下文

    2、使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括

    PUB,SUB
    REQ,REP
    REQ,ROUTER (take care, REQ inserts an extra null frame)
    DEALER,REP (take care, REP assumes a null frame)
    DEALER,ROUTER
    DEALER,DEALER
    ROUTER,ROUTER
    PUSH,PULL
    PAIR,PAIR
    分类包括
    轮询,REQ,PUSH,DEALER
    多播,PUB
    公平排队,REP,SUB,PULL,DEALER
    明确寻址,ROUTER
    单播,PAIR

    3、指定IP以及端口,如果是服务端就bind,如果是客户端就conncet

    进程内部通信,inproc://
    进程间通信,ipc://
    网络间通信,tcp://
    多播,pgm://

    4、使用send/recv发送/接收消息


    二、ZMQ常用的通信模式案例

    1、Request-Reply (请求应答模式)

    该模式的简介:server监听tcp的某个端口(下面案例中,绑定了8082端口),等待client 发起请求,并做相应的处理。如下图

    server.php 文件

    $ip        = '127.0.0.1';
    $context   = new \ZMQContext(1);
    $server    = new \ZMQSocket($context, \ZMQ::SOCKET_REP);//第二个参数指定通信模型
    $server->bind("tcp://" . $ip . ':8082');
    while(true){
          $res  = $server->recv();//监听用户的请求
          echo 'a request comes ,and the content is '.$res; 
          $server->send("Request-Reply 通信 by dq_test");
    }

    client.php文件

    $ip       = '127.0.0.1';
    $context  = new \ZMQContext(1);
    $client   = new \ZMQSocket($context, \ZMQ::SOCKET_REQ);
    $client->connect("tcp://" . $ip . ':8082');
    $msg      = array('name' => 'dq', 'lang' =>'php');
    echo "准备发送请求
    ";
    $rs = $client->send(json_encode($msg));//发送请求
    echo "已经发送,等待反馈\n";
    $res      = $client->recv(); //获取请求返回值
    echo "当前请求的返回值:".$res."\n";

    首先,得启动sever.php脚本,如下图

    启动前后,通过lsof查看8082端口占用情况。

    现在开始测试client.php

    注意

        1.服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socke。


    2、Publisher-Subscriber(发布订阅模式)

    该模式的简介:有一个节点发布消息,其他所有节点可以接受消息,有点类似天气预报或是广播哦。如下图

    消息发布者 publish.php代码如下

    $ip        = '127.0.0.1';
    $context   = new \ZMQContext(1);
    $publish   = new \ZMQSocket($context, \ZMQ::SOCKET_PUB);//第二个参数指定通信模型
    $publish->bind("tcp://" . $ip . ':8083');
    
    while(true) {
        $publish->send("dq Request-Reply 通信 by dq_test");
        sleep(1);
    }

    消息订阅者 subscribe.php代码如下

    $ip        = '127.0.0.1';
    $context   = new \ZMQContext(1);
    $subscribe = new \ZMQSocket($context, \ZMQ::SOCKET_SUB);//第二个参数指定通信模型
    $subscribe->connect("tcp://" . $ip . ':8083');
    //设置频道  ,发布者可能有多个频道
    
    $filter = 'dq'; //接受消息的频道,必须设置,否则获取不到消息
    $subscribe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
    while(true) {
       $rs = $subscribe->recv();
       echo $rs."\n";
    }
    执行上面程序

    添加订阅者

    /usr/local/php/bin/php subscribe.php &

    启动发布程序

    /usr/local/php/bin/php publish.php

    执行效果如下图:

    效率分析

    其发布任务的效率还是不错的,发布10w个任务,每秒发布的个数分别如下:

    个订阅者的个数
    每秒发布任务的个数
    0167万
    176.9万
    276.9万
    352.6万

    具体如下图:


    注意

        1、发布端(publish.php)需要一直处在运行状态,所有在上面程序中使用了while(true)死循环。若发布端(publish.php)中断,订阅者(subscribe.php)将会堵塞。

        2、如果中途有订阅者(subscribe.php)退出,并不影响他继续的广播,当订阅者(subscribe.php)再次连接上来的时候,仍然可以收到消息。 

    3、Parallel Pipeline(并行管道模式)

    该模式的简介:该模式给人的感觉就是总-分-总,一个地方进行分发任务到各个节点,等节点完成后,汇总到一个地方进行统计。懒得画图,借用一下官方的图,如下

    任务的发布者 serverPusher.php

    <?php
    $context = new ZMQContext();
    
    //  Socket to send messages on
    $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $sender->bind("tcp://*:8034");
    
    echo "Press Enter when the workers are ready: ";
    $fp = fopen('php://stdin', 'r');
    $line = fgets($fp, 512);
    fclose($fp);
    echo "Sending tasks to workers…", PHP_EOL;
    
    //  The first message is "0" and signals start of batch
    $sender->send(0);
    
    //  Send 100 tasks
    $total_msec = 0;     //  Total expected cost in msecs
    for ($task_nbr = 0; $task_nbr < 100000; $task_nbr++) {
        //  Random workload from 1 to 100msecs
        $sender->send($task_nbr);
    
    }
    
    printf ("Total expected cost: %d msec\n", $total_msec);
    sleep (1);              //  Give 0MQ time to deliver

    任务的接受者 worker.php

    <?php
    
    $context = new ZMQContext();
    $index = 0;
    //  Socket to receive messages on
    $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $receiver->connect("tcp://localhost:8034");
    
    //  Socket to send messages to
    $sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $sender->connect("tcp://localhost:8035");
    
    //  Process tasks forever
    while (true) {
        $string = $receiver->recv();
        $index +=1;
        //  Simple progress indicator for the viewer
        echo '任务id:'.$string;
        echo ';已经处理了'.$index.'个任务', PHP_EOL;
        //  Do the work
        usleep($string * 1000);
    
       //  Send results to sink
        $sender->send("good");
    }

    任务的汇总者 serverSummary.php

    <?php
    $context = new ZMQContext();
    $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $receiver->bind("tcp://*:8035");
    
    //  Wait for start of batch
    $string = $receiver->recv();
    
    //  Start our clock now
    $tstart = microtime(true);
    
    //  Process 100 confirmations
    $total_msec = 0;     //  Total calculated cost in msecs
    for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
        $string = $receiver->recv();
        if ($task_nbr % 10 == 0) {
            echo ":";
        } else {
            echo ".";
        }
    }
    
    $tend = microtime(true);
    
    $total_msec = ($tend - $tstart) * 1000;
    echo PHP_EOL;
    printf ("Total elapsed time: %d msec", $total_msec);
    echo PHP_EOL;

    这种模式声称是负载均衡,其实是平均分配而已。如下,101个任务,分发给3个woker

    可以看出任务id对3进行求余,余1分配给第一个人woker,余0分配给第二个woker,余2分配给第三个woker。如果仅仅是这样也就算了,啃爹的还不仅仅是这些,比如我们现在要分发1000个任务,如下图1、2、3在任务分发前启动,4在任务分发后启动,中途中断3后,隔几秒再启动,然后中断2,最后中断1。

    由上图,可以看出,当4启动以后,并不会被分发任务,也就是说当woker没有在任务分发前启动,是不会被分发任务的。其次,3中断后,原本该分发给3的任务,并不会分配给1和2,从而导致部分任务丢失。也就是说,在任务分发的时候,若有woker中断,会导致任务丢失。当有两个woker的时候,这种丢失看得更明显如下图:

    三、补充

    1、错误排查

    2、zemq的不足之处

    我感觉主要是其不支持持久化,当然这也成就了它的快速。

    3、参考

    https://github.com/anjuke/zguide-cn

    http://zguide.zeromq.org/page:all

    本文为原创文章,请尊重辛勤劳动,如需转载,请保留本文地址
    http://www.findme.wang/blog/detail/id/245.html

    若您感觉本站文章不错,读后有收获,不妨赞助一下?

    我要赞助

    您还可以分享给朋友哦

    更多
    顶
    5
    踩
    0
    • 上一篇: 利用Shell中awk和sort命令合并同类数据后并依据某一列进行排序
    • 下一篇: 修改Linux命令提示符及颜色,使其显示git分支名
    • 查看评论
    • 正在加载中...
    • 留言
    • 亲,您还没有登录,登录后留言不需要审核哦!
      可以使用如下方式登录哦!
  • CSDN | 新浪微博 | github | 关于我们 | 我要留言 | 友链申请
  • 豫ICP备18038193号    Copyright ©lidequan All Rights Reserved