From 50a5720c2906e2d0695759df670950cd1239f661 Mon Sep 17 00:00:00 2001 From: Ivan Date: Wed, 2 May 2018 18:52:33 -0500 Subject: [PATCH] Initial commit --- FuncionPublicaMirror.py | 311 ++++++++++++++++++++++++++++++++++++++++++++++++ luigi.cfg | 5 + 2 files changed, 316 insertions(+) create mode 100644 FuncionPublicaMirror.py create mode 100644 luigi.cfg diff --git a/FuncionPublicaMirror.py b/FuncionPublicaMirror.py new file mode 100644 index 0000000..026f155 --- /dev/null +++ b/FuncionPublicaMirror.py @@ -0,0 +1,311 @@ +import os +from os.path import join +import zipfile +import datetime +import re +from urllib.parse import urlparse +import shutil + +import xlrd +import luigi +import luigi.contrib.postgres +import requests +"""We mirror the public procurement Excel files of the "Función pública" +Ministry as a PostgreSQL table. + +This Luigi pipeline may be used to initialize, or to upgrade a mirror. The +pipeline proposes a straightforward "download, extract and ingest" process. + +The FuncionPublicaMirror class triggers the pipeline. This is a possible +commandline: +PYTHONPATH='.' /path/to/luigi --local-scheduler --module FuncionPublicaMirror FuncionPublicaMirror + +If bandwidth isn't too bad, the process should take no more than 45 minutes to +complete. +""" + + +class PgConfiguration(luigi.Config): + """Read configuration of Postgres connection.""" + host = luigi.Parameter() + db = luigi.Parameter() + user = luigi.Parameter() + password = luigi.Parameter() + + +class FuncionPublicaMirror(luigi.WrapperTask): + """Trigger the pipeline. We use the wrapper task pattern.""" + get_time = datetime.datetime.now() + date = luigi.DateSecondParameter(default=get_time) + date_string = get_time.strftime('%Y-%m-%dT%H%M%S') + files_download = luigi.Parameter( + default='/tmp/funcion-publica-files-' + date_string) + urls = luigi.ListParameter(default=[ + 'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2010_2012.zip', + 'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2013.zip', + 'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2014.zip', + 'https://upcp.funcionpublica.gob.mx/descargas/Contratos2015.zip', + 'https://upcp.funcionpublica.gob.mx/descargas/Contratos2016.zip', + 'https://upcp.funcionpublica.gob.mx/descargas/Contratos2017.zip', + ]) + + def requires(self): + + yield Download( + date=self.date, urls=self.urls, files_download=self.files_download) + yield Unzip( + date=self.date, urls=self.urls, files_download=self.files_download) + yield DropOlderTable( + date=self.date, urls=self.urls, files_download=self.files_download) + yield ExcelToPostgres( + date=self.date, urls=self.urls, files_download=self.files_download) + yield DeleteFiles( + date=self.date, urls=self.urls, files_download=self.files_download) + + +class Download(luigi.Task): + """Download zipped Excel files that "Función pública" Ministry releases. + + The output target is a plain text list of the download url's + + This class is a straightforward use of the Requests module. + """ + date = luigi.DateSecondParameter() + urls = luigi.ListParameter() + files_download = luigi.Parameter() + + def run(self): + + if not os.path.exists(self.files_download): + os.makedirs(self.files_download) + + output = self.output().open('w') + + for url in self.urls: + + # XXX For now, the output target just contains some url's + output.write(url) + + r = requests.get(url, verify=False) + o = urlparse(url) + file_name = o.path.split('/')[-1] + local_file_path = self.files_download + '/' + file_name + with open(local_file_path, 'wb') as handle: + for data in r.iter_content(): + handle.write(data) + + output.close() + + def output(self): + return luigi.LocalTarget( + self.date.strftime( + '/tmp/funcion-publica-download-%Y-%m-%dT%H%M%S')) + + +class Unzip(luigi.Task): + """Extract Excel files. + + The output target is a plain text list of the zip files. + + This class is a straightforward use of the zipfile module. + """ + date = luigi.DateSecondParameter() + urls = luigi.ListParameter() + files_download = luigi.Parameter() + + def run(self): + + output = self.output().open('w') + is_zip_file = re.compile('.*zip$') + + for root, dirs, files in os.walk(self.files_download): + for myfile in files: + if is_zip_file.match(myfile): + z = zipfile.ZipFile(join(root, myfile)) + z.extractall(path=self.files_download) + # XXX For now, the output target just contains some file names + output.write(myfile) + + output.close() + + def output(self): + return luigi.LocalTarget( + self.date.strftime('/tmp/funcion-publica-unzip-%Y-%m-%dT%H%M%S')) + + def requires(self): + return Download( + date=self.date, urls=self.urls, files_download=self.files_download) + + +class DropOlderTable(luigi.contrib.postgres.PostgresQuery): + """If need be, drop previous version of mirror before upgrade.""" + host = PgConfiguration().host + database = PgConfiguration().db + user = PgConfiguration().user + password = PgConfiguration().password + + date = luigi.DateSecondParameter() + urls = luigi.ListParameter() + files_download = luigi.Parameter() + + table = "compranet.mirror" + query = "drop table if exists compranet.mirror;" + + @property + def update_id(self): + return self.date.strftime('remove-%Y-%m-%dT%H%M%S') + + def run(self): + connection = self.output().connect() + cursor = connection.cursor() + sql = self.query + + cursor.execute(sql) + + # Update marker table + self.output().touch(connection) + + # commit and close connection + connection.commit() + connection.close() + + def requires(self): + return Unzip( + date=self.date, urls=self.urls, files_download=self.files_download) + + +class ExcelToPostgres(luigi.contrib.postgres.CopyToTable): + """Write data from Excel files to a Postgres table. + + The set of Excel files lends itself to be modelled by a single Postgres + table. Their columns are our columns. + + The "rows" method override needs to take into account the fact that the + Excel file for the 2010 - 2012 period lacks the "identificador_cm" column. + The column is left empty for the period. + """ + host = PgConfiguration().host + database = PgConfiguration().db + user = PgConfiguration().user + password = PgConfiguration().password + table = "compranet.mirror" + + date = luigi.DateSecondParameter() + urls = luigi.ListParameter() + files_download = luigi.Parameter() + + # Documentations say: null_values = container of values that should be inserted as NULL values + null_values = [''] + + columns = [ + ("gobierno", "text"), + ("siglas", "text"), + ("dependencia", "text"), + ("claveuc", "text"), + ("nombre_de_la_uc", "text"), + ("responsable", "text"), + ("codigo_expediente", "text"), + ("titulo_expediente", "text"), + ("plantilla_expediente", "text"), + ("numero_procedimiento", "text"), + ("exp_f_fallo", "date"), + ("proc_f_publicacion", "date"), + ("fecha_apertura_proposiciones", "date"), + ("caracter", "text"), + ("tipo_contratacion", "text"), + ("tipo_procedimiento", "text"), + ("forma_procedimiento", "text"), + ("codigo_contrato", "text"), + ("titulo_contrato", "text"), + ("fecha_inicio", "date"), + ("fecha_fin", "date"), + ("importe_contrato", "numeric"), + ("moneda", "text"), + ("estatus_contrato", "text"), + ("archivado", "text"), + ("convenio_modificatorio", "text"), + ("ramo", "text"), + ("clave_programa", "text"), + ("aportacion_federal", "numeric"), + ("fecha_celebracion", "date"), + ("contrato_marco", "text"), + ("identificador_cm", "text"), + ("compra_consolidada", "text"), + ("plurianual", "text"), + ("clave_cartera_shcp", "text"), + ("estratificacion_muc", "text"), + ("folio_rupc", "text"), + ("proveedor_contratista", "text"), + ("estratificacion_mpc", "text"), + ("siglas_pais", "text"), + ("estatus_empresa", "text"), + ("cuenta_administrada_por", "text"), + ("c_externo", "text"), + ("organismo", "text"), + ("anuncio", "text"), + ] + + # https://luigi.readthedocs.io/en/stable/api/luigi.contrib.rdbms.html#luigi.contrib.rdbms.CopyToTable.update_id + @property + def update_id(self): + return self.date.strftime('insert-%Y-%m-%dT%H%M%S') + + def rows(self): + """Yield lists corresponding to each row to be inserted. + + For some reason, the 2010 - 2012 Excel file misses the 32nd column, + corresponding to the "identificador_cm" value. We insert empty strings + in its stead. + """ + ten_twelve_exception = re.compile( + '.*2010_2012.*') # Match the name of 2010-2012 Excel file + is_excel_file = re.compile('.*xlsx$') + date_string = self.date.strftime('%Y%m%d') + + for root, dirs, files in os.walk(self.files_download): + + for myfile in files: + + if is_excel_file.match(myfile): + wb = xlrd.open_workbook(join(root, myfile)) + sh = wb.sheet_by_index(0) + if ten_twelve_exception.match(myfile): + for rownum in range(1, sh.nrows): + row = sh.row_values(rownum)[:31] + [ + '' + ] + sh.row_values(rownum)[31:] + yield row + else: + for rownum in range(1, sh.nrows): + row = sh.row_values(rownum) + yield row + + def requires(self): + """The previous mirror should be dropped before inserting new data.""" + return DropOlderTable( + date=self.date, urls=self.urls, files_download=self.files_download) + + +class DeleteFiles(luigi.Task): + """Remove files that were downloaded and extracted.""" + date = luigi.DateSecondParameter() + urls = luigi.ListParameter() + files_download = luigi.Parameter() + + def run(self): + + shutil.rmtree(self.files_download) + + # XXX For now, the output target just contains a date + output = self.output().open('w') + output.write(self.date.strftime('%Y-%m-%dT%H%M%S')) + output.close() + + def output(self): + return luigi.LocalTarget( + self.date.strftime( + '/tmp/funcion-publica-delete-files-%Y-%m-%dT%H%M%S')) + + def requires(self): + return ExcelToPostgres( + date=self.date, urls=self.urls, files_download=self.files_download) diff --git a/luigi.cfg b/luigi.cfg new file mode 100644 index 0000000..99be678 --- /dev/null +++ b/luigi.cfg @@ -0,0 +1,5 @@ +[PgConfiguration] +host=myhost +db=mydb +user=myuser +password=mypassword -- 2.11.4.GIT