
HTTP Reader

The HTTP Reader plugin reads data from RESTful APIs.

Example

Sample Interface and Data

The following example shows how to read data from a given API. Assume the interface is:

http://127.0.0.1:9090/mock/17/LDJSC/ASSET

The interface accepts GET requests with the following parameters:

| Parameter Name | Example Value |
|----------------|---------------|
| CURR_DATE      | 2021-01-17    |
| DEPT           | 9400          |
| USERNAME       | andi          |

A sample response is shown below (the data actually returned may vary):

json
{
  "result": [
    {
      "CURR_DATE": "2019-12-09",
      "DEPT": "9700",
      "TOTAL_MANAGED_MARKET_VALUE": 1581.03,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 36.75,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.009448781026677719,
      "TMMARKET_VALUE_GROWTH_MON": -0.015153586011995693,
      "TMMARKET_VALUE_GROWTH_YEAR": 0.0652347643813081,
      "TMMARKET_VALUE_SHARECOM": 0.024853621341525287,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.005242133578517903,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1645.1193961136973,
      "YEAR_NEW_ASSET_SSHARECOM": 0.16690149257388515,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.017886267801303465,
      "POTENTIAL_LOST_ASSETS": 56.76,
      "TOTAL_LIABILITIES": 57.81,
      "TOTAL_ASSETS": 1306.33,
      "TOTAL_ASSETS_DOD_GROWTH": 4.79,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": -0.006797058194980485,
      "NEW_ASSETS_DAY": 14.92,
      "NEW_ASSETS_MON": 90.29,
      "NEW_ASSETS_YEAR": 297.32,
      "NEW_ASSETS_DOD_GROWTH_RATE": -0.04015576541561927,
      "NEW_FUNDS_DAY": 18.16,
      "INFLOW_FUNDS_DAY": 2.12,
      "OUTFLOW_FUNDS_DAY": 9.73,
      "OVERALL_POSITION": 0.810298404938773,
      "OVERALL_POSITION_DOD_GROWTH_RATE": -0.03521615634095476,
      "NEW_CUST_FUNDS_MON": 69.44,
      "INFLOW_FUNDS_MONTH": 62.26,
      "OUTFLOW_FUNDS_MONTH": 32.59
    },
    {
      "CURR_DATE": "2019-08-30",
      "DEPT": "8700",
      "TOTAL_MANAGED_MARKET_VALUE": 1596.74,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 41.86,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": 0.03470208565515685,
      "TMMARKET_VALUE_GROWTH_MON": 0.07818120801111743,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.05440250244736409,
      "TMMARKET_VALUE_SHARECOM": 0.09997733019626448,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.019726478499825697,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1007.9314679742108,
      "YEAR_NEW_ASSET_SSHARECOM": 0.15123738798885086,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.04694052069678048,
      "POTENTIAL_LOST_ASSETS": 52.48,
      "TOTAL_LIABILITIES": 55.28,
      "TOTAL_ASSETS": 1366.72,
      "TOTAL_ASSETS_DOD_GROWTH": 10.12,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": 0.009708491982487952,
      "NEW_ASSETS_DAY": 12.42,
      "NEW_ASSETS_MON": 41.14,
      "NEW_ASSETS_YEAR": 279.32,
      "NEW_ASSETS_DOD_GROWTH_RATE": -0.025878627161898062,
      "NEW_FUNDS_DAY": 3.65,
      "INFLOW_FUNDS_DAY": 14.15,
      "OUTFLOW_FUNDS_DAY": 17.08,
      "OVERALL_POSITION": 0.9098432997243932,
      "OVERALL_POSITION_DOD_GROWTH_RATE": 0.02111922282868306,
      "NEW_CUST_FUNDS_MON": 57.21,
      "INFLOW_FUNDS_MONTH": 61.16,
      "OUTFLOW_FUNDS_MONTH": 15.83
    },
    {
      "CURR_DATE": "2019-06-30",
      "DEPT": "6501",
      "TOTAL_MANAGED_MARKET_VALUE": 1506.72,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": -13.23,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.0024973354204176554,
      "TMMARKET_VALUE_GROWTH_MON": -0.015530793150701896,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.08556724628979398,
      "TMMARKET_VALUE_SHARECOM": 0.15000077963967678,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.049629446804825755,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1250.1040863177336,
      "YEAR_NEW_ASSET_SSHARECOM": 0.19098445630488178,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": -0.007881179708853471,
      "POTENTIAL_LOST_ASSETS": 50.53,
      "TOTAL_LIABILITIES": 56.62,
      "TOTAL_ASSETS": 1499.53,
      "TOTAL_ASSETS_DOD_GROWTH": 29.56,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": -0.02599813232345556,
      "NEW_ASSETS_DAY": 28.81,
      "NEW_ASSETS_MON": 123.24,
      "NEW_ASSETS_YEAR": 263.63,
      "NEW_ASSETS_DOD_GROWTH_RATE": 0.0073986669331394875,
      "NEW_FUNDS_DAY": 18.52,
      "INFLOW_FUNDS_DAY": 3.26,
      "OUTFLOW_FUNDS_DAY": 6.92,
      "OVERALL_POSITION": 0.8713692113306709,
      "OVERALL_POSITION_DOD_GROWTH_RATE": 0.02977644553289545,
      "NEW_CUST_FUNDS_MON": 85.14,
      "INFLOW_FUNDS_MONTH": 23.35,
      "OUTFLOW_FUNDS_MONTH": 92.95
    },
    {
      "CURR_DATE": "2019-12-07",
      "DEPT": "8705",
      "TOTAL_MANAGED_MARKET_VALUE": 1575.85,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 8.94,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.04384846980627058,
      "TMMARKET_VALUE_GROWTH_MON": -0.022962456288549656,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.005047009316021089,
      "TMMARKET_VALUE_SHARECOM": 0.07819484815809447,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.008534369960890256,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1340.0339240689955,
      "YEAR_NEW_ASSET_SSHARECOM": 0.19019952857677042,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.01272353909992914,
      "POTENTIAL_LOST_ASSETS": 54.63,
      "TOTAL_LIABILITIES": 53.17,
      "TOTAL_ASSETS": 1315.08,
      "TOTAL_ASSETS_DOD_GROWTH": 49.31,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": 0.0016538407028265922,
      "NEW_ASSETS_DAY": 29.17,
      "NEW_ASSETS_MON": 44.75,
      "NEW_ASSETS_YEAR": 172.87,
      "NEW_ASSETS_DOD_GROWTH_RATE": 0.045388692595736746,
      "NEW_FUNDS_DAY": 18.46,
      "INFLOW_FUNDS_DAY": 12.93,
      "OUTFLOW_FUNDS_DAY": 10.38,
      "OVERALL_POSITION": 0.8083127036694828,
      "OVERALL_POSITION_DOD_GROWTH_RATE": -0.02847453515632541,
      "NEW_CUST_FUNDS_MON": 49.74,
      "INFLOW_FUNDS_MONTH": 81.93,
      "OUTFLOW_FUNDS_MONTH": 18.17
    }
  ]
}

We only need some of the keys from each object in the result array.
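As a rough illustration of how resultKey and column work together, the Python sketch below parses a response, takes the array under result, and keeps only the configured keys from each object. The helper select_columns and the trimmed sample are hypothetical; this is not the plugin's own (Java) implementation.

```python
import json
from typing import Any, List

def select_columns(body: str, result_key: str, columns: List[str]) -> List[List[Any]]:
    """Keep only the configured keys from each record (hypothetical helper)."""
    payload = json.loads(body)
    # An empty resultKey means the whole response holds the records.
    records = payload[result_key] if result_key else payload
    if isinstance(records, dict):
        records = [records]  # treat a single object as one record
    rows = []
    for rec in records:
        # column ["*"] selects every key of the record.
        keys = list(rec) if columns == ["*"] else columns
        # A missing key becomes None, mirroring the NULL behaviour.
        rows.append([rec.get(k) for k in keys])
    return rows

sample = """{"result": [
  {"CURR_DATE": "2019-12-09", "DEPT": "9700",
   "TOTAL_MANAGED_MARKET_VALUE": 1581.03,
   "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 36.75, "NEW_FUNDS_DAY": 18.16}
]}"""
cols = ["CURR_DATE", "DEPT", "TOTAL_MANAGED_MARKET_VALUE",
        "TOTAL_MANAGED_MARKET_VALUE_GROWTH"]
print(select_columns(sample, "result", cols))
```

Note that the sketch keeps the parsed JSON types; the plugin itself currently reads every value as a string.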

Configuration

The following configuration reads data from the interface and prints it to the terminal:

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "httpreader",
        "parameter": {
          "connection": [
            {
              "url": "http://127.0.0.1:9090/mock/17/LDJSC/ASSET",
              "proxy": {
                "host": "http://127.0.0.1:3128",
                "auth": "user:pass"
              }
            }
          ],
          "reqParams": {
            "CURR_DATE": "2021-01-18",
            "DEPT": "9700"
          },
          "resultKey": "result",
          "method": "GET",
          "column": [
            "CURR_DATE",
            "DEPT",
            "TOTAL_MANAGED_MARKET_VALUE",
            "TOTAL_MANAGED_MARKET_VALUE_GROWTH"
          ],
          "username": "user",
          "password": "passw0rd",
          "headers": {
            "X-Powered-by": "Addax"
          }
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": "true"
        }
      }
    }
  }
}

Save the above content as job/httpreader2stream.json.

Execution

Run the following command to start the collection:

bash
bin/addax.sh job/httpreader2stream.json

The output of the above command is roughly as follows:

txt
2021-01-20 09:07:41.864 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-01-20 09:07:41.877 [main] INFO  Engine - the machine info  =>

	osInfo: 	Mac OS X x86_64 10.15.1
	jvmInfo:	AdoptOpenJDK 14 14.0.2+12
	cpu num:	8

	totalPhysicalMemory:	-0.00G
	freePhysicalMemory:	-0.00G
	maxFileDescriptorCount:	-1
	currentOpenFileDescriptorCount:	-1

	GC Names	[G1 Young Generation, G1 Old Generation]

	MEMORY_NAME                    | allocation_size                | init_size
	CodeHeap 'profiled nmethods'   | 117.21MB                       | 2.44MB
	G1 Old Gen                     | 2,048.00MB                     | 39.00MB
	G1 Survivor Space              | -0.00MB                        | 0.00MB
	CodeHeap 'non-profiled nmethods' | 117.21MB                       | 2.44MB
	Compressed Class Space         | 1,024.00MB                     | 0.00MB
	Metaspace                      | -0.00MB                        | 0.00MB
	G1 Eden Space                  | -0.00MB                        | 25.00MB
	CodeHeap 'non-nmethods'        | 5.57MB                         | 2.44MB


2021-01-20 09:07:41.903 [main] INFO  Engine -
{
	"content":
		{
			"reader":{
				"parameter":{
					"reqParams":{
						"CURR_DATE":"2021-01-18",
						"DEPT":"9700"
					},
					"method":"GET",
					"column":[
						"CURR_DATE",
						"DEPT",
						"TOTAL_MANAGED_MARKET_VALUE",
						"TOTAL_MANAGED_MARKET_VALUE_GROWTH"
					],
					"resultKey":"result",
					"connection":[
						{
							"url":"http://127.0.0.1:9090/mock/17/LDJSC/ASSET"
						}
					]
				},
				"name":"httpreader"
			},
			"writer":{
				"parameter":{
					"print":"true"
				},
				"name":"streamwriter"
			}
	},
	"setting":{
		"speed":{
			"bytes":-1,
			"channel":1
		}
	}
}

2021-01-20 09:07:41.926 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-01-20 09:07:41.927 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-01-20 09:07:41.928 [main] INFO  JobContainer - Set jobId = 0
2021-01-20 09:07:42.002 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started

2019-08-30	9700	1539.85	-14.78
2019-10-01	9700	1531.71	47.66
2020-12-03	9700	1574.38	7.34
2020-11-31	9700	1528.13	41.62
2019-03-01	9700	1554.28	-9.29

2021-01-20 09:07:45.006 [job-0] INFO  JobContainer -
Task start time                    : 2021-01-20 09:07:41
Task end time                      : 2021-01-20 09:07:44
Task total duration                :                  3s
Task average throughput            :               42B/s
Record write speed                 :              1rec/s
Total records read                 :                   5
Total read/write failures          :                   0

Parameters

| Configuration | Required | Data Type | Default Value | Description |
|---------------|----------|-----------|---------------|-------------|
| url | Yes | string | None | URL of the HTTP endpoint, set inside connection |
| reqParams | No | map | None | Request parameters sent to the interface |
| resultKey | No | string | None | Key under which the records are returned; leave unset to use the entire response |
| method | No | string | get | HTTP method; only GET and POST are supported (case-insensitive) |
| column | Yes | list | None | Keys to read; set to "*" to read all keys |
| username | No | string | None | Account for interface authentication, if required |
| password | No | string | None | Password for interface authentication, if required |
| authConfig | No | map | None | Auth endpoint settings: a token is fetched first, then injected into the headers of business requests |
| proxy | No | map | None | Proxy settings, set inside connection; see below |
| headers | No | map | None | Custom request headers |
| isPage | No | boolean | None | Whether the interface supports pagination |
| pageParams | No | map | None | Pagination parameters; see below |

reqParams

reqParams holds the request parameters. For GET requests they are appended to the URL as k=v pairs; for POST requests they are sent as a JSON object in the request body. If a POST body is not a key-value structure, set the key to an empty string, for example:

json
{
  "reqParams": {
    "": [123, 3456]
  }
}

The plugin treats this case specially: the value under the empty key, here [123, 3456], is used as the request body.
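The reqParams rules can be sketched in Python as follows. The function build_request is hypothetical, and the empty-key branch assumes that the value under the empty key is sent as the whole body; treat it as an illustration, not the plugin's code.

```python
import json
from typing import Optional, Tuple
from urllib.parse import urlencode

def build_request(url: str, method: str, req_params: dict) -> Tuple[str, Optional[str]]:
    """Return (request_url, json_body) per the reqParams rules (sketch only)."""
    method = method.upper()  # method is case-insensitive
    if method == "GET":
        # GET: parameters go into the query string as k=v pairs.
        query = urlencode(req_params)
        if not query:
            return url, None
        sep = "&" if "?" in url else "?"
        return url + sep + query, None
    if method == "POST":
        # Assumption: a lone empty key means "send this value as the body".
        if list(req_params) == [""]:
            return url, json.dumps(req_params[""])
        # Otherwise the parameters form a JSON object body.
        return url, json.dumps(req_params)
    raise ValueError("only GET and POST are supported")

base = "http://127.0.0.1:9090/mock/17/LDJSC/ASSET"
print(build_request(base, "get", {"CURR_DATE": "2021-01-18", "DEPT": "9700"}))
print(build_request(base, "POST", {"": [123, 3456]}))
```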

authConfig

authConfig is used for "authenticate first, then read business data" scenarios. When configured, httpreader calls the auth endpoint first, extracts a token from the auth response, and injects it into business request headers.

Example:

json
{
  "authConfig": {
    "url": "http://127.0.0.1:9090/auth/login",
    "method": "POST",
    "reqParams": {
      "username": "demo",
      "password": "demo"
    },
    "headers": {
      "X-Auth-Client": "Addax"
    },
    "resultKey": "data.token",
    "tokenHeader": "Authorization",
    "tokenPrefix": "Bearer "
  }
}

Field description:

  • url: Auth endpoint URL (required).
  • method: Auth request method, GET or POST; defaults to POST.
  • reqParams: Auth request parameters; sent as a JSON body for POST and appended to the URL for GET.
  • headers: Extra request headers for the auth endpoint.
  • resultKey: Path used to extract the token from the auth response. JSONPath style is supported (for example data.token or $.data.token); defaults to token.
  • tokenHeader: Name of the header that carries the token in business requests; defaults to Authorization.
  • tokenPrefix: Prefix prepended to the token when it is injected; defaults to "Bearer " (note the trailing space).
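To make the token flow concrete, here is a minimal Python sketch of extracting the token with resultKey and injecting it with tokenHeader and tokenPrefix. The helper names and the auth response body are hypothetical, and only plain dotted paths (optionally starting with $.) are handled, not full JSONPath.

```python
import json

def extract_token(auth_body: str, result_key: str = "token") -> str:
    """Follow a dotted path such as 'data.token' or '$.data.token'."""
    node = json.loads(auth_body)
    path = result_key[2:] if result_key.startswith("$.") else result_key
    for part in path.split("."):
        node = node[part]
    return str(node)

def inject_token(headers: dict, token: str,
                 token_header: str = "Authorization",
                 token_prefix: str = "Bearer ") -> dict:
    """Return a copy of the business headers with the token added."""
    merged = dict(headers)
    merged[token_header] = token_prefix + token
    return merged

# Hypothetical auth endpoint response.
auth_response = '{"code": 0, "data": {"token": "abc123"}}'
token = extract_token(auth_response, "data.token")
print(inject_token({"X-Powered-by": "Addax"}, token))
```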

proxy

If the interface must be reached through a proxy, configure the proxy item: a JSON object with a required host field and an optional auth field.

json
{
  "proxy": {
    "host": "http://127.0.0.1:8080",
    "auth": "user:pass"
  }
}

For a SOCKS proxy (v4 or v5), write:

json
{
  "proxy": {
    "host": "socks://127.0.0.1:8080",
    "auth": "user:pass"
  }
}

host is the proxy address, including the proxy type. Only HTTP and SOCKS (v4 and v5) proxies are currently supported. If the proxy requires authentication, set auth to the username and password separated by a colon (:).
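The following Python sketch shows one way to split a proxy item into its parts. ProxySettings and parse_proxy are hypothetical names; splitting auth on the first colon is a design choice of this sketch, since how the plugin treats a colon inside the password is not documented here.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlsplit

class ProxySettings(NamedTuple):
    scheme: str              # "http" or "socks"
    host: str
    port: Optional[int]
    username: Optional[str]
    password: Optional[str]

def parse_proxy(proxy: dict) -> ProxySettings:
    """Split a proxy item into its parts (illustrative, not Addax code)."""
    parts = urlsplit(proxy["host"])  # host is required and carries the type
    if parts.scheme not in ("http", "socks"):
        raise ValueError(f"unsupported proxy type: {parts.scheme!r}")
    username = password = None
    auth = proxy.get("auth")  # optional "username:password"
    if auth:
        if ":" not in auth:
            raise ValueError("auth must look like username:password")
        # Split on the first colon only, so the password may contain ':'.
        username, password = auth.split(":", 1)
    return ProxySettings(parts.scheme, parts.hostname, parts.port, username, password)

print(parse_proxy({"host": "socks://127.0.0.1:8080", "auth": "user:pass"}))
```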

column

Besides plain key names, column also accepts JSON path expressions for selecting nested values. Suppose you want to read the following JSON:

json
{
  "result": [
    {
      "CURR_DATE": "2019-12-09",
      "DEPT": {
        "ID": "9700"
      },
      "KK": [
        {
          "COL1": 1
        },
        {
          "COL2": 2
        }
      ]
    },
    {
      "CURR_DATE": "2021-11-09",
      "DEPT": {
        "ID": "6500"
      },
      "KK": [
        {
          "COL1": 3
        },
        {
          "COL2": 4
        }
      ]
    }
  ]
}

To read CURR_DATE, ID, COL1, and COL2 as four columns, configure column as follows:

json
{
  "column": ["CURR_DATE", "DEPT.ID", "KK[0].COL1", "KK[1].COL2"]
}

The execution result is as follows:

txt
...
2021-10-30 14:01:50.273 [ taskGroup-0] INFO  Channel              - Channel set record_speed_limit to -1, No tps activated.

2019-12-09	9700	1	2
2021-11-09	6500	3	4

2021-10-30 14:01:53.283 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.
2021-10-30 14:01:53.284 [       job-0] INFO  JobContainer         - Addax Writer.Job [streamwriter] do post work.
2021-10-30 14:01:53.284 [       job-0] INFO  JobContainer         - Addax Reader.Job [httpreader] do post work.
2021-10-30 14:01:53.286 [       job-0] INFO  JobContainer         - PerfTrace not enable!
2021-10-30 14:01:53.289 [       job-0] INFO  JobContainer         -
Task start time                    : 2021-10-30 14:01:50
Task end time                      : 2021-10-30 14:01:53
Task total duration                :                  3s
Task average throughput            :               10B/s
Record write speed                 :              0rec/s
Total records read                 :                   2
Total read/write failures          :                   0

Note: if a configured key does not exist, the corresponding column value is NULL.
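The path lookup described above can be approximated in Python as follows, using the two records from the example. The resolve function and its regular expression are hypothetical and cover only dotted keys and [n] indexes; a missing key or out-of-range index yields None, matching the NULL behaviour in the note.

```python
import re
from typing import Any

# Each match is either a key name or a bracketed list index.
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")

def resolve(record: Any, path: str) -> Any:
    """Follow a path such as 'DEPT.ID' or 'KK[0].COL1'; None if absent."""
    node = record
    for key, index in _TOKEN.findall(path):
        if key:
            if not isinstance(node, dict) or key not in node:
                return None
            node = node[key]
        else:
            i = int(index)
            if not isinstance(node, list) or i >= len(node):
                return None
            node = node[i]
    return node

records = [
    {"CURR_DATE": "2019-12-09", "DEPT": {"ID": "9700"}, "KK": [{"COL1": 1}, {"COL2": 2}]},
    {"CURR_DATE": "2021-11-09", "DEPT": {"ID": "6500"}, "KK": [{"COL1": 3}, {"COL2": 4}]},
]
columns = ["CURR_DATE", "DEPT.ID", "KK[0].COL1", "KK[1].COL2"]
rows = [[resolve(r, c) for c in columns] for r in records]
print(rows)
```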

isPage

isPage is a boolean that specifies whether the interface supports pagination.

When isPage is true, the reader requests one page after another and stops as soon as a response contains fewer records than the page size.
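The stop condition can be sketched in Python as below. read_all_pages and fake_fetch are hypothetical; fake_fetch stands in for the HTTP call and serves 250 in-memory rows, 100 per page, using the default parameter names.

```python
from typing import Callable, Dict, List

def read_all_pages(fetch: Callable[[Dict[str, int]], List[dict]],
                   page_key: str = "pageIndex", start: int = 1,
                   size_key: str = "pageSize", size: int = 100) -> List[dict]:
    """Request pages until one returns fewer than `size` records."""
    records: List[dict] = []
    page = start
    while True:
        batch = fetch({page_key: page, size_key: size})
        records.extend(batch)
        if len(batch) < size:  # a short (or empty) page is the last one
            return records
        page += 1

# Hypothetical in-memory "API" holding 250 rows.
DATA = [{"id": i} for i in range(250)]

def fake_fetch(params: Dict[str, int]) -> List[dict]:
    first = (params["pageIndex"] - 1) * params["pageSize"]
    return DATA[first:first + params["pageSize"]]

rows = read_all_pages(fake_fetch)
print(len(rows))  # 250, fetched in 3 requests (100 + 100 + 50)
```

With this rule, if the total happens to be an exact multiple of the page size, one extra request returns an empty page and ends the loop.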

pageParams

pageParams takes effect only when isPage is true. It is a JSON object with two optional fields, pageIndex and pageSize.

pageIndex describes the page-number parameter. It is a JSON object with two optional fields: key, the name of the page-number parameter, and value, the starting page number.

pageSize describes the page-size parameter. It is a JSON object with two optional fields: key, the name of the page-size parameter, and value, the number of records per page.

The default values for these two parameters are:

json
{
  "pageParams": {
    "pageIndex": {
      "key": "pageIndex",
      "value": 1
    },
    "pageSize": {
      "key": "pageSize",
      "value": 100
    }
  }
}

If your interface uses parameter names other than pageIndex and pageSize, override them in pageParams. For example:

json
{
  "isPage": true,
  "pageParams": {
    "pageIndex": {
      "key": "page",
      "value": 1
    },
    "pageSize": {
      "key": "size",
      "value": 100
    }
  }
}

With this configuration, the first request passes page=1&size=100 to the interface.
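A small Python sketch of how pageParams might be merged with the defaults to form the first page's query string. first_page_query is hypothetical, and the field-by-field fallback (a missing key or value takes its default) is an assumption based on both fields being optional.

```python
from urllib.parse import urlencode

DEFAULT_PAGE_PARAMS = {
    "pageIndex": {"key": "pageIndex", "value": 1},
    "pageSize": {"key": "pageSize", "value": 100},
}

def first_page_query(page_params: dict) -> str:
    """Fill missing pageParams fields from the defaults, then encode them."""
    merged = {}
    for name, default in DEFAULT_PAGE_PARAMS.items():
        given = page_params.get(name, {})
        # key and value are each optional; fall back field by field.
        merged[given.get("key", default["key"])] = given.get("value", default["value"])
    return urlencode(merged)

print(first_page_query({"pageIndex": {"key": "page", "value": 1},
                        "pageSize": {"key": "size", "value": 100}}))
```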

Limitations

  1. The response must be JSON.
  2. All values are currently read as strings.
  3. Currently only one auth call is performed at task startup; automatic token refresh is not supported