3import awswrangler
as wr
6from pandas
import DataFrame, concat
7from datetime
import datetime
as dt
8from pymongo
import MongoClient
9from dotenv
import load_dotenv
10from botocore.exceptions
import ClientError
12index = f_path.find(
"report_generation/")
13f_path = f_path[:index]
14sys.path.insert(1, f_path+
'importer')
15from MongoConnection
import mongo_connection
# AWS credentials are read from the environment; either value is None when
# the corresponding variable is not set.
ACCESS_ID, ACCESS_KEY = os.environ.get("AWS_KEY_ID"), os.environ.get("AWS_KEY_SECRET")

# MongoDB collection that upload_mongo / search_snpashots_id read and write.
mongo_conn = mongo_connection()
snap_collection = mongo_conn.mongo_conn_snapshots()

# boto3 session handed to awswrangler for the S3 reads.
session = boto3.Session(aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

# Current local date truncated to midnight; stored as "updated_at" on every
# snapshot document written during this run.
update_at = dt.now().replace(hour=0, minute=0, second=0, microsecond=0)
def read_report_csv(filename, chunk, keep_rows=10000):
    """Load a CSV report from S3 in chunks, keeping the top rows of each chunk.

    Each chunk is read with awswrangler, counted into the module-level
    ``total_rows`` / ``total_local`` accumulators, sorted by ``total_local``
    in descending order and truncated to ``keep_rows`` rows. The truncated
    chunks are appended to the module-level ``final_df``.

    Args:
        filename (str): s3 file location
        chunk (int): chunk size (rows read per chunk)
        keep_rows (int): number of highest-``total_local`` rows kept from
            each chunk. Defaults to 10000.

    Returns:
        final_df (pandas dataframe)
    """
    global final_df, total_rows, total_local
    _dfs = wr.s3.read_csv(
        filename,
        low_memory=False,
        chunksize=chunk,
        keep_default_na=False,
        boto3_session=session,
    )

    top_chunks = []
    # NOTE(review): the loop header and the return statement were lost in the
    # extracted source; they are reconstructed from the use of `df` below and
    # from the "Returns: final_df" line of the original docstring - confirm.
    for df in _dfs:
        total_rows += df.shape[0]
        total_local += float(df["total_local"].sum())
        ranked = df.sort_values("total_local", ascending=False)
        top_chunks.append(ranked.iloc[:keep_rows, :])

    # Concatenate once instead of once per chunk: re-concatenating inside the
    # loop copies the accumulated frame on every iteration.
    if top_chunks:
        final_df = concat([final_df, *top_chunks], ignore_index=True)
    return final_df
def group_by_field(data, fields, snapshot):
    """Sum ``total_local`` per distinct value of each field.

    The original summary reads: group by service_id, territory_code, artists
    and tracks.

    Args:
        data (pandas dataframe): current parquet file loaded as dataframe
        fields (list): list of fields that are considered to snapshot
        snapshot (dict): it will contain all fields info

    Returns:
        snapshot (dict) - NOTE(review): the return is reconstructed, see below.
    """
    # NOTE(review): only the groupby statement survived extraction. The loop
    # over `fields` is implied by the otherwise undefined `field`; handing the
    # grouped Series to build_snapshot and returning the snapshot are inferred
    # from build_snapshot's (df, field, snapshot) signature - confirm against
    # the original file.
    for field in fields:
        df_grouped = data.groupby(field)["total_local"].sum()
        snapshot = build_snapshot(df_grouped, field, snapshot)
    return snapshot
# Grouped column -> (snapshot list that receives the rows, key holding the group label).
_SNAPSHOT_TARGETS = {
    "release_id": ("byUPC", "release_id"),
    "artists": ("byArtist", "artist"),
    "service_id": ("byDSP", "service"),
    "territory_code": ("byTerritory", "territory_code"),
    "track_title": ("byTrack", "track_title"),
}


def build_snapshot(df, field, snapshot):
    """Append the 50 largest grouped totals for ``field`` to the snapshot.

    Args:
        df (pandas Series): totals indexed by the distinct values of
            ``field``. It is sorted without a ``by`` column, so a Series is
            expected despite the name.
        field (str): column the totals were grouped by; selects which
            snapshot list is filled. Unrecognised fields leave the snapshot
            untouched.
        snapshot (dict): it will contain all fields info; modified in place.

    Returns:
        snapshot (dict): the same dict that was passed in.
    """
    target = _SNAPSHOT_TARGETS.get(field)
    if target is None:
        return snapshot
    bucket, label = target

    # Sort a copy so the caller's Series keeps its original order.
    top = df.sort_values(ascending=False).iloc[:50]

    # setdefault tolerates a snapshot that was not pre-seeded with this list.
    entries = snapshot.setdefault(bucket, [])
    entries.extend(
        {label: key, "total": float(value)} for key, value in top.items()
    )
    return snapshot
def upload_mongo(event, final_path, snapshot, period):
    """Create or replace the snapshot document of a report in MongoDB.

    The document is keyed by ``file_db_id`` (``<client_id>_report_<date>``)
    and also records the module-level ``total_local``, ``total_rows`` and
    ``update_at`` values.

    Args:
        event (dict): contains the file and client info; ``client_id`` and
            ``date`` are read from it.
        final_path (str): Path where original csv is saved in s3
        snapshot (dict): it contains all fields info
        period (str): month and year when sales were processed
    """
    name = final_path.split("/")[-1]
    file_id = "{}_report_{}".format(event["client_id"], event["date"])
    document = {
        "file_db_id": file_id,
        "status": "finished",
        "client_id": event["client_id"],
        # NOTE(review): this entry was lost in the extracted source; it is
        # reconstructed because `name` is computed above and
        # search_snpashots_id filters on a "name" field - confirm.
        "name": name,
        "s3_path": final_path,
        "reporting_period": period,
        "total_local": total_local,
        "snapshot": snapshot,
        "updated_at": update_at,
        "total_rows": total_rows,
    }
    # A single upsert replaces the earlier replace-then-insert pair, which
    # could insert duplicates when two writers raced on the same file_db_id.
    snap_collection.replace_one({"file_db_id": file_id}, document, upsert=True)
    print(f"Generating Snapshot with id: {file_id}")
    # NOTE(review): create_snapshot prints this function's result, but no
    # return statement is visible in the extracted source, so none is added.
def search_snpashots_id(files, date):
    """Searches for all snapshots id in mongoDB.

    Args:
        files (iterable of str): values matched against the ``name`` field.
        date (str): day in ``%Y-%m-%d`` format, matched against
            ``updated_at``.

    Returns:
        list of dict: one ``{"_id", "file_db_id", "name"}`` entry per
        matching document.

    Raises:
        ValueError: if ``date`` does not match ``%Y-%m-%d``, or if no
            snapshot matches ("No snapshots found").
    """
    day = dt.strptime(date, "%Y-%m-%d")
    # $in needs an array, so materialise any iterable passed as `files`.
    query = {"name": {"$in": list(files)}, "updated_at": day}
    projection = {"_id": 1, "file_db_id": 1, "name": 1}
    ids = snap_collection.find(query, projection)

    snapshots = [
        {"_id": i["_id"], "file_db_id": i["file_db_id"], "name": i["name"]}
        for i in ids
    ]
    # NOTE(review): the guard around the raise and the final return were lost
    # in the extracted source; "raise only when nothing matched, otherwise
    # return the list" is the reconstruction - confirm.
    if not snapshots:
        raise ValueError("No snapshots found")
    return snapshots
# create_snapshot(event, final_path): entry point that builds the snapshot for
# one report and stores it through upload_mongo.
# NOTE(review): this block is an incomplete extraction. The `def` header, the
# end of the docstring, the statements that define `df` and `snapshot`, and the
# condition that selects between the two `period` normalisations below are not
# visible. The code lines are left exactly as found; restore them from the
# original file before relying on this function.
137 """Executes full procedure
140 event (dict): contains the file and client info
141 final_path (str): Path where original csv is saved in s3
# Full S3 URI of the report: bucket taken from the event, key from final_path.
144 s3_path =
"s3://{}/{}".format(event[
"bucket"], final_path)
# Columns that are summarised into the snapshot.
146 fields = [
"release_id",
"service_id",
"territory_code",
"artists",
"track_title"]
# Most frequent value of the "date" column (value_counts orders by count,
# descending). `df` is defined on lines missing from this view.
152 period = df[
"date"].value_counts().index[0]
# Variant 1: keep the first 7 characters and drop the dashes
# (presumably "YYYY-MM-DD" -> "YYYYMM"; confirm the date format).
155 period = period[:7].replace(
"-",
"")
# Variant 2: drop the dashes from the whole value.
# NOTE(review): the branch choosing between the two variants is not visible.
158 period = period.replace(
"-",
"")
# Debug output; the message is Spanish for "THE PERIOD IS".
160 print(
"EL PERIODO ES: ", period)
# Stores the snapshot and prints whatever upload_mongo returns.
161 print(
upload_mongo(event, final_path, snapshot, period))
create_snapshot(event, final_path)
read_report_csv(filename, chunk)
upload_mongo(event, final_path, snapshot, period)
build_snapshot(df, field, snapshot)
group_by_field(data, fields, snapshot)
search_snpashots_id(files, date)