PHP开源:Kafka-php-使用 PHP 编写的 Kafka 客户端

WayneMacias 8年前
   <h2>Kafka-php</h2>    <p><a href="/misc/goto?guid=4959748222367434544" rel="nofollow,noindex">English Document</a></p>    <p>Kafka-php 使用纯粹的PHP 编写的 kafka 客户端,目前支持 0.8.x 以上版本的 Kafka,该项目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以参照文档 <a href="/misc/goto?guid=4959748222455183273" rel="nofollow,noindex">Kafka PHP v0.1.x Document</a> , 不过建议切换到 v0.2.x 上。v0.2.x 使用 PHP 异步执行的方式来和kafka broker 交互,较 v0.1.x 更加稳定高效, 由于使用 PHP 语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本</p>    <h2>安装环境要求</h2>    <ul>     <li>PHP 版本大于 5.5</li>     <li>Kafka Server 版本大于 0.8.0</li>     <li>消费模块 Kafka Server 版本需要大于 0.9.0</li>    </ul>    <h2>Installation</h2>    <h2>使用 Composer 安装</h2>    <p>添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:</p>    <pre>  <code class="language-php">{   "require": {    "nmred/kafka-php": "0.2.*"   }  }</code></pre>    <h2>Produce</h2>    <pre>  <code class="language-php"><?php  require '../vendor/autoload.php';  date_default_timezone_set('PRC');  use Monolog\Logger;  use Monolog\Handler\StdoutHandler;  // Create the logger  $logger = new Logger('my_logger');  // Now add some handlers  $logger->pushHandler(new StdoutHandler());    // 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md)  $config = \Kafka\ProducerConfig::getInstance();  $config->setMetadataRefreshIntervalMs(10000);  $config->setMetadataBrokerList('10.13.4.159:9192');  $config->setBrokerVersion('0.9.0.1');  $config->setRequiredAck(1);  $config->setIsAsyn(false);  $config->setProduceInterval(500);  $producer = new \Kafka\Producer(function() {   return array(    array(     'topic' => 'test',     'value' => 'test....message.',     'key' => 'testkey',    ),   );  });  $producer->setLogger($logger);  $producer->success(function($result) {   var_dump($result);  });  $producer->error(function($errorCode, $context) {   var_dump($errorCode);  });  $producer->send();</code></pre>    <h2>Consumer</h2>    <pre>  <code class="language-php"><?php  require '../vendor/autoload.php';  date_default_timezone_set('PRC');  use Monolog\Logger;  use Monolog\Handler\StdoutHandler;  // Create the logger  $logger = new Logger('my_logger');  // Now add some handlers  $logger->pushHandler(new StdoutHandler());    $config = \Kafka\ConsumerConfig::getInstance();  $config->setMetadataRefreshIntervalMs(10000);  $config->setMetadataBrokerList('10.13.4.159:9192');  $config->setGroupId('test');  $config->setBrokerVersion('0.9.0.1');  $config->setTopics(array('test'));  //$config->setOffsetReset('earliest');  $consumer = new \Kafka\Consumer();  $consumer->setLogger($logger);  $consumer->start(function($topic, $part, $message) {   var_dump($message);  });</code></pre>    <h2>Basic Protocol</h2>    <p>基础协议 API 调用方式见 <a href="/misc/goto?guid=4959748222536860146" rel="nofollow,noindex">Example</a></p>    <p> </p>    <p>项目主页:<a href="http://www.open-open.com/lib/view/home/1493705358573">http://www.open-open.com/lib/view/home/1493705358573</a></p>    <p> </p>