UploadSales.py
import os
import ssl
import boto3
import awswrangler as wr
import multiprocessing as mp
from multiprocessing.pool import Pool
from itertools import repeat
from pymongo import MongoClient, DeleteOne
from pymongo.errors import BulkWriteError
from MongoConnection import mongo_connection
from dotenv import load_dotenv, dotenv_values
load_dotenv()
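
# Expected environment (assumption): the .env file provides COLLECTION with the
# name of the target sales collection; the MongoDB connection settings themselves
# are consumed inside MongoConnection.mongo_connection.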


collection_name = os.environ.get("COLLECTION")
mongo_conn = mongo_connection()


collection = mongo_conn.mongo_conn_sales()

catalogue_collection = mongo_conn.mongo_conn_catalogue()
# releases = list(catalogue_collection.find({}, {"_id":0, "upc":1,"title":1,"label":1,"artist":1,"tracks.isrc":1, "tracks.title":1, "tracks.artist":1, "catalogId": 1}))

25 """ Uploads batch lines to mongodb
26
27 Args:
28 df (dataframe): Dataframe with sales matched by catalogue
29 Returns: Nothing
30 """
31 lines = df.apply(lambda x: x.to_dict(), axis=1).to_list()
32 if len(lines) > 0:
33 collection.insert_many(lines)
34
def find_upc(upcs, df):
    """ Performs catalogue matching using the UPC as key

    Args:
        upcs (list): A list containing all unique UPCs in the current batch
        df (dataframe): Dataframe with sales lines to be matched against the catalogue
    Returns: Nothing
    """
    for upc in upcs:
        # upc_found = [element for element in releases if element['upc'] == upc]
        # if len(upc_found) > 0:
        # The regex performs a substring match, so stored UPCs that carry extra
        # padding around the raw code still match
        upc_found = catalogue_collection.find_one({"upc": {"$regex": upc}}, {"_id": 0, "upc": 1, "title": 1, "label": 1, "artist": 1, "catalogId": 1})
        ind = df[df["release_id"] == upc].index
        if upc_found:
            df.loc[ind, "release_title"] = upc_found["title"]
            df.loc[ind, "label_id"] = upc_found["label"]
            df.loc[ind, "catalogue_id"] = upc_found["catalogId"]
            df.loc[ind, "clean"] = 1
            df.loc[ind, "release_id"] = upc_found["upc"]
            upload_data_mongo(df.loc[ind, :])
        else:
            # No catalogue match: upload the lines unenriched (clean stays 0)
            upload_data_mongo(df.loc[ind, :])
    # return df

def find_irsc(isrc, df):
    """ Performs catalogue matching using the ISRC as key

    Args:
        isrc (str): One of the unique ISRCs in the current batch
        df (dataframe): Dataframe with sales lines to be matched against the catalogue
    Returns: Nothing
    """
    # A valid ISRC is exactly 12 characters; shorter values are treated as unusable
    # and matching falls back to the UPCs on those lines
    if len(isrc) < 12:
        aux = df[df["isrc_id"] == isrc]["release_id"].unique()
        find_upc(aux, df)
    else:
        upc_found = catalogue_collection.find_one({"tracks.isrc": isrc}, {"_id": 0, "upc": 1, "title": 1, "label": 1, "artist": 1, "tracks.isrc": 1, "tracks.title": 1, "tracks.artist": 1, "catalogId": 1})
        ind = df[df["isrc_id"] == isrc].index
        if upc_found:
            isrc_found = [element for element in upc_found["tracks"] if element["isrc"] == isrc]
            df.loc[ind, "release_id"] = upc_found["upc"]
            df.loc[ind, "release_title"] = upc_found["title"]
            df.loc[ind, "label_id"] = upc_found["label"]
            df.loc[ind, "track_title"] = isrc_found[0]["title"]
            df.loc[ind, "artists"] = isrc_found[0]["artist"]
            df.loc[ind, "catalogue_id"] = upc_found["catalogId"]
            df.loc[ind, "clean"] = 1
            upload_data_mongo(df.loc[ind, :])
    # return df

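# Illustrative shape of a catalogue document matched by ISRC above; the field names
# come from the projection in find_irsc, the values are invented:
# {"upc": "0602435000000", "title": "Example Album", "label": "Example Label",
#  "artist": "Example Artist", "catalogId": "CAT-001",
#  "tracks": [{"isrc": "USRC17607839", "title": "Example Track", "artist": "Example Artist"}]}
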
86 """ Deletes all files in the current batch from database (in case of they exist) to avoid overwriting
87
88 Args:
89 files (list): A list which contains all filenames in the current batch splitted by formats
90 Returns:
91 files_delete (list): A list with filenames in the current batch
92 """
93 files_delete = [i["file"] for i in files]
94 r = collection.delete_many({"file": {"$in": files_delete}})
95 if r.deleted_count >= 1:
96 response = f"{r.deleted_count} sales lines deleted"
97 else:
98 response = "No files deleted"
99 print(response)
100 return files_delete
101
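# Illustrative input for delete_files_mongoDB (invented values):
# files = [{"file": "store_report_2021_01"}, {"file": "store_report_2021_02"}]
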
103 """ Deletes all duplicated sales lines from database (in case of they exist) caused by parallel processing
104
105 Args:
106 Nothing
107 Returns:
108 Nothing
109 """
110 pipeline = [
111 {"$group": {"_id": {"file": "$file", "line": "$line"}, "unique_ids": {"$push": "$_id"}, "clean_ids": {"$push": "$clean"}, "count": {"$sum": 1}}},
112 {"$match": {"count": { "$gte": 2 }}}
113 ]
114 cursor = collection.aggregate(pipeline, allowDiskUse = True)
115 response = []
116 for doc in cursor:
117 response.append(doc["unique_ids"][1:])
118 to_del = [item for sublist in response for item in sublist]
119
120 flat_list = [DeleteOne({"_id":i}) for i in to_del]
121 collection.bulk_write(flat_list, ordered=False)
122
123
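# Illustrative group emitted by the aggregation above for a duplicated line
# (values invented):
# {"_id": {"file": "store_report_2021_01", "line": 42},
#  "unique_ids": [ObjectId("..."), ObjectId("...")], "clean_ids": [1, 1], "count": 2}
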
def upload_main(event, s3_session):
    """ Orchestrates every function and executes find_irsc using multiprocessing

    Args:
        event (dict): Dictionary result of ProcessFiles.py
        s3_session (boto3.Session object): S3 session used to connect to the AWS bucket
    Returns: Nothing
    """
    cat_match = event["cat_match"]
    tag = event["tag"]
    all_files = []
    s3_filename = "s3://" + event["results_bucket"] + "/" + event["results_path"] + "/"
    for i in event["format"]:
        q = delete_files_mongoDB(event["format"][i]["files"])
        for j in q:
            j = j + ".parquet"
            all_files.append(s3_filename + j)
    dfs = wr.s3.read_parquet(all_files, boto3_session=s3_session, chunked=500_000)
    batch = 1
    for df in dfs:
        # df["quantity"] = df["quantity"].fillna(0)
        # df["total_local"] = df["total_local"].fillna(0.0)
        # df["full_total_foreign"] = df["full_total_foreign"].fillna(0.0)
        # df["quantity"] = df["quantity"].astype("int")
        # df["total_local"] = df["total_local"].astype("float")
        # df["full_total_foreign"] = df["full_total_foreign"].astype("float")
        # df.fillna("undefined", inplace=True)
        # df.reset_index(drop=True, inplace=True)
        # df["catalogue_id"] = "no_catalogue_id"
        if str(cat_match) == "True" or str(cat_match) == "CatType.true":  # Launches catalogue matching
            print(f"{'*'*15} Catalogue Matching Starting {'*'*15}")
            df["clean"] = 0
            mp.freeze_support()
            isrcs = list(df["isrc_id"].unique())
            # Each worker receives its own pickled copy of df and uploads its matched
            # lines from inside find_irsc; this parallelism is what delete_duplicates
            # exists to clean up after
            pool = mp.Pool(processes=6)
            pool.starmap(find_irsc, zip(isrcs, repeat(df)))
            pool.close()
            pool.join()
        else:  # Just uploads sales lines
            print(f"{'*'*15} Uploading Data to Mongo {'*'*15}")
            upload_data_mongo(df)
        print(f"Batch: {batch} finished. Lines uploaded to {collection_name}")
        batch += 1
    # try:
    #     delete_duplicates()
    # except Exception:
    #     print("No files duplicated")
    #     pass

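# Minimal illustrative entry point. In production, `event` is the dict produced by
# ProcessFiles.py and `s3_session` a boto3 session created by the caller; every
# value below is a placeholder (assumption), not the real configuration.
if __name__ == "__main__":
    s3_session = boto3.Session()  # assumes default AWS credentials are configured
    event = {
        "cat_match": "True",                 # "True" / "CatType.true" enables catalogue matching
        "tag": "example-tag",                # hypothetical tag
        "results_bucket": "example-bucket",  # hypothetical results bucket
        "results_path": "example/path",      # hypothetical key prefix
        "format": {},                        # per-format {"files": [...]} entries from ProcessFiles.py
    }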
    upload_main(event, s3_session)