Glider
CreateSnapshots.py
import os
import ssl
import boto3
import sys
import traceback
import awswrangler as wr
import pandas as pd
from datetime import datetime as dt
from dotenv import load_dotenv
from pathlib import Path
# from pymongo import MongoClient
from ErrorHandler import *
# from TemplateManager import TemplateManager
from MongoConnection import mongo_connection
# from CreateSnapshotsFonal import snapshot_playground

# dotenv_path = Path("src/importer/.env")
load_dotenv()


mongo_conn = mongo_connection()

collection = mongo_conn.mongo_conn_snapshots()


ACCESS_ID, ACCESS_KEY = os.environ.get("AWS_KEY_ID"), os.environ.get("AWS_KEY_SECRET")


s3_session = boto3.Session(aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

def get_data(file, bucket_out, path_out):
    """Receives an S3 path and loads the data using awswrangler.

    Args:
        file (str): current filename
        bucket_out (str): bucket where the parquet file is stored
        path_out (str): S3 prefix where the parquet file is stored
    Returns: df (pandas dataframe)
    """
    s3_filename = "s3://{bucket}/{path}/{file}".format(bucket=bucket_out, path=path_out, file=file)
    df = wr.s3.read_parquet(s3_filename, boto3_session=s3_session)
    return df

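# A minimal usage sketch with hypothetical names (bucket, prefix and file are
# placeholders, not real resources): for file="sales_202401.parquet",
# bucket_out="example-results-bucket" and path_out="client-1/results" the
# function reads
# s3://example-results-bucket/client-1/results/sales_202401.parquet:
#
#   df = get_data("sales_202401.parquet", "example-results-bucket", "client-1/results")
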
def group_by_field(data, fields, snapshot):
    """Group by service_id, territory_code, artists and tracks.

    Args:
        data (pandas dataframe): current parquet file loaded as dataframe
        fields (list): list of fields that are considered for the snapshot
        snapshot (dict): will contain the info for all fields
    Returns: snapshot (dict)
    """
    snap_fields = {"service_id": "byDSP", "territory_code": "byTerritory", "artists": "byArtist", "track_title": "byTrack", "release_id": "byRelease"}
    # snap_fields = {"artists": "byArtist", "track_title": "byTrack", "release_id": "byRelease"}
    for field in fields:
        snap_field = snap_fields[field]
        if field == "track_title":
            grouped_df = data.groupby('track_title').agg({
                'total_local': 'sum',
                'isrc_id': 'first',
                'artists': 'first'
            }).reset_index()
            # grouped_df = data.groupby('track_title').agg({
            #     'total_local': 'sum',
            #     'isrc_id': 'first',
            #     'artists': 'first',
            #     'release_id': lambda x: pd.Series(x).unique().tolist(),
            #     'territory_code': lambda x: pd.Series(x).unique().tolist(),
            #     'service_id': lambda x: pd.Series(x).unique().tolist()}).reset_index()
        elif field == "artists":
            grouped_df = data.groupby('artists').agg({
                'total_local': 'sum',
                # 'release_id': 'first'
                'release_id': lambda x: pd.Series(x).unique().tolist()
            }).reset_index()
        else:
            grouped_df = data.groupby(field).agg({
                'total_local': 'sum',
            }).reset_index()
        grouped_df.rename(columns={field: 'key', 'total_local': 'total'}, inplace=True)
        snapshot[snap_field] = grouped_df.to_dict(orient='records')
        # df_grouped = data.groupby(field)["total_local"].sum()
        # snap = build_snapshot(df_grouped, field, snapshot)
    return snapshot

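# Sketch of the output shape on toy data (values are illustrative only):
# grouping a dataframe by "service_id" produces records keyed as
# {"key": ..., "total": ...} under the matching snapshot entry, e.g.
#
#   data = pd.DataFrame({"service_id": ["spotify", "spotify", "apple"],
#                        "total_local": [1.0, 2.0, 3.0]})
#   group_by_field(data, ["service_id"], {"byDSP": []})
#   # -> {"byDSP": [{"key": "apple", "total": 3.0},
#   #               {"key": "spotify", "total": 3.0}]}
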
def upload_mongo(results, file_db_id, update, length):
    """Update the snapshot document of a file in MongoDB.

    Args:
        results (dict): snapshot results, one entry per snapshot field
        file_db_id (str): database id of the file whose document is updated
        update (datetime): the datetime when the snapshot was updated
        length (int): total rows
    Returns: Nothing
    """
    for field in results:
        collection.update_one({"file_db_id": file_db_id}, {"$set": {field: results[field]}})
    collection.update_one({"file_db_id": file_db_id}, {"$set": {"status": "ingested", "status_cause": "No Errors", "error_message": "No message", "updated_at": update, "total_rows": length}})

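# After upload_mongo runs, the matched document carries one field per key in
# results (procedure_status, reporting_period, snapshot, ...) plus the
# bookkeeping fields from the final update_one call, roughly:
#
#   {"file_db_id": "...", "snapshot": {...}, "status": "ingested",
#    "status_cause": "No Errors", "error_message": "No message",
#    "updated_at": <datetime>, "total_rows": <int>}
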
def results_grouped(event, context):
    """Executes the full snapshot procedure.

    Args:
        event (dict): dictionary with all client and sales information
        context (None): required only for Lambda execution
    Returns: (dict)
    """
    if event["status"] != "OK":
        return {"status": "No snapshots generated due to: " + str(event["status"])}
    snapshot_created = dict()
    snapshot_not_created = dict()
    status = "Snapshots Created"
    # playground_formats = [event["format"][i] for i in event["format"] if "playground" in i]
    # if playground_formats:
    #     snapshot_playground(event, playground_formats)
    for formats in event["format"]:
        # if "playground" in formats:
        #     continue
        files = event["format"][formats]["files"]
        if len(files) == 0:
            continue
        for file in files:
            try:
                file_db_id = file["file_id"]
                filename = file["file"]
                filename = filename + ".parquet"
                fields = ["service_id", "territory_code", "artists", "track_title", "release_id"]
                # fields = ["artists", "track_title", "release_id"]
                df = get_data(filename, event["results_bucket"], event["results_path"])
                total_rows = df.shape
                snapshot = dict({"byArtist": [], "byDSP": [], "byTerritory": [], "byTrack": [], "byRelease": []})
                # snapshot = dict({"byArtist": [], "byTrack": [], "byRelease": []})
                snapshot = group_by_field(df, fields, snapshot)
                period = str(df.loc[0, "period"])

                if len(period) > 7:
                    period = period[:7].replace("-", "")
                # elif len(period) == 7:
                else:
                    period = period.replace("-", "")
                try:
                    results = {
                        "procedure_status": "OK",
                        "reporting_period": period,  # .getType(filename, formats)
                        "total_local": float(df["total_local"].sum()),
                        "total_gross": float(df["total_gross"].sum()),
                        "total_net": float(df["total_net"].sum()),
                        "snapshot": snapshot
                    }
                except KeyError:
                    # total_gross/total_net columns are not present for every format
                    results = {
                        "procedure_status": "OK",
                        "reporting_period": period,  # .getType(filename, formats)
                        "total_local": float(df["total_local"].sum()),
                        "snapshot": snapshot
                    }
                print("{}: {} Generating Snapshot".format(file_db_id, filename))
                update_at = dt.now().replace(hour=0, minute=0, second=0, microsecond=0)
                if "playground_digital" in formats:
                    root_db_id = file_db_id.split("-")[:-1]
                    root_db_id = "-".join(root_db_id)
                    upload_mongo(results, root_db_id, update_at, total_rows[0])
                upload_mongo(results, file_db_id, update_at, total_rows[0])
                snapshot_created[file_db_id] = filename
            except Exception as e:
                file_db_id = file["file_id"]
                # print(sys.exc_info()[2])
                # print(traceback.format_exc())
                print(e)
                m = "{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
                eh = ErrorHandler()
                error = eh.handle(e, m, file_db_id)
                status = "Some snapshots were not created"
                snapshot_not_created[file_db_id] = {"file": file["file"], "error": error}
                continue
                # return {"status": error}

    return {"procedure_status": status,
            "results_bucket": event["results_bucket"],
            "results_path": event["results_path"],
            "client_id": event["client_id"],
            "snapshots_created": snapshot_created,
            "no_snapshots": snapshot_not_created}