Initial commit
[luigi_pipeline.git] / FuncionPublicaMirror.py
blob026f155cbb732400e1cb9f890b9e064a1c2a1751
1 import os
2 from os.path import join
3 import zipfile
4 import datetime
5 import re
6 from urllib.parse import urlparse
7 import shutil
9 import xlrd
10 import luigi
11 import luigi.contrib.postgres
12 import requests
13 """We mirror the public procurement Excel files of the "Función pública"
14 Ministry as a PostgreSQL table.
16 This Luigi pipeline may be used to initialize, or to upgrade a mirror. The
17 pipeline proposes a straightforward "download, extract and ingest" process.
19 The FuncionPublicaMirror class triggers the pipeline. This is a possible
20 command line:
21 PYTHONPATH='.' /path/to/luigi --local-scheduler --module FuncionPublicaMirror FuncionPublicaMirror
23 If bandwidth isn't too bad, the process should take no more than 45 minutes to
24 complete.
25 """
class PgConfiguration(luigi.Config):
    """Connection settings for the Postgres database that holds the mirror.

    All four values are declared as ``luigi.Parameter`` without a default.
    The Postgres tasks of this module read them as ``PgConfiguration().host``,
    ``.db``, ``.user`` and ``.password``.
    """

    # Host of the Postgres server.
    host = luigi.Parameter()
    # Name of the database (used as the ``database`` attribute of the tasks).
    db = luigi.Parameter()
    # User the tasks connect as.
    user = luigi.Parameter()
    # Password of that user.
    password = luigi.Parameter()
class FuncionPublicaMirror(luigi.WrapperTask):
    """Entry point of the pipeline, written as a Luigi wrapper task.

    The task does no work itself: it only requires every step of the
    pipeline, handing the same ``date``, ``urls`` and ``files_download``
    values to each of them.
    """

    # Timestamp taken once, when the class body is evaluated; the default
    # ``date`` and the default download directory both derive from it.
    get_time = datetime.datetime.now()
    date = luigi.DateSecondParameter(default=get_time)
    date_string = get_time.strftime('%Y-%m-%dT%H%M%S')

    # Directory where the archives are downloaded and extracted.
    files_download = luigi.Parameter(
        default='/tmp/funcion-publica-files-' + date_string)

    # Zip archives to mirror.
    urls = luigi.ListParameter(default=[
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2010_2012.zip',
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2013.zip',
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2014.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2015.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2016.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2017.zip',
    ])

    def requires(self):
        """Yield every pipeline step, each built with this task's parameters."""
        shared = {
            'date': self.date,
            'urls': self.urls,
            'files_download': self.files_download,
        }
        steps = (
            Download,
            Unzip,
            DropOlderTable,
            ExcelToPostgres,
            DeleteFiles,
        )
        for step in steps:
            yield step(**shared)
class Download(luigi.Task):
    """Download zipped Excel files that "Función pública" Ministry releases.

    Every URL of ``urls`` is fetched with the Requests module and saved in
    the ``files_download`` directory under the last segment of its URL path.

    The output target is a plain text list of the downloaded URLs, one per
    line.
    """
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    # Size of the pieces streamed from the HTTP response to disk.
    chunk_size = 1024 * 1024

    def run(self):
        """Fetch every archive, then record its URL in the output target.

        Raises:
            requests.HTTPError: if a server answers with an error status, so
                that an error page is never saved in place of an archive.
        """
        os.makedirs(self.files_download, exist_ok=True)

        with self.output().open('w') as output:
            for url in self.urls:
                file_name = urlparse(url).path.split('/')[-1]
                local_file_path = join(self.files_download, file_name)

                # NOTE(review): verify=False disables TLS certificate
                # verification. It is kept because the original code relies
                # on it; confirm whether the servers still require it.
                # stream=True avoids loading a whole archive in memory.
                response = requests.get(url, verify=False, stream=True)
                try:
                    response.raise_for_status()
                    with open(local_file_path, 'wb') as handle:
                        for chunk in response.iter_content(
                                chunk_size=self.chunk_size):
                            handle.write(chunk)
                finally:
                    response.close()

                # XXX For now, the output target just contains some URLs
                output.write(url + '\n')

    def output(self):
        """Return the local marker file, named after the ``date`` parameter."""
        return luigi.LocalTarget(
            self.date.strftime(
                '/tmp/funcion-publica-download-%Y-%m-%dT%H%M%S'))
class Unzip(luigi.Task):
    """Extract Excel files.

    Every file found under ``files_download`` whose name ends with ``zip`` is
    extracted into ``files_download`` with the zipfile module.

    The output target is a plain text list of the zip files, one per line.
    """
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    def run(self):
        """Extract each archive and record its file name in the output."""
        is_zip_file = re.compile(r'.*zip$')

        with self.output().open('w') as output:
            for root, _dirs, files in os.walk(self.files_download):
                for myfile in files:
                    if not is_zip_file.match(myfile):
                        continue
                    # The context manager closes the archive even when the
                    # extraction fails.
                    with zipfile.ZipFile(join(root, myfile)) as archive:
                        archive.extractall(path=self.files_download)
                    # XXX For now, the output target just contains some
                    # file names
                    output.write(myfile + '\n')

    def output(self):
        """Return the local marker file, named after the ``date`` parameter."""
        return luigi.LocalTarget(
            self.date.strftime('/tmp/funcion-publica-unzip-%Y-%m-%dT%H%M%S'))

    def requires(self):
        """The archives must be downloaded before they can be extracted."""
        return Download(
            date=self.date, urls=self.urls, files_download=self.files_download)
class DropOlderTable(luigi.contrib.postgres.PostgresQuery):
    """If need be, drop previous version of mirror before upgrade."""
    # Connection settings, read from PgConfiguration when the class body is
    # evaluated.
    host = PgConfiguration().host
    database = PgConfiguration().db
    user = PgConfiguration().user
    password = PgConfiguration().password

    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    table = "compranet.mirror"
    query = "drop table if exists compranet.mirror;"

    @property
    def update_id(self):
        """Identifier of this run in the marker table, derived from ``date``."""
        return self.date.strftime('remove-%Y-%m-%dT%H%M%S')

    def run(self):
        """Execute the drop query, update the marker table and commit.

        The connection is closed even when the query or the marker update
        raises; the exception itself still propagates.
        """
        target = self.output()
        connection = target.connect()
        try:
            cursor = connection.cursor()
            cursor.execute(self.query)

            # Update marker table
            target.touch(connection)

            connection.commit()
        finally:
            connection.close()

    def requires(self):
        """The table is dropped only once the new files have been extracted."""
        return Unzip(
            date=self.date, urls=self.urls, files_download=self.files_download)
class ExcelToPostgres(luigi.contrib.postgres.CopyToTable):
    """Write data from Excel files to a Postgres table.

    The set of Excel files lends itself to be modelled by a single Postgres
    table. Their columns are our columns.

    The "rows" method override needs to take into account the fact that the
    Excel file for the 2010 - 2012 period lacks the "identificador_cm" column.
    The column is left empty for the period.
    """
    # Connection settings, read from PgConfiguration when the class body is
    # evaluated.
    host = PgConfiguration().host
    database = PgConfiguration().db
    user = PgConfiguration().user
    password = PgConfiguration().password
    table = "compranet.mirror"

    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    # Documentation says: null_values = container of values that should be
    # inserted as NULL values
    null_values = ['']

    # (name, Postgres type) of every column, in the order of the Excel files.
    columns = [
        ("gobierno", "text"),
        ("siglas", "text"),
        ("dependencia", "text"),
        ("claveuc", "text"),
        ("nombre_de_la_uc", "text"),
        ("responsable", "text"),
        ("codigo_expediente", "text"),
        ("titulo_expediente", "text"),
        ("plantilla_expediente", "text"),
        ("numero_procedimiento", "text"),
        ("exp_f_fallo", "date"),
        ("proc_f_publicacion", "date"),
        ("fecha_apertura_proposiciones", "date"),
        ("caracter", "text"),
        ("tipo_contratacion", "text"),
        ("tipo_procedimiento", "text"),
        ("forma_procedimiento", "text"),
        ("codigo_contrato", "text"),
        ("titulo_contrato", "text"),
        ("fecha_inicio", "date"),
        ("fecha_fin", "date"),
        ("importe_contrato", "numeric"),
        ("moneda", "text"),
        ("estatus_contrato", "text"),
        ("archivado", "text"),
        ("convenio_modificatorio", "text"),
        ("ramo", "text"),
        ("clave_programa", "text"),
        ("aportacion_federal", "numeric"),
        ("fecha_celebracion", "date"),
        ("contrato_marco", "text"),
        ("identificador_cm", "text"),
        ("compra_consolidada", "text"),
        ("plurianual", "text"),
        ("clave_cartera_shcp", "text"),
        ("estratificacion_muc", "text"),
        ("folio_rupc", "text"),
        ("proveedor_contratista", "text"),
        ("estratificacion_mpc", "text"),
        ("siglas_pais", "text"),
        ("estatus_empresa", "text"),
        ("cuenta_administrada_por", "text"),
        ("c_externo", "text"),
        ("organismo", "text"),
        ("anuncio", "text"),
    ]

    # https://luigi.readthedocs.io/en/stable/api/luigi.contrib.rdbms.html#luigi.contrib.rdbms.CopyToTable.update_id
    @property
    def update_id(self):
        """Identifier of this run in the marker table, derived from ``date``."""
        return self.date.strftime('insert-%Y-%m-%dT%H%M%S')

    def rows(self):
        """Yield lists corresponding to each row to be inserted.

        Every file under ``files_download`` whose name ends with ``xlsx`` is
        read; the first row of its first sheet is skipped.

        For some reason, the 2010 - 2012 Excel file misses the 32nd column,
        corresponding to the "identificador_cm" value. We insert empty strings
        in its stead.
        """
        ten_twelve_exception = re.compile(
            r'.*2010_2012.*')  # Match the name of 2010-2012 Excel file
        is_excel_file = re.compile(r'.*xlsx$')

        for root, _dirs, files in os.walk(self.files_download):
            for myfile in files:
                if not is_excel_file.match(myfile):
                    continue

                sheet = xlrd.open_workbook(join(root, myfile)).sheet_by_index(0)
                lacks_identificador_cm = bool(
                    ten_twelve_exception.match(myfile))

                for rownum in range(1, sheet.nrows):
                    row = sheet.row_values(rownum)
                    if lacks_identificador_cm:
                        # Pad the missing 32nd column (index 31).
                        row = row[:31] + [''] + row[31:]
                    yield row

    def requires(self):
        """The previous mirror should be dropped before inserting new data."""
        return DropOlderTable(
            date=self.date, urls=self.urls, files_download=self.files_download)
class DeleteFiles(luigi.Task):
    """Remove files that were downloaded and extracted."""
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    def run(self):
        """Delete the ``files_download`` directory and write the marker file.

        A directory that no longer exists is not an error: the goal of the
        task is already met, and a retry must be able to complete.
        """
        try:
            shutil.rmtree(self.files_download)
        except FileNotFoundError:
            pass

        # XXX For now, the output target just contains a date
        with self.output().open('w') as output:
            output.write(self.date.strftime('%Y-%m-%dT%H%M%S'))

    def output(self):
        """Return the local marker file, named after the ``date`` parameter."""
        return luigi.LocalTarget(
            self.date.strftime(
                '/tmp/funcion-publica-delete-files-%Y-%m-%dT%H%M%S'))

    def requires(self):
        """Files may be removed only after their data has been ingested."""
        return ExcelToPostgres(
            date=self.date, urls=self.urls, files_download=self.files_download)