Transport
Tip: See the current list of all supported transports.
HTTP
Allows sending events to an HTTP endpoint, using Apache HttpClient.
Configuration
- type - string, must be "http". Required.
- url - string, base URL for HTTP requests. Required.
- endpoint - string specifying the endpoint to which events are sent, appended to url. Optional, default: /api/v1/lineage.
- urlParams - dictionary specifying query parameters sent in HTTP requests. Optional.
- timeoutInMillis - integer specifying the timeout (in milliseconds) used while connecting to the server. Optional, default: 5000.
- auth - dictionary specifying authentication options. Optional; by default no authorization is used. If set, requires the type property.
  - type - string specifying "api_key" or the fully qualified class name of your TokenProvider (see the sketch after this list). Required if auth is provided.
  - apiKey - string setting the Authentication HTTP header as the Bearer token. Required if type is api_key.
- headers - dictionary specifying HTTP request headers. Optional.
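If the built-in "api_key" type does not fit, auth.type can instead name your own token provider class. A minimal sketch, assuming the io.openlineage.client.transports.TokenProvider interface exposes a single getToken() method whose return value is used as the Authentication header; the class MyTokenProvider is hypothetical:
import io.openlineage.client.transports.TokenProvider;
// Hypothetical custom provider; supply its fully qualified class name as auth.type.
public class MyTokenProvider implements TokenProvider {
  @Override
  public String getToken() {
    // Your own lookup logic goes here; a static value for this sketch.
    return "Bearer f38d2189-c603-4b46-bdea-e573a3b5a7d5";
  }
}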
Behavior
Events are serialized to JSON, and then sent as an HTTP POST request with Content-Type: application/json.
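To make the wire format concrete, here is a hand-rolled equivalent of a single delivery using java.net.http; the payload below is a trimmed placeholder, not a complete OpenLineage event:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
// POST the JSON-serialized event to {url}{endpoint} with Content-Type: application/json.
String eventJson = "{\"eventType\": \"START\"}"; // placeholder payload
HttpRequest request = HttpRequest.newBuilder()
  .uri(URI.create("http://localhost:5000/api/v1/lineage"))
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(eventJson))
  .build();
HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());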
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
Anonymous connection:
transport:
  type: http
  url: http://localhost:5000
With authorization:
transport:
  type: http
  url: http://localhost:5000
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
transport:
  type: http
  url: http://localhost:5000
  endpoint: /api/v1/lineage
  urlParams:
    param0: value0
    param1: value1
  timeoutInMillis: 5000
  auth:
    type: api_key
    api_key: f38d2189-c603-4b46-bdea-e573a3b5a7d5
  headers:
    X-Some-Extra-Header: abc
Anonymous connection:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
With authorization:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1/lineage
spark.openlineage.transport.urlParams.param0=value0
spark.openlineage.transport.urlParams.param1=value1
spark.openlineage.transport.timeoutInMillis=5000
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
spark.openlineage.transport.headers.X-Some-Extra-Header=abc
URL parsing within Spark integration
You can supply HTTP parameters by embedding their values in the URL. The parsed spark.openlineage.* properties are located within the URL as follows:
{transport.url}/{transport.endpoint}/namespaces/{namespace}/jobs/{parentJobName}/runs/{parentRunId}?app_name={appName}&api_key={transport.apiKey}&timeout={transport.timeout}&xxx={transport.urlParams.xxx}
Example:
http://localhost:5000/api/v1/namespaces/ns_name/jobs/job_name/runs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx?app_name=app&api_key=abc&timeout=5000&xxx=xxx
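For instance, the example URL above would be parsed into properties equivalent to the following (a sketch of the mapping, inferred from the template; the exact property names follow the template's placeholders):
spark.openlineage.transport.url=http://localhost:5000
spark.openlineage.transport.endpoint=/api/v1
spark.openlineage.namespace=ns_name
spark.openlineage.parentJobName=job_name
spark.openlineage.parentRunId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
spark.openlineage.appName=app
spark.openlineage.transport.apiKey=abc
spark.openlineage.transport.timeout=5000
spark.openlineage.transport.urlParams.xxx=xxx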
Anonymous connection:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
With authorization:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
Full example:
openlineage.transport.type=http
openlineage.transport.url=http://localhost:5000
openlineage.transport.endpoint=/api/v1/lineage
openlineage.transport.urlParams.param0=value0
openlineage.transport.urlParams.param1=value1
openlineage.transport.timeoutInMillis=5000
openlineage.transport.auth.type=api_key
openlineage.transport.auth.apiKey=f38d2189-c603-4b46-bdea-e573a3b5a7d5
openlineage.transport.headers.X-Some-Extra-Header=abc
Anonymous connection:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new HttpTransport(httpConfig))
  .build();
With authorization:
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setAuth(apiKeyTokenProvider);
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new HttpTransport(httpConfig))
  .build();
Full example:
import java.util.Map;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ApiKeyTokenProvider;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
Map<String, String> queryParams = Map.of(
"param0", "value0",
"param1", "value1"
);
Map<String, String> headers = Map.of(
"X-Some-Extra-Header", "abc"
);
ApiKeyTokenProvider apiKeyTokenProvider = new ApiKeyTokenProvider();
apiKeyTokenProvider.setApiKey("f38d2189-c603-4b46-bdea-e573a3b5a7d5");
HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://localhost:5000");
httpConfig.setEndpoint("/api/v1/lineage");
httpConfig.setUrlParams(queryParams);
httpConfig.setAuth(apiKeyTokenProvider);
httpConfig.setTimeoutInMillis(5000);
httpConfig.setHeaders(headers);
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new HttpTransport(httpConfig))
  .build();
Kafka
If the transport type is set to kafka, the parameters below are read and used when building the KafkaProducer.
This transport requires the artifact org.apache.kafka:kafka-clients:3.1.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kafka". Required.
- topicName - string specifying the topic to which events will be sent. Required.
- localServerId - string, ID of the local server. Required.
- properties - a dictionary containing the Kafka producer config, as described in the Kafka producer config documentation. Required.
Behavior
Events are serialized to JSON, and then dispatched to the Kafka topic.
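To illustrate, a simplified hand-rolled equivalent of a single delivery with a plain KafkaProducer; the payload is a trimmed placeholder, not a complete OpenLineage event:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String eventJson = "{\"eventType\": \"START\"}"; // placeholder payload
// Send the JSON-serialized event to the configured topic.
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  producer.send(new ProducerRecord<>("openlineage.events", eventJson));
}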
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: kafka
  topicName: openlineage.events
  properties:
    bootstrap.servers: localhost:9092,another.host:9092
    acks: all
    retries: 3
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
  localServerId: some-value
spark.openlineage.transport.type=kafka
spark.openlineage.transport.topicName=openlineage.events
spark.openlineage.transport.localServerId=xxxxxxxx
spark.openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.properties.acks=all
spark.openlineage.transport.properties.retries=3
spark.openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.type=kafka
openlineage.transport.topicName=openlineage.events
openlineage.transport.localServerId=xxxxxxxx
openlineage.transport.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.properties.acks=all
openlineage.transport.properties.retries=3
openlineage.transport.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092,another.host:9092");
kafkaProperties.setProperty("acks", "all");
kafkaProperties.setProperty("retries", "3");
kafkaProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConfig kafkaConfig = new KafkaConfig();
kafkaConfig.setTopicName("openlineage.events");
kafkaConfig.setProperties(kafkaProperties);
kafkaConfig.setLocalServerId("some-value");
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new KafkaTransport(kafkaConfig))
  .build();
Kinesis
If the transport type is set to kinesis, the parameters below are read and used when building the KinesisProducer.
This transport also requires the artifact com.amazonaws:amazon-kinesis-producer:0.14.0 (or compatible) on your classpath.
Configuration
- type - string, must be "kinesis". Required.
- streamName - the name of the Kinesis stream. Required.
- region - the region of the Kinesis stream. Required.
- roleArn - the role ARN which is allowed to read/write to the Kinesis stream. Optional.
- properties - a dictionary containing Kinesis-allowed properties. Optional.
Behavior
- Events are serialized to JSON, and then dispatched to the Kinesis stream.
- The partition key is generated as {jobNamespace}:{jobName}.
- Two constructors are available: one accepting both KinesisProducer and KinesisConfig, and another solely accepting KinesisConfig (see the sketch after this list).
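A minimal sketch of the two-argument constructor mentioned above, useful when you need a pre-built producer (for example, with custom credentials or retry settings); the argument order shown here is an assumption:
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import io.openlineage.client.transports.KinesisConfig;
import io.openlineage.client.transports.KinesisTransport;
KinesisConfig kinesisConfig = new KinesisConfig();
kinesisConfig.setStreamName("your_kinesis_stream_name");
kinesisConfig.setRegion("your_aws_region");
// A producer you configure yourself, instead of one built from KinesisConfig.
KinesisProducer producer = new KinesisProducer(new KinesisProducerConfiguration());
KinesisTransport transport = new KinesisTransport(producer, kinesisConfig);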
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: kinesis
  streamName: your_kinesis_stream_name
  region: your_aws_region
  roleArn: arn:aws:iam::account-id:role/role-name
  properties:
    VerifyCertificate: true
    ConnectTimeout: 6000
spark.openlineage.transport.type=kinesis
spark.openlineage.transport.streamName=your_kinesis_stream_name
spark.openlineage.transport.region=your_aws_region
spark.openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
spark.openlineage.transport.properties.VerifyCertificate=true
spark.openlineage.transport.properties.ConnectTimeout=6000
openlineage.transport.type=kinesis
openlineage.transport.streamName=your_kinesis_stream_name
openlineage.transport.region=your_aws_region
openlineage.transport.roleArn=arn:aws:iam::account-id:role/role-name
openlineage.transport.properties.VerifyCertificate=true
openlineage.transport.properties.ConnectTimeout=6000
import java.util.Properties;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.KinesisConfig;
import io.openlineage.client.transports.KinesisTransport;
Properties kinesisProperties = new Properties();
kinesisProperties.setProperty("property_name_1", "value_1");
kinesisProperties.setProperty("property_name_2", "value_2");
KinesisConfig kinesisConfig = new KinesisConfig();
kinesisConfig.setStreamName("your_kinesis_stream_name");
kinesisConfig.setRegion("your_aws_region");
kinesisConfig.setRoleArn("arn:aws:iam::account-id:role/role-name");
kinesisConfig.setProperties(kinesisProperties);
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new KinesisTransport(kinesisConfig))
  .build();
Console
This straightforward transport emits OpenLineage events directly to the console through a logger. No additional configuration is required.
Behavior
Events are serialized to JSON, and then each event is logged at the INFO level to a logger named ConsoleTransport.
Notes
Be cautious when using the DEBUG log level, as it might result in double-logging due to the OpenLineageClient also logging.
Configuration
- type - string, must be "console". Required.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: console
spark.openlineage.transport.type=console
openlineage.transport.type=console
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.ConsoleTransport;
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new ConsoleTransport())
  .build();
File
Designed mainly for integration testing, the FileTransport emits OpenLineage events to a given file.
Configuration
- type - string, must be "file". Required.
- location - string specifying the path of the file. Required.
Behavior
- If the target file is absent, it's created.
- Events are serialized to JSON, and then appended to a file, separated by newlines.
- Intrinsic newline characters within the event JSON are eliminated to ensure one-line events.
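Because newlines are stripped from each event, the file can be consumed as newline-delimited JSON; a minimal read-back sketch for an integration test, using the location configured for the transport:
import java.nio.file.Files;
import java.nio.file.Paths;
// Each line of the file holds exactly one JSON-serialized OpenLineage event.
for (String eventJson : Files.readAllLines(Paths.get("/path/to/your/file"))) {
  System.out.println(eventJson); // e.g. parse and assert on it in a test
}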
Notes for Yarn/Kubernetes
This transport type is of little use for Spark/Flink applications deployed to a Yarn or Kubernetes cluster:
- Each executor writes its file to the local filesystem of the Yarn container/K8s pod, so the resulting file is removed when that container/pod is destroyed.
- Kubernetes persistent volumes are not destroyed after pod removal, but all executors writing to the same network disk in parallel would produce a broken file.
Examples
- Yaml Config
- Spark Config
- Flink Config
- Java Code
transport:
  type: file
  location: /path/to/your/file
spark.openlineage.transport.type=file
spark.openlineage.transport.location=/path/to/your/file
openlineage.transport.type=file
openlineage.transport.location=/path/to/your/file
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.FileConfig;
import io.openlineage.client.transports.FileTransport;
FileConfig fileConfig = new FileConfig("/path/to/your/file");
OpenLineageClient client = OpenLineageClient.builder()
  .transport(
    new FileTransport(fileConfig))
  .build();