linux系统-本机安装cdc

安装脚本

#!/bin/bash
# Filename: install_kafka_debezium.sh
# Usage: sudo bash install_kafka_debezium.sh
#
# Installs OpenJDK 17, a single-node Kafka broker in KRaft mode and a
# standalone Kafka Connect worker with the Debezium MySQL connector,
# then starts both services in the background.

set -e

# -------------------------
# 1. Install Java 17 (plus wget and tar, which later steps rely on)
# -------------------------
echo "=== Installing OpenJDK 17 ==="

# Print the major version reported by the java binary given as $1
# (e.g. 17 for `openjdk version "17.0.8"`); prints nothing if it cannot
# be parsed. Matching the major number avoids false hits such as 11.0.17.
java_major_version() {
    "$1" -version 2>&1 | sed -n 's/.* version "\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Collect only the packages that are actually missing.
PACKAGES=()
JAVA_BIN=$(command -v java || true)
if [ -z "$JAVA_BIN" ] || [ "$(java_major_version "$JAVA_BIN")" != "17" ]; then
    PACKAGES+=(openjdk-17-jdk)
fi
for tool in wget tar; do
    command -v "$tool" >/dev/null 2>&1 || PACKAGES+=("$tool")
done

if [ "${#PACKAGES[@]}" -gt 0 ]; then
    # Best effort: a stale package index should not block the install attempt.
    sudo apt update || true
    sudo apt install -y "${PACKAGES[@]}"
    JAVA_BIN=$(command -v java)
fi
echo "Using Java at: $JAVA_BIN"
"$JAVA_BIN" -version

# -------------------------
# 2. Download and extract Kafka (version set by KAFKA_VERSION below)
# -------------------------
KAFKA_VERSION="3.7.2"
SCALA_VERSION="2.13"
KAFKA_DIR="$HOME/kafka_$SCALA_VERSION-$KAFKA_VERSION"
echo "$HOME"
if [ ! -d "$KAFKA_DIR" ]; then
    echo "=== Downloading Kafka $KAFKA_VERSION ==="
    KAFKA_TGZ="kafka_$SCALA_VERSION-$KAFKA_VERSION.tgz"
    # downloads.apache.org only carries currently supported releases; older
    # versions are moved to archive.apache.org, so fall back to the archive.
    wget -q "https://downloads.apache.org/kafka/$KAFKA_VERSION/$KAFKA_TGZ" -O /tmp/kafka.tgz \
        || wget -q "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/$KAFKA_TGZ" -O /tmp/kafka.tgz
    tar -xzf /tmp/kafka.tgz -C "$HOME"
fi

# All later steps use paths relative to the Kafka installation directory.
cd "$KAFKA_DIR"

# -------------------------
# 3. Configure KRaft mode (format the storage directories)
# -------------------------
echo "=== Configuring KRaft mode ==="
# Use the JDK 17 directory for this machine's architecture (amd64, arm64, ...);
# if it does not exist, derive JAVA_HOME from the java binary found earlier.
JAVA_HOME="/usr/lib/jvm/java-17-openjdk-$(dpkg --print-architecture 2>/dev/null || true)"
if [ ! -d "$JAVA_HOME" ]; then
    JAVA_HOME=$(dirname "$(dirname "$(readlink -f "$JAVA_BIN")")")
fi
export JAVA_HOME
export PATH="$JAVA_HOME/bin:$PATH"
KRAFT_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
# --ignore-formatted keeps a re-run of this script from aborting (set -e)
# when the log directories were already formatted by a previous run.
bin/kafka-storage.sh format --ignore-formatted -t "$KRAFT_CLUSTER_ID" -c config/kraft/server.properties

# -------------------------
# 4. Download the Debezium MySQL connector plugin
# -------------------------
DEBEZIUM_VERSION="2.6.0.Final"
PLUGIN_DIR="$KAFKA_DIR/plugins/debezium"

# Paths are quoted so a home directory containing spaces does not break
# mkdir/tar through word splitting.
mkdir -p "$PLUGIN_DIR"
echo "=== Downloading Debezium MySQL Connector $DEBEZIUM_VERSION ==="
wget -q "https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/$DEBEZIUM_VERSION/debezium-connector-mysql-$DEBEZIUM_VERSION-plugin.tar.gz" -O /tmp/debezium.tar.gz
tar -xzf /tmp/debezium.tar.gz -C "$PLUGIN_DIR"

# -------------------------
# 5. Create the Debezium Connect worker configuration file
# -------------------------
CONNECT_CONFIG="$KAFKA_DIR/config/debezium-standalone.properties"

# The heredoc delimiter is intentionally unquoted so $HOME and $PLUGIN_DIR
# are expanded into the generated file.
# NOTE(review): group.id and the *.storage.topic keys look like
# distributed-worker settings; the worker is started with
# connect-standalone.sh, which presumably ignores them - confirm.
cat > "$CONNECT_CONFIG" <<EOF
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=$HOME/connect.offsets
plugin.path=$PLUGIN_DIR
group.id=1
config.storage.topic=my_connect_configs
offset.storage.topic=my_connect_offsets
status.storage.topic=my_connect_statuses
EOF

# -------------------------
# 6. Start the Kafka KRaft broker
# -------------------------
echo "=== Starting Kafka KRaft broker ==="
nohup bin/kafka-server-start.sh config/kraft/server.properties > "$HOME/kafka.log" 2>&1 &

# Wait (up to 60 seconds) until the broker accepts TCP connections on 9092
# instead of sleeping a fixed time; abort if it never comes up so Connect is
# not started against a dead broker.
KAFKA_READY=0
for ((attempt = 0; attempt < 60; attempt++)); do
    if (exec 3<>/dev/tcp/localhost/9092) 2>/dev/null; then
        KAFKA_READY=1
        break
    fi
    sleep 1
done
if [ "$KAFKA_READY" -ne 1 ]; then
    echo "Kafka did not start listening on localhost:9092 within 60s; see $HOME/kafka.log" >&2
    exit 1
fi

# -------------------------
# 7. Start Debezium Connect (standalone worker)
# -------------------------
echo "=== Starting Debezium Connect ==="
nohup bin/connect-standalone.sh config/debezium-standalone.properties > "$HOME/debezium.log" 2>&1 &

echo "=== Kafka + Debezium setup complete ==="
echo "Kafka KRaft broker running on: localhost:9092"
echo "Debezium Connect REST API running on port 8083"

等待脚本执行完成,安装完成后在 Kafka 目录下执行以下命令创建 topic(“kafka地址”替换为实际的 Kafka 地址,例如 localhost:9092):

# Delete the existing topic infineon-cms.infineon-cms.application_families.
# NOTE(review): presumably a clean-up of a previous run - skip it if the
# topic does not exist yet.
bin/kafka-topics.sh --delete --topic infineon-cms.infineon-cms.application_families --bootstrap-server kafka地址

# Create the topic dbhistory_ifx with a single partition and replication
# factor 1; retention.ms=-1 disables time-based deletion of its records.
# NOTE(review): presumably the topic to reference from
# schema.history.internal.kafka.topic in the connector config - confirm.
bin/kafka-topics.sh \
--create \
--topic dbhistory_ifx \
--bootstrap-server kafka地址 \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=-1

 

等待 topic 创建完成后,向 http://127.0.0.1:8083/connectors 发送 POST 请求(Content-Type: application/json)添加 connector,请求体如下:

{
  "name": "mysql-connector1",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "数据库地址",
    "database.port": "端口号",
    "database.user": "账号",
    "database.password": "数据库密码",
    "database.server.id": "binlog设置的serverid",
    "database.server.name": "名称",
    "database.include.list": "数据库列表",
    "table.include.list": "数据库名.表名逗号分隔",
    "topic.prefix": "前缀",
    "schema.history.internal.kafka.bootstrap.servers": "kafka地址",
    "schema.history.internal.kafka.topic": "topic名称",
    "decimal.handling.mode": "double",
    "snapshot.mode": "initial"
  }
}

 

 

手动增加分区:

bin/kafka-topics.sh --bootstrap-server kafka地址 \
--alter --topic infineon-cms.infineon-cms.application_families --partitions 3

 

查看 topic 列表:

bin/kafka-topics.sh --list --bootstrap-server kafka地址

 

删除 connector 配置:向 http://127.0.0.1:8083/connectors/mysql-connector1 发送 DELETE 请求(路径最后一段为创建时填写的 connector 名称)。

tip:

每次重启 Connect 后需要重新创建 connectors(standalone 模式下通过 REST 接口添加的 connector 配置只保存在内存中,重启后会丢失)。

版权声明:
作者:小凡
链接:https://ye-w.cn/2025/09/02/82.html
来源:小凡笔记-我的技术记录
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
< <上一篇
下一篇>>