云计算百科
云计算领域专业知识百科平台

在Windows下搭建kafka服务器,使用python制作生产者与消费者

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
    • JDK安装
      • 1.下载jdk
      • 2.安装jdk
      • 3.验证是否安装成功
      • 4.配置环境变量
    • Zookeeper安装
      • 1.下载Zookeeper
      • 2.安装Zookeeper
      • 3.配置环境变量
      • 4.验证是否安装成功
    • Scala安装
      • 1.下载Scala
      • 2.安装Scala
      • 3.配置环境变量
      • 4.验证是否安装成功
    • 安装kafka
      • 1.下载kafka
      • 2.安装kafka
    • Kafka运行
    • 与python结合实例,制作生产者与消费者
    • 待完善的问题

前言

安装Kafka之前,我们需要安装JDK、Zookeeper、Scala。 JDK(Java Development Kit)是 Java 开发工具包,它包括 Java 编译器(javac)用于将代码编译成字节码,Java 虚拟机(JVM)负责运行字节码,还有大量的 Java 核心类库。这些类库为 Kafka 提供了诸如网络通信、文件操作、内存管理等基础功能的支持。

Zookeeper 是一个分布式的、开源的应用程序协调服务。它主要用于维护配置信息、命名、提供分布式同步和组服务等。它以一种简单的层次结构数据模型(类似文件系统的目录树结构)来存储数据。

Scala 运行在 Java 虚拟机上,可以与 Java 代码无缝集成。它具有许多独特的特性,比如类型推断,这使得代码编写更加简洁。

安装原因与使用:Kafka 是用 Scala 编写的,而 Scala 运行在 Java 虚拟机(JVM)上。JDK 提供了 Java 编译器和 Java 虚拟机等关键组件,是 Kafka 运行的基础环境。Kafka 的启动脚本、核心代码的执行都依赖于 JVM,没有 JDK,Kafka 无法正常运行。Kafka 使用 Zookeeper 来管理和协调集群。Zookeeper 存储了 Kafka 集群的元数据信息,比如集群中有哪些 broker(Kafka 服务器)、每个 topic(主题)的分区分布情况、消费者的消费偏移量等信息。它在 Kafka 集群的动态扩展、故障恢复等过程中起到关键的协调作用。

JDK安装

1.下载jdk

官网地址安装链接:https://www.oracle.com/java/technologies/downloads 进入官网后 在这里插入图片描述 这里下载的是Windows版本的JDK 23,从上到下 x64 Compressed Archive(228.70 MB)是一个压缩文件(.zip),这种类型的文件下载后需要手动解压,然后配置系统环境变量才能使用 JDK。适合那些喜欢手动管理软件安装和配置的用户。

x64 Installer(205.21 MB)是一个可执行的安装程序(.exe),这种类型的文件下载后,用户可以直接双击运行安装程序,它会自动将 JDK 安装到系统中。

x64 MSI Installer(203.96 MB)是一个 Windows Installer 包(.msi)这种类型的文件通常用于企业环境或通过组策略进行软件部署。它提供了更高级的安装选项。 这里我们直接下载第二种,如果有别的需求,也可以按需下载不同系统的JDK安装文件。

2.安装jdk

下载后在文件夹找到文件如图所示 在这里插入图片描述 双击运行这个文件,跟随给出的步骤操作就可以安装好JDK,在这个过程中可以修改JDK的安装目录,这里注意:安装目录一定不要用中文,并且不能有空格。建议直接用默认的安装路径一般为C:\\Program Files\\Java\\jdk-23或者C:\\Program\\Java\\jdk-23,不想修改就记一下这里的路径,后面配置环境变量需要用到。

3.验证是否安装成功

按下window键+R,在运行栏中输入cmd, 输入如下命令:

java -version –执行工具
javac -version –编译工具

如图: 在这里插入图片描述 出现版本号,说明安装成功。

4.配置环境变量

上面我们还没有配置环境变量,在cmd终端就能使用如上的“java -version ” 、“javac -version”,,这是因为现在的版本安装jdk后,会自动给我们创建好环境变量(路径在xx\\xx\\xx\\Java\\javapath),以前的版本是必须手工创建的,方法如下: 首先,打开“编辑系统环境变量” 在这里插入图片描述 进入系统变量–path 在这里插入图片描述 删除自动创建的路径(xx\\xx\\xx\\Java\\javapath)

然后再去环境变量里添加路径,首先点击新建,名称为JAVA_HOME,值为上面存放JDK的文件路径(我的JDK文件下载在C:\\Program Files\\Java\\jdk-23)。 在这里插入图片描述 然后再到环境变量path中添加JAVA_HOME,首先双击path打开该变量目录点击新建,添加变量%JAVA_HOME%\\bin,完成后如下 在这里插入图片描述 OK ,以后如果改了jdk的安装路径,只需要更改JAVA_HOME的值即可。

Zookeeper安装

1.下载Zookeeper

安装链接:https://pan.baidu.com/s/1TbgZXmK5_O8W4iZiuFicgw?pwd=yyds 我这里是通过一个博主的网盘链接下载的 参考文章:【Zookeeper】Windows下安装Zookeeper(图文记录详细步骤,手把手包安装成功)

2.安装Zookeeper

这个文件我下载后解压在E:bigdata,文件名就是zookeeper, 进入目录后要新建一个data文件, 在这里插入图片描述 然后进入conf目录, 在这里插入图片描述

修改zoo_sample.cfg文件,改名为zoo.cfg,我这里直接复制了一份改了一下名字 在这里插入图片描述

,再进入zoo.cfg把dataDir=/tmp/zookeeper 修改为data文件路径dataDir=E:\\bigdata\\zookeeper\\data 在这里插入图片描述

在这里有一个小问题,因为我的zookeeper文件最开始直接安装在E盘,不是bigdata文件,所以路径是E:\\zookeeper\\data因此储存信息的zookeeper文件出现在E盘,如果想修改或删除kafka运行的相关文件,不仅要修改或删除kafka文件里的储存文件,还要修改或删除这个出现在E盘的文件 以上zookeeper文件安装和配置就完成了

3.配置环境变量

再如同JDK文件一样去环境变量添加zookeeper文件的路径,先在系统变量里添加ZOOKEEPER_HOME,值为zookeeper文件路径 在这里插入图片描述

再去path中添加ZOOKEEPER_HOME, 在这里插入图片描述 这样,以后zookeeper文件路径有变化时直接修改ZOOKEEPER_HOME对应的值就行了

4.验证是否安装成功

以管理员权限打开命令窗口,输入:

zkServer

出现下图显示,说明安装成功 在这里插入图片描述

Scala安装

1.下载Scala

安装链接:https://pan.baidu.com/s/1Qiy1aEndKn_Xs-zSSLaWIA?pwd=yyds 同上,这个文件也是在网盘下载的 参考文件:【Scala】Windows下安装Scala(以Scala 2.11.12为例)

2.安装Scala

这个文件不需要进行修改,直接安装好就行,我把这个文件也是放在了E:bigdata,名称为Scala(所以他的路径为E:bigdata\\Scala)

3.配置环境变量

同上,去环境变量中新增环境变量 SCALA_HOME,值为Scala对应路径,如下图:在这里插入图片描述 并且在环境变量Path添加条目: 在这里插入图片描述

4.验证是否安装成功

开启一个新的cmd窗口。输入:Scala 在这里插入图片描述

出现上述界面,说明安装成功

安装kafka

1.下载kafka

安装链接:https://pan.baidu.com/s/1Av4ZwQPUaAntwVxz79Ne9w?pwd=yyds 同上,网盘下载 参考文件:【Kafka】Windows下安装Kafka(图文记录详细步骤)

2.安装kafka

文件我下载在了E:bigdata,名称为kafka 接下来需要在Kafka安装目录下新建目录logs, 在这里插入图片描述 然后修改配置文件 server.properties 文件路径:…bigdata\\kafka\\config\\server.properties 修改 log.dirs 参数值,修改成上一步新建的logs文件夹。注意文件夹路径中是双左斜杠, 例如我的是log.dirs=E:\\bigdata\\kafka\\logs。 在这里插入图片描述 再然后修改 listeners 参数值,还是 server.properties文件, 在35行左右添加listeners=PLAINTEXT://localhost:9092 这个代表了你创建的kafka服务器的端口号,在后续连接进行操作时会用到 在这里插入图片描述 以上前置需要使用的文件安装完成

Kafka运行

Kafka依赖于Zookeeper, 所以在终端以管理员身份打开命令行首先输入命令zkServer启动Zookeeper服务,这个在验证zookeeper是否安装成功有提到,就不放图了, 再另开一个命令行,还是以管理员身份打开,首先进入kafka文件所在的目录,例如我这里是在E盘的bigdata文件中所以我的过程如下 在这里插入图片描述 输入如下命令启动Kafka服务:

.\\bin\\windows\\kafka-server-start.bat .\\config\\server.properties

运行结果如下 在这里插入图片描述 以管理员权限再新开一个命令提示窗口,进入E:\\bigdata\\kafka\\bin\\windows目录,

1|E:
2|cd bigdata\\kafka\\bin\\windows

执行以下命令,创建topics:

kafka-topics.bat –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic test

kafka – topics.bat:这是 Kafka 用于操作主题(topic)的脚本文件(在 Windows 环境下)。它提供了一系列与主题相关的操作功能,如创建、删除、查看主题信息等。 –create:这是一个操作指令,表示要创建一个新的主题。 –bootstrap – server localhost:9092:指定 Kafka 集群的引导服务器地址和端口。localhost表示本地主机,如果 Kafka 部署在远程服务器上,则需要替换为相应的服务器域名或 IP 地址。9092是 Kafka 服务器默认的监听端口。客户端(如这个创建主题的操作)通过连接到这个引导服务器来获取整个 Kafka 集群的相关信息,包括其他的服务器节点信息等。 –replication – factor 1:复制因子用于指定主题的每个分区在 Kafka 集群中的副本数量。这里设置为1,意味着每个分区只有一个副本。在生产环境中,为了保证数据的高可用性和容错性,通常会设置大于1的值,这样即使某个节点出现故障,数据仍然可以从其他副本中获取。 –partitions 1:指定主题的分区数量。分区是 Kafka 对数据进行分布式存储和处理的基本单位。通过将主题划分为多个分区,可以实现并行处理数据,提高系统的吞吐量。这里创建的主题只有1个分区。 –topic test:指定要创建的主题名称为test。主题是 Kafka 中消息的分类标识,生产者将消息发送到特定的主题,消费者从特定的主题中接收消息。 由上可知,–bootstrap – server localhost:9092设置端口,–topic test设置主题,主要通过修改这两个值来达到区分不同主机不同端口的不同主题的目的 在这里插入图片描述 如上图,我这里已经创建了test这个主题,所以报错了,更换成test2,当出现Created topic test2.说明创建成功。 查看topics列表:

kafka-topics.bat –bootstrap-server localhost:9092 –list

在这里插入图片描述 打开producer(生产者) 以管理员身份新开一个命令行,输入如下命令:

kafka-console-producer.bat –broker-list localhost:9092 –topic test

在这里插入图片描述 打开consumer(消费者) 以管理员身份新开一个命令行,输入如下命令:

kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic test –from-beginning

在这里插入图片描述 这样kafka的服务器就搭建完成,在producer(生产者)输入信息,在consumer(消费者)就可以实时接收。

与python结合实例,制作生产者与消费者

使用pycharm运行python文件往kafka服务器,首先要下载kafka库,在自己的虚拟环境运行命令pip install kafka,将这个库下载到环境中。 1.发送信息: 到pycharm中运行以下代码:

from kafka import KafkaProducer
import json
import time

# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# 发送消息到指定Topic
topic = 'my_topic'
message_dict = {'name': 'luowei', 'message': 'this is a test message'}

# 发送消息
while True:
producer.send(topic, message_dict)
time.sleep(5)

这段代码作用是向指定的kafka服务器端口里的一个指定主题,以utf-8的形式每隔5秒发送一个字典,是一个非常简单的生产者 运行这段代码后,在终端打开这个my_topic的生产者可以看到接收到了信息 在这里插入图片描述 2.使用代码读取kafka服务器主题中的信息

from kafka import KafkaConsumer
import json

# Kafka服务器地址和要读取的主题名称
bootstrap_servers = '127.0.0.1:9092'
topic_name = 'my_topic'

# 创建KafkaConsumer实例
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest', # 从最早的消息开始读取,如果不需要可以修改
group_id='test_group' # 消费者组ID,可以根据需要修改
)

for message in consumer:
try:
# 这里假设消息的值是字符串类型,如果是其他类型需要相应处理
print(f"收到消息: {message.value.decode('utf-8')}")
# 如果消息是JSON格式,可以这样解析
# message_value = json.loads(message.value.decode('utf-8'))
# print(f"解析后的消息: {message_value}")
except Exception as e:
print(f"处理消息时出错: {e}")

结果如下: 在这里插入图片描述

待完善的问题

1,在“kafka运行”这一段中还有一个删除方法,最开始直接运行删除命令会报错,在网上查找资料后发现要修改配置文件,在server.properties文件中加入delete.topic.enable = true,可是加入后发现依旧报错,并且整个kafka系统还会报错退出,错误与解释如下 [2024 – 11 – 07 17:10:52,557] ERROR [Broker id = 0] Ignoring StopReplica request (delete = true) from controller 0 with correlation id 5 epoch 1 for partition test – 0 as the local replica for the partition is in an offline log directory (state.change.logger) 这条日志表明 broker 0 收到了来自控制器 0 的针对分区 test – 0 的停止副本(StopReplica)请求,且请求中包含了删除(delete = true)操作。然而,broker 0 忽略了这个请求,原因是分区 test – 0 的本地副本所在的日志目录处于离线状态。这意味着 broker 无法正常访问该分区的日志数据,可能是由于之前提到的日志目录权限问题或者其他导致目录不可用的原因。 经过查询资料后得出可能的两个原因:一是kafka版本与zookeeper不是很适配,二是版本与输入命令不搭,在一些较早的版本删除命令有些许变化 2,在使用代码写入与读取时速度较慢,尤其是读取时 3,在使用代码发送时,无法向已发送过的topic再次发送信息,这里可能与生产者的配置文件有关,因时间原因并没有深入研究 4.当生产者发送的信息是用UTF-8编码处理时,有些没有将配置完善的kafka服务器终端在接收信息时会出现乱码情况

赞(0)
未经允许不得转载:网硕互联帮助中心 » 在Windows下搭建kafka服务器,使用python制作生产者与消费者
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!