4from datetime
import date, datetime
5from dotenv
import load_dotenv
# AWS credentials come from the process environment; os.environ.get yields
# None for a variable that is not set.
ACCESS_ID = os.environ.get("AWS_KEY_ID")
ACCESS_KEY = os.environ.get("AWS_KEY_SECRET")

# Module-level S3 client built once at import time with the credentials above.
s3_client = boto3.client(
    "s3",
    aws_access_key_id=ACCESS_ID,
    aws_secret_access_key=ACCESS_KEY,
)
16 """Receives the final path where files were uploaded and makes a list of them.
19 final_path (str): s3 path where files were uploaded
23 bucket = final_path.split(
"/",1)[0]
24 path = final_path.split(
"/",1)[1]
25 obj = s3_client.list_objects(Bucket = bucket, Prefix=path)
26 for file
in obj[
"Contents"]:
27 if file[
"Key"].endswith(
"/"):
30 full_path = bucket+
"/"+file[
"Key"]
31 files.append(full_path)
35 """Checks which files in the input were uploaded successfully.
38 event (dict): is a dictionary with all client and files information
39 final_path (str): s3 path where files were uploaded
40 Returns: files_out (list)
44 if type(event[
"file"]) == str:
45 files = [event[
"file"]]
47 full_path = final_path+file
48 files_out.append(full_path)
52 """Sends a success/fail notification to some webhook.
55 module (str): current module
56 webhook_url (str): webhook url
57 status (str): final status of execution
58 files (list): files uploaded to s3. None when status != successful
59 error (str): error shown when the procedure fails. None when status = successful
63 date = now.strftime(
"%Y-%m-%dT%H:%M:%SZ")
64 if status ==
"Successful":
69 "message":
"Files uploader has finished. Reports are on S3.",
77 "message":
"Files could not be uploaded. Copy the error bellow and contact support to fix it.",
80 req = requests.post(webhook_url, data=json.dumps(message, indent=4), headers={
"Content-type":
"application/json"})
81 print(
"Request sent to OAS Webhook")