1919#################################
2020
2121import pandas as pd
22- from os import listdir , getenv
22+ from os import getenv
2323from exchange_calendars import get_calendar
2424from dotenv import load_dotenv
2525from arcticdb import Arctic
2626import warnings
2727
2828warnings .filterwarnings ('ignore' )
29- dotenv_path = '/root/.env'
30- load_dotenv (dotenv_path )
3129
32- endpoint = getenv ("ENDPOINT" )
33- db = getenv ("DB" )
34- access_key = getenv ("ACCESS_KEY" )
35- secret_key = getenv ("SECRET_KEY" )
36-
37- ac = Arctic (f's3s://{ endpoint } :{ db } ?access={ access_key } &secret={ secret_key } ' )
def _get_arctic_connection():
    """Build an Arctic client from credentials stored in /root/.env.

    Loads the dotenv file, reads ENDPOINT, DB, ACCESS_KEY and SECRET_KEY
    from the environment, and raises if any of them is missing.

    Returns:
        Arctic: a connected arcticdb client for the configured S3 store.

    Raises:
        ValueError: when one or more required variables are unset/empty.
    """
    load_dotenv('/root/.env')
    config = {name: getenv(name)
              for name in ("ENDPOINT", "DB", "ACCESS_KEY", "SECRET_KEY")}
    # Fail fast with one clear message rather than a cryptic connection error.
    if not all(config.values()):
        raise ValueError("Faltan variables de entorno. Revisa tu archivo .env (ENDPOINT, DB, ACCESS_KEY, SECRET_KEY)")
    uri = 's3s://{ENDPOINT}:{DB}?access={ACCESS_KEY}&secret={SECRET_KEY}'.format(**config)
    return Arctic(uri)
3840
3941
4042def bse_data (environ ,
@@ -51,73 +53,75 @@ def bse_data(environ,
5153
5254 symbols = ['XLE.US' ,'XLF.US' ,'XLI.US' ,'XLK.US' ,'XLP.US' ,'XLU.US' ,'XLV.US' ,'XLY.US' ,'XLB.US' ,'XLC.US' ,'ITA.US' ]
5355 if not symbols :
54-
5556 raise ValueError ("No se han encontrado TICKERS en el QA DATALAKE" )
56-
57- divs_splits = {"divs" : pd .DataFrame (columns = ["sid" ,"amount" ,"ex_date" ,"record_date" ,"declared_date" ,"pay_date" ,]),"splits" : pd .DataFrame (columns = ["sid" , "ratio" , "effective_date" ]),}
58-
57+
58+ divs_splits = {
59+ "divs" : pd .DataFrame (columns = ["sid" ,"amount" ,"ex_date" ,"record_date" ,"declared_date" ,"pay_date" ]),
60+ "splits" : pd .DataFrame (columns = ["sid" , "ratio" , "effective_date" ]),
61+ }
62+
5963 metadata = pd .DataFrame (columns = ('start_date' ,'end_date' ,'auto_close_date' ,'symbol' ,'exchange' ))
6064 sessions = calendar .sessions_in_range (start_session , end_session )
6165
62- daily_bar_writer .write (process_stocks (symbols , sessions , metadata , divs_splits ))
66+ ac = _get_arctic_connection ()
67+ daily_bar_writer .write (process_stocks (ac , symbols , sessions , metadata , divs_splits ))
6368
6469 metadata ["exchange" ] = "QAX"
6570 exchange = {'exchange' : 'QAX' , 'canonical_name' : 'QUANTARMY BACKTEST' , 'country_code' : 'US' }
66- exchange_df = pd .DataFrame (exchange , index = [0 ])
67-
68-
71+ exchange_df = pd .DataFrame (exchange , index = [0 ])
6972
7073 divs_splits ["divs" ]["sid" ] = divs_splits ["divs" ]["sid" ].astype (int )
7174 divs_splits ["splits" ]["sid" ] = divs_splits ["splits" ]["sid" ].astype (int )
72- daily_bar_writer .write (process_stocks (symbols , sessions , metadata , divs_splits ))
7375 asset_db_writer .write (equities = metadata , exchanges = exchange_df )
7476 adjustment_writer .write (splits = divs_splits ["splits" ], dividends = divs_splits ["divs" ])
7577
def process_stocks(ac, symbols, sessions, metadata, divs_splits):
    """Yield (sid, OHLCV DataFrame) pairs for the daily-bar writer.

    For each symbol: read its price history from the Arctic prices library,
    trim to 2010 onwards, align it to NYSE trading sessions, forward-fill
    gaps, record one metadata row, and append any dividend / split rows to
    the shared accumulators.

    Args:
        ac: connected arcticdb ``Arctic`` client.
        symbols: list of ticker strings (e.g. ``'XLE.US'``).
        sessions: bundle-wide session index from the caller. Kept for
            interface compatibility; per-symbol sessions are recomputed
            from the NYSE calendar below.
        metadata: DataFrame mutated in place — one row per sid with
            (start_date, end_date, auto_close_date, symbol, exchange).
        divs_splits: dict with ``'divs'`` and ``'splits'`` DataFrames,
            mutated in place.

    Yields:
        tuple: ``(sid, df)`` where ``df`` is the session-aligned OHLCV frame.

    Raises:
        ValueError: if a symbol has no price data on/after 2010.
    """
    my_cal = get_calendar('NYSE')
    # Hoist library handles out of the loop: one lookup per library,
    # not one per symbol.
    prices = ac.get_library('prices.etfs.us.stable')
    divs_lib = ac.get_library('divs.etfs.us.stable')
    splits_lib = ac.get_library('splits.etfs.us.stable')

    for sid, symbol in enumerate(symbols):
        print('[QA DATALAKE] Loading {}...'.format(symbol))
        df = prices.read(symbol).data
        df = df['2010':]  # drop pre-2010 history
        if df.empty:
            # Fail loudly with the offending ticker instead of letting
            # df.index[0] raise an opaque IndexError.
            raise ValueError('No price data on/after 2010 for {}'.format(symbol))
        start_date = df.index[0]
        end_date = df.index[-1]
        # Restrict to this symbol's own trading sessions, then reindex so
        # every session has a row and forward-fill the holes.
        sym_sessions = my_cal.sessions_in_range(start_date, end_date)
        df = df[df.index.isin(sym_sessions)]
        # NOTE(review): assumes the price index is tz-naive, matching the
        # sessions once stripped of tz — confirm against the datalake schema.
        df = df.reindex(sym_sessions.tz_localize(None))[start_date:end_date]
        df = df.ffill()
        df.dropna(inplace=True)
        # Auto-close one calendar day after the last available bar.
        ac_date = end_date + pd.Timedelta(days=1)
        metadata.loc[sid] = start_date, end_date, ac_date, symbol, 'QAX'

        if divs_lib.has_symbol(symbol):
            data_divs = divs_lib.read(symbol).data.reset_index()
            div = pd.DataFrame({
                'ex_date': data_divs['date'],
                'record_date': data_divs['recordDate'],
                'declared_date': data_divs['declarationDate'],
                'pay_date': data_divs['paymentDate'],
                'amount': data_divs['value'],
                'sid': sid,
            })
            # ignore_index=True renumbers 0..n-1, identical to manually
            # extending the accumulator's RangeIndex.
            divs_splits['divs'] = pd.concat([divs_splits['divs'], div],
                                            axis=0, ignore_index=True)
            print('[QA DATALAKE] DIVS loaded for', symbol)

        if splits_lib.has_symbol(symbol):
            data_splits = splits_lib.read(symbol).data.reset_index()
            split = pd.DataFrame({
                'effective_date': data_splits['date'],
                'ratio': data_splits['split'],
                'sid': sid,
            })
            divs_splits['splits'] = pd.concat([divs_splits['splits'], split],
                                              axis=0, ignore_index=True)
            print('[QA DATALAKE] SPLITS loaded for', symbol)

        yield sid, df
0 commit comments