import boto3
import copy
import io
import json
import os
import ssl
import sys
import traceback

import pandas as pd
from pymongo import MongoClient
from dotenv import load_dotenv
from pathlib import Path
This class grabs a specific group of sales files in OTTO format from S3 and generates a JSON catalogue file.
s3_path: an array of paths to the files the catalogue is generated from.
raises: EmptyException if initialized empty.
def get_data(self, key, bucket, s3_client):
    # Note: the client passed in is shadowed here by a fresh default client.
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()))
Bucket: the bucket in which results are saved.
Path: an array of paths to the files the catalogue is generated from.
returns: a pandas dataframe
def getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path):
    s3_client = boto3.client("s3", aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)
    df_pl = pd.DataFrame()
    response = s3_client.list_objects(Bucket=RESULTS_BUCKET, Prefix=path)
    if len(response["Contents"]) > 2:
        # Take the key of the most recently modified object.
        get_last_modified = lambda x: int(x['LastModified'].strftime('%s'))
        last_added = [obj['Key'] for obj in sorted(response["Contents"], key=get_last_modified)
                      if "/" not in obj['Key']][-1]
        print("The last file added is {}".format(last_added))
        df = self.get_data(last_added, RESULTS_BUCKET, s3_client)
    for file in response["Contents"]:
        if file["Key"].endswith("/"):
            continue  # skip folder placeholder objects
        print("File {} is gonna generate the catalog".format(file["Key"]))
        df = self.get_data(file["Key"], RESULTS_BUCKET, s3_client)
    df.fillna("undefined", inplace=True)
    df["date"].replace("2022-01-28T00:00:00.000Z", "2022-02-28T00:00:00.000Z", inplace=True)
    return df
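A side note on getDf: strftime('%s') is a platform-specific extension rather than standard Python/C behaviour, so the epoch-seconds sort key may not work everywhere. A minimal sketch of a portable alternative, sorting directly on the datetime objects boto3 returns (the snippet reuses the variable names above but is not from the source):

contents = response["Contents"]
# LastModified is already a datetime, so it can be used as the sort key directly.
sorted_keys = [obj["Key"] for obj in sorted(contents, key=lambda x: x["LastModified"])
               if "/" not in obj["Key"]]
last_added = sorted_keys[-1] if sorted_keys else None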
returns: the unique values of the given field, collected into a list.
def unique_values_from_isrc(self, df, release, isrc, field):
    unique = []
    for i in isrc:
        value = df[(df.release_id == release) & (df.isrc_id == i)][field]
        unique.append(value.unique()[0])
    return unique
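As an illustration of what this lookup does, here is a minimal sketch on a made-up dataframe (the values are invented for the example):

import pandas as pd

toy = pd.DataFrame({
    "release_id": ["R1", "R1", "R1"],
    "isrc_id":    ["A",  "A",  "B"],
    "label_id":   ["CAM JAZZ", "CAM JAZZ", "CAM JAZZ"],
})
# For a given (release, isrc) pair there is one distinct label, so unique()[0] picks it.
value = toy[(toy.release_id == "R1") & (toy.isrc_id == "A")]["label_id"]
print(value.unique()[0])  # CAM JAZZ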
Bucket: the bucket in which results are saved.
Path: an array of paths to the files the catalogue is generated from.
returns: a pandas dataframe grouped by release ID, with only the isrc / release title / track title / label / artists columns
def generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client):
    df_pl = self.getDf(ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
    columns_to_drop = ['quantity', 'total_foreign', 'currency', 'date', 'type', 'service_id',
                       'filename', 'exchange_rate', 'territory_code', 'total_local',
                       'track_title', 'release_title']
    columns_to_convert = ["release_title", "track_title", "label_id", "artists"]
    # One row per release, with all of its ISRCs collected into a deduplicated list.
    aggregated = df_pl.groupby('release_id')["isrc_id"].apply(list).reset_index(name="isrc_id")
    aggregated["isrc_id"] = aggregated.apply(lambda row: list(set(row["isrc_id"])), axis=1)
    for col in columns_to_convert:
        aggregated[col] = aggregated.apply(
            lambda row: self.unique_values_from_isrc(df_pl, row["release_id"], row["isrc_id"], col),
            axis=1)
    aggregated["client_id"] = client
    return aggregated
101 {"release_id":"8024709032027",
102 "isrc_id":["US29B0747640","US29B0747642","US29B0747652","US29B0747644","US29B0747656","US29B0747646","US29B0747650","US29B0747648","US29B0748238","US29B0747654"],
103 "release_title":["L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour","L'Hymne A L'Amour"],
104 "track_title":["Il Postino","Para Jobim","Soledad","Romance Del Diablo","L'Hymne A L'Amour","Waltz For Debby","Triunfal","Milonga Is Coming","Sinfonia in G minor, BWV 797","Operation Tango"],
105 "label_id":["CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ","CAM JAZZ"],
106 "artists":["Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton","Richard Galliano featuring Gary Burton"]}
Collection: the collection where the catalogue is saved.
Returns: Nothing; it either updates existing items or uploads new items to the catalogue.
def update_data(self, line, collection):
    release_found = collection.find_one({"release_id": line["release_id"]})
    if release_found:
        print("UPDATING {} DATA TO MONGO".format(line["release_id"]))
        release_copy = copy.deepcopy(release_found)
        release_copy["isrc_id"].extend([element for element in line["isrc_id"]
                                        if element not in release_copy["isrc_id"]])
        # Positions, within the incoming record, of the ISRCs not already stored.
        items_to_add = [line["isrc_id"].index(element) for element in line["isrc_id"]
                        if element not in release_found["isrc_id"]]
        release_found["isrc_id"].extend([line["isrc_id"][element] for element in items_to_add])
        release_found["release_title"].extend([line["release_title"][element] for element in items_to_add])
        release_found["track_title"].extend([line["track_title"][element] for element in items_to_add])
        release_found["label_id"].extend([line["label_id"][element] for element in items_to_add])
        release_found["artists"].extend([line["artists"][element] for element in items_to_add])
        collection.update_one({"release_id": line["release_id"]},
                              {"$set": {"isrc_id": release_found["isrc_id"],
                                        "release_title": release_found["release_title"],
                                        "track_title": release_found["track_title"],
                                        "label_id": release_found["label_id"],
                                        "artists": release_found["artists"],
                                        "client_id": line["client_id"]}})
    else:
        print("UPLOADING {} DATA TO MONGO".format(line["release_id"]))
        collection.insert_one(line)
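For comparison, the isrc_id array alone could be extended server-side in a single call with MongoDB's $addToSet/$each operators. This sketch is not from the source, and it would not keep the parallel release_title/track_title/label_id/artists arrays index-aligned, which is why update_data computes the new indices in Python instead:

# Appends only the ISRCs that are not already in the stored array.
collection.update_one(
    {"release_id": line["release_id"]},
    {"$addToSet": {"isrc_id": {"$each": line["isrc_id"]}}},
)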
Returns: Nothing. Opens the Mongo connection and applies update_data to every record of the dataframe.
def upload_catalog(self, df):
    uri = os.environ.get("MONGO_GLIDER")
    db_name = os.environ.get("DB")
    collection_name = os.environ.get("CAT_COLLECTION")
    client = MongoClient(uri, ssl_cert_reqs=ssl.CERT_NONE)
    db = client[db_name]
    collection = db[collection_name]
    jsonfile = df.to_json(orient="records")
    df_json = json.loads(jsonfile)
    for line in df_json:
        self.update_data(line, collection)
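Note that ssl_cert_reqs is the legacy pymongo keyword. If the environment runs pymongo 4.x (an assumption; the source does not pin a version), the equivalent connection would look like this:

# Hypothetical pymongo 4.x equivalent of MongoClient(uri, ssl_cert_reqs=ssl.CERT_NONE)
client = MongoClient(uri, tls=True, tlsAllowInvalidCertificates=True)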
returns: prints the JSON version (TODO: save it to a file)
def lambda_handler(event, context):
    if event["status"] != 'Snapshots Created':
        return {"status": "Catalog could not be updated"}
    ID = os.environ.get("AWS_KEY_ID")
    KEY = os.environ.get("AWS_SECRET_KEY")
    RESULTS_BUCKET = event["results_bucket"]
    RESULTS_PATH = event["results_path"]
    CLIENT_ID = event["client_id"]
    try:
        # catalog_generator is the module-level instance of the catalogue class
        # (its construction is not shown in this excerpt).
        aggregated = catalog_generator.generate(ID, KEY, RESULTS_BUCKET, RESULTS_PATH, CLIENT_ID)
        json_df = aggregated.to_json(orient="records")
        catalog_generator.upload_catalog(aggregated)
    except Exception as e:
        print(sys.exc_info()[2])
        print(traceback.format_exc())
        return {"status": str(e)}
    return {"status": "Catalog updated"}
get_data(self, key, bucket, s3_client)
generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client)
unique_values_from_isrc(self, df, release, isrc, field)
getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
update_data(self, line, collection)
lambda_handler(event, context)