import os
import multiprocessing as mp
from itertools import repeat
from multiprocessing.pool import Pool

import awswrangler as wr
from dotenv import load_dotenv, dotenv_values
from pymongo import MongoClient, DeleteOne
from pymongo.errors import BulkWriteError

from MongoConnection import mongo_connection
# Load environment configuration and open the MongoDB connections.
# The mongo_connection() constructor call is an assumption: the line creating
# mongo_conn is missing from the source, but the two accessor calls below
# require an instance of the imported helper.
load_dotenv()

collection_name = os.environ.get("COLLECTION")

mongo_conn = mongo_connection()
collection = mongo_conn.mongo_conn_sales()
catalogue_collection = mongo_conn.mongo_conn_catalogue()
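# For reference, a minimal .env file this setup assumes (illustrative only;
# COLLECTION is the only key read in this module, and any connection settings
# consumed inside mongo_connection are not shown in the source):
#
#     COLLECTION=sales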
25 """ Uploads batch lines to mongodb
28 df (dataframe): Dataframe with sales matched by catalogue
31 lines = df.apply(
lambda x: x.to_dict(), axis=1).to_list()
33 collection.insert_many(lines)
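# Note: an equivalent and typically faster way to build `lines` above is
#     lines = df.to_dict("records")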
36 """ Makes catalogue matching using upc as key
39 upcs (list): A list which contains all unique UPCs in the current batch
40 df (dataframe): Dataframe with sales matched by catalogue
46 upc_found = catalogue_collection.find_one({
"upc":{
"$regex":upc}},{
"_id":0,
"upc":1,
"title":1,
"label":1,
"artist":1,
"catalogId": 1})
47 ind = df[df[
"release_id"] == upc].index
49 df.loc[ind,
"release_title"] = upc_found[
"title"]
50 df.loc[ind,
"label_id"] = upc_found[
"label"]
51 df.loc[ind,
"catalogue_id"] = upc_found[
"catalogId"]
52 df.loc[ind,
"clean"] = 1
53 df.loc[ind,
"release_id"] = upc_found[
"upc"]
60 """ Makes catalogue matching using isrc as key
63 isrc (list): A list which contains all unique ISRCs in the current batch
64 df (dataframe): Dataframe with sales matched by catalogue
68 aux = df[df[
"isrc_id"] == isrc][
"release_id"].unique()
71 upc_found = catalogue_collection.find_one({
"tracks.isrc":isrc},{
"_id":0,
"upc":1,
"title":1,
"label":1,
"artist":1,
"tracks.isrc":1,
"tracks.title":1,
"tracks.artist":1,
"catalogId": 1})
72 ind = df[df[
"isrc_id"] == isrc].index
74 isrc_found = [element
for element
in upc_found[
"tracks"]
if element[
"isrc"] == isrc]
75 df.loc[ind,
"release_id"] = upc_found[
"upc"]
76 df.loc[ind,
"release_title"] = upc_found[
"title"]
77 df.loc[ind,
"label_id"] = upc_found[
"label"]
78 df.loc[ind,
"track_title"] = isrc_found[0][
"title"]
79 df.loc[ind,
"artists"] = isrc_found[0][
"artist"]
80 df.loc[ind,
"catalogue_id"] = upc_found[
"catalogId"]
81 df.loc[ind,
"clean"] = 1
86 """ Deletes all files in the current batch from database (in case of they exist) to avoid overwriting
89 files (list): A list which contains all filenames in the current batch splitted by formats
91 files_delete (list): A list with filenames in the current batch
93 files_delete = [i[
"file"]
for i
in files]
94 r = collection.delete_many({
"file": {
"$in": files_delete}})
95 if r.deleted_count >= 1:
96 response = f
"{r.deleted_count} sales lines deleted"
98 response =
"No files deleted"
103 """ Deletes all duplicated sales lines from database (in case of they exist) caused by parallel processing
111 {
"$group": {
"_id": {
"file":
"$file",
"line":
"$line"},
"unique_ids": {
"$push":
"$_id"},
"clean_ids": {
"$push":
"$clean"},
"count": {
"$sum": 1}}},
112 {
"$match": {
"count": {
"$gte": 2 }}}
114 cursor = collection.aggregate(pipeline, allowDiskUse =
True)
117 response.append(doc[
"unique_ids"][1:])
118 to_del = [item
for sublist
in response
for item
in sublist]
120 flat_list = [DeleteOne({
"_id":i})
for i
in to_del]
121 collection.bulk_write(flat_list, ordered=
False)
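# Illustrative shape of one aggregation result document (ids hypothetical):
#     {"_id": {"file": "batch_0.parquet", "line": 42},
#      "unique_ids": [ObjectId("..."), ObjectId("...")],
#      "clean_ids": [1, 1],
#      "count": 2}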
125 """ Orchestra every function and executes find_irsc function using multiprocessing
128 event (dict): Dictionary result of ProcessFiles.py
129 s3_session: (boto3.session object): S3 session to connect to AWS bucket
132 cat_match = event[
"cat_match"]
135 s3_filename =
"s3://"+event[
"results_bucket"]+
"/"+event[
"results_path"]+
"/"
136 for i
in event[
"format"]:
140 all_files.append(s3_filename+j)
141 dfs = wr.s3.read_parquet(all_files, boto3_session=s3_session, chunked=500_000)
153 if str(cat_match) ==
"True" or str(cat_match) ==
"CatType.true":
154 print(f
"{'*'*15} Catalogue Matching Starting {'*'*15}")
157 isrcs = list(df[
"isrc_id"].unique())
158 pool = mp.Pool(processes=6)
159 pool.starmap(find_irsc, zip(isrcs, repeat(df)))
163 print(f
"{'*'*15} Uploading Data to Mongo {'*'*15}")
165 print(f
"Batch: {batch} finished. Lines uploaded to {collection_name}")
upload_main(event, s3_session)
delete_files_mongoDB(files)
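# Illustrative `event` shape this module expects (keys inferred from the
# accesses in upload_main; all values are hypothetical):
#     event = {
#         "cat_match": "True",
#         "results_bucket": "my-results-bucket",
#         "results_path": "results/2021-06",
#         "format": [{"files": ["batch_0.parquet", "batch_1.parquet"]}],
#     }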