Pipeline - BlobPipelineBuilder
Pipeline.BlobPipelineBuilder
A BlobPipelineBuilder to build a data Pipeline. Methods on the BlobPipelineBuilder enable you to transform data directly with a transform operation, or alternatively to parse files in formats such as jsonl, json, csv and xlsx.
Quality of data can be observed using error and warn, by providing expression based conditions and messages that apply to the whole Stream.
A corresponding Template can be created using .toTemplate().
Remarks
See Transform Data for a related learning module.
Example
// define a stream containing the raw file contents
const file = Stream("File", BlobType);
const pipeline = new PipelineBuilder("ReadJson")
  .from(file)
  .error({
    if: file => Equal(Size(file), 0n),
    message: "File is empty"
  })
  .fromJson({
    type: StructType({
      float: FloatType,
    }),
  })
  .toTemplate();
Type parameters
Name | Type |
---|---|
Output | extends BlobType |
Inputs | extends Record |
Hierarchy
- Builder
  ↳ BlobPipelineBuilder
Pipeline
error
▸ error(config): BlobPipelineBuilder
Add an assertion on the pipeline inputs and output to identify errors. When the if predicate returns true, the pipeline will be terminated with an error message and output data will not be produced.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the error message and predicate |
config.if | (value: Variable, inputs: Inputs) => EastFunction | If true an error will be created |
config.message | string \| (value: Variable, inputs: Inputs) => EastFunction | The message in the case that an error is created |
Returns
BlobPipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .error({
    if: username => Equal(username, ""),
    message: "Username is empty"
  })
  .input({ name: "password", stream: password })
  .transform((username, { password }) => StringJoin`${username}:${password}`)
  .error({
    if: str => Equal(str, ":"),
    message: () => Const("Unexpected string")
  })
  .toTemplate();
fromCsv
▸ fromCsv(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
Parse a BlobType Stream containing CSV data to construct a tabular Stream.
Type parameters
Name | Type |
---|---|
S | extends Record |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the CSV parsing |
config.delimiter? | string | The delimiter to separate columns (default ",") |
config.fields | S | The field types to parse |
config.newline? | string | The delimiter to separate rows (default "\n", "\r\n" or "\r") |
config.null_str? | string | The string used to represent empty values |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable }, inputs: Inputs) => EastFunction | output key for the parsed data |
config.skip_n? | bigint | Skip this many rows from the top of the file |
Returns
TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
a new PipelineBuilder
Example
const file = Stream("File", BlobType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(file)
  .input({ name: "password", stream: password })
  .fromCsv({
    fields: {
      float: FloatType,
    },
    skip_n: 20n,
    delimiter: "|",
    output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
  })
  .toTemplate();
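To make the parsing options more concrete, the following is a minimal plain TypeScript sketch of what delimiter, skip_n, null_str, fields and output_key could mean for a small file. It is not the EDK parser: parseCsvSketch, CsvSketchConfig, the camel-case option names and the "string" / "float" field kinds are hypothetical stand-ins. It also assumes that the first row left after skipping is a header row, and it does not handle quoted cells.

```typescript
// Hypothetical stand-ins for the options described above; this is not the EDK parser.
type FieldKind = "string" | "float";
type CsvRow = Record<string, string | number | null>;

interface CsvSketchConfig {
  fields: Record<string, FieldKind>; // column name -> how to parse the cell
  delimiter?: string;                // column separator, "," when omitted
  skipN?: number;                    // rows to drop from the top of the file
  nullStr?: string;                  // cell text that stands for "no value"
  outputKey: (row: CsvRow) => string; // builds the key of each parsed row
}

function parseCsvSketch(text: string, config: CsvSketchConfig): Map<string, CsvRow> {
  const delimiter = config.delimiter ?? ",";
  // Accept "\r\n", "\n" or "\r" as row separators and ignore blank lines.
  const lines = text.split(/\r\n|\n|\r/).filter((line) => line.length > 0);
  const remaining = lines.slice(config.skipN ?? 0);
  const result = new Map<string, CsvRow>();
  // Assumption: the first row left after skipping is the header row.
  const headerLine = remaining[0];
  if (headerLine === undefined) return result;
  const header = headerLine.split(delimiter);
  for (const line of remaining.slice(1)) {
    const cells = line.split(delimiter);
    const row: CsvRow = {};
    for (const column of Object.keys(config.fields)) {
      const index = header.indexOf(column);
      const cell = index >= 0 ? cells[index] : undefined;
      if (cell === undefined || cell === config.nullStr) {
        row[column] = null; // missing cell or the configured null marker
      } else if (config.fields[column] === "float") {
        row[column] = Number(cell);
      } else {
        row[column] = cell;
      }
    }
    result.set(config.outputKey(row), row);
  }
  return result;
}

// One junk line on top, "|" as the delimiter and "NA" as the null marker.
const csvText = "exported by a hypothetical tool\nid|price\na|1.5\nb|NA\n";
const csvRows = parseCsvSketch(csvText, {
  fields: { id: "string", price: "float" },
  delimiter: "|",
  skipN: 1,
  nullStr: "NA",
  outputKey: (row) => String(row["id"]),
});
```

The result is keyed by the value returned from outputKey, which is the role output_key plays in producing the keys of the tabular Stream.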
fromJson
▸ fromJson(config): GenericPipelineBuilder
Parse a BlobType Stream containing JSON data to construct a Stream.
Type parameters
Name | Type |
---|---|
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the JSON parsing |
config.type | T | The type to parse |
Returns
GenericPipelineBuilder
a new PipelineBuilder
Example
const file = Stream("File", BlobType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(file)
  .fromJson({
    type: StructType({
      float: FloatType,
    }),
  })
  .toTemplate();
fromJsonLines
▸ fromJsonLines(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
Parse a BlobType Stream containing JSONLines data to construct a tabular Stream.
Type parameters
Name | Type |
---|---|
S | extends Record |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the JSONLines parsing |
config.fields | S | The field types to parse |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable }, inputs: Inputs) => EastFunction | output key for the parsed data |
Returns
TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
a new PipelineBuilder
Example
const file = Stream("File", BlobType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(file)
  .input({ name: "password", stream: password })
  .fromJsonLines({
    fields: {
      float: FloatType,
    },
    output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
  })
  .toTemplate();
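JSON Lines input holds one JSON object per line, and each line becomes one keyed row. As a rough illustration only, the plain TypeScript sketch below shows that shape; parseJsonLinesSketch and its parameters are hypothetical and not part of the EDK, and the choice to turn missing fields into null is an assumption of the sketch.

```typescript
type JsonLinesRow = Record<string, unknown>;

// Hypothetical helper, not part of the EDK: parse JSON Lines text into keyed rows.
function parseJsonLinesSketch(
  text: string,
  fieldNames: string[],
  outputKey: (row: JsonLinesRow) => string
): Map<string, JsonLinesRow> {
  const result = new Map<string, JsonLinesRow>();
  for (const line of text.split("\n")) {
    if (line.trim() === "") continue; // tolerate blank lines
    const parsed: unknown = JSON.parse(line);
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      throw new Error("each line must be a JSON object");
    }
    const source = parsed as Record<string, unknown>;
    const row: JsonLinesRow = {};
    for (const field of fieldNames) {
      // Keep only the declared fields; a missing field becomes null (an assumption).
      row[field] = field in source ? source[field] : null;
    }
    result.set(outputKey(row), row);
  }
  return result;
}

const jsonLinesText = '{"id":"a","float":1.5,"extra":true}\n{"id":"b"}\n';
const jsonLinesRows = parseJsonLinesSketch(
  jsonLinesText,
  ["id", "float"],
  (row) => String(row["id"])
);
```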
fromXlsx
▸ fromXlsx(config): TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
Parse a BlobType Stream containing XLSX data to construct a tabular Stream.
Type parameters
Name | Type |
---|---|
S | extends Record |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the XLSX parsing |
config.fields | S | The field types to parse |
config.null_str? | string | The string used to represent empty values |
config.output_key | (fields: { [K in string \| number \| symbol]: Variable }, inputs: Inputs) => EastFunction | output key for the parsed data |
config.sheet? | string | The worksheet containing the data |
Returns
TabularPipelineBuilder<DictType<StringType, StructType>, Inputs>
a new PipelineBuilder
Example
const file = Stream("File", BlobType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(file)
  .input({ name: "password", stream: password })
  .fromXlsx({
    fields: {
      float: FloatType,
    },
    sheet: "Sheet2",
    output_key: (fields, inputs) => StringJoin`${fields.float}.${inputs.password}`
  })
  .toTemplate();
input
▸ input(config): BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
Add an additional named input Stream to the Pipeline.
Type parameters
Name | Type |
---|---|
Name | extends string |
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the input stream and the resulting variable name |
config.name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the config Variable |
config.stream | Stream | the input stream configuration (the stream and associated preconditions) |
Returns
BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
a new PipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .input({ name: "password", stream: password })
  .toTemplate();
let
▸ let(name): BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
Give a name to the current Pipeline output to be used as an input later in the Pipeline (i.e. after the next operation).
Type parameters
Name | Type |
---|---|
Name | extends string |
Parameters
Name | Type | Description |
---|---|---|
name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the current Pipeline output |
Returns
BlobPipelineBuilder<Output, Inputs & { [K in string]: Variable }>
a new PipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .let("username")
  .input({ name: "password", stream: password })
  .transform((username, { password }) => StringJoin`${username}:${password}`)
  .transform((str, inputs) => Struct({
    Username: inputs.username,
    Hash: str
  }))
  .toTemplate();
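The return types of input and let both widen Inputs with one more named entry, which is why later callbacks can read inputs.username or destructure { password }. The sketch below mimics that accumulation in plain TypeScript. MiniPipeline and extendInputs are hypothetical names, and the class evaluates eagerly on ordinary values rather than building EDK expressions, so it only illustrates the typing pattern.

```typescript
// TypeScript cannot track a computed key precisely, so the merged type is asserted.
function extendInputs<Inputs extends object, Name extends string, T>(
  inputs: Inputs,
  name: Name,
  value: T
): Inputs & Record<Name, T> {
  return { ...inputs, [name]: value } as unknown as Inputs & Record<Name, T>;
}

// Hypothetical miniature of the builder pattern, not the EDK class.
class MiniPipeline<Output, Inputs extends object> {
  private readonly output: Output;
  private readonly inputs: Inputs;

  constructor(output: Output, inputs: Inputs) {
    this.output = output;
    this.inputs = inputs;
  }

  // Like `input`: register an extra named value.
  input<Name extends string, T>(
    config: { name: Name; value: T }
  ): MiniPipeline<Output, Inputs & Record<Name, T>> {
    return new MiniPipeline(this.output, extendInputs(this.inputs, config.name, config.value));
  }

  // Like `let`: remember the current output under a name.
  let<Name extends string>(name: Name): MiniPipeline<Output, Inputs & Record<Name, Output>> {
    return new MiniPipeline(this.output, extendInputs(this.inputs, name, this.output));
  }

  // Like `transform`: compute a new output from the current output and the named inputs.
  transform<Next>(f: (value: Output, inputs: Inputs) => Next): MiniPipeline<Next, Inputs> {
    return new MiniPipeline(f(this.output, this.inputs), this.inputs);
  }

  result(): Output {
    return this.output;
  }
}

const credentials = new MiniPipeline("alice", {})
  .let("username")
  .input({ name: "password", value: "secret" })
  .transform((username, { password }) => `${username}:${password}`)
  .transform((joined, inputs) => ({ Username: inputs.username, Hash: joined }))
  .result();
```

After the first transform the output is the joined string, but the value captured by let("username") is still reachable through inputs, which mirrors the role of let in the example above.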
log
▸ log(config): BlobPipelineBuilder
Produce a log message depending on the pipeline inputs and outputs. When the if predicate returns true, the pipeline will produce a log message and proceed.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the log message and optional predicate |
config.if? | (value: Variable, inputs: Inputs) => EastFunction | If true a log message will be produced (optional) |
config.message | string \| (value: Variable, inputs: Inputs) => EastFunction | The log message |
Returns
BlobPipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .log({
    if: username => Equal(username, ""),
    message: "Username is empty"
  })
  .input({ name: "password", stream: password })
  .transform((username, { password }) => StringJoin`${username}:${password}`)
  .log({
    if: str => Equal(str, ":"),
    message: () => Const("Unexpected string")
  })
  .toTemplate();
outputStream
▸ outputStream(): Stream
Return the Stream containing the output of the pipeline.
Returns
Stream
Example
// Create a datastream where a password can be written by an end-user at runtime.
const password = new SourceBuilder("DatabasePassword")
  .writeable(StringType)
  .outputStream();
toTemplate
▸ toTemplate(): Template
Convert the built pipeline into a Template, for usage in an EDK project.
Returns
Template
a Template
Example
const username = Stream("Username", StringType);
const template = new PipelineBuilder("BasicAuth")
  .from(username)
  .toTemplate();
Overrides
Builder.toTemplate
transform
▸ transform(f): ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>
Transform the entire input Stream based on an EastFunction.
Type parameters
Name | Type |
---|---|
F | extends (value: Variable, inputs: Inputs) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
f | F | an EastFunction function that generates the output Expression |
Returns
ReturnType["type"] extends DictType ? TabularPipelineBuilder : GenericPipelineBuilder<ReturnType["type"], Inputs>
a new PipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .input({ name: "password", stream: password })
  .transform((username, { password }) => StringJoin`${username}:${password}`)
  .toTemplate();
warn
▸ warn(config): BlobPipelineBuilder
Add a warning on the pipeline inputs and outputs to identify problems. When the if predicate returns true, the pipeline will register a warning with a message, but will proceed to produce output data.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the warning message and predicate |
config.if | (value: Variable, inputs: Inputs) => EastFunction | If true a warning will be produced |
config.message | string \| (value: Variable, inputs: Inputs) => EastFunction | The message in the case that a warning is produced |
Returns
BlobPipelineBuilder
Example
const username = Stream("Username", StringType);
const password = Stream("Password", StringType);
const pipeline = new PipelineBuilder("BasicAuth")
  .from(username)
  .warn({
    if: username => Equal(username, ""),
    message: "Username is empty"
  })
  .input({ name: "password", stream: password })
  .transform((username, { password }) => StringJoin`${username}:${password}`)
  .warn({
    if: str => Equal(str, ":"),
    message: () => Const("Unexpected string")
  })
  .toTemplate();
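The error, warn and log methods on this page take the same kind of configuration and differ in what happens once the if predicate returns true: error stops the pipeline without output, while warn and log record a message and continue. A small plain TypeScript model of that difference might look as follows. The names CheckSketch, CheckOutcome and runChecksSketch are hypothetical, the model works on ordinary values rather than EDK expressions, and treating an omitted predicate as always true is an assumption based on the optional config.if of log.

```typescript
type CheckLevel = "error" | "warn" | "log";

interface CheckSketch<T> {
  level: CheckLevel;
  if?: (value: T) => boolean; // assumption: an omitted predicate always fires
  message: string | ((value: T) => string);
}

interface CheckOutcome<T> {
  output: T | null; // null when an error stopped the run
  error: string | null;
  warnings: string[];
  logs: string[];
}

// Hypothetical model of the three checks, not the EDK runtime.
function runChecksSketch<T>(value: T, checks: CheckSketch<T>[]): CheckOutcome<T> {
  const outcome: CheckOutcome<T> = { output: value, error: null, warnings: [], logs: [] };
  for (const check of checks) {
    const triggered = check.if === undefined ? true : check.if(value);
    if (!triggered) continue;
    const text = typeof check.message === "string" ? check.message : check.message(value);
    if (check.level === "error") {
      // An error terminates processing and no output data is produced.
      outcome.output = null;
      outcome.error = text;
      return outcome;
    }
    // Warnings and log messages are recorded and processing continues.
    if (check.level === "warn") {
      outcome.warnings.push(text);
    } else {
      outcome.logs.push(text);
    }
  }
  return outcome;
}

const emptyUser = runChecksSketch("", [
  { level: "log", message: (value) => `checking "${value}"` },
  { level: "warn", if: (value) => value.length < 3, message: "Username is short" },
  { level: "error", if: (value) => value === "", message: "Username is empty" },
  { level: "warn", message: "never reached" },
]);

const validUser = runChecksSketch("alice", [
  { level: "error", if: (value) => value === "", message: "Username is empty" },
]);
```

In this model the empty username collects one log message and one warning before the error check removes the output, while the valid username passes through unchanged.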