Glider
Loading...
Searching...
No Matches
CatalogueGenerator.py
Go to the documentation of this file.
1import os
2import io
3import json
4import copy
5import boto3
6import pandas as pd
7import ssl
8from pymongo import MongoClient
9from dotenv import load_dotenv
10from pathlib import Path
11import sys
12import traceback
13
# dotenv_path = Path("src/importer/.env")
# Load environment variables (Mongo URI, DB names, AWS keys) from a .env file
# on the default search path; the explicit importer path above is disabled.
load_dotenv()

# NOTE(review): this bare module-level string describes the Generator class
# defined below; it is an expression statement, not an attached docstring.
"""
This class has the functionality to grab a specific group of sales files
in OTTO format from S3 and generate a JSON catalogue file.
"""
# Module-level placeholder for an S3 file object; never assigned or read in
# the visible code — presumably legacy, TODO confirm before removing.
s3_file_obj = None
class EmptyException(Exception):
    """Error raised when no list of S3 paths is supplied (see the
    commented-out ``Generator.__init__`` below)."""
24
class Generator:
    """Builds a JSON catalogue from OTTO-format sales files stored in S3.

    Sales rows are grouped by ``release_id``; for every release the lists of
    ISRCs and their release/track/label/artist values are collected, and each
    release document is then upserted into a MongoDB collection.
    """

    # def __init__(self, s3_path):
    #     self.s3_path = s3_path
    #     if not isinstance(s3_path, list):
    #         raise EmptyException('No s3 path passed/list required')

    def get_data(self, key, bucket, s3_client):
        """Download one parquet object from S3 and load it as a DataFrame.

        args:
            key: S3 object key of the parquet file.
            bucket: name of the bucket holding the file.
            s3_client: boto3 S3 client; a default client is created when None.
        returns: a pandas DataFrame with the file contents.
        """
        if s3_client is None:
            s3_client = boto3.client('s3')
        obj = s3_client.get_object(Bucket=bucket, Key=key)
        return pd.read_parquet(io.BytesIO(obj['Body'].read()))

    def getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path):
        """Load the sales DataFrame found under *path* in the results bucket.

        When the prefix holds more than two objects, only the most recently
        modified (non-folder) file is loaded; otherwise each non-folder object
        is read in turn and the last one read wins.  NaNs are replaced with
        the string "undefined".

        args:
            ACCESS_ID: AWS access key id.
            ACCESS_KEY: AWS secret access key.
            RESULTS_BUCKET: bucket in which results are saved.
            path: S3 prefix of the files to generate the catalogue from.
        returns: a pandas DataFrame.
        """
        s3_client = boto3.client("s3", aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)
        response = s3_client.list_objects(Bucket=RESULTS_BUCKET, Prefix=path)
        if len(response["Contents"]) > 2:
            get_last_modified = lambda x: int(x['LastModified'].strftime('%s'))
            # BUG FIX: the original filter was `"/" not in obj`, which tests
            # the *dict keys* of the listing entry and is therefore always
            # true.  Folder placeholder keys (ending in "/") must be skipped
            # explicitly, matching the else-branch below.
            last_added = [obj['Key']
                          for obj in sorted(response["Contents"], key=get_last_modified)
                          if not obj['Key'].endswith("/")][-1]
            print("The last file added is {}".format(last_added))
            df = self.get_data(last_added, RESULTS_BUCKET, s3_client)
        else:
            for file in response["Contents"]:
                if file["Key"].endswith("/"):
                    continue
                print("File {} is gonna generate the catalog".format(file["Key"]))
                df = self.get_data(file["Key"], RESULTS_BUCKET, s3_client)
        df.fillna("undefined", inplace=True)
        return df

    def unique_values_from_isrc(self, df, release, isrc, field):
        """Collect, per ISRC of a release, the first unique value of *field*.

        args:
            df: sales DataFrame with release_id / isrc_id columns.
            release: release_id value to filter on.
            isrc: iterable of isrc_id values belonging to that release.
            field: column whose value is collected for each ISRC.
        returns: list with one value of *field* per entry in *isrc*.
        """
        unique = []
        for i in isrc:
            value = df[(df.release_id == release) & (df.isrc_id == i)][field]
            unique.append(value.unique()[0])
        return unique

    def generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client):
        """Build the aggregated catalogue DataFrame for one client.

        args:
            ACCESS_ID / ACCESS_KEY: AWS credentials.
            RESULTS_BUCKET: bucket in which results are saved.
            path: S3 prefix of the sales files.
            client: client id stamped onto every catalogue row.
        returns: DataFrame grouped by release_id with list-valued columns
            isrc_id / release_title / track_title / label_id / artists and a
            scalar client_id column, e.g.:

            {"release_id":"8024709032027",
             "isrc_id":["US29B0747640", ...],
             "release_title":["L'Hymne A L'Amour", ...],
             "track_title":["Il Postino", ...],
             "label_id":["CAM JAZZ", ...],
             "artists":["Richard Galliano featuring Gary Burton", ...]}
        """
        df_pl = self.getDf(ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
        columns_to_convert = ["release_title", "track_title", "label_id", "artists"]
        aggregated = df_pl.groupby('release_id')["isrc_id"].apply(list).reset_index(name="isrc_id")
        # Deduplicate the ISRC list of each release before expanding columns.
        aggregated["isrc_id"] = aggregated.apply(lambda Row: list(set(Row["isrc_id"])), axis=1)
        for col in columns_to_convert:
            aggregated[col] = aggregated.apply(
                lambda Row: self.unique_values_from_isrc(df_pl, Row["release_id"], Row["isrc_id"], col),
                axis=1)
        aggregated["client_id"] = client
        return aggregated

    def update_data(self, line, collection):
        """Upsert one catalogue record into the Mongo collection.

        Existing releases get the new ISRCs (and their parallel
        release_title / track_title / label_id / artists entries) appended;
        unknown releases are inserted verbatim.

        args:
            line: dict for one release (one row of the generate() output).
            collection: pymongo collection holding the catalogue.
        returns: nothing; mutates the collection.
        """
        release_found = collection.find_one({"release_id": line["release_id"]})
        if release_found:
            print("UPDATING {} DATA TO MONGO".format(line["release_id"]))
            # Dead code removed: the original deep-copied release_found into
            # release_copy, extended it, and never used it again.
            items_to_add = [line["isrc_id"].index(element)
                            for element in line["isrc_id"]
                            if element not in release_found["isrc_id"]]
            release_found["isrc_id"].extend([line["isrc_id"][element] for element in items_to_add])
            release_found["release_title"].extend([line["release_title"][element] for element in items_to_add])
            release_found["track_title"].extend([line["track_title"][element] for element in items_to_add])
            release_found["label_id"].extend([line["label_id"][element] for element in items_to_add])
            release_found["artists"].extend([line["artists"][element] for element in items_to_add])
            collection.update_one({"release_id": line["release_id"]},
                                  {"$set": {"isrc_id": release_found["isrc_id"],
                                            "release_title": release_found["release_title"],
                                            "track_title": release_found["track_title"],
                                            "label_id": release_found["label_id"],
                                            "artists": release_found["artists"],
                                            "client_id": line["client_id"]}})
        else:
            print("UPLOADING {} DATA TO MONGO".format(line["release_id"]))
            collection.insert_one(line)

    def upload_catalog(self, df):
        """Write the aggregated catalogue into MongoDB, one upsert per release.

        Connection parameters come from the MONGO_GLIDER / DB / CAT_COLLECTION
        environment variables.

        args:
            df: aggregated catalogue DataFrame (output of generate()).
        returns: nothing; mutates the Mongo collection.
        """
        uri = os.environ.get("MONGO_GLIDER")
        db_name = os.environ.get("DB")
        collection_name = os.environ.get("CAT_COLLECTION")
        try:
            # pymongo < 4 accepts ssl_cert_reqs; newer versions reject the
            # keyword, in which case fall back to a plain connection.
            client = MongoClient(uri, ssl_cert_reqs=ssl.CERT_NONE)
        except Exception:
            client = MongoClient(uri)
        db = client[db_name]
        collection = db[collection_name]
        df_json = json.loads(df.to_json(orient="records"))
        # Plain loop: the original used a list comprehension purely for its
        # side effects.
        for record in df_json:
            self.update_data(record, collection)
167
def generate_catalogue(event, context):
    """AWS Lambda entry point: regenerate and upload the sales catalogue.

    args:
        event (dict): client and sales information; expected keys are
            "status", "results_bucket", "results_path" and "client_id".
        context: Lambda context object (unused).
    returns: dict with a "status" message describing the outcome.
    """
    if event["status"] != 'Snapshots Created':
        return {"status": "Catalog could not be updated"}
    ID = os.environ.get("AWS_KEY_ID")
    KEY = os.environ.get("AWS_SECRET_KEY")
    # RESULTS_BUCKET = os.environ.get("RESULTS_BUCKET")
    try:
        # BUG FIX: event keys are read inside the try so a malformed payload
        # yields a status response instead of an unhandled KeyError.
        RESULTS_BUCKET = event["results_bucket"]
        RESULTS_PATH = event["results_path"]
        CLIENT_ID = event["client_id"]
        catalog_generator = Generator()
        aggregated = catalog_generator.generate(ID, KEY, RESULTS_BUCKET, RESULTS_PATH, CLIENT_ID)
        print(aggregated)
        print(aggregated.to_json(orient="records"))
        catalog_generator.upload_catalog(aggregated)
    except Exception as e:
        print(sys.exc_info()[2])
        print(traceback.format_exc())
        return {"status": str(e)}
    else:
        return {"status": "Catalog updated"}
generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client)
unique_values_from_isrc(self, df, release, isrc, field)
getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)