Source - SourceBuilder
Source.SourceBuilder
A helper class to build a data Source, which populates a data Stream with data. Data sources can range from simple clocks to complex database queries or API requests. A corresponding Template can be created using .toTemplate().
Remarks
See Ingest Structured Data for a related learning module.
Example
// Create a datastream with an integer value that can be provided by users
export default new SourceBuilder("Hourly")
.writeable(IntegerType)
.toTemplate()
Type parameters
Name | Type |
---|---|
Inputs | extends Record = |
Other
file
▸ file(config): GenericWriteableSourceBuilder
Build a writeable source of type BlobType, consisting of a blob datastream populated with a file on your local disk (which end-users can overwrite at runtime).
Parameters
Name | Type | Description |
---|---|---|
config | Object | The configuration of the file data source. |
config.path | string | The path to the file on your local disk. |
Returns
GenericWriteableSourceBuilder
a new SourceBuilder
Example
const source = new SourceBuilder("FileSource")
.file({
path: "./my_big_csv_file.csv",
})
value
▸ value(config): WriteableSourceBuilder
Build a writeable source, consisting of a datastream with an initial default value (which end-users can overwrite at runtime).
Type parameters
Name | Type |
---|---|
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | The configuration of the value source. |
config.type | T | The EastType of the value to be written to the stream. |
config.value | ValueTypeOf | The initial value to be written to the stream. |
Returns
WriteableSourceBuilder
a new SourceBuilder
Example
const source = new SourceBuilder("StringSource")
.value({
type: StringType,
value: "a string"
})
writeable
▸ writeable(type): TabularWriteableSourceBuilder
Build a writeable source, consisting of a datastream that end-users can write values to at runtime.
Type parameters
Name | Type |
---|---|
T | extends DictType |
Parameters
Name | Type | Description |
---|---|---|
type | T | The EastType of the value that can be written to the stream. |
Returns
TabularWriteableSourceBuilder
a new SourceBuilder
Example
const source = new SourceBuilder("FloatSource")
.writeable(FloatType)
Source
constructor
• new SourceBuilder(name, module?): SourceBuilder
Construct a SourceBuilder to build a data Source. In an EDK project, a data source will usually be exported using .toTemplate().
Type parameters
Name | Type |
---|---|
Inputs | extends Record = |
Parameters
Name | Type | Description |
---|---|---|
name | string | the name of the Source |
module? | ModulePath \| ModuleBuilder | - |
Returns
SourceBuilder
Example
// Create a writeable datastream with an integer value that can be provided by users
export default new SourceBuilder("Hourly")
.writeable(IntegerType)
.toTemplate()
api
▸ api(request): ApiSourceBuilder
Build a source from an HTTP GET request.
Type parameters
Name | Type |
---|---|
RU | extends (prev_response : { body : Variable <Nullable > ; failure : Variable ; headers : Variable <Nullable <DictType >> ; status_code : Variable }, attempts : Variable , inputs : Inputs ) => EastFunction |
RB | extends (prev_response : { body : Variable <Nullable > ; failure : Variable ; headers : Variable <Nullable <DictType >> ; status_code : Variable }, attempts : Variable , inputs : Inputs ) => EastFunction |
RH | extends (prev_response : { body : Variable <Nullable > ; failure : Variable ; headers : Variable <Nullable <DictType >> ; status_code : Variable }, attempts : Variable , inputs : Inputs ) => EastFunction <DictType > |
RR | extends (prev_response : { body : Variable <Nullable > ; failure : Variable ; headers : Variable <Nullable <DictType >> ; status_code : Variable }, attempts : Variable , inputs : Inputs ) => EastFunction |
RD | extends (prev_response : { body : Variable <Nullable > ; failure : Variable ; headers : Variable <Nullable <DictType >> ; status_code : Variable }, attempts : Variable , inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
request | Object | the configuration of the http request |
request.body? | RB | An EastFunction for the request body |
request.cron? | string | The cron string used to determine when to trigger the request |
request.delay? | RD | An EastFunction for the request delay |
request.headers? | RH | An EastFunction for the request headers |
request.retry? | RR | An EastFunction to determine if the request should be retried |
request.url | RU | An EastFunction for the request url |
Returns
ApiSourceBuilder
a new SourceBuilder
Example
// Create a blob datastream containing the response from an HTTP GET request, updated daily at 08:00
export default new SourceBuilder("Daily")
.api({
cron: "0 8 * * *",
url: () => Const("https://covid.ourworldindata.org/data/owid-covid-data.json"),
retry: (_, attempts) => Less(attempts, 5n)
})
.toTemplate()
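The `retry` option receives the previous response and an `attempts` counter (see the RR type parameter), and returning true asks for another attempt. As a rough illustration of what `Less(attempts, 5n)` does, the sketch below replays that loop in plain TypeScript. It assumes, beyond what this reference states, that `attempts` counts the attempts already made and that `retry` is only consulted after a failed attempt; `PrevResponse`, `fetchWithRetry` and the fake request are hypothetical names, not part of the EDK.

```typescript
// Hypothetical model of the api() retry option; not the EDK implementation.
// Assumption: `attempts` is the number of attempts made so far, and `retry`
// is only consulted after an attempt fails.
interface PrevResponse {
  status_code: number;
  failure: boolean;
  body: string | null;
}

type RetryFn = (prev: PrevResponse, attempts: bigint) => boolean;

function fetchWithRetry(
  request: () => PrevResponse,
  retry: RetryFn,
): { response: PrevResponse; attempts: bigint } {
  let attempts = 0n;
  for (;;) {
    const response = request();
    attempts += 1n;
    // Stop on success, or when the retry predicate declines another attempt.
    if (!response.failure || !retry(response, attempts)) {
      return { response, attempts };
    }
  }
}

// Mirrors `retry: (_, attempts) => Less(attempts, 5n)` from the example above.
const retryUnderFive: RetryFn = (_, attempts) => attempts < 5n;

// A fake request that always fails, so the retry limit is what stops the loop.
const alwaysFails = (): PrevResponse => ({ status_code: 503, failure: true, body: null });

const result = fetchWithRetry(alwaysFails, retryUnderFive);
console.log(result.attempts); // 5n
```

Under these assumptions a request that keeps failing is tried five times in total; if `attempts` were counted from zero instead, the same predicate would allow six.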
clock
▸ clock(config): ClockSourceBuilder<ReturnType["type"], Inputs>
Build a clock source that runs based on a cron string.
Type parameters
Name | Type |
---|---|
V | extends (datetime : Variable , inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the clock source |
config.cron | string | The cron string used to determine when to trigger the clock source |
config.value | V | A function to generate an EastFunction for the output value based on the current time (optional) |
Returns
ClockSourceBuilder<ReturnType["type"], Inputs>
a new SourceBuilder
Example
// Create a datastream which is updated at the beginning of every hour with a message containing the current time
export default new SourceBuilder("Hourly")
.clock({
cron: "0 * * * *",
value: datetime => StringJoin`The current time is ${datetime}`
})
.toTemplate()
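The cron strings in these examples appear to follow the standard five-field layout (minute, hour, day of month, month, day of week): `"0 * * * *"` fires at minute 0 of every hour, and `"0 8 * * *"` in the api and ftp examples fires once a day at 08:00. The deliberately simplified matcher below only understands `*` and single numbers, to show how the fields are read; it is not how the EDK schedules sources, and evaluating in UTC is an assumption of the sketch since the time zone is not stated here.

```typescript
// Simplified five-field cron matcher: supports only "*" and single integers.
// Real cron also allows ranges (1-5), lists (1,3) and steps (*/15), and treats
// day-of-month and day-of-week as OR when both are restricted; this sketch
// ignores all of that.
function fieldMatches(field: string, value: number): boolean {
  return field === "*" || Number(field) === value;
}

function matchesCron(cron: string, date: Date): boolean {
  const fields = cron.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  // Evaluated in UTC here; that choice is an assumption of this sketch.
  return (
    fieldMatches(minute, date.getUTCMinutes()) &&
    fieldMatches(hour, date.getUTCHours()) &&
    fieldMatches(dayOfMonth, date.getUTCDate()) &&
    fieldMatches(month, date.getUTCMonth() + 1) && // cron months are 1-12
    fieldMatches(dayOfWeek, date.getUTCDay()) // 0 = Sunday in both cron and JS
  );
}

console.log(matchesCron("0 * * * *", new Date("2024-01-01T13:00:00Z"))); // true
console.log(matchesCron("0 * * * *", new Date("2024-01-01T13:30:00Z"))); // false
console.log(matchesCron("0 8 * * *", new Date("2024-01-01T08:00:00Z"))); // true
```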
elara
▸ elara(config): ElaraSourceBuilder
Build a source that pulls data from a datastream in a different workspace in the same tenant.
Type parameters
Name | Type |
---|---|
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | an object containing the workspace name, stream name and type |
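config.stream | string | the name of the datastream to read from |
config.type | T | the EastType of the datastream |
config.workspace | string | the name of the workspace containing the datastream |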
Returns
ElaraSourceBuilder
a new SourceBuilder
Example
// Create a datastream whose value comes from the output of pipeline `my_pipeline` in another workspace called `my_other_workspace`.
export default new SourceBuilder("Elara")
.elara({
workspace: "my_other_workspace",
stream: "Pipeline.my_pipeline",
type: IntegerType,
})
.toTemplate()
error
▸ error(config): SourceBuilder
Check the source inputs for validity and identify any errors. When the if predicate returns true, the source will be terminated with an error message and output data will not be produced.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the error message and predicate |
config.if | (inputs : Inputs ) => EastFunction | If true an error will be created |
config.message | string \| (inputs : Inputs ) => EastFunction | The message in the case that an error is created |
Returns
SourceBuilder
a new SourceBuilder
Example
const other = Stream("Other", FloatType)
const source = new SourceBuilder("Source")
.input({ name: "other", stream: other })
// ensure that other is not negative
.error({
if: ({ other }) => Less(other, 0),
message: () => Const("other is negative")
})
.clock({ cron: "0 * * * *" })
.toSource()
ftp
▸ ftp(config): FtpSourceBuilder
Build a source from a file at an FTP or SFTP URI.
Type parameters
Name | Type |
---|---|
U | extends (inputs : Inputs ) => EastFunction |
P | extends (inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the ftp file |
config.cron? | string | The cron string used to determine when to fetch the file |
config.key? | U | An EastFunction for the ftp key |
config.password? | U | An EastFunction for the ftp password |
config.port? | P | An EastFunction for the ftp port |
config.uri | U | An EastFunction for the ftp uri |
config.username? | U | An EastFunction for the ftp username |
Returns
FtpSourceBuilder
a new SourceBuilder
Example
// Create a blob datastream containing the data in an FTP file, updated daily at 08:00
export default new SourceBuilder("Daily")
.ftp({
cron: "0 8 * * *",
uri: () => Const("ftp://ftp.bom.gov.au/anon/gen/clim_data/IDCKWCDEA0/tables/qld/applethorpe/applethorpe-200902.csv"),
})
.toTemplate()
input
▸ input(config): SourceBuilder<Inputs & { [K in string]: Variable }>
Add a stream as an input to the SourceBuilder. The input can then be referenced by name in the expressions that configure the source (for example the predicates passed to error, warn and log).
Type parameters
Name | Type |
---|---|
Name | extends string |
T | extends EastType |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration for the input |
config.name | Name extends "input" \| keyof Inputs ? never : Name | the name to give the input Variable |
config.stream | Stream | the input stream (the stream and associated preconditions) |
Returns
SourceBuilder<Inputs & { [K in string]: Variable }>
a new SourceBuilder
Example
const other = Stream("Other", FloatType)
const source = new SourceBuilder("Source")
.input({ name: "other", stream: other })
.clock({ cron: "0 * * * *" })
.toSource()
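The `config.name` type, `Name extends "input" | keyof Inputs ? never : Name`, is what stops an input from being called `"input"` or from reusing the name of an existing input: in either case the parameter type collapses to `never` and the call fails to type-check. The standalone sketch below reproduces just that conditional type; `InputName`, `ExistingInputs` and `newInputName` are hypothetical names for illustration, not EDK functions, and plain numbers stand in for the EDK `Variable` type.

```typescript
// Hypothetical reproduction of the name constraint on SourceBuilder.input().
type InputName<Name extends string, Inputs> =
  Name extends "input" | keyof Inputs ? never : Name;

// Stand-in for the inputs a builder already has (the EDK uses Variable, not number).
interface ExistingInputs {
  other: number;
}

// Compiles only when `name` is neither "input" nor an existing input name.
function newInputName<Name extends string>(
  name: InputName<Name, ExistingInputs>,
): string {
  return name;
}

console.log(newInputName("rate")); // rate
// newInputName("input"); // compile error: parameter type resolves to never
// newInputName("other"); // compile error: "other" is already an input
```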
log
▸ log(config): SourceBuilder
Produce log messages based on the source inputs. When the if predicate returns true, the source will register a log message and proceed to produce output data.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the log message and optional predicate |
config.if? | (inputs : Inputs ) => EastFunction | If true a log message will be produced (optional) |
config.message | string \| (inputs : Inputs ) => EastFunction | The log message |
Returns
SourceBuilder
a new SourceBuilder
Example
const other = Stream("Other", FloatType)
const source = new SourceBuilder("Source")
.input({ name: "other", stream: other })
// log a message whenever other is negative
.log({
if: ({ other }) => Less(other, 0),
message: () => Const("other is negative")
})
.clock({ cron: "0 * * * *" })
.toSource()
patch
▸ patch(base_stream): PatchSourceBuilder
Build a patch source that modifies or updates a "base" stream.
Type parameters
Name | Type |
---|---|
T | extends DictType |
Parameters
Name | Type | Description |
---|---|---|
base_stream | Stream | the "base" stream to apply a patch to |
Returns
PatchSourceBuilder
a new SourceBuilder
Example
// Create a datastream whose value is a (writeable) patch applied to a base stream
export default new SourceBuilder("Patch")
.patch(base_stream)
.toTemplate()
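This reference does not spell out how a patch is combined with its base. One plausible reading, given that the base must be a DictType and the example describes a writeable patch applied on top of it, is a key-by-key overlay in which patched entries replace or add to the base entries. The sketch below models only that assumed overlay with plain `Map`s; `applyPatch` is a hypothetical name, and whether the EDK also lets a patch delete keys is not covered.

```typescript
// Assumed overlay semantics: patch entries replace or extend base entries.
function applyPatch<K, V>(base: ReadonlyMap<K, V>, patch: ReadonlyMap<K, V>): Map<K, V> {
  const result = new Map(base); // copy so the base is left untouched
  for (const [key, value] of patch) {
    result.set(key, value); // overwrite an existing key or add a new one
  }
  return result;
}

const base = new Map([["apples", 10], ["pears", 4]]);
const patch = new Map([["pears", 6], ["plums", 2]]);
const patched = applyPatch(base, patch);
// apples: 10 (from base), pears: 6 (patched), plums: 2 (added)
console.log(patched.get("pears")); // 6
```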
s3
▸ s3(config): S3SourceBuilder
Build a source from an object at an S3 path.
Type parameters
Name | Type |
---|---|
U | extends (inputs : Inputs ) => EastFunction |
Parameters
Name | Type | Description |
---|---|---|
config | Object | the configuration of the s3 blob |
config.access_key_id | U | An EastFunction with the AWS access key ID |
config.bucket | U | An EastFunction with a bucket name containing the object |
config.cron? | string | The cron string used to determine when to fetch the object |
config.key | U | An EastFunction with the key of the object to get |
config.region | U | An EastFunction with the AWS region |
config.secret_access_key | U | An EastFunction with the AWS secret access key |
Returns
S3SourceBuilder
a new SourceBuilder
Example
// Create a datastream from an object stored in S3
export default new SourceBuilder("My S3 Blob")
.s3({
region: (inputs) => Const("__REGION__"),
bucket: (inputs) => Const("__BUCKET__"),
key: (inputs) => Const("__KEY__"),
access_key_id: (inputs) => Const("__ACCESS_KEY_ID__"),
secret_access_key: (inputs) => Const("__SECRET_KEY__")
})
.toTemplate()
warn
▸ warn(config): SourceBuilder
Check the source inputs for validity and raise warnings as necessary. When the if predicate returns true, the source will register a warning with a message, but will proceed to produce output data.
Parameters
Name | Type | Description |
---|---|---|
config | Object | the warning message and predicate |
config.if | (inputs : Inputs ) => EastFunction | If true a warning will be produced |
config.message | string \| (inputs : Inputs ) => EastFunction | The message in the case that a warning is created |
Returns
SourceBuilder
a new SourceBuilder
Example
const other = Stream("Other", FloatType)
const source = new SourceBuilder("Source")
.input({ name: "other", stream: other })
// warn whenever other is negative
.warn({
if: ({ other }) => Less(other, 0),
message: () => Const("other is negative")
})
.clock({ cron: "0 * * * *" })
.toSource()
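The error, log and warn checks differ only in what happens when their predicate returns true: `error` stops the source without producing output, while `warn` and `log` record a message and let the source go on to produce output (and `log` without an `if` always fires). The sketch below models those documented outcomes in plain TypeScript. `Check`, `runChecks` and the result shape are hypothetical, and running the checks in the order they were added is an assumption the reference does not state.

```typescript
// Hypothetical model of error/warn/log checks; not the EDK implementation.
type Severity = "error" | "warn" | "log";

interface Check<Inputs> {
  severity: Severity;
  // Optional here for simplicity; the reference marks `if` optional only for log.
  if?: (inputs: Inputs) => boolean;
  message: string | ((inputs: Inputs) => string);
}

interface RunResult {
  produced: boolean; // whether the source goes on to produce output data
  messages: string[];
}

function runChecks<Inputs>(inputs: Inputs, checks: Check<Inputs>[]): RunResult {
  const messages: string[] = [];
  for (const check of checks) {
    const fires = check.if === undefined ? true : check.if(inputs);
    if (!fires) continue;
    const text = typeof check.message === "string" ? check.message : check.message(inputs);
    messages.push(`${check.severity}: ${text}`);
    // Only an error terminates the source; warnings and logs let it continue.
    if (check.severity === "error") {
      return { produced: false, messages };
    }
  }
  return { produced: true, messages };
}

const isNegative = ({ other }: { other: number }): boolean => other < 0;

const warned = runChecks({ other: -1 }, [
  { severity: "warn", if: isNegative, message: "other is negative" },
]);
const failed = runChecks({ other: -1 }, [
  { severity: "error", if: isNegative, message: "other is negative" },
]);
console.log(warned.produced, failed.produced); // true false
```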