
Note
titleNote!

Certain features described in this documentation are part of an add-on package and may not be available to you. Please contact your Account Manager for further information.


The Data Aggregator function is used to group and aggregate usage data to provide better commercial sense and to ease managing large amounts of data. As an example, streams using the Data Aggregator function can help billing systems that have limitations on the number of usage events that can be processed. Using this function, you can perform aggregation operations such as SUM, COUNT, MAX, MIN, AVERAGE and many more on any configured field(s), as well as set conditions on those fields when flushing data.

For example, accumulate usage records over time, per account, subscriber and service to calculate the total usage of a service over a month, and then deliver the aggregated data to a billing service or use it as input for a cloud-based storage service.
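The accumulation described above can be sketched as follows. This is a minimal illustration only; the field names (account, subscriber, service, usage) are hypothetical, not the product's actual schema.

```python
from collections import defaultdict

# Hypothetical usage records; field names are illustrative only.
records = [
    {"account": "A1", "subscriber": "S1", "service": "data", "usage": 120},
    {"account": "A1", "subscriber": "S1", "service": "data", "usage": 80},
    {"account": "A1", "subscriber": "S2", "service": "voice", "usage": 30},
]

# Group by (account, subscriber, service) and SUM the usage field.
totals = defaultdict(int)
for rec in records:
    key = (rec["account"], rec["subscriber"], rec["service"])
    totals[key] += rec["usage"]

# The aggregated totals could then be forwarded to a billing
# service or to cloud-based storage.
for key, total in totals.items():
    print(key, total)
```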

Configuration

The Data Aggregator function's configuration contains the following sections:

  • Group Fields - Here you set the fields in the data that you wish to group together. You also have the option to group based on date/time.
  • Aggregate Fields - Here you select the fields you want to perform data aggregation on, and the arithmetic operation to be applied when grouping each field.
  • Flush by - Here you select how and when you want to flush (forward) the aggregated data to the next function in the stream.

Group Fields

...

Anchor
group fields
group fields

Here you specify the names of all the fields you want to group together when performing the aggregation.

Field | Description

Fields

Specify the fields to be used for grouping aggregated data. If a record has the same values as another record in the selected fields, they will be grouped in the same session.

Info
titleExample

For example, three records are being read in a stream, each with the fields key and user, and you decide to group by user:

Record1 has the user field set to 1

Record2 has the user field set to 2

Record3 has the user field set to 1

So, Record1 and Record3 will be grouped in the same session, and the count for that session is updated to 2. Record2 will be in a separate session.
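The example above can be sketched as follows; this is only an illustration of the grouping behavior, not the product's implementation.

```python
from collections import Counter

# The three records from the example above; only the grouping
# field ("user") determines session membership.
records = [
    {"key": "r1", "user": 1},
    {"key": "r2", "user": 2},
    {"key": "r3", "user": 1},
]

# Grouping by "user": records with equal values in the selected
# field end up in the same session.
sessions = Counter(rec["user"] for rec in records)

# user 1 has two records in its session, user 2 has one.
print(sessions)
```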


You can either type the name of the field(s) or select the field(s) from the drop-down menu that appears when you click on the field. In the Fields drop-down menu, you also have the option to Select all or Deselect all fields.

Group based on date/time

Select the checkbox to enable aggregation based on a time period.

Specify the applicable fields, containing timestamp data, to be used for grouping of aggregated data. You can either type the name of the field or select it from the Field drop-down menu.

Select the Period to group the information by:

  • Hour
  • Day
  • Month
  • Year
Info
titleTime Period Format

The supported time period format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sssZ pattern.

In case you need to convert a date format to support the ISO standard, see Script.

An example would be: "2019-01-13T00:00:00.000Z". 
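Grouping by a time period amounts to truncating each record's timestamp to the selected period boundary. A minimal sketch, assuming ISO 8601 timestamps as described above (the function name and period labels are illustrative, not a product API):

```python
from datetime import datetime

def period_key(ts: str, period: str) -> str:
    """Truncate an ISO 8601 (extended format) timestamp to the
    chosen grouping period (Hour, Day, Month or Year)."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f%z")
    if period == "Hour":
        dt = dt.replace(minute=0, second=0, microsecond=0)
    elif period == "Day":
        dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    elif period == "Month":
        dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    elif period == "Year":
        dt = dt.replace(month=1, day=1, hour=0, minute=0,
                        second=0, microsecond=0)
    return dt.isoformat()

# Records whose timestamps truncate to the same key are grouped together.
print(period_key("2019-01-13T14:25:36.123Z", "Month"))
```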

Click + Period Field to add additional grouping criteria based on the time period.


Note
titleConfiguration

Any change in the Group Fields configuration may result in a new aggregation session; however, the old sessions will still be in the storage. So if you change it back to the previous configuration, the already existing session in the storage (if any) will be used during the aggregation.

Aggregate Fields

...

Anchor
aggregate fields
aggregate fields

Here you specify the name of the field(s) and the operation to be performed on that field.

Field | Operation

Name of the input field key

The Operation drop-down menu is divided into three types: 

  • Numeric: 
    • SUM
    • MAX
    • MIN
    • AVERAGE
  • General
    • COUNT
    • CARRY_FIRST
    • CARRY_LAST
  • Date
    • MAX
    • MIN


Click + Aggregate Field to add more fields for aggregation.
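The operations listed above can be sketched as follows. The function and operation names mirror the menu entries for illustration only; this is not the product's actual implementation.

```python
# Minimal sketches of the aggregation operations named above.
def aggregate(values, operation):
    ops = {
        "SUM": sum,
        "MAX": max,
        "MIN": min,
        "AVERAGE": lambda v: sum(v) / len(v),
        "COUNT": len,
        "CARRY_FIRST": lambda v: v[0],   # keep the first value seen
        "CARRY_LAST": lambda v: v[-1],   # keep the last value seen
    }
    return ops[operation](list(values))

prices = [10, 20, 12]
print(aggregate(prices, "SUM"))      # 42
print(aggregate(prices, "AVERAGE"))  # 14.0
print(aggregate(prices, "CARRY_LAST"))  # 12
```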

Flush by

...

Anchor
flush by
flush by

Here you select how and when you want to flush (forward) the aggregated data to the next function in the stream.

...


In Product name, you can flush using any of the following options:

Flush by | Description

End of transaction

Aggregated data is flushed at the end of the transaction. A transaction here refers to a file that is processed by the stream. Flushing occurs only after that individual transaction is processed and complete. In case of batch streams, the aggregated data is forwarded after each set of data is read, for each transaction.

Info

For example, an Excel file in an S3 bucket.

This option is not applicable for real-time streams.

End of stream

Aggregated data is flushed at the end of all transactions (or, multiple files) during a stream execution. Flushing occurs only after the stream has run and is complete.

Info
titleExample

A use case where data sets from five different Excel files from an S3 bucket are being processed by a stream.

This option is not applicable for real-time streams.

Timeout

Aggregated data is flushed when a predefined interval has passed or a condition is met. Both batch and real-time streams support timeouts. Timeouts are checked every 60 seconds, and data is flushed only in case a timeout has occurred. In case of batch streams, the flush happens only when a stream is executed.

Info
titleExample

Example of a timeout - Record the consumption of mobile data of a subscriber by the 10th of every month.

Example of a condition timeout - Record the consumption of mobile data until the 10th of every month or until a 100 GB limit is reached. In this case, the timeout happens if either of these two conditions is met.


Select the type of the timeout based on:

  • Duration
    • Hours: Data is flushed on an hourly basis. You must specify the Interval which decides how often the data must be flushed. For example, if Interval is set to 2, then data is flushed every 2 hours.

      Info
      titleExample

      For example, if you aggregate price per account and set the timeout to 1 hour, then you will have the following behavior:
      • At 13.00 Account 'X' with price 10 is aggregated, timeout will then be set to 14.00
      • At 13.07 Account 'Y' with price 20 is aggregated, timeout will be set to 14.07
      • At 13.33 Account 'X' with price 12 is aggregated, sum updated to 22, timeout will still be set to 14.00

  • Frequency
    • Day: Aggregated data is flushed on a daily basis. You must specify the Time and Timezone to be considered when flushing data. The timeout is an absolute value, as you set the time/day when the timeout must happen.

      Info
      titleExample

      For example, if you set the timeout to Day and the time to 12.00 PM, then all created aggregation sessions will be timed out independently of when they were created.

    • Month: Aggregated data is flushed on a monthly basis. Specify which day of every month, the time of that day, and the timezone to be considered when the aggregated data will be timed out.

  • Custom
    • Based on date/time field: The timeout is defined based on the input data in the record, per aggregated session. Select or enter a field from your input data that is of either date or time format. You can also add a field of your choice. Supports UTC format only.

      Info
      titleDate Format

      The supported date format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sss pattern.

      An example would be: "2019-01-13T00:00:00.000". 

      For example, a record that matches aggregation criteria with a date/time field set to 20220803:13.00.00 will have its timeout set to that date. The timeout is updated in case new matching records arrive (with a different date/time field).

Click + Add Condition if you wish to add custom conditions to timeout the aggregated data. This option is available only when the Timeout flush by option is selected.

For more information, refer to the Add Condition for Timeout section in the documentation.
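The hourly-interval example above (Accounts 'X' and 'Y') can be sketched as follows. Note the key behavior: the timeout is set when a session is created and is not extended when further records are aggregated into that session. This is an illustrative model only, not the product's implementation.

```python
from datetime import datetime, timedelta

sessions = {}  # account -> {"sum": ..., "timeout": ...}
interval = timedelta(hours=1)

def aggregate_record(account, price, now):
    # A new session gets its timeout set to now + interval;
    # existing sessions keep their original timeout.
    if account not in sessions:
        sessions[account] = {"sum": 0, "timeout": now + interval}
    sessions[account]["sum"] += price

aggregate_record("X", 10, datetime(2022, 4, 8, 13, 0))   # timeout set to 14.00
aggregate_record("Y", 20, datetime(2022, 4, 8, 13, 7))   # timeout set to 14.07
aggregate_record("X", 12, datetime(2022, 4, 8, 13, 33))  # timeout stays 14.00

print(sessions["X"])  # sum is 22, timeout still 14.00
```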


Click the following to know more:

UI Expand
titleAdd Condition

Add Condition for Timeout
Anchor
condition timeout
condition timeout

You also have the option to add a condition based on which the aggregated data is flushed (for example, a timeout to happen when the value of the SUM field is more than 70).

Note
This option is only available in case of the Timeout flush setting.
  • Condition name: You can assign a name to the condition.
  • Based on: Select which input fields or aggregated fields you want to apply the condition on. The Input Fields show all the input fields configured in the stream and Aggregated Fields show the fields that you have selected to perform aggregation on (in the Aggregation tab).

    Note

    The CARRY_FIRST and CARRY_LAST operations are not supported. So if you perform aggregation using either of these General operations, those fields will not appear under the Aggregated Fields.


  • Type of field: Type of the field that is specified in Based on. You can choose from Numerical, String or Boolean. Applicable only in case of Input Field.
  • Operation: Select the comparing condition for that field, to be compared with a Value that you specify in the next step. The available operations change depending on whether you choose input fields or aggregated fields in Based on.

  • Value: The numerical value against which you want to apply the operation.

    The following table explains which options are available when you select a specific criteria to add a Flush condition:

    Based on | Type of field | Operation | Value

    Input Field

    Select from the following values:

    • Numerical
    • Text Values (string)
    • Boolean

    Options vary according to the Type of field:

    • Numerical: Mathematical operation. You can choose from the following:
      Less than, Greater than, Less or equal to, Greater or equal to, Equal, Is different from
    • Text: Matches/Not Matches. It is not possible to perform mathematical operations on strings.
    • Boolean: Yes (True) and No (False)

    If you select:

    • Numeric Values: You can type the value or use the up or down keys to increment or decrement the value.
    • Text Values (string): Enter the string value.
    • Boolean: Select Yes/No

    Aggregated Fields | N/A | You can select from the numerical operations.


Click +Add Condition to add more conditions.
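A flush condition like the SUM example above boils down to comparing an aggregated value against a threshold with the selected operation. A minimal sketch (the operation labels mirror the table above; the function is illustrative, not a product API):

```python
import operator

# Map the numerical operation labels from the table above onto
# Python comparison operators.
OPERATIONS = {
    "Less than": operator.lt,
    "Greater than": operator.gt,
    "Less or equal to": operator.le,
    "Greater or equal to": operator.ge,
    "Equal": operator.eq,
    "Is different from": operator.ne,
}

def condition_met(aggregated_value, op_name, threshold):
    """Return True when the flush condition holds."""
    return OPERATIONS[op_name](aggregated_value, threshold)

# Flush when the value of the SUM field is more than 70.
print(condition_met(72, "Greater than", 70))  # True
```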



Note

The default TTL for stored data in a session is 180 days. This means that if a session is not updated for 180 days, all the stored data pertaining to that session will be deleted permanently.

...

Info
titleSome example streams

For examples of how you can use Data Aggregator in a stream, see one of the following:

Metadata

You can view and access the following metadata properties of Data Aggregator. To view the metadata, use the meta object as mentioned in the Script function. Here is an example:

Info
titleExample


Code Block
{
  "origin": "Data_Aggregator",
  "count": 7,
  "flushType": "TIMEOUT",
  "firstEvent": "2022-04-08T17:35:53.239Z",
  "lastEvent": "2022-04-08T17:38:17.315Z",
  "lastCall": false
}



...

Property name | Description

count

Number of aggregated records

flushType

The reason for the session being flushed out. Shows any of the values: ALL_FILES, EACH_FILE, TIMEOUT and CONDITION. During preview, the value will be empty.

firstEvent

Date and time of the first aggregated record in the session

lastEvent

Date and time of the last aggregated record in the session
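The metadata example above is plain JSON, so its properties can be read as in the following sketch. How you access the meta object inside a stream is described in the Script function documentation; this snippet only illustrates the structure of the payload.

```python
import json

# The meta object from the example above, parsed as plain JSON.
meta = json.loads(
    '{"origin":"Data_Aggregator","count":7,"flushType":"TIMEOUT",'
    '"firstEvent":"2022-04-08T17:35:53.239Z",'
    '"lastEvent":"2022-04-08T17:38:17.315Z","lastCall":false}'
)

# Individual properties are accessed by name.
print(meta["count"], meta["flushType"])  # 7 TIMEOUT
```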

...