diff --git a/pxf/build.gradle b/pxf/build.gradle index 1ed42bf08e..9866fc7eea 100644 --- a/pxf/build.gradle +++ b/pxf/build.gradle @@ -123,7 +123,6 @@ subprojects { subProject -> force 'commons-codec:commons-codec:1.4' force 'commons-collections:commons-collections:3.2.1' force 'commons-logging:commons-logging:1.1.3' - force 'org.apache.avro:avro:1.7.4' force 'org.apache.zookeeper:zookeeper:3.4.6' force 'org.codehaus.jackson:jackson-mapper-asl:1.9.13' force 'junit:junit:4.11' @@ -510,6 +509,39 @@ project('pxf-jdbc') { } } +project('pxf-s3') { + dependencies { + compile(project(':pxf-api')) + compile(project(':pxf-hdfs')) + compile "com.amazonaws:aws-java-sdk-s3:1.11.313" + compile "org.apache.parquet:parquet-hadoop:$parquetVersion" + compile "org.apache.parquet:parquet-tools:$parquetVersion" + compile "org.apache.hadoop:hadoop-common:2.8.2" + compile "org.apache.hadoop:hadoop-aws:2.8.2" + compile "org.apache.avro:avro:1.8.2" + compile "org.apache.parquet:parquet-avro:1.8.2" + testCompile "io.findify:s3mock_2.12:0.2.5" + } + + ospackage { + packageName = versionedPackageName("${project.name}") + summary = 'HAWQ Extension Framework (PXF), S3 plugin' + description = 'Querying external data stored in S3' + packager = ' ' + packageGroup = 'Development/Libraries' + release = buildNumber() + '.' + project.osFamily + buildHost = ' ' + + requires(versionedPackageName('pxf-hdfs'), project.version, GREATER | EQUAL) + + from(jar.outputs.files) { + into "/usr/lib/pxf-${project.version}" + } + + link("/usr/lib/pxf-${project.version}/${project.name}.jar", "${project.name}-${project.version}.jar") + } +} + def buildNumber() { System.getenv('BUILD_NUMBER') ?: System.getProperty('user.name') } diff --git a/pxf/pxf-s3/.gitignore b/pxf/pxf-s3/.gitignore new file mode 100644 index 0000000000..8a3a4e4b4f --- /dev/null +++ b/pxf/pxf-s3/.gitignore @@ -0,0 +1,30 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +.gradle +gradle +.classpath +.settings +.project +bin +build +.*.swp diff --git a/pxf/pxf-s3/README.md b/pxf/pxf-s3/README.md new file mode 100644 index 0000000000..79c4689dea --- /dev/null +++ b/pxf/pxf-s3/README.md @@ -0,0 +1,206 @@ +# PXF extension to Greenplum to provide access to Parquet formatted data stored in S3 + +
+*(Logo banner: Greenplum + Parquet + S3; the images live under `images/`.)*
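+
+All of the examples below use an external table `LOCATION` URI of the same general shape; the angle-bracket
+names here are placeholders for your own values, not literal syntax:
+```
+pxf://<bucket>/<prefix>?S3_REGION=<region>&S3_ACCESS_KEY=<access key>&S3_SECRET_KEY=<secret key>&PROFILE=<one of the profiles below>
+```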
+
+## TODO
+* Add support for Avro data
+
+## Collect all JARs and install on Greenplum segment server(s)
+* The Gradle build will deposit all the transitive dependencies into `./build/libs`
+* `tar -C ./pxf-s3-parquet/build/libs -cvf pxf-s3-jars.tar .`
+* Copy this up to the Greenplum server(s) and extract into, say, `~/gpadmin/pxf-jars`
+* Add this list of JARs to the `$GPHOME/pxf/conf/pxf-public.classpath` file:
+  ```
+  ls -1 ~/pxf-jars/* >> $GPHOME/pxf/conf/pxf-public.classpath
+  ```
+
+## Add new PXF profiles
+* Edit `$GPHOME/pxf/conf/pxf-profiles.xml`:
+```
+<profiles>
+    <profile>
+        <name>S3ParquetWrite</name>
+        <description>A profile for writing Parquet data to S3</description>
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.s3.S3Fragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.s3.S3ParquetWriteAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.s3.S3ParquetWriteResolver</resolver>
+        </plugins>
+    </profile>
+    <profile>
+        <name>S3ParquetRead</name>
+        <description>A profile for reading Parquet data from S3; does not support nested data</description>
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.s3.S3Fragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.s3.S3ParquetAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.s3.S3ParquetResolver</resolver>
+        </plugins>
+    </profile>
+    <profile>
+        <name>S3ParquetJsonRead</name>
+        <description>A profile for reading Parquet data from S3, as JSON; useful with nested data</description>
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.s3.S3Fragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.s3.S3ParquetJsonAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.s3.S3ParquetJsonResolver</resolver>
+        </plugins>
+    </profile>
+</profiles>
+```
+
+## Restart the PXF server on all segment hosts
+```
+  $GPHOME/pxf/bin/pxf restart
+```
+
+## Example: reading data from Parquet files in S3
+* Data source: you can upload into S3 the five Parquet files found [here](https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet)
+* DDL: replace the `S3_ACCESS_KEY`, `S3_SECRET_KEY` and `S3_REGION` values
+```
+-- Parquet input, JSON output
+DROP EXTERNAL TABLE IF EXISTS parquet_json;
+CREATE EXTERNAL TABLE parquet_json (js JSON)
+LOCATION ('pxf://pxf-s3-devel/the-data/userdata?S3_REGION=us-east-1&S3_ACCESS_KEY=ACFOCBFFDJFHZ69M7GM7&S3_SECRET_KEY=3DFq0kV01aEdNzr5uxkeN7Hr/cZv6erDjM3z4NsB&PROFILE=S3ParquetJsonRead')
+FORMAT 'TEXT' (DELIMITER 'OFF');
+```
+* Queries on this data use the Greenplum JSON operators described [here](https://gpdb.docs.pivotal.io/500/admin_guide/query/topics/json-data.html#topic_gn4_x3w_mq)
+* One possible query for this data set. Note the use of parentheses around `js->>'salary'`, since the cast operator (`::`) binds more tightly than the `->>` operator.
+``` +SELECT js->>'country', AVG((js->>'salary')::float)::NUMERIC(9, 2) +FROM parquet_json +GROUP BY 1 +ORDER BY 2 ASC +LIMIT 10; +``` + +## Example: write data into Parquet files in S3 +* DDL: replace the `S3_ACCESS_KEY`, `S3_SECRET_KEY` and `S3_REGION` values +``` +DROP EXTERNAL TABLE IF EXISTS write_parquet; +CREATE WRITABLE EXTERNAL TABLE write_parquet (a INT, b TEXT, c BOOLEAN) +LOCATION ('pxf://pxf-s3-devel/test-write?S3_REGION=us-east-1&S3_ACCESS_KEY=ACFOCBFFDJFHZ69M7GM7&S3_SECRET_KEY=3DFq0kV01aEdNzr5uxkeN7Hr/cZv6erDjM3z4NsB&PROFILE=S3ParquetWrite') +FORMAT 'CUSTOM' (formatter='pxfwritable_export'); +``` +* Insert some rows into this table +``` +INSERT INTO write_parquet +(b, a, c) VALUES ('First value for b', 1001, false), ('Second value for b', 1002, true); +``` +* Build a *readable* Parquet S3 table, to verify what was written (JSON format) +``` +DROP EXTERNAL TABLE IF EXISTS read_what_we_wrote; +CREATE EXTERNAL TABLE read_what_we_wrote (js JSON) +LOCATION ('pxf://pxf-s3-devel/test-write?S3_REGION=us-east-1&S3_ACCESS_KEY=ACFOCBFFDJFHZ69M7GM7&S3_SECRET_KEY=3DFq0kV01aEdNzr5uxkeN7Hr/cZv6erDjM3z4NsB&PROFILE=S3ParquetJsonRead') +FORMAT 'TEXT' (DELIMITER 'OFF'); +``` +* Finally, query that table to verify that values made it +``` +SELECT js->>'b' a, (js->>'a')::INT a, (js->>'c')::BOOLEAN c +FROM read_what_we_wrote +ORDER BY 2 ASC; +``` +* Or, use a table which uses the primitive data types (doesn't support nested data structures) +``` +DROP EXTERNAL TABLE IF EXISTS read_primitives; +CREATE EXTERNAL TABLE read_primitives (LIKE write_parquet) +LOCATION ('pxf://pxf-s3-devel/test-write?S3_REGION=us-east-1&S3_ACCESS_KEY=ACFOCBFFDJFHZ69M7GM7&S3_SECRET_KEY=3DFq0kV01aEdNzr5uxkeN7Hr/cZv6erDjM3z4NsB&PROFILE=S3ParquetRead') +FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import'); +``` +* And read from this table +``` +SELECT * +FROM read_primitives +ORDER BY a ASC; +``` + +## Caveats +* You have to replace a JAR in the standard classpath with one we need due to the S3 libs: +``` +[gpadmin@gpdbsne conf]$ diff -u pxf-private.classpath.ORIG pxf-private.classpath +--- pxf-private.classpath.ORIG 2018-04-17 19:17:48.178225024 +0000 ++++ pxf-private.classpath 2018-04-17 19:18:35.114744844 +0000 +@@ -52,7 +52,7 @@ + /usr/lib/hadoop/client/commons-logging.jar + /usr/lib/hadoop/client/guava.jar + /usr/lib/hadoop/client/htrace-core4.jar +-/usr/lib/hadoop/client/jackson-core.jar ++/home/gpadmin/pxf-jars/jackson-core-2.6.7.jar + /usr/lib/hadoop/client/jackson-mapper-asl.jar + /usr/lib/hadoop/client/jetty-*.jar + /usr/lib/hadoop/client/jersey-core.jar + ``` + +* Out of memory? +If the query fails, and you find `java.lang.OutOfMemoryError: GC overhead limit exceeded` in the catalina.out, +try increasing the heap size for the Java PXF process (it's running Tomcat). + - File: `/usr/local/greenplum-db/pxf/pxf-service/bin/setenv.sh` on each segment host + - Line to change: `JVM_OPTS="-Xmx16g -Xms8g -Xss256K -Dpxf.log.dir=/usr/local/greenplum-db/pxf/logs "` + - The `-Xmx16g -Xms8g` is the *new* value (it was set to a much lower value initially) + +* Inconsistency in handling TIMESTAMP and DATE types, with Parquet / Avro +The `BridgeOutputBuilder` class contains the following check, which makes it impossible to serialize +these two types, using the Avro schema approach typical with Parquet, in the way that they are usually +handled (e.g. 
`DATE` would be represented as `int`, and `TIMESTAMP` as `long`):
+```
+        for (int i = 0; i < size; i++) {
+            OneField current = recFields.get(i);
+            if (!isTypeInSchema(current.type, schema[i])) {
+                throw new BadRecordException("For field " + colNames[i]
+                        + " schema requires type "
+                        + DataType.get(schema[i]).toString()
+                        + " but input record has type "
+                        + DataType.get(current.type).toString());
+            }
+
+            fillOneGPDBWritableField(current, i);
+        }
+```
+That `isTypeInSchema()` method incorporates the following logic, which basically implies that these two
+types can be handled, so long as they are serialized within Parquet as `String` type:
+```
+    boolean isStringType(DataType type) {
+        return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT,
+                DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains(type);
+    }
+```
+So, if *we* are writing Parquet data from Greenplum, we would write consistently with this approach, but
+it remains to be seen how well we will handle Parquet data written by some other system.
+
+## Notes
+* Here's an example of an INSERT into the S3 Parquet external table, reading from an s3 external table
+```
+gpadmin=# INSERT INTO osm_parquet_write SELECT id, extract(epoch from date_time)::int date_time, uid, lat, lon, name, key_value from osm_ext;
+NOTICE: Found 31 data formatting errors (31 or more input rows). Rejected related input data.
+INSERT 0 5327616
+Time: 12495.106 ms
+```
+* Example: read of that Parquet data, using the `S3ParquetJsonRead` profile
+```
+gpadmin=# select count(*) from osm_parquet_read;
+ count
+---------
+ 5327616
+(1 row)
+
+Time: 184331.683 ms
+```
+* Alternatively, read that same data, but from a table created using the `S3ParquetRead` profile
+```
+gpadmin=# select count(*) from osm_prim_read;
+ count
+---------
+ 5327616
+(1 row)
+
+Time: 6549.945 ms
+```
+So far, the JSON read mode appears to be quite a bit (about 30x) slower than the primitive-type read, so there's work to be
+done to explore what's causing that.
+
+* A [reference on tuning Parquet](https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide)
+* An [article about moving to Parquet files](https://www.enigma.com/blog/moving-to-parquet-files-as-a-system-of-record)
+* [This script](./csv_to_parquet_pyspark.py) was used to verify that this solution is able to read Parquet data files written by Spark
+
diff --git a/pxf/pxf-s3/csv_to_parquet_pyspark.py b/pxf/pxf-s3/csv_to_parquet_pyspark.py
new file mode 100644
index 0000000000..56cb6a59fb
--- /dev/null
+++ b/pxf/pxf-s3/csv_to_parquet_pyspark.py
@@ -0,0 +1,54 @@
+#
+# Used to convert the Open Street Map dump data, which I have in .gz files,
+# delimited by '<', in S3
+#
+# To do this, I first set up a Spark cluster in Amazon EMR, then logged
+# into the master node via SSH, then started the pyspark shell.
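+#
+# NOTE: this is Python 2 code (it uses the long type and -1L literals), so it
+# needs to run under a Python 2 pyspark.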
+# +# See: +# http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema +# https://spark.apache.org/docs/1.6.2/api/python/_modules/pyspark/sql/types.html +# https://stackoverflow.com/questions/33129918/pyspark-typeerror-integertype-can-not-accept-object-in-type-type-unicode +# + +import datetime + +# Handle potential junk data for FLOAT type +def get_float(val, defaultVal=0.0): + try: + rv = float(val) + except ValueError: + rv = defaultVal + return rv + +# And, for LONG type +def get_long(val, defaultVal=-1L): + try: + rv = long(val) + except ValueError: + rv = defaultVal + return rv + +lines = sc.textFile('s3://goddard.datasets/osm_exerpt_0*') +lines.count() # Should be 5327647 +cols = lines.map(lambda l: l.split('<')) + +typedCols = cols.map(lambda c: ( + get_long(c[0], -1L), # LONG + datetime.datetime.strptime(c[1], '%Y-%m-%dT%H:%M:%SZ'), # TIMESTAMP + c[2], # STRING + get_float(c[3], 91.0), # FLOAT + get_float(c[4], 181.0), + c[5], + c[6])) +df = spark.createDataFrame(typedCols) + +# Writes 8 files into this bucket. The bucket must exist, but the "from-spark" part will be created +# if it doesn't exist. +df.write.parquet('s3a://pxf-s3-devel/from-spark', mode='overwrite') + +# Downsample this data set, then take 1k rows and save it for future testing. The resulting +# file is here (in this repo): ./data/sample_from_pyspark.snappy.parquet +df = df.sample(False, 0.0002, seed=137).limit(1000) +df.write.parquet('s3a://pxf-s3-devel/sample-from-spark', mode='overwrite') + diff --git a/pxf/pxf-s3/data/sample_from_pyspark.snappy.parquet b/pxf/pxf-s3/data/sample_from_pyspark.snappy.parquet new file mode 100644 index 0000000000..d35d59df94 Binary files /dev/null and b/pxf/pxf-s3/data/sample_from_pyspark.snappy.parquet differ diff --git a/pxf/pxf-s3/images/Greenplum_Logo.png b/pxf/pxf-s3/images/Greenplum_Logo.png new file mode 100644 index 0000000000..20f86f327b Binary files /dev/null and b/pxf/pxf-s3/images/Greenplum_Logo.png differ diff --git a/pxf/pxf-s3/images/Parquet_Logo.png b/pxf/pxf-s3/images/Parquet_Logo.png new file mode 100644 index 0000000000..4c0002b8c9 Binary files /dev/null and b/pxf/pxf-s3/images/Parquet_Logo.png differ diff --git a/pxf/pxf-s3/images/Plus_Sign.png b/pxf/pxf-s3/images/Plus_Sign.png new file mode 100644 index 0000000000..595271d0e2 Binary files /dev/null and b/pxf/pxf-s3/images/Plus_Sign.png differ diff --git a/pxf/pxf-s3/images/S3_Logo.png b/pxf/pxf-s3/images/S3_Logo.png new file mode 100644 index 0000000000..1d4cd5f24b Binary files /dev/null and b/pxf/pxf-s3/images/S3_Logo.png differ diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/AvroUtil.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/AvroUtil.java new file mode 100644 index 0000000000..972e6f2c00 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/AvroUtil.java @@ -0,0 +1,104 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; + +public class AvroUtil { + + public static final String NAMESPACE = "org.greenplum.avro"; + private static final Log LOG = LogFactory.getLog(AvroUtil.class); + + private AvroUtil() { + } + + /** + * When writing Parquet or Avro, the Avro schema corresponding to the 
database rows selected + * in the query is required. This method provides that schema. + * NOTE: the provided schema specifies that any of the fields can be null + * + * @param id the InputData + * @return the corresponding Avro Schema instance + */ + public static Schema schemaFromInputData(InputData id) { + String schemaStr = "{\"namespace\": \"" + NAMESPACE + "\", \"type\": \"record\", "; + // FIXME: is there a way to get the table name from InputData? + schemaStr += "\"name\": \"mytable\", \"fields\": ["; + List fieldList = new ArrayList<>(); + for (ColumnDescriptor cd : id.getTupleDescription()) { + String fieldStr = "{\"name\": \"" + cd.columnName().toLowerCase() + "\", \"type\": [" + + asAvroType(DataType.get(cd.columnTypeCode())) + ", \"null\" ]}"; + fieldList.add(fieldStr); + } + schemaStr += String.join(", ", fieldList); + schemaStr += "]}"; + LOG.info("Avro schema string: " + schemaStr); + return new Schema.Parser().parse(schemaStr); + } + + /** + * @param s the input string + * @return the input string, surrounded by double quotes + */ + public static String addQuotes(String s) { + return "\"" + s + "\""; + } + + /** + * + * @param gpType the DataType, from the database, to be resolved into an Avro type + * @return a String representation of the corresponding Avro data type, surrounded by double quotes + */ + public static String asAvroType(DataType gpType) { + String rv = null; + switch (gpType) { + case BOOLEAN: + rv = addQuotes("boolean"); + break; + case BYTEA: + rv = addQuotes("bytes"); + break; + case BIGINT: + rv = addQuotes("long"); + break; + case SMALLINT: + case INTEGER: + rv = addQuotes("int"); + break; + case TEXT: + case BPCHAR: + case VARCHAR: + rv = addQuotes("string"); + break; + case REAL: + rv = addQuotes("float"); + break; + case FLOAT8: + rv = addQuotes("double"); + break; + case NUMERIC: // FIXME: come up with a better approach for NUMERIC + rv = addQuotes("string"); + break; + /* + * Ref. + * https://avro.apache.org/docs/1.8.0/spec.html#Timestamp+%28millisecond+precision%29 + * https://avro.apache.org/docs/1.8.1/api/java/index.html?org/apache/avro/SchemaBuilder.html + */ + case DATE: + rv = "{ \"type\": \"string\", \"logicalType\": \"date\" }"; + break; + case TIMESTAMP: + rv = "{ \"type\": \"string\", \"logicalType\": \"timestamp-millis\" }"; + break; + default: + throw new RuntimeException("Unsupported type: " + gpType); + } + return rv; + } +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/NullableOneField.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/NullableOneField.java new file mode 100644 index 0000000000..734fc286b8 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/NullableOneField.java @@ -0,0 +1,28 @@ +package org.apache.hawq.pxf.plugins.s3; + +import org.apache.hawq.pxf.api.OneField; + +public class NullableOneField extends OneField { + + public NullableOneField(int type, Object val) { + super(type, val); + } + + public NullableOneField() { + super(); + } + + /** + * The purpose of this class is to handle the case where val is + * null so, rather than calling toString() on null, it will simply + * return null. 
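+ * Instances are created by S3ParquetAccessor and S3ParquetJsonResolver, where a
+ * field's value may legitimately be null.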
+ */ + public String toString() { + String rv = null; + if (null != this.val) { + rv = this.val.toString(); + } + return rv; + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/PxfS3.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/PxfS3.java new file mode 100644 index 0000000000..70c357485e --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/PxfS3.java @@ -0,0 +1,188 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.utilities.InputData; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +/** + * Test S3 access, ls, wildcarding filenames, open/read + */ +public class PxfS3 { + + private AmazonS3 s3; + private String bucketName; + private String objectName; + private BufferedReader s3In; + private String s3AccessKey; + private String s3SecretKey; + + public static final String S3_ACCESS_KEY = "S3_ACCESS_KEY"; + public static final String S3_SECRET_KEY = "S3_SECRET_KEY"; + public static final String S3_REGION = "S3_REGION"; + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(PxfS3.class); + + public String toString() { + return "PxfS3[bucketName=" + bucketName + ", objectName=" + objectName + "]"; + } + + /** + * This exists to support tests + */ + public PxfS3() { + super(); + } + + // FIXME: how to bubble these errors up to the SQL prompt? 
+ public PxfS3(String s3AccessKey, String s3SecretKey, String s3Region) { + if (null == s3AccessKey || null == s3SecretKey) { + throw new RuntimeException("Both " + S3_ACCESS_KEY + " and " + S3_SECRET_KEY + " must be set"); + } + if (null == s3Region) { + throw new RuntimeException(S3_REGION + " must be set"); + } + this.s3AccessKey = s3AccessKey; + this.s3SecretKey = s3SecretKey; + BasicAWSCredentials creds = new BasicAWSCredentials(s3AccessKey, s3SecretKey); + s3 = AmazonS3ClientBuilder.standard().withRegion(s3Region) + .withCredentials(new AWSStaticCredentialsProvider(creds)).build(); + } + + // Used within the *write* context + public String getS3aURI(InputData inputData) { + // /pxf-s3-devel/test-write/1524148597-0000000430_1 + // Need to get objectName based on this string, which is in inputData.getDataSource() + String s3Path = getS3Path(inputData); + this.objectName = s3Path.replaceAll("^[^/]+/", ""); + return getS3aURI(); + } + + // Format: s3a://S3_ACCESS_KEY:S3_SECRET_KEY@BUCKET_NAME/FILE_NAME + public String getS3aURI() { + String rv = "s3a://" + this.s3AccessKey + ":"; + try { + rv += URLEncoder.encode(this.s3SecretKey, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + rv += "@" + this.bucketName + "/" + this.objectName; + return rv; + } + + public void setObjectName(String name) { + this.objectName = name; + } + + public void open() { + S3Object s3Obj = s3.getObject(bucketName, objectName); + S3ObjectInputStream s3is = s3Obj.getObjectContent(); + s3In = new BufferedReader(new InputStreamReader(s3is)); + } + + public void close() throws IOException { + if (null != s3In) { + s3In.close(); + s3In = null; + } + } + + public String readLine() throws IOException { + return s3In.readLine(); + } + + public String getBucketName() { + return bucketName; + } + + private static String getS3Path(InputData inputData) { + return inputData.getDataSource().replaceAll("^/+", ""); + } + + // Set bucketName and prefix, based on the PXF URI + protected void setBucketName(InputData inputData) { + // Remove any leading '/' as the value passed in here is in consistent in this + // regard + String s3Path = getS3Path(inputData); + int slashPos = s3Path.indexOf("/"); + if (slashPos > -1) { + bucketName = s3Path.substring(0, slashPos); + } else { + bucketName = s3Path; + } + } + + /* + * Useful from within Fragmenter and Accessor classes + */ + public static PxfS3 fromInputData(InputData inputData) { + String s3AccessKey = inputData.getUserProperty(S3_ACCESS_KEY); + String s3SecretKey = inputData.getUserProperty(S3_SECRET_KEY); + String s3Region = inputData.getUserProperty(S3_REGION); + PxfS3 rv = new PxfS3(s3AccessKey, s3SecretKey, s3Region); + rv.setBucketName(inputData); + return rv; + } + + /* + * If prefix is null, it will have no effect. Note that this doesn't handle + * something like "*.csv", since that's a wildcard and not a prefix. 
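+	 * For example, a data source of "/pxf-s3-devel/test-write" yields bucket "pxf-s3-devel" and
+	 * prefix "test-write"; every key beginning with that prefix is returned, except keys ending in "/".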
+ */ + public List getKeysInBucket(InputData inputData) { + List rv = new ArrayList<>(); + String s3Path = getS3Path(inputData); + String prefix = null; + int slashPos = s3Path.indexOf("/"); + if (slashPos > -1) { + bucketName = s3Path.substring(0, slashPos); + if (s3Path.length() > slashPos + 1) { + prefix = s3Path.substring(slashPos + 1); + } + } else { + bucketName = s3Path; + } + ListObjectsV2Result result = s3.listObjectsV2(bucketName, prefix); + List objects = result.getObjectSummaries(); + for (S3ObjectSummary os : objects) { + String objKey = os.getKey(); + if (!objKey.endsWith("/")) { + rv.add(objKey); + } + } + return rv; + } + + public AmazonS3 getS3() { + return s3; + } + + public void setS3(AmazonS3 s3) { + this.s3 = s3; + } + + public void setS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + } + + public void setS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3Fragmenter.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3Fragmenter.java new file mode 100644 index 0000000000..e6ef60f539 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3Fragmenter.java @@ -0,0 +1,73 @@ +package org.apache.hawq.pxf.plugins.s3; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.utilities.InputData; + +import java.net.InetAddress; +import java.util.List; + +/** + * Fragmenter for S3 + * + * Based on the input, a bucket name plus a prefix, expands that to a list of + * matching paths, which is returned by getFragments() + */ +public class S3Fragmenter extends Fragmenter { + + private static final Log LOG = LogFactory.getLog(S3Fragmenter.class); + private final PxfS3 pxfS3; + + public S3Fragmenter(InputData inputData) { + super(inputData); + LOG.info("dataSource: " + inputData.getDataSource()); + pxfS3 = PxfS3.fromInputData(inputData); + } + + /** + * This is for tests + * @param inputData + * @param pxfS3 for tests, this would be a PxfS3Mock instance + */ + protected S3Fragmenter(InputData inputData, PxfS3 pxfS3) { + super(inputData); + this.pxfS3 = pxfS3; + } + + /** + * Returns a list of distinct S3 objects, including the full path to each, based + * on the bucket name plus prefix passed in via the InputData. 
+ */ + @Override + public List getFragments() throws Exception { + String pxfHost = InetAddress.getLocalHost().getHostAddress(); + String[] hosts = new String[] { pxfHost }; + for (String key : pxfS3.getKeysInBucket(super.inputData)) { + // Example key values: test-write/1524148597-0000000911_1, test-write/1524496731-0000002514_5 + fragments.add(new Fragment(inputData.getDataSource(), hosts, key.getBytes())); + } + return fragments; + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetAccessor.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetAccessor.java new file mode 100644 index 0000000000..ab203e1243 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetAccessor.java @@ -0,0 +1,73 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hdfs.ParquetFileAccessor; +import org.apache.hawq.pxf.plugins.hdfs.ParquetResolver; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +public class S3ParquetAccessor extends ParquetFileAccessor implements ReadAccessor { + + private static final Log LOG = LogFactory.getLog(S3ParquetAccessor.class); + private PxfS3 pxfS3; + private ParquetResolver resolver; + private MessageType schema; + + public S3ParquetAccessor(InputData input) { + super(input); + pxfS3 = PxfS3.fromInputData(inputData); + pxfS3.setObjectName(new String(inputData.getFragmentMetadata())); + resolver = new ParquetResolver(inputData); + } + + @Override + public boolean openForRead() throws Exception { + LOG.info("openForRead(): " + pxfS3); + Path path = new Path(pxfS3.getS3aURI()); + Configuration conf = new Configuration(); + ParquetMetadata metadata = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER); + schema = metadata.getFileMetaData().getSchema(); + setSchema(schema); + setReader(new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER)); + setRecordIterator(); + return iteratorHasNext(); + } + + /** + * This overrides the parent's method, using a Resolver to set up the + * List that goes into the OneRow return value (which just gets passed + * through by S3ParquetResolver). The reason for this is that the schema isn't + * available to the Resolver, but it is here, so it makes sense to use it here. 
+ */ + @Override + public OneRow readNextObject() { + OneRow rv = null; + OneRow next = super.readNextObject(); + if (null != next) { + try { + List oneFieldList = new ArrayList(); + for (OneField of: resolver.getFields(next, schema)) { + NullableOneField nof = new NullableOneField(of.type, of.val); + oneFieldList.add(nof); + } + rv = new OneRow(null, oneFieldList); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return rv; + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonAccessor.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonAccessor.java new file mode 100644 index 0000000000..4238c4e63b --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonAccessor.java @@ -0,0 +1,81 @@ +package org.apache.hawq.pxf.plugins.s3; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.tools.json.JsonRecordFormatter; +import org.apache.parquet.tools.read.SimpleReadSupport; +import org.apache.parquet.tools.read.SimpleRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +public class S3ParquetJsonAccessor extends Plugin implements ReadAccessor { + + private static final Log LOG = LogFactory.getLog(S3ParquetJsonAccessor.class); + + private PxfS3 pxfS3; + private ParquetReader reader; + private JsonRecordFormatter.JsonGroupFormatter formatter; + + public S3ParquetJsonAccessor(InputData inputData) { + super(inputData); + pxfS3 = PxfS3.fromInputData(inputData); + pxfS3.setObjectName(new String(inputData.getFragmentMetadata())); + } + + // Called once per object in S3 + @Override + public boolean openForRead() throws Exception { + LOG.info("openForRead(): " + pxfS3); + Path path = new Path(pxfS3.getS3aURI()); + Configuration conf = new Configuration(); + ParquetMetadata metadata = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER); + this.reader = ParquetReader.builder(new SimpleReadSupport(), path).build(); + this.formatter = JsonRecordFormatter.fromSchema(metadata.getFileMetaData().getSchema()); + return true; + } + + @Override + public OneRow readNextObject() throws Exception { + OneRow row = null; + SimpleRecord record = reader.read(); + if 
(null != record) { + row = new OneRow(null, formatter.formatRecord(record)); + } + return row; + } + + @Override + public void closeForRead() throws Exception { + LOG.info("closeForRead()"); + if (null != reader) { + reader.close(); + } + } +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonResolver.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonResolver.java new file mode 100644 index 0000000000..a6683a8b5d --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetJsonResolver.java @@ -0,0 +1,26 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +public class S3ParquetJsonResolver extends Plugin implements ReadResolver { + + public S3ParquetJsonResolver(InputData input) { + super(input); + } + + @Override + public List getFields(OneRow row) throws Exception { + List record = new ArrayList<>(); + record.add(new NullableOneField(DataType.VARCHAR.getOID(), row.getData())); + return record; + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetResolver.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetResolver.java new file mode 100644 index 0000000000..29e34e75a8 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetResolver.java @@ -0,0 +1,23 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.util.List; + +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +public class S3ParquetResolver extends Plugin implements ReadResolver { + + public S3ParquetResolver(InputData input) { + super(input); + } + + @SuppressWarnings("unchecked") + @Override + public List getFields(OneRow row) throws Exception { + return (List) row.getData(); + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteAccessor.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteAccessor.java new file mode 100644 index 0000000000..832c0b7e99 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteAccessor.java @@ -0,0 +1,83 @@ +package org.apache.hawq.pxf.plugins.s3; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +// See https://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/ +public class S3ParquetWriteAccessor extends Plugin implements WriteAccessor { + + private static final Log LOG = LogFactory.getLog(S3ParquetWriteAccessor.class); + ParquetWriter writer = null; + + public S3ParquetWriteAccessor(InputData input) { + super(input); + LOG.info("Constructor"); + } + + @Override + public boolean openForWrite() throws Exception { + String fileName = inputData.getDataSource(); + Schema avroSchema = AvroUtil.schemaFromInputData(this.inputData); + LOG.info("openForWrite(): fileName = " + fileName); // /pxf-s3-devel/test-write/1524148597-0000000430_1 + PxfS3 pxfS3 = PxfS3.fromInputData(inputData); + // pxfS3.setObjectName(new String(inputData.getFragmentMetadata())); // This yields a NPE + String s3aURI = pxfS3.getS3aURI(this.inputData); + LOG.info("s3aURI: " + s3aURI); + Path file = new Path(s3aURI); + writer = AvroParquetWriter + .builder(file) + .withSchema(avroSchema) + // TODO: Expose the compression codec via a user-supplied parameter on the table definition URI + //.withCompressionCodec(CompressionCodecName.SNAPPY) // 49 MB (was 95 MB w/o compression) + .withCompressionCodec(CompressionCodecName.GZIP) // 33 MB + .withDictionaryEncoding(true) + //.withDictionaryPageSize(3145728) // No effect + //.withRowGroupSize(16777216) // No effect + .build(); + return true; + } + + @Override + public boolean writeNextObject(OneRow oneRow) throws Exception { + writer.write((GenericRecord) oneRow.getData()); + return true; + } + + @Override + public void closeForWrite() throws Exception { + if (writer != null) { + writer.close(); + writer = null; + } + } +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteResolver.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteResolver.java new file mode 100644 index 0000000000..1eac8edf94 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3ParquetWriteResolver.java @@ -0,0 +1,98 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.WriteResolver; +import 
org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +@SuppressWarnings("unused") +public class S3ParquetWriteResolver extends Plugin implements WriteResolver { + + private static final Log LOG = LogFactory.getLog(S3ParquetWriteResolver.class); + + // Required for creating GenericRecord containing the row of data + private Schema avroSchema; + + public S3ParquetWriteResolver(InputData input) { + super(input); + avroSchema = AvroUtil.schemaFromInputData(input); + } + + /* + * Refer to: + * https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/test/java/ + * org/apache/parquet/avro/TestReadWrite.java + * https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/generic/ + * GenericData.Record.html + */ + @Override + public OneRow setFields(List oneFieldList) throws Exception { + GenericData.Record record = new GenericData.Record(avroSchema); + List fieldList = avroSchema.getFields(); + // TODO: handle type conversion, from PostgreSQL to Avro + for (int i = 0; i < oneFieldList.size(); i++) { + OneField oneField = oneFieldList.get(i); + int dbType = oneField.type; + Object value = oneField.val; + + // What's the Avro type, that we'll have to convert to? + Schema.Field avroField = fieldList.get(i); + Schema fieldSchema = avroField.schema(); + Type avroType = fieldSchema.getType(); + List typeList = new ArrayList<>(); + // TODO: move this date/time resolution code into AvroUtil + if (avroType == Schema.Type.UNION) { + for (Schema s : fieldSchema.getTypes()) { + Type t = s.getType(); + LogicalType lt = s.getLogicalType(); + String ltName = "null"; + if (lt != null) { + ltName = lt.getName(); + } + /* Here, the value on the right is lt.getName(): + * + * DATE: (Type.INT, LogicalType = date) + * TIME: (Type.INT, LogicalType = time-millis) + * TIMESTAMP: (Type.LONG, LogicalType = timestamp-millis) + * + * Validated that these do get read back from Parquet with the specfied + * types (int or long). + */ + if (null != value) { + if ("date".equals(ltName) && Type.STRING == t) { + // DATE + value = (String) value; + } else if ("timestamp-millis".equals(ltName) && Type.STRING == t) { + // TIMESTAMP + value = (String) value; + } + } + typeList.add(s.toString() + "(Type." + t + ", LogicalType = " + ltName + ")"); + } + } + LOG.debug("type: " + DataType.get(dbType) + ", value: " + (value == null ? "null" : value.toString()) + + ", types: " + String.join(", ", typeList)); + record.put(fieldList.get(i).name(), value); + } + return new OneRow(null, record); + } + +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextAccessor.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextAccessor.java new file mode 100644 index 0000000000..26b4e17a25 --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextAccessor.java @@ -0,0 +1,66 @@ +package org.apache.hawq.pxf.plugins.s3; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * ReadAccessor which handles delimited TEXT data, passing each line as its own + * OneRow instance + */ +public class S3TextAccessor extends Plugin implements ReadAccessor { + + private static final Log LOG = LogFactory.getLog(S3TextAccessor.class); + private PxfS3 pxfS3; + + public S3TextAccessor(InputData metaData) { + super(metaData); + pxfS3 = PxfS3.fromInputData(metaData); + pxfS3.setObjectName(new String(metaData.getFragmentMetadata())); + } + + @Override + public boolean openForRead() throws Exception { + LOG.info("openForRead()"); + pxfS3.open(); + return true; + } + + @Override + public OneRow readNextObject() throws Exception { + OneRow row = null; + String line = pxfS3.readLine(); + if (null != line) { + row = new OneRow(null, line); + } + return row; + } + + @Override + public void closeForRead() throws Exception { + LOG.info("closeForRead()"); + pxfS3.close(); + } +} diff --git a/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextResolver.java b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextResolver.java new file mode 100644 index 0000000000..eaa50a209e --- /dev/null +++ b/pxf/pxf-s3/src/main/java/org/apache/hawq/pxf/plugins/s3/S3TextResolver.java @@ -0,0 +1,59 @@ +package org.apache.hawq.pxf.plugins.s3; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.WriteResolver; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; + +import java.util.LinkedList; +import java.util.List; + +import static org.apache.hawq.pxf.api.io.DataType.VARCHAR; + +public class S3TextResolver extends Plugin implements ReadResolver, WriteResolver { + + public S3TextResolver(InputData metaData) { + super(metaData); + } + + @Override + public List getFields(OneRow row) throws Exception { + List output = new LinkedList(); + Object data = row.getData(); + output.add(new OneField(VARCHAR.getOID(), data)); + return output; + } + + @Override + public OneRow setFields(List record) throws Exception { + // text row data is passed as a single field + if (record == null || record.size() != 1) { + throw new Exception( + "Unexpected record format, expected 1 field, found " + (record == null ? 0 : record.size())); + } + byte[] value = (byte[]) record.get(0).val; + // empty array means the end of input stream, return null to stop iterations + return value.length == 0 ? null : new OneRow(value); + } +} diff --git a/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/AvroUtilTest.java b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/AvroUtilTest.java new file mode 100644 index 0000000000..0bf8514a65 --- /dev/null +++ b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/AvroUtilTest.java @@ -0,0 +1,69 @@ +package org.apache.hawq.pxf.plugins.s3; + +import static org.junit.Assert.*; + +import java.util.ArrayList; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.s3.AvroUtil; +import org.junit.Test; + +public class AvroUtilTest { + + @Test + public void testSchemaFromInputData() { + InputData id = new TestInputData(); + ArrayList tupleDescription = new ArrayList<>(); + tupleDescription.add(new ColumnDescriptor("int_col", DataType.INTEGER.getOID(), 0, "int4", null)); + tupleDescription.add(new ColumnDescriptor("text_col", DataType.TEXT.getOID(), 1, "text", null)); + tupleDescription.add(new ColumnDescriptor("bool_col", DataType.BOOLEAN.getOID(), 2, "bool", null)); + ((TestInputData) id).setTupleDescription(tupleDescription); + Schema schemaFromInputData = AvroUtil.schemaFromInputData(id); + assertNotNull(schemaFromInputData); + Schema schemaFromAPI = SchemaBuilder.record("mytable").namespace(AvroUtil.NAMESPACE) + .fields() + .name("int_col").type().nullable().intType().noDefault() + .name("text_col").type().nullable().stringType().noDefault() + .name("bool_col").type().nullable().booleanType().noDefault() + .endRecord(); + assertEquals(schemaFromInputData, schemaFromAPI); + } + + @Test + public void testAddQuotes() { + String withoutQuotes = "this has no quotes yet"; + String withQuotes = "\"this has no quotes yet\""; + assertEquals(withQuotes, AvroUtil.addQuotes(withoutQuotes)); + } + + @Test + public void testAsAvroType() { + assertEquals("\"boolean\"", AvroUtil.asAvroType(DataType.BOOLEAN)); + assertEquals("\"bytes\"", AvroUtil.asAvroType(DataType.BYTEA)); + assertEquals("\"long\"", AvroUtil.asAvroType(DataType.BIGINT)); + assertEquals("\"int\"", AvroUtil.asAvroType(DataType.INTEGER)); + assertEquals("\"string\"", 
AvroUtil.asAvroType(DataType.TEXT)); + assertEquals("\"float\"", AvroUtil.asAvroType(DataType.REAL)); + assertEquals("\"double\"", AvroUtil.asAvroType(DataType.FLOAT8)); + assertEquals("{ \"type\": \"string\", \"logicalType\": \"date\" }", AvroUtil.asAvroType(DataType.DATE)); + assertEquals("{ \"type\": \"string\", \"logicalType\": \"timestamp-millis\" }", AvroUtil.asAvroType(DataType.TIMESTAMP)); + } + + /** + * This exists solely to permit setting the tupleDescription field of the + * InputData instance + */ + static class TestInputData extends InputData { + public TestInputData() { + super(); + } + + public void setTupleDescription(ArrayList tupleDescription) { + super.tupleDescription = tupleDescription; + } + } +} diff --git a/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/InputDataMock.java b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/InputDataMock.java new file mode 100644 index 0000000000..12753713cb --- /dev/null +++ b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/InputDataMock.java @@ -0,0 +1,34 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.util.HashMap; + +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.s3.PxfS3; + +/** + * This exists so the various userProperty values can be set + * InputData instance + */ +public class InputDataMock extends InputData { + + public InputDataMock() { + super(); + super.requestParametersMap = new HashMap(); + } + + public void setUserProperty(String key, String value) { + super.requestParametersMap.put(USER_PROP_PREFIX + key.toUpperCase(), value); + } + + /** + * Helper to build InputData + * @return an InputData instance + */ + public static InputData getInputData() { + InputDataMock inputData = new InputDataMock(); + inputData.setUserProperty(PxfS3.S3_ACCESS_KEY, PxfS3Test.TEST_S3_ACCESS_KEY); + inputData.setUserProperty(PxfS3.S3_SECRET_KEY, PxfS3Test.TEST_S3_SECRET_KEY); + inputData.setUserProperty(PxfS3.S3_REGION, PxfS3Test.TEST_S3_REGION); + return inputData; + } +} diff --git a/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Mock.java b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Mock.java new file mode 100644 index 0000000000..139d3b3ba9 --- /dev/null +++ b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Mock.java @@ -0,0 +1,90 @@ +package org.apache.hawq.pxf.plugins.s3; + +import java.io.IOException; + +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.s3.PxfS3; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; + +import io.findify.s3mock.S3Mock; + +public class PxfS3Mock extends PxfS3 { + + private static final int PORT = 1999; + private static S3Mock API = null; + + /* + * static { API = new + * S3Mock.Builder().withPort(PORT).withInMemoryBackend().build(); API.start(); } + */ + + public PxfS3Mock(String s3AccessKey, String s3SecretKey, String s3Region) { + super(); + this.setS3AccessKey(s3AccessKey); + this.setS3SecretKey(s3SecretKey); + if (null == API) { + API = new S3Mock.Builder().withPort(PORT).withInMemoryBackend().build(); + API.start(); + } + AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration( + "http://localhost:" + PORT, s3Region); + AmazonS3 s3 = 
AmazonS3ClientBuilder.standard().withPathStyleAccessEnabled(true) + .withEndpointConfiguration(endpoint) + .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials())).build(); + setS3(s3); + } + + public static PxfS3 fromInputData(InputData inputData) { + String s3AccessKey = inputData.getUserProperty(PxfS3.S3_ACCESS_KEY); + String s3SecretKey = inputData.getUserProperty(PxfS3.S3_SECRET_KEY); + String s3Region = inputData.getUserProperty(PxfS3.S3_REGION); + PxfS3 rv = new PxfS3Mock(s3AccessKey, s3SecretKey, s3Region); + rv.setBucketName(inputData); + return rv; + } + + public void close() throws IOException { + super.close(); + if (API != null) { + API.shutdown(); + } + } + + /** + * Example of how to use this class + * + * NOTE: the try/catch/finally is important since, if an exception is thrown, + * this will keep the port open, preventing future runs (within the same JVM; + * e.g. the IDE). + */ + public static void main(String[] args) throws IOException { + System.out.println("Starting ..."); + PxfS3 pxfS3 = null; + try { + InputData inputData = InputDataMock.getInputData(); + inputData.setDataSource("/pxf-s3-devel/test-write"); + pxfS3 = PxfS3Mock.fromInputData(inputData); + AmazonS3 s3 = pxfS3.getS3(); + s3.createBucket(pxfS3.getBucketName()); + s3.putObject(pxfS3.getBucketName(), "test-write/1524148597-0000000911_1", + "contents of 1524148597-0000000911_1"); + s3.putObject(pxfS3.getBucketName(), "test-write/1524496731-0000002514_5", + "contents of 1524496731-0000002514_5"); + for (String key : pxfS3.getKeysInBucket(inputData)) { + System.out.println("key: " + key); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (null != pxfS3) { + pxfS3.close(); + } + } + } + +} diff --git a/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Test.java b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Test.java new file mode 100644 index 0000000000..001dbc6982 --- /dev/null +++ b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/PxfS3Test.java @@ -0,0 +1,50 @@ +package org.apache.hawq.pxf.plugins.s3; + +import static org.junit.Assert.*; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.s3.PxfS3; +import org.junit.Test; + +public class PxfS3Test { + + static final String TEST_S3_ACCESS_KEY = "DAZFVISAXWVBOBDBHQZX"; + static final String TEST_S3_SECRET_KEY = "IhipDEysJnknxCBeX/9LGRg35l55owj/wf7KZYh9"; + static final String TEST_S3_REGION = "us-east-1"; + + /** + * Supports READ operations + * + * @throws UnsupportedEncodingException + */ + @Test + public void testGetS3aURIForRead() throws UnsupportedEncodingException { + InputData inputData = InputDataMock.getInputData(); + inputData.setDataSource("pxf-s3-devel/sample-from-spark/part-0"); + PxfS3 pxfS3 = PxfS3.fromInputData(inputData); + pxfS3.setObjectName("sample-from-spark/part-00000-f5d896f5-e61e-4991-935e-95707ff4d6eb-c000.snappy.parquet"); + String s3aURI = pxfS3.getS3aURI(); + String expectedS3aURI = "s3a://" + PxfS3Test.TEST_S3_ACCESS_KEY + ":" + URLEncoder.encode(PxfS3Test.TEST_S3_SECRET_KEY, "UTF-8") + + "@" + "pxf-s3-devel" + "/" + "sample-from-spark/part-00000-f5d896f5-e61e-4991-935e-95707ff4d6eb-c000.snappy.parquet"; + assertEquals(expectedS3aURI, s3aURI); + } + + /** + * Supports WRITE operations + * + * @throws UnsupportedEncodingException + */ + @Test + public void testGetS3aURIForWrite() throws UnsupportedEncodingException { + InputData inputData 
= InputDataMock.getInputData(); + inputData.setDataSource("/pxf-s3-devel/test-write/1524148597-0000000430_1"); + PxfS3 pxfS3 = PxfS3.fromInputData(inputData); + String s3aURI = pxfS3.getS3aURI(inputData); + String expectedS3aURI = "s3a://" + PxfS3Test.TEST_S3_ACCESS_KEY + ":" + URLEncoder.encode(PxfS3Test.TEST_S3_SECRET_KEY, "UTF-8") + + "@" + "pxf-s3-devel" + "/" + "test-write/1524148597-0000000430_1"; + assertEquals(expectedS3aURI, s3aURI); + } + +} diff --git a/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/S3FragmenterTest.java b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/S3FragmenterTest.java new file mode 100644 index 0000000000..b01c16a273 --- /dev/null +++ b/pxf/pxf-s3/src/test/java/org/apache/hawq/pxf/plugins/s3/S3FragmenterTest.java @@ -0,0 +1,99 @@ +package org.apache.hawq.pxf.plugins.s3; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.s3.PxfS3; +import org.apache.hawq.pxf.plugins.s3.S3Fragmenter; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.amazonaws.services.s3.AmazonS3; + +public class S3FragmenterTest { + + private static PxfS3 pxfS3; + private static InputData inputData; + + @Test + public void testGetFragments() throws Exception { + S3Fragmenter fragmenter = new S3Fragmenter(inputData, pxfS3); + List testFragments = fragmenter.getFragments(); + List expectFragments = new ArrayList<>(); + String pxfHost = InetAddress.getLocalHost().getHostAddress(); + String[] hosts = new String[] { pxfHost }; + for (String key : Arrays.asList("test-write/1524148597-0000000911_1", "test-write/1524496731-0000002514_5")) { + expectFragments.add(new Fragment(inputData.getDataSource(), hosts, key.getBytes())); + } + // Verify same number of elements + System.out.println("Comparing lengths of the Fragment lists ..."); + assertTrue(expectFragments.size() == testFragments.size()); + // Verify equality of each Fragment + for (int i = 0; i < expectFragments.size(); i++) { + System.out.println("Comparing Fragment[" + i + "] ..."); + assertTrue(fragmentEquals(expectFragments.get(i), testFragments.get(i))); + } + } + + /** + * In the absence of an equals() method in Fragment, verify equality only of the + * fields in use here + * + * @param a + * left hand side + * @param b + * right hand side + * @return whether they are equal + */ + private boolean fragmentEquals(Fragment a, Fragment b) { + // sourceName + boolean rv = a.getSourceName().equals(b.getSourceName()); + // replicas + String[] replicasA = a.getReplicas(); + String[] replicasB = b.getReplicas(); + if (replicasA.length == replicasB.length) { + List replicasAList = Arrays.asList(replicasA); + for (int i = 0; i < replicasB.length; i++) { + if (!replicasAList.contains(replicasB[i])) { + rv = false; + } + } + } + // metadata + if (!Arrays.equals(a.getMetadata(), b.getMetadata())) { + rv = false; + } + return rv; + } + + @BeforeClass + public static void setUpMock() { + inputData = InputDataMock.getInputData(); + inputData.setDataSource("/pxf-s3-devel/test-write"); + pxfS3 = PxfS3Mock.fromInputData(inputData); + AmazonS3 s3 = pxfS3.getS3(); + s3.createBucket(pxfS3.getBucketName()); + s3.putObject(pxfS3.getBucketName(), "test-write/1524148597-0000000911_1", + "contents of 1524148597-0000000911_1"); + 
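+		// A second object under the same prefix; getFragments() is expected to return both keys.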
s3.putObject(pxfS3.getBucketName(), "test-write/1524496731-0000002514_5", + "contents of 1524496731-0000002514_5"); + } + + @AfterClass + public static void destroyMock() { + try { + pxfS3.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff --git a/pxf/settings.gradle b/pxf/settings.gradle index e26f812590..53106f5264 100644 --- a/pxf/settings.gradle +++ b/pxf/settings.gradle @@ -27,3 +27,4 @@ include 'pxf-hive' include 'pxf-hbase' include 'pxf-json' include 'pxf-jdbc' +include 'pxf-s3'