40 """Save current dataframe as parquet file in s3
43 df (pandas dataframe): processed file loaded as dataframe
44 event (dict): is a dictionary with all client and sales information
45 name (str): current filename
46 Returns: response (str)
    df["quantity"] = df["quantity"].fillna(0)
    df["total_local"] = df["total_local"].fillna(0.0)
    df["full_total_foreign"] = df["full_total_foreign"].fillna(0.0)
    df["quantity"] = df["quantity"].astype("int")
    df["total_local"] = df["total_local"].astype("float")
    df["full_total_foreign"] = df["full_total_foreign"].astype("float")
    df["catalogue_id"] = "no_catalogue_id"
    df.fillna("undefined", inplace=True)
    df.reset_index(drop=True, inplace=True)
    bucket_out = event["bucket"][1]
    path_out = event["path"][1]
    parquet_file = name + '.parquet'
    # The original writer call is omitted here; awswrangler's wr.s3.to_parquet is
    # assumed, since it matches these keyword arguments (s3_session is a boto3
    # session created outside this excerpt).
    wr.s3.to_parquet(
        df=df,
        path="s3://{bucket}/{path}/{file}".format(bucket=bucket_out, path=path_out, file=parquet_file),
        boto3_session=s3_session,
    )
    response = "{} uploaded to {}/{}".format(parquet_file, bucket_out, path_out)
    return response, df
70 """ Uploads batch lines to mongodb
73 df (dataframe): Dataframe with sales matched by catalogue
77 documents = collection.find_one({
"file": filename})
79 r = collection.delete_many({
"file": filename})
80 response = f
"{r.deleted_count} sales lines deleted"
82 response =
"No files deleted"
85 lines = df.to_dict(orient=
"records")
87 print(f
"{'*'*15} Uploading Data to Mongo {'*'*15}")
89 for start
in range(0, len(lines), chunk_size):
90 end = start + chunk_size
91 chunk = lines[start:end]
92 collection.insert_many(chunk)
93 print(f
"Batch: {end} finished.")
95 print(f
"Lines uploaded to {collection_name}")
96 except Exception
as e:
97 raise Exception(
"There's no lines to upload")
129 """Executes full procedure per format
132 event (dict): is a dictionary with all client and sales information
133 formats (str): current format to process
134 return_dict (dict): is a dictionary with all processed files information
136 Returns: return_dict (dict)
    no_processed = {}
    final_files = event["format"][formats]["files"][:]
    for file in event["format"][formats]["files"]:
        try:
            data_complete, status_data = csvManager.df_csv(event, formats, file, s3_session, BuildOttoData())
            if status_data != "Ok":
                ...  # branch body not shown in this excerpt

            data_complete, status_valid = vd.validation(data_complete)
            if status_valid != "Ok":
                ...  # branch body not shown in this excerpt

            if "currencies not found" in BuildOttoData().add_exchange(data_complete, event["currency"], event["currency_exception"]):
                ...  # branch body not shown in this excerpt

            data_complete = BuildOttoData().add_info_columns(data_complete, file["file"], event["client_id"], event["base_currency"])

            data_complete["period"] = data_complete.apply(lambda row: get_period(row["date"], file["file"]), axis=1)
            data_complete["date"] = data_complete.apply(lambda row: fix_date(row["date"]), axis=1)
            print(data_complete.head())

            p_response, data_complete = save_parquet_s3(data_complete, event, file["file"])

            snap_collection.update_one({"file_db_id": file["file_id"]}, {"$set": {"status": "processed"}})
        except Exception as e:
            print(sys.exc_info()[2])
            print(traceback.format_exc())
            m = "{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
            file_db_id = file["file_id"]
            error = eh.handle(e, m, file_db_id)
            no_processed[file_db_id] = {"file": file["file"], "error": error}
            final_files.remove(file)

    event["format"][formats].pop("columns")

    return_dict["status"] = "OK"
    return_dict["tag"] = event["tag"]
    return_dict["cat_gen"] = event["cat_gen"]
    return_dict["cat_match"] = event["cat_match"]
    return_dict["results_bucket"] = event["bucket"][1]
    return_dict["results_path"] = event["path"][1]
    return_dict["client_id"] = event["client_id"]
    return_dict["format_{}".format(formats)] = final_files
    return_dict["no_processed_{}".format(formats)] = no_processed
197 """Executes full procedure using multiprocessing to process several formats at the same time
200 event (dict): is a dictionary with all client and sales information
201 context (none): it's required just for lambda execution
202 Returns: final_output (dict)
    if len(event["format"]) == 0:
        return {"status": "No Files to Process"}

    manager = Manager()  # multiprocessing.Manager; shares return_dict with the workers
    return_dict = manager.dict()

    for formats in event["format"]:
        p = Process(target=create_new_df, args=(event, formats, return_dict,))
        ...  # starting (and later joining) the worker processes is not shown
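
    # The omitted lines presumably follow the usual multiprocessing pattern, roughly
    # (a sketch under that assumption, not the project's exact code):
    #
    #     jobs.append(p)
    #     p.start()          # inside the loop above
    #     ...
    #     for job in jobs:
    #         job.join()     # after the loop, before reading return_dict below
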
    return_dict = dict(return_dict)
    final_output = dict()
    final_output["format"] = dict()
    for i in return_dict:
        # Keys named "format_<fmt>" are re-nested under final_output["format"];
        # the if/else guard here is reconstructed (the original branch lines are not shown).
        if i.startswith("format_"):
            name = i.split("format_")[1]
            final_output["format"][name] = {"files": return_dict[i]}
        else:
            final_output[i] = return_dict[i]

    final_output["no_format_identified"] = event["no_format_identified"]
    return final_output