# NOTE(review): garbled extract — the original file's line numbers ("32",
# "35", …) are fused into the text and statements are hard-wrapped. The
# enclosing `def get_data(file, bucket_out, path_out):` line and the final
# `return df` the docstring promises fall outside this chunk (see the call
# site further down: get_data(filename, event["results_bucket"],
# event["results_path"])) — TODO recover from the original file.
32 """Receives a S3 path and loads the data using awswrangler
35 file (str): current filename
36 bucket_out (str): bucket where parquet file is storage
37 path_out (str): s3 path where parquet file is storage
38 Returns: df (pandas dataframe)
# Build the fully-qualified S3 URI for the parquet object.
40 s3_filename =
"s3://{bucket}/{path}/{file}".format(bucket=bucket_out,path=path_out,file=file)
# Load the parquet into a pandas DataFrame; `wr` (awswrangler) and
# `s3_session` are presumably module-level globals — not visible in this
# chunk, confirm against the full file.
41 df = wr.s3.read_parquet(s3_filename, boto3_session=s3_session)
# NOTE(review): fragment of the per-field snapshot builder. The `def` line,
# the loop header iterating `fields`, and the bodies of both `.agg({...})`
# calls (original lines 59-69 and 72-76, per the fused numbering) are
# missing from this chunk — the `.agg({` dicts below are left unclosed by
# the extraction, not by the author.
45 """Group by service_id, territory_code, artists and tracks
48 data (pandas dataframe): current parquet file loaded as dataframe
49 fields (list): list of fields that are considered to snapshot
50 snapshot (dict): it will contain all fields info
# Maps a dataframe column name to the key used in the output snapshot dict.
53 snap_fields = {
"service_id":
"byDSP",
"territory_code":
"byTerritory",
"artists":
"byArtist",
"track_title":
"byTrack",
"release_id":
"byRelease"}
56 snap_field = snap_fields[field]
# track_title and artists get dedicated aggregations; every other field
# falls through to the generic groupby below.
57 if field ==
"track_title":
58 grouped_df = data.groupby(
'track_title').agg({
70 elif field ==
"artists":
71 grouped_df = data.groupby(
'artists').agg({
# For artists, release_id is collapsed to the list of unique releases.
74 'release_id':
lambda x: pd.Series(x).unique().tolist()
77 grouped_df = data.groupby(field).agg({
# Normalize column names so every snapshot entry exposes 'key'/'total'.
80 grouped_df.rename(columns={field:
'key',
'total_local':
'total'}, inplace=
True)
# Serialize the grouped frame as a list of {key, total, ...} records.
81 snapshot[snap_field] = grouped_df.to_dict(orient=
'records')
# NOTE(review): fragment of the Lambda entry point (presumably
# `def lambda_handler(event, context):` — the def line, the `try:` opening
# the per-file block, the inner `for file in files:` header, and several
# elided spans (original lines 114-124, 133-143, 149-157, 168-178 per the
# fused numbering) are missing from this chunk.
102 """Executes full procedure
105 event (dict): is a dictionary with all client and sales information
106 context (none): it's required just for lambda execution
# Guard clause: bail out early when the upstream step did not report OK.
109 if event[
"status"] !=
"OK":
110 return {
"status":
"No snapshots generated due: "+str(event[
"status"])}
111 snapshot_created = dict()
112 snapshot_not_created = dict()
113 status =
"Snapshots Created"
# Iterate every delivery format, then every file reported for that format.
117 for formats
in event[
"format"]:
120 files = event[
"format"][formats][
"files"]
125 file_db_id = file[
"file_id"]
126 filename = file[
"file"]
127 filename = filename+
".parquet"
# Columns that get a grouped snapshot each (see snap_fields mapping above).
128 fields = [
"service_id",
"territory_code",
"artists",
"track_title",
"release_id"]
130 df =
get_data(filename, event[
"results_bucket"], event[
"results_path"])
131 total_rows = df.shape
132 snapshot = dict({
"byArtist":[],
"byDSP": [],
"byTerritory":[],
"byTrack":[],
"byRelease": []})
# Reporting period taken from the first row; both branches normalize it
# to YYYYMM by stripping the dash (one truncates to YYYY-MM first —
# the branch condition is elided from this chunk, confirm in original).
135 period = str(df.loc[0,
"period"])
138 period = period[:7].replace(
"-",
"")
141 period = period.replace(
"-",
"")
# Results payload: status, period and the file-level totals. The dict
# opening line(s) are elided; two variants appear (with and without
# gross/net), presumably per-format — TODO confirm against original.
144 "procedure_status":
"OK",
145 "reporting_period": period,
146 "total_local": float(df[
"total_local"].sum()),
147 "total_gross": float(df[
"total_gross"].sum()),
148 "total_net": float(df[
"total_net"].sum()),
153 "procedure_status":
"OK",
154 "reporting_period": period,
155 "total_local": float(df[
"total_local"].sum()),
158 print(
"{}: {} Generating Snapshot".format(file_db_id, filename))
# Midnight timestamp used as the snapshot's update date.
159 update_at = dt.now().replace(hour=0, minute=0, second=0, microsecond=0)
# playground_digital files also update the root document: root id is the
# file id with its last "-"-separated segment dropped.
160 if "playground_digital" in formats:
161 root_db_id = file_db_id.split(
"-")[:-1]
162 root_db_id =
"-".join(root_db_id)
163 upload_mongo(results, root_db_id, update_at, total_rows[0])
164 upload_mongo(results, file_db_id, update_at, total_rows[0])
165 snapshot_created[file_db_id] = filename
# Per-file failure handling: record the error and keep processing the
# remaining files instead of aborting the whole batch.
166 except Exception
as e:
167 file_db_id = file[
"file_id"]
171 m =
"{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
173 error = eh.handle(e, m, file_db_id)
174 status =
"Some snapshots were not created"
175 snapshot_not_created[file_db_id] = {
"file": file[
"file"],
"error": error}
# Final summary: overall status plus created / failed snapshot maps,
# echoing bucket/path/client so the next step can locate the results.
179 return {
"procedure_status":status,
180 "results_bucket": event[
"results_bucket"],
181 "results_path": event[
"results_path"],
182 "client_id": event[
"client_id"],
183 "snapshots_created": snapshot_created,
184 "no_snapshots": snapshot_not_created}