Pipeline.BlobPipelineBuilder

A BlobPipelineBuilder builds a data Pipeline whose current output is a BlobType Stream.

Methods on the BlobPipelineBuilder let you transform data from any Stream directly by applying a transform operation. Alternatively, they can parse files in formats such as JSON Lines, JSON, CSV and XLSX.

Data quality can be monitored with error and warn. Both apply expression-based conditions and messages to the whole Stream.

A corresponding Template can be created using .toTemplate().

Remarks

See Transform Data for a related learning module.

Example

// a Stream containing a raw file (blob)
const file = Stream("File", BlobType);

const pipeline = new PipelineBuilder("ReadJson")
    .from(file)
    .error({
        if: file => Equal(Size(file), 0n),
        message: "File is empty"
    })
    .fromJson({
        type: StructType({
            float: FloatType,
        }),
    })
    .toTemplate();

Type parameters

Output extends BlobType
Inputs extends Record

Hierarchy

  • Builder

    BlobPipelineBuilder

Pipeline

error

error(config): BlobPipelineBuilder

Add an assertion on the pipeline inputs and output to identify errors. When the if predicate returns true, the pipeline is terminated with an error message and no output data is produced.

Parameters

config: Object
    the error message and predicate
config.if: (value: Variable, inputs: Inputs) => EastFunction
    If true, an error will be created
config.message: string | (value: Variable, inputs: Inputs) => EastFunction
    The message in the case that an error is created
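The termination behaviour of error can be pictured with a small, self-contained TypeScript model. This is a hypothetical sketch, not the EDK implementation. Plain functions stand in for the EastFunction predicate and message. A zero-length Uint8Array stands in for an empty blob. The names ErrorCheck, runErrorCheck and NoInputs are invented for illustration.

```typescript
// Hypothetical model of `error` (not the EDK API): plain TypeScript functions
// stand in for the EastFunction predicate and message.
type Message<V, I> = string | ((value: V, inputs: I) => string);

interface ErrorCheck<V, I> {
  if: (value: V, inputs: I) => boolean;
  message: Message<V, I>;
}

// Evaluate the predicate; when it holds, throw so that no output is produced.
function runErrorCheck<V, I>(value: V, inputs: I, check: ErrorCheck<V, I>): V {
  if (check.if(value, inputs)) {
    const text =
      typeof check.message === "string" ? check.message : check.message(value, inputs);
    throw new Error(text);
  }
  // The predicate was false: the value passes through unchanged.
  return value;
}

// Usage: an empty file, modelled as a zero-length Uint8Array, stops the run.
type NoInputs = Record<string, never>;
const noInputs: NoInputs = {};
const emptyFileCheck: ErrorCheck<Uint8Array, NoInputs> = {
  if: (file) => file.length === 0,
  message: "File is empty",
};

let errorMessage: string | undefined;
try {
  runErrorCheck(new Uint8Array(0), noInputs, emptyFileCheck);
} catch (e) {
  errorMessage = (e as Error).message;
}
```

Throwing models the documented "no output data" outcome. A non-empty value is simply returned.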

Returns

BlobPipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .error({
        if: username => Equal(username, ""),
        message: "Username is empty"
    })
    .input({ name: "password", stream: password })
    .transform((username, { password }) => StringJoin`${username}:${password}`)
    .error({
        if: str => Equal(str, ":"),
        message: () => Const("Unexpected string")
    })
    .toTemplate();

fromCsv

fromCsv(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

Parse a BlobType Stream containing CSV data to construct a tabular Stream.

Type parameters

S extends Record

Parameters

config: Object
    the configuration of the CSV parsing
config.delimiter?: string
    The delimiter separating columns (default ",")
config.fields: S
    The field types to parse
config.newline?: string
    The delimiter separating rows (default "\n", "\r\n" or "\r")
config.null_str?: string
    The string used to represent empty values
config.output_key: (fields: { [K in string | number | symbol]: Variable }, inputs: Inputs) => EastFunction
    The output key for each parsed row
config.skip_n?: bigint
    The number of rows to skip at the top of the file
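The interplay of delimiter, newline, skip_n, null_str and output_key can be sketched in plain TypeScript. This is a hypothetical model, not how EDK parses CSV.

It rests on several assumptions:
- cells are never quoted;
- the first row left after skip_n is a header naming the columns;
- skip_n is a number rather than a bigint;
- fields is a simple map to "float" or "string".

The names parseCsv, CsvConfig, FieldKind and Row are invented.

```typescript
// Hypothetical model of `fromCsv` (not the EDK implementation).
// Assumptions: cells are never quoted, and the first row remaining after
// `skip_n` is a header naming the columns.
type FieldKind = "float" | "string";
type Row = Record<string, number | string | null>;

interface CsvConfig<I> {
  fields: Record<string, FieldKind>;
  output_key: (fields: Row, inputs: I) => string;
  delimiter?: string;
  newline?: RegExp;
  null_str?: string;
  skip_n?: number;
}

function parseCsv<I>(text: string, inputs: I, config: CsvConfig<I>): Map<string, Row> {
  const delimiter = config.delimiter ?? ",";
  // By default rows may be separated by "\n", "\r\n" or "\r".
  const newline = config.newline ?? /\r\n|\n|\r/;
  const lines = text.split(newline).filter((line) => line.length > 0);
  const [header, ...body] = lines.slice(config.skip_n ?? 0);
  const result = new Map<string, Row>();
  if (header === undefined) return result;
  const columns = header.split(delimiter);

  for (const line of body) {
    const cells = line.split(delimiter);
    const row: Row = {};
    for (const [name, kind] of Object.entries(config.fields)) {
      const index = columns.indexOf(name);
      const raw = index >= 0 ? cells[index] : undefined;
      if (raw === undefined || raw === config.null_str) {
        row[name] = null; // missing cells and null_str cells become null
      } else {
        row[name] = kind === "float" ? Number.parseFloat(raw) : raw;
      }
    }
    // Each row is stored under the key computed by output_key.
    result.set(config.output_key(row, inputs), row);
  }
  return result;
}

// Usage: skip one preamble line, then parse "|"-delimited rows.
const parsed = parseCsv(
  "exported by a tool\nid|float\na|1.5\nb|NA\n",
  { password: "pw" },
  {
    fields: { id: "string", float: "float" },
    skip_n: 1,
    delimiter: "|",
    null_str: "NA",
    output_key: (row, inputs) => `${row.id}.${inputs.password}`,
  },
);
```

The result is keyed by output_key, which mirrors the documented DictType<StringType, StructType> return shape.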

Returns

TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

a new

PipelineBuilder

Example

const file = Stream("File", BlobType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("ParseCsv")
    .from(file)
    .input({ name: "password", stream: password })
    .fromCsv({
        fields: {
            float: FloatType,
        },
        skip_n: 20n,
        delimiter: "|",
        output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
    })
    .toTemplate();

fromJson

fromJson(config): GenericPipelineBuilder

Parse a BlobType Stream containing JSON data to construct a Stream.

Type parameters

T extends EastType

Parameters

config: Object
    the configuration of the JSON parsing
config.type: T
    The type to parse
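Unlike the tabular parsers, fromJson reads the whole blob as a single value of the declared type. A minimal TypeScript sketch of that idea follows. It is hypothetical, not the EDK implementation.

It makes these assumptions:
- the blob is already decoded to a string;
- JsonType is an invented stand-in for East types, covering only floats, strings and structs;
- a mismatch throws, although what EDK actually does on a mismatch is not stated here.

```typescript
// Hypothetical model of `fromJson` (not the EDK implementation): the whole
// blob, here already decoded to a string, is parsed once and checked against
// a declared type. JsonType is an invented stand-in for East types.
type JsonType =
  | { kind: "float" }
  | { kind: "string" }
  | { kind: "struct"; fields: Record<string, JsonType> };

function conforms(value: unknown, type: JsonType): boolean {
  if (type.kind === "float") return typeof value === "number";
  if (type.kind === "string") return typeof value === "string";
  // Struct: every declared field must be present and conform to its type.
  if (typeof value !== "object" || value === null || Array.isArray(value)) return false;
  const record = value as Record<string, unknown>;
  const fields = type.fields;
  return Object.keys(fields).every((name) => conforms(record[name], fields[name]));
}

function parseJson(text: string, type: JsonType): unknown {
  const value: unknown = JSON.parse(text);
  if (!conforms(value, type)) {
    throw new Error("JSON does not match the declared type");
  }
  return value;
}

// Usage: the declared type mirrors StructType({ float: FloatType }).
const floatStruct: JsonType = { kind: "struct", fields: { float: { kind: "float" } } };
const value = parseJson('{"float": 2.5}', floatStruct);
```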

Returns

GenericPipelineBuilder

a new PipelineBuilder

Example

const file = Stream("File", BlobType);

const pipeline = new PipelineBuilder("ReadJson")
    .from(file)
    .fromJson({
        type: StructType({
            float: FloatType,
        }),
    })
    .toTemplate();

fromJsonLines

fromJsonLines(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

Parse a BlobType Stream containing JSON Lines data to construct a tabular Stream.

Type parameters

S extends Record

Parameters

config: Object
    the configuration of the JSON Lines parsing
config.fields: S
    The field types to parse
config.output_key: (fields: { [K in string | number | symbol]: Variable }, inputs: Inputs) => EastFunction
    The output key for each parsed row
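In JSON Lines, each line of the blob holds one JSON object. The sketch below shows how such rows could be gathered into a keyed table in plain TypeScript. It is a hypothetical model, not the EDK parser.

Several simplifications apply:
- field types are not checked;
- absent fields become null;
- a repeated key overwrites the earlier row, a behaviour chosen for this sketch only.

The names parseJsonLines and JsonRow are invented.

```typescript
// Hypothetical model of `fromJsonLines` (not the EDK implementation): each
// non-blank line holds one JSON object; only the declared fields are kept and
// every row is stored under the key returned by outputKey. Field types are
// not checked, and a repeated key overwrites the earlier row.
type JsonRow = Record<string, unknown>;

function parseJsonLines<I>(
  text: string,
  inputs: I,
  fields: string[],
  outputKey: (row: JsonRow, inputs: I) => string,
): Map<string, JsonRow> {
  const result = new Map<string, JsonRow>();
  for (const line of text.split(/\r?\n/)) {
    if (line.trim() === "") continue; // e.g. the trailing newline
    const record = JSON.parse(line) as JsonRow;
    const row: JsonRow = {};
    for (const name of fields) {
      row[name] = name in record ? record[name] : null; // absent fields become null
    }
    result.set(outputKey(row, inputs), row);
  }
  return result;
}

// Usage: key each row by its id and a named input, as output_key does.
const rows = parseJsonLines(
  '{"id":"a","float":1.5}\n{"id":"b","float":2}\n',
  { password: "pw" },
  ["id", "float"],
  (row, inputs) => `${String(row.id)}.${inputs.password}`,
);
```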

Returns

TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

a new PipelineBuilder

Example

const file = Stream("File", BlobType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("ParseJsonLines")
    .from(file)
    .input({ name: "password", stream: password })
    .fromJsonLines({
        fields: {
            float: FloatType,
        },
        output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
    })
    .toTemplate();

fromXlsx

fromXlsx(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

Parse a BlobType Stream containing XLSX data to construct a tabular Stream.

Type parameters

S extends Record

Parameters

config: Object
    the configuration of the XLSX parsing
config.fields: S
    The field types to parse
config.null_str?: string
    The string used to represent empty values
config.output_key: (fields: { [K in string | number | symbol]: Variable }, inputs: Inputs) => EastFunction
    The output key for each parsed row
config.sheet?: string
    The worksheet containing the data

Returns

TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>

a new PipelineBuilder

Example

const file = Stream("File", BlobType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("ParseXlsx")
    .from(file)
    .input({ name: "password", stream: password })
    .fromXlsx({
        fields: {
            float: FloatType,
        },
        sheet: "Sheet2",
        output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
    })
    .toTemplate();

input

input(config): BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

Add an additional named input Stream to the Pipeline.

Type parameters

Name extends string
T extends EastType

Parameters

config: Object
    the input stream and the resulting variable name
config.name: Name extends "input" | keyof Inputs ? never : Name
    the name of the resulting input Variable; it must not be "input" or the name of an existing input
config.stream: Stream
    the input stream configuration (the stream and associated preconditions)
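The conditional type on config.name is what forbids a reserved or duplicate name. The sketch below copies that conditional type onto a hypothetical addInput function. This shows the type-level mechanism in plain TypeScript, not the EDK source. An object of plain values stands in for the Variables the builder accumulates.

```typescript
// Hypothetical sketch (not the EDK source) of the name constraint on `input`:
// the parameter type copies the documented conditional type, so a name that
// is "input" or already a key of Inputs resolves to `never` and is rejected
// at compile time.
function addInput<Inputs extends Record<string, unknown>, Name extends string, T>(
  inputs: Inputs,
  name: Name extends "input" | keyof Inputs ? never : Name,
  value: T,
): Inputs & Record<Name, T> {
  // String() turns the conditional type into a plain string key at runtime.
  return { ...inputs, [String(name)]: value } as unknown as Inputs & Record<Name, T>;
}

const base = { username: "alice" };
const withPassword = addInput(base, "password", "secret");

// Both of these would fail to compile:
// addInput(withPassword, "username", "bob"); // "username" is already an input
// addInput(withPassword, "input", "x");      // "input" is reserved
```

TypeScript infers Name from the false branch of the conditional. It then re-evaluates the conditional, and a clash turns the parameter type into never.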

Returns

BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

a new PipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .input({ name: "password", stream: password })
    .toTemplate();

let

let(name): BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

Give a name to the current Pipeline output so that it can be used as an input later in the Pipeline (i.e. after the next operation).

Type parameters

Name extends string

Parameters

name: Name extends "input" | keyof Inputs ? never : Name
    the name of the resulting Variable; it must not be "input" or the name of an existing input
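The effect of let is that a value stays reachable after a later transform has replaced the current output. The sketch below models this in TypeScript with a hypothetical MiniPipeline class that holds plain values instead of Streams. It is not the EDK builder; its let, input and transform methods only mimic the documented behaviour.

```typescript
// Hypothetical model of `let` (not the EDK implementation): the current value
// is copied into the named inputs, so a later step can still read it after a
// transform has replaced the current value. MiniPipeline is invented here.
class MiniPipeline<V, I extends Record<string, unknown>> {
  constructor(readonly value: V, readonly inputs: I) {}

  // Name the current value so later steps can read it from `inputs`.
  let<Name extends string>(name: Name): MiniPipeline<V, I & Record<Name, V>> {
    const inputs = { ...this.inputs, [name]: this.value } as unknown as I & Record<Name, V>;
    return new MiniPipeline(this.value, inputs);
  }

  // Add another named value, as `input` does with a Stream.
  input<Name extends string, T>(name: Name, value: T): MiniPipeline<V, I & Record<Name, T>> {
    const inputs = { ...this.inputs, [name]: value } as unknown as I & Record<Name, T>;
    return new MiniPipeline(this.value, inputs);
  }

  // Replace the current value; the named inputs are carried forward.
  transform<R>(f: (value: V, inputs: I) => R): MiniPipeline<R, I> {
    return new MiniPipeline(f(this.value, this.inputs), this.inputs);
  }
}

// Usage mirroring the BasicAuth example for `let`.
const result = new MiniPipeline("alice", {})
  .let("username")
  .input("password", "secret")
  .transform((username, { password }) => `${username}:${password}`)
  .transform((str, inputs) => ({ Username: inputs.username, Hash: str }));
```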

Returns

BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>

a new PipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .let("username")
    .input({ name: "password", stream: password })
    .transform((username, { password }) => StringJoin`${username}:${password}`)
    .transform((str, inputs) => Struct({
        Username: inputs.username,
        Hash: str
    }))
    .toTemplate();

log

log(config): BlobPipelineBuilder

Produce a log message depending on the pipeline inputs and outputs. When the if predicate returns true, the pipeline produces a log message and then proceeds.

Parameters

config: Object
    the log message and optional predicate
config.if?: (value: Variable, inputs: Inputs) => EastFunction
    If true, a log message will be produced
config.message: string | (value: Variable, inputs: Inputs) => EastFunction
    The log message
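The two forms of config.message and the optional predicate can be modelled in a few lines of TypeScript. This is a hypothetical sketch, not the EDK implementation. An array stands in for the log sink. The sketch assumes that omitting `if` means the message is always logged, which the reference above does not state. The names LogConfig and runLog are invented.

```typescript
// Hypothetical model of `log` (not the EDK implementation). Assumption: when
// `if` is omitted the message is always logged. The message may be a fixed
// string or a function of the value and inputs, and the value always passes
// through unchanged.
interface LogConfig<V, I> {
  if?: (value: V, inputs: I) => boolean;
  message: string | ((value: V, inputs: I) => string);
}

function runLog<V, I>(value: V, inputs: I, config: LogConfig<V, I>, sink: string[]): V {
  const shouldLog = config.if === undefined || config.if(value, inputs);
  if (shouldLog) {
    sink.push(typeof config.message === "string" ? config.message : config.message(value, inputs));
  }
  return value; // logging never stops the pipeline
}

// Usage: a conditional fixed message, then an unconditional computed one.
type NoInputs = Record<string, never>;
const noInputs: NoInputs = {};
const logs: string[] = [];
const username = runLog("", noInputs, { if: (u) => u === "", message: "Username is empty" }, logs);
runLog("alice:", noInputs, { message: (s) => `Joined value: ${s}` }, logs);
```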

Returns

BlobPipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .log({
        if: username => Equal(username, ""),
        message: "Username is empty"
    })
    .input({ name: "password", stream: password })
    .transform((username, { password }) => StringJoin`${username}:${password}`)
    .log({
        if: str => Equal(str, ":"),
        message: () => Const("Unexpected string")
    })
    .toTemplate();

outputStream

outputStream(): Stream

Return the Stream containing the output of the pipeline.

Returns

Stream

Example

// Create a data stream to which an end user can write a password at runtime.
const databasePassword = new SourceBuilder("DatabasePassword")
    .writeable(StringType)
    .outputStream();

toTemplate

toTemplate(): Template

Convert the built pipeline into a Template for use in an EDK project.

Returns

Template

a Template

Example

const username = Stream("Username", StringType);

const template = new PipelineBuilder("BasicAuth")
    .from(username)
    .toTemplate();

Overrides

Builder.toTemplate


transform

transform(f): ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>

Transform the entire input Stream based on an EastFunction.

Type parameters

F extends (value: Variable, inputs: Inputs) => EastFunction

Parameters

f: F
    a function that receives the current value and the named inputs and returns the EastFunction computing the output expression
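The return type of transform depends on what f produces: a DictType result continues as a tabular pipeline, and anything else as a generic one. The sketch below reproduces that dispatch with a TypeScript conditional type. It is hypothetical, not the EDK typings. A Map stands in for a DictType result. TabularBuilder, GenericBuilder and BuilderFor are invented names.

```typescript
// Hypothetical sketch (not the EDK types) of the conditional return type of
// `transform`: a Map stands in for a DictType result and selects a tabular
// builder; any other result selects a generic builder.
class TabularBuilder<K, V> {
  readonly kind = "tabular";
  constructor(readonly table: Map<K, V>) {}
}

class GenericBuilder<T> {
  readonly kind = "generic";
  constructor(readonly value: T) {}
}

type BuilderFor<R> = R extends Map<infer K, infer V> ? TabularBuilder<K, V> : GenericBuilder<R>;

function transform<V, R>(value: V, f: (value: V) => R): BuilderFor<R> {
  const result = f(value);
  // The runtime check mirrors the type-level choice made by BuilderFor.
  const builder = result instanceof Map ? new TabularBuilder(result) : new GenericBuilder(result);
  return builder as unknown as BuilderFor<R>;
}

// Usage: a string result stays generic; a Map result becomes tabular.
const generic = transform("alice", (u) => `${u}:secret`);
const tabular = transform(["a", "b"], (ids) =>
  new Map(ids.map((id, i): [string, number] => [id, i])),
);
```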

Returns

ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>

a new PipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .input({ name: "password", stream: password })
    .transform((username, { password }) => StringJoin`${username}:${password}`)
    .toTemplate();

warn

warn(config): BlobPipelineBuilder

Add a warning on the pipeline inputs and outputs to identify problems. When the if predicate returns true, the pipeline registers a warning with a message but still proceeds to produce output data.

Parameters

config: Object
    the warning message and predicate
config.if: (value: Variable, inputs: Inputs) => EastFunction
    If true, a warning will be produced
config.message: string | (value: Variable, inputs: Inputs) => EastFunction
    The message in the case that a warning is produced
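The practical difference between warn and error is whether output is still produced. The TypeScript model below runs a list of checks: a failing warn check is recorded and the value flows on, while a failing error check stops the run. It is a hypothetical sketch, not the EDK implementation. The names Check and runChecks are invented, and messages are plain strings for brevity.

```typescript
// Hypothetical model contrasting `warn` with `error` (not the EDK
// implementation): a failing warn check is recorded and the value still flows
// through, whereas a failing error check stops the run with no output.
interface Check<V> {
  kind: "warn" | "error";
  if: (value: V) => boolean;
  message: string;
}

function runChecks<V>(value: V, checks: Check<V>[]): { output: V; warnings: string[] } {
  const warnings: string[] = [];
  for (const check of checks) {
    if (!check.if(value)) continue;
    if (check.kind === "error") throw new Error(check.message);
    warnings.push(check.message); // warn: record the message and carry on
  }
  return { output: value, warnings };
}

// Usage: one warning fires, but the output is still produced.
const checked = runChecks(":", [
  { kind: "warn", if: (s) => s === ":", message: "Unexpected string" },
  { kind: "warn", if: (s) => s.length > 100, message: "String is too long" },
]);
```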

Returns

BlobPipelineBuilder

Example

const username = Stream("Username", StringType);
const password = Stream("Password", StringType);

const pipeline = new PipelineBuilder("BasicAuth")
    .from(username)
    .warn({
        if: username => Equal(username, ""),
        message: "Username is empty"
    })
    .input({ name: "password", stream: password })
    .transform((username, { password }) => StringJoin`${username}:${password}`)
    .warn({
        if: str => Equal(str, ":"),
        message: () => Const("Unexpected string")
    })
    .toTemplate();