ReportSnapshot.py
import os
import sys

import awswrangler as wr
import boto3
from pandas import DataFrame, concat
from datetime import datetime as dt
from dotenv import load_dotenv

# Make the sibling "importer" package importable so MongoConnection resolves.
f_path = __file__
index = f_path.find("report_generation/")
f_path = f_path[:index]
sys.path.insert(1, f_path + 'importer')
from MongoConnection import mongo_connection

load_dotenv()


ACCESS_ID, ACCESS_KEY = os.environ.get("AWS_KEY_ID"), os.environ.get("AWS_KEY_SECRET")

mongo_conn = mongo_connection()
snap_collection = mongo_conn.mongo_conn_snapshots()

# Module-level accumulators filled while streaming the report CSV in chunks.
final_df = DataFrame()
total_rows = 0
total_local = 0

session = boto3.Session(aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

# Timestamp truncated to midnight so every snapshot from one run shares it.
update_at = dt.now().replace(hour=0, minute=0, second=0, microsecond=0)

def read_report_csv(filename, chunk):
    """Receives an S3 path and loads the data with awswrangler in chunks (1M lines per chunk).

    Args:
        filename (str): s3 file location
        chunk (int): chunk size
    Returns: final_df (pandas dataframe)
    """
    global final_df, total_rows, total_local
    _dfs = wr.s3.read_csv(filename, low_memory=False, chunksize=chunk, keep_default_na=False, boto3_session=session)
    for df in _dfs:
        total_rows += df.shape[0]
        total_local += float(df["total_local"].sum())
        # Keep only the 10,000 highest-earning rows of each chunk to bound memory.
        df.sort_values("total_local", ascending=False, inplace=True)
        df = df.iloc[:10000, :]
        final_df = concat([final_df, df], ignore_index=True)
    return final_df

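# Usage sketch (bucket and key are hypothetical placeholders):
#   df = read_report_csv("s3://example-bucket/reports/2023-01/sales.csv", 1_000_000)
#   print(total_rows, total_local)  # module-level accumulators updated as a side effect
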
def group_by_field(data, fields, snapshot):
    """Groups the data by each requested field and accumulates totals in the snapshot.

    Args:
        data (pandas dataframe): current report file loaded as dataframe
        fields (list): list of fields that are considered for the snapshot
        snapshot (dict): it will contain all fields' info
    Returns: snapshot (dict)
    """
    # Group by release_id, service_id, territory_code, artists and track_title;
    # build_snapshot mutates `snapshot` in place, so one dict accumulates everything.
    for field in fields:
        df_grouped = data.groupby(field)["total_local"].sum()
        snapshot = build_snapshot(df_grouped, field, snapshot)
    return snapshot

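# The snapshot dict is expected to arrive pre-seeded with one list per facet,
# a shape inferred from build_snapshot below:
#   snapshot = {"byUPC": [], "byArtist": [], "byDSP": [], "byTerritory": [], "byTrack": []}
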
def build_snapshot(df, field, snapshot):
    """Checks the field and fills the matching snapshot entry using the grouped data.

    Args:
        df (pandas series): totals grouped by the current field
        field (str): current field used as filter
        snapshot (dict): it will contain all fields' info
    Returns: snapshot (dict)
    """
    df.sort_values(ascending=False, inplace=True)
    # Keep only the 50 largest totals per field.
    for i in df.index[:50]:
        if field == "release_id":
            element = {"release_id": i, "total": float(df.loc[i])}
            snapshot["byUPC"].append(element)
        elif field == "artists":
            element = {"artist": i, "total": float(df.loc[i])}
            snapshot["byArtist"].append(element)
        elif field == "service_id":
            element = {"service": i, "total": float(df.loc[i])}
            snapshot["byDSP"].append(element)
        elif field == "territory_code":
            element = {"territory_code": i, "total": float(df.loc[i])}
            snapshot["byTerritory"].append(element)
        elif field == "track_title":
            element = {"track_title": i, "total": float(df.loc[i])}
            snapshot["byTrack"].append(element)
    return snapshot

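# Illustrative entry appended for field == "territory_code" (values are made up):
#   {"territory_code": "US", "total": 1234.56}
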
def upload_mongo(event, final_path, snapshot, period):
    """Creates or updates the snapshot document in MongoDB.

    Args:
        event (dict): contains the file and client info
        final_path (str): path where the original csv is saved in s3
        snapshot (dict): it contains all fields' info
        period (str): month and year when the sales were processed
    Returns: Nothing
    """
    name = final_path.split("/")[-1]
    file_id = "{}_report_{}".format(event["client_id"], event["date"])
    document = {
        "file_db_id": file_id,
        "status": "finished",
        "client_id": event["client_id"],
        "name": name,
        "s3_path": final_path,
        "reporting_period": period,
        "total_local": total_local,
        "snapshot": snapshot,
        "updated_at": update_at,
        "total_rows": total_rows
    }
    # upsert=True replaces an existing snapshot or inserts a new one in a single call,
    # avoiding the race of a separate replace_one/insert_one pair.
    snap_collection.replace_one({"file_db_id": file_id}, document, upsert=True)
    print(f"Generating Snapshot with id: {file_id}")

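# Documents are keyed as "<client_id>_report_<date>", e.g. a hypothetical
# "client42_report_2023-01-15".
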
def search_snapshots_id(files, date):
    """Searches MongoDB for the ids of all snapshots matching the given files and date."""
    date = dt.strptime(date, "%Y-%m-%d")
    ids = snap_collection.find({"name": {"$in": files}, "updated_at": date}, {"_id": 1, "file_db_id": 1, "name": 1})
    # A pymongo cursor is always truthy, so materialize it before checking for results.
    snapshots = [{"_id": i["_id"], "file_db_id": i["file_db_id"], "name": i["name"]} for i in ids]
    if snapshots:
        return snapshots
    raise ValueError("No snapshots found")


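# Usage sketch (filenames and date are hypothetical):
#   snaps = search_snapshots_id(["report_a.csv", "report_b.csv"], "2023-01-15")
#   # -> [{"_id": ObjectId("..."), "file_db_id": "...", "name": "report_a.csv"}, ...]
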
def create_snapshot(event, final_path):
    """Executes the full procedure.

    Args:
        event (dict): contains the file and client info
        final_path (str): path where the original csv is saved in s3
    Returns: Nothing
    """
    s3_path = "s3://{}/{}".format(event["bucket"], final_path)
    df = read_report_csv(s3_path, 1_000_000)
    # Alternative (currently disabled): rebuild the per-field snapshot from the data.
    # fields = ["release_id", "service_id", "territory_code", "artists", "track_title"]
    # snapshot = {"byUPC": [], "byArtist": [], "byDSP": [], "byTerritory": [], "byTrack": []}
    # snapshot = group_by_field(df, fields, snapshot)
    snapshot = search_snapshots_id(event["files"], event["date"])
    # Use the most frequent value of the date column as the reporting period.
    period = df["date"].value_counts().index[0]

    # Normalize to YYYYMM whether the date is YYYY-MM or YYYY-MM-DD.
    if len(period) > 7:
        period = period[:7].replace("-", "")
    else:
        period = period.replace("-", "")

    print("The reporting period is:", period)
    upload_mongo(event, final_path, snapshot, period)
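

# Minimal driver sketch, assuming an event shape inferred from the functions above;
# the bucket, client id, date and filenames are placeholders, not real values.
if __name__ == "__main__":
    event = {
        "bucket": "example-bucket",        # hypothetical S3 bucket
        "client_id": "client42",           # hypothetical client id
        "date": "2023-01-15",              # used in file_db_id and the snapshot lookup
        "files": ["report_a.csv"],         # filenames to match in MongoDB
    }
    create_snapshot(event, "reports/2023-01/report_a.csv")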