
Data Client Library - Developer Guide: Write Index Data


Write Index Data

Write Index Layer Data

The Data Client Library provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to index layers.

For index layers, rows are grouped by the values of the index attributes defined for the layer. When writing to a Protobuf-encoded layer, there must be exactly one row for each combination of index attribute values.
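For illustration, a DataFrame destined for an index layer could be built as in the following minimal sketch. The index attribute names (tileId, eventTime), the idx_ column prefix, and the binary data payload column are assumptions for this example; the actual columns must correspond to the index attributes configured on your layer.

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().master("local").appName("index-input").getOrCreate()
import sparkSession.implicits._

// One column per index attribute (names assumed here for illustration)
// plus a binary payload column named "data"
val inputDF = Seq(
  (92259L, 1532995200000L, "payload for window 1".getBytes),
  (92259L, 1532998800000L, "payload for window 2".getBytes)
).toDF("idx_tileId", "idx_eventTime", "data")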

Project Dependencies

If you want to create an application that uses the HERE platform Spark Connector to write data to an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
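As a rough sketch, assuming an sbt build, the dependency declaration could look as follows; the exact module name and version are listed in that chapter and may differ for your SDK release.

// build.sbt (sketch; verify the coordinates in "Dependencies for Spark Connector")
libraryDependencies += "com.here.platform.data.client" %% "spark-support" % "<version>"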

Write Process

For index layers, DataFrame rows are grouped by the index attributes to create the index data. The data is uploaded to the index layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata columns.

Metadata Fields

In addition to the index attribute and payload columns, the input DataFrame can carry metadata columns for the index layer; writing them is enabled with the olp.connector.metadata-columns option, as shown below.
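As a minimal sketch, and assuming mt_timestamp is one of the supported metadata column names (the mt_ prefix and the exact names are assumptions here; consult the metadata columns reference for your connector version), a metadata column could be attached before writing:

import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import org.apache.spark.sql.functions.lit

// Attach an assumed metadata column to the rows before writing
val dfWithMetadata = inputDF.withColumn("mt_timestamp", lit(System.currentTimeMillis))

// Enable handling of metadata columns on write
dfWithMetadata
  .writeLayer(outputCatalogHrn, outputLayerId)
  .option("olp.connector.metadata-columns", "true")
  .save()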

Write Data as Avro, Parquet, or Protobuf-Encoded Files

The following snippet demonstrates how to write a DataFrame as an Avro, Parquet, or Protobuf-Encoded data file to an index layer of a catalog:


Scala

import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import org.apache.spark.sql.DataFrame

// val inputDF: DataFrame (input data stored as a DataFrame)
// val outputCatalogHrn: HRN (HRN of the output catalog that contains the layer $outputLayerId)
// val outputLayerId: String (ID of the output index layer; for a Protobuf-encoded layer, the
// layer schema must match the non-index columns of $inputDF)
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()


Java

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Dataset<Row> inputDF (input data stored as a Dataset<Row>)
// HRN outputCatalogHrn (HRN of a catalog that contains the layer $outputLayerId)
// String outputLayerId (ID of the output index layer; for a Protobuf-encoded layer, the
// layer schema must match the non-index columns of $inputDF)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();

Write Data in CSV Format

The following snippet demonstrates how to write a DataFrame in CSV format to an index layer. In this example, the input DataFrame contains the columns field1 (integer) and field2 (string).


Scala

import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)

// loadDataFrame is a helper that provides the input DataFrame
// with the columns field1 and field2
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()

sparkSession.stop()


Java

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

// HRN of a catalog that contains the index layer $layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";

// loadDataFrame is a helper that provides the input DataFrame
// with the columns field1 and field2
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();

sparkSession.stop();

Write Data in Text Format

The following snippet demonstrates how to write a DataFrame in text format to an index layer. In this example, the input DataFrame contains a column data with messages as strings.


Scala

import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)

// loadDataFrame is a helper that provides the input DataFrame with the column data
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()


Java

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

// HRN of a catalog that contains the index layer $layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-text-layer";

// loadDataFrame is a helper that provides the input DataFrame with the column data
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();

sparkSession.stop();

Write Data in JSON Format

The following snippet demonstrates how to write a DataFrame in JSON format to an index layer. In this example, the input DataFrame contains the columns intVal (integer) and strVal (string).


Scala

import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)

// loadDataFrame is a helper that provides the input DataFrame
// with the columns intVal and strVal
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()


Java

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

// HRN of a catalog that contains the index layer $layerId
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";

// loadDataFrame is a helper that provides the input DataFrame
// with the columns intVal and strVal
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();

sparkSession.stop();

Write Data in Other Formats

The following snippet demonstrates how to write a DataFrame in an arbitrary format to an index layer of a catalog by supplying a custom data converter. In this example, the input DataFrame contains a column data with messages as strings, and the data of multiple rows is simply concatenated:


Scala

import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  IndexDataConverter,
  IndexRowMetadata
}
import org.apache.spark.sql.Row

// val inputDF: DataFrame (input data stored as a DataFrame)
// val catalogHrn: HRN (HRN of the output catalog that contains the layer $layerId)
// val layerId: String (ID of the output index layer)
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    // Serializes all rows that share the same index attribute values
    // into a single partition payload
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()


Java

import com.here.platform.data.client.spark.javadsl.IndexDataConverter;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.IndexRowMetadata;
import java.util.Iterator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Dataset<Row> inputDF (input data stored as a Dataset<Row>)
// HRN catalogHrn (HRN of the output catalog that contains the layer $layerId)
// String layerId (ID of the output index layer)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          // Serializes all rows that share the same index attribute values
          // into a single partition payload
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            String joinedText = builder.toString();
            return new GroupedData<>(rowMetadata, joinedText.getBytes());
          }
        })
    .save();

Note

For information on RSQL, see RSQL.
