一、在软件商店安装kafka
二、php扩展开启rdkafka,调用phpinfo确认扩展开启成功:
三、建立一个生产者和一个消费者,例如生产者producer.php 消费者consumer.php 以及一个调用生产者往队列放入消息的方法,例如test.php
1.producer.php内容:
<?php
function produceKafkaMessage($message) {
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("im_topic");//主题的名字
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
$producer->flush(10000);
echo "Produced: " . json_encode($message) . "\n";
}
2.consumer.php内容:
<?php
// consumer.php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1:9092');
$conf->set('group.id', 'my_consumer_group');//消费者组名
$conf->set('auto.offset.reset', 'earliest');
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('127.0.0.1:9092');
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', '/tmp');
$topic = $consumer->newTopic("im_topic", $topicConf);//与生产者的主题相同
// $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);//每次重启都会把所有历史消息重新消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);//只会消费新产生的消息
while (true) {
$message = $topic->consume(0, 120 * 1000); // 超时时间为120秒
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$order = trim($message->payload); // 去除换行
$data = json_decode($order, true);
print_r($data);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 到达分区末尾
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
?>
3.test.php内容:
<?php
require_once 'producer.php'; // 载入 producer.php
//模拟生成订单
for($i=0;$i<10;$i++){
$order = [
'id' => time().'_'.$i,
'price' => 9.9,
'user_id'=>123,
'goods_name'=>'测试商品'.$i
];
// 每产生一个新订单就 调用函数放入队列
produceKafkaMessage($order);
}
?>
四:运行效果,打开终端:运行消费者:php consumer.php start (生产环境可以添加守护进程)。浏览器打开test.php,或者新窗口命令行运行php test.php start即可看到效果
后续处理队列消息,在消费者consumer.php获取到消息那里处理即可。