# Data Transformation

## Transformer Definition
During data synchronization and transmission, users may have customized requirements for data processing, such as trimming columns or transforming column values. This can be achieved through the T (Transformer) process in ETL. Addax includes a Transformer module that allows for flexible data transformation by defining a series of UDFs (User-Defined Functions).
## Execution Model
```mermaid
graph LR
    source(("source"))
    subgraph fr["Addax Framework"]
        direction LR
        Reader ==> Transformer ==> Writer
    end
    target(("target"))
    source ==> fr ==> target
```

## UDF Functions
### dx_substr

```
dx_substr(idx, pos, length) -> str
```

Parameters:

- `idx`: the index of the field in the record.
- `pos`: the starting position within the field's value.
- `length`: the length of the target substring.

Returns: a substring of the specified length starting from the specified position (inclusive). An exception is thrown if the starting position is invalid. If the field is null, it is returned as-is (this transformer does not process it).
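As a sketch, a `dx_substr` entry in a job file uses the same `columnIndex`/`paras` layout as the Job Definition example later in this document, with `pos` and `length` passed in order via `paras` (the values here are illustrative):

```json
{
  "name": "dx_substr",
  "parameter": {
    "columnIndex": 0,
    "paras": ["0", "5"]
  }
}
```

This would keep the first five characters of column 0.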
### dx_pad

```
dx_pad(idx, flag, length, chr)
```

Parameters:

- `idx`: the index of the field in the record.
- `flag`: `"l"` or `"r"`, indicating whether to pad at the beginning (left) or the end (right).
- `length`: the target length of the field.
- `chr`: the character to use for padding.

Returns: if the source string is shorter than the target length, the padded string is returned. If it is longer, it is truncated (always from the right). If the field is null, it is converted to an empty string before padding.

Examples:

- `dx_pad(1, "l", "4", "A")`: if column 1's value is `xyz`, the transformed value is `Axyz`. If the value is `xyzzzzz`, it becomes `xyzz`.
- `dx_pad(1, "r", "4", "A")`: if column 1's value is `xyz`, the transformed value is `xyzA`. If the value is `xyzzzzz`, it becomes `xyzz`.
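To make the pad/truncate behavior concrete, here is a minimal Python sketch of the documented semantics (an illustration only, not Addax's actual implementation):

```python
def dx_pad(value, flag, length, chr):
    """Mimic the documented dx_pad semantics: pad with `chr` up to
    `length`, or truncate from the right when the value is longer."""
    value = "" if value is None else value   # null becomes an empty string
    if len(value) >= length:
        return value[:length]                # truncate from the right
    pad = chr * (length - len(value))
    return pad + value if flag == "l" else value + pad

print(dx_pad("xyz", "l", 4, "A"))      # Axyz
print(dx_pad("xyz", "r", 4, "A"))      # xyzA
print(dx_pad("xyzzzzz", "l", 4, "A"))  # xyzz
```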
### dx_replace

```
dx_replace(idx, pos, length, str) -> str
```

Parameters:

- `idx`: the index of the field in the record.
- `pos`: the starting position within the field's value.
- `length`: the length of the substring to be replaced.
- `str`: the replacement string.

Returns: replaces a substring of the specified length starting from the specified position (inclusive) with the given string. An exception is thrown if the starting position is invalid. If the field is null, it is returned as-is (this transformer does not process it).

Examples:

- `dx_replace(1, "2", "4", "****")`: if column 1's value is `addaxTest`, it is transformed to `ad****est`.
- `dx_replace(1, "5", "10", "****")`: if column 1's value is `addaxTest`, it is transformed to `addax****`.
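The replacement rule can be sketched in a few lines of Python (again, an approximation of the documented behavior, not Addax's implementation):

```python
def dx_replace(value, pos, length, rep):
    """Mimic the documented dx_replace semantics: substitute the
    substring of `length` chars starting at `pos` with `rep`."""
    if value is None:
        return None   # null fields pass through untouched
    if pos >= len(value):
        raise ValueError("starting position is beyond the field value")
    # Python slicing clamps `pos + length` past the end of the string,
    # so an over-long `length` simply replaces the rest of the value.
    return value[:pos] + rep + value[pos + length:]

print(dx_replace("abcdefg", 2, 3, "**"))  # ab**fg
```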
### dx_filter

```
dx_filter(idx, operator, expr) -> str
```

Parameters:

- `idx`: the index of the field in the record.
- `operator`: the operator. Supported operators are `like`, `not like`, `>`, `=`, `<`, `>=`, `!=`, `<=`.
- `expr`: a regular expression (Java-style) or a value.

Returns:

- If the condition is met, it returns `null`, which filters out the entire row. If the condition is not met, the row is kept.
- `like` and `not like`: the field is converted to a string and then fully matched against the target regular expression.
- `>`, `=`, `<`, `>=`, `!=`, `<=`: comparison is performed according to the data type. Numeric types are compared by value; string and boolean types are compared lexicographically.
- If the target field is null, it satisfies the `= null` filter condition and the row is filtered out. For the `!= null` condition, null does not satisfy the condition and the row is kept. For `like`, if the field is null, the row is not filtered.

Examples:

- `dx_filter(1, "like", "dataTest")`
- `dx_filter(1, ">=", "10")`
Compound filters (i.e., conditions involving multiple fields) are not currently supported as the function parameters would be too complex for users.
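To make the documented matching and null-handling rules concrete, here is a minimal Python sketch (an approximation, not Addax's implementation: `re.fullmatch` stands in for Java's full-string regex match, numeric comparison is simplified, and only a few operators are shown):

```python
import re

def dx_filter(value, operator, expr):
    """Return True when the row should be filtered out (dropped),
    following the documented dx_filter semantics."""
    if operator == "like":
        # A null field is never filtered by `like`.
        return value is not None and re.fullmatch(expr, str(value)) is not None
    if operator == "=":
        # A null field satisfies `= null` and the row is dropped.
        if expr == "null" or value is None:
            return (expr == "null") and (value is None)
        return str(value) == expr
    if operator == ">=":
        # Simplified: numeric comparison only.
        return value is not None and float(value) >= float(expr)
    raise NotImplementedError(f"operator {operator} not sketched here")

print(dx_filter("dataTest", "like", "dataTest"))  # True -> row dropped
print(dx_filter(None, "like", ".*"))              # False -> row kept
print(dx_filter("12", ">=", "10"))                # True -> row dropped
```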
### dx_groovy

```
dx_groovy(code, package) -> record
```

Parameters:

- `code`: code that conforms to Groovy syntax.
- `package`: `extraPackage`, which can be a list or empty.

Returns: a `Record` data type.

Notes:

- `dx_groovy` can only be called once per transformer configuration. Multiple calls are not allowed.
- The Groovy code supports packages from `java.lang` and `java.util`. Objects that can be referenced directly include `record` and the column types under `element` (`BoolColumn`, `BytesColumn`, `DateColumn`, `DoubleColumn`, `LongColumn`, `StringColumn`). Other packages are not supported by default. If you need other packages, set `extraPackage`. Note that `extraPackage` does not support third-party JARs.
- The Groovy code must return the updated `Record` (e.g., `record.setColumn(columnIndex, new StringColumn(newValue));`) or `null`. Returning `null` filters out the current row.
- You can directly call static utility methods (`GroovyTransformerStaticUtil`).
Examples:

Groovy implementation of subStr:

```java
String code = "Column column = record.getColumn(1);\n" +
        " String oriValue = column.asString();\n" +
        " String newValue = oriValue.substring(0, 3);\n" +
        " record.setColumn(1, new StringColumn(newValue));\n" +
        " return record;";
dx_groovy(code);
```

Groovy implementation of replace:
```java
String code2 = "Column column = record.getColumn(1);\n" +
        " String oriValue = column.asString();\n" +
        " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
        " record.setColumn(1, new StringColumn(newValue));\n" +
        " return record;";
```

Groovy implementation of pad:
```java
String code3 = "Column column = record.getColumn(1);\n" +
        " String oriValue = column.asString();\n" +
        " String padString = \"12345\";\n" +
        " String finalPad = \"\";\n" +
        " int needLength = 8 - oriValue.length();\n" +
        " while (needLength > 0) {\n" +
        "     if (needLength >= padString.length()) {\n" +
        "         finalPad += padString;\n" +
        "         needLength -= padString.length();\n" +
        "     } else {\n" +
        "         finalPad += padString.substring(0, needLength);\n" +
        "         needLength = 0;\n" +
        "     }\n" +
        " }\n" +
        " String newValue = finalPad + oriValue;\n" +
        " record.setColumn(1, new StringColumn(newValue));\n" +
        " return record;";
```

Starting from version 4.1.2, `dx_groovy` supports loading Groovy code from an external file. The file path is resolved relative to the `$ADDAX_HOME` directory, which is the installation directory of Addax.
For example, to implement subStr, you can create a file `job/substr.groovy` with the following content:

```groovy
Column column = record.getColumn(1)
String oriValue = column.asString()
String newValue = oriValue.substring(0, 3)
record.setColumn(1, new StringColumn(newValue))
return record
```

Then, define it in the job file like this:

```json
{
  "transformer": [
    {
      "name": "dx_groovy",
      "parameter": {
        "codeFile": "job/substr.groovy"
      }
    }
  ]
}
```

You can also specify an absolute path for the file.
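For short snippets, the Groovy code can instead be passed inline via the `code` parameter, as the Custom Functions example in this document does. A minimal sketch that reimplements subStr inline (the snippet itself is illustrative):

```json
{
  "transformer": [
    {
      "name": "dx_groovy",
      "parameter": {
        "code": "record.setColumn(1, new StringColumn(record.getColumn(1).asString().substring(0, 3)));return record;"
      }
    }
  ]
}
```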
## Job Definition

In this example, four UDFs are configured:
```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "My name is xxxx",
              "type": "string"
            },
            {
              "value": "password is Passw0rd",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "1989-06-04 00:00:00",
              "type": "date"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            },
            {
              "random": "0,10",
              "type": "long"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      },
      "transformer": [
        {
          "name": "dx_replace",
          "parameter": {
            "columnIndex": 0,
            "paras": [
              "11",
              "6",
              "wgzhao"
            ]
          }
        },
        {
          "name": "dx_substr",
          "parameter": {
            "columnIndex": 1,
            "paras": [
              "0",
              "12"
            ]
          }
        },
        {
          "name": "dx_map",
          "parameter": {
            "columnIndex": 2,
            "paras": [
              "^",
              "2"
            ]
          }
        },
        {
          "name": "dx_filter",
          "parameter": {
            "columnIndex": 6,
            "paras": [
              "<",
              "5"
            ]
          }
        }
      ]
    }
  }
}
```

## Custom Functions
If the built-in functions do not meet your data transformation requirements, you can write code that conforms to Groovy specifications within the transformer. Here is a complete example:
```json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "Addax",
              "type": "string"
            },
            {
              "incr": "1",
              "type": "long"
            },
            {
              "incr": "1989/06/04 00:00:01,-1",
              "type": "date",
              "dateFormat": "yyyy/MM/dd hh:mm:ss"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "column": [
            "col1"
          ],
          "encoding": "UTF-8"
        }
      },
      "transformer": [
        {
          "name": "dx_groovy",
          "description": "Add the prefix 'Header_' to the first column value; double the value of the second field",
          "parameter": {
            "code": "record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"
          }
        }
      ]
    }
  }
}
```

The transformer code above modifies the first two fields of each record: it adds the prefix `Header_` to the first string field and doubles the value of the second integer field. The execution result is as follows:
Details:

```shell
$ bin/addax.sh job/transformer_demo.json

  ___      _     _
 / _ \    | |   | |
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  <
\_| |_/\__,_|\__,_|\__,_/_/\_\

:: Addax version ::    (v4.0.2-SNAPSHOT)

2021-08-04 15:45:56.421 [        main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-04 15:45:56.443 [        main] INFO  Engine -
.....
2021-08-04 15:45:56.458 [        main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-04 15:45:56.459 [        main] INFO  JobContainer - Addax jobContainer starts job.
2021-08-04 15:45:56.460 [        main] INFO  JobContainer - Set jobId = 0
2021-08-04 15:45:56.470 [       job-0] INFO  JobContainer - Addax Reader.Job [streamreader] do prepare work .
2021-08-04 15:45:56.471 [       job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do prepare work .
2021-08-04 15:45:56.471 [       job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2021-08-04 15:45:56.472 [       job-0] INFO  JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-04 15:45:56.472 [       job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-04 15:45:56.498 [       job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-08-04 15:45:56.505 [ taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-08-04 15:45:56.520 [ taskGroup-0] INFO  TransformerUtil - user config transformers [[dx_groovy]], loading...
2021-08-04 15:45:56.531 [ taskGroup-0] INFO  TransformerUtil - 1 of transformer init success. name=dx_groovy, isNative=true parameter =
{"code":"record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"}
Header_Addax	2	1989-06-04 00:00:01	true	test
Header_Addax	4	1989-06-03 00:00:01	true	test
Header_Addax	6	1989-06-02 00:00:01	true	test
Header_Addax	8	1989-06-01 00:00:01	true	test
Header_Addax	10	1989-05-31 00:00:01	true	test
Header_Addax	12	1989-05-30 00:00:01	true	test
Header_Addax	14	1989-05-29 00:00:01	true	test
Header_Addax	16	1989-05-28 00:00:01	true	test
Header_Addax	18	1989-05-27 00:00:01	true	test
Header_Addax	20	1989-05-26 00:00:01	true	test
2021-08-04 15:45:59.515 [       job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-08-04 15:45:59.517 [       job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-08-04 15:45:59.518 [       job-0] INFO  JobContainer - Addax Reader.Job [streamreader] do post work.
2021-08-04 15:45:59.521 [       job-0] INFO  JobContainer - PerfTrace not enable!
2021-08-04 15:45:59.524 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 330 bytes | Speed 110B/s, 3 records/s | Error 0 records, 0 bytes |
 All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transformer Success 10 records | Transformer Error 0 records | Transformer Filter 0 records
 | Transformer usedTime 0.383s | Percentage 100.00%
2021-08-04 15:45:59.527 [       job-0] INFO  JobContainer -
Job start at  : 2021-08-04 15:45:56
Job end at    : 2021-08-04 15:45:59
Job took secs : 3s
Average bps   : 110B/s
Average rps   : 3rec/s
Number of rec : 10
Failed record : 0
2021-08-04 15:45:59.528 [       job-0] INFO  JobContainer -
Transformer success records : 10
Transformer failed records  : 0
Transformer filtered records: 0
```

## Metrics and Dirty Data
The Transform process involves data conversion, which may increase or decrease the amount of data. Therefore, precise metrics are needed, including:
- Number of input records and bytes for the Transform.
- Number of output records and bytes from the Transform.
- Number of dirty data records and bytes from the Transform.
- If there are multiple Transforms, and one of them generates dirty data, subsequent transforms will not be executed for that record, and it will be directly counted as dirty data.
- Currently, only overall metrics for all Transforms are provided (success, failure, filtered counts, and time consumed by the transform).
The metrics displayed while the job is running look like this:

```
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
```

This line records the input and output of the transformation, which makes it possible to monitor changes in the number of records.
The final job metrics are displayed as follows:
```
Job start at  : 2025-07-23 09:08:26
Job end at    : 2025-07-23 09:08:29
Job took secs : 3s
Average bps   : 110B/s
Average rps   : 3rec/s
Number of rec : 10
Failed record : 0
Transformer success records: 10
Transformer failed records : 0
Transformer filter records : 0
```