# Data Sources
Metarank has two data processing stages:

* import: consume historical visitor interaction events and produce the latest point-in-time snapshot of the system state and all the ML features.
* inference: continuously process live events as they arrive in real time.
An overview diagram of the event flow during the import and inference stages is shown below:
Metarank supports the following list of connectors:
| Name          | Import                 | Inference |
|---------------|------------------------|-----------|
| Apache Kafka  | yes                    | yes       |
| Apache Pulsar | yes                    | yes       |
| AWS Kinesis   | yes, but actually no * | yes       |
| RESTful API   | yes                    | yes       |
| Files         | yes                    | no        |

\* AWS Kinesis has a strict maximum retention period of 7 days, so if you need to store more than 7 days of historical clickthrough events, choose something else (for example, add AWS Kinesis Firehose to write events from the Kinesis topic to S3 files and pick them up with the Files connector).
All supported connectors share some common options:

* `offset`: a time window in which events are read:
  * `earliest` - start from the first stored message in the topic
  * `latest` - consume only events that arrive after Metarank connects
  * `ts=<timestamp>` - start from a specific absolute timestamp in the past
  * `last=<duration>` - consume only events that happened within the given relative duration (the duration supports the following patterns: `1s`, `1m`, `1h`, `1d`)
* `format`: event record encoding format, possible options:
  * `json`: both JSON-line (newline-separated records) and JSON-array (`[{event:1}, {event:2}]`) formats are supported
  * `snowplow:tsv` / `snowplow:json` - Snowplow-native formats, see Snowplow integration for details on how to set them up
Config file definition example:
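The snippet below is an illustrative sketch of a file connector definition reusing the shared `offset` and `format` options described above; the `source` block layout, the example path, and the `sort` value are assumptions, so check the reference config of your Metarank version.

```yaml
source:
  type: file
  path: /data/events/   # node-local file or directory with the input dataset
  offset: earliest      # read the dataset from the beginning
  format: json          # JSON-line or JSON-array encoded events
  sort: name            # assumed value: how files in a directory are ordered before reading
```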
The `path` parameter is a node-local file or directory with the input dataset.

The `file` data source supports:

* compression, auto-detected based on file extension: ZStandard and GZip are supported
* directories with multiple input files: Metarank sorts files by the specified `sort` method to read them in the proper order.
Apache Kafka is an open source distributed event streaming platform. If you already use Kafka in your project, Metarank can connect to an existing Kafka topic to read both incoming and stored events, for the import stage (with an offset set to some time in the past) and for the inference stage (with the offset set to `latest`).
The Kafka connector is configured in the following way:
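A sketch of a Kafka connector definition reusing the shared options; the broker addresses, topic and group id are placeholders, and the exact field names are an assumption to verify against your Metarank version.

```yaml
source:
  type: kafka
  brokers: ["broker1:9092", "broker2:9092"]  # placeholder broker addresses
  topic: events                              # topic holding Metarank events
  groupId: metarank                          # consumer group id
  offset: earliest                           # or latest / ts=<timestamp> / last=<duration>
  format: json                               # or snowplow:tsv / snowplow:json
```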
Extra connector options, as defined in Kafka's ConsumerConfig, can be passed in the following way:
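For example, arbitrary Kafka consumer properties could be forwarded through an `options` map like the hypothetical sketch below; `fetch.max.bytes` is a standard Kafka ConsumerConfig key, but the `options` field name is an assumption.

```yaml
source:
  type: kafka
  # ...connection fields as above...
  options:
    fetch.max.bytes: 1048576   # any ConsumerConfig key-value pair
```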
Apache Pulsar is an open source distributed messaging and streaming platform. If you already use Pulsar in your project, Metarank can connect to an existing Pulsar topic to read both incoming and stored events, for the import stage (with an offset set to some time in the past) and for the inference stage (with the offset set to `latest`).
Metarank supports Pulsar 2.8+, but using 2.9+ is recommended.
The Pulsar connector is configured in the following way:
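A sketch of a Pulsar connector definition; the service/admin URLs, topic and subscription name are placeholders, and the exact field names are an assumption to verify against your Metarank version.

```yaml
source:
  type: pulsar
  serviceUrl: pulsar://pulsar-host:6650   # placeholder service URL
  adminUrl: http://pulsar-host:8080       # placeholder admin URL
  topic: events                           # topic holding Metarank events
  subscriptionName: metarank              # subscription used by Metarank
  offset: earliest                        # or latest / ts=<timestamp> / last=<duration>
  format: json                            # or snowplow:tsv / snowplow:json
```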
Extra connector options, as described in Configuring Pulsar consumer, can be passed in the following way:
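Extra Pulsar consumer settings could then be supplied through the same kind of `options` map, as in the hypothetical sketch below; `receiverQueueSize` is a standard Pulsar consumer setting, but whether it is forwarded this way is an assumption.

```yaml
source:
  type: pulsar
  # ...connection fields as above...
  options:
    receiverQueueSize: 1000   # any Pulsar consumer configuration key-value pair
```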
AWS Kinesis Streams is a fully-managed event streaming platform. Metarank uses the AWS Kinesis connector for Apache Flink, which is well-maintained and feature-complete.
To configure the connector, use this reference YAML block:
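A reference-style sketch of a Kinesis connector definition; the region and stream name are placeholders, and the exact field names are an assumption to verify against your Metarank version.

```yaml
source:
  type: kinesis
  region: us-east-1   # AWS region of the stream
  topic: events       # Kinesis stream name
  offset: latest      # or ts=<timestamp> / last=<duration>, within the 7-day retention window
  format: json        # or snowplow:tsv / snowplow:json
```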
Important things to note when using the AWS Kinesis connector for import:

* AWS Kinesis has a hard limit on the maximum retention time of 7 days. If you need to store more data, use AWS Kinesis Firehose to offload the events to S3 and pick them up with the `file` connector.
* AWS limits the read throughput per shard to 2 MB/s, so pulling a large dataset from Kinesis may take some time. Consider using an EFO (enhanced fan-out) consumer for dedicated throughput.
* The Kinesis source uses the default credentials chain from the AWS SDK, so all AWS SDK authentication methods are supported. In short:
  * use IAM roles when possible;
  * or set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to supply the keys manually.
It's possible to ingest real-time feedback events directly using the Metarank REST API:

* `POST /feedback` - push feedback events to Metarank

The `/feedback` endpoint is always enabled, so there is no need to configure it explicitly.

You can read more about the Metarank REST API in the API Documentation. Multiple events can be bundled into a single batch payload, so the REST API can also be used for batch dataset import instead of a separate `import` step:
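For example, a batch of events stored as a JSON array could be pushed with a single request; the host, port and file name below are placeholders for your own deployment and dataset.

```bash
curl -X POST http://localhost:8080/feedback \
  -H 'Content-Type: application/json' \
  -d @events.json   # a JSON array of Metarank events
```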