From 6cf0ac13d6396916e6c6e8ec2fb22418c107e1dd Mon Sep 17 00:00:00 2001 From: Roberto Congiu Date: Thu, 18 Feb 2010 19:03:43 +0000 Subject: [PATCH] initial sourceforge import git-svn-id: https://lwes.svn.sourceforge.net/svnroot/lwes/contrib/lwes-hive-serde/trunk@401 a2f82657-cdd2-4550-bd36-68a8e7111808 --- AUTHORS | 1 + LICENSE | 280 ++++++++++++ README | 78 ++++ pom.xml | 92 ++++ src/main/java/org/lwes/hadoop/EventWritable.java | 111 +++++ src/main/java/org/lwes/hadoop/hive/EventSerDe.java | 485 +++++++++++++++++++++ .../lwes/hadoop/io/DatagramPacketInputStream.java | 154 +++++++ .../lwes/hadoop/io/DatagramPacketOutputStream.java | 120 +++++ .../org/lwes/hadoop/io/JournalInputFormat.java | 164 +++++++ .../org/lwes/hadoop/io/JournalOutputFormat.java | 122 ++++++ .../org/openx/data/hive/journalserde/AppTest.java | 38 ++ 11 files changed, 1645 insertions(+) create mode 100644 AUTHORS create mode 100644 LICENSE create mode 100644 README create mode 100644 pom.xml create mode 100644 src/main/java/org/lwes/hadoop/EventWritable.java create mode 100644 src/main/java/org/lwes/hadoop/hive/EventSerDe.java create mode 100644 src/main/java/org/lwes/hadoop/io/DatagramPacketInputStream.java create mode 100644 src/main/java/org/lwes/hadoop/io/DatagramPacketOutputStream.java create mode 100644 src/main/java/org/lwes/hadoop/io/JournalInputFormat.java create mode 100644 src/main/java/org/lwes/hadoop/io/JournalOutputFormat.java create mode 100644 src/test/java/org/openx/data/hive/journalserde/AppTest.java diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..38b9aeb --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +Roberto Congiu diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a5a1524 --- /dev/null +++ b/LICENSE @@ -0,0 +1,280 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim 
copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. 
If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. 
You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. 
If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. 
(This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. 
Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. 
+ +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. 
If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. 
+ + diff --git a/README b/README new file mode 100644 index 0000000..a9f2181 --- /dev/null +++ b/README @@ -0,0 +1,78 @@ +LWES Journal File SerDe README + +*** +In order to read journal files from Hive, a SerDe (Serializer/Deserializer) +is needed, to map Hive columns to LWES attributes. + +*** +Prerequisites +- JDK 1.6.x (http://java.sun.com/) +- Maven 2.2.x (http://maven.apache.org/) + +*** +How to build +% mvn clean package + +*** +How to install + +Hive looks for extensions in a directory defined in the environment +variable HIVE_AUX_JARS_PATH. +If that variable is not defined, set it to a directory of your choice. +Copy JournalSerDe-x.x.x.jar into that directory and launch hive. + +*** +Creating tables + + +This is an example of table creation. +Just one event type is currently allowed per table. +The SerDe will automatically map a lwes attribute to the corresponding +hive column with the same name. Unfortunately, lwes attributes are case +sensitive while hive columns are not; you may also want a hive column +with a different name from the lwes attribute. In either case, you can +change the attribute/column mapping with serde properties as shown below: +the column sender_ip is mapped to the lwes attribute 'SenderIP'. +Classes for input/output are +INPUTFORMAT 'org.lwes.hadoop.io.JournalInputFormat' +OUTPUTFORMAT 'org.lwes.hadoop.io.JournalOutputFormat' + + +CREATE TABLE mrkt_auction_complete_hourly ( + a_bid string, + a_price string, + a_act_id bigint, + ...... + x_revenue string + ) +PARTITIONED BY(dt STRING) + ROW FORMAT SERDE 'org.lwes.hadoop.hive.EventSerDe' +WITH SERDEPROPERTIES ( + 'lwes.event_name'='Auction::Complete', + 'sender_ip'='SenderIP', + 'sender_port'='SenderPort', + 'receipt_time'='ReceiptTime', + 'site_id'='SiteID') + STORED AS + INPUTFORMAT 'org.lwes.hadoop.io.JournalInputFormat' + OUTPUTFORMAT 'org.lwes.hadoop.io.JournalOutputFormat' + ; + + +Also, lwes does not support FLOAT nor DOUBLE but hive does. 
+You can have define those columns as float/double and the serde +will convert its values according to Float.parseFloat(String) and +Double.parseDouble(String). + +I also built a tool to create table definitions from the ESF file +and will post it too to sourceforge. + +*** +Limitations + +Since LWES is basically a key/value format, it does not support nested +columns so arrays and hashes are for now not allowed in a hive table that +uses this SerDe + + + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..424e5a2 --- /dev/null +++ b/pom.xml @@ -0,0 +1,92 @@ + + 4.0.0 + org.openx.data.hive + JournalSerDe + jar + 1.0.1 + Open Source SerDe + http://maven.apache.org + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.0.2 + + 1.6 + 1.6 + + + + + + + junit + junit + 3.8.1 + test + + + hadoop + hadoop-core + 0.19.2 + + + commons-logging + commons-logging + 1.1 + + + avalon-framework + avalon-framework + + + servlet-api + javax.servlet + + + logkit + logkit + + + + + org.lwes + lwes-java + 0.2.0 + + + org.lwes + journaller-java + 0.0.6 + + + hive + hive-serde + 0.6.0_trunk + + + hive + hive-exec + 0.6.0_trunk + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/java/org/lwes/hadoop/EventWritable.java b/src/main/java/org/lwes/hadoop/EventWritable.java new file mode 100644 index 0000000..cfb8dd6 --- /dev/null +++ b/src/main/java/org/lwes/hadoop/EventWritable.java @@ -0,0 +1,111 @@ +package org.lwes.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.io.WritableComparable; +import org.lwes.Event; +import org.lwes.EventSystemException; +import org.lwes.NoSuchAttributeException; + +/** + * Writable wrapper around LWES events. 
+ * + * @author rcongiu + */ + +public class EventWritable implements WritableComparable { + private Event event; + + public EventWritable() { + } + + public EventWritable(Event event) { + setEvent(event); + } + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + + @Override + public void readFields(DataInput in) throws IOException{ + final int length = in.readInt(); + final byte[] bytes = new byte[length]; + in.readFully(bytes); + try { + setEvent(new Event(bytes, false, null)); + } catch (EventSystemException ex) { + throw new IOException("EventSystemException", ex); + } + } + + @Override + public void write(DataOutput out) throws IOException { + final byte[] bytes = event.serialize(); + out.writeInt(bytes.length); + out.write(bytes); + } + + @Override + public int compareTo(EventWritable o) { + Thread.dumpStack(); + return compare(getEvent(), o.getEvent()); + } + + @SuppressWarnings("unchecked") + private static int compare(Event a, Event b) { + int comp = a.getEventName().compareTo(b.getEventName()); + if (comp!=0) return comp; + comp = a.size()-b.size(); + if (comp!=0) return comp; + final Set anames = getAttributeNames(a), bnames = getAttributeNames(b); + // Compare attribute names + for (Iterator ai = anames.iterator(), bi = bnames.iterator(); ai.hasNext() + ||bi.hasNext();) { + final String aa = (ai.hasNext() ? ai.next() : ""), ba = (bi.hasNext() ? 
bi.next() : ""); + comp = aa.compareTo(ba); + if (comp!=0) return comp; + } + // Compare attribute values + for (String name : anames) { + try { + final Object av = a.get(name), bv = b.get(name); + if (av==null&&bv==null) continue; + if (av==null) return -1; + if (bv==null) return 1; + if ((av instanceof Comparable)&&(bv instanceof Comparable)) { + comp = ((Comparable) av).compareTo(bv); + if (comp!=0) return comp; + } else { + throw new IllegalArgumentException("Field "+name+" was not Comparable:\n\t"+a+"\n\t"+b); + } + } catch (NoSuchAttributeException e) { + throw new IllegalArgumentException("Field "+name+" seemed to disappear!\n\t"+a+"\n\t"+b); + } + } + return 0; + } + + @SuppressWarnings("unchecked") + private static Set getAttributeNames(Event event) { + final TreeSet names = new TreeSet(); + for (Enumeration e = event.getEventAttributeNames(); e.hasMoreElements();) + names.add(e.nextElement()); + return names; + } + + @Override + public String toString() { + return event==null ? "null" : event.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/org/lwes/hadoop/hive/EventSerDe.java b/src/main/java/org/lwes/hadoop/hive/EventSerDe.java new file mode 100644 index 0000000..d8c3cc9 --- /dev/null +++ b/src/main/java/org/lwes/hadoop/hive/EventSerDe.java @@ -0,0 +1,485 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. 
+ */ +package org.lwes.hadoop.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.*; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.hive.serde2.typeinfo.*; +import org.apache.hadoop.io.*; +import org.lwes.Event; +import org.lwes.EventSystemException; +import org.lwes.NoSuchAttributeException; +import org.lwes.hadoop.EventWritable; + +/** + * Adapter to use Journal Files in Hive. + * Journal files may either be stored as a stream of EventWritable or + * EventListWritable, SerDe will detect that through introspection. + * + * SerDe maps hive columns to event fields. Since hive columns are lowercase + * but event fields may be uppercase, or we may want to call the hive column + * with a name more descriptive than the event field, mapping can be provided + * when creating the table, or later, using SERDEPROPERTIES setting + * 'hive-column-name' = 'LWES-field-name'. If mapping is not provided, + * SerDe will assume that the field name is the same as the column name. 
+ * + * When using EventWritable, the event type in the file must be specified + * using (example) + * 'lwes.event_name'='Impression::Confirmed' + * + * When using a EventListWritable based journal, the event name must also + * be specified in the column mapping, so one mapping would be + * + * 'sender_ip'='Impression::Confirmed::SenderIP' + * + * @author rcongiu + */ +public class EventSerDe implements SerDe { + + public static final Log LOG = LogFactory.getLog(EventSerDe.class.getName()); + List columnNames; + List columnTypes; + TypeInfo rowTypeInfo; + ObjectInspector rowObjectInspector; + boolean[] columnSortOrderIsDesc; + // holds the results of deserialization + ArrayList row; + Map> fieldsForEventName = + new HashMap>(); + String allEventName; // in case file has one type of event only + + /** + * Prepares for serialization/deserialization, collecting the column + * names and mapping them to the LWES attributes they refer to. + * + * @param conf + * @param tbl + * @throws SerDeException + */ + @Override + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + LOG.debug("initialize, logging to " + EventSerDe.class.getName()); + + // Get column names and sort order + String columnNameProperty = tbl.getProperty("columns"); + String columnTypeProperty = tbl.getProperty("columns.types"); + if (columnNameProperty.length() == 0) { + columnNames = new ArrayList(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + assert (columnNames.size() == columnTypes.size()); + + for (String s : tbl.stringPropertyNames()) { + LOG.debug("Property: " + s + " value " + tbl.getProperty(s)); + } + + if (tbl.containsKey("lwes.event_name")) { + allEventName = tbl.getProperty("lwes.event_name"); + } + + // Create row related objects + rowTypeInfo = 
TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + //rowObjectInspector = (StructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo); + rowObjectInspector = (StructObjectInspector) TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo); + row = new ArrayList(columnNames.size()); + + for (int i = 0; i < columnNames.size(); i++) { + row.add(null); + } + + // Get the sort order + String columnSortOrder = tbl.getProperty(Constants.SERIALIZATION_SORT_ORDER); + columnSortOrderIsDesc = new boolean[columnNames.size()]; + for (int i = 0; i < columnSortOrderIsDesc.length; i++) { + columnSortOrderIsDesc[i] = (columnSortOrder != null && columnSortOrder.charAt(i) == '-'); + } + + // take each column and find what it maps to into the event list + int colNumber = 0; + for (String columnName : columnNames) { + String fieldName; + String eventName; + + if (!tbl.containsKey(columnName) && allEventName == null) { + LOG.warn("Column " + columnName + + " is not mapped to an eventName:field through SerDe Properties"); + continue; + } else if (allEventName != null) { + // no key, but in a single-type event file + eventName = allEventName; + fieldName = columnName; + } else { + // if key is there + String fullEventField = tbl.getProperty(columnName); + String[] parts = fullEventField.split("::"); + + // we are renaming the column + if (parts.length < 1 || (parts.length == 1 && allEventName != null)) { + LOG.warn("Malformed EventName::Field " + fullEventField); + continue; + } else if (parts.length == 1 && allEventName != null) { + // adds the name. We're not specifying the event. 
+ fieldName = parts[0]; + eventName = allEventName; + } else { + fieldName = parts[parts.length - 1]; + eventName = fullEventField.substring(0, fullEventField.length() - 2 - fieldName.length()); + } + LOG.debug("Mapping " + columnName + " to EventName " + eventName + + ", field " + fieldName); + } + + if (!fieldsForEventName.containsKey(eventName)) { + fieldsForEventName.put(eventName, new LinkedList()); + } + + fieldsForEventName.get(eventName).add(new FieldAndPosition(fieldName, colNumber)); + colNumber++; + } + + + + } + + /** + * Deserializes a single event, storing its fields into the row object; the row is returned unchanged when the event name has no mapped columns + * @param w either a BytesWritable holding serialized event bytes or an EventWritable + */ + @Override + public Object deserialize(Writable w) throws SerDeException { + byte[] bytes = null; + + LOG.debug("Deserialize called."); + + Event ev = null; + try { + if (w instanceof BytesWritable) { + BytesWritable b = (BytesWritable) w; + ev = new Event(b.getBytes(), false, null); + } else if (w instanceof EventWritable) { + EventWritable ew = (EventWritable) w; + ev = ew.getEvent(); + } else { + throw new SerDeException(getClass().toString() + + ": expects either BytesWritable or Text object!"); + } + LOG.debug("Got bytes: " + bytes); + } catch (EventSystemException ex) { + throw new SerDeException(ex); + } + + if (this.fieldsForEventName.containsKey(ev.getEventName())) { + for (FieldAndPosition fp : fieldsForEventName.get(ev.getEventName())) { + + TypeInfo type = columnTypes.get(fp.getPosition()); + + LOG.debug("Deserializing " + columnNames.get(fp.getPosition())); + try { + row.set(fp.getPosition(), + deserialize_column(ev, type, fp, + row.get(fp.getPosition()))); // previous cell value, passed as the reusable object + } catch (IOException ex) { + row.set(fp.getPosition(), null); + Logger.getLogger(EventSerDe.class.getName()).log(Level.SEVERE, null, ex); + continue; + } + } + } + return row; + } + + /** + * Takes the parsed event, and maps its content to columns.
+ * @param ev the parsed event to read the field from + * @param type the Hive type of the target column + * @param fp the event field name and the position of its column + * @param reuse a Writable of the matching type to refill, or null to allocate a new one + * @return the field value as a Writable, or null if the field is not set or the type is VOID + * @throws IOException for BYTE, LIST, MAP and STRUCT columns, or if the attribute lookup fails + */ + static Object deserialize_column(Event ev, TypeInfo type, + FieldAndPosition fp, Object reuse) throws IOException { + LOG.debug("Deserializing column " + fp.getField()); + // Is this field a null? + String fieldName = fp.getField(); + + // if field is not there or is not set, return null. + // isSet doesn't throw AttributeNotExists exception. + if (!ev.isSet(fieldName)) { + return null; + } + try { + switch (type.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) type; + switch (ptype.getPrimitiveCategory()) { + case VOID: { + return null; + } + case BOOLEAN: { + BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable) reuse; + r.set(ev.getBoolean(fieldName)); + return r; + } + case BYTE: { + throw new IOException("BYTE not supported. Use int instead"); + } + case SHORT: { + ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable) reuse; + r.set(ev.getInt16(fieldName)); + return r; + } + case INT: { + IntWritable r = reuse == null ? new IntWritable() : (IntWritable) reuse; + r.set(ev.getInt32(fieldName)); + return r; + } + case LONG: { + LongWritable r = reuse == null ? new LongWritable() : (LongWritable) reuse; + r.set(ev.getInt64(fieldName)); + return r; + } + // support float through conversion from string since LWES does + // not support floats + case FLOAT: { + FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable) reuse; + r.set(Float.parseFloat(ev.getString(fieldName))); + return r; + } + case DOUBLE: { + DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable) reuse; + r.set(Double.parseDouble(ev.getString(fieldName))); + return r; + } + case STRING: { + Text r = reuse == null ? new Text() : (Text) reuse; + // this will work for String and IPAddress type.
+ r.set(ev.get(fieldName).toString()); + return r; + } + default: { + throw new RuntimeException("Unrecognized type: " + ptype.getPrimitiveCategory()); + } + } + } + case LIST: + case MAP: + case STRUCT: { + throw new IOException("List, Map and Struct not supported in LWES"); + } + default: { + throw new RuntimeException("Unrecognized type: " + type.getCategory()); + } + } + } catch (NoSuchAttributeException ex) { + // we check for the attribute existence beforehand with isSet + // so we should never be here. Let's just repackage the exception. + throw new IOException(ex); + } + + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + LOG.debug("JournalSerDe::getObjectInspector()"); + return rowObjectInspector; + } + + @Override + public Class getSerializedClass() { + LOG.debug("JournalSerDe::getSerializedClass()"); + return BytesWritable.class; + } + /** + * Serializes a row into an event wrapped in a writable object that can be handled by hadoop + * @param obj the row object whose fields are read through the inspector + * @param inspector the inspector for the row; it is cast to StructObjectInspector + * @return the shared EventWritable holding the reused event + * @throws SerDeException if the event cannot be created or a field cannot be set + */ + + // buffers used to serialize + // we reuse these objects since we don't want to create/destroy + // a huge number of objects in the JVM + EventWritable serialized = new EventWritable(); + Event serializeev = null; + + @Override + public Writable serialize(Object obj, ObjectInspector inspector) throws SerDeException { + LOG.debug("Serializing: " + obj.getClass().getCanonicalName()); + + try { + if(serializeev == null) + serializeev = new Event(allEventName, false, null); + } catch (EventSystemException ex) { + LOG.debug("Cannot create event ", ex); + throw new SerDeException("Can't create event in SerDe serialize", ex); + } + + StructObjectInspector soi = (StructObjectInspector) inspector; + List fields = soi.getAllStructFieldRefs(); + + // for each field supplied by object inspector + for (int i = 0; i < fields.size(); i++) { + // for each column, set the corresponding event attribute + String fieldName =
fields.get(i).getFieldName(); + try { + serialize_column(serializeev, + columnNames.get(i), + soi.getStructFieldData(obj, fields.get(i)), //data for Struct Field + fields.get(i).getFieldObjectInspector()); // object inspector + } catch (EventSystemException ex) { + LOG.debug("Cannot set field " + fieldName, ex); + throw new SerDeException("Can't set field " + fieldName + + " in SerDe serialize", ex); + } + } + // if u ask me, this should be BytesWritable + // but apparently hive gets confused + serialized.setEvent(serializeev); + + return serialized; + + } + + private void serialize_column(Event ev, String f, Object o, ObjectInspector oi) + throws EventSystemException { + LOG.debug("Serializing column" + f); + + // just don't set the field. + if (o == null) { + return; + } + + switch (oi.getCategory()) { + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + switch (poi.getPrimitiveCategory()) { + case VOID: { + return; + } + case BOOLEAN: { + BooleanObjectInspector boi = (BooleanObjectInspector) poi; + boolean v = ((BooleanObjectInspector) poi).get(o); + ev.setBoolean(f, v); + return; + } + case BYTE: { + ByteObjectInspector boi = (ByteObjectInspector) poi; + byte v = boi.get(o); + // lwes doesn't have byte so we upcast + ev.setInt16(f, v); + return; + } + case SHORT: { + ShortObjectInspector spoi = (ShortObjectInspector) poi; + short v = spoi.get(o); + ev.setInt16(f, v); + return; + } + case INT: { + IntObjectInspector ioi = (IntObjectInspector) poi; + int v = ioi.get(o); + ev.setInt32(f, v); + return; + } + case LONG: { + LongObjectInspector loi = (LongObjectInspector) poi; + long v = loi.get(o); + ev.setInt64(f, v); + return; + } + case FLOAT: { + FloatObjectInspector foi = (FloatObjectInspector) poi; + float v = foi.get(o); + ev.setString(f, Float.toString(v)); + return; + } + case DOUBLE: { + DoubleObjectInspector doi = (DoubleObjectInspector) poi; + double v = doi.get(o); + ev.setString(f, Double.toString(v)); + return; + } + 
case STRING: { + StringObjectInspector soi = (StringObjectInspector) poi; + Text t = soi.getPrimitiveWritableObject(o); + ev.setString(f, (t != null ? t.toString() : null)); + return; + } + default: { + throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); + } + } + } + case LIST: + case MAP: + case STRUCT: { + throw new RuntimeException("Complex types not supported in LWES: " + oi.getCategory()); + } + default: { + throw new RuntimeException("Unrecognized type: " + oi.getCategory()); + } + } + } + + public static class FieldAndPosition { + + String field; + int position; + + public FieldAndPosition(String f, int p) { + field = f; + position = p; + } + + public String getField() { + return field; + } + + public void setField(String field) { + this.field = field; + } + + public int getPosition() { + return position; + } + + public void setPosition(int position) { + this.position = position; + } + + @Override + public String toString() { + return "<" + field + "," + position + ">"; + } + } +} diff --git a/src/main/java/org/lwes/hadoop/io/DatagramPacketInputStream.java b/src/main/java/org/lwes/hadoop/io/DatagramPacketInputStream.java new file mode 100644 index 0000000..ccdea5f --- /dev/null +++ b/src/main/java/org/lwes/hadoop/io/DatagramPacketInputStream.java @@ -0,0 +1,154 @@ +package org.lwes.hadoop.io; + + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.StreamCorruptedException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.lwes.serializer.DeserializerState; +import org.lwes.serializer.Deserializer; +import org.lwes.Event; +import org.lwes.EventFactory; +import org.lwes.util.IPAddress; + +public class DatagramPacketInputStream extends DataInputStream +{ + static int HEADER_LENGTH = 22; + SimpleDateFormat dateFormatter = new SimpleDateFormat(); + private EventFactory ef = null; + private boolean typeChecking = false; /* act 
as it did before */ + + public void setEventFactory(EventFactory eventFactory) + { this.ef = eventFactory; } + public EventFactory getEventFactory() + { return ef; } + + public void setTypeChecking(boolean typeChecking) + { this.typeChecking = typeChecking; } + public boolean isTypeChecking() + { return typeChecking; } + + public DatagramPacketInputStream(InputStream is) + throws IOException, StreamCorruptedException + { + super(is); + } + + public Event readEvent() + { + Event event = null; + + if ( ef == null ) + { + ef = new EventFactory(); + } + try { + DeserializerState myState = new DeserializerState(); + + byte[] header = new byte[HEADER_LENGTH]; + readFully(header,0,HEADER_LENGTH); + int length = (int)(Deserializer.deserializeUINT16(myState,header)); + long time = Deserializer.deserializeINT64(myState,header); + IPAddress source_inet = + new IPAddress(Deserializer.deserializeIPADDR(myState,header)); + int source_port = + (int)(Deserializer.deserializeUINT16(myState,header)); + + int site_id = + (int)(Deserializer.deserializeUINT16(myState,header)); + + byte [] bytes = new byte[length]; + readFully(bytes,0,length); + if ( isTypeChecking() ) + { + event = ef.createEvent(bytes); + } + else + { + // create event without checks + event = new Event(bytes,false,null); + } + event.setInt64("ReceiptTime",time); + event.setIPAddress("SenderIP",source_inet); + event.setUInt16("SenderPort",source_port); + event.setUInt16("SiteID",site_id); + } catch ( EOFException e ) { + event=null; + } catch ( Exception e ) { + System.err.println("Problem with OutputStreamFromDataGrams "+e); + e.printStackTrace(); + event=null; + } + return event; + } + + public byte[] readDataGramBytes() + { + byte [] bytes = null; + try { + DeserializerState myState = new DeserializerState(); + + byte[] header = new byte[HEADER_LENGTH]; + readFully(header,0,HEADER_LENGTH); + int length = (int)(Deserializer.deserializeUINT16(myState,header)); + long time = 
Deserializer.deserializeINT64(myState,header); + IPAddress source_inet = + new IPAddress(Deserializer.deserializeIPADDR(myState,header)); + int source_port = + (int)(Deserializer.deserializeUINT16(myState,header)); + int site_id = + (int)(Deserializer.deserializeUINT16(myState,header)); + + bytes = new byte[length]; + readFully(bytes,0,length); + } catch ( EOFException e ) { + } catch ( Exception e ) { + System.err.println("Problem with OutputStreamFromDataGrams "+e); + } + return bytes; + } + + public String readDataGram() + { + StringBuffer sb = new StringBuffer(); + int total = -1; + try { + DeserializerState myState = new DeserializerState(); + + byte[] header = new byte[HEADER_LENGTH]; + total = read(header,0,HEADER_LENGTH); + + if ( total != -1 ) + { + int length = (int)(Deserializer.deserializeUINT16(myState,header)); + long time = Deserializer.deserializeINT64(myState,header); + IPAddress source_inet = + new IPAddress(Deserializer.deserializeIPADDR(myState,header)); + int source_port = + (int)(Deserializer.deserializeUINT16(myState,header)); + int site_id = + (int)(Deserializer.deserializeUINT16(myState,header)); + + dateFormatter.applyPattern("MM/dd/yyyy HH:mm:ss.SSS"); + Date date = new Date(time); + sb.append(dateFormatter.format(date)); + + byte [] bytes = new byte[length]; + total = read(bytes,0,length); + } + + } catch ( Exception e ) { + System.err.println("Problem with OutputStreamFromDataGrams "+e); + e.printStackTrace(); + } + if ( total == -1 ) return null; + + return sb.toString(); + } + +} + diff --git a/src/main/java/org/lwes/hadoop/io/DatagramPacketOutputStream.java b/src/main/java/org/lwes/hadoop/io/DatagramPacketOutputStream.java new file mode 100644 index 0000000..76aa063 --- /dev/null +++ b/src/main/java/org/lwes/hadoop/io/DatagramPacketOutputStream.java @@ -0,0 +1,120 @@ +/* + * Serializes an event as a Datagram Packet + */ +package org.lwes.hadoop.io; + +import java.io.DataOutputStream; +import java.io.IOException; +import 
java.io.OutputStream; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import org.lwes.Event; +import org.lwes.NoSuchAttributeException; +import org.lwes.journaller.DeJournaller; +import org.lwes.journaller.util.EventHandlerUtil; + +import org.lwes.serializer.Serializer; +import org.lwes.util.IPAddress; + +class DatagramPacketOutputStream extends DataOutputStream { + + static int HEADER_LENGTH = 22; + static IPAddress default_ip = new IPAddress(); + + public DatagramPacketOutputStream(OutputStream os) + throws IOException { + super(os); + } + + /** + * Write a datagram to an output stream as follows + * + * 1) 2 bytes payload length (length of the datagram) + * 2) 8 bytes receipt time (output of java.lang.System.currentTimeMillis()) + * 3) 6 bytes of sender's IP address:port (4 byte IP, 2 byte port) + * 4) 2 bytes site ID + * 5) 4 bytes of future extensions + * 6) payload (Serialized event as described by Event.serialize()) + * + */ + public void writeDataGram(DatagramPacket datagram, long receiptTime, + int site_id) { + try { + int length = datagram.getLength(); + byte[] bytes = datagram.getData(); + byte[] header = new byte[HEADER_LENGTH]; + int offset = 0; + + /* payload length */ + offset += Serializer.serializeUINT16(length, header, offset); + + /* receipt Time */ + offset += + Serializer.serializeINT64(receiptTime, header, offset); + + /* sender ip */ + offset += + Serializer.serializeIPADDR(datagram.getAddress(), header, offset); + + /* sender port */ + offset += Serializer.serializeUINT16(datagram.getPort(), header, offset); + + /* site_id */ + offset += Serializer.serializeUINT16(site_id, header, offset); + + /* pad with zeros */ + offset += Serializer.serializeUINT32(0, header, offset); + + write(header, 0, HEADER_LENGTH); + write(bytes, 0, length); + } catch (Exception e) { + System.err.println("Problem with OutputStreamFromDataGrams " + e); + } + } + + /** + * Writes event to file using the same format + * as
{@link DatagramPacketInputStream#readEvent} + * + * @param event the event to write; must not be null, because it is serialized before the null check below + * @throws IOException if the header or the payload cannot be written to the underlying stream + */ + public void writeEvent(Event event) throws IOException { + byte[] packet = event.serialize(); + + /* + * these fields are in the datagram packet and will be always carried + * over, unless they were already stripped from the original journal + * or the file we're reading is not a DatagramPacket Format + */ + long receiptTime = 0; + InetAddress senderIP = InetAddress.getByAddress(new byte[]{0, 0, 0, 0}); + int senderPort = 0; + int siteID = 0; + + // instead of checking for each one, just do a try catch + try { + if(event != null && event.isSet("ReceiptTime")) { + receiptTime = event.getInt64("ReceiptTime"); + senderIP = InetAddress.getByAddress(event.getIPAddress("SenderIP")); + senderPort = event.getUInt16("SenderPort"); + siteID = event.getUInt16("SiteID"); + } + } catch (NoSuchAttributeException ex) { + // just ignore... this may be a lwes non-journal file. + } + + ByteBuffer b = ByteBuffer.allocate(DeJournaller.MAX_HEADER_SIZE); + + EventHandlerUtil.writeHeader(packet.length, + receiptTime, + senderIP, + senderPort, + siteID, b); + + out.write(b.array(), 0, DeJournaller.MAX_HEADER_SIZE); + out.write(packet); + out.flush(); + } +} diff --git a/src/main/java/org/lwes/hadoop/io/JournalInputFormat.java b/src/main/java/org/lwes/hadoop/io/JournalInputFormat.java new file mode 100644 index 0000000..ec624bb --- /dev/null +++ b/src/main/java/org/lwes/hadoop/io/JournalInputFormat.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.lwes.hadoop.io; + + + +import java.io.IOException; +import java.io.InputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.lwes.Event; +import org.lwes.hadoop.EventWritable; + +/** An {@link InputFormat} for LWES journal files. Files are never split. + * Each record is one event read through a DatagramPacketInputStream. Keys are + * the running total of serialized event bytes read so far, and values are the events.
*/ +public class JournalInputFormat extends FileInputFormat + implements JobConfigurable { + + JobConf conf; + + @Override + public void configure(JobConf conf) { + this.conf = conf; + } + + @Override + protected boolean isSplitable(FileSystem fs, Path file) { + return false; + } + + @Override + public RecordReader getRecordReader( + InputSplit genericSplit, JobConf job, + Reporter reporter) + throws IOException { + reporter.setStatus(genericSplit.toString()); + return new JournalRecordReader(job, (FileSplit) genericSplit); + } + /************************************************************* + * RecordReader that yields one LWES event per record + * + *************************************************************/ + public static class JournalRecordReader implements RecordReader { + + private static final Log LOG = LogFactory.getLog(JournalRecordReader.class.getName()); + private CompressionCodecFactory compressionCodecs = null; + private long start; + private long pos; + private long end; + int maxLineLength; + private final DatagramPacketInputStream in; + + public JournalRecordReader(Configuration job, + FileSplit split) throws IOException { + this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", + Integer.MAX_VALUE); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + + LOG.debug("RecordReader: processing path " + file.toString() ); + + compressionCodecs = new CompressionCodecFactory(job); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + // open the file (no seek is performed; isSplitable() returns false, so presumably a split is a whole file) + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + + if (codec != null) { + in = new DatagramPacketInputStream(codec.createInputStream(fileIn)); + } else { + in = new DatagramPacketInputStream(fileIn); + } + } + + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + @Override + public EventWritable createValue() { + return new EventWritable(); + } + +
/** Read the next event into value and set key to the total serialized event bytes read so far; returns false once readEvent() yields null. */ + @Override + public synchronized boolean next(LongWritable key, EventWritable value) + throws IOException { + + Event ev = in.readEvent(); + + if (ev != null) { + value.setEvent(ev); + pos += ev.serialize().length; + + key.set(pos); + + return true; + } else { + return false; + } + } + + /** + * Get the progress within the split, as (pos - start) / (end - start) capped at 1, or 0 for an empty split + */ + @Override + public float getProgress() { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float) (end - start)); + } + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public synchronized void close() throws IOException { + if (in != null) { + in.close(); + } + } +} + + +} diff --git a/src/main/java/org/lwes/hadoop/io/JournalOutputFormat.java b/src/main/java/org/lwes/hadoop/io/JournalOutputFormat.java new file mode 100644 index 0000000..313bfb3 --- /dev/null +++ b/src/main/java/org/lwes/hadoop/io/JournalOutputFormat.java @@ -0,0 +1,122 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor.
+ */ +package org.lwes.hadoop.io; + +import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; +import org.lwes.hadoop.EventWritable; + +/** + * + * @author rcongiu + */ +public class JournalOutputFormat extends FileOutputFormat + implements HiveOutputFormat { + + JobConf conf; + + @Override + public RecordWriter getRecordWriter(FileSystem fs, + JobConf job, String name, Progressable progress) throws IOException { + Path outputPath = getWorkOutputPath(job); + + if (!fs.exists(outputPath)) { + fs.mkdirs(outputPath); + } + Path file = new Path(outputPath, name); + CompressionCodec codec = null; + if (getCompressOutput(job)) { + Class codecClass = getOutputCompressorClass(job, DefaultCodec.class); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job); + } + + final FSDataOutputStream outFile = fs.create(file); + final DatagramPacketOutputStream out; + if (codec != null) { + out = new DatagramPacketOutputStream(codec.createOutputStream(outFile)); + } else { + out = new DatagramPacketOutputStream(outFile); + } + + return new RecordWriter() { + + @Override + public void close(Reporter 
reporter) throws IOException { + out.close(); + } + + @Override + public void write(LongWritable key, EventWritable value) + throws IOException { + out.write(value.getEvent().serialize()); + } + }; + } + + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf job, + Path path, Class type, boolean isCompressed, Properties properties, + Progressable p) throws IOException { + FileSystem fs = path.getFileSystem(job); + + final DatagramPacketOutputStream outStream = new DatagramPacketOutputStream(Utilities.createCompressedStream(job, + fs.create(path), isCompressed)); + + int rowSeparator = 0; + String rowSeparatorString = properties.getProperty( + Constants.LINE_DELIM, "\n"); + try { + rowSeparator = Byte.parseByte(rowSeparatorString); + } catch (NumberFormatException e) { + rowSeparator = rowSeparatorString.charAt(0); + } + final int finalRowSeparator = rowSeparator; + return new FileSinkOperator.RecordWriter() { + + @Override + public void write(Writable r) throws IOException { + if (r instanceof Text) { + Text tr = (Text) r; + outStream.write(tr.getBytes(), 0, tr.getLength()); + outStream.write(finalRowSeparator); + } else if (r instanceof EventWritable) { + EventWritable ew = (EventWritable) r; + outStream.writeEvent(ew.getEvent()); + } else { + // DynamicSerDe always writes out BytesWritable + BytesWritable bw = (BytesWritable) r; + outStream.write(bw.get(), 0, bw.getSize()); + outStream.write('\n'); + } + } + + @Override + public void close(boolean abort) throws IOException { + outStream.close(); + } + }; + + } +} diff --git a/src/test/java/org/openx/data/hive/journalserde/AppTest.java b/src/test/java/org/openx/data/hive/journalserde/AppTest.java new file mode 100644 index 0000000..bec1d38 --- /dev/null +++ b/src/test/java/org/openx/data/hive/journalserde/AppTest.java @@ -0,0 +1,38 @@ +package org.openx.data.hive.journalserde; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * 
Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Placeholder test that always passes; it exercises no project code. + */ + public void testApp() + { + assertTrue( true ); + } +} -- 2.11.4.GIT