-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
127 lines (96 loc) · 3.97 KB
/
main.py
File metadata and controls
127 lines (96 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from typing import Dict, Tuple
import pandas as pd
import pyarrow.parquet as pq
import analysis
# Key identifying one VM group: (vm_zone, os_overcommit).
VMKey = Tuple[str, str]
# Per-group resampled time-series frames, keyed by VMKey.
VMDataDict = Dict[VMKey, pd.DataFrame]
# Pandas offset alias for the resampling window used when splitting batches.
interval = "2D"
def read_parquet_file(file: str, rows_to_read: int = 0) -> VMDataDict:
    """Stream a Parquet file in batches and split each batch by (zone, overcommit).

    Reads at most ``rows_to_read`` rows in 1M-row batches, splits every batch
    with ``split_dataframe_by_host_overcommit`` (using the module-level
    ``interval``) and merges the per-batch dictionaries into one result.

    Args:
        file: Path to the Parquet file.
        rows_to_read: Row budget across all batches. NOTE(review): the
            default of 0 reads no data — callers always pass an explicit
            budget; confirm whether 0 should mean "read everything".

    Returns:
        Mapping of (vm_zone, os_overcommit) -> resampled DataFrame.
    """
    parquet_file = pq.ParquetFile(file)
    data_dicts: VMDataDict = {}
    total_rows = 0
    for batch in parquet_file.iter_batches(batch_size=1_000_000):
        batch_df = batch.to_pandas()
        # The batch that would exceed the budget is trimmed and is the last one.
        is_last_batch = total_rows + len(batch_df) > rows_to_read
        if is_last_batch:
            batch_df = batch_df.iloc[: rows_to_read - total_rows]
        print(f"Starting splitting data rows: {total_rows:,}...", end="")
        data_dict = split_dataframe_by_host_overcommit(batch_df, interval)
        data_dicts = merge_dict_dataframes(data_dicts, data_dict)
        print("Done")
        if is_last_batch:
            break
        total_rows += len(batch_df)
    return data_dicts
def merge_dict_dataframes(dict1: VMDataDict, dict2: VMDataDict) -> VMDataDict:
    """Merge two VM data dictionaries into a new one.

    Frames present under the same key in both inputs are concatenated
    (dict1's rows first, index reset); frames unique to either input are
    carried over unchanged. Neither input dictionary is modified.
    """
    merged: VMDataDict = dict(dict1)
    for vm_key, frame in dict2.items():
        existing = merged.get(vm_key)
        if existing is None:
            merged[vm_key] = frame
        else:
            merged[vm_key] = pd.concat([existing, frame], ignore_index=True)
    return merged
def print_structure(data_frame: pd.DataFrame):
    """Print the column index of *data_frame* to stdout."""
    columns = data_frame.columns
    print(columns)
def print_rows(data_frame: pd.DataFrame, amount: int, offset: int = 0):
    """Print *amount* rows of *data_frame* starting at *offset*.

    Temporarily lifts pandas' display limits so no columns, rows, or cell
    contents are elided in the output.
    """
    display_overrides = (
        "display.max_columns", None,
        "display.max_rows", None,
        "display.max_colwidth", None,
        "display.width", None,
    )
    with pd.option_context(*display_overrides):
        window = data_frame[offset : offset + amount]
        print(window)
def split_dataframe_by_host_overcommit(df: pd.DataFrame, interval="1h") -> VMDataDict:
    """Split *df* by (availability zone, overcommit) and resample each group.

    The zone is the prefix of the ``vm`` column before the first ``/``.
    For every (vm_zone, os_overcommit) pair, ``cpu_usage`` and ``mem_usage``
    are averaged over fixed windows of *interval* (``timestamp`` is a unix
    epoch in seconds).

    Args:
        df: Frame with ``vm``, ``os_overcommit``, ``timestamp``,
            ``cpu_usage`` and ``mem_usage`` columns. Not modified.
        interval: Pandas offset alias for the resampling window.

    Returns:
        Mapping of (vm_zone, os_overcommit) to a DataFrame with
        ``timestamps`` (unix seconds), ``vcpu`` and ``mem`` columns.
    """
    # Derive the zone as a standalone Series so the caller's frame is not
    # mutated (the previous version wrote a vm_zone column into df in place).
    vm_zone = df["vm"].str.split("/").str[0].rename("vm_zone")
    result: VMDataDict = {}
    # Grouping by an index-aligned Series is equivalent to grouping by a column.
    for (zone, os_overcommit), group_df in df.groupby([vm_zone, df["os_overcommit"]]):
        temp_df = group_df.copy()
        temp_df["timestamp"] = pd.to_datetime(temp_df["timestamp"], unit="s")
        temp_df.set_index("timestamp", inplace=True)
        # Mean usage per fixed time bucket.
        resampled = temp_df[["cpu_usage", "mem_usage"]].resample(interval).mean()
        result[(zone, os_overcommit)] = pd.DataFrame(
            {
                # "int64" (not bare int, which is 32-bit on some platforms)
                # converts the DatetimeIndex (ns) back to unix seconds.
                "timestamps": resampled.index.astype("int64") // 10**9,
                "vcpu": resampled["cpu_usage"].values,
                "mem": resampled["mem_usage"].values,
            }
        )
    return result
if __name__ == "__main__":
    # Load and pre-aggregate the raw samples, grouped by (zone, overcommit).
    # NOTE(review): hard-coded local path — adjust for your environment.
    data_frames = read_parquet_file("/Users/evzimin/Downloads/data.parquet", rows_to_read=10_000_000_000)
    # Preview every group (columns head + row count) before analysing.
    for group_key, df_subset in data_frames.items():
        vm_zone, os_overcommit = group_key
        print(f"AZ: {vm_zone}, OS Overcommit: {os_overcommit}")
        print(df_subset.head())
        print(f"total length: {len(df_subset)}")
    # Run the analyzer once per group.
    for group_key, df_subset in data_frames.items():
        vm_zone, os_overcommit = group_key
        analysis.analyzer(
            dataframe=df_subset,
            interval=interval,
            dataframe_name=f"{vm_zone}_{os_overcommit}",
        )