# Kafka

# 一、安装部署

# 1.1 群集规划

node01 node02 node03
zk zk zk
kafka kafka kafka

JAR包下载: 从Kafka (opens new window)官方下载Jar包

# 1.2 群集部署

  1. 解压安装包
[hadoop@node01 opt]$ sudo wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
[hadoop@node01 opt]$ sudo tar -xzf kafka_2.13-3.4.1.tgz
1
2
  1. 创建logs文件夹
[hadoop@node01 kafka]$ mkdir /opt/kafka/logs
1
  1. 修改配置文件
node03-➜  /opt vim /opt/kafka/config/server.properties
# 每台broker的id需要唯一,不能重复
broker.id=0
# 设置 kafka topic 数据存放目录
log.dirs=/opt/kafka/data
# 设置zookeeper的节点
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
1
2
3
4
5
6
7
  1. 配置环境变量
[root@node01 kafka]# vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
1
2
3
4
  1. 分发文件
[root@node01 kafka]# xsync /etc/profile.d/my_env.sh
[root@node01 opt]# xsync kafka
1
2
  1. 分别在node02,node03上修改 broker.id

  2. 启动脚本

#!/bin/bash

case $1 in
"start"){
    for i in node01 node02 node03
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
    done
};;
"stop"){
    for i in node01 node02 node03
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/kafka/bin/kafka-server-stop.sh"
    done
};;
esac
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  1. 启动停止kafka群集
[root@node01 script]# kf.sh start
[root@node01 script]# kf.sh stop
1
2
  1. 在zk中查看kafka状态
node01-➜  ~ /opt/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 4] ls /kafka/brokers/ids
[0, 1, 2]
1
2
3

# 二、命令行操作

官方快速指南 (opens new window)

# 2.1 创建topic

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
kafka-topics.sh  --bootstrap-server localhost:9092 --create --topic first --partitions 1 --replication-factor 3
1
2

# 2.2 查看topic列表

kafka-topics.sh --list --bootstrap-server localhost:9092
1

# 2.3 创造消息

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
1
2
3

# 2.4 消费消息

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
1
2
3

# 2.5 删除数据

停掉服务,然后清除数据。

$ rm -rf /tmp/kafka-logs /tmp/zookeeper
1

# 2.6 删除某个topic的数据

bin/kafka-topics.sh --delete --topic bin_log_coupon --zookeeper localhost:2181
1

# 三、KnowStreaming 监控

# 3.1 安装 MySQL 服务

传送门

# 3.2 安装ElasticSearch

传送门

# 3.3 KnowStreaming 实例搭建

# 下载安装包
[root@node01 opt]# wget https://s3-gzpu.didistatic.com/pub/knowstreaming/KnowStreaming-3.3.0.tar.gz

# 解压安装包到指定目录
[root@node01 opt]# tar -xzf KnowStreaming-3.3.0.tar.gz

# 修改启动脚本并加入systemd管理
cd  KnowStreaming

# 创建相应的库和导入初始化数据
mysql -uroot -proot -e "create database know_streaming;"
mysql -uroot -proot  know_streaming < ./init/sql/ddl-ks-km.sql
mysql -uroot -proot  know_streaming < ./init/sql/ddl-logi-job.sql
mysql -uroot -proot  know_streaming < ./init/sql/ddl-logi-security.sql
mysql -uroot -proot  know_streaming < ./init/sql/dml-ks-km.sql
mysql -uroot -proot  know_streaming < ./init/sql/dml-logi.sql

# 创建elasticsearch初始化数据
sh ./bin/init_es_template.sh

# 修改配置文件
vim ./conf/application.yml

# 监听端口
server:
    port: 8080 # web 服务端口
    tomcat:
        accept-count: 1000
        max-connections: 10000

# ES地址
es.client.address: 127.0.0.1:8060

# 数据库配置(一共三处地方,修改正确的mysql地址和数据库名称以及用户名密码)
jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?.....
username: root
password: root

# 启动服务
cd /opt/KnowStreaming/bin/
sh startup.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 3.4 使用KnowStreaming

# 3.4.1 登录系统

默认用户名密码admin:admin

更新时间: 7/7/2023, 11:32:03 AM