Source.SourceBuilder

A helper class to build a data Source, which populates a data Stream with data. Data sources can range from simple clocks to complex database queries or API requests.

A corresponding Template can be created using .toTemplate().

Remarks

See Ingest Structured Data for a related learning module.

Example

// Create a datastream with an integer value that can be provided by users
export default new SourceBuilder("Hourly")
  .writeable(IntegerType)
  .toTemplate()

Type parameters

| Name | Type |
| --- | --- |
| Inputs | extends Record = |

Other

file

file(config): GenericWriteableSourceBuilder

Build a writeable source of type BlobType, consisting of a blob datastream populated with a file on your local disk (which end-users can overwrite at runtime).

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | The configuration of the file data source. |
| config.path | string | - |

Returns

GenericWriteableSourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("FileSource")
  .file({
    path: "./my_big_csv_file.csv",
  })

value

value(config): WriteableSourceBuilder

Build a writeable source, consisting of a datastream with an initial default value (which end-users can overwrite at runtime).

Type parameters

| Name | Type |
| --- | --- |
| T | extends EastType |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | The configuration of the value source. |
| config.type | T | The EastType of the value to be written to the stream. |
| config.value | ValueTypeOf | The value to be written to the stream. |

Returns

WriteableSourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("StringSource")
  .value({
    type: StringType,
    value: "a string"
  })
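
The parameter table pairs config.type (T) with config.value (ValueTypeOf), which suggests the value is checked against the type at compile time. The following is a minimal sketch of that idea only. The descriptors StringKind, FloatKind and BooleanKind, the ValueOf mapping and the makeValueSource function are hypothetical stand-ins, not EDK exports, and the real ValueTypeOf may work differently.

```typescript
// Hypothetical type descriptors standing in for EDK types such as FloatType.
const StringKind = { kind: "string" } as const;
const FloatKind = { kind: "float" } as const;
const BooleanKind = { kind: "boolean" } as const;

type Kind = typeof StringKind | typeof FloatKind | typeof BooleanKind;

// Maps a descriptor to the TypeScript type of a matching default value.
type ValueOf<T extends Kind> =
  T extends { kind: "string" } ? string :
  T extends { kind: "float" } ? number :
  T extends { kind: "boolean" } ? boolean :
  never;

// Accepts a config only when `value` agrees with `type` (assumed behaviour).
function makeValueSource<T extends Kind>(
  config: { type: T; value: ValueOf<T> },
): { kind: T["kind"]; initial: ValueOf<T> } {
  return { kind: config.type.kind, initial: config.value };
}

const text = makeValueSource({ type: StringKind, value: "a string" });
const ratio = makeValueSource({ type: FloatKind, value: 0.5 });
// makeValueSource({ type: FloatKind, value: "oops" }) would be rejected by the compiler.

console.log(text.kind, text.initial);
console.log(ratio.kind, ratio.initial);
```

In this sketch the mismatch is caught before the code runs, which is the practical benefit of tying the value type to the declared stream type.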

writeable

writeable(type): TabularWriteableSourceBuilder

Build a writeable source, consisting of a datastream that end-users can write values to at runtime.

Type parameters

| Name | Type |
| --- | --- |
| T | extends DictType |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| type | T | The EastType of the value that can be written to the stream. |

Returns

TabularWriteableSourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("FloatSource")
  .writeable(FloatType)

Source

constructor

new SourceBuilder(name, module?): SourceBuilder

Construct a SourceBuilder to build a data Source, which populates a data Stream with data. Data sources can range from simple clocks to complex database queries or API requests.

In an EDK project, a data source will usually be exported using .toTemplate().

Type parameters

| Name | Type |
| --- | --- |
| Inputs | extends Record = |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| name | string | the name of the Source |
| module? | ModulePath \| ModuleBuilder | - |

Returns

SourceBuilder

Example

// Create a datastream with an integer value that can be provided by users
export default new SourceBuilder("Hourly")
  .writeable(IntegerType)
  .toTemplate()

api

api(request): ApiSourceBuilder

Build a source from an HTTP GET request.

Type parameters

| Name | Type |
| --- | --- |
| RU | extends (prev_response: { body: Variable<Nullable> ; failure: Variable ; headers: Variable<Nullable<DictType>> ; status_code: Variable }, attempts: Variable, inputs: Inputs) => EastFunction |
| RB | extends (prev_response: { body: Variable<Nullable> ; failure: Variable ; headers: Variable<Nullable<DictType>> ; status_code: Variable }, attempts: Variable, inputs: Inputs) => EastFunction |
| RH | extends (prev_response: { body: Variable<Nullable> ; failure: Variable ; headers: Variable<Nullable<DictType>> ; status_code: Variable }, attempts: Variable, inputs: Inputs) => EastFunction<DictType> |
| RR | extends (prev_response: { body: Variable<Nullable> ; failure: Variable ; headers: Variable<Nullable<DictType>> ; status_code: Variable }, attempts: Variable, inputs: Inputs) => EastFunction |
| RD | extends (prev_response: { body: Variable<Nullable> ; failure: Variable ; headers: Variable<Nullable<DictType>> ; status_code: Variable }, attempts: Variable, inputs: Inputs) => EastFunction |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| request | Object | the configuration of the HTTP request |
| request.body? | RB | An EastFunction for the request body |
| request.cron? | string | The cron string used to determine when to trigger the source |
| request.delay? | RD | An EastFunction for the delay between request attempts |
| request.headers? | RH | An EastFunction for the request headers |
| request.retry? | RR | An EastFunction to determine if the request should be retried |
| request.url | RU | An EastFunction for the request URL |

Returns

ApiSourceBuilder

a new SourceBuilder

Example

// Create a blob datastream containing the response from an HTTP GET request, updated daily
export default new SourceBuilder("Hourly")
  .api({
    cron: "0 8 * * *",
    url: () => Const("https://covid.ourworldindata.org/data/owid-covid-data.json"),
    retry: (_, attempts) => Less(attempts, 5n)
  })
  .toTemplate()
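
The retry and delay options receive the previous response and an attempt count. One way to picture how such options could drive a request loop is the plain TypeScript sketch below. It does not use the EDK: PrevResponse, RetryOptions and requestWithRetry are hypothetical names, the meaning of attempts (attempts made so far) and the millisecond delay unit are assumptions, and waits are recorded instead of slept so the example stays synchronous.

```typescript
// Hypothetical shapes that echo the parameter names above; these are not the EDK types.
interface PrevResponse {
  status_code: number | null;
  body: string | null;
  failure: boolean;
}

interface RetryOptions {
  // Asked after each failed attempt; `attempts` counts attempts made so far (assumed).
  retry: (prev: PrevResponse, attempts: number) => boolean;
  // Wait before the next attempt, in milliseconds (assumed unit).
  delay?: (prev: PrevResponse, attempts: number) => number;
}

interface RetryOutcome {
  response: PrevResponse;
  attempts: number;
  waits: number[];
}

// Calls `send` until it succeeds or `retry` declines another attempt.
// Waits are recorded rather than slept so the sketch stays synchronous.
function requestWithRetry(send: () => PrevResponse, options: RetryOptions): RetryOutcome {
  const waits: number[] = [];
  let attempts = 0;
  for (;;) {
    const response = send();
    attempts += 1;
    if (!response.failure || !options.retry(response, attempts)) {
      return { response, attempts, waits };
    }
    waits.push(options.delay ? options.delay(response, attempts) : 0);
  }
}

// A fake endpoint that fails twice and then succeeds.
let calls = 0;
const flaky = (): PrevResponse => {
  calls += 1;
  return calls < 3
    ? { status_code: 503, body: null, failure: true }
    : { status_code: 200, body: "{}", failure: false };
};

const outcome = requestWithRetry(flaky, {
  retry: (_prev, attempts) => attempts < 5,
  delay: (_prev, attempts) => attempts * 1000,
});
console.log(outcome.attempts, outcome.waits); // 3 attempts, waits of 1000 and 2000
```

Under these assumptions, a predicate like attempts < 5 caps the loop at five attempts in total, because it is consulted only after an attempt has failed.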

clock

clock(config): ClockSourceBuilder<ReturnType["type"], Inputs>

Build a clock source that runs based on a cron string.

Type parameters

| Name | Type |
| --- | --- |
| V | extends (datetime: Variable, inputs: Inputs) => EastFunction |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the configuration of the clock source |
| config.cron | string | The cron string used to determine when to trigger the clock source |
| config.value | V | A function to generate an EastFunction for the output value based on the current time (optional) |

Returns

ClockSourceBuilder<ReturnType["type"], Inputs>

a new SourceBuilder

Example

// Create a datastream which is updated to the current time at the beginning of every hour
export default new SourceBuilder("Hourly")
  .clock({
    cron: "0 * * * *",
    value: datetime => StringJoin`The current time is ${datetime}`
  })
  .toTemplate()
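
The cron strings used on this page follow the common five-field layout (minute, hour, day of month, month, day of week). To make the difference between "0 * * * *" and "0 8 * * *" concrete, here is a deliberately small matcher. It is an illustration only: cronMatches and fieldMatches are hypothetical helpers, only "*", plain numbers and comma lists are handled, and evaluating in UTC is an assumption, since this page does not state which time zone the platform uses.

```typescript
// Matches one cron field against a concrete value.
// Supports only "*", plain numbers and comma lists; real cron syntax has more.
function fieldMatches(field: string, value: number): boolean {
  if (field === "*") return true;
  return field.split(",").some((part) => Number(part) === value);
}

// Returns true when `date` (read in UTC, an assumption) satisfies the cron string.
function cronMatches(cron: string, date: Date): boolean {
  const fields = cron.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`expected 5 cron fields, got ${fields.length}`);
  }
  // Order: minute, hour, day of month, month (1-12), day of week (0 = Sunday).
  const actual = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1,
    date.getUTCDay(),
  ];
  // The length was checked above, so the "*" fallback is never used.
  return actual.every((value, i) => fieldMatches(fields[i] ?? "*", value));
}

const topOfHour = new Date(Date.UTC(2024, 0, 15, 13, 0));
const midHour = new Date(Date.UTC(2024, 0, 15, 13, 30));
const eightAm = new Date(Date.UTC(2024, 0, 15, 8, 0));

console.log(cronMatches("0 * * * *", topOfHour)); // true: minute 0 of any hour
console.log(cronMatches("0 * * * *", midHour)); // false: minute is 30
console.log(cronMatches("0 8 * * *", eightAm)); // true: 08:00 every day
console.log(cronMatches("0 8 * * *", topOfHour)); // false: hour is 13
```

So "0 * * * *" fires once at the start of every hour, while "0 8 * * *" fires once a day at 08:00.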

elara

elara(config): ElaraSourceBuilder

Build a source that pulls data from a datastream in a different workspace in the same tenant.

Type parameters

| Name | Type |
| --- | --- |
| T | extends EastType |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | an object containing the workspace name, stream name and type |
| config.stream | string | - |
| config.type | T | - |
| config.workspace | string | - |

Returns

ElaraSourceBuilder

a new SourceBuilder

Example

// Create a datastream whose value comes from the output of pipeline `my_pipeline` in another workspace called `my_other_workspace`.
export default new SourceBuilder("Elara")
  .elara({
    workspace: "my_other_workspace",
    stream: "Pipeline.my_pipeline",
    type: IntegerType,
  })
  .toTemplate()

error

error(config): SourceBuilder

Check the source inputs for validity and identify any errors. When the if predicate returns true, the source is terminated with an error message and no output data is produced.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the error message and predicate |
| config.if | (inputs: Inputs) => EastFunction | If true an error will be created |
| config.message | string \| (inputs: Inputs) => EastFunction | The message in the case that an error is created |

Returns

SourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("Source")
  .input({ name: "other", stream: other })
  // ensure that other is not negative
  .error({
    if: ({ other }) => Less(other, 0),
    message: () => Const("other is negative")
  })
  .clock({ cron: "0 * * * *" })
  .toSource()

ftp

ftp(config): FtpSourceBuilder

Build a source from a file at an FTP or SFTP URI.

Type parameters

| Name | Type |
| --- | --- |
| U | extends (inputs: Inputs) => EastFunction |
| P | extends (inputs: Inputs) => EastFunction |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the configuration of the FTP file |
| config.cron? | string | The cron string used to determine when to trigger the source |
| config.key? | U | An EastFunction for the FTP key |
| config.password? | U | An EastFunction for the FTP password |
| config.port? | P | An EastFunction for the FTP port |
| config.uri | U | An EastFunction for the FTP URI |
| config.username? | U | An EastFunction for the FTP username |

Returns

FtpSourceBuilder

a new SourceBuilder

Example

// Create a blob datastream containing the data in an FTP file, updated daily
export default new SourceBuilder("Hourly")
  .ftp({
    cron: "0 8 * * *",
    uri: () => Const("ftp://ftp.bom.gov.au/anon/gen/clim_data/IDCKWCDEA0/tables/qld/applethorpe/applethorpe-200902.csv"),
  })
  .toTemplate()

input

input(config): SourceBuilder<Inputs & { [K in string]: Variable }>

Add a stream as input to the SourceBuilder, which can be used in expressions to configure how the Source will access the data (such as a URL or password), the output data, or any data assertions (preconditions and postconditions).

Type parameters

| Name | Type |
| --- | --- |
| Name | extends string |
| T | extends EastType |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the configuration for the input |
| config.name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the input Variable |
| config.stream | Stream | the input stream (the stream and associated preconditions) |

Returns

SourceBuilder<Inputs & { [K in string]: Variable }>

a new SourceBuilder

Example

const other = Stream("Other", FloatType)
const source = new SourceBuilder("Source")
  .input({ name: "other", stream: other })
  .clock({ cron: "0 * * * *" })
  .toSource()
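
The type of config.name, Name extends "input" | keyof Inputs ? never : Name, is a conditional type that collapses to never when the name is "input" or is already taken, so a duplicate input name fails to compile. The sketch below reproduces that pattern in isolation. MiniBuilder, FreshName, names and get are hypothetical and exist only for this illustration; plain values stand in for streams, and nothing here is the EDK implementation.

```typescript
// Resolves to `never` when Name is "input" or already a key of Inputs.
type FreshName<Name extends string, Inputs> =
  Name extends "input" | keyof Inputs ? never : Name;

// Hypothetical stand-in for the builder's input bookkeeping; not the EDK class.
class MiniBuilder<Inputs extends Record<string, unknown> = {}> {
  private readonly values: Record<string, unknown>;

  private constructor(values: Record<string, unknown>) {
    this.values = values;
  }

  static create(): MiniBuilder<{}> {
    return new MiniBuilder<{}>({});
  }

  // Returns a new builder whose Inputs type also contains the added name.
  input<Name extends string, T>(config: {
    name: FreshName<Name, Inputs>;
    value: T;
  }): MiniBuilder<Inputs & { [K in Name]: T }> {
    return new MiniBuilder<Inputs & { [K in Name]: T }>({
      ...this.values,
      [String(config.name)]: config.value,
    });
  }

  names(): string[] {
    return Object.keys(this.values);
  }

  get<K extends keyof Inputs>(name: K): Inputs[K] {
    return this.values[String(name)] as Inputs[K];
  }
}

const builder = MiniBuilder.create()
  .input({ name: "other", value: 1.5 })
  .input({ name: "label", value: "abc" });
// Adding { name: "other", ... } again, or { name: "input", ... }, would not
// compile, because the name type resolves to `never`.

console.log(builder.names()); // the two names, in insertion order
const other: number = builder.get("other");
console.log(other);
```

Because each call returns a builder with a wider Inputs type, later callbacks can destructure inputs by name with full type information, which matches the ({ other }) => ... style used in the examples on this page.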

log

log(config): SourceBuilder

Produce log messages based on the source inputs. When the if predicate returns true, the source registers a log message and proceeds to produce output data.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the log message and optional predicate |
| config.if? | (inputs: Inputs) => EastFunction | If true a log message will be produced (optional) |
| config.message | string \| (inputs: Inputs) => EastFunction | The log message |

Returns

SourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("Source")
  .input({ name: "other", stream: other })
  // log a message whenever other is negative
  .log({
    if: ({ other }) => Less(other, 0),
    message: () => Const("other is negative")
  })
  .clock({ cron: "0 * * * *" })
  .toSource()

patch

patch(base_stream): PatchSourceBuilder

Build a patch source that modifies or updates a "base" stream.

Type parameters

| Name | Type |
| --- | --- |
| T | extends DictType |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| base_stream | Stream | the "base" stream to apply a patch to |

Returns

PatchSourceBuilder

a new SourceBuilder

Example

// Create a datastream whose value is a (writeable) patch applied to a base stream
export default new SourceBuilder("Patch")
  .patch(base_stream)
  .toTemplate()

s3

s3(config): S3SourceBuilder

Build a source from a file at an S3 path.

Type parameters

| Name | Type |
| --- | --- |
| U | extends (inputs: Inputs) => EastFunction |

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the configuration of the S3 blob |
| config.access_key_id | U | An EastFunction with the AWS access key ID |
| config.bucket | U | An EastFunction with the name of the bucket containing the object |
| config.cron? | string | The cron string used to determine when to trigger the source |
| config.key | U | An EastFunction with the key of the object to get |
| config.region | U | An EastFunction with the AWS region |
| config.secret_access_key | U | An EastFunction with the AWS secret access key |

Returns

S3SourceBuilder

a new SourceBuilder

Example

// Create an S3 datastream
export default new SourceBuilder("My S3 Blob")
  .s3({
    region: (inputs) => Const("__REGION__"),
    bucket: (inputs) => Const("__BUCKET__"),
    key: (inputs) => Const("__KEY__"),
    access_key_id: (inputs) => Const("__ACCESS_KEY_ID__"),
    secret_access_key: (inputs) => Const("__SECRET_KEY__")
  })
  .toTemplate()

warn

warn(config): SourceBuilder

Check the source inputs for validity and raise warnings as necessary. When the if predicate returns true, the source registers a warning with a message but still proceeds to produce output data.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| config | Object | the warning message and predicate |
| config.if | (inputs: Inputs) => EastFunction | If true a warning will be produced |
| config.message | string \| (inputs: Inputs) => EastFunction | The message in the case that a warning is created |

Returns

SourceBuilder

a new SourceBuilder

Example

const source = new SourceBuilder("Source")
  .input({ name: "other", stream: other })
  // warn whenever other is negative
  .warn({
    if: ({ other }) => Less(other, 0),
    message: () => Const("other is negative")
  })
  .clock({ cron: "0 * * * *" })
  .toSource()
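
The error, warn and log methods differ mainly in what happens after the predicate fires: an error stops the source without output, while a warning or log message is recorded and output is still produced. The sketch below models that difference in plain TypeScript. Check, RunResult and runChecks are hypothetical names, and evaluating checks in order and stopping at the first error are assumptions rather than documented EDK behaviour.

```typescript
interface Inputs {
  other: number;
}

type Level = "error" | "warn" | "log";

interface Check {
  level: Level;
  // When omitted, the check always fires (assumed, mirroring the optional log predicate).
  if?: (inputs: Inputs) => boolean;
  message: string | ((inputs: Inputs) => string);
}

interface RunResult {
  produced: boolean; // false when an error stopped the source
  messages: string[];
}

// Evaluates checks in order; an error that fires stops the run without output.
function runChecks(checks: Check[], inputs: Inputs): RunResult {
  const messages: string[] = [];
  for (const check of checks) {
    const fires = check.if ? check.if(inputs) : true;
    if (!fires) continue;
    const text =
      typeof check.message === "string" ? check.message : check.message(inputs);
    messages.push(`${check.level}: ${text}`);
    if (check.level === "error") {
      return { produced: false, messages };
    }
  }
  return { produced: true, messages };
}

const checks: Check[] = [
  { level: "log", message: "checking other" },
  { level: "warn", if: ({ other }) => other < 0, message: "other is negative" },
  {
    level: "error",
    if: ({ other }) => other < -100,
    message: ({ other }) => `other is too low: ${other}`,
  },
];

const mild = runChecks(checks, { other: -5 }); // warning only, output produced
const severe = runChecks(checks, { other: -500 }); // error fires, no output
console.log(mild);
console.log(severe);
```

With other set to -5 the run records a log line and a warning and still produces output; with -500 the error fires as well and the run ends without output.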