
KSQL

This provider connects ChartFactor to Apache Kafka, one of the most popular streaming platforms, through Kafka's own query language: KSQL.

KSQL is developed by Confluent and is defined as:

KSQL is an open source streaming SQL engine that implements continuous, interactive queries against Apache Kafka™. It allows you to query, read, write, and process data in Apache Kafka in real-time, at scale using SQL commands.

Queries defined through ChartFactor's aql are translated into KSQL, and the streamed data is rendered in the visuals as it arrives.

Include the library:

<script src="./CFT-ksql-provider.min.js"></script>

// define providers
var providers = [{
    name:'KSQL',
    provider:'ksql',
    url:'http://localhost:9098'
}]

Then, as with any other provider, use ChartFactor's setProviders() method to set your data provider definitions:

cf.setProviders(providers);

This assumes you have a running instance of KSQL and the Confluent Platform. Refer to Confluent's KSQL installation documents for steps to install a KSQL server.
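When provider definitions are assembled from configuration, a small helper can catch a malformed server URL before it reaches setProviders(). The following is only a sketch: makeKsqlProvider is a hypothetical name and is not part of ChartFactor; only the name, provider, and url fields are taken from the definition above.

```javascript
// Hypothetical helper (not part of ChartFactor): builds a KSQL provider
// definition with the same fields used above and rejects malformed URLs.
function makeKsqlProvider(name, url) {
    // new URL() throws a TypeError if the string is not a valid absolute URL
    var parsed = new URL(url);
    if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') {
        throw new Error('KSQL server URL must use http or https: ' + url);
    }
    return { name: name, provider: 'ksql', url: url };
}

var providers = [makeKsqlProvider('KSQL', 'http://localhost:9098')];
// In a page that has loaded ChartFactor, this array would then be passed
// to cf.setProviders(providers).
```

Failing early here gives a clearer error than a visual that silently never receives data.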

Supported Confluent and KSQL versions

The ChartFactor Toolkit KSQL data provider supports version 4.x of the Confluent Platform and KSQL. The latest 4.x minor version is recommended.

Supported Aggregations Out-Of-The-Box

SUM

    var metric = cf.Metric("amount","sum");

AVG

    var metric = cf.Metric("amount","avg");

MIN

    var metric = cf.Metric("amount","min");

MAX

    var metric = cf.Metric("amount","max");

KSQL limitations and workarounds

The following workarounds address two limitations of Confluent and KSQL 4.x: the lack of support for non-grouped aggregated queries and the lack of window start and window end functions.

KPI (non-grouped aggregated queries)

To use a KPI chart in your data application, you have to create a second KSQL stream with a FOO column that will always have the same value. This works around the lack of support for non-grouped aggregated queries. For instance, let's take a look at the following KSQL stream:

CREATE STREAM PAGEVIEWS_ORIGINAL (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

To render a KPI for view time, page view count, or any other metric, you need to define a second KSQL stream as shown below:

    CREATE STREAM PAGEVIEWS_ORIGINAL2 AS SELECT 1 AS FOO, * FROM PAGEVIEWS_ORIGINAL;

Then, in your data application you will be able to define a KPI widget using the second KSQL stream in this way:

    cf.provider('KSQL')
        .source('PAGEVIEWS_ORIGINAL2')
        .metrics(cf.Metric())
        .element('kpi')
        .set('showLabels', false)
        .set('mainTextSize', 6)
        .graph('KPI')
        .execute()

ChartFactor will use the FOO column internally to generate a query that KSQL 4.x accepts and render the KPI appropriately.
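To see why a constant column helps, it can be useful to look at the shape of a grouped query over it. The sketch below builds such a query as a string. The exact KSQL that ChartFactor generates is not documented here, so this query shape is an assumption, and kpiQuery is a hypothetical helper name used only for illustration.

```javascript
// Hypothetical helper: builds a KPI-style aggregate query for a source
// stream that carries a constant column (FOO in the example above).
function kpiQuery(source, aggregate, constantColumn) {
    // KSQL 4.x does not accept non-grouped aggregates, so group on the
    // constant: every row lands in the same group, giving a single total.
    return 'SELECT ' + constantColumn + ', ' + aggregate +
        ' FROM ' + source +
        ' GROUP BY ' + constantColumn + ';';
}

var query = kpiQuery('PAGEVIEWS_ORIGINAL2', 'COUNT(*)', 'FOO');
// query === 'SELECT FOO, COUNT(*) FROM PAGEVIEWS_ORIGINAL2 GROUP BY FOO;'
```

Because FOO is 1 for every row, the GROUP BY produces exactly one group, which is the single value a KPI needs.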

Trend (no way to detect the start of the time window)

To use a Trend chart in your data application, you need to create a windowed KSQL table on top of your stream to work around the lack of window start and window end functions. For instance, let's take a look at the following KSQL stream:

CREATE STREAM PAGEVIEWS_ORIGINAL (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

To render a Trend, you will need to define a windowed KSQL table over that stream as shown below:

    CREATE TABLE POBYUSER1S AS SELECT USERID, COUNT(*) AS PO_COUNT FROM PAGEVIEWS_ORIGINAL WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY USERID;

Then, in your data application you will be able to define a Trend widget using the windowed KSQL table as follows:

    cf.setProviders([{
        name: 'KSQL',
        provider: 'ksql',
        url: 'https://chartfactor.com/ksql',
        metadata: {
            'POBYUSER1S': {
                fields: {
                    'ROWTIME': { 'type': 'TIME'}
                }
            }
        }
    }]);

    var time = cf.Attribute('rowtime', 'Rowtime').func('second').limit(20).sort('asc', 'rowtime');
    var user = cf.Attribute('userid', 'Users').limit(25).sort('desc', 'userid');
    var countTrend = cf.Metric('PO_COUNT');

    cf.provider('KSQL')
        .source('POBYUSER1S')
        .groupby(user, time)
        .metrics(countTrend)
        .element('trend')
        .set('legend', 'right')
        .set('axisLabels', false)
        .graph('Trend')
        .execute();

Important things to note in the data application code above:

  • We define metadata for the KSQL provider to tell ChartFactor that ROWTIME is a TIME field, since internally it is a BIGINT. Also note that the source name is POBYUSER1S, since that is the name used when creating the table on the KSQL side.
  • The granularity of the time attribute must match the size of the time window used when creating the KSQL table; in this case it is second.
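The two notes above can be illustrated with plain JavaScript. ROWTIME is a BIGINT holding epoch milliseconds, and truncating it to the window size gives the one-second bucket a row belongs to. This is a sketch of the idea only, not ChartFactor's internal code; toDate and windowStart are hypothetical helper names.

```javascript
// ROWTIME arrives as a BIGINT of epoch milliseconds; convert it to a Date.
function toDate(rowtime) {
    return new Date(rowtime);
}

// Start of the epoch-aligned bucket that contains a timestamp.
// sizeMs must match the window size (1000 for WINDOW TUMBLING (SIZE 1 SECOND)).
function windowStart(rowtime, sizeMs) {
    return rowtime - (rowtime % sizeMs);
}

var rowtime = 1500000000123;              // sample value, chosen for illustration
var start = windowStart(rowtime, 1000);   // 1500000000000
var label = toDate(start).toISOString();  // '2017-07-14T02:40:00.000Z'
```

If the attribute granularity were coarser or finer than the window, several windows would collapse into one point or a single window would be spread across buckets, which is why the two need to match.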

Dependencies

  • None