kafka-connect-oracle

1. Basic environment setup

Install ZooKeeper and Kafka:

Installing ZooKeeper on Linux: https://www.cnblogs.com/expiator/p/9853378.html

Installing Kafka on Linux: https://juejin.cn/post/6844903745772322824

2. Oracle installation and configuration

Oracle 12c is used as the example here.

  • Install Oracle 12c with Docker

https://blog.csdn.net/Damionew/article/details/84566718

  • Enable LogMiner in Oracle

Check the database log mode:

select log_mode from v$database;

LOG_MODE
------------
ARCHIVELOG

LogMiner must be enabled by a user with DBA privileges. Log in and switch the database to ARCHIVELOG mode:

sqlplus / as sysdba
SQL>shutdown immediate;
SQL>startup mount;
SQL>alter database archivelog;
SQL>alter database open;

Enable supplemental logging:

sqlplus / as sysdba
SQL>alter database add supplemental log data (all) columns;
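
To verify, query v$database again in the same style as the log-mode check above; SUPPLEMENTAL_LOG_DATA_ALL should now return YES:

sqlplus / as sysdba
SQL>select supplemental_log_data_all from v$database;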

For the connector to run successfully, it must be started with a privileged Oracle user. Create a role with the required privileges and grant it to the connector's user:

create role <role_name>;
grant create session to <role_name>;
grant execute_catalog_role, select any transaction, select any dictionary, logmining to <role_name>;
create user <user_name> identified by <user_pwd>;
grant <role_name> to <user_name>;
alter user <user_name> quota unlimited on users;

Grant table creation privileges:

grant create any table to <user_name>;
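
As a concrete illustration matching the configuration in section 3.2 below, the following creates the hdc user (the role name and password here are just examples):

create role logminer_role;
grant create session to logminer_role;
grant execute_catalog_role, select any transaction, select any dictionary, logmining to logminer_role;
create user hdc identified by hdc;
grant logminer_role to hdc;
alter user hdc quota unlimited on users;
grant create any table to hdc;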

3. Installing kafka-connect-oracle

3.1 Obtaining the packages

  • Official repository:

https://github.com/erdemcer/kafka-connect-oracle

After downloading the project, run mvn clean package to build kafka-connect-oracle-1.0.jar.

  • Download the Oracle JDBC driver.

  • Copy the Oracle JDBC driver jar and kafka-connect-oracle-1.0.jar into the libs directory under the Kafka installation path.

  • Copy the config/OracleSourceConnector.properties file from the GitHub project into Kafka's config directory (see the command sketch after this list).
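
A minimal shell sketch of the steps above, assuming Kafka lives at /opt/kafka and the JDBC driver jar is named ojdbc8.jar (adjust paths and file names to your environment):

# Build kafka-connect-oracle-1.0.jar from the project root
mvn clean package

# Copy the connector jar and the Oracle JDBC driver into Kafka's library directory
cp target/kafka-connect-oracle-1.0.jar /opt/kafka/libs/
cp ojdbc8.jar /opt/kafka/libs/

# Copy the sample connector configuration into Kafka's config directory
cp config/OracleSourceConnector.properties /opt/kafka/config/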

3.2 The OracleSourceConnector.properties configuration file in detail:

# Connector name and implementation class
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
# Database alias, used in the schema name of emitted records (test.hdc.user_info.row below)
db.name.alias=test
# Only a single task is supported
tasks.max=1
# Kafka topic that change records are published to
topic=test-oracle-hdc
# Oracle SID or service name, plus connection details
db.name=xe
db.hostname=172.18.26.194
db.port=1521
db.user=hdc
db.user.password=hdc
# Fetch size for the LogMiner result set
db.fetch.size=1
# Tables to capture: SCHEMA.TABLE, or SCHEMA.* for all tables in a schema
table.whitelist=HDC.*
# Parse DML statements into the structured data/before fields
parse.dml.data=true
# If true, start capturing from the current SCN instead of a stored offset
reset.offset=true
# Optionally start from an explicit SCN
start.scn=
# Set to true for container (CDB/PDB) databases
multitenant=false
# Tables to exclude from capture
table.blacklist=

For details, see: https://github.com/erdemcer/kafka-connect-oracle

3.3 Startup

Start the connector:

Kafka Connect has two modes of operation: standalone mode and distributed mode.

Starting in standalone mode:

bin/connect-standalone.sh -daemon config/connect-standalone.properties config/OracleSourceConnector.properties

Starting in distributed mode:

bin/connect-distributed.sh -daemon config/connect-distributed.properties

Note that connect-distributed.properties contains no connector configuration. In distributed mode the connector settings are not passed at startup; instead, Kafka Connect is managed through its REST API, which covers starting, pausing, restarting, resuming, and checking the status of connectors.
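
For example, once the distributed worker is up, the properties from section 3.2 can be submitted as JSON (the host and values below are illustrative and mirror that configuration):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "oracle-logminer-connector",
  "config": {
    "connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
    "db.name.alias": "test",
    "tasks.max": "1",
    "topic": "test-oracle-hdc",
    "db.name": "xe",
    "db.hostname": "172.18.26.194",
    "db.port": "1521",
    "db.user": "hdc",
    "db.user.password": "hdc",
    "db.fetch.size": "1",
    "table.whitelist": "HDC.*",
    "parse.dml.data": "true",
    "reset.offset": "true",
    "multitenant": "false"
  }
}'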

Before starting Kafka Connect in distributed mode, three topics must be created. They correspond to the config.storage.topic (default connect-configs), offset.storage.topic (default connect-offsets), and status.storage.topic (default connect-status) settings in connect-distributed.properties.

  • config.storage.topic: stores connector and task configurations. Note that this topic must have exactly one partition, and it should be replicated. (recommended: partitions 1, replication 1)

  • offset.storage.topic: stores offset information. (recommended: partitions 50, replication 1)

  • status.storage.topic: stores connector status information. (recommended: partitions 10, replication 1)

The commands to create these topics:

# config.storage.topic=connect-configs
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 1 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=connect-offsets
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 1 --partitions 50 --config cleanup.policy=compact
# status.storage.topic=connect-status
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 1 --partitions 10 --config cleanup.policy=compact
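
A quick way to confirm the three topics exist before starting the distributed worker (assuming a broker on localhost:9092):

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | grep connect-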

Consuming with kafka-console-consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test-oracle-hdc

Common topic commands:

1. Create a topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test1

2. List all topics

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

3. Produce console messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

4. Consume console messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning

5. Show details of topic test1

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test1

6. Delete topic test1

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test1

Execution results:

DDL:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "field": "SCN" },
      { "type": "string", "optional": false, "field": "SEG_OWNER" },
      { "type": "string", "optional": false, "field": "TABLE_NAME" },
      { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "TIMESTAMP" },
      { "type": "string", "optional": false, "field": "SQL_REDO" },
      { "type": "string", "optional": false, "field": "OPERATION" },
      { "type": "struct", "fields": [], "optional": true, "field": "data" },
      { "type": "struct", "fields": [], "optional": true, "field": "before" }
    ],
    "optional": false,
    "name": "test.hdc.user_info.row"
  },
  "payload": {
    "SCN": 1879834,
    "SEG_OWNER": "HDC",
    "TABLE_NAME": "_GENERIC_DDL",
    "TIMESTAMP": 1622138532000,
    "SQL_REDO": "create table USER_INFO\n(\n  ID   NUMBER(11) not null,   \n  NAME   VARCHAR2(50) not null,   \n  AGE   NUMBER(5) not null\n  \n)",
    "OPERATION": "DDL",
    "data": null,
    "before": null
  }
}

DML:

insert:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "field": "SCN" },
      { "type": "string", "optional": false, "field": "SEG_OWNER" },
      { "type": "string", "optional": false, "field": "TABLE_NAME" },
      { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "TIMESTAMP" },
      { "type": "string", "optional": false, "field": "SQL_REDO" },
      { "type": "string", "optional": false, "field": "OPERATION" },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "NAME" },
          { "type": "int32", "optional": false, "field": "AGE" }
        ],
        "optional": true,
        "name": "value",
        "field": "data"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "NAME" },
          { "type": "int32", "optional": false, "field": "AGE" }
        ],
        "optional": true,
        "name": "value",
        "field": "before"
      }
    ],
    "optional": false,
    "name": "test.hdc.user_info.row"
  },
  "payload": {
    "SCN": 1880255,
    "SEG_OWNER": "HDC",
    "TABLE_NAME": "USER_INFO",
    "TIMESTAMP": 1622139201000,
    "SQL_REDO": "insert into \"HDC\".\"USER_INFO\"(\"ID\",\"NAME\",\"AGE\") values (2,'test1',22)",
    "OPERATION": "INSERT",
    "data": { "ID": 2, "NAME": "test1", "AGE": 22 },
    "before": null
  }
}

update:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "optional": false, "field": "SCN" },
      { "type": "string", "optional": false, "field": "SEG_OWNER" },
      { "type": "string", "optional": false, "field": "TABLE_NAME" },
      { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "TIMESTAMP" },
      { "type": "string", "optional": false, "field": "SQL_REDO" },
      { "type": "string", "optional": false, "field": "OPERATION" },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "NAME" },
          { "type": "int32", "optional": false, "field": "AGE" }
        ],
        "optional": true,
        "name": "value",
        "field": "data"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "NAME" },
          { "type": "int32", "optional": false, "field": "AGE" }
        ],
        "optional": true,
        "name": "value",
        "field": "before"
      }
    ],
    "optional": false,
    "name": "test.hdc.user_info.row"
  },
  "payload": {
    "SCN": 1882426,
    "SEG_OWNER": "HDC",
    "TABLE_NAME": "USER_INFO",
    "TIMESTAMP": 1622143353000,
    "SQL_REDO": "update \"HDC\".\"USER_INFO\" set \"NAME\" = 'test-3' where \"ID\" = 3 and \"NAME\" = 'test3' and \"AGE\" = 33",
    "OPERATION": "UPDATE",
    "data": { "ID": 3, "NAME": "test-3", "AGE": 33 },
    "before": { "ID": 3, "NAME": "test3", "AGE": 33 }
  }
}

4. Managing connectors via the REST API

Because Kafka Connect is intended to run as a service, it provides a REST API for managing connectors. The default port is 8083; you can also set rest.port in the worker configuration file before starting Kafka Connect. The endpoints are listed below, with a few example calls after the list.

  • GET /connectors – returns the names of all running connectors
  • POST /connectors – creates a new connector; the request body must be JSON containing a name field and a config field, where name is the connector's name and config is a JSON object holding the connector's configuration
  • GET /connectors/{name} – returns information about the specified connector
  • GET /connectors/{name}/config – returns the configuration of the specified connector
  • PUT /connectors/{name}/config – updates the configuration of the specified connector
  • GET /connectors/{name}/status – returns the status of the specified connector, including whether it is running, paused, or failed; if an error occurred, the details are listed as well
  • GET /connectors/{name}/tasks – returns the tasks currently running for the specified connector
  • GET /connectors/{name}/tasks/{taskid}/status – returns the status of the specified connector task
  • PUT /connectors/{name}/pause – pauses the connector and its tasks, stopping data processing until it is resumed
  • PUT /connectors/{name}/resume – resumes a paused connector
  • POST /connectors/{name}/restart – restarts a connector, most commonly used after it has failed
  • POST /connectors/{name}/tasks/{taskId}/restart – restarts a task, usually because it has failed
  • DELETE /connectors/{name} – deletes a connector, stopping all of its tasks and removing its configuration
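
A few illustrative curl calls against a worker on localhost:8083, using the connector name from the configuration above:

# List running connectors
curl http://localhost:8083/connectors

# Check connector status
curl http://localhost:8083/connectors/oracle-logminer-connector/status

# Pause, then resume
curl -X PUT http://localhost:8083/connectors/oracle-logminer-connector/pause
curl -X PUT http://localhost:8083/connectors/oracle-logminer-connector/resume

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/oracle-logminer-connector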
