Glider
Loading...
Searching...
No Matches
generator.py
Go to the documentation of this file.
1import os
2import io
3import json
4import copy
5import boto3
6import pandas as pd
7import ssl
8from pymongo import MongoClient
9from dotenv import load_dotenv
10from pathlib import Path
11import sys
12import traceback
13
# Load environment variables from a .env file (python-dotenv) before any
# os.environ lookups made later in this module. A fixed location such as
# Path("src/importer/.env") could be passed here instead of the default search.
load_dotenv()

# Module overview: fetch a group of OTTO-format sales files from S3 and build
# a JSON catalogue from them (see the Generator class and lambda_handler).

# NOTE(review): s3_file_obj is not referenced by the visible code in this
# file; presumably kept for external importers — confirm before removing.
s3_file_obj = None
class EmptyException(Exception):
    """Raised when the catalogue generator is given no input to work from."""
24
class Generator:
    """Build a release-level catalogue from OTTO-format sales files on S3.

    ``generate`` returns a pandas DataFrame with one row per ``release_id``.
    In each row, ``isrc_id``, ``release_title``, ``track_title``,
    ``label_id`` and ``artists`` are lists aligned index-by-index, so
    element ``k`` of every list describes the ISRC at ``isrc_id[k]``.
    ``upload_catalog`` merges those rows into a MongoDB collection.
    """

    def get_data(self, key, bucket, s3_client):
        """Download one parquet object from S3 and load it as a DataFrame.

        Args:
            key: S3 object key of the parquet file.
            bucket: S3 bucket name.
            s3_client: boto3 S3 client; when ``None`` a default client is
                created.

        Returns:
            pandas.DataFrame with the file's contents.
        """
        if s3_client is None:
            s3_client = boto3.client('s3')
        obj = s3_client.get_object(Bucket=bucket, Key=key)
        return pd.read_parquet(io.BytesIO(obj['Body'].read()))

    @staticmethod
    def _list_objects(s3_client, bucket, prefix):
        """Return every object summary under ``prefix``, following pagination."""
        # A single list call returns at most 1000 keys; the paginator keeps
        # requesting pages until the listing is complete.
        paginator = s3_client.get_paginator("list_objects_v2")
        contents = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # Pages with no matching keys omit "Contents" entirely.
            contents.extend(page.get("Contents", []))
        return contents

    def getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path):
        """Load the sales file that feeds the catalogue.

        If the prefix holds more than two objects (folder placeholders
        included), the most recently modified data file is used. Otherwise
        the last data file in listing order is used.

        Args:
            ACCESS_ID: AWS access key id.
            ACCESS_KEY: AWS secret access key.
            RESULTS_BUCKET: bucket in which the sales results are saved.
            path: key prefix under which the sales files live.

        Returns:
            pandas.DataFrame of the selected file, with missing values
            replaced by the string ``"undefined"``.

        Raises:
            EmptyException: if no data file exists under ``path``.
        """
        s3_client = boto3.client("s3", aws_access_key_id=ACCESS_ID,
                                 aws_secret_access_key=ACCESS_KEY)
        contents = self._list_objects(s3_client, RESULTS_BUCKET, path)
        # Keys ending in "/" are folder placeholders, not parquet files.
        files = [obj for obj in contents if not obj["Key"].endswith("/")]
        if not files:
            raise EmptyException(
                "No files found under s3://{}/{}".format(RESULTS_BUCKET, path))
        if len(contents) > 2:
            # LastModified is a datetime, so it sorts directly. sorted() is
            # stable, so on equal timestamps the last-listed key wins.
            selected_key = sorted(files, key=lambda obj: obj["LastModified"])[-1]["Key"]
            print("The last file added is {}".format(selected_key))
        else:
            selected_key = files[-1]["Key"]
            print("File {} is gonna generate the catalog".format(selected_key))
        df = self.get_data(selected_key, RESULTS_BUCKET, s3_client)
        df.fillna("undefined", inplace=True)
        return df

    def unique_values_from_isrc(self, df, release, isrc, field):
        """Return, for each ISRC of a release, the first value of ``field``.

        Args:
            df: sales DataFrame with ``release_id`` and ``isrc_id`` columns.
            release: release_id to look up.
            isrc: iterable of ISRC ids belonging to ``release``.
            field: column whose value is returned for each ISRC.

        Returns:
            list with one value per entry of ``isrc``, in the same order.
            Each value is the first distinct value of ``field`` among the
            rows matching (release, ISRC).

        Raises:
            IndexError: if some (release, ISRC) pair has no row in ``df``.
        """
        # Filter by release once instead of re-scanning the whole frame per ISRC.
        release_rows = df[df.release_id == release]
        return [release_rows[release_rows.isrc_id == i][field].unique()[0]
                for i in isrc]

    def generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client):
        """Group the sales file into one catalogue row per release.

        Args:
            ACCESS_ID: AWS access key id.
            ACCESS_KEY: AWS secret access key.
            RESULTS_BUCKET: bucket in which the sales results are saved.
            path: key prefix of the sales files (see ``getDf``).
            client: value stored in the ``client_id`` column of every row.

        Returns:
            pandas.DataFrame sorted by ``release_id`` with the columns
            ``release_id, isrc_id, release_title, track_title, label_id,
            artists, client_id``. ``isrc_id`` lists each release's distinct
            ISRCs in first-appearance order. The other list columns hold the
            first value seen for each of those ISRCs. Example row (lists
            truncated)::

                {"release_id": "8024709032027",
                 "isrc_id": ["US29B0747640", "US29B0747642", ...],
                 "release_title": ["L'Hymne A L'Amour", "L'Hymne A L'Amour", ...],
                 "track_title": ["Il Postino", "Para Jobim", ...],
                 "label_id": ["CAM JAZZ", "CAM JAZZ", ...],
                 "artists": ["Richard Galliano featuring Gary Burton", ...],
                 "client_id": <client>}
        """
        df_pl = self.getDf(ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
        columns_to_convert = ["release_title", "track_title", "label_id", "artists"]
        # Keep the first row of every (release, ISRC) pair. This is the row
        # unique_values_from_isrc would pick, found in a single pass instead
        # of one full-frame scan per ISRC.
        first_rows = df_pl.drop_duplicates(subset=["release_id", "isrc_id"], keep="first")
        records = []
        # groupby keeps row order inside each group, so ISRCs stay in a
        # deterministic first-appearance order.
        for release_id, group in first_rows.groupby("release_id", sort=True):
            record = {"release_id": release_id, "isrc_id": group["isrc_id"].tolist()}
            for col in columns_to_convert:
                record[col] = group[col].tolist()
            records.append(record)
        # Explicit columns keep the schema even when there are no releases.
        aggregated = pd.DataFrame(records, columns=["release_id", "isrc_id"] + columns_to_convert)
        aggregated["client_id"] = client
        return aggregated

    def update_data(self, line, collection):
        """Insert a catalogue row, or merge its new ISRCs into the stored one.

        Args:
            line: one catalogue record with ``release_id``, ``client_id`` and
                the aligned lists ``isrc_id``, ``release_title``,
                ``track_title``, ``label_id`` and ``artists``.
            collection: MongoDB collection where the catalogue is saved.

        Returns:
            Nothing.

        Behavior:
            If no document has this ``release_id``, ``line`` is inserted
            as-is. Otherwise each ISRC not already stored is appended,
            together with its aligned values, to the stored lists, and
            ``client_id`` is overwritten.
        """
        release_found = collection.find_one({"release_id": line["release_id"]})
        if release_found is None:
            print("UPLOADING {} DATA TO MONGO".format(line["release_id"]))
            collection.insert_one(line)
            return

        print("UPDATING {} DATA TO MONGO".format(line["release_id"]))
        array_fields = ("isrc_id", "release_title", "track_title", "label_id", "artists")
        known_isrcs = set(release_found["isrc_id"])
        new_indexes = []
        for index, isrc in enumerate(line["isrc_id"]):
            if isrc not in known_isrcs:
                # Adding to the set also skips duplicates within `line` itself.
                known_isrcs.add(isrc)
                new_indexes.append(index)
        updated = {
            field: list(release_found[field]) + [line[field][i] for i in new_indexes]
            for field in array_fields
        }
        updated["client_id"] = line["client_id"]
        # NOTE(review): find_one + update_one is a read-modify-write; two
        # concurrent runs on the same release could overwrite each other.
        collection.update_one({"release_id": line["release_id"]}, {"$set": updated})

    def upload_catalog(self, df):
        """Merge every catalogue row of ``df`` into MongoDB.

        The connection string, database and collection are read from the
        ``MONGO_GLIDER``, ``DB`` and ``CAT_COLLECTION`` environment variables.
        Each row is passed to ``update_data``.

        Args:
            df: catalogue DataFrame as returned by ``generate``.

        Returns:
            Nothing.

        Raises:
            RuntimeError: if any of the environment variables is unset/empty.
        """
        settings = {name: os.environ.get(name)
                    for name in ("MONGO_GLIDER", "DB", "CAT_COLLECTION")}
        missing = [name for name, value in settings.items() if not value]
        if missing:
            # Without this check a missing URI silently targets localhost.
            raise RuntimeError(
                "Missing environment variable(s): {}".format(", ".join(missing)))
        # NOTE(security): ssl.CERT_NONE disables verification of the server's
        # TLS certificate. Also, `ssl_cert_reqs` appears to be a PyMongo 3.x
        # option name — confirm the pinned pymongo version before changing it.
        client = MongoClient(settings["MONGO_GLIDER"], ssl_cert_reqs=ssl.CERT_NONE)
        try:
            collection = client[settings["DB"]][settings["CAT_COLLECTION"]]
            # Round-trip through JSON so values are plain Python types
            # (str/int/float/list/None) rather than numpy/pandas scalars.
            records = json.loads(df.to_json(orient="records"))
            for record in records:
                self.update_data(record, collection)
        finally:
            client.close()
157
def lambda_handler(event, context):
    """AWS Lambda entry point: regenerate and upload a client's catalogue.

    Args:
        event: invocation payload. Work only starts when
            ``event["status"] == "Snapshots Created"``. The payload must then
            also carry ``results_bucket``, ``results_path`` and ``client_id``.
        context: Lambda context object (unused).

    Returns:
        dict with a single ``"status"`` message: ``"Catalog updated"`` on
        success, otherwise a description of why the catalogue was not
        updated. On success the generated catalogue is also printed, both as
        a DataFrame and as JSON (TODO: save to file).
    """
    if event.get("status") != 'Snapshots Created':
        return {"status": "Catalog could not be updated"}
    missing = [field for field in ("results_bucket", "results_path", "client_id")
               if field not in event]
    if missing:
        return {"status": "Missing event field(s): {}".format(", ".join(missing))}
    access_id = os.environ.get("AWS_KEY_ID")
    access_key = os.environ.get("AWS_SECRET_KEY")
    results_bucket = event["results_bucket"]
    results_path = event["results_path"]
    client_id = event["client_id"]
    try:
        catalog_generator = Generator()
        aggregated = catalog_generator.generate(
            access_id, access_key, results_bucket, results_path, client_id)
        print(aggregated)
        print(aggregated.to_json(orient="records"))
        catalog_generator.upload_catalog(aggregated)
    except Exception as e:
        # Lambda boundary: log the full traceback and report the failure in
        # the response instead of raising.
        print(traceback.format_exc())
        return {"status": str(e)}
    return {"status": "Catalog updated"}
get_data(self, key, bucket, s3_client)
Definition generator.py:38
generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client)
Definition generator.py:90
unique_values_from_isrc(self, df, release, isrc, field)
Definition generator.py:77
getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
Definition generator.py:51
update_data(self, line, collection)
Definition generator.py:118
lambda_handler(event, context)
Definition generator.py:163