This sample demonstrates the use of Pravega Watermarks in Flink applications.
Flink supports event-time processing.
The mechanism Flink uses to measure progress in event time is the watermark.
Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark).
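The contract above can be illustrated with a small plain-Java model. This is only a sketch of the rule "no more elements with timestamp t' <= t after Watermark(t)"; the class and method names are hypothetical and it does not use the Flink API.

```java
// Plain-Java model of the watermark contract; not the Flink API.
final class WatermarkContractSketch {

    /**
     * After Watermark(t) has been seen, any element with timestamp <= t
     * violates the contract and is therefore considered late.
     */
    static boolean isLate(long elementTimestamp, long currentWatermark) {
        return elementTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long watermark = 1_000L;
        long[] arrivingTimestamps = {900L, 1_000L, 1_001L};
        for (long ts : arrivingTimestamps) {
            System.out.println("timestamp " + ts + " late? " + isLate(ts, watermark));
        }
    }
}
```

An element whose timestamp equals the watermark counts as late here because the definition uses `<=`, not `<`.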
Pravega introduced a mechanism for generating watermarks within the stream in the 0.6 release. Its design is close to the Flink concept. The new Pravega watermark API lets writers provide event-time information and gives readers a time bound, so that they can identify where they are in the stream.
The following example shows how to use Pravega watermarks in Flink applications. You can also follow the instructions to set up and run this project on Dell EMC Streaming Data Platform.
PravegaWatermarkIngestion is a Pravega writer that generates synthetic sensor data with event time and ingests it into a Pravega stream.
It mocks a sine wave for three sensors and emits data every second in event time.
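To make the shape of the synthetic data concrete, here is a minimal plain-Java sketch of a sine-wave generator for three sensors with one reading per sensor per second of event time. The wave period, the per-sensor phase shift, and all class and field names are assumptions for illustration; the sample's actual values may differ, and writing to a Pravega stream is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; period and phase shift are illustrative assumptions.
final class SineSensorSketch {
    static final int SENSOR_COUNT = 3;
    static final double PERIOD_SECONDS = 60.0; // assumed wave period

    /** One synthetic reading: which sensor, its value, and its event time. */
    static final class Reading {
        final int sensorId;
        final double value;
        final long timestampMillis;

        Reading(int sensorId, double value, long timestampMillis) {
            this.sensorId = sensorId;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Generates one reading per sensor for each second of event time. */
    static List<Reading> generate(long startMillis, int seconds) {
        List<Reading> readings = new ArrayList<>();
        for (int s = 0; s < seconds; s++) {
            long eventTime = startMillis + s * 1000L;
            for (int id = 0; id < SENSOR_COUNT; id++) {
                // Each sensor is shifted by its id (in radians) so the waves differ.
                double value = Math.sin(2 * Math.PI * s / PERIOD_SECONDS + id);
                readings.add(new Reading(id, value, eventTime));
            }
        }
        return readings;
    }

    public static void main(String[] args) {
        for (Reading r : generate(0L, 2)) {
            System.out.println(r.sensorId + "," + r.value + "," + r.timestampMillis);
        }
    }
}
```

Event time advances by the loop counter rather than by the wall clock, so the generator can emit data faster or slower than real time without changing the timestamps.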
There is one slight difference: Flink requires that an event timestamp can be extracted from each record, while Pravega, as a streaming storage system, has no such requirement.
To carry Pravega watermarks into Flink, the Flink reader accepts an implementation that defines:
1: How to extract the timestamp from each record
2: How to derive the watermark timestamp from the time bound given by the Pravega readers.
This idea is abstracted into an interface called AssignerWithTimeWindows. It's up to you to implement it.
While building the reader, use withTimestampAssigner(new MyAssignerWithTimeWindows()) to register the assigner.
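The two responsibilities listed above can be modeled in plain Java as follows. Every type here (TimeBound, Assigner, SensorEvent, LowerBoundSketchAssigner) is a hypothetical stand-in, not the connector's AssignerWithTimeWindows interface; check the connector documentation for the real signatures. The sketch assumes the conservative choice of using the lower end of the time bound as the watermark.

```java
// Plain-Java model of the two assigner responsibilities; all types are
// hypothetical stand-ins, not the Pravega Flink connector API.
final class TimeBoundAssignerSketch {

    /** Stand-in for the time bound reported by the Pravega readers. */
    static final class TimeBound {
        final Long lower; // may be null when no bound is known yet
        final Long upper;

        TimeBound(Long lower, Long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** Stand-in interface: timestamp extraction plus watermark derivation. */
    interface Assigner<T> {
        long extractTimestamp(T element);
        long watermarkFor(TimeBound bound);
    }

    static final class SensorEvent {
        final int sensorId;
        final double value;
        final long timestamp;

        SensorEvent(int sensorId, double value, long timestamp) {
            this.sensorId = sensorId;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Assumed policy: the watermark is the lower end of the time bound. */
    static final class LowerBoundSketchAssigner implements Assigner<SensorEvent> {
        @Override
        public long extractTimestamp(SensorEvent element) {
            return element.timestamp; // 1: timestamp comes from the record itself
        }

        @Override
        public long watermarkFor(TimeBound bound) {
            // 2: with no known bound, emit the smallest possible watermark
            return bound.lower == null ? Long.MIN_VALUE : bound.lower;
        }
    }

    public static void main(String[] args) {
        LowerBoundSketchAssigner assigner = new LowerBoundSketchAssigner();
        System.out.println(assigner.extractTimestamp(new SensorEvent(0, 0.5, 4_000L)));
        System.out.println(assigner.watermarkFor(new TimeBound(5_000L, 9_000L)));
    }
}
```

Choosing the lower bound trades latency for safety: windows fire later, but fewer events arrive behind the watermark.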
The application reads text from a socket, assigns the event time, and then propagates the Flink watermark into Pravega by setting enableWatermark(true) on the Flink writer.
EventTimeAverage reads sensor data from the stream with Pravega watermarks, calculates an average value for each sensor over a fixed-length event-time window, and writes the summarized sensor data to another Pravega stream.
You can run it recursively, reusing the results of a smaller window as the input for a larger one.
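The per-sensor, fixed-length window average can be sketched without Flink as below. This is an illustrative plain-Java model, not the code of EventTimeAverage: the class names, the "sensorId@windowStart" key format, and the 10-second window in main are assumptions.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of a fixed-length (tumbling) event-time window average.
final class TumblingAverageSketch {

    static final class Reading {
        final int sensorId;
        final double value;
        final long timestamp;

        Reading(int sensorId, double value, long timestamp) {
            this.sensorId = sensorId;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the fixed-length window that contains the given timestamp. */
    static long windowStart(long timestamp, long windowSizeMillis) {
        return timestamp - Math.floorMod(timestamp, windowSizeMillis);
    }

    /** Returns the average value keyed by "sensorId@windowStart". */
    static Map<String, Double> averagePerWindow(Iterable<Reading> readings, long windowSizeMillis) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (Reading r : readings) {
            String key = r.sensorId + "@" + windowStart(r.timestamp, windowSizeMillis);
            double[] acc = sumAndCount.computeIfAbsent(key, k -> new double[2]);
            acc[0] += r.value; // running sum
            acc[1] += 1;       // running count
        }
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        Iterable<Reading> readings = Arrays.asList(
                new Reading(0, 1.0, 0L),
                new Reading(0, 3.0, 4_000L),
                new Reading(0, 5.0, 10_000L),
                new Reading(1, 2.0, 9_999L));
        System.out.println(averagePerWindow(readings, 10_000L));
    }
}
```

In a real streaming job the window result is only emitted once the watermark passes the end of the window; this batch-style sketch skips that step and simply groups all readings it is given.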
Before you start, download the latest Pravega release from the GitHub releases page. See here for instructions on building and running Pravega in standalone mode.
You also need the latest Flink binary from the Apache download page. Follow this tutorial to start a local Flink cluster.
pravega-samples Repository
The pravega-samples repository provides code samples that connect analytics engines such as Flink with Pravega as a storage substrate for data streams. It is divided into sub-projects (pravega-client-examples, flink-connector-examples and hadoop-connector-examples), each demonstrating a specific component. To build pravega-samples from source, follow the instructions to use the built-in Gradle wrapper.
There are multiple ways to run the program in a Flink environment, including submitting it from the terminal or from the Flink UI. Follow the latest instructions in the GitHub README to learn more about the IDE setup and the running process.
Since the original Flink connector watermark example was designed to run on a standalone Pravega and Flink environment, the code uses the createScope method from the StreamManager interface in Pravega. However, for security reasons, Dell EMC Streaming Data Platform does not allow a scope to be created from code. Comment out the createScope calls in your code.
There are two occurrences of the createScope method in this watermark example, located in the following Java files:
I. pravega-samples/flink-connector-examples/src/main/java/io/pravega/example/flink/Utils.java
II. pravega-samples/flink-connector-examples/src/main/java/io/pravega/example/flink/watermark/PravegaWatermarkIngestion.java
If you choose to use the Gradle build tool, make sure the Maven repo in SDP is available to your development workstation, as mentioned in the post. The following excerpt from the pravega-samples/flink-connector-examples/build.gradle file shows only the modified section. Since the example uses shadow JARs, make sure to add classifier = "" and zip64 true to the shadowJar config.
shadowJar {
    dependencies {
        include dependency("org.scala-lang.modules:scala-java8-compat_${flinkScalaVersion}")
        include dependency("io.pravega:pravega-connectors-flink-${flinkMajorMinorVersion}_${flinkScalaVersion}")
        include dependency("io.pravega:pravega-keycloak-client:${pravegaKeycloakVersion}")
    }
    classifier = ""
    zip64 true
}

publishing {
    repositories {
        maven {
            url = "http://localhost:9090/maven2"
            credentials {
                username "desdp"
                password "password"
            }
            authentication {
                basic(BasicAuthentication)
            }
        }
    }
    publications {
        shadow(MavenPublication) { publication ->
            project.shadow.component(publication)
        }
    }
}
The preferred name/namespace for this project is watermark-examples.
The Flink Image for creating the cluster is 1.9.0.
The Main Class for creating the new app is io.pravega.example.flink.watermark.EventTimeAverage. Please also make sure to pass the same parameters as discussed in the original watermark post.
After finishing the above steps, the State for your Flink application should be shown as Started.
PravegaWatermarkIngestion application with the Pravega stream on Dell EMC Streaming Data Platform
After setting up the project namespace on SDP, configure Keycloak authentication by following this post. You need to set the same configurations for the PravegaWatermarkIngestion application. Then you can run the application in the same way as discussed in the original post.
Unlike in standalone mode, the results are not shown in the console output. You need to access the pod's log using kubectl. The following is an example command (the namespace may differ based on your settings):
kubectl logs -f watermark-examples-taskmanager-0 -n watermark-examples
You should see the same results as shown in the post.
https://github.com/pravega/pravega-samples/tree/master/flink-connector-examples/doc/watermark
Flink implements many techniques from the Dataflow Model, and Pravega aligns with it. For a better understanding of event time and watermarks, the following articles can be helpful.