bireme数据源同步工具--debezium+kafka+bireme

1、介绍

Bireme 是一个 Greenplum / HashData 数据仓库的增量同步工具。目前支持 MySQL、PostgreSQL 和 MongoDB 数据源
官方介绍文档:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

1、数据流

bireme数据源同步工具--debezium+kafka+bireme
Bireme 采用 DELETE + COPY 的方式,将数据源的修改记录同步到 Greenplum / HashData ,相较于INSERT + UPDATE + DELETE的方式,COPY 方式速度更快,性能更优

2、数据源

2.1、Maxwell + Kafka 是 bireme 目前支持的一种数据源类型,架构如下图:
bireme数据源同步工具--debezium+kafka+bireme
Maxwell 是一个 MySQL binlog 的读取工具,它可以实时读取 MySQL 的 binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka
 
2.2、Debezium + Kafka 是 bireme 支持的另外一种数据源类型,架构如下图:
bireme数据源同步工具--debezium+kafka+bireme
Debezium 是一个CDC工具,可以将数据库的增删改转换为事件流,并把这些修改发送给 Kafka

3、工作原理

Bireme 从数据源读取数据 (Record),将其转化为内部格式 (Row) 并缓存,当缓存数据达到一定量,将这些数据合并为一个任务 (Task),每个任务包含两个集合,delete 集合与insert 集合,最后把这些数据更新到目标数据库。
每个数据源可以有多个 pipeline,对于 maxwell,每个 Kafka partition 对应一个 pipeline;对于 debezium,每个 Kafka topic 对应一个 pipeline

4、本文搭建实例图形

bireme数据源同步工具--debezium+kafka+bireme
 

 

2、配置相关数据源、目标数据源和java环境

1、mysql数据源

1、数据库,create database syncdb1;
2、用户权限,需要拥有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,此处使用root权限
3、同步的表(切换到syncdb1数据库),create table tb1(a int, b char(10), primary key(a));

2、pgsql目的数据库

1、用户,create user syncdb with password 'syncdb';
2、数据库,create database syncdb with owner 'syncdb';
3、同步的表(使用syncdb用户切换到syncdb数据库),create table tb1(a int, b char(10), primary key(a));

3、java环境的安装

1、下载二进制安装包:jdk-8u101-linux-x64.tar.gz
2、解压二进制包并做软链接:tar xf jdk-8u101-linux-x64.tar.gz && ln -s /data/jdk1.8.0_101 /usr/java
3、配置路径和java环境变量:vim /etc/profile.d/java.sh
export JAVA_HOME=/usr/java
export JRE_HOME=$JAVA_HOME/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
4、source生效:source  /etc/profile.d/java.sh
5、安装jsvc,yum install jsvc

 
 
 

3、kafka的安装和启动配置

1、下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
2、kafka官方文档:http://kafka.apache.org/
3、解压缩:tar xf kafka_2.11-2.0.0.tgz && cd kafka_2.11-2.0.0
4、ZooKeeper

启动,bin/zookeeper-server-start.sh config/zookeeper.properties
关闭,bin/zookeeper-server-stop.sh config/zookeeper.properties

5、Kafka server

启动,bin/kafka-server-start.sh config/server.properties
启动,bin/kafka-server-stop.sh config/server.properties

6、Topic(不是本实验必须的,作为学习使用)

创建,bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic world
查询,bin/kafka-topics.sh --list --zookeeper localhost:2181
删除,bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic world

7、Producer(不是本实验必须的,作为学习使用)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>jiaming
>

8、Consumer(不是本实验必须的,作为学习使用)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
jiaming

 
 
 

4、debezium的安装和启动配置

下载debezium的mysql连接器
1、下载地址:https://debezium.io/docs/install/
2、debezium官方文档:https://debezium.io/docs/tutorial/
3、解压缩:tar xf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
4、解压出来的jar包全部拷贝到kafka libs目录下,cp debezium-connector-mysql/.jar kafka2.11-2.0.0/libs/
5、添加配置文件(用于连接mysql数据源,对应参数可参考官方介绍:https://debezium.io/docs/connectors/mysql/#example-configuration
cd kafka_2.11-2.0.0 && vim mysql.properties
note:debezium的database.server.name一定要和bireme的data_source保持一致

name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=118.190.209.102
database.port=5700
database.user=root
database.password=123456
database.server.id=129129
database.server.name=debezium1  # debezium的database.server.name一定要和bireme的data_source保持一致
database.whitelist=syncdb1  # 同步的数据库列表
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.debezium1
include.schema.changes=true

6、以独立模式启动kafka connect,此时debezium会对数据库中的每一个表创建一个topic,消费相应的topic,即可获取binlog解析信息

cd kafka_2.11-2.0.0
bin/connect-standalone.sh config/connect-standalone.properties mysql.properties

7、查看topic列表

cd kafka_2.11-2.0.0
bin/kafka-topics.sh --list --zookeeper localhost:2181

debezium1.syncdb1.tb1,每个数据源同步表会生成一个topic
debezium1,记录ddl操作
dbhistory.debezium1,记录对应ddl操作和position位点信息
bireme数据源同步工具--debezium+kafka+bireme

 
 

5、bireme的安装和启动配置

1、下载地址:https://github.com/HashDataInc/bireme/releases
2、bireme官方文档:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
3、解压缩:tar xf bireme-1.0.0.tar.gz && cd bireme-1.0.0
4、修改配置文件,vim etc/config.properties
note:debezium的database.server.name一定要和bireme的data_source保持一致

# target database where the data will sync into.
target.url = jdbc:postgresql://118.190.209.102:5432/syncdb
target.user = syncdb
target.passwd = syncdb

# data source name list, separated by comma.
data_source = debezium1  # debezium的database.server.name一定要和bireme的data_source保持一致

# data source "debezium1"
debezium1.type = debezium
# kafka server which debezium write into.
debezium1.kafka.server = 127.0.0.1:9092 
# kafka groupid used for consumer.
debezium1.kafka.groupid = bireme
debezium1.kafka.namespace = debezium1

# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080

5、修改配置文件,vim etc/debezium1.properties(表映射配置
note:debezium1.properties的debezium1一定要和bireme的data_source保持一致

# source table full name = target table full name
syncdb1.tb1 = public.tb1

6、启动bireme,bin/bireme start
7、监控,http://192.168.1.129:8080/pretty (state.server.addr:state.server.port)
 
 
 

6、测试

1、mysql数据源

insert into tb1 select 1,'a';
insert into tb1 select 2,'b';

2、pgsql目标数据库

syncdb=# select * from tb1;
 a |     b      
---+------------
 1 | a         
 2 | b         
(2 rows)

 
 
 

7、优势和存在问题

1、优势

1、可以实现多个库表的汇总功能,syncdb1.tb1/syncdb2.tb1 可以汇总到pgsql的一张表tb1中
2、中间使用kafka消息队列,对于大数据量性能方面提升较好
3、不存在数据源库***问题,位点信息存放在kafka中的topic中
4、第一次启动debezium,会生成一个数据源数据库的snapshot,然后之后基于binlog的解析,这样避免了第一次同步数据源数据库到目标数据库的一份全量数据

2、存在问题
待测试补充

转载于:https://blog.51cto.com/11257187/2153817

版权声明:本文为weixin_33857679原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_33857679/article/details/91661542

智能推荐

2018.8.27

2018.8.27...

HTML 表单元素的基本样式

HTML 表单元素的基本样式 原创 ixygj197875 发布于2018-02-22 17:48:53 阅读数 2296 收藏 更新于2018-05-20 15:35:58 分类专栏: 揭秘 CSS 揭秘 CSS 收起 表单元素主要包括 label、input、textarea、select、datalist、******、progress、meter、output等,以及对表单元素进行分组的 ...

php输出语句

php输出语句 常见的输出语句 echo(): 可以一次输出多个值,多个值之间用逗号分隔。echo是语言结构(language construct),而并不是真正的函数,因此不能作为表达式的一部分使用。 print(): 函数print()打印一个值(它的参数),如果字符串成功显示则返回true,否则返回false。 print_r(): 可以把字符串和数字简单地打印出来,而数组则以括起来的键和值...

工厂模式

简介 常见的实例化对象模式。 用工厂方法替代new操作的一种模式。 当我们使用new操作实例化对象时,调用构造函数完成初始化。若初始化仅是进行赋值等简单的操作,写入构造函数即可。但如果初始化时需要执行一长串复杂的代码,将多个工作装入一个方法,是不妥的。 创建实例与使用实例分离。将创建实例所需的大量初始化工作从基类的构造函数中分离出去。 简单工厂模式、工厂方法模式针对的是一个产品等级结构;而抽象工厂...

B1105 Spiral Matrix (画图)

B1105 Spiral Matrix (25分) //第一次只拿了21分 矩阵的长和宽,求最大因子,从sqrt(num)开始枚举. 每次循环一次,s++,t--,d--,r++ 测试点四运行超时,是因为输入一个数字的时候,需要直接输出这个数字。//1分 测试点二运行超时,最后一个数字不必再while循环一次,直接输出即可。//3分 最后一个测试点卡了好久/(ㄒoㄒ)/~~ 螺旋矩阵...

猜你喜欢

Java基础=>String,StringBuffer与StringBuilder的区别

字符串常量池 什么是字符串常量池? JVM为了减少字符串对象的重复创建,其维护了一块特殊的内存,这段内存被称为字符串常量池(存储在方法区中)。 具体实现 当代码中出现字符串时,JVM首先会对其进行检查。 如果字符串常量池中存在相同内容的字符串对象,如果有,则不再创建,直接返回这个对象的地址返回。 如果字符串常量池中不存在相同内容的字符串对象,则创建一个新的字符串对象并放入常量池,并返回新创建的字符...

java调用其他java项目的Https接口

项目中是这样的: 用户拿出二维码展示,让机器识别二维码, 机器调用开门的后台系统接口, 然后开门的后台系统接口需要调用管理系统的接口, 管理系统需要判断能不能开门.这两个系统是互相独立的.当时使用http调用是没有问题的.当时后来要求必须用https.废话不说,直接代码: 我的项目中调用的是 HttpsUtils.Get(utlStr) 这个接口 开门系统接口如下图:   管理系统的接口...

Hadoop1.2.1全分布式模式配置

一 集群规划 主机名            IP                               安装的软件 &nbs...

Go语言gin框架的安装

尝试安装了一下gin,把遇到的一些小问题来记录一下 安装步骤 首先来看看官方文档,链接点这里 可以看到安装步骤很简单,就一句话 在命令行中输入这句话运行等待就好。 问题来了,因为墙的问题,go get会很慢,所以命令行里面半天什么反应也没有,不要急,慢慢等着就会看到gin-gonic/gin这个目录出现 这个时候命令行还是没有结束,表示还在下一些东西。有的时候可能心急的人就停了(比如我),然后写个...

uni-app表单组件二

input(输入框) 属性名 类型 说明 平台差异 value String 输入框的初始内容 type String input 的类型 password Boolean(默认false) 是否是密码类型 placeholder String 输入框为空时占位符 placeholder-style String 指定 placeholder 的样式 placeholder-class Strin...