-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpost_process.py
More file actions
66 lines (52 loc) · 2.21 KB
/
post_process.py
File metadata and controls
66 lines (52 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import json
import os
from google.cloud import storage
from dotenv import load_dotenv
# Let python-dotenv load a local .env file (if any) before the settings below
# are read from the process environment.
load_dotenv()

# Job settings taken from the environment; each one is None when its variable
# is not set.
BUCKET_NAME = os.getenv("BUCKET_NAME")
OUTPUT_JSONL_PATH = os.getenv("OUTPUT_JSONL_PATH")
OUTPUT_FOLDER = os.getenv("OUTPUT_FOLDER")
def process_results():
    """
    Split the batch-prediction JSONL output into one JSON file per record.

    Looks under the ``OUTPUT_JSONL_PATH`` prefix of the ``BUCKET_NAME`` bucket
    for the first blob whose name contains ``predictions.jsonl``, parses every
    non-blank line of it, and uploads the JSON found in the first candidate of
    each record's response to ``OUTPUT_FOLDER/<key>``, where a trailing
    ``.txt`` in the record's ``key`` is swapped for ``.json``.

    Problems are reported with ``print`` instead of being raised: missing
    configuration or a missing predictions file ends the run early, and a
    record that cannot be processed is skipped so the remaining records are
    still saved.

    Returns:
        None.

    Raises:
        Whatever the storage client raises while opening the bucket, listing
        blobs or downloading the predictions file; those calls are not guarded.
    """
    # An unset OUTPUT_FOLDER would otherwise be formatted into the blob name
    # as the literal text "None", and an unset BUCKET_NAME only fails deep
    # inside the storage client. Trailing slashes are dropped so the joined
    # name never contains "//".
    output_folder = (OUTPUT_FOLDER or "").rstrip("/")
    required = (("BUCKET_NAME", BUCKET_NAME), ("OUTPUT_FOLDER", output_folder))
    missing = [name for name, value in required if not value]
    if missing:
        print(f"Missing required environment variable(s): {', '.join(missing)}.")
        return

    # initialize client and get the bucket
    client = storage.Client()
    bucket = client.get_bucket(BUCKET_NAME)

    # get the output JSONL file in the results folder path
    blobs = bucket.list_blobs(prefix=OUTPUT_JSONL_PATH)
    output_blob = next(
        (blob for blob in blobs if "predictions.jsonl" in blob.name),
        None,
    )
    if output_blob is None:
        print("Could not find the output file. Please check whether the job has finished and whether the OUTPUT_JSONL_PATH is correct.")
        return

    # read the output.jsonl file
    print(f"Reading results from {output_blob.name}.")
    content = output_blob.download_as_text()

    # Split on "\n" only: str.splitlines() would also break on characters
    # such as U+2028 that may legally appear unescaped inside a JSON string.
    for line_number, line in enumerate(content.split("\n"), start=1):
        line = line.strip()
        if not line:
            continue

        # Used in the failure message until the record's own key is known.
        label = f"line {line_number}"
        try:
            data = json.loads(line)
            original_filename = data.get("key")
            if not isinstance(original_filename, str) or not original_filename:
                raise ValueError("record has no usable 'key' field")
            label = original_filename

            # extract the JSON from the response
            raw_text = data["response"]["candidates"][0]["content"]["parts"][0]["text"]
            # parse into a real JSON object
            structured_data = json.loads(raw_text)

            # save as a new JSON file in the output folder in the bucket
            new_file_name = f"{output_folder}/{_json_filename_for(original_filename)}"
            new_blob = bucket.blob(new_file_name)
            new_blob.upload_from_string(
                json.dumps(structured_data, indent=2),  # indent for readability
                content_type="application/json",
            )
            print(f"✅ Saved {new_file_name} to the bucket.")
        except Exception as e:
            # Deliberately broad: one bad record (malformed JSON, unexpected
            # response shape, failed upload) must not stop the other records.
            print(f"⚠️ Failed to process {label}: {e}")


def _json_filename_for(key):
    """Return *key* with a trailing ``.txt`` swapped for ``.json``; other keys are returned unchanged."""
    suffix = ".txt"
    if key.endswith(suffix):
        return key[: -len(suffix)] + ".json"
    return key
# Run the post-processing step only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    process_results()