Glider
IdentifyFormat.py
# -*- coding: utf-8 -*-
import json
import os
import sys
import traceback

import boto3
import chardet
# import smart_open
from dotenv import load_dotenv
from pandas import ExcelFile

from ErrorHandler import ErrorHandler
from Excel_to_csv import pass_excel_to_csv
from MongoConnection import mongo_connection
from ReceivePath import receive_path

load_dotenv()

ENV = os.environ.get("ENVIRONMENT")
print(ENV)

ACCESS_ID = os.environ.get("AWS_KEY_ID")
ACCESS_KEY = os.environ.get("AWS_KEY_SECRET")

mongo_conn = mongo_connection()
collection = mongo_conn.mongo_conn_formats()
snap_collection = mongo_conn.mongo_conn_snapshots()

s3_client = boto3.client("s3", aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

def download_obj(event, file):
    """Receives file info and reads the first bytes of the object to determine its encoding

    Args:
        event (dict): dictionary with all client and sales information
        file (str): current filename
    Returns:
        s3_file (bytes)
        charenc (str)
    """
    bucket = event["bucket"][0]
    path = event["path"][0]
    # Only the first ~1 KB is downloaded; that is enough for chardet to guess the encoding
    obj = s3_client.get_object(Bucket=bucket, Key=path + "/" + file, Range='bytes=0-1024')
    s3_file = obj['Body'].read()
    charenc = chardet.detect(s3_file).get('encoding')
    # s3_dir = f"s3://{bucket}/{path}/{file}"
    # chardet often mislabels these encodings on short samples; fall back to utf-8
    if charenc in ("ascii", "Windows-1252", "Johab"):  # or charenc == "ISO-8859-1"
        charenc = "utf-8"
    return s3_file, charenc

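# A minimal sketch of the chardet step above, runnable without S3 (the byte
# string is made up for illustration):
#
#     import chardet
#     sample = "id;región;ventas\n1;Norte;2 345,10\n".encode("latin-1")
#     print(chardet.detect(sample))
#     # -> something like {'encoding': 'ISO-8859-1', 'confidence': 0.73, ...}
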
def csvHeaders(event, file):
    """Takes the s3_file content and decodes it to extract the first lines as header candidates

    Args:
        event (dict): dictionary with all client and sales information
        file (str): current filename
    Returns:
        headers (list)
    """
    # The detected encoding is shared through a module-level global because
    # identify_format() reads it again when building the output payload
    global encoding
    s3_file, encoding = download_obj(event, file)
    headers = []
    try:
        # splitlines() handles Windows (\r\n), Mac (\r) and Unix (\n) line endings
        headers = s3_file.decode(encoding).splitlines()
        # with smart_open.open(s3_dir, encoding=encoding) as f:
        #     headers = [next(f).strip("\n\r\r\n") for x in range(7)]
    except Exception as e:
        print("File couldn't be loaded: {}".format(e))
        # with smart_open.open(s3_dir, encoding=encoding) as f:
        #     headers = [next(f).strip("\n\r\r\n") for x in range(2)]
    if len(headers) > 1:
        return headers

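# Sketch of the decode-and-split step, assuming utf-8 content (no S3 needed):
#
#     raw = b"col_a,col_b\r\n1,2\r\n3,4\n"
#     print(raw.decode("utf-8").splitlines())
#     # -> ['col_a,col_b', '1,2', '3,4']   (splitlines absorbs \r\n, \r and \n)
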
def identifyHeaders(headers, collection):
    """Searches for a format template matching one of the headers and builds a list with the main features of each column

    Args:
        headers (list): the first lines from the current file
        collection (mongo collection): Mongo collection where format templates are stored
    Returns:
        format (str), delimiters (str), ottoMapping_columns (list), skip_rows (int)
    """
    skip_rows = 0
    # print(headers)
    for header in headers:
        template_format = collection.find_one({"header": header})
        if template_format:  # and template_format[0]:
            print("FORMAT: {} \t VERSION: {} \t DELIMITER: {}".format(template_format["format"], template_format["version"], template_format["delimiters"]))
            print("HEADER IDENTIFIED: {}".format(header))
            ottoMapping_columns = cols_otto(json.loads(template_format["schema"]), json.loads(template_format["ottoMapping"]))
            return template_format["format"], template_format["delimiters"], ottoMapping_columns, skip_rows
        skip_rows += 1
    raise Exception("No format identified")

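# Hypothetical usage sketch (the template fields below mirror the keys this
# function reads; the stored document shape is otherwise assumed): with a
# document like {"header": "col_a,col_b", "format": "...", "version": "...",
# "delimiters": ",", "schema": "<json>", "ottoMapping": "<json>"} in the
# formats collection, the first candidate line that matches a stored header
# wins, and skip_rows tells the caller how many leading lines to drop:
#
#     fmt, delim, cols, skip_rows = identifyHeaders(csvHeaders(event, file), collection)
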
def type_schema(schema):
    """Takes the schema field from the database and extracts the main features of each column

    Args:
        schema (dict): contains the column features for the current file
    Returns:
        values (dict): column name -> pandas dtype
        flagsNull (dict): column name -> nullability flag
    """
    schema = schema["parameters"]
    values = dict()
    flagsNull = dict()
    # Translate template field types into pandas dtypes
    mapper = {"StringType": "object", "IntegerType": "int64", "DoubleType": "float64", "DateType": "datetime64[us]"}
    for i in schema:
        values[i["structFieldTemplate"]["name"]] = mapper[i["structFieldTemplate"]["ftype"]]
        flagsNull[i["structFieldTemplate"]["name"]] = i["structFieldTemplate"]["flagNull"]
    return values, flagsNull

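# Worked example with a made-up schema document (field names are illustrative):
#
#     schema = {"parameters": [
#         {"structFieldTemplate": {"name": "sku", "ftype": "StringType", "flagNull": False}},
#         {"structFieldTemplate": {"name": "units", "ftype": "IntegerType", "flagNull": True}},
#     ]}
#     print(type_schema(schema))
#     # -> ({'sku': 'object', 'units': 'int64'}, {'sku': False, 'units': True})
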
def cols_otto(schema, ottoMapping):
    """Builds the relation between the main features of each column in the file and the mapping template

    Args:
        schema (dict): contains the column features for the current file
        ottoMapping (dict): contains the column features of the desired template
    Returns:
        match (list)
    """
    otto_keys = ottoMapping['parameters']
    dtypes, flags = type_schema(schema)
    match = []
    for i in otto_keys:
        ottoMapping_row = i['mappingTemplate']
        # print(ottoMapping_row)
        if ottoMapping_row["ftype"] == "header":
            # Enrich header rows with the dtype and nullability derived from the schema
            ottoMapping_row["dtype"] = dtypes[ottoMapping_row["value"]]
            ottoMapping_row["flagNull"] = flags[ottoMapping_row["value"]]
        match.append(ottoMapping_row)
    return match

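# Sketch with made-up documents: a header mapping row gets the dtype and
# nullability that type_schema() derived for the same column name:
#
#     schema = {"parameters": [
#         {"structFieldTemplate": {"name": "sku", "ftype": "StringType", "flagNull": False}}]}
#     otto = {"parameters": [
#         {"mappingTemplate": {"ftype": "header", "value": "sku"}}]}
#     print(cols_otto(schema, otto))
#     # -> [{'ftype': 'header', 'value': 'sku', 'dtype': 'object', 'flagNull': False}]
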
def identify_format(event, context=None):
    """Executes the full procedure to filter csv, txt and xls files

    Args:
        event (dict): dictionary with all client and sales information
        context: required only for AWS Lambda execution
    Returns:
        event (dict)
    """
    global encoding
    event = receive_path(event, s3_client)
    print("Status={}".format(event["status"]))
    status = "OK"
    not_identified = dict()
    csv_formats = dict()
    for files in event["files"]:
        file = files["file"]
        file_db_id = files["file_id"]
        print(file)
        file_extension = os.path.splitext(file)[1]
        if "xls" in file_extension:
            try:
                s3_file_obj = s3_client.get_object(Bucket=event["bucket"][0], Key=event["path"][0] + '/' + file)
                xls = ExcelFile(s3_file_obj['Body'].read())
                pass_excel_to_csv(event, files, xls, s3_client)
            except Exception as e:
                print(e)
                m = "{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
                not_identified[file_db_id] = {"file": file}
                eh = ErrorHandler()
                error = eh.handle(e, m, file_db_id)
                status = error
                print("Maybe there is a sheet not identified")
                continue
        else:
            try:
                csv_headers = csvHeaders(event, file)
                formats, delimiter, otto_cols, skip_rows = identifyHeaders(csv_headers, collection)
                if formats not in csv_formats:
                    csv_formats[formats] = {"files": [{"file_id": file_db_id, "file": file, "delimiter": delimiter, "skip_rows": skip_rows, "encoding": encoding}], "columns": otto_cols}
                else:
                    csv_formats[formats]["files"].append({"file_id": file_db_id, "file": file, "delimiter": delimiter, "skip_rows": skip_rows, "encoding": encoding})
            except Exception as e:
                print(e)
                m = "{}\n{}".format(sys.exc_info()[2], traceback.format_exc())
                eh = ErrorHandler()
                error = eh.handle(e, m, file_db_id)
                not_identified[file_db_id] = {"file": file, "error": error}
                # An unidentified csv is not fatal for the batch, so the status stays "OK"
                status = "OK"
                print("File {} not identified".format(file))
                continue
            snap_collection.update_one({"file_db_id": file_db_id}, {"$set": {"status": "identified"}})
    # Propagate the accumulated status (an xls error sets it) instead of hard-coding "OK"
    event["status"] = status
    event["format"] = csv_formats
    event["no_format_identified"] = not_identified
    return event
    # return {
    #     "status": "OK",
    #     "tag": event["tag"],
    #     "cat_gen": event["cat_gen"],
    #     "cat_match": event["cat_match"],
    #     "bucket": event["bucket"],
    #     "path": event["path"],
    #     "client_id": event["client_id"],
    #     "format": csv_formats,
    #     "no_format_identified": not_identified,
    #     "currency": event["currency"]
    # }

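# Hypothetical local invocation: assumes valid AWS and Mongo credentials in the
# environment and an event shaped like the one receive_path() expects. The
# bucket and path below are placeholders, not real resources.
if __name__ == "__main__":
    sample_event = {"bucket": ["my-bucket"], "path": ["incoming/client42"]}
    print(identify_format(sample_event))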