8from pymongo
import MongoClient
9from dotenv
import load_dotenv
10from pathlib
import Path
This class grabs a specific group of sales files in OTTO format from S3
and generates a JSON catalogue file.
    s3_path: an array with the paths of the files to generate the catalogue from.
    raises: EmptyException if initialized empty.
def get_data(self, key, bucket, s3_client):
    """Download one parquet object from S3 and load it into a dataframe.

    NOTE(review): the ``def`` line is missing from the extracted source; the
    signature is taken from the file's signature index.

    Args:
        key: Key of the object to read.
        bucket: Bucket in which results are saved.
        s3_client: boto3 S3 client used for the download. When ``None`` a
            client built from the default credential chain is used.

    Returns:
        A pandas dataframe with the content of the parquet object.
    """
    # The original body rebuilt a default ``boto3.client('s3')`` here and so
    # silently discarded the client (and its explicit credentials) handed in
    # by the caller. Reuse the caller's client; only fall back when absent.
    if s3_client is None:
        s3_client = boto3.client("s3")
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    # read_parquet needs a seekable file-like object, hence the BytesIO copy.
    return pd.read_parquet(io.BytesIO(obj["Body"].read()))
def getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path):
    """Load the sales dataframe the catalogue is generated from.

    Lists the objects stored under ``path`` in ``RESULTS_BUCKET``. When the
    listing holds more than two entries, only the most recently modified
    file is read; otherwise every listed file is read in listing order and
    the last one read is kept. Missing values in the resulting dataframe are
    replaced by the string ``"undefined"``.

    NOTE(review): several lines of this method are missing from the
    extracted source (the ``else:`` between both branches, the statement
    that skips keys ending in "/", and the final ``return``). They are
    reconstructed from the visible statements and from ``generate``, which
    uses the return value as a dataframe. If the lost lines concatenated
    the files of the second branch, that behaviour must be restored by hand.

    Args:
        ACCESS_ID: AWS access key id used to build the S3 client.
        ACCESS_KEY: AWS secret access key used to build the S3 client.
        RESULTS_BUCKET: Bucket in which results are saved.
        path: Key prefix of the files to generate the catalogue from.

    Returns:
        A pandas dataframe.

    Raises:
        ValueError: If no file is found under ``path``.
    """
    s3_client = boto3.client(
        "s3", aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY
    )
    response = s3_client.list_objects(Bucket=RESULTS_BUCKET, Prefix=path)
    # S3 omits the "Contents" entry altogether when nothing matches.
    contents = response.get("Contents", [])
    # Keys ending in "/" are treated as folder placeholders and never read.
    # The original first branch tested `"/" not in obj` against the listing
    # dict itself, which is always true; use the same key test in both
    # branches instead.
    files = [obj for obj in contents if not obj["Key"].endswith("/")]
    if not files:
        raise ValueError(
            "No file found in bucket {} under {}".format(RESULTS_BUCKET, path)
        )

    if len(contents) > 2:
        # Compare the datetimes directly: strftime('%s') is not portable.
        newest = sorted(files, key=lambda obj: obj["LastModified"])[-1]
        last_added = newest["Key"]
        print("The last file added is {}".format(last_added))
        df = self.get_data(last_added, RESULTS_BUCKET, s3_client)
    else:
        for file in files:
            print("File {} is gonna generate the catalog".format(file["Key"]))
            df = self.get_data(file["Key"], RESULTS_BUCKET, s3_client)

    df.fillna("undefined", inplace=True)
    return df
def unique_values_from_isrc(self, df, release, isrc, field):
    """Return, for every ISRC of a release, the first value of ``field``.

    NOTE(review): the ``def`` line, the accumulator initialisation, the loop
    header and the ``return`` are missing from the extracted source; they
    are reconstructed from the file's signature index, from the names used
    by the two visible statements and from the call site in ``generate``.

    Args:
        df: Dataframe exposing ``release_id``, ``isrc_id`` and ``field``
            columns.
        release: Release id whose rows are inspected.
        isrc: Iterable of ISRC ids belonging to ``release``.
        field: Name of the column to read.

    Returns:
        A list with one entry per element of ``isrc``, in the same order.

    Raises:
        IndexError: If a (release, ISRC) pair matches no row of ``df``.
    """
    # The release filter does not depend on the ISRC: apply it once.
    release_rows = df[df.release_id == release]
    unique = []
    for i in isrc:
        value = release_rows[release_rows.isrc_id == i][field]
        unique.append(value.unique()[0])
    return unique
def generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client):
    """Build the catalogue dataframe, one row per release.

    The sales dataframe returned by ``getDf`` is grouped by ``release_id``.
    Each row carries the de-duplicated list of ISRCs of the release and, for
    ``release_title``, ``track_title``, ``label_id`` and ``artists``, a list
    holding one value per ISRC (same order as ``isrc_id``). Every row is
    tagged with ``client``.

    NOTE(review): the ``return`` line is missing from the extracted source;
    it is restored because the caller serialises the returned dataframe.

    Args:
        ACCESS_ID: AWS access key id, forwarded to ``getDf``.
        ACCESS_KEY: AWS secret access key, forwarded to ``getDf``.
        RESULTS_BUCKET: Bucket in which results are saved.
        path: Key prefix of the files to generate the catalogue from.
        client: Value stored in the ``client_id`` column of every row.

    Returns:
        The aggregated pandas dataframe.
    """
    df_pl = self.getDf(ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
    columns_to_convert = ["release_title", "track_title", "label_id", "artists"]

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # catalogue is reproducible (list(set(...)) yields an arbitrary order).
    aggregated = (
        df_pl.groupby("release_id")["isrc_id"]
        .apply(lambda isrcs: list(dict.fromkeys(isrcs)))
        .reset_index(name="isrc_id")
    )

    for col in columns_to_convert:
        # A plain comprehension avoids DataFrame.apply(axis=1) trying to
        # expand the returned lists into columns.
        aggregated[col] = [
            self.unique_values_from_isrc(df_pl, release_id, isrc_ids, col)
            for release_id, isrc_ids in zip(
                aggregated["release_id"], aggregated["isrc_id"]
            )
        ]

    aggregated["client_id"] = client
    return aggregated
def update_data(self, line, collection):
    """Insert one catalogue record or merge it into the stored release.

    ``line`` is one record of the catalogue, shaped like::

        {"release_id": "8024709032027",
         "isrc_id": ["US29B0747640", "US29B0747642", ...],
         "release_title": ["L'Hymne A L'Amour", "L'Hymne A L'Amour", ...],
         "track_title": ["Il Postino", "Para Jobim", ...],
         "label_id": ["CAM JAZZ", "CAM JAZZ", ...],
         "artists": ["Richard Galliano featuring Gary Burton", ...]}

    When the collection holds no document for ``line["release_id"]`` the
    record is inserted as is. Otherwise the entries of ``line`` whose ISRC
    is not stored yet are appended to the stored lists and the document is
    updated; ``client_id`` is overwritten with the value from ``line``.

    NOTE(review): the ``def`` line and the ``if``/``else`` lines separating
    the update path from the insert path are missing from the extracted
    source; they are reconstructed from the file's signature index and the
    two log messages.

    Args:
        line: Catalogue record (dict) as described above, including a
            ``client_id`` entry.
        collection: Collection where the catalogue is saved; must provide
            ``find_one``, ``update_one`` and ``insert_one``.

    Returns:
        Nothing.
    """
    release_found = collection.find_one({"release_id": line["release_id"]})
    if not release_found:
        print("UPLOADING {} DATA TO MONGO".format(line["release_id"]))
        collection.insert_one(line)
        return

    print("UPDATING {} DATA TO MONGO".format(line["release_id"]))

    # Positions in ``line`` of the ISRCs that are not stored yet. The set
    # keeps membership tests O(1) and stops a repeated ISRC in ``line``
    # from being appended twice.
    known = set(release_found["isrc_id"])
    itemns_to_add = []
    for position, element in enumerate(line["isrc_id"]):
        if element not in known:
            known.add(element)
            itemns_to_add.append(position)

    # The lists are parallel: extend each one with the same positions.
    merged_fields = ("isrc_id", "release_title", "track_title", "label_id", "artists")
    for field in merged_fields:
        release_found[field].extend(
            line[field][position] for position in itemns_to_add
        )

    new_values = {field: release_found[field] for field in merged_fields}
    new_values["client_id"] = line["client_id"]
    collection.update_one(
        {"release_id": line["release_id"]},
        {"$set": new_values},
    )
151 returns: Prints the JSON version (TODO: save to file)
153 uri = os.environ.get(
"MONGO_GLIDER")
154 db_name = os.environ.get(
"DB")
155 collection_name = os.environ.get(
"CAT_COLLECTION")
157 client = MongoClient(uri, ssl_cert_reqs=ssl.CERT_NONE)
159 client = MongoClient(uri)
161 collection = db[collection_name]
162 jsonfile = df.to_json(orient=
"records")
163 df_json = json.loads(jsonfile)
event (dict): Dictionary with all client and sales information.
returns: Status of the procedure.
174 if event[
"status"] !=
'Snapshots Created':
175 return {
"status":
"Catalog could not be updated"}
176 ID = os.environ.get(
"AWS_KEY_ID")
177 KEY = os.environ.get(
"AWS_SECRET_KEY")
179 RESULTS_BUCKET = event[
"results_bucket"]
180 RESULTS_PATH = event[
"results_path"]
181 CLIENT_ID = event[
"client_id"]
184 aggregated = catalog_generator.generate(ID, KEY, RESULTS_BUCKET, RESULTS_PATH, CLIENT_ID)
186 json_df = aggregated.to_json(orient=
"records")
188 catalog_generator.upload_catalog(aggregated)
189 except Exception
as e:
190 print(sys.exc_info()[2])
191 print(traceback.format_exc())
192 return {
"status": str(e)}
194 return {
"status":
"Catalog updated"}
generate(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path, client)
unique_values_from_isrc(self, df, release, isrc, field)
getDf(self, ACCESS_ID, ACCESS_KEY, RESULTS_BUCKET, path)
update_data(self, line, collection)
get_data(self, key, bucket, s3_client)
generate_catalogue(event, context)