Disclaimer: This is not an official Google product.
Data Tasks Coordinator (code name Sentinel) is a framework that organizes different tasks into an automated data pipeline, plus a library that conducts interactive installation processes based on configurations.

This is not a general-purpose orchestration solution. It is designed for marketing solutions, with a set of predefined tasks mainly related to BigQuery.
Task
: A data processing element for a specific objective, e.g. loading data
from Cloud Storage to BigQuery.
Data pipeline
: A series
of connected data processing tasks. The output of one task is the input of the
next one. The tasks of a pipeline are often executed in parallel or in
time-sliced fashion.
Event-driven programming
: A programming paradigm
in which events, such as user activities or execution results from other
programming threads, determine the flow of a program's execution.
A task can be triggered by a Pub/Sub message. In most cases, a Cloud Scheduler job created by `deploy.sh` will send those messages regularly to trigger the first task. After that, Sentinel will send messages to trigger the next task(s).
- Message attribute: `taskId`, the name of the task to start.
- Message body: the JSON string of the parameter object that will be passed into the started task.
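For example, a task could be started manually by publishing such a message with the gcloud CLI. This is only a sketch: `sentinel-monitor` stands in for the actual topic created by your deployment, and the task name and parameter values are hypothetical.

```shell
# Publish a trigger message: the 'taskId' attribute selects the task,
# the message body carries the parameter object as a JSON string.
gcloud pubsub topics publish sentinel-monitor \
  --attribute=taskId=load_job \
  --message='{"partitionDay": "20240101"}'
```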
Task definitions and SQL files support parameters in the format `${parameterName}`. The placeholders will be replaced with the value of `parameterName` in the passed-in parameter JSON object. Embedded (nested) parameters are supported, e.g. `${parameter.name}`.
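For example, given the passed-in parameter object `{"partitionDay": "20240101", "config": {"id": "test"}}` (hypothetical names and values), a SQL file could use the placeholders like this:

```sql
-- '${partitionDay}' and '${config.id}' are replaced before the query runs,
-- yielding: WHERE event_date = '20240101' AND config_id = 'test'
SELECT *
FROM `my_dataset.events`  -- hypothetical table
WHERE event_date = '${partitionDay}'
  AND config_id = '${config.id}'
```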
Each task will pass a parameter object on to its next task(s). The passed-on parameter object is the result of merging the parameter object that the task received, the new parameters that it generates, and the parameters from the configuration item `appendedParameters`, if it exists.
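For example (all names and values hypothetical): if a task receives `{"partitionDay": "20240101"}`, generates `{"jobId": "bq-job-123"}`, and is configured with `"appendedParameters": {"reportName": "daily"}`, its next task(s) receive the merged object:

```json
{
  "partitionDay": "20240101",
  "jobId": "bq-job-123",
  "reportName": "daily"
}
```

In the configuration file, each task is defined as a JSON object keyed by its name: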
```json
{
  "foo": {
    "type": "query",
    "source": {
      ...
    },
    "destination": {
      ...
    },
    "options": {
      ...
    },
    "next": "bar"
  }
}
```
Properties:

- `foo` is the task name.
- `type` is the task type. Different task types define different details of the task and have different configurations.
- `source`, `destination` and `options` are type-specific configurations. Refer to the detailed tasks for more information.
- `next` defines which task(s) will be started after the current one completes. In this case, task `bar` will be started after `foo`; a minimal two-task pipeline is sketched below.
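As a sketch of how `next` chains tasks (project, dataset, table and bucket names are hypothetical), a two-task pipeline that runs a query and then exports the result could look like:

```json
{
  "query_daily": {
    "type": "query",
    "source": {
      "sql": "SELECT 1 AS n"
    },
    "destination": {
      "table": {
        "projectId": "my-project",
        "datasetId": "my_dataset",
        "tableId": "daily_result"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    },
    "next": "export_daily"
  },
  "export_daily": {
    "type": "export",
    "source": {
      "projectId": "my-project",
      "datasetId": "my_dataset",
      "tableId": "daily_result",
      "location": "US"
    },
    "destination": {
      "bucket": "my-bucket",
      "name": "exports/daily_result.ndjson"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}
```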
See `config_task.json.template` for templates of tasks.
In a Cloud Shell:

- clone the source code;
- enter the source code folder and edit the task configuration JSON file;
- run `chmod a+x ./deploy.sh; ./deploy.sh`.
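Concretely, the steps might look like the following. The repository URL and folder here are assumptions; use the actual location of your copy of the source.

```shell
# Hypothetical repository and folder; adjust to where the source actually lives.
git clone https://github.com/GoogleCloudPlatform/cloud-for-marketing.git
cd cloud-for-marketing/marketing-analytics/activation/data-tasks-coordinator
# Edit the task configuration JSON file, then deploy.
chmod a+x ./deploy.sh; ./deploy.sh
```

The following examples show configurations for the supported task types. A load task copies a file from Cloud Storage into a BigQuery table; this variant declares the table schema explicitly: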
```json
{
  "load_job": {
    "type": "load",
    "source": {
      "file": {
        "bucket": "[YOUR_STORAGE_BUCKET_ID]",
        "name": "[YOUR_FILENAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]",
        "location": "[YOUR_BIGQUERY_LOCATION_ID]"
      },
      "tableSchema": {
        "schema": {
          "fields": [
            {
              "mode": "NULLABLE",
              "name": "[YOUR_BIGQUERY_TABLE_COLUMN_1_NAME]",
              "type": "[YOUR_BIGQUERY_TABLE_COLUMN_1_TYPE]"
            },
            {
              "mode": "NULLABLE",
              "name": "[YOUR_BIGQUERY_TABLE_COLUMN_2_NAME]",
              "type": "[YOUR_BIGQUERY_TABLE_COLUMN_2_TYPE]"
            }
          ]
        }
      }
    },
    "options": {
      "sourceFormat": "CSV",
      "writeDisposition": "WRITE_TRUNCATE",
      "skipLeadingRows": 1,
      "autodetect": false
    }
  }
}
```
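A load task can also use schema autodetection and write into one partition of a date-partitioned table. In `"tableId": "targetTable$${partitionDay}"` below, the first `$` is BigQuery's partition decorator and `${partitionDay}` is a Sentinel parameter, so the value resolves to something like `targetTable$20240101` at run time: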
```json
{
  "load_job": {
    "type": "load",
    "source": {
      "file": {
        "bucket": "[YOUR_STORAGE_BUCKET_ID]",
        "name": "[YOUR_FILENAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "targetTable$${partitionDay}",
        "location": "[YOUR_BIGQUERY_LOCATION_ID]"
      }
    },
    "options": {
      "sourceFormat": "CSV",
      "writeDisposition": "WRITE_TRUNCATE",
      "skipLeadingRows": 1,
      "autodetect": true
    }
  }
}
```
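A query task can take its SQL directly from the task definition: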
```json
{
  "query_job_sql": {
    "type": "query",
    "source": {
      "sql": "[YOUR_QUERY_SQL]"
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    }
  }
}
```
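Alternatively, a query task can read its SQL from a file on Cloud Storage: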
```json
{
  "query_job_gcs": {
    "type": "query",
    "source": {
      "file": {
        "bucket": "[YOUR_BUCKET_FOR_SQL_FILE]",
        "name": "[YOUR_SQL_FILE_FULL_PATH_NAME]"
      }
    },
    "destination": {
      "table": {
        "projectId": "[YOUR_CLOUD_PROJECT_ID]",
        "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
        "tableId": "[YOUR_BIGQUERY_TABLE_ID]"
      },
      "writeDisposition": "WRITE_TRUNCATE"
    }
  }
}
```
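An export task writes a BigQuery table out to Cloud Storage: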
```json
{
  "export_job": {
    "type": "export",
    "source": {
      "projectId": "[YOUR_CLOUD_PROJECT_ID]",
      "datasetId": "[YOUR_BIGQUERY_DATASET_ID]",
      "tableId": "[YOUR_BIGQUERY_TABLE_ID]",
      "location": "[YOUR_BIGQUERY_LOCATION_ID]"
    },
    "destination": {
      "bucket": "[YOUR_BUCKET_FOR_EXPORTED_FILE]",
      "name": "[YOUR_FULL_PATH_NAME_FOR_EXPORTED_FILE]"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}
```
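Export tasks combine naturally with parameters. The following task exports the table described by the incoming `destinationTable` parameter; values wrapped in `#...#` (e.g. `#GCS_BUCKET#`) are placeholders that the installation process is expected to substitute: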
```json
{
  "export_for_tentacles": {
    "type": "export",
    "source": {
      "projectId": "${destinationTable.projectId}",
      "datasetId": "${destinationTable.datasetId}",
      "tableId": "${destinationTable.tableId}",
      "location": "#DATASET_LOCATION#"
    },
    "destination": {
      "bucket": "#GCS_BUCKET#",
      "name": "#OUTBOUND#/API[MP]_config[test]_${partitionDay}.ndjson"
    },
    "options": {
      "destinationFormat": "NEWLINE_DELIMITED_JSON",
      "printHeader": false
    }
  }
}
```