Pipeline - TabularPipelineBuilder
Pipeline.TabularPipelineBuilder
A TabularPipelineBuilder to build a data Pipeline.
Methods on the TabularPipelineBuilder enable you to directly transform data from a tabular Stream, either by applying a transform operation to the whole Stream, or by applying select, filter, join, aggregate, disaggregate and offset operations to each entry in the Stream.
Quality of data can be observed using assert and warn by providing expression-based conditions and messages, either to the whole Stream or to each entry.
A corresponding Template can be created using .toTemplate().
Remarks
See Transform Data for a related learning module.
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // read by the warnEvery check below
        password: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
```
Type parameters
Name | Type |
---|---|
Output | extends DictType |
Inputs | extends Record |
Hierarchy
- Builder
  ↳ TabularPipelineBuilder

Pipeline
aggregate
▸ aggregate(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
Aggregate (or group) rows by a group value, combining the rows of each group with the defined aggregation methods.
Type parameters
Name | Type |
---|---|
GroupName | extends string |
G | extends (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction |
A | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => AggregationDefinition > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.aggregations | A | A collection of functions that generate the aggregation Expressions for the values of each group |
config.group_name | GroupName | A name of the group variable created |
config.group_value | G | A function that generates an Expression for the value of the group |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream with the count of non admin users per role
const pipeline = new PipelineBuilder("Non-Admin Roles Count")
    .from(users)
    .input({ name: "admin", stream: admin })
    .aggregate({
        group_name: "group",
        group_value: (fields) => fields.role,
        aggregations: {
            sum: (fields, _key, inputs) => Sum(IfElse(Equal(fields.email, inputs.admin), 0, 1))
        }
    })
    .toTemplate();
```
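To make the grouping behaviour concrete, here is a minimal sketch in plain TypeScript that models a table as a dictionary from primary key to row. It does not call the library; the helper `countNonAdminByRole`, the sample data, and the choice to key the output by the group value are assumptions made for illustration only.

```typescript
// Plain TypeScript model of a grouped aggregation (not the library's implementation).
type User = { username: string; email: string; role: string };

// Hypothetical sample table: primary key -> row.
const sampleUsers: Record<string, User> = {
    u1: { username: "ann", email: "ann@example.com", role: "dev" },
    u2: { username: "bob", email: "bob@example.com", role: "dev" },
    u3: { username: "root", email: "admin@example.com", role: "ops" },
};

function countNonAdminByRole(
    table: Record<string, User>,
    adminEmail: string
): Record<string, { group: string; sum: number }> {
    const out: Record<string, { group: string; sum: number }> = {};
    for (const key of Object.keys(table)) {
        const row = table[key];
        const group = row.role; // plays the part of group_value
        if (!(group in out)) {
            out[group] = { group: group, sum: 0 };
        }
        // plays the part of Sum(IfElse(Equal(email, admin), 0, 1))
        out[group].sum += row.email === adminEmail ? 0 : 1;
    }
    return out;
}

const roleCounts = countNonAdminByRole(sampleUsers, "admin@example.com");
console.log(roleCounts);
```

Each output row carries the group value under the name given by `group_name` plus one field per entry in `aggregations`, which matches the shape described in the signature above.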
concatenate
▸ concatenate(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Output["value"]["value"]["value"][K] } & { [K in string]: StringType }>>, Inputs>
Concatenate one or more other tabular data sets of the same type to the input data.
To avoid primary key clashes, an optional discriminator may be provided to prepend to the keys of each table.
Type parameters
Name | Type |
---|---|
DiscriminatorName | extends string |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.discriminator_name | DiscriminatorName | The discriminator name to insert into the output |
config.discriminator_value | string | The discriminator to prepend to the primary key of the input table |
config.inputs | { discriminator_value: string; input: (inputs: Inputs) => Variable }[] | The other tabular data to concatenate, each with the discriminator to prepend to its primary keys |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Output["value"]["value"]["value"][K] } & { [K in string]: StringType }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const pipeline = new PipelineBuilder("Concatenator")
    .from(stream)
    .input({ name: "other", stream: other_stream })
    .concatenate({
        discriminator_value: "A",
        inputs: [{ input: inputs => inputs.other, discriminator_value: "B" }]
    });
```
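The role of the discriminator can be sketched in plain TypeScript. This is only a model under stated assumptions: the helper `concatenateTables`, the `"."` separator between discriminator and key, and the `source` field name (standing in for `discriminator_name`) are hypothetical and are not taken from the library.

```typescript
// Plain TypeScript model of concatenation with a discriminator (not the library's implementation).
type Row = { name: string };
type Table = Record<string, Row>;
type TaggedRow = Row & { source: string };

function concatenateTables(
    parts: { discriminator: string; table: Table }[]
): Record<string, TaggedRow> {
    const out: Record<string, TaggedRow> = {};
    for (const part of parts) {
        for (const key of Object.keys(part.table)) {
            // Prefixing the key keeps equal keys from different tables apart.
            out[part.discriminator + "." + key] = {
                name: part.table[key].name,
                source: part.discriminator,
            };
        }
    }
    return out;
}

// Both tables use the primary key "1", which would clash without a prefix.
const tableA: Table = { "1": { name: "ann" } };
const tableB: Table = { "1": { name: "bob" } };
const combined = concatenateTables([
    { discriminator: "A", table: tableA },
    { discriminator: "B", table: tableB },
]);
console.log(Object.keys(combined));
```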
disaggregateArray
▸ disaggregateArray(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
Apply a disaggregation to an array in each row, producing one row per entry in the array.
Type parameters
Name | Type |
---|---|
C | extends (fields : TypeToFields , input_key : Variable , inputs : Inputs ) => EastFunction |
S | extends Record <string , (fields : TypeToFields , value : Variable <ReturnType ["type" ]["value" ]>, collection_key : Variable , input_key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.collection | C | A function that generates an Expression for the array to disaggregate based on the input fields |
config.keep_all | true | If true , append the selections to the existing fields |
config.output_key? | (fields : TypeToFields , value : Variable <ReturnType ["type" ]["value" ]>, collection_key : Variable , input_key : Variable , inputs : Inputs ) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields |
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each value and key from the array |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: ArrayType(StringType)
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateArray({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // selection arguments are (fields, value, collection_key, input_key, inputs)
            role: (_, value) => value,
            hash: (fields, _, __, ___, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
```
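As a rough illustration of "one row per entry in the array", the sketch below models the operation in plain TypeScript. It is not the library's implementation; the helper `disaggregateRoles`, the sample data, and the output key built from the input key and array index are assumptions.

```typescript
// Plain TypeScript model of array disaggregation (not the library's implementation).
type UserWithRoles = { username: string; roles: string[] };
type UserRole = { username: string; role: string };

function disaggregateRoles(
    table: Record<string, UserWithRoles>
): Record<string, UserRole> {
    const out: Record<string, UserRole> = {};
    for (const key of Object.keys(table)) {
        const row = table[key];
        row.roles.forEach((role, index) => {
            // Assumed output key: input key plus position in the array.
            out[key + "." + index] = { username: row.username, role: role };
        });
    }
    return out;
}

const userRoles = disaggregateRoles({
    u1: { username: "ann", roles: ["dev", "ops"] },
    u2: { username: "bob", roles: [] }, // an empty array contributes no rows
});
console.log(userRoles);
```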
disaggregateDict
▸ disaggregateDict(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
Apply a disaggregation to a dictionary in each row, producing one row per entry in the dictionary.
Type parameters
Name | Type |
---|---|
C | extends (fields : TypeToFields , input_key : Variable , inputs : Inputs ) => EastFunction |
S | extends Record <string , (fields : TypeToFields , value : Variable <ReturnType ["type" ]["value" ]["value" ]>, collection_key : Variable <ReturnType ["type" ]["value" ]["key" ]>, input_key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.collection | C | A function that generates an Expression for the dictionary to disaggregate based on the input fields |
config.keep_all | true | If true , append the selections to the existing fields |
config.output_key? | (fields : TypeToFields , value : Variable <ReturnType ["type" ]["value" ]["value" ]>, collection_key : Variable <ReturnType ["type" ]["value" ]["key" ]>, input_key : Variable , inputs : Inputs ) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields |
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each value and key from the dictionary |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: DictType(StringType, FloatType)
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateDict({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // selection arguments are (fields, value, collection_key, input_key, inputs);
            // the third argument is the dictionary key, here the role name
            role: (_, __, collection_key) => collection_key,
            hash: (fields, _, __, ___, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
```
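The dictionary variant differs from the array variant in that each entry exposes both a key and a value. A minimal plain TypeScript sketch follows; the helper `disaggregateRoleWeights`, the `weight` field name, and the output key format are hypothetical and only illustrate the idea.

```typescript
// Plain TypeScript model of dictionary disaggregation (not the library's implementation).
type UserWithWeights = { username: string; roles: Record<string, number> };
type RoleWeight = { username: string; role: string; weight: number };

function disaggregateRoleWeights(
    table: Record<string, UserWithWeights>
): Record<string, RoleWeight> {
    const out: Record<string, RoleWeight> = {};
    for (const key of Object.keys(table)) {
        const row = table[key];
        for (const role of Object.keys(row.roles)) {
            // The dictionary key becomes "role", the dictionary value becomes "weight".
            out[key + "." + role] = {
                username: row.username,
                role: role,
                weight: row.roles[role],
            };
        }
    }
    return out;
}

const roleWeights = disaggregateRoleWeights({
    u1: { username: "ann", roles: { dev: 0.75, ops: 0.25 } },
});
console.log(roleWeights);
```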
disaggregateSet
▸ disaggregateSet(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
Apply a disaggregation to a set in each row, producing one row per entry in the set.
Type parameters
Name | Type |
---|---|
C | extends (fields : TypeToFields , input_key : Variable , inputs : Inputs ) => EastFunction |
S | extends Record <string , (fields : TypeToFields , collection_key : Variable <ReturnType ["type" ]["value" ]>, input_key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.collection | C | A function that generates an Expression for the set to disaggregate based on the input fields |
config.keep_all | true | If true , append the selections to the existing fields |
config.output_key? | (fields : TypeToFields , collection_key : Variable , input_key : Variable , inputs : Inputs ) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields |
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each key from the set |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: SetType(StringType)
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateSet({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // selection arguments are (fields, collection_key, input_key, inputs)
            role: (_, collection_key) => collection_key,
            hash: (fields, _, __, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
```
error
▸ error(config): TabularPipelineBuilder
Add an assertion on the pipeline inputs and output to identify errors. When the if predicate returns true, the pipeline will be terminated with an error message and output data will not be produced.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the error message and predicate |
config.if | (value : Variable , inputs : Inputs ) => EastFunction | If true an error will be created |
config.message | string \| (value: Variable, inputs: Inputs) => EastFunction | The message in the case that an error is created |
Returns
TabularPipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .toTemplate();
```
errorEvery
▸ errorEvery(config): TabularPipelineBuilder
Add an assertion on every row of the pipeline output to identify errors. For each row, when the if predicate returns true, the pipeline will be terminated with an error message and output data will not be produced.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the error message and predicate |
config.if | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | If true an error will be created |
config.message | string \| (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | The message in the case that an error is created |
Returns
TabularPipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // read by the errorEvery check below
        password: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .errorEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
```
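The difference between a whole-table check (error) and a per-row check (errorEvery) can be modelled in a few lines of plain TypeScript. This is a sketch only; `checkTable`, `checkEveryRow`, and `runChecks` are hypothetical helpers, and representing termination as a thrown `Error` is an assumption rather than a description of the library.

```typescript
// Plain TypeScript model of whole-table versus per-row assertions (not the library's implementation).
type Account = { username: string; password: string };
type Accounts = Record<string, Account>;

// Models error: one predicate over the whole table.
function checkTable(table: Accounts): void {
    if (Object.keys(table).length === 0) {
        throw new Error("No users defined");
    }
}

// Models errorEvery: the predicate is evaluated once per row.
function checkEveryRow(table: Accounts): void {
    for (const key of Object.keys(table)) {
        const row = table[key];
        if (row.password === "") {
            throw new Error("User " + row.username + " has empty password");
        }
    }
}

// Returns the table when all checks pass, otherwise the error message and no data.
function runChecks(table: Accounts): Accounts | string {
    try {
        checkTable(table);
        checkEveryRow(table);
        return table;
    } catch (e) {
        return e instanceof Error ? e.message : String(e);
    }
}

const okResult = runChecks({ u1: { username: "ann", password: "s3cret" } });
const badResult = runChecks({
    u1: { username: "ann", password: "s3cret" },
    u2: { username: "bob", password: "" },
});
const emptyResult = runChecks({});
console.log(okResult, badResult, emptyResult);
```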
filter
▸ filter(predicate): TabularPipelineBuilder
Apply a filter to the input data based on a user-defined predicate.
Rows where the predicate evaluates to true are kept, and rows where the predicate is false are discarded.
Parameters
Name | Type |
---|---|
predicate | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction |
Returns
TabularPipelineBuilder
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();
```
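For readers who prefer to see the keep/discard rule spelled out, the following plain TypeScript sketch models it on a dictionary of rows. The helper `filterTable` and the sample data are hypothetical; the library itself works on Expressions rather than on JavaScript values.

```typescript
// Plain TypeScript model of a row filter (not the library's implementation).
type Person = { username: string; email: string };

function filterTable<T>(
    table: Record<string, T>,
    predicate: (fields: T, key: string) => boolean
): Record<string, T> {
    const out: Record<string, T> = {};
    for (const key of Object.keys(table)) {
        // Rows are kept only when the predicate is true; keys are unchanged.
        if (predicate(table[key], key)) {
            out[key] = table[key];
        }
    }
    return out;
}

const people: Record<string, Person> = {
    u1: { username: "ann", email: "ann@example.com" },
    u2: { username: "bob", email: "bob@example.com" },
    u3: { username: "root", email: "admin@example.com" },
};
const adminName = "root";
const nonAdmins = filterTable(people, (fields) => fields.username !== adminName);
console.log(Object.keys(nonAdmins));
```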
histogram
▸ histogram(config): TabularPipelineBuilder<DictType<StringType, StructType<{ bin: IntegerType; bin_end: FloatType; bin_start: FloatType; group: ReturnType["type"]; histogram: FloatType }>>, Inputs>
Transform data to obtain a histogram of data in the rows, optionally by group.
Type parameters
Name | Type |
---|---|
GK | extends (fields : TypeToFields , input_key : Variable , inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.cumulative? | boolean | If true, calculate a cumulative distribution (default false ) |
config.density? | boolean | Whether the probabilities should be interpreted as a density and scaled by the step size (default false ) |
config.group | GK | an optional Expression defining the group key |
config.minimum? | number | the minimum value to consider in the distribution |
config.n_bins? | number | the number of bins in the histogram |
config.normalization? | "count" \| "percentage" \| "probability" | the normalization of the histogram - 'count' (default), 'percentage' or 'probability' |
config.samples | (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction | the function for the value samples |
config.step? | number | the histogram step size to apply |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ bin: IntegerType; bin_end: FloatType; bin_start: FloatType; group: ReturnType["type"]; histogram: FloatType }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const TasksType = DictType(
    StringType,
    StructType({
        username: StringType,
        task: StringType,
        duration: FloatType
    })
)
const tasks = Stream("Task", TasksType);
// create a stream with a histogram of task duration per user
const pipeline = new PipelineBuilder("User Roles")
    .from(tasks)
    .histogram({
        group: fields => fields.username,
        samples: (fields) => fields.duration,
        // must be one of "count", "percentage" or "probability"
        normalization: "percentage",
    })
    .toTemplate();
```
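The output fields (bin, bin_start, bin_end, histogram) can be illustrated with a small plain TypeScript model. This is a sketch under assumptions, not the library's algorithm: the helper `binSamples` is hypothetical, bins are taken to be half-open intervals of width `step` starting at `minimum`, samples outside the binned range are ignored, and grouping, `cumulative` and `density` are left out.

```typescript
// Plain TypeScript model of fixed-width binning (not the library's implementation).
type Bin = { bin: number; bin_start: number; bin_end: number; histogram: number };
type Normalization = "count" | "percentage" | "probability";

function binSamples(
    samples: number[],
    minimum: number,
    step: number,
    nBins: number,
    normalization: Normalization
): Bin[] {
    const counts: number[] = [];
    for (let i = 0; i < nBins; i++) {
        counts.push(0);
    }
    let total = 0;
    for (const x of samples) {
        const i = Math.floor((x - minimum) / step);
        if (i >= 0 && i < nBins) {
            counts[i] += 1;
            total += 1;
        }
    }
    // Scale factor applied to each raw count.
    let scale = 1;
    if (total > 0 && normalization === "percentage") scale = 100 / total;
    if (total > 0 && normalization === "probability") scale = 1 / total;
    return counts.map((count, i) => ({
        bin: i,
        bin_start: minimum + i * step,
        bin_end: minimum + (i + 1) * step,
        histogram: count * scale,
    }));
}

const durations = [0.5, 1.5, 1.7, 3.2];
const bins = binSamples(durations, 0, 1, 4, "percentage");
console.log(bins);
```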
innerJoin
▸ innerJoin(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
Apply an inner join operation, matching rows from the current ("left") table with an additional "right" table. Rows without matching join keys are discarded.
Type parameters
Name | Type |
---|---|
RI | extends (inputs : Inputs ) => Variable <DictType > |
LS | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
RS | extends Record <string , (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.left_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key) |
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields) |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable<ReturnType["type"]> } & { [K in string \| number \| symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key) |
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation |
config.right_key? | (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key) |
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields) |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
)
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
)
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .innerJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
```
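The inner join rule (keep only rows whose join keys match) is sketched below in plain TypeScript. The helper `innerJoinByRole`, the sample data, the use of the `role` field as the join key on both sides, and keeping the left primary key as the output key are all assumptions chosen for illustration.

```typescript
// Plain TypeScript model of an inner join (not the library's implementation).
type Member = { username: string; role: string };
type RoleInfo = { role: string; expert: boolean };
type MemberWithRole = { username: string; role: string; expert: boolean };

function innerJoinByRole(
    left: Record<string, Member>,
    right: Record<string, RoleInfo>
): Record<string, MemberWithRole> {
    const out: Record<string, MemberWithRole> = {};
    for (const leftKey of Object.keys(left)) {
        const l = left[leftKey];
        for (const rightKey of Object.keys(right)) {
            const r = right[rightKey];
            if (l.role === r.role) {
                // Matching join keys: emit the selected left and right fields.
                out[leftKey] = { username: l.username, role: l.role, expert: r.expert };
            }
        }
    }
    return out;
}

const innerJoined = innerJoinByRole(
    {
        u1: { username: "ann", role: "dev" },
        u2: { username: "bob", role: "qa" }, // no matching role, so this row is discarded
    },
    { r1: { role: "dev", expert: true } }
);
console.log(innerJoined);
```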
input
▸ input(config): TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
Add an additional named input Stream to the Pipeline.
Type parameters
Name | Type |
---|---|
Name | extends string |
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the input stream and the resulting variable name |
config.name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the input Variable |
config.stream | Stream | the input stream (the stream and associated preconditions) |
Returns
TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .toTemplate();
```
leftJoin
▸ leftJoin(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>
Apply a left join operation, matching rows from the current ("left") table with an additional "right" table. If a left row does not have any matching right rows, the right row's selections are taken to be null.
Type parameters
Name | Type |
---|---|
RI | extends (inputs : Inputs ) => Variable <DictType > |
LS | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
RS | extends Record <string , (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.left_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key) |
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields) |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key) |
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation |
config.right_key? | (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key) |
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields) |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
)
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
)
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .leftJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
```
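The null-filling behaviour of a left join can be modelled as follows. This plain TypeScript sketch is not the library's implementation; `leftJoinByRole` is a hypothetical helper, the `role` field is assumed to be the join key, and at most one right row is assumed to match each left row.

```typescript
// Plain TypeScript model of a left join (not the library's implementation).
type Staff = { username: string; role: string };
type RoleDetail = { role: string; expert: boolean };
type StaffWithRole = { username: string; role: string; expert: boolean | null };

function leftJoinByRole(
    left: Record<string, Staff>,
    right: Record<string, RoleDetail>
): Record<string, StaffWithRole> {
    const out: Record<string, StaffWithRole> = {};
    for (const leftKey of Object.keys(left)) {
        const l = left[leftKey];
        // Right-hand selections stay null unless a matching right row is found.
        let expert: boolean | null = null;
        for (const rightKey of Object.keys(right)) {
            if (right[rightKey].role === l.role) {
                expert = right[rightKey].expert;
            }
        }
        out[leftKey] = { username: l.username, role: l.role, expert: expert };
    }
    return out;
}

const leftJoined = leftJoinByRole(
    {
        u1: { username: "ann", role: "dev" },
        u2: { username: "bob", role: "qa" }, // no matching role: kept, with expert = null
    },
    { r1: { role: "dev", expert: true } }
);
console.log(leftJoined);
```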
let
▸ let(name): TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
Give a name to the current Pipeline output to be used as an input later in the Pipeline (i.e. after the next operation).
Type parameters
Name | Type |
---|---|
Name | extends string |
Parameters
Name | Type | Description |
---|---|---|
name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the Variable holding the current Pipeline output |
Returns
TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
a new PipelineBuilder
Example
```typescript
// create a stream of non admin users, and a separate admin user
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
const pipeline = new PipelineBuilder("Users")
    .from(users)
    // keep the unfiltered table available as inputs.all
    .let("all")
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .transform((users, inputs) => Struct({
        "Non-Admin Users": users,
        "Admin User": Get(inputs.all, inputs.admin)
    }))
    .toTemplate();
```
log
▸ log(config): TabularPipelineBuilder
Produce a log message depending on the pipeline inputs and outputs. When the if predicate returns true, the pipeline will produce a log message and proceed.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the log message and optional predicate |
config.if | (value : Variable , inputs : Inputs ) => EastFunction | If true a log message will be created |
config.message | string \| (value: Variable, inputs: Inputs) => EastFunction | The log message |
Returns
TabularPipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .log({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .log({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();
```
logEvery
▸ logEvery(config): TabularPipelineBuilder
Produce a log message for every row of the pipeline output. For each row, when the if predicate returns true, the pipeline will produce a log message and proceed.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the log message and optional predicate |
config.if? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | If true a log message will be created |
config.message | string \| (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | The log message |
Returns
TabularPipelineBuilder
Example
```typescript
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // read by the logEvery check below
        password: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .logEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
```
offset
▸ offset(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
Apply a transformation on a row-by-row basis by pulling in data from related rows.
Rows can (optionally) be grouped by a group key, and are sorted by a sort key. The offset indicates the offset to the requested row within the group according to the sort order. For example, an offset of 1 would relate to the next row in sorted order, and -1 would relate to the previous row in sort order. If such a row does not exist, the provided fields are null. An offset_exists variable is provided to indicate whether the row exists or not.
Type parameters
Name | Type |
---|---|
S | extends Record<string, (fields: { [K in string \| number \| symbol]: Variable<Nullable> }, key: Variable, offset_exists: Variable, inputs: Inputs) => EastFunction> |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.group_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function returning an Expression for the value to group the rows by |
config.offset | number | The offset of the row to provide to offset_selections |
config.offset_selections | S | An object containing functions generating Expressions for output field values based on the available fields in the offset row, as well as a variable indicating if the row exists |
config.sort_key | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function returning an Expression for the value to sort the rows by |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
a new PipelineBuilder
Example
```typescript
const TasksType = DictType(
    StringType,
    StructType({
        username: StringType,
        task: StringType,
        date: DateTimeType
    })
)
const tasks = Stream("Task", TasksType);
// create a stream with tasks, including next task date
const pipeline = new PipelineBuilder("Tasks with Next")
    .from(tasks)
    .offset({
        group_key: fields => fields.username,
        sort_key: fields => fields.date,
        offset: 1,
        offset_selections: {
            next_date: (fields) => fields.date
        }
    })
    .toTemplate();
```
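The group, sort, then look-ahead steps described above can be sketched in plain TypeScript. This is an illustrative model only: the helper `withOffsetDate` is hypothetical, dates are simplified to plain numbers, and the existing fields are assumed to be carried over alongside the new `next_date` field.

```typescript
// Plain TypeScript model of an offset lookup within sorted groups (not the library's implementation).
type Task = { username: string; task: string; date: number };
type TaskWithNext = Task & { next_date: number | null };

function withOffsetDate(
    table: Record<string, Task>,
    offset: number
): Record<string, TaskWithNext> {
    // Step 1: collect row keys per group (group_key = username).
    const groups: Record<string, string[]> = {};
    for (const key of Object.keys(table)) {
        const group = table[key].username;
        if (!(group in groups)) groups[group] = [];
        groups[group].push(key);
    }
    const out: Record<string, TaskWithNext> = {};
    for (const group of Object.keys(groups)) {
        // Step 2: sort the rows of the group (sort_key = date).
        const keys = groups[group].sort((a, b) => table[a].date - table[b].date);
        // Step 3: look up the row `offset` positions away, or null if it does not exist.
        keys.forEach((key, i) => {
            const target = i + offset;
            const exists = target >= 0 && target < keys.length;
            out[key] = {
                ...table[key],
                next_date: exists ? table[keys[target]].date : null,
            };
        });
    }
    return out;
}

const tasksWithNext = withOffsetDate(
    {
        t1: { username: "ann", task: "write", date: 1 },
        t2: { username: "ann", task: "review", date: 3 },
        t3: { username: "bob", task: "deploy", date: 2 },
    },
    1
);
console.log(tasksWithNext);
```

The `exists` flag in the sketch plays the part of the offset_exists variable: it is false for the last row of each group when the offset is 1.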
outerJoin
▸ outerJoin(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>
Apply a full outer join operation, matching rows from the current ("left") table with an additional "right" table. If a left row does not have any matching right rows, the right row's selections are taken to be null. Furthermore, if a right row does not have any matching left rows, the left row's selections are taken to be null.
Type parameters
Name | Type |
---|---|
RI | extends (inputs : Inputs ) => Variable <DictType > |
LS | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
RS | extends Record <string , (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.left_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key) |
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields) |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key) |
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation |
config.right_key? | (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key) |
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields) |
Returns
TabularPipelineBuilder
<DictType
<StringType
, StructType
<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs
>
a new
PipelineBuilderExample
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
);
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .outerJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
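To make the null-filling rule concrete, here is a minimal plain-TypeScript model of a full outer join over two keyed tables. It is only a sketch and does not use the EDK: `outerJoinModel`, the row types and the sample data are hypothetical, and it assumes the left rows are joined to the right table's primary key through the user's `role` field. Output keys are not modelled.

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type LeftUser = { username: string; role: string };
type RightRole = { role: string; expert: boolean };
type OuterRow = {
    username: string | null; // left selection: null when a role has no user
    roleName: string | null; // right selection: null when a user has no role row
    expert: boolean | null;  // right selection: null when a user has no role row
};

function outerJoinModel(
    users: Record<string, LeftUser>,
    roles: Record<string, RightRole>, // keyed by role name (assumed join key)
): OuterRow[] {
    const out: OuterRow[] = [];
    const matchedRoles: Record<string, boolean> = {};
    for (const key of Object.keys(users)) {
        const user = users[key];
        const role: RightRole | undefined = roles[user.role];
        if (role !== undefined) matchedRoles[user.role] = true;
        out.push({
            username: user.username,
            roleName: role !== undefined ? role.role : null,
            expert: role !== undefined ? role.expert : null,
        });
    }
    // Right rows with no matching left row: the left selections become null.
    for (const key of Object.keys(roles)) {
        if (!matchedRoles[key]) {
            out.push({ username: null, roleName: roles[key].role, expert: roles[key].expert });
        }
    }
    return out;
}

const outerRows = outerJoinModel(
    { u1: { username: "ann", role: "admin" }, u2: { username: "bob", role: "guest" } },
    { admin: { role: "admin", expert: true }, auditor: { role: "auditor", expert: false } },
);
```

In this sample, "bob" has no matching role row and "auditor" has no matching user, so both appear in the result with the other side's fields set to null.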
outputStream
▸ outputStream(): Stream
Return the Stream containing the output of the pipeline.
Returns
Stream
Example
// Create a datastream where a password can be written by the end-user at runtime.
const password = new SourceBuilder("DatabasePassword")
    .writeable(StringType)
    .outputStream();
rightJoin
▸ rightJoin(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
Apply a right join operation, matching rows from the current ("left") table with
an additional "right" table. If a right row does not have any matching left rows,
the left row's selections are taken to be null.
Type parameters
Name | Type |
---|---|
RI | extends (inputs : Inputs ) => Variable <DictType > |
LS | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
RS | extends Record <string , (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.left_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key) |
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields) |
config.output_key | (fields : { [K in string \| number \| symbol]: Variable<ReturnType["type"]> }, left_input_key : Variable , right_input_key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the right table's primary key) |
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation |
config.right_key? | (fields : TypeToFields <ReturnType ["type" ]>, key : Variable , inputs : Inputs ) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key) |
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields) |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>
a new PipelineBuilder
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
);
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .rightJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
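The asymmetry of a right join (right selections always present, left selections possibly null) can be sketched in plain TypeScript. This is a hypothetical model, not the EDK implementation: `rightJoinModel` and its row types are invented for illustration, rows are held in arrays so output keys are not modelled, and dropping left rows that have no matching right row is assumed from conventional right-join semantics rather than stated in the text above.

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type JoinUser = { username: string; role: string };
type JoinRole = { role: string; expert: boolean };
type RightJoinRow = { username: string | null; role: string; expert: boolean };

function rightJoinModel(users: JoinUser[], roles: JoinRole[]): RightJoinRow[] {
    // Index the left rows by their join key (here, the user's role).
    const usersByRole: Record<string, JoinUser[]> = {};
    for (const user of users) {
        (usersByRole[user.role] = usersByRole[user.role] || []).push(user);
    }
    const out: RightJoinRow[] = [];
    for (const role of roles) {
        const matches = usersByRole[role.role] || [];
        if (matches.length === 0) {
            // No matching left row: left selections are null, right selections are kept.
            out.push({ username: null, role: role.role, expert: role.expert });
        }
        for (const user of matches) {
            out.push({ username: user.username, role: role.role, expert: role.expert });
        }
    }
    return out; // assumed: left rows without a matching right row never appear
}

const rightRows = rightJoinModel(
    [{ username: "ann", role: "admin" }, { username: "bob", role: "guest" }],
    [{ role: "admin", expert: true }, { role: "auditor", expert: false }],
);
```

Because only the left side can be null here, downstream code needs null handling for `username` but can rely on `role` and `expert` being present.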
select
▸ select(config): TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
Apply a selection to the input data based on user-defined expressions. Optionally, a new primary key may be specified; if so, the user must ensure that the keys remain distinct.
Type parameters
Name | Type |
---|---|
S | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | - |
config.keep_all | true | If true, append the selections to the existing fields |
config.output_key? | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields |
config.selections | S | An object containing functions generating Expressions for output field values based on the available fields |
Returns
TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>
a new PipelineBuilder
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// add a base64-encoded hash field to every user, keeping the existing fields
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .select({
        keep_all: true,
        selections: {
            hash: (fields, _, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
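The effect of `keep_all: true` together with an optional `output_key` can be modelled in a few lines of plain TypeScript. This is a hypothetical sketch, not the EDK implementation: `selectKeepAllModel`, `SelRow` and `SelTable` are invented names, and throwing on a duplicate key is a choice of this model only (the text above says just that the caller must keep keys distinct).

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type SelRow = Record<string, string>;
type SelTable = Record<string, SelRow>;

function selectKeepAllModel(
    table: SelTable,
    selections: Record<string, (fields: SelRow, key: string) => string>,
    outputKey?: (fields: SelRow, key: string) => string,
): SelTable {
    const out: SelTable = {};
    for (const key of Object.keys(table)) {
        const fields = table[key];
        const row: SelRow = { ...fields }; // keep_all: true keeps the existing fields
        for (const name of Object.keys(selections)) {
            row[name] = selections[name](fields, key); // append each selection
        }
        // The new key is computed from the output fields; default is the old key.
        const newKey = outputKey ? outputKey(row, key) : key;
        if (Object.prototype.hasOwnProperty.call(out, newKey)) {
            throw new Error(`duplicate output key: ${newKey}`); // model-only behaviour
        }
        out[newKey] = row;
    }
    return out;
}

const usersTable: SelTable = {
    u1: { username: "ann", email: "ann@example.com" },
    u2: { username: "bob", email: "bob@example.org" },
};
const selected = selectKeepAllModel(
    usersTable,
    { domain: (fields) => fields.email.split("@")[1] },
    (fields) => fields.username,
);
```

Here each output row keeps `username` and `email`, gains `domain`, and is re-keyed by `username`.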
toCsv
▸ toCsv(config): BlobPipelineBuilder
Unparse a tabular Stream into a CSV-formatted BlobType Stream.
Type parameters
Name | Type |
---|---|
S | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the CSV unparsing |
config.delimiter? | string | The delimiter to separate columns (default "," ) |
config.newline? | string | The delimiter to separate rows (default "\n" ) |
config.null_str? | string | The string used for empty values |
config.selections | S | The selections to unparse into |
config.skip_n? | bigint | Skip this many rows from the top of the file |
Returns
BlobPipelineBuilder
a new PipelineBuilder
Example
const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);
const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .input({ name: "float", stream: float })
    .toCsv({
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
        skip_n: 20n,
        delimiter: "|",
    })
    .toTemplate();
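How `delimiter`, `newline` and `null_str` shape the resulting text can be illustrated with a small plain-TypeScript model. It is a simplified sketch under stated assumptions, not the EDK's CSV writer: `toCsvModel` and its types are hypothetical, a header row and an empty-string default for `null_str` are assumed, quoting and escaping are omitted, and `skip_n` is not modelled.

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type CsvCell = string | number | null;
type CsvConfig = { delimiter?: string; newline?: string; null_str?: string };

function toCsvModel(
    rows: Array<Record<string, CsvCell>>,
    columns: string[],
    config: CsvConfig = {},
): string {
    const delimiter = config.delimiter !== undefined ? config.delimiter : ",";
    const newline = config.newline !== undefined ? config.newline : "\n";
    const nullStr = config.null_str !== undefined ? config.null_str : ""; // assumed default
    const lines: string[] = [columns.join(delimiter)]; // header row (assumed)
    for (const row of rows) {
        const cells = columns.map((column) => {
            const value = row[column];
            // Empty values are written as null_str; no quoting is applied in this model.
            return value === null || value === undefined ? nullStr : String(value);
        });
        lines.push(cells.join(delimiter));
    }
    return lines.join(newline);
}

const csv = toCsvModel(
    [{ float: 1.5, label: "a" }, { float: null, label: "b" }],
    ["float", "label"],
    { delimiter: "|", null_str: "NA" },
);
// csv === "float|label\n1.5|a\nNA|b"
```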
toJsonLines
▸ toJsonLines(config): BlobPipelineBuilder
Unparse a tabular Stream into a JSONLines-formatted BlobType Stream.
Type parameters
Name | Type |
---|---|
S | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the JSONLines unparsing |
config.selections | S | The selections to unparse into |
Returns
BlobPipelineBuilder
a new PipelineBuilder
Example
const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);
const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .input({ name: "float", stream: float })
    .toJsonLines({
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
    })
    .toTemplate();
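For readers unfamiliar with the JSON Lines layout, the sketch below shows the general shape of such output in plain TypeScript: one JSON object per row, one row per line. It is a hypothetical model (`toJsonLinesModel` and `parseJsonLines` are invented names) and makes no claim about how the EDK encodes keys or particular value types.

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type JsonValue = string | number | boolean | null;

function toJsonLinesModel(rows: Array<Record<string, JsonValue>>): string {
    // One JSON document per line, each terminated by a newline.
    return rows.map((row) => JSON.stringify(row) + "\n").join("");
}

function parseJsonLines(text: string): unknown[] {
    return text
        .split("\n")
        .filter((line) => line.length > 0) // ignore the trailing empty piece
        .map((line) => JSON.parse(line));
}

const jsonLines = toJsonLinesModel([{ float: 1.5 }, { float: 2 }]);
// jsonLines === '{"float":1.5}\n{"float":2}\n'
const roundTripped = parseJsonLines(jsonLines);
```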
toTemplate
▸ toTemplate(): Template
Convert the built pipeline into a Template, for usage in an EDK project.
Returns
Template
a Template
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        password: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const template = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
Overrides
Builder.toTemplate
toXlsx
▸ toXlsx(config): BlobPipelineBuilder
Unparse a tabular Stream into an XLSX-formatted BlobType Stream.
Type parameters
Name | Type |
---|---|
S | extends Record <string , (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction > |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the XLSX unparsing |
config.null_str? | string | The string used for empty values |
config.selections | S | The selections to unparse into |
config.sheet? | string | - |
Returns
BlobPipelineBuilder
a new PipelineBuilder
Example
const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);
const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .input({ name: "float", stream: float })
    .toXlsx({
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
    })
    .toTemplate();
transform
▸ transform(f): ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>
Transform the entire input Stream based on an EastFunction.
Type parameters
Name | Type |
---|---|
F | extends (value : Variable , inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
f | F | a function that generates the output Expression |
Returns
ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>
a new PipelineBuilder
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
// create a stream of the total number of users
const pipeline = new PipelineBuilder("Number Of Users")
    .from(users)
    .transform((users) => Size(users))
    .toTemplate();
warn
▸ warn(config): TabularPipelineBuilder
Add a warning on the pipeline inputs and outputs to identify problems. When the if predicate returns true, the pipeline will register a warning with a message, but will proceed to produce output data.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the warning message and predicate |
config.if | (value : Variable , inputs : Inputs ) => EastFunction | If true, a warning will be created |
config.message | string \| (value : Variable , inputs : Inputs ) => EastFunction | The message in the case that a warning is created |
Returns
TabularPipelineBuilder
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .warn({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .warn({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();
warnEvery
▸ warnEvery(config): TabularPipelineBuilder
Add a warning on every row of the pipeline output to identify problems. For each row, when the if predicate returns true, the pipeline will register a warning with a message, but will proceed to produce output data.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the warning message and predicate |
config.if | (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | If true, a warning will be created |
config.message | string \| (fields : TypeToFields , key : Variable , inputs : Inputs ) => EastFunction | The message in the case that a warning is created |
Returns
TabularPipelineBuilder
Example
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        password: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
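The difference between a whole-value check (as in `warn`) and a per-row check (as in `warnEvery`) can be sketched in plain TypeScript. This is a hypothetical model rather than the EDK implementation: `warnModel`, `warnEveryModel`, the `warnings` list and the sample data are invented for illustration. In both cases a warning is recorded and the data is passed on unchanged.

```typescript
// Illustrative model only: plain TypeScript, not the EDK implementation.
type PwUser = { username: string; password: string };
type PwUsers = Record<string, PwUser>;

const warnings: string[] = [];

// Whole-value check: the predicate sees the entire table once.
function warnModel(
    value: PwUsers,
    cond: (value: PwUsers) => boolean,
    message: (value: PwUsers) => string,
): PwUsers {
    if (cond(value)) warnings.push(message(value));
    return value; // output is still produced
}

// Per-row check: the predicate is evaluated for every row.
function warnEveryModel(
    value: PwUsers,
    cond: (fields: PwUser, key: string) => boolean,
    message: (fields: PwUser, key: string) => string,
): PwUsers {
    for (const key of Object.keys(value)) {
        if (cond(value[key], key)) warnings.push(message(value[key], key));
    }
    return value; // output is still produced
}

const pwUsers: PwUsers = {
    u1: { username: "ann", password: "" },
    u2: { username: "bob", password: "hunter2" },
};
const checked = warnEveryModel(
    warnModel(pwUsers, (value) => Object.keys(value).length === 0, () => "No users defined"),
    (fields) => fields.password === "",
    (fields) => `User ${fields.username} has empty password`,
);
```

With this sample data the table is not empty, so only the per-row check fires, once, for "ann".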