This example shows how to exercise the path matching functionality of the Location Library in a streaming environment.
The example map-matches data coming from the sample-streaming-layer, which contains SDII messages. The catalog that contains this layer is defined in the sdii-catalog entry in config/here/pipeline-config.conf, or in config/here-china/pipeline-config.conf for China. The example writes the result of the computation to the out-data streaming layer in the catalog you will create.
The StreamPathMatcherExample class does the following:
- Gets the Flink StreamExecutionEnvironment
- Adds a SdiiMessageMapFunction, the Flink SourceFunction that provides data from the input streaming layer and deserializes it
- Adds a PathMatcherMapFunction, the Flink SourceFunction that does the path matching
- Puts the result descriptions into the catalog you will create, and prints them to the standard output
Before you execute the instructions in the next sections of this document, read the Prerequisites for the Location Library examples.
To run the example, you need access to the following catalogs:
- HERE Optimized Map for Location Library, or the China-specific HERE Optimized Map for Location Library catalog
- HERE Sample SDII Messages - Berlin, or HERE Sample SDII Messages - China
To run this example, you need two sets of credentials:
- Platform credentials: To get access to platform data and resources, including HERE Map Content data for your pipeline input.
- Repository credentials: To download HERE Data SDK for Java & Scala libraries and Maven archetypes to your environment.
For more details on how to set up your credentials, see the Identity & Access Management Developer Guide.
For more details on how to verify that your platform credentials are configured correctly, see the Verify Your Credentials tutorial.
To follow this example, you will need a project. A project is a collection of platform resources (catalogs, pipelines, and schemas) with controlled access. You can create a project through the platform portal.
Alternatively, use the OLP CLI olp project create command to create the project:
olp project create $PROJECT_ID $PROJECT_NAME
The command returns the HERE Resource Name (HRN) of your new project. Note down this HRN as you will need it later in this tutorial.
You do not have to provide a --scope parameter if your app has a default scope. For details on how to set a default project scope for an app, see the Specify a default Project for Apps chapter of the Identity & Access Management Developer Guide.
For more information on how to work with projects, see the Organize your work in projects tutorial.
The catalog you need to create is used to store the results of the Stream Path Matcher example.
Use the HERE platform portal to create the output catalog in your project and add the following layers:
Layer ID | Layer Type | Content Type | Content Encoding | Coverage |
---|---|---|---|---|
out-data | Stream | application/x-protobuf | uncompressed | DE or CN |
Alternatively, you can use the OLP CLI to create the catalog and the corresponding layers.
In the commands that follow, replace the variable placeholders with the following values:
- $CATALOG_ID is your output catalog's ID.
- $CATALOG_HRN is your output catalog's HRN (returned by olp catalog create).
- $PROJECT_HRN is your project's HRN (returned by the olp project create command).
- $COVERAGE is a two-letter code for the country and region (in this case, DE, or CN for China).
- $INPUT_SDII_CATALOG is the HRN of the public sdii-catalog catalog in your pipeline configuration (HERE environment or HERE environment in China).
- $INPUT_OPTIMIZED_MAP_CATALOG is the HRN of the public optimized-map-catalog catalog in your pipeline configuration (HERE environment or HERE environment in China).
We recommend assigning these values to shell variables so that you can copy and run the following commands as they are.
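As a minimal sketch, the placeholders could be assigned as shell variables like this. Every value below is a made-up stand-in (the catalog ID and the angle-bracket strings are assumptions, not values from this tutorial) and must be replaced with your own; $CATALOG_HRN only becomes known after the catalog has been created.

```shell
# Hypothetical example values; replace each one with your own.
CATALOG_ID="my-stream-path-matcher-output"   # any ID you choose for the output catalog
COVERAGE="DE"                                # use "CN" for the China environment
PROJECT_HRN="<HRN returned by olp project create>"
INPUT_SDII_CATALOG="<HRN of sdii-catalog from pipeline-config.conf>"
INPUT_OPTIMIZED_MAP_CATALOG="<HRN of optimized-map-catalog from pipeline-config.conf>"

# Fill this in once the catalog exists.
CATALOG_HRN="<HRN returned by olp catalog create>"

# Sanity check: this example only expects DE or CN as coverage.
case "$COVERAGE" in
  DE|CN) echo "Coverage: $COVERAGE" ;;
  *)     echo "Unexpected coverage: $COVERAGE" >&2 ;;
esac
```

Variables set this way only live in the current terminal session, so they need to be set again in any additional terminal you open.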
- Use the olp catalog create command to create the catalog. Note down the HRN that the command returns for later use:
olp catalog create $CATALOG_ID $CATALOG_ID --summary "Output catalog for Stream Path Matcher example" \
--description "Output catalog for Stream Path Matcher example" \
--scope $PROJECT_HRN
- Use the olp catalog layer add command to add a stream layer to your catalog:
olp catalog layer add $CATALOG_HRN out-data out-data --stream --summary "Layer for output partitions" \
--description "Layer for output partitions" --coverage $COVERAGE \
--scope $PROJECT_HRN
- Update the output catalog HRN in the pipeline-config.conf file.
The config/here/pipeline-config.conf (for the HERE platform environment) and config/here-china/pipeline-config.conf (for the HERE platform environment in China) files contain the permanent configuration of the data sources for the example.
Pick the file that corresponds to your platform environment and replace the YOUR_OUTPUT_CATALOG_HRN placeholder with the HRN of your Stream Path Matcher catalog.
To find the HRN, in the HERE platform portal or the HERE platform portal in China, navigate to your catalog. The HRN is displayed in the upper left corner of the page.
- Use the olp project resources link command to link the HERE Sample SDII Messages - Berlin and HERE Optimized Map for Location Library catalogs to your project:
olp project resources link $PROJECT_HRN $INPUT_SDII_CATALOG
olp project resources link $PROJECT_HRN $INPUT_OPTIMIZED_MAP_CATALOG
- For more details on catalog commands, see Catalog Commands.
- For more details on layer commands, see Layer Commands.
- For more details on project commands, see Project Commands.
- For instructions on how to link a resource to a project, see Project Resources Link command.
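The replacement of the YOUR_OUTPUT_CATALOG_HRN placeholder described in the steps above can also be scripted. The following sketch runs on a throwaway file rather than on the real configuration: the single line written into it and the HRN value are hypothetical stand-ins, since the actual content of pipeline-config.conf is not shown here.

```shell
# Demo on a temporary file. The line below is a made-up stand-in,
# not the real content of pipeline-config.conf.
demo_conf="$(mktemp)"
printf 'output-catalog = "YOUR_OUTPUT_CATALOG_HRN"\n' > "$demo_conf"

CATALOG_HRN="hrn:example:my-output-catalog"   # hypothetical value

# Replace every occurrence of the placeholder. Using '|' as the sed
# delimiter avoids clashes with any '/' characters in an HRN, and
# -i.bak keeps a backup copy next to the edited file.
sed -i.bak "s|YOUR_OUTPUT_CATALOG_HRN|$CATALOG_HRN|g" "$demo_conf"

cat "$demo_conf"
```

To apply the same idea to the real file, point the sed command at the pipeline-config.conf of your environment instead of the temporary file, and check the result before running the example.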
We're first going to run the example using a Flink local environment, suitable for local development and debugging.
- Compile and execute the example:
mvn --projects=:java-flink-stream-path-matcher compile exec:java \
-Dpipeline-config.file=$PATH_TO_CONFIG_FOLDER/pipeline-config.conf \
-Dpipeline-job.file=$PATH_TO_CONFIG_FOLDER/pipeline-job.conf \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope=$PROJECT_HRN
- Open a different terminal and let a few partitions stream out of the layer. They consist of small, one-line messages:
olp catalog layer stream get $CATALOG_HRN out-data --delimiter=\\n --limit=42 --timeout=900 --scope $PROJECT_HRN
- Stop the stream example process after the partitions have been retrieved.
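The commands above read both configuration files from $PATH_TO_CONFIG_FOLDER. One way to set it, sketched here under the assumption that you run the commands from the directory containing the config folder, is to derive it from the platform environment. PLATFORM_ENV is a hypothetical helper variable introduced only for this illustration.

```shell
# PLATFORM_ENV is a hypothetical helper: "here" or "here-china".
PLATFORM_ENV="here"

# Map the environment to the matching config folder named in this tutorial.
case "$PLATFORM_ENV" in
  here|here-china) PATH_TO_CONFIG_FOLDER="config/$PLATFORM_ENV" ;;
  *) echo "Unknown environment: $PLATFORM_ENV" >&2; PATH_TO_CONFIG_FOLDER="" ;;
esac

echo "Config folder: $PATH_TO_CONFIG_FOLDER"
```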
The following steps allow you to run the StreamPathMatcherExample pipeline on the platform.
Generate a "fat jar" for StreamPathMatcherExample that will be sent to the platform later:
mvn -Pplatform package
You can use the OLP CLI to create pipeline components and activate the pipeline version with the following commands:
- Create pipeline components:
olp pipeline create $COMPONENT_NAME_Pipeline --scope $PROJECT_HRN
olp pipeline template create $COMPONENT_NAME_Template stream-4.0 $PATH_TO_JAR \
com.here.platform.example.location.java.flink.StreamPathMatcherExample \
--input-catalog-ids="$PATH_TO_CONFIG_FOLDER/pipeline-config.conf" \
--scope $PROJECT_HRN
olp pipeline version create $COMPONENT_NAME_version $PIPELINE_ID $PIPELINE_TEMPLATE_ID \
"$PATH_TO_CONFIG_FOLDER/pipeline-config.conf" \
--scope $PROJECT_HRN
- Make sure logs are produced at the info level:
olp pipeline version log level set $PIPELINE_ID $PIPELINE_VERSION_ID \
--log4j-properties="$PATH_TO_PROJECT/src/main/resources/log4j.properties" \
--scope $PROJECT_HRN
If the operation is successful, you should be able to see the log level you just set:
olp pipeline version log level get $PIPELINE_ID $PIPELINE_VERSION_ID \
--scope $PROJECT_HRN
- Activate the pipeline version:
olp pipeline version activate $PIPELINE_ID $PIPELINE_VERSION_ID --input-catalogs "$PATH_TO_CONFIG_FOLDER/pipeline-job.conf" --scope $PROJECT_HRN
Once the pipeline version is running, let a few partitions stream out of the output layer. Partitions consist of small, one-line messages:
olp catalog layer stream get $CATALOG_HRN out-data --delimiter=\\n --limit=42 --timeout=900 --scope $PROJECT_HRN
You can now monitor the performance of your pipeline job by creating a custom graph in Grafana. To do this, use the following query in a Grafana panel, specifying your pipeline ID:
flink_taskmanager_job_task_operator_numRecordsOutPerSecond{operator_name="mapmatch_sdii_message",pipelineId="$pipeline_id"}
The resulting graph represents the number of SDII messages that are map matched per second.
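If you prefer to paste a ready-made query into the panel, the pipeline ID can be substituted beforehand. This is only a sketch: the ID below is a hypothetical value, and the query text is the one shown above with the ID filled in.

```shell
PIPELINE_ID="00000000-0000-0000-0000-000000000000"   # hypothetical ID, use your own

# Single quotes keep the braces and double quotes of the query literal;
# %s is replaced by the pipeline ID.
QUERY=$(printf 'flink_taskmanager_job_task_operator_numRecordsOutPerSecond{operator_name="mapmatch_sdii_message",pipelineId="%s"}' "$PIPELINE_ID")

echo "$QUERY"
```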
After inspecting the partitions, cancel the stream pipeline:
olp pipeline version cancel $PIPELINE_ID $PIPELINE_VERSION_ID --scope $PROJECT_HRN