博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PHP使用Kafka
阅读量:5871 次
发布时间:2019-06-19

本文共 2484 字,大约阅读时间需要 8 分钟。

  hot3.png

what is kafka

kafka是分布式发布订阅系统

安装brew install kafka

可能会提示brew cask install java

中途会被安装zookeeper

修改server.properties

vim /usr/local/etc/kafka/server.properties

增加一行配置

listeners=PLAINTEXT://localhost:9092

首先我们启动zookeeper

zkserver start启动kafka servercd /usr/local/Cellar/kafka/0.10.1.0/bin

./kafka-server-start /usr/local/etc/kafka/server.properties 新建session查看kafka的topic

./kafka-topics --list --zookeeper localhost:2181

一般产出的是test

启动kafka的生产者

./kafka-console-producer --topic [topic-name] --broker-list localhost:9092

开启一个kafka消费者

./kafka-console-consumer --bootstrap-server localhost:9092 -topic [topic-name]

查看kafka的topic数量

./kafka-topics --list --zookeeper localhost:2181

2181 端口是固定的

创建一个topic
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gwr
Broker

Kafka集群包含了一个或多个服务器,这种服务器称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别称为Topic

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition

Producer

负责发布消息到Kafka broker

Consumer

消费消费者,向Kafka broker读取消息的客户端

php 使用kafka扩展

composer安装kafka扩展

composer require nmred/kafka-php 生产者producer.php

setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setBrokerVersion('0.10.0.0.');$config->setRequiredAck(1);$config->setIsAsyn(false);//$config->setProduceInterval(500);$producer = new \Kafka\Producer();//$producer->send(array(// array(// 'topic' => 'test',// 'value' => 'test1....message.',// 'key' => '',// ),//));for ($i = 0; $i < 100; $i++) { $result = $producer->send(array( array( 'topic' => 'test', 'value' => 'test1....message.', 'key' => '', ), )); var_dump($result);}

消费者 cosumer.php

setMetadataRefreshIntervalMs(10000);$config->setMetadataBrokerList('127.0.0.1:9092');$config->setGroupId('test');$config->setBrokerVersion('0.10.0.1');$config->setTopics(array('test'));//$config->setOffsetReset('earliest');$consumer = new \Kafka\Consumer();$consumer->start(function($topic, $part, $message) { var_dump($message);});
ERROR [Replica Manager on Broker 0]: Error processing append operation on partition test-0 (kafka.server.ReplicaManager)Well this bug report is mostly about the fact that a producer that worked with 0.9.0.1 breaks when updating to 0.10.0.0. Whether this is an issue of Kafka or the client isn't that interesting, but for sure something changed on the server side.需要将broker version从0.9.0.0 切换到0.10.0.0

转载于:https://my.oschina.net/kakoi/blog/1490298

你可能感兴趣的文章
linux命令行模式下实现代理上网 专题
查看>>
EPEL库安装
查看>>
Character Studio
查看>>
【转】关于PHP的header("P3P: CP=CURa……")
查看>>
PHP如何释放内存之unset销毁变量并释放内存详解
查看>>
开源:Taurus.MVC 框架 (已支持.NET Core)
查看>>
C# WPF定时器
查看>>
30个你必须记住的CSS选择符
查看>>
CSS图片裁剪Clip
查看>>
iOS中JS 与OC的交互(JavaScriptCore.framework)
查看>>
Unbuntu和Centos中部署同时多版本PHP的详细过程
查看>>
Spring Boot配置文件规则以及使用方法官方文档查找以及Spring项目的官方文档查找方法...
查看>>
python binascii模块详解
查看>>
Hive之 hive的三种使用方式(CLI、HWI、Thrift)
查看>>
UWP的一种下拉刷新实现
查看>>
requests 证书验证
查看>>
iOS开发基础:OC数组对象NSArray的常用方法
查看>>
Xcode下的中文乱码问题
查看>>
How can I set ccshared=-fPIC while executing ./configure?
查看>>
python常见面试题(三)
查看>>