Glider
Loading...
Searching...
No Matches
Tools.py
Go to the documentation of this file.
1import os, json
2import boto3
3import requests
4from datetime import date, datetime
5from dotenv import load_dotenv
6
# Run python-dotenv's loader before the environment is read below.
load_dotenv()

# AWS credentials are read from the environment; os.environ.get yields None
# for a variable that is not set.
ACCESS_ID = os.environ.get("AWS_KEY_ID")
ACCESS_KEY = os.environ.get("AWS_KEY_SECRET")

# Module-level S3 client, created once at import time and used by
# gen_file_list_drive.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=ACCESS_ID,
    aws_secret_access_key=ACCESS_KEY,
)
14
def gen_file_list_drive(final_path):
    """List the objects stored under an S3 path.

    Args:
        final_path (str): S3 location written as ``<bucket>/<prefix>``. When
            it contains no ``/`` the whole string is used as the bucket name
            and the prefix is empty.

    Returns:
        list: ``<bucket>/<key>`` for every object whose key starts with the
        prefix, skipping keys that end in ``/``. Empty when nothing matches.
    """
    # partition never raises, unlike split(...)[1] on a path without "/".
    bucket, _, prefix = final_path.partition("/")

    files = []
    # One list_objects call returns at most 1000 keys, so walk every page
    # instead of reading only the first response.
    paginator = s3_client.get_paginator("list_objects")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from the response when no key matches.
        for entry in page.get("Contents", []):
            key = entry["Key"]
            # Keys ending in "/" are skipped; presumably these are folder
            # placeholder objects rather than uploaded files.
            if not key.endswith("/"):
                files.append(bucket + "/" + key)
    return files
33
def gen_file_list_event(event, final_path):
    """Build the full path of every file named in the event.

    No check against S3 is performed; the result is derived purely from the
    event contents.

    Args:
        event (dict): must contain a ``"file"`` entry holding either a single
            file name (str) or an iterable of file names.
        final_path (str): prefix prepended verbatim to each file name; no
            separator is inserted between the two.

    Returns:
        list: ``final_path + name`` for each file name, in input order.

    Raises:
        KeyError: if ``event`` has no ``"file"`` entry.
    """
    files = event["file"]
    # isinstance rather than an exact type comparison, so that a str subclass
    # is also treated as one file name instead of being iterated per character.
    if isinstance(files, str):
        files = [files]
    return [final_path + file for file in files]
50
def send_notification(module, webhook_url, status, files=None, error=None):
    """Post a success or failure notification to a webhook as JSON.

    Args:
        module (str): current module
        webhook_url (str): webhook url
        status (str): final status of execution. Exactly "Successful" selects
            the success payload; any other value selects the failure payload.
        files (list): files uploaded to s3. Only sent when status is
            "Successful".
        error (str): error shown when the procedure fails. Only sent when
            status is not "Successful".
    Returns: Nothing
    """
    # Imported locally because the module only imports date and datetime.
    from datetime import timezone

    # The format string ends in "Z" (UTC designator), so the clock must be
    # read in UTC rather than in the host's local time zone.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if status == "Successful":
        message = {
            "date": timestamp,
            "module": module,
            "status": "Finished",
            "message": "Files uploader has finished. Reports are on S3.",
            "files": files,
        }
    else:
        message = {
            "date": timestamp,
            "module": module,
            "status": "Failed",
            "message": "Files could not be uploaded. Copy the error bellow and contact support to fix it.",
            "error": error,
        }

    # Without a timeout requests waits indefinitely on an unresponsive
    # webhook; 30 seconds bounds the wait.
    requests.post(
        webhook_url,
        data=json.dumps(message, indent=4),
        headers={"Content-type": "application/json"},
        timeout=30,
    )
    # The response status is not inspected; this only records that the
    # request completed without raising.
    print("Request sent to OAS Webhook")
gen_file_list_event(event, final_path)
Definition Tools.py:34
send_notification(module, webhook_url, status, files=None, error=None)
Definition Tools.py:51
gen_file_list_drive(final_path)
Definition Tools.py:15