-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
98 lines (81 loc) · 3.96 KB
/
run.py
File metadata and controls
98 lines (81 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import sys
from operator import itemgetter

from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

from pyro import db, tj, transformation, representation, cfg
from pyro.utils import relation_name, assemble_list, attribute_name
def _connect(db_settings, label):
    """Log the connection attempt and return an engine for ``db_settings``.

    Args:
        db_settings: mapping of SQLAlchemy URL components read from the
            config; it is expanded into the URL constructor and its
            ``'database'`` entry is logged.
        label: human-readable name of the database used in the log line
            (e.g. ``'source'`` or ``'cube'``).
    """
    logging.info('Connecting to the {} DB: {}'.format(
        label, db_settings['database']))
    # URL.create() is the supported constructor from SQLAlchemy 1.4 on;
    # older releases only provide the plain URL(...) constructor.
    make_url = getattr(URL, 'create', URL)
    # Hand the URL object to create_engine directly: str(URL) masks the
    # password in SQLAlchemy 2.0, so a round trip through str() would break
    # authentication.
    return create_engine(make_url(**db_settings))


def _build_contexts(relations, dependencies, dimensions, measure_relation,
                    lossless_app_context):
    """Return one context per dimension followed by the application context.

    A dimension's context is the first combination yielded by
    ``transformation.contexts`` for the relations referenced by that
    dimension's attributes.

    If ``lossless_app_context`` is truthy, the application context is
    searched the same way over the relations of all dimensions plus
    ``measure_relation``. When that search yields nothing, it falls back to
    the whole ``relations`` set. Otherwise the application context is
    assembled from the dimension contexts with ``assemble_list``.

    Raises:
        RuntimeError: if ``transformation.contexts`` yields nothing for some
            dimension.
    """
    logging.info('Building dimension contexts...')
    contexts = []
    base_total = {measure_relation}
    for dimension in dimensions:
        logging.info('Building context for dimension "{}"'.format(
            dimension['name']))
        # extract relation names from extended attribute names in config
        base = set(map(relation_name, dimension['attributes']))
        base_total |= base
        # for now pick first found context
        try:
            context = next(transformation.contexts(relations, base,
                                                   dependencies))
        except StopIteration:
            # A bare StopIteration escaping here would give no hint about
            # which dimension failed.
            raise RuntimeError(
                'No context satisfying the lossless join property was found '
                'for dimension "{}"'.format(dimension['name'])) from None
        contexts.append(context)

    # build application context
    if lossless_app_context:
        logging.info('Building application context')
        try:
            app_context = next(transformation.contexts(relations, base_total,
                                                       dependencies))
        except StopIteration:
            # no combination satisfies Lossless Join property. Pick all
            # relations
            logging.warning('No lossless combinations were found, picking '
                            'whole relation set for application context')
            app_context = relations
    else:
        logging.info('Combining all the contexts to form application context')
        app_context = assemble_list(contexts, key=itemgetter('name'))
    contexts.append(app_context)
    return contexts


def main(config_path='config-2.json'):
    """Transform the configured source database into an OLAP cube.

    The steps are:

    1. Load the config from ``config_path`` and set up logging.
    2. Read the source schema and add the configured multi valued
       dependencies.
    3. Build one context per dimension plus an application context.
    4. Build a Table of Joins for each context in the cube database.
    5. Write the representation to ``cfg.settings['output_file']``.

    Args:
        config_path: path of the JSON configuration passed to ``cfg.load``.
    """
    # load configuration
    cfg.load(config_path)
    # setup logging
    logging.basicConfig(**cfg.settings['logging'])

    source_engine = _connect(cfg.settings['source_db'], 'source')
    relations, dependencies = db.get_schema(source_engine)
    # multi valued dependencies from the config: transform each part's list
    # of attributes into a set of attributes
    mvd = [{part: set(attributes) for part, attributes in dep.items()}
           for dep in cfg.settings.get('multi valued dependencies', [])]
    dependencies.extend(mvd)

    # get main attributes
    dimensions = cfg.settings['dimensions']
    dimension_attributes = [list(map(attribute_name, d['attributes']))
                            for d in dimensions]
    measure_relation = relation_name(cfg.settings['measure'])
    measure_attribute = attribute_name(cfg.settings['measure'])

    # Start transformation: dimension contexts followed by the app context
    contexts = _build_contexts(relations, dependencies, dimensions,
                               measure_relation, cfg.settings['app_context'])

    # one logical constraint per dimension; no application constraint
    constraints = [d.get('constraint', []) for d in dimensions] + [[]]
    cache_file_path = cfg.settings.get('cache_file', 'cache.json')

    cube_engine = _connect(cfg.settings['cube_db'], 'cube')
    table_names = []
    for context, constraint in zip(contexts, constraints):
        logging.info('Building Table of Joins for the context {}'.format(
            list(map(itemgetter('name'), context))))
        table_of_joins = tj.build(context, dependencies, constraint,
                                  source_engine, cube_engine, cache_file_path)
        table_names.append(table_of_joins['name'])
    logging.info('The source Database has been successfully transformed to '
                 'OLAP representation!')

    logging.info('Writing the result table to the file')
    representation.create(cube_engine, table_names, dimension_attributes,
                          measure_attribute, cfg.settings['output_file'])


if __name__ == '__main__':
    # An optional first CLI argument overrides the default config file.
    main(sys.argv[1] if len(sys.argv) > 1 else 'config-2.json')