-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgpsync.py
More file actions
201 lines (181 loc) · 6.31 KB
/
gpsync.py
File metadata and controls
201 lines (181 loc) · 6.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# 加# %%方便单步调试,正式执行时替换为##%%,需要单步时还原为# %%
# %%
import os,sys
import re
import tomli
import json
import psycopg2
from pandas import json_normalize
# %%
# Generate, from the TOML config file, the include-table JSON, the exclude-table TXT and the delete SQL for the included tables
def gen_copyfile(cfg_file='config.toml'):
    """Build the gpcopy helper files from a TOML config and return delete SQL.

    The parsed TOML document is flattened with ``json_normalize``. Each
    resulting column name is used as a table name (the original note
    describes it as ``db.schema.table``), and the first-row value of that
    column is appended after the table name in the generated SQL
    (presumably a WHERE clause -- confirm against the config file).

    Side effects, both in the current working directory:
      * ``exclude_table.txt``: the column names, one per line.
      * ``include_table.json``: a JSON list of objects with the keys
        ``source``, ``sql`` and ``dest``.

    Returns:
        A list with one ``delete from <table> <condition>;`` string per
        column, each terminated by a newline.
    """
    with open(cfg_file, "rb") as cfg:
        config = tomli.load(cfg)
    flat = json_normalize(config)  # flatten the nested config into columns
    tables = flat.columns.tolist()

    # Write the exclude file.
    print('generate exclude_table.txt...')
    with open("exclude_table.txt", "w") as out:
        out.write('\n'.join(tables))

    delete_sqls = [f'delete from {table} {flat[table][0]};\n' for table in tables]
    include_entries = [
        {
            'source': table,
            'sql': f"select * from {table} {flat[table][0]}",
            'dest': table,
        }
        for table in tables
    ]

    print('generate include_table.json...')
    with open("include_table.json", "w") as out:
        out.write(json.dumps(include_entries))
    return delete_sqls
# %%
def run_sql(sql, dest, dbname):
    """Execute *sql* in database *dbname* on the *dest* server and commit.

    Args:
        sql: SQL text to run; callers pass several statements joined by
            newlines.
        dest: connection settings with the keys ``usr``, ``pwd``, ``host``
            and ``port``.
        dbname: name of the database to connect to.

    Blank SQL is skipped without opening a connection, because psycopg2
    refuses to execute an empty query. The connection is closed even when
    the statement fails; in that case nothing is committed.
    """
    if not sql or not sql.strip():
        print('no sql to run, skipped')
        return
    conn = psycopg2.connect(
        database=dbname,
        user=dest['usr'],
        password=dest['pwd'],
        host=dest['host'],
        port=dest['port'],
    )
    try:
        print(conn)
        with conn.cursor() as cur:
            cur.execute(sql)
        conn.commit()
    finally:
        # ``with conn`` would not close the connection, so close explicitly.
        conn.close()
# %%
# use gpcopy to transfer data
def _run_gpcopy(source, dest, options):
    """Run one gpcopy command through the shell and return its exit status."""
    endpoints = (
        f"--source-host {source['host']} --source-port {source['port']} --source-user {source['usr']} "
        f"--dest-host {dest['host']} --dest-port {dest['port']} --dest-user {dest['usr']}"
    )
    head = "export PGSSLMODE=disable && export PGPASSWORD="
    tail = f" && gpcopy {endpoints} {options}"
    # Log the command with the password masked so it does not leak into
    # console output or log files.
    print(head + '******' + tail)
    # NOTE(review): values are interpolated into a shell string unquoted;
    # a password containing shell metacharacters would break the command.
    status = os.system(head + str(source['pwd']) + tail)
    if status != 0:
        print('gpcopy exited with non-zero status:', status)
    return status


# use gpcopy to transfer data
def copy_data(source, dest, dbname):
    """Copy table data from the *source* server to the *dest* server with gpcopy.

    Steps:
      1. Regenerate ``include_table.json`` / ``exclude_table.txt`` through
         ``gen_copyfile`` and run the returned delete statements in the
         destination database.
      2. Incremental part: gpcopy with ``--include-table-json`` and
         ``--append`` for the tables listed in the JSON file.
      3. Full part: gpcopy of database *dbname* with
         ``--exclude-table-file`` and ``--truncate``.

    Args:
        source: settings of the source server (keys ``host``, ``port``,
            ``usr``, ``pwd``). Only this password is exported as
            ``PGPASSWORD``.
        dest: settings of the destination server (same keys).
        dbname: database used for the delete statements and the full copy.

    A failing gpcopy run is reported on stdout; the following step still
    runs, as before.
    """
    # Incremental tables: delete the matching rows first, then append.
    dels = '\n'.join(gen_copyfile())
    print('delete in dest db:\n', dels)
    run_sql(dels, dest, dbname)
    _run_gpcopy(source, dest, "--include-table-json include_table.json --append")
    # Everything else is copied in full, excluding the incremental tables.
    _run_gpcopy(
        source,
        dest,
        f"--dbname {dbname} --exclude-table-file exclude_table.txt --truncate",
    )
# %%
# 导出dump文件并预处理
def dump(dbinfo, db, filename):
    """Dump the schema of database *db* to *filename* and normalise it.

    Runs ``pg_dump -s`` through the shell and then rewrites the file in
    place with ``sed`` so that every `` )`` becomes ``)``.

    Args:
        dbinfo: connection settings with the keys ``host``, ``port``,
            ``usr`` and ``pwd``.
        db: name of the database to dump.
        filename: path of the dump file to write.

    The password is masked in the printed command. When ``pg_dump`` exits
    with a non-zero status the failure is reported and the ``sed`` step is
    skipped.
    """
    pg_dump = (
        f"pg_dump -h {dbinfo['host']} -p {dbinfo['port']} -U {dbinfo['usr']} "
        f"-s -f {filename} -d {db}"
    )
    # Never echo the real password.
    print('dump command:', f"export PGPASSWORD=****** && {pg_dump}")
    status = os.system(f"export PGPASSWORD={dbinfo['pwd']} && {pg_dump}")
    if status != 0:
        print('pg_dump exited with non-zero status:', status)
        return
    # Remove the single space in front of a closing parenthesis.
    cmd = r"sed -i 's/ )/)/g' " + filename
    print('replace " )" to ")":', cmd)
    os.system(cmd)
# %%
# 拆分dump文件
_FUNCTION_START = re.compile(r'^CREATE FUNCTION')
_DOLLAR_BODY_END = re.compile(r'\$_?\$;$')  # line ends with "$$;" or "$_$;"
_STATEMENT_END = re.compile(r';$')


def split_dump(file_path):
    """Split a SQL dump file into a list of statements.

    Lines starting with ``--`` and empty lines are dropped. All other lines
    are accumulated until the statement is complete:

      * an ordinary statement ends with a line ending in ``;``;
      * a ``CREATE FUNCTION`` whose body is dollar-quoted ends with a line
        ending in ``$$;`` or ``$_$;``, so ``;`` inside the body does not
        split it;
      * a ``CREATE FUNCTION`` with no ``$$`` / ``$_$`` seen so far (for
        example ``AS '$libdir/lib', 'symbol';``) ends at its first line
        ending in ``;``. Previously such a function never terminated and
        swallowed the statements that followed it.

    Args:
        file_path: path of the UTF-8 encoded dump file.

    Returns:
        The statements in file order, each keeping its line breaks. Text
        left over at end of file without a terminator is not returned.
    """
    statements = []
    parts = []            # lines of the statement being collected
    in_function = False
    body_opened = False   # a dollar-quote delimiter was seen in the function
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Skip empty lines and comment lines.
            if line == '\n' or line.startswith('--'):
                continue
            parts.append(line)
            if _FUNCTION_START.match(line):
                in_function = True
                body_opened = False
            if in_function:
                if '$$' in line or '$_$' in line:
                    body_opened = True
                finished = bool(_DOLLAR_BODY_END.search(line)) or (
                    not body_opened and bool(_STATEMENT_END.search(line))
                )
            else:
                finished = bool(_STATEMENT_END.search(line))
            if finished:
                statements.append(''.join(parts))
                parts = []
                in_function = False
                body_opened = False
    return statements
# %%
# 处理目标库中多余的(源库删除或修改)
"""
CREATE FUNCTION,drop
CREATE TABLE ,drop
ALTER ... OWNER TO
CREATE EXTERNAL TABLE gpfdist外部表,暂不考虑
COMMENT ON COLUMN
REVOKE FROM
GRANT FROM
"""
# 反转sql
# (pattern, template) pairs; the first pattern matching at the start of a
# statement decides the reverse statement.
_REVERSE_RULES = (
    (re.compile(r'CREATE TABLE (\S+)'), 'drop table {0};'),
    (re.compile(r'CREATE FUNCTION (.*?\))'), 'drop function {0};'),
    # "if exists": the index is already gone when its table was dropped first.
    (re.compile(r'CREATE INDEX (\S+)'), 'drop index if exists {0};'),
    (re.compile(r'GRANT (.+?) TO (.+?)(?: WITH GRANT OPTION)?;'), 'revoke {0} from {1};'),
)


def reverse_sql(sql):
    """Return the statement that undoes *sql*, or ``''`` when none is known.

    Handled statements:
      * ``CREATE TABLE name ...``      -> ``drop table name;``
      * ``CREATE FUNCTION name(args)`` -> ``drop function name(args);``
      * ``CREATE INDEX name ...``      -> ``drop index if exists name;``
      * ``GRANT privs TO role;``       -> ``revoke privs from role;``
        (a trailing ``WITH GRANT OPTION`` is dropped)

    Any other statement, or a handled keyword whose text does not have the
    expected shape (e.g. ``CREATE TABLESPACE``), yields ``''`` instead of
    raising.

    The former GRANT pattern ``(.*+)`` could never match (a compile error
    before Python 3.11, a non-backtracking quantifier from 3.11 on) and its
    second group captured the ``;``, which doubled the terminator.
    """
    for pattern, template in _REVERSE_RULES:
        found = pattern.match(sql)
        if found:
            return template.format(*found.groups())
    return ''
#%%
def dest_has(dump_dest, dump_src):
    """Find statements that exist only in the destination dump.

    Args:
        dump_dest: statements of the destination dump.
        dump_src: statements of the source dump.

    Returns:
        A tuple ``(has, dels)``: ``has`` lists the statements of
        *dump_dest* that are missing from *dump_src*, in their original
        order; ``dels`` holds ``reverse_sql(stmt)`` for each of them, index
        for index.
    """
    # Build the lookup set once; testing against the list is O(n) per item.
    src_statements = set(dump_src)
    has = [stmt for stmt in dump_dest if stmt not in src_statements]
    print('need del in dest db:', len(has))
    dels = [reverse_sql(stmt) for stmt in has]
    return has, dels
# %%
# 源库多的(源库新增或修改)
def src_adds(dump_src, dump_dest, dels):
    """Print and return the source statements that the destination lacks.

    Args:
        dump_src: statements of the source dump.
        dump_dest: statements of the destination dump.
        dels: destination statements to disregard (the caller passes the
            statements that exist only in the destination).

    Returns:
        The statements of *dump_src*, in order, that are not among the
        remaining destination statements. Earlier versions returned
        ``None``; callers that ignore the result are unaffected.
    """
    # Keep this a set: membership tests on a list would be O(n) each.
    remaining_dest = set(dump_dest) - set(dels)
    print(len(remaining_dest))
    new = [stmt for stmt in dump_src if stmt not in remaining_dest]
    print('need add in dest db:', len(new))
    for stmt in new:
        print(stmt)
    return new
# %%
def sync_schema(dbinfo, dbinfo2, dbname):
    """Compare the schema dumps of two servers and clean up the destination.

    Reads ``<dbname>_src.sql`` and ``<dbname>_dest.sql``, splits both into
    statements, runs the reverse SQL for statements found only in the
    destination, and finally lists the statements found only in the source.

    Args:
        dbinfo: settings of the source server (only needed by the disabled
            dump step below).
        dbinfo2: settings of the destination server; the reverse SQL is
            executed there.
        dbname: database name, also the prefix of the two dump files.
    """
    filename = dbname + '_src.sql'
    filename2 = dbname + '_dest.sql'
    # NOTE: dumping is disabled here, so both files must already exist.
    # dump(dbinfo,dbname,filename)
    # dump(dbinfo2,dbname,filename2)
    dump_src = split_dump(filename)
    print(len(dump_src))
    dump_dest = split_dump(filename2)
    print(len(dump_dest))
    has, dels = dest_has(dump_dest, dump_src)
    # Run the reverse SQL in the destination database first. Statements
    # without a known reverse come back as '' and are left out; when nothing
    # remains the database call is skipped, since an empty query is an error.
    reverse_script = '\n'.join(stmt for stmt in dels if stmt)
    print('反向sql:\n', reverse_script)
    if reverse_script:
        run_sql(reverse_script, dbinfo2, dbname)
    # Then report the statements the source has in addition.
    src_adds(dump_src, dump_dest, has)
# %%
if __name__ == '__main__':
    # Script entry point: synchronise the schema first, then the data.
    dbname = 'mdmaster_platform'  # 'mdmaster_bsgj_dev551_product_dev'
    # Source server settings.
    dbinfo = {"host": "192.168.200.207", "port": 2345, "usr": "gpadmin", "pwd": "密码"}
    # Destination server settings.
    dbinfo2 = {"host": "192.168.200.73", "port": 2345, "usr": "gpadmin", "pwd": "密码"}
    print('begin to sync schema...')
    sync_schema(dbinfo, dbinfo2, dbname)
    print('begin to sync data...')
    copy_data(dbinfo, dbinfo2, dbname)
# %%