宝塔项目PHP调用kafka消息队列简单案例

news/2025/3/10 1:15:28/文章来源:https://www.cnblogs.com/youantianqin/p/18758076

一、在软件商店安装kafka

 二、php扩展开启rdkafka,调用phpinfo确认扩展开启成功:

 

三、建立一个生产者和一个消费者,例如生产者producer.php   消费者consumer.php  以及一个调用生产者往队列放入消息的方法,例如test.php

1.producer.php内容:

<?php

function produceKafkaMessage($message) {
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1:9092');

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("im_topic");//主题的名字

$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message));
$producer->flush(10000);

echo "Produced: " . json_encode($message) . "\n";
}

 2.consumer.php内容:

 

<?php
// consumer.php
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', '127.0.0.1:9092');
$conf->set('group.id', 'my_consumer_group');//消费者组名
$conf->set('auto.offset.reset', 'earliest');

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('127.0.0.1:9092');

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', '/tmp');

$topic = $consumer->newTopic("im_topic", $topicConf);//与生产者的主题相同
// $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);//每次重启都会把所有历史消息重新消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);//只会消费新产生的消息

while (true) {
$message = $topic->consume(0, 120 * 1000); // 超时时间为120秒

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$order = trim($message->payload); // 去除换行
$data = json_decode($order, true);

print_r($data);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 到达分区末尾
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
?>

3.test.php内容:

 

<?php
require_once 'producer.php'; // 载入 producer.php

//模拟生成订单
for($i=0;$i<10;$i++){
$order = [
'id' => time().'_'.$i,
'price' => 9.9,
'user_id'=>123,
'goods_name'=>'测试商品'.$i
];

// 每产生一个新订单就 调用函数放入队列
produceKafkaMessage($order);

}


?>

 

四:运行效果,打开终端:运行消费者:php consumer.php start  (生产环境可以添加守护进程)。浏览器打开test.php,或者新窗口命令行运行php test.php start即可看到效果

后续处理队列消息,在消费者consumer.php获取到消息那里处理即可。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/895157.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

炒股事实监看?电脑远程观看、操作,使用ToDesk软件

对于绝大多数人来说,单单依靠打工所得的固定工资其实很难实现财富积累,而通过基金、股票等投资,却能在追求刺激的同时兴许有幸达成理财增值的目的。 当然,这并非是鼓励大家茫然入局,其中是需要很多专业性的信息分析并掌握一定的交易技术才可行;而在同样的付出基础上如何炒…

微软工程师偷偷在用!这款SSH工具让Windows操控CentOS比Mac还优雅!

🚀 个人主页 极客小俊 ✍🏻 作者简介:web开发者、设计师、技术分享 🐋 希望大家多多支持, 我们一起学习和进步! 🏅 欢迎评论 ❤️点赞💬评论 📂收藏 📂加关注基本原理 为什么要远程登录Linux? 你总不会把买来的服务器背在背上吧~ 所以无论我们身在何处,只要有…

地平线5、荒野大镖客、赛博朋克2077被嘲太贵?todesk云游戏早就搞定了

当《极限竞速:地平线5》《荒野大镖客:救赎2》《赛博朋克2077》等顶级3A游戏凭借震撼的画面与沉浸式玩法持续霸榜,但其动辄200元起步的定价始终是玩家热议的焦点。 "高价游戏是否值得买单"的争议愈演愈烈,小编搜罗了一番玩家评论,一起来看看大家到底怎么说的吧~《…

破防了!原来CentOS联网设置竟藏在这个路径?网友:5年运维都白干了!

🚀 个人主页 极客小俊 ✍🏻 作者简介:web开发者、设计师、技术分享 🐋 希望大家多多支持, 我们一起学习和进步! 🏅 欢迎评论 ❤️点赞💬评论 📂收藏 📂加关注默认情况下,我们按照刚刚安装好的CentOS是不能进行直接上网的 比如我们也可以打开系统自带的Firefox…

记录---一个网页打造自己的天气预报

🧑‍💻 写在开头 点赞 + 收藏 === 学会🤣🤣🤣 概念解释通过数据接口,简化功能开发。天气数据从哪里来?如果是自己采集,不光要写后端代码,最难的是维护啊,所以必须《天气预报》此类APP特别适合 前后端分离的,以下用一个简单的例子快速打通前后端的调用前端使用H…

从零开始:deepseek本地部署教程,小白也能玩转!

最近deepseek非常火热,很多小伙伴想要使用它,但是却找不到入口,即使找到了在线网页端,也会在意自己的数据的安全性和隐私保护,特别是在处理敏感信息或机密数据,不想让其他人在使用浏览器的时候看到这些信息。 那么将deepseek部署到本地就可以很好解决这一问题,怎么部署呢…

SoK: History is a Vast Early Warning System: Auditing the Provenance of System Intrusions 论文笔记

简介 审计能力就被认为是任何资源共享系统中检测违规和渗透尝试的关键。Lampson 将访问控制“黄金标准”的三大支柱确定为授权、身份验证和审计。当授权和身份验证等主动安全措施失败时,审计构成了所有形式的反应性安全的基础,使系统防御者能够在入侵升级之前识别并减轻入侵 …

寻找通义灵码 AI 程序员 {头号玩家} ,体验 QwQ-Plus、DeepSeek 满血版的通义灵码

2025 年 1 月,通义灵码 AI 程序员全面上线,同时支持 VS Code、JetBrains IDEs,是国内首个真正落地的 AI 程序员。近期,通义灵码能力再升级全新上线模型选择功能,目前已经支持 QwQ-plus、DeepSeek 满血版模型,用户可以在 VSCode 和 JetBrains 里搜索并下载最新通义灵码插件…

从“零”到“联”:Profinet转Ethernet/IP网关搞定发那科机器手臂

从“零”到“联”:Profinet转Ethernet/IP网关搞定发那科机器手臂 在电子制造行业,产品更新换代迅速,对生产效率和精度的要求近乎严苛。一家专注于智能手机零部件制造的企业,面临着生产流程优化与设备协同的重大挑战。传统设备多采用EthernetIP协议,而新引入的发那科机器人…

聊聊突然爆火的Manus

昨天号称全球首款通用人工智能的产品Manus横空出世,开始全网刷屏。我看到很多技术社群的讨论话题从DeepSeek变成了Manus,甚至网上已经有人在卖Manus的资料和邀请码,流量属性拉满。 Manus号称支持直接交付完整的任务结果(官网有展示Use case),还宣称在GAIA评分中超越了OpenA…

Linux 交叉编译(toolchain) ARM 版openssl-1.0.0s的libssl.so 库

前言全局说明一、说明 环境: ubuntu 18.04二、下载源码: 官网: https://openssl-library.org 源码下载: https://openssl-library.org/source/old/1.0.0/index.html 下载,指定版本: https://github.com/openssl/openssl/releases/download/OpenSSL_1_0_0s/openssl-1.0.0s.ta…

DMS+ADB-PG支持一键部署QwQ-32B推理模型

3月6日,阿里云发布并开源全新推理模型通义千问QwQ-32B。现已支持在DMS+ADB上私域部署并与Dify打通使用。3月6日,阿里云发布并开源全新的推理模型通义千问QwQ-32B。通过大规模强化学习,千问QwQ-32B在数学、代码及通用能力上实现质的飞跃,整体性能比肩DeepSeek-R1。在保持强劲…