Pipeline - TabularPipelineBuilder

Pipeline.TabularPipelineBuilder

A TabularPipelineBuilder is used to build a data Pipeline.

Methods on the TabularPipelineBuilder enable you to directly transform data from a tabular Stream, either by applying a transform operation to the whole Stream, or by applying select, filter, join, aggregate, disaggregate and offset operations to each entry in the Stream.

Data quality can be monitored using assert and warn by providing expression-based conditions and messages, either for the whole Stream or for every entry in the Stream.

A corresponding Template can be created using .toTemplate().

Remarks

See Transform Data for a related learning module.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // required by the warnEvery check below
        password: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();

Type parameters

Name | Type
Output | extends DictType
Inputs | extends Record

Hierarchy

  • Builder

    TabularPipelineBuilder

Pipeline

aggregate

aggregate(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

Aggregate (or group) rows based on a group value, combining the rows in each group with the defined aggregation methods.

Type parameters

Name | Type
GroupName | extends string
G | extends (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction
A | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => AggregationDefinition>

Parameters

Name | Type | Description
config | Object | -
config.aggregations | A | A function that generates a collection of Expressions for the group values
config.group_name | GroupName | The name of the group variable created
config.group_value | G | A function that generates an Expression for the value of the group

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream with the count of non admin users per role
const pipeline = new PipelineBuilder("Non-Admin Roles Count")
    .from(users)
    .input({ name: "admin", stream: admin })
    .aggregate({
        group_name: "group",
        group_value: (fields) => fields.role,
        aggregations: {
            sum: (fields, _key, inputs) => Sum(IfElse(Equal(fields.email, inputs.admin), 0, 1))
        }
    })
    .toTemplate();
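
The grouping behaviour can also be pictured in plain TypeScript, without the library. The following is only a sketch of the semantics, assuming a table is a record of rows keyed by primary key and that the output is keyed by the group value. The names countByGroup, UserRow, GroupCount and the sample rows are hypothetical and are not part of the TabularPipelineBuilder API.

```typescript
// Hypothetical stand-in for a tabular stream: rows keyed by primary key.
type UserRow = { username: string; email: string; role: string };
type GroupCount = { group: string; sum: number };

// Group rows by a group value and add 1 for every row that passes `include`,
// loosely mirroring group_value combined with a Sum aggregation.
function countByGroup(
    table: Record<string, UserRow>,
    groupValue: (row: UserRow) => string,
    include: (row: UserRow) => boolean
): Record<string, GroupCount> {
    const out: Record<string, GroupCount> = {};
    for (const key of Object.keys(table)) {
        const row = table[key];
        const group = groupValue(row);
        // Create the group the first time its value is seen (assumed output key: the group value).
        if (!Object.prototype.hasOwnProperty.call(out, group)) {
            out[group] = { group: group, sum: 0 };
        }
        out[group].sum += include(row) ? 1 : 0;
    }
    return out;
}

const sampleUsers: Record<string, UserRow> = {
    u1: { username: "ann", email: "ann@example.com", role: "dev" },
    u2: { username: "bob", email: "bob@example.com", role: "dev" },
    u3: { username: "root", email: "root@example.com", role: "ops" },
};
// Count the non-admin users per role, treating "root" as the admin.
const roleCounts = countByGroup(
    sampleUsers,
    (row) => row.role,
    (row) => row.username !== "root"
);
```

Here roleCounts holds one row per role, with sum 2 for "dev" and sum 0 for "ops".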

concatenate

concatenate(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Output["value"]["value"]["value"][K] } & { [K in string]: StringType }>>, Inputs>

Concatenate one-or-more other tabular data sets of the same type to the input data.

To avoid primary key clashes, an optional discriminator may be provided, which is prepended to the keys of each table.

Type parameters

Name | Type
DiscriminatorName | extends string

Parameters

Name | Type | Description
config | Object | -
config.discriminator_name | DiscriminatorName | The discriminator name to insert into the output
config.discriminator_value | string | The discriminator to prepend to the primary key of the input table
config.inputs | { discriminator_value: string ; input: (inputs: Inputs) => Variable }[] | The other tabular data to concatenate, each with the discriminator to prepend to its primary keys

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Output["value"]["value"]["value"][K] } & { [K in string]: StringType }>>, Inputs>

A new PipelineBuilder.

Example

// stream and other_stream are tabular streams of the same type, defined elsewhere
const pipeline = new PipelineBuilder("Concatenator")
    .from(stream)
    .input({ name: "other", stream: other_stream })
    .concatenate({
        discriminator_value: "A",
        inputs: [{ input: inputs => inputs.other, discriminator_value: "B" }]
    });
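
To see why a discriminator avoids key clashes, the operation can be sketched in plain TypeScript. This is an illustration only, not the library's implementation: the "." separator, the concatenateTables helper and the sample tables are assumptions made for the sketch.

```typescript
// Hypothetical stand-ins: a row is a record of strings, a table is rows keyed by primary key.
type Row = Record<string, string>;
type Table = Record<string, Row>;

// Concatenate tables, prefixing every primary key with the table's discriminator
// and recording the discriminator in a named output field.
function concatenateTables(
    discriminatorName: string,
    parts: { discriminatorValue: string; table: Table }[]
): Table {
    const out: Table = {};
    for (const part of parts) {
        for (const key of Object.keys(part.table)) {
            const row: Row = { ...part.table[key] };
            row[discriminatorName] = part.discriminatorValue;
            // Assumed key format: "<discriminator>.<original key>".
            out[part.discriminatorValue + "." + key] = row;
        }
    }
    return out;
}

// Both tables use the primary key "1", which would clash without a discriminator.
const tableA: Table = { "1": { username: "ann" } };
const tableB: Table = { "1": { username: "bob" } };
const combined = concatenateTables("source", [
    { discriminatorValue: "A", table: tableA },
    { discriminatorValue: "B", table: tableB },
]);
```

In this sketch combined has the keys "A.1" and "B.1", so both rows survive even though their original keys were identical.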

disaggregateArray

disaggregateArray(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

Apply a disaggregation to an array in each row, producing one row per entry in the array.

Type parameters

Name | Type
C | extends (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction
S | extends Record<string, (fields: TypeToFields, value: Variable<ReturnType["type"]["value"]>, collection_key: Variable, input_key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.collection | C | A function that generates an Expression for the array to disaggregate based on the input fields
config.keep_all | true | If true, append the selections to the existing fields
config.output_key? | (fields: TypeToFields, value: Variable<ReturnType["type"]["value"]>, collection_key: Variable, input_key: Variable, inputs: Inputs) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each value and key from the array

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: ArrayType(StringType)
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateArray({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // argument order: (fields, value, collection_key, input_key, inputs)
            role: (_, value) => value,
            hash: (fields, _, __, ___, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
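
The effect of disaggregating an array can be sketched in plain TypeScript as a flattening step that emits one output row per array entry. This is a hedged illustration rather than the library's behaviour: the output key format ("<input key>.<index>") and the names disaggregateRoles, UserWithRoles and UserRole are assumptions.

```typescript
// Hypothetical stand-ins for the input and output row types.
type UserWithRoles = { username: string; roles: string[] };
type UserRole = { username: string; role: string };

// Produce one output row per entry of each row's `roles` array.
function disaggregateRoles(
    table: Record<string, UserWithRoles>
): Record<string, UserRole> {
    const out: Record<string, UserRole> = {};
    for (const inputKey of Object.keys(table)) {
        const row = table[inputKey];
        row.roles.forEach((value, index) => {
            // Assumed output key: the input key combined with the array position.
            out[inputKey + "." + index] = { username: row.username, role: value };
        });
    }
    return out;
}

const usersWithRoles: Record<string, UserWithRoles> = {
    u1: { username: "ann", roles: ["dev", "ops"] },
    u2: { username: "bob", roles: ["qa"] },
    u3: { username: "cat", roles: [] },
};
const userRoles = disaggregateRoles(usersWithRoles);
```

In this sketch a row with an empty array contributes no output rows, so userRoles has three entries: two for "ann" and one for "bob".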

disaggregateDict

disaggregateDict(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

Apply a disaggregation to a dictionary in each row, producing one row per entry in the dictionary.

Type parameters

Name | Type
C | extends (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction
S | extends Record<string, (fields: TypeToFields, value: Variable<ReturnType["type"]["value"]["value"]>, collection_key: Variable<ReturnType["type"]["value"]["key"]>, input_key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.collection | C | A function that generates an Expression for the dictionary to disaggregate based on the input fields
config.keep_all | true | If true, append the selections to the existing fields
config.output_key? | (fields: TypeToFields, value: Variable<ReturnType["type"]["value"]["value"]>, collection_key: Variable<ReturnType["type"]["value"]["key"]>, input_key: Variable, inputs: Inputs) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each value and key from the dictionary

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: DictType(StringType, FloatType)
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateDict({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // argument order: (fields, value, collection_key, input_key, inputs);
            // the dictionary key holds the role name
            role: (_, __, key) => key,
            hash: (fields, _, __, ___, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();

disaggregateSet

disaggregateSet(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

Apply a disaggregation to a set in each row, producing one row per entry in the set.

Type parameters

Name | Type
C | extends (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction
S | extends Record<string, (fields: TypeToFields, collection_key: Variable<ReturnType["type"]["value"]>, input_key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.collection | C | A function that generates an Expression for the set to disaggregate based on the input fields
config.keep_all | true | If true, append the selections to the existing fields
config.output_key? | (fields: TypeToFields, collection_key: Variable, input_key: Variable, inputs: Inputs) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields
config.selections | S | An object containing functions generating Expressions for output field values based on the input fields and each key from the set

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        roles: SetType(StringType)
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);
// create a stream of users roles
const pipeline = new PipelineBuilder("User Roles")
    .from(users)
    .input({ name: "admin", stream: admin })
    .disaggregateSet({
        collection: fields => fields.roles,
        keep_all: true,
        selections: {
            // argument order: (fields, collection_key, input_key, inputs)
            role: (_, key) => key,
            hash: (fields, _, __, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();

error

error(config):

TabularPipelineBuilder

Add an assertion on the pipeline inputs and output to identify errors. When the if predicate returns true the pipeline will be terminated with an error message and output data will not be produced.

Parameters

Name | Type | Description
config | Object | The error message and predicate
config.if | (value: Variable, inputs: Inputs) => EastFunction | If true an error will be created
config.message | string | (value: Variable, inputs: Inputs) => EastFunction | The message in the case that an error is created

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// terminate the pipeline if there are no users or the admin user is empty
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .toTemplate();

errorEvery

errorEvery(config):

TabularPipelineBuilder

Add an assertion on every row of the pipeline output to identify errors. For each row, when the if predicate returns true the pipeline will be terminated with an error message and output data will not be produced.

Parameters

Name | Type | Description
config | Object | The error message and predicate
config.if | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | If true an error will be created
config.message | string | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | The message in the case that an error is created

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // required by the errorEvery check below
        password: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .errorEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
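
The difference between a whole-table assertion (error) and a per-row assertion (errorEvery) can be sketched in plain TypeScript. This is only an illustration under stated assumptions: failing on the first offending row is an assumption, and assertTable, assertEveryRow and PasswordRow are hypothetical names, not library functions.

```typescript
// Hypothetical row and table types for the sketch.
type PasswordRow = { username: string; password: string };
type PasswordTable = Record<string, PasswordRow>;

// Whole-table check: fail once if the predicate holds for the table as a whole.
function assertTable(
    table: PasswordTable,
    failIf: (table: PasswordTable) => boolean,
    message: string
): PasswordTable {
    if (failIf(table)) throw new Error(message);
    return table;
}

// Per-row check: fail on the first row for which the predicate holds (assumed behaviour).
function assertEveryRow(
    table: PasswordTable,
    failIf: (row: PasswordRow, key: string) => boolean,
    message: (row: PasswordRow) => string
): PasswordTable {
    for (const key of Object.keys(table)) {
        if (failIf(table[key], key)) throw new Error(message(table[key]));
    }
    return table;
}

const passwordRows: PasswordTable = {
    u1: { username: "ann", password: "secret" },
    u2: { username: "bob", password: "" },
};

let caughtMessage = "";
try {
    // The table is not empty, so the whole-table check passes.
    const checked = assertTable(passwordRows, (t) => Object.keys(t).length === 0, "No users defined");
    // The per-row check fails because one user has an empty password.
    assertEveryRow(checked, (row) => row.password === "", (row) => `User ${row.username} has empty password`);
} catch (e) {
    caughtMessage = e instanceof Error ? e.message : String(e);
}
```

In both cases no output is produced once a check fails, which matches the description that the pipeline terminates and output data is not produced.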

filter

filter(predicate):

TabularPipelineBuilder

Apply a filter to the input data based on a user-defined predicate. Rows where the predicate evaluates to true are kept, and rows where the predicate evaluates to false are discarded.

Parameters

Name | Type
predicate | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction

Returns

TabularPipelineBuilder

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();

histogram

histogram(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ bin: IntegerType ; bin_end: FloatType ; bin_start: FloatType ; group: ReturnType["type"] ; histogram: FloatType }>>, Inputs>

Transform data to obtain a histogram of data in the rows, optionally by group.

Type parameters

Name | Type
GK | extends (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction

Parameters

Name | Type | Description
config | Object | -
config.cumulative? | boolean | If true, calculate a cumulative distribution (default false)
config.density? | boolean | Whether the probabilities should be interpreted as a density and scaled by the step size (default false)
config.group | GK | An optional Expression defining the group key
config.minimum? | number | The minimum value to consider in the distribution
config.n_bins? | number | The number of bins in the histogram
config.normalization? | "count" | "percentage" | "probability" | The normalization of the histogram - 'count' (default), 'percentage' or 'probability'
config.samples | (fields: TypeToFields, input_key: Variable, inputs: Inputs) => EastFunction | The function for the value samples
config.step? | number | The histogram step size to apply

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ bin: IntegerType ; bin_end: FloatType ; bin_start: FloatType ; group: ReturnType["type"] ; histogram: FloatType }>>, Inputs>

A new PipelineBuilder.

Example

const TasksType = DictType(
    StringType,
    StructType({
        username: StringType,
        task: StringType,
        duration: FloatType
    })
);
const tasks = Stream("Task", TasksType);
// create a stream with a histogram of task duration per user
const pipeline = new PipelineBuilder("Task Duration Histogram")
    .from(tasks)
    .histogram({
        group: fields => fields.username,
        samples: (fields) => fields.duration,
        // normalization must be "count", "percentage" or "probability"
        normalization: "percentage",
    })
    .toTemplate();
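
How minimum, step, n_bins and normalization could interact is easier to see in a small plain TypeScript sketch. The binning rule used here (equal-width bins starting at the minimum, samples outside the range ignored) is an assumption, since the exact rules are not stated above, and histogramOf and HistogramBin are hypothetical names. Grouping, density and cumulative options are left out.

```typescript
// Hypothetical output row, using the field names from the return type above (without group).
type HistogramBin = { bin: number; bin_start: number; bin_end: number; histogram: number };

// Count samples into equal-width bins, then normalize the counts.
function histogramOf(
    samples: number[],
    minimum: number,
    step: number,
    nBins: number,
    normalization: "count" | "percentage" | "probability"
): HistogramBin[] {
    const counts: number[] = [];
    for (let i = 0; i < nBins; i++) counts.push(0);
    let total = 0;
    for (const x of samples) {
        const i = Math.floor((x - minimum) / step);
        // Assumption: samples outside the binned range are ignored.
        if (i >= 0 && i < nBins) {
            counts[i] += 1;
            total += 1;
        }
    }
    let scale = 1;
    if (total > 0 && normalization === "percentage") scale = 100 / total;
    if (total > 0 && normalization === "probability") scale = 1 / total;
    return counts.map((count, i) => ({
        bin: i,
        bin_start: minimum + i * step,
        bin_end: minimum + (i + 1) * step,
        histogram: count * scale,
    }));
}

// Four task durations binned into four bins of width 1, starting at 0.
const durationBins = histogramOf([0.5, 1.5, 1.7, 3.2], 0, 1, 4, "percentage");
```

With these inputs the four bins hold 25, 50, 0 and 25 percent of the samples respectively.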

innerJoin

innerJoin(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

Apply an inner join operation, matching rows from the current ("left") table with an additional "right" table. Rows without matching join keys are discarded.

Type parameters

Name | Type
RI | extends (inputs: Inputs) => Variable<DictType>
LS | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>
RS | extends Record<string, (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.left_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key)
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields)
config.output_key | (fields: { [K in string | number | symbol]: Variable<ReturnType["type"]> } & { [K in string | number | symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key)
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation
config.right_key? | (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key)
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields)

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
);
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .innerJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
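
The matching rule of an inner join can be sketched in plain TypeScript. This is an illustration of the semantics only, assuming tables are records keyed by primary key, that the join key is the role field on both sides, and that the output is keyed by the left table's primary key. The names innerJoinByRole, JoinUser, JoinRole and JoinedUser are hypothetical.

```typescript
// Hypothetical row types for the left table, right table and output.
type JoinUser = { username: string; role: string };
type JoinRole = { role: string; expert: boolean };
type JoinedUser = { username: string; role: string; expert: boolean };

// Keep only the left rows that have a matching right row, combining their fields.
function innerJoinByRole(
    left: Record<string, JoinUser>,
    right: Record<string, JoinRole>
): Record<string, JoinedUser> {
    const out: Record<string, JoinedUser> = {};
    for (const leftKey of Object.keys(left)) {
        const user = left[leftKey];
        for (const rightKey of Object.keys(right)) {
            const role = right[rightKey];
            if (user.role === role.role) {
                // Assumed output key: the left table's primary key.
                out[leftKey] = { username: user.username, role: user.role, expert: role.expert };
            }
        }
    }
    return out;
}

const joinUsers: Record<string, JoinUser> = {
    u1: { username: "ann", role: "dev" },
    u2: { username: "bob", role: "qa" },
};
const joinRoles: Record<string, JoinRole> = {
    r1: { role: "dev", expert: true },
    r2: { role: "ops", expert: false },
};
const innerJoined = innerJoinByRole(joinUsers, joinRoles);
```

Only "ann" appears in innerJoined, because "bob" has no matching role and the "ops" role has no matching user.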

input

input(config):

TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

Add an additional named input Stream to the Pipeline.

Type parameters

Name | Type
Name | extends string
T | extends EastType

Parameters

Name | Type | Description
config | Object | The input stream and the resulting variable name
config.name | Name extends "input" | keyof Inputs ? never : Name | The name to give the input Variable
config.stream | Stream | The input stream (the stream and associated preconditions)

Returns

TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// add the admin stream as an additional named input
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .toTemplate();

leftJoin

leftJoin(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>

Apply a left join operation, matching rows from the current ("left") table with an additional "right" table. If a left row does not have any matching right rows, the right row's selections are taken to be null.

Type parameters

Name | Type
RI | extends (inputs: Inputs) => Variable<DictType>
LS | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>
RS | extends Record<string, (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.left_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key)
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields)
config.output_key | (fields: { [K in string | number | symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key)
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation
config.right_key? | (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key)
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields)

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
);
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .leftJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
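
The null-filling rule of a left join can be sketched in plain TypeScript. As with any such sketch, this is not the library's implementation: it assumes tables are records keyed by primary key, joins on the role field, and keys the output by the left table's primary key. The names leftJoinByRole, LeftUser, RightRole and LeftJoined are hypothetical.

```typescript
// Hypothetical row types; the right-hand selection is nullable in the output.
type LeftUser = { username: string; role: string };
type RightRole = { role: string; expert: boolean };
type LeftJoined = { username: string; role: string; expert: boolean | null };

// Keep every left row; fill the right-hand field with null when nothing matches.
function leftJoinByRole(
    left: Record<string, LeftUser>,
    right: Record<string, RightRole>
): Record<string, LeftJoined> {
    const out: Record<string, LeftJoined> = {};
    for (const leftKey of Object.keys(left)) {
        const user = left[leftKey];
        let expert: boolean | null = null;
        for (const rightKey of Object.keys(right)) {
            if (right[rightKey].role === user.role) {
                expert = right[rightKey].expert;
            }
        }
        out[leftKey] = { username: user.username, role: user.role, expert: expert };
    }
    return out;
}

const leftUsers: Record<string, LeftUser> = {
    u1: { username: "ann", role: "dev" },
    u2: { username: "bob", role: "qa" },
};
const rightRoles: Record<string, RightRole> = {
    r1: { role: "dev", expert: true },
    r2: { role: "ops", expert: false },
};
const leftJoined = leftJoinByRole(leftUsers, rightRoles);
```

Both users are kept: "ann" picks up expert = true, while "bob" has no matching role and gets expert = null.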

let

let(name):

TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

Give a name to the current Pipeline output so that it can be used as an input later in the Pipeline (i.e. after the next operation).

Type parameters

Name | Type
Name | extends string

Parameters

Name | Type | Description
name | Name extends "input" | keyof Inputs ? never : Name | The name to give the Variable that holds the current Pipeline output

Returns

TabularPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

A new PipelineBuilder.

Example

// create a stream of non admin users, and separate admin user
const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// name the unfiltered users "all" so they can still be read after filtering
const pipeline = new PipelineBuilder("Users")
    .from(users)
    .let("all")
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .transform((users, inputs) => Struct({
        "Non-Admin Users": users,
        "Admin User": Get(inputs.all, inputs.admin)
    }))
    .toTemplate();

log

log(config):

TabularPipelineBuilder

Produce a log message depending on the pipeline inputs and outputs. When the if predicate returns true, the pipeline will produce a log message and proceed.

Parameters

Name | Type | Description
config | Object | The log message and optional predicate
config.if | (value: Variable, inputs: Inputs) => EastFunction | If true a log message will be created
config.message | string | (value: Variable, inputs: Inputs) => EastFunction | The log message

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .log({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .log({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();

logEvery

logEvery(config):

TabularPipelineBuilder

Produce a log message for every row of the pipeline output. For each row, when the if predicate returns true, the pipeline will produce a log message and proceed.

Parameters

Name | Type | Description
config | Object | The log message and optional predicate
config.if? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | If true a log message will be created
config.message | string | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | The log message

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // required by the logEvery check below
        password: StringType,
    })
);
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .logEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();

offset

offset(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

Apply a transformation on a row-by-row basis by pulling in data from related rows.

Rows can (optionally) be grouped by a group key, and are sorted by a sort key. The offset indicates the offset to the requested row within the group according to the sort order. For example, an offset of 1 would relate to the next row in sorted order, and -1 would relate to the previous row in sort order. If such a row does not exist, the provided fields are null. An offset_exists variable is provided to indicate whether the row exists or not.

Type parameters

Name | Type
S | extends Record<string, (fields: { [K in string | number | symbol]: Variable<Nullable> }, key: Variable, offset_exists: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.group_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function returning an Expression for the value to group the rows by
config.offset | number | The offset of the row to provide to offset_selections
config.offset_selections | S | An object containing functions generating Expressions for output field values based on the available fields in the offset row, as well as a variable indicating if the row exists
config.sort_key | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function returning an Expression for the value to sort the rows by

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

A new PipelineBuilder.

Example

const TasksType = DictType(
    StringType,
    StructType({
        username: StringType,
        task: StringType,
        date: DateTimeType
    })
);
const tasks = Stream("Task", TasksType);
// create a stream with tasks, including next task date
const pipeline = new PipelineBuilder("Tasks with Next")
    .from(tasks)
    .offset({
        group_key: fields => fields.username,
        sort_key: fields => fields.date,
        offset: 1,
        offset_selections: {
            next_date: (fields) => fields.date
        }
    })
    .toTemplate();
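
The group, sort and offset steps can be sketched in plain TypeScript to make the null behaviour concrete. This is a sketch under stated assumptions rather than the library's implementation: dates are plain numbers, rows are grouped by username, and withOffsetDate, TaskRow and TaskWithNext are hypothetical names.

```typescript
// Hypothetical row types; dates are plain numbers to keep the sketch small.
type TaskRow = { username: string; task: string; date: number };
type TaskWithNext = TaskRow & { next_date: number | null };

// For each row, look up the row `offset` positions away within its group
// (sorted by date) and copy that row's date, or null if no such row exists.
function withOffsetDate(
    tasks: Record<string, TaskRow>,
    offset: number
): Record<string, TaskWithNext> {
    // Step 1: collect the primary keys of each group.
    const groups: Record<string, string[]> = {};
    for (const key of Object.keys(tasks)) {
        const group = tasks[key].username;
        if (!Object.prototype.hasOwnProperty.call(groups, group)) {
            groups[group] = [];
        }
        groups[group].push(key);
    }
    const out: Record<string, TaskWithNext> = {};
    for (const group of Object.keys(groups)) {
        // Step 2: sort the keys of the group by the sort value.
        const sorted = groups[group].slice().sort((a, b) => tasks[a].date - tasks[b].date);
        // Step 3: resolve the offset row for every row in the group.
        sorted.forEach((key, i) => {
            const j = i + offset;
            const offsetExists = j >= 0 && j < sorted.length;
            out[key] = {
                ...tasks[key],
                next_date: offsetExists ? tasks[sorted[j]].date : null,
            };
        });
    }
    return out;
}

const sampleTasks: Record<string, TaskRow> = {
    t1: { username: "ann", task: "write", date: 1 },
    t2: { username: "ann", task: "review", date: 5 },
    t3: { username: "bob", task: "deploy", date: 2 },
};
const tasksWithNext = withOffsetDate(sampleTasks, 1);
```

With an offset of 1, the first task for "ann" receives the date of her next task, while the last task in each group receives null.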

outerJoin

outerJoin(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>

Apply a full outer join operation, matching rows from the current ("left") table with an additional "right" table. If a left row does not have any matching right rows, the right row's selections are taken to be null. Furthermore, if a right row does not have any matching left rows, the left row's selections are taken to be null.

Type parameters

Name | Type
RI | extends (inputs: Inputs) => Variable<DictType>
LS | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>
RS | extends Record<string, (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.left_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key)
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields)
config.output_key | (fields: { [K in string | number | symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the left table's primary key)
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation
config.right_key? | (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key)
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields)

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: Nullable<ReturnType["type"]> }>>, Inputs>

A new PipelineBuilder.

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
);
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
);
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .outerJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
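
A full outer join keeps unmatched rows from both sides, which can be sketched in plain TypeScript. This is an illustration only: the join is on the role field, and the "right:" key prefix for unmatched right rows is an assumption made for the sketch, since those rows have no left primary key to fall back on. The names outerJoinByRole, OuterUser, OuterRole and OuterJoined are hypothetical.

```typescript
// Hypothetical row types; every selection is nullable in the output.
type OuterUser = { username: string; role: string };
type OuterRole = { role: string; expert: boolean };
type OuterJoined = {
    username: string | null;
    user_role: string | null;
    role: string | null;
    expert: boolean | null;
};

function outerJoinByRole(
    left: Record<string, OuterUser>,
    right: Record<string, OuterRole>
): Record<string, OuterJoined> {
    const out: Record<string, OuterJoined> = {};
    const matchedRight: Record<string, boolean> = {};
    // Every left row is kept; right-hand fields stay null when nothing matches.
    for (const leftKey of Object.keys(left)) {
        const user = left[leftKey];
        let row: OuterJoined = { username: user.username, user_role: user.role, role: null, expert: null };
        for (const rightKey of Object.keys(right)) {
            if (right[rightKey].role === user.role) {
                row = { ...row, role: right[rightKey].role, expert: right[rightKey].expert };
                matchedRight[rightKey] = true;
            }
        }
        out[leftKey] = row;
    }
    // Unmatched right rows are kept too, with null left-hand fields.
    for (const rightKey of Object.keys(right)) {
        if (!matchedRight[rightKey]) {
            // Assumed output key for rows that have no left primary key.
            out["right:" + rightKey] = {
                username: null,
                user_role: null,
                role: right[rightKey].role,
                expert: right[rightKey].expert,
            };
        }
    }
    return out;
}

const outerUsers: Record<string, OuterUser> = {
    u1: { username: "ann", role: "dev" },
    u2: { username: "bob", role: "qa" },
};
const outerRoles: Record<string, OuterRole> = {
    r1: { role: "dev", expert: true },
    r2: { role: "ops", expert: false },
};
const outerJoined = outerJoinByRole(outerUsers, outerRoles);
```

The result has three rows: a matched row for "ann", a row for "bob" with null right-hand fields, and a row for the "ops" role with null left-hand fields.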

outputStream

outputStream():

Stream

Return the Stream containing the output of the pipeline.

Returns

Stream

Example

// Create a datastream where a password can be written by end-user at runtime.
const hourly = new SourceBuilder("DatabasePassword")
    .writeable(StringType)
    .outputStream();

rightJoin

rightJoin(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

Apply a right join operation, matching rows from the current ("left") table with an additional "right" table. If a right row does not have any matching left rows, the left row's selections are taken to be null.

Type parameters

Name | Type
RI | extends (inputs: Inputs) => Variable<DictType>
LS | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>
RS | extends Record<string, (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.left_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "left" table (defaults to the left table's primary key)
config.left_selections | LS | A record of functions to build Expressions for the fields to output from the "left" table (defaults to all fields)
config.output_key | (fields: { [K in string | number | symbol]: Variable<ReturnType["type"]> }, left_input_key: Variable, right_input_key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the primary key of the output of the join operation (defaults to the right table's primary key)
config.right_input | RI | The tabular stream holding data for the "right" side of the join operation
config.right_key? | (fields: TypeToFields<ReturnType["type"]>, key: Variable, inputs: Inputs) => EastFunction | A function to build an Expression for the join key on the "right" table (defaults to the right table's primary key)
config.right_selections | RS | A record of functions to build Expressions for the fields to output from the "right" table (defaults to all fields)

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: Nullable<ReturnType["type"]> } & { [K in string | number | symbol]: ReturnType["type"] }>>, Inputs>

a new PipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        role: StringType
    })
)
const users = Stream("Users", UsersType);
const RoleType = DictType(
    StringType,
    StructType({
        role: StringType,
        expert: BooleanType
    })
)
const roles = Stream("Roles", RoleType);
// create a stream that combines a user with their role
const pipeline = new PipelineBuilder("User with Role")
    .from(users)
    .input({ name: "roles", stream: roles })
    .rightJoin({
        right_input: inputs => inputs.roles,
        left_selections: {
            username: fields => fields.username,
            email: fields => fields.email,
            role: fields => fields.role,
        },
        right_selections: {
            role: fields => fields.role,
            expert: fields => fields.expert
        },
        output_key: fields => fields.role
    })
    .toTemplate();
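
The rule that unmatched right rows receive null left selections can be illustrated outside the EDK. This is a minimal plain TypeScript sketch under stated assumptions, not the library's implementation: `rightJoinRows`, `LeftUser`, `RightRole` and `RightJoined` are hypothetical names, and both tables are plain arrays joined on a single `role` field.

```typescript
// Plain TypeScript illustration of right-join semantics (not the EDK implementation).
type LeftUser = { username: string; role: string };
type RightRole = { role: string; expert: boolean };
type RightJoined = { username: string | null; role: string; expert: boolean };

function rightJoinRows(left: LeftUser[], right: RightRole[]): RightJoined[] {
  const out: RightJoined[] = [];
  for (const r of right) {
    const matches = left.filter((l) => l.role === r.role);
    if (matches.length === 0) {
      // No matching left row: keep the right row, left selections are null.
      out.push({ username: null, role: r.role, expert: r.expert });
    }
    for (const l of matches) {
      out.push({ username: l.username, role: r.role, expert: r.expert });
    }
  }
  return out;
}

const joinedRight = rightJoinRows(
  [{ username: "ann", role: "admin" }, { username: "bob", role: "guest" }],
  [{ role: "admin", expert: true }, { role: "auditor", expert: false }],
);
```

Every right row appears in `joinedRight`; the left row for "bob" is dropped because no right row has the role "guest".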

select

select(config):

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

Apply a selection to the input data based on user-defined expressions. Optionally, a new primary key may be specified; if so, the user must ensure the keys remain distinct.

Type parameters

Name | Type
S | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | -
config.keep_all | true | If true, append the selections to the existing fields
config.output_key? | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | A function that generates an Expression that identifies the row uniquely based on the output fields
config.selections | S | An object containing functions generating Expressions for output field values based on the available fields

Returns

TabularPipelineBuilder<DictType<StringType, StructType<{ [K in string | number | symbol]: ReturnType["type"] } & { [K in string | number | symbol]: Output["value"]["value"]["value"][K] }>>, Inputs>

a new PipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// append a hash of the username and the admin name to every user row
const pipeline = new PipelineBuilder("Hashed Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .select({
        keep_all: true,
        selections: {
            hash: (fields, _, inputs) => AsciiToBase64(StringJoin`${fields.username}.${inputs.admin}`)
        }
    })
    .toTemplate();
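
The effect of `keep_all` can be sketched in plain TypeScript. This is an illustration of the described behaviour only, not the EDK implementation: `selectRows`, `SelectRow` and the boolean `keepAll` argument are hypothetical, and rows are modelled as string records in a keyed `Map`.

```typescript
// Plain TypeScript illustration of select with and without keep_all
// (not the EDK implementation).
type SelectRow = Record<string, string>;
type Selection = (fields: SelectRow, key: string) => string;

function selectRows(
  table: Map<string, SelectRow>,
  selections: Record<string, Selection>,
  keepAll: boolean,
): Map<string, SelectRow> {
  const out = new Map<string, SelectRow>();
  table.forEach((fields, key) => {
    const selected: SelectRow = {};
    Object.keys(selections).forEach((field) => {
      const fn = selections[field];
      if (fn) selected[field] = fn(fields, key);
    });
    // With keepAll the selections are appended to the existing fields;
    // otherwise only the selected fields survive.
    out.set(key, keepAll ? { ...fields, ...selected } : selected);
  });
  return out;
}

const usersTable = new Map<string, SelectRow>([
  ["u1", { username: "ann", email: "ann@example.com" }],
]);
const tagSelection: Record<string, Selection> = {
  tag: (fields) => `${fields.username}.admin`,
};
const tagged = selectRows(usersTable, tagSelection, true);
const onlyTag = selectRows(usersTable, tagSelection, false);
```

In this sketch `tagged` keeps `username` and `email` alongside the new `tag` field, while `onlyTag` contains `tag` alone.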

toCsv

toCsv(config):

BlobPipelineBuilder

Unparse a tabular Stream into a CSV-formatted BlobType Stream.

Type parameters

Name | Type
S | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | the configuration of the CSV unparsing
config.delimiter? | string | The delimiter to separate columns (default ",")
config.newline? | string | The delimiter to separate rows (default "\n")
config.null_str? | string | The string used for empty values
config.selections | S | The selections to unparse into
config.skip_n? | bigint | Skip this many rows from the top of the file

Returns

BlobPipelineBuilder

a new PipelineBuilder

Example

const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);

const pipeline = new PipelineBuilder("BasicAuth")
    // read the tabular stream that supplies `fields`
    .from(username)
    .input({ name: "float", stream: float })
    .toCsv({
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
        skip_n: 20n,
        delimiter: "|",
    })
    .toTemplate();
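
How `delimiter`, `newline` and `null_str` shape the output can be sketched in plain TypeScript. This is a rough illustration, not the EDK implementation: `unparseCsv` and `CsvValue` are hypothetical names, the header row is an assumption, and quoting or escaping of delimiters inside values is deliberately left out.

```typescript
// Plain TypeScript illustration of CSV unparsing (not the EDK implementation).
// Assumptions: a header row is written, and values never contain the delimiter.
type CsvValue = string | number | null;

function unparseCsv(
  rows: Record<string, CsvValue>[],
  columns: string[],
  delimiter: string = ",",
  newline: string = "\n",
  nullStr: string = "",
): string {
  const header = columns.join(delimiter);
  const lines = rows.map((row) =>
    columns
      .map((column) => {
        const value = row[column];
        // Null (or missing) values are written as the null string.
        return value === null || value === undefined ? nullStr : String(value);
      })
      .join(delimiter),
  );
  return [header].concat(lines).join(newline);
}

const csvText = unparseCsv(
  [{ id: "a", float: 1.5 }, { id: "b", float: null }],
  ["id", "float"],
  "|",
);
```

With the "|" delimiter and the default newline and null string, `csvText` is "id|float\na|1.5\nb|".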

toJsonLines

toJsonLines(config):

BlobPipelineBuilder

Unparse a tabular Stream into a JSONLines-formatted BlobType Stream.

Type parameters

Name | Type
S | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | the configuration of the JSONLines unparsing
config.selections | S | The selections to unparse into

Returns

BlobPipelineBuilder

a new PipelineBuilder

Example

const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);

const pipeline = new PipelineBuilder("BasicAuth")
    // read the tabular stream that supplies `fields`
    .from(username)
    .input({ name: "float", stream: float })
    .toJsonLines({
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
    })
    .toTemplate();
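
For readers unfamiliar with the JSONLines format, the output shape is one JSON object per line. The sketch below shows that shape in plain TypeScript; `unparseJsonLines` is a hypothetical helper, not part of the EDK, and the exact field ordering and number formatting produced by the EDK may differ.

```typescript
// Plain TypeScript illustration of the JSONLines layout (not the EDK implementation):
// each row becomes one JSON object, and rows are separated by a newline.
function unparseJsonLines(rows: Record<string, unknown>[]): string {
  return rows.map((row) => JSON.stringify(row)).join("\n");
}

const jsonLinesText = unparseJsonLines([{ float: 1.5 }, { float: 2 }]);
```

Here `jsonLinesText` contains two lines, `{"float":1.5}` followed by `{"float":2}`.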

toTemplate

toTemplate():

Template

Convert the built pipeline into a Template, for use in an EDK project.

Returns

Template

a Template

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // needed by the warnEvery check below
        password: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const template = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .error({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .error({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();

Overrides

Builder.toTemplate


toXlsx

toXlsx(config):

BlobPipelineBuilder

Unparse a tabular Stream into an XLSX-formatted BlobType Stream.

Type parameters

Name | Type
S | extends Record<string, (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction>

Parameters

Name | Type | Description
config | Object | the configuration of the XLSX unparsing
config.null_str? | string | -
config.selections | S | -
config.sheet? | string | -

Returns

BlobPipelineBuilder

a new PipelineBuilder

Example

const username = Stream("Username", DictType(StringType, StructType({ float: FloatType })));
const float = Stream("float", FloatType);

const pipeline = new PipelineBuilder("BasicAuth")
    // read the tabular stream that supplies `fields`
    .from(username)
    .input({ name: "float", stream: float })
    .toXlsx({
        // only null_str, selections and sheet are listed as toXlsx options
        selections: {
            float: (fields, _key, inputs) => Add(fields.float, inputs.float)
        },
    })
    .toTemplate();

transform

transform(f): ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>

Transform the entire input Stream based on an EastFunction.

Type parameters

Name | Type
F | extends (value: Variable, inputs: Inputs) => EastFunction

Parameters

Name | Type | Description
f | F | a function that generates the output Expression

Returns

ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>

a new PipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);

// create a stream of the total number of users
const pipeline = new PipelineBuilder("Number Of Users")
    .from(users)
    .transform((users) => Size(users))
    .toTemplate();
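
Unlike the row-wise operations, `transform` receives the whole table at once, so its result need not be tabular. A minimal plain TypeScript sketch of that distinction follows; `transformTable` and `UserRow` are hypothetical names and this is not the EDK implementation.

```typescript
// Plain TypeScript illustration of a whole-table transform (not the EDK implementation).
type UserRow = { username: string };

// The function is applied once to the entire table, not once per row.
function transformTable<T, R>(table: Map<string, T>, f: (value: Map<string, T>) => R): R {
  return f(table);
}

const allUsers = new Map<string, UserRow>([
  ["u1", { username: "ann" }],
  ["u2", { username: "bob" }],
]);

// A scalar result, comparable to the Size(users) example.
const userCount = transformTable(allUsers, (table) => table.size);

// A result that is still a keyed table.
const upperCased = transformTable(allUsers, (table) => {
  const out = new Map<string, UserRow>();
  table.forEach((row, key) => out.set(key, { username: row.username.toUpperCase() }));
  return out;
});
```

The first call yields a single number and the second yields another table, which mirrors the conditional return type shown in the signature.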

warn

warn(config):

TabularPipelineBuilder

Add a warning on the pipeline inputs and outputs to identify problems. When the if predicate returns true, the pipeline registers a warning with a message but still proceeds to produce output data.

Parameters

Name | Type | Description
config | Object | the warning message and predicate
config.if | (value: Variable, inputs: Inputs) => EastFunction | If true, a warning will be created
config.message | string | (value: Variable, inputs: Inputs) => EastFunction | The message in the case that a warning is created

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .warn({
        if: (value) => Equal(Size(value), 0n),
        message: () => Const("No users defined")
    })
    .input({ name: "admin", stream: admin })
    .warn({
        if: (_, { admin }) => Equal(admin, ""),
        message: () => Const("Admin user can't be empty")
    })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .toTemplate();
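
The key property of `warn` is that the data passes through unchanged whether or not the predicate fires. A minimal plain TypeScript sketch of that behaviour, under the assumption that warnings are simply collected as strings; `warnIf` and `Checked` are hypothetical names, not EDK API.

```typescript
// Plain TypeScript illustration of a whole-value warning (not the EDK implementation).
type Checked<T> = { output: T; warnings: string[] };

function warnIf<T>(
  value: T,
  predicate: (value: T) => boolean,
  message: (value: T) => string,
): Checked<T> {
  // The value is returned unchanged; a warning is only recorded alongside it.
  const warnings = predicate(value) ? [message(value)] : [];
  return { output: value, warnings };
}

const emptyUsers = new Map<string, { username: string }>();
const checkedUsers = warnIf(
  emptyUsers,
  (value) => value.size === 0,
  () => "No users defined",
);
```

Here `checkedUsers.warnings` holds the single message "No users defined", and `checkedUsers.output` is still the original (empty) table.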

warnEvery

warnEvery(config):

TabularPipelineBuilder

Add a warning on every row of the pipeline output to identify problems. For each row, when the if predicate returns true, the pipeline registers a warning with a message but still proceeds to produce output data.

Parameters

Name | Type | Description
config | Object | the warning message and predicate
config.if | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | If true, a warning will be created
config.message | string | (fields: TypeToFields, key: Variable, inputs: Inputs) => EastFunction | The message in the case that a warning is created

Returns

TabularPipelineBuilder

Example

const UsersType = DictType(
    StringType,
    StructType({
        username: StringType,
        email: StringType,
        // needed by the warnEvery check below
        password: StringType,
    })
)
const users = Stream("Users", UsersType);
const admin = Stream("Admin", StringType);

// create a stream of non admin users, from all users
const pipeline = new PipelineBuilder("Non-Admin Users")
    .from(users)
    .input({ name: "admin", stream: admin })
    .filter((fields, _, inputs) => NotEqual(fields.username, inputs.admin))
    .warnEvery({
        if: (fields) => Equal(fields.password, ""),
        message: (fields) => StringJoin`User ${fields.username} has empty password`
    })
    .toTemplate();
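
In contrast to `warn`, the predicate here is evaluated once per row, so one table can produce several warnings. A minimal plain TypeScript sketch, assuming warnings are collected as strings; `warnEveryRow` and `Account` are hypothetical names and this is not the EDK implementation.

```typescript
// Plain TypeScript illustration of a per-row warning (not the EDK implementation).
type Account = { username: string; password: string };

function warnEveryRow(
  table: Map<string, Account>,
  predicate: (fields: Account, key: string) => boolean,
  message: (fields: Account, key: string) => string,
): string[] {
  const warnings: string[] = [];
  // Each row is checked independently; the table itself is left untouched.
  table.forEach((fields, key) => {
    if (predicate(fields, key)) warnings.push(message(fields, key));
  });
  return warnings;
}

const accounts = new Map<string, Account>([
  ["u1", { username: "ann", password: "" }],
  ["u2", { username: "bob", password: "s3cret" }],
]);
const rowWarnings = warnEveryRow(
  accounts,
  (fields) => fields.password === "",
  (fields) => `User ${fields.username} has empty password`,
);
```

Only the row for "ann" triggers a warning in this sketch, and both rows remain in `accounts`.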