Glider
Loading...
Searching...
No Matches
UploadReports.py
Go to the documentation of this file.
1import os
2import boto3
3import pyshorteners
4from dotenv import load_dotenv
5from botocore.exceptions import ClientError
6load_dotenv()
7
8ACCESS_ID = os.environ.get("AWS_KEY_ID")
9ACCESS_KEY = os.environ.get("AWS_KEY_SECRET")
10REGION = os.environ.get("REGION")
11ACCESS_TOKEN = os.environ.get("BITLY_TOKEN")
12s3_client = boto3.client("s3", aws_access_key_id=ACCESS_ID, aws_secret_access_key= ACCESS_KEY, region_name=REGION)
13
14def create_presigned_url(bucket, object, expiration=86400):#86400
15 """Generate a presigned URL to share an S3 object via lambda invoke
16
17 Args:
18 bucket (str): bucket where report was uploaded
19 object (str): full report object name
20 expiration (int): time in seconds for the presigned URL to remain valid
21 Returns: response (str): If error, returns None.
22 """
23 params={'Bucket': bucket, 'Key': object}
24 try:
25 response = s3_client.generate_presigned_url('get_object', Params=params, ExpiresIn=expiration)
26 except ClientError as e:
27 return None
28 return response
29 # get_log = True
30 # function_params = {
31 # "bucket": bucket,
32 # "object": object,
33 # "expiration": expiration
34 # }
35 # client = boto3.client('lambda', aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY, region_name="eu-west-2")
36 # lambda_response = client.invoke(
37 # FunctionName=function_name,
38 # Payload=json.dumps(function_params),
39 # LogType='Tail' if get_log else 'None')
40 # link_response = lambda_response['Payload'].read()
41 # link_response = link_response.strip()
42 # link_response = json.loads(link_response)
43 # response = link_response["response"]
44 # return response
45
46def shorten_url(long_url):
47 """
48 Shorts URL using TynyURL service
49
50 Args:
51 long_url (str): presigned url for report
52 Returns: short_url.
53 """
54 # bitly_access_token = ACCESS_TOKEN
55
56 try:
57 # shortener = pyshorteners.Shortener(api_key=bitly_access_token)
58 shortener = pyshorteners.Shortener()
59 short_url = shortener.tinyurl.short(long_url)
60 return short_url
61 except Exception as e:
62 print(f"Error shorting URL: {e} Returning long url")
63 return long_url
64
65def upload_files_s3(bucket, s3_path, local_path, file):
66 """
67 Uploads all files to s3 path defined previously
68
69 Args:
70 local_path (str): Root path where is saved files
71 files (str): File to upload
72 Returns: url, s3 path.
73 """
74 full_path = local_path+file
75 s3_full_path = s3_path+file
76 s3_client.upload_file(full_path, bucket, s3_full_path)
77 long_url = create_presigned_url(bucket, s3_path+file)
78 url = shorten_url(long_url)
79 return url, s3_full_path
create_presigned_url(bucket, object, expiration=86400)
upload_files_s3(bucket, s3_path, local_path, file)