Glider
ProcessFiles.py
# import io
import os
import boto3
import sys
import traceback
import awswrangler as wr
from datetime import date
from multiprocessing import Process, Manager
from ColumnsManager import BuildOttoData
from ErrorHandler import *
from Validate import validate_data
from FormatManager import csvManager
from dotenv import load_dotenv
from UploadSales import upload_main
from MongoConnection import mongo_connection

load_dotenv()

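# Formats treated as Merlin stores/DSPs; at the moment this list is only
# referenced by the commented-out discount step inside create_new_df().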
merlin_formats = ["akazoo", "alibaba", "anghami", "AWA", "awa", "boomplay", "deezer", "iheart", "jiosaavn", "kkbox", "mixcloud", "netease", "pandora", "slacker", "soundcloud",
                  "soundtrack_your_brand", "spotify", "tencent", "tiktok", "uma", "yandex", "facebook", "roxi", "triller", "resso", "peloton", "snapchat",
                  "jaxsta", "trebel", "youtube_merlin", "vevo", "youtube_shorts", "youtube_merlin_label", "audiblemagic",
                  "facebook_revshare", "joox", "saavn", "tiktok-miniplayer", "kkbox_v2", "soundcloud_v2", "youtube_tier"]  # "spotify_discovery",


ACCESS_ID, ACCESS_KEY = os.environ.get("AWS_KEY_ID"), os.environ.get("AWS_KEY_SECRET")


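# MongoDB connections: the snapshots collection tracks per-file processing status,
# and the sales collection receives the uploaded sales lines.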
mongo_conn = mongo_connection()

snap_collection = mongo_conn.mongo_conn_snapshots()

collection_name = os.environ.get("COLLECTION")

collection = mongo_conn.mongo_conn_sales()


s3_session = boto3.Session(aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

def save_parquet_s3(df, event, name):
    """Save the current dataframe as a parquet file in S3

    Args:
        df (pandas dataframe): processed file loaded as dataframe
        event (dict): dictionary with all client and sales information
        name (str): current filename
    Returns: response (str), df (pandas dataframe)
    """
    df["quantity"] = df["quantity"].fillna(0)
    df["total_local"] = df["total_local"].fillna(0.0)
    df["full_total_foreign"] = df["full_total_foreign"].fillna(0.0)
    df["quantity"] = df["quantity"].astype("int")
    df["total_local"] = df["total_local"].astype("float")
    df["full_total_foreign"] = df["full_total_foreign"].astype("float")
    df["catalogue_id"] = "no_catalogue_id"
    df.fillna("undefined", inplace=True)
    df.reset_index(drop=True, inplace=True)
    bucket_out = event["bucket"][1]
    path_out = event["path"][1]
    parquet_file = name + '.parquet'
    wr.s3.to_parquet(
        df=df,
        path="s3://{bucket}/{path}/{file}".format(bucket=bucket_out, path=path_out, file=parquet_file),
        boto3_session=s3_session,
        index=False
    )
    response = "{} uploaded to {}/{}".format(parquet_file, bucket_out, path_out)
    return response, df

def upload_data_mongo(df, filename):
    """Upload batched sales lines to MongoDB

    Args:
        df (dataframe): Dataframe with sales matched by catalogue
        filename (str): current filename; existing lines for this file are deleted first
    Returns: Nothing
    """
    # files_delete = [i["file"] for i in files]
    documents = collection.find_one({"file": filename})
    if documents:
        r = collection.delete_many({"file": filename})
        response = f"{r.deleted_count} sales lines deleted"
    else:
        response = "No files deleted"
    print(response)
    # lines = df.apply(lambda x: x.to_dict(), axis=1).to_list()
    lines = df.to_dict(orient="records")
    try:
        print(f"{'*'*15} Uploading Data to Mongo {'*'*15}")
        chunk_size = 500000  # Number of documents per batch
        for start in range(0, len(lines), chunk_size):
            end = start + chunk_size
            chunk = lines[start:end]
            collection.insert_many(chunk)
            print(f"Batch: {end} finished.")
        # collection.insert_many(lines)
        print(f"Lines uploaded to {collection_name}")
    except Exception as e:
        raise Exception("There are no lines to upload") from e

def fix_date(date):
    """Normalize date column to YYYY-MM-DD format

    Args:
        date (datetime stamp): date column
    Returns: date (str)
    """
    date = str(date)
    date = date.replace(" 00:00:00", "")
    return date
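    # Example (illustrative): fix_date("2024-03-15 00:00:00") -> "2024-03-15"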

def get_period(date, filename):
    """Generates the period (YYYYMM) for a given date

    Args:
        date (str): date column
        filename (str): current filename; files matching the Fuga template take the period from the name
    Returns: period (str)
    """
    if "KepachMusictas" in filename:
        from templates import FugaTemplate
        period = FugaTemplate().date(filename)
    else:
        date = str(date)
        if len(date) > 7:
            period = date[:7].replace("-", "")
        else:
            period = date.replace("-", "")
    return period
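    # Examples (illustrative): get_period("2024-03-15", "sales.csv") -> "202403";
    # get_period("2024-03", "sales.csv") -> "202403"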

def create_new_df(event, formats, return_dict):
    """Executes full procedure per format

    Args:
        event (dict): dictionary with all client and sales information
        formats (str): current format to process
        return_dict (dict): dictionary with all processed files information

    Returns: return_dict (dict)
    """
    # write_DF = get_DF_otto()
    no_processed = dict()
    final_files = event["format"][formats]["files"][:]
    for file in event["format"][formats]["files"]:
        try:
            print(file)
            data_complete, status_data = csvManager.df_csv(event, formats, file, s3_session, BuildOttoData())
            print(status_data)
            if status_data != "Ok":
                raise FileNotLoaded()
            # Data validation
            vd = validate_data()
            data_complete, status_valid = vd.validation(data_complete)
            if status_valid != "Ok":
                raise ValidationFailed()
            # Add accounting date column
            if "currencies not found" in BuildOttoData().add_exchange(data_complete, event["currency"], event["currency_exception"]):
                raise CurrencyNotFound()

            data_complete = BuildOttoData().add_info_columns(data_complete, file["file"], event["client_id"], event["base_currency"])

            # print("{FILE} file total previous: {TOTAL}".format(FILE=file["file"], TOTAL=data_complete["total_local"].sum()))
            # if formats in merlin_formats:
            #     print("Making discount for {}".format(file["file"]))
            #     write_DF.discount(data_complete)

            # Add period and date columns
            data_complete["period"] = data_complete.apply(lambda row: get_period(row["date"], file["file"]), axis=1)
            data_complete["date"] = data_complete.apply(lambda row: fix_date(row["date"]), axis=1)
            print(data_complete.head())
            p_response, data_complete = save_parquet_s3(data_complete, event, file["file"])
            print(p_response)
            snap_collection.update_one({"file_db_id": file["file_id"]}, {"$set": {"status": "processed"}})
            upload_data_mongo(data_complete, file["file"])
        except Exception as e:
            print(sys.exc_info()[2])
            print(traceback.format_exc())
            m = "{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
            file_db_id = file["file_id"]
            eh = ErrorHandler()
            error = eh.handle(e, m, file_db_id)
            no_processed[file_db_id] = {"file": file["file"], "error": error}
            final_files.remove(file)
            continue

    event["format"][formats].pop("columns")
    return_dict["status"] = "OK"
    return_dict["tag"] = event["tag"]
    return_dict["cat_gen"] = event["cat_gen"]
    return_dict["cat_match"] = event["cat_match"]
    return_dict["results_bucket"] = event["bucket"][1]
    return_dict["results_path"] = event["path"][1]
    return_dict["client_id"] = event["client_id"]
    return_dict["format_{}".format(formats)] = final_files
    return_dict["no_processed_{}".format(formats)] = no_processed

    return return_dict

def process_files_parallel(event, context=None):
    """Executes full procedure using multiprocessing to process several formats at the same time

    Args:
        event (dict): dictionary with all client and sales information
        context (None): required only for Lambda execution
    Returns: final_output (dict)
    """
    if len(event["format"]) == 0:
        return {"status": "No Files to Process"}
    manager = Manager()
    return_dict = manager.dict()
    jobs = []
    for formats in event["format"]:
        p = Process(target=create_new_df, args=(event, formats, return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    return_dict = dict(return_dict)
    final_output = dict()
    final_output["format"] = dict()
    for i in return_dict:
        if "format_" in i:
            name = i.split("format_")[1]
            final_output["format"][name] = {"files": return_dict[i]}
        else:
            final_output[i] = return_dict[i]
    final_output["no_format_identified"] = event["no_format_identified"]
    # try:
    #     # Matches and uploads sales to MongoDB
    #     upload_main(final_output, s3_session)
    # except:
    #     print("Catalogue Matching has failed")
    #     print(sys.exc_info()[2])
    #     print(traceback.format_exc())
    #     pass
    return final_output

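# --- Illustrative usage (a sketch, not part of the production entry point) ---
# The shape of "event" below is inferred from the keys read in this module
# (format/files/columns, bucket, path, currency, client_id, ...); all values
# are placeholders, and running this requires real S3/MongoDB access plus an
# upstream payload with the actual column mappings.
if __name__ == "__main__":
    example_event = {
        "format": {
            "spotify": {
                "files": [{"file": "spotify_202403.csv", "file_id": "abc123"}],  # hypothetical file
                "columns": {},  # normally filled by the upstream format detection
            }
        },
        "bucket": ["input-bucket", "output-bucket"],   # [source, results]
        "path": ["incoming", "processed"],             # [source prefix, results prefix]
        "currency": {},
        "currency_exception": {},
        "client_id": "client_001",
        "base_currency": "USD",
        "tag": "202403",
        "cat_gen": False,
        "cat_match": False,
        "no_format_identified": [],
    }
    print(process_files_parallel(example_event))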