-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLoadCTP.py
More file actions
154 lines (127 loc) · 5.61 KB
/
LoadCTP.py
File metadata and controls
154 lines (127 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import glob
from pathlib import Path
import pyodbc
from loguru import logger
from datetime import datetime, timedelta
import configparser
from dotenv import load_dotenv
def configure_logger():
    """Route loguru output to the ``CTP_load.log`` file.

    Options are read from the ``[log]`` section of ``settings.ini`` located
    next to this script. Any option that is absent falls back to a default:
    empty ``path`` (log file is then addressed by its bare name), ``level``
    ``INFO``, ``rotation`` ``10 MB``, ``retention`` ``7 days`` and
    ``compression`` ``zip``.
    """
    settings = configparser.ConfigParser()
    settings.read(Path(__file__).parent / "settings.ini", encoding="utf-8")

    def log_option(name, default):
        # Single place for the "[log] option or default" lookup.
        return settings.get("log", name, fallback=default)

    log_dir = log_option("path", "")
    if log_dir:
        target = os.path.join(log_dir, "CTP_load.log")
    else:
        target = "CTP_load.log"

    # Drop the handlers registered so far so the file sink is the only one.
    logger.remove()
    logger.add(
        target,
        level=log_option("level", "INFO"),
        rotation=log_option("rotation", "10 MB"),
        retention=log_option("retention", "7 days"),
        compression=log_option("compression", "zip"),
    )
def load_config():
    """Load the input directory and the database connection parameters.

    The input directory comes from the ``input_directory`` option of the
    ``[paths]`` section in ``settings.ini`` (located next to this script).
    The database parameters ``DB_SERVER``, ``DB_NAME``, ``DB_USER`` and
    ``DB_PASSWORD`` are read from the environment after ``load_dotenv()``
    has been called.

    Returns:
        A ``(input_directory, db_config)`` tuple: the directory as a string
        and a dict keyed by the four ``DB_*`` names.

    Raises:
        FileNotFoundError: the option is missing/blank or does not point to
            an existing directory.
        ValueError: at least one of the ``DB_*`` values is missing or empty.
    """
    ini_path = Path(__file__).parent / "settings.ini"
    config = configparser.ConfigParser()
    config.read(ini_path, encoding="utf-8")

    input_directory = config.get("paths", "input_directory", fallback="")
    # Path("") normalizes to "." which always exists, so a missing option has
    # to be rejected explicitly; is_dir() also rejects a path to a plain file.
    if not input_directory.strip() or not Path(input_directory).is_dir():
        raise FileNotFoundError(f"Входная директория не найдена: {input_directory}")

    load_dotenv()
    db_config = {
        "DB_SERVER": os.getenv("DB_SERVER"),
        "DB_NAME": os.getenv("DB_NAME"),
        "DB_USER": os.getenv("DB_USER"),
        "DB_PASSWORD": os.getenv("DB_PASSWORD"),
    }
    # None (unset) and "" (blank) are both treated as missing.
    if not all(db_config.values()):
        raise ValueError("Нет всех параметров подключения к БД в .env")
    return input_directory, db_config
def _store_pr_record(cursor, parts, file_date):
    """Persist one split ``PR;`` line; unknown record types are ignored.

    Args:
        cursor: open pyodbc cursor used for all statements.
        parts: the line split on ``;`` (at least five fields); field 2 is the
            interchange id, field 3 the record type, field 4 the value.
        file_date: date taken from the file's ``FH;`` line, or ``None`` when
            no such line was parsed (today's date is used instead).

    Raises:
        Exception: parsing errors (``ValueError``/``IndexError``) and database
            errors propagate to the caller, which logs them per line.
    """
    interchange_id, record_type, value = parts[2], parts[3], parts[4]

    if record_type == "InterchangeFeeData":
        # Value layout as sliced here: 1 char direction, 3 digits currency,
        # remaining digits are the amount in hundredths.
        direction = value[0]
        currency = int(value[1:4])
        amount = int(value[4:]) / 100
        interchange_date = file_date or datetime.now().date()

        # Skip records already loaded (duplicate check by InterchangeID).
        cursor.execute(
            """
            SELECT 1
            FROM tRdbInterchange (nolock)
            WHERE InterchangeID=?
            """,
            interchange_id,
        )
        if cursor.fetchone():
            logger.info(f"Существует Interchange: {interchange_id} {amount} {currency} {direction}")
            return

        cursor.execute(
            """
            INSERT INTO tRdbInterchange (InterchangeID, Type, Date, Value, Currency, Direction)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            interchange_id, record_type, interchange_date, amount, currency, direction,
        )
        logger.info(f"Добавлено Interchange: {interchange_id} {amount} {currency} {direction}")
    elif record_type == "MerchTaxId":
        cursor.execute(
            """
            exec RdbCardTranAttrib_Insert
            @AttribID = ?,
            @AttribName = 'INN_TSP',
            @AttribValue = ?
            """,
            interchange_id, value,
        )
        logger.info(f"Добавлено MerchTaxId: {interchange_id} | {value}")


def _process_ctp_file(cursor, ctp_file):
    """Read one CTP file and store its ``PR;`` records through ``cursor``.

    A file that cannot be read is logged and skipped. A line that fails to
    parse or store is logged and the remaining lines are still processed.
    """
    logger.info(f"Обработка файла: {ctp_file}")
    try:
        with open(ctp_file, encoding="utf-8") as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:
        logger.error(f"Ошибка открытия {ctp_file}: {e}")
        return

    file_date = None
    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        # Header line "FH;...": its last field carries the data date (YYYYMMDD).
        if line.startswith("FH;"):
            try:
                file_date = datetime.strptime(line.split(";")[-1], "%Y%m%d").date()
                logger.info(f"Дата данных из FH: {file_date}")
            except ValueError as e:
                logger.error(f"Ошибка разбора FH строки {line}: {e}")
            continue

        # Only "PR;" lines with at least five fields are processed.
        if not line.startswith("PR;"):
            continue
        parts = line.split(";")
        if len(parts) < 5:
            continue

        try:
            _store_pr_record(cursor, parts, file_date)
        except Exception as e:
            # Best-effort load: one bad record must not stop the whole file.
            logger.error(f"Ошибка обработки строки {line}: {e}")


def main():
    """Load today's CTP files into the database and reschedule the load task.

    Steps: configure logging, read the configuration, connect to SQL Server,
    process every file in the input directory matching
    ``CTP<YYYYMMDD>_*.0001`` for today's date, move the ``ReplTask`` run time
    to 12:00 of the next day, and commit everything in a single transaction.

    Raises:
        FileNotFoundError, ValueError: propagated from ``load_config``.
        pyodbc.Error: connection, reschedule or commit failure; the
            connection is closed in every case.
    """
    configure_logger()
    input_directory, db = load_config()

    conn_str = (
        f"Driver={{ODBC Driver 11 for SQL Server}};"
        f"Server={db['DB_SERVER']};"
        f"Database={db['DB_NAME']};"
        f"UID={db['DB_USER']};"
        f"PWD={db['DB_PASSWORD']};"
    )
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()

        today_str = datetime.now().strftime("%Y%m%d")
        pattern = f"CTP{today_str}_*.0001"
        # Sorted so files are handled in a deterministic order.
        for ctp_file in sorted(glob.glob(os.path.join(input_directory, pattern))):
            _process_ctp_file(cursor, ctp_file)

        # Move the task to the next day so the Diasoft scheduler is not kept
        # busy re-running it today.
        dt = (datetime.now() + timedelta(days=1)).strftime("%Y%m%d 12:00:00.000")
        # 10000000047 - LoadCTP (CTP file load) task.
        sql = f"EXEC ReplTask_UpdateTime 10000000047, 2, '{dt}'"
        # loguru formats positional args with str.format, so the message needs
        # a "{}" placeholder for the statement to appear in the log.
        logger.info("Executing: {}", sql)
        cursor.execute(sql)

        conn.commit()
        cursor.close()
    finally:
        # Always release the connection; closing without commit discards
        # uncommitted work.
        conn.close()
# Script entry point: run the loader only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()