Stream processor¶
Example 1 - Delimited¶
Endpoint: /stream
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON object or array of objects per below schema
{
"data": String,
"dataformat": {
"format": "delimited",
"fields": Object,
},
"delimiter": Character,
"quotechar": Character,
"timeformat": String,
"timezone": String,
"batch_id": String
}
data: Delimited file content, UTF-8 encoded
format: Set to the string “delimited”
- fields: A JSON object with the below keys:
timestamp: iso-8601 string or unix epoch
longitude: decimal in WGS84 / EPSG 4326 datum
latitude: decimal in WGS84 / EPSG 4326 datum
device_id: Used to group timeseries records. A device id is required only if your data contains GPS records from two or more devices. Geo filter functions view the data as one continuous timeseries when detecting anomalies; without a device id, records from multiple devices are merged into a single series, causing erroneous filtering.
Values for the keys are specified as an array [column number, width]. Column numbers are 0-indexed. For example:
{
  "timestamp": [0, 99],  // timestamp is the first field; width is ignored for delimited files
  "longitude": [2, 99],  // longitude is field 3
  "latitude": [3, 99],   // latitude is field 4
  "device_id": [9, 99]   // device id is field 10
}
// Additional fields in the file (fields 5 to 9) are ignored.
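To make the column semantics concrete, here is a small sketch (an illustration, not the API's actual implementation) of picking the mapped columns out of one delimited record; the field positions follow the example above:

```python
import csv
import io

# Field mapping from the schema: [column number, width]; the width is
# ignored for delimited files, and column numbers are 0-indexed.
FIELDS = {
    "timestamp": [0, 99],
    "longitude": [2, 99],
    "latitude": [3, 99],
    "device_id": [9, 99],
}

def parse_delimited_line(line, fields, delimiter=",", quotechar='"'):
    """Pick the mapped columns out of one delimited record."""
    row = next(csv.reader(io.StringIO(line), delimiter=delimiter,
                          quotechar=quotechar))
    return {name: row[col] for name, (col, _width) in fields.items()}
```

Quoted values (per quotechar) are unwrapped before the column lookup, which is why a field such as "77.59" comes back without its quotes.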
delimiter: Delimiter character. e.g., “,”
quotechar: Quote character for delimited files. Defaults to double quote.
timeformat: “iso” or “unix” to denote iso8601 or unix epoch seconds respectively
timezone: Optional. The API converts all timestamps to UTC; the value in this field is used to preserve the original timezone info. Timezone list
batch_id: Optional. Used to tag/query records submitted in a batch.
Download delimited example delimited.csv
import json
import requests
headers = {'Content-Type': 'application/json'}
with open('delimited.csv', mode='rt', encoding='utf-8') as f:
data = f.read()
payload = {
"data": data,
"dataformat": {
"format": "delimited",
"fields": {
"timestamp": [1, 99],
"longitude": [4, 99],
"latitude": [3, 99],
"device_id": [2, 99]
}
},
"delimiter": ",",
"quotechar": '"',
"timeformat": "iso",
"batch_id": "batch-1"
}
api_url = 'http://<server public ip>/stream'
r = requests.post(api_url, data=json.dumps(payload), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
"queue_id": "9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59",
"status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59
A successful POST returns HTTP 202 Accepted as the response code.
The response body or the Location header gives you the queue id.
The queued message is processed after a delay of 15 minutes or so. See Debounce.
You can check the status of a message in the queue using GET /stream/queue/<queue_id>
Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded. You can then download the cleaned records as geospatial timeseries data using any of the API methods below:
1. Processed stream as geospatial timeseries JSON using GET /stream
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']
params = {'batch_id': batch_id}
r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_delimited.json', 'wt', encoding='utf-8') as f:
f.write(r.text)
See the response stream_delimited.json
2. Processed stream as a CSV file using GET /stream/csv
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']
params = {'batch_id': batch_id}
r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_delimited.csv', 'wt', encoding='utf-8') as f:
f.write(r.text)
See the response stream_delimited.csv
3. Processed stream segmented into trip segments using GET /trips
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/9de1a91e-6c94-46a4-88fa-ceca3b0b0a9c_35b82dcb-ef04-48f2-bc28-e4e12fb51c59').json()['batch_id']
params = {'batch_id': batch_id, 'embed': 'address' }
r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_delimited.json', 'wt', encoding='utf-8') as f:
f.write(r.text)
Example 2 - Fixed width¶
Endpoint: /stream
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON object or array of objects per below schema
{
"data": String,
"dataformat": {
"format": "fixed",
"fields": Object,
},
"timeformat": String,
"timezone": String,
"batch_id": String
}
data: Fixed width file content, UTF-8 encoded
format: Set to the string “fixed”
- fields: A JSON object with the below keys:
timestamp: iso-8601 string or unix epoch
longitude: decimal in WGS84 / EPSG 4326 datum
latitude: decimal in WGS84 / EPSG 4326 datum
device_id: Used to group timeseries records. A device id is required only if your data contains GPS records from two or more devices. Geo filter functions view the data as one continuous timeseries when detecting anomalies; without a device id, records from multiple devices are merged into a single series, causing erroneous filtering.
Values for the keys are specified as an array [start column number, column width]. Here the column number is the character position in the fixed width file, with the first character at position 0.
{
  "timestamp": [20, 120],  // timestamp starts at character (column) 20, width 120 chars
  "longitude": [410, 30],
  "latitude": [300, 30],
  "device_id": [200, 20]
}
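The fixed width semantics amount to slicing each field out of the line by character position and stripping the padding. A small illustrative sketch (not the API's actual implementation), using the field positions above:

```python
# Field mapping from the schema: [start column, width]; character
# positions are 0-indexed.
FIELDS = {
    "timestamp": [20, 120],
    "longitude": [410, 30],
    "latitude": [300, 30],
    "device_id": [200, 20],
}

def parse_fixed_line(line, fields):
    """Slice each mapped field out of one fixed-width record
    and strip the whitespace padding."""
    return {name: line[start:start + width].strip()
            for name, (start, width) in fields.items()}
```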
timeformat: “iso” or “unix” to denote iso8601 or unix epoch seconds respectively
timezone: Optional. The API converts all timestamps to UTC; the value in this field is used to preserve the original timezone info. Timezone list
batch_id: Optional. Used to tag/query records submitted in a batch.
Download fixed width example fixed.txt
import json
import requests
headers = {'Content-Type': 'application/json'}
with open('fixed.txt', mode='rt', encoding='utf-8') as f:
data = f.read()
payload = {
"data": data,
"dataformat": {
"format": "fixed",
"fields": {
"timestamp": [20, 120],
"longitude": [410, 30],
"latitude": [300, 30],
"device_id": [200, 20]
}
},
"timeformat": "iso",
"batch_id": "batch-fixed-2"
}
api_url = 'http://<server public ip>/stream'
r = requests.post(api_url, data=json.dumps(payload), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
"queue_id": "68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034",
"status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034
A successful POST returns HTTP 202 Accepted as the response code.
The response body or the Location header gives you the queue id.
The queued message is processed after a delay of 15 minutes or so. See Debounce.
You can check the status of a message in the queue using GET /stream/queue/<queue_id>
The message is fully processed when its status changes to processed.
Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.
When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:
1. Processed stream as geospatial timeseries JSON using GET /stream
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']
params = {'batch_id': batch_id}
r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_fixed.json', 'wt', encoding='utf-8') as f:
f.write(r.text)
See the response stream_fixed.json
2. Processed stream as a CSV file using GET /stream/csv
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']
params = {'batch_id': batch_id}
r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_fixed.csv', 'wt', encoding='utf-8') as f:
f.write(r.text)
See the response stream_fixed.csv
3. Processed stream segmented into trip segments using GET /trips
import json
import requests
batch_id = requests.get('http://<public server IP>/stream/queue/68dc3c05-a7a5-4f82-bf74-15046335e77d_7c715e40-658e-4752-a81e-0eba40398034').json()['batch_id']
params = {'batch_id': batch_id, 'embed': 'address' }
r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_fixed.json', 'wt', encoding='utf-8') as f:
f.write(r.text)
Example 3 - JSON Basic¶
Endpoint: /stream/json_basic
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON object or array of objects per below schema
{
"latitude": Double, // required
"longitude": Double, // required
"altitude": Double,
"accuracy": Double,
"speed": Double,
"heading": Double,
"activity_type": String,
"activity_confidence": {
"type": Double,
"minimum": 0,
"maximum": 100
},
"timestamp": ISO-8601 UTC, // required
"device_id": String
}
Download JSON data example basic.json
import json
import requests
headers = {'Content-Type': 'application/json'}
with open('basic.json', 'rt') as f:
data = json.load(f)
api_url = 'http://<server public ip>/stream/json_basic'
r = requests.post(api_url, data=json.dumps(data), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
"queue_id": "0b32373b-09cb-4a09-b7be-a1c8d9c4d75b_31f7f57c-c324-4739-8703-20f544efc213",
"status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/0b32373b-09cb-4a09-b7be-a1c8d9c4d75b_31f7f57c-c324-4739-8703-20f544efc213
A successful POST returns HTTP 202 Accepted as the response code.
The response body or the Location header gives you the queue id.
The queued message is processed after a delay of 15 minutes or so. See Debounce.
You can check the status of a message in the queue using GET /stream/queue/<queue_id>
The message is fully processed when its status changes to processed.
Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.
When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:
1. Processed stream as geospatial timeseries JSON using GET /stream
import json
import requests
params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284'} # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_basic.json', 'wt') as f:
f.write(r.text)
See the response stream_basic.json
2. Processed stream as a CSV file using GET /stream/csv
import json
import requests
params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284'} # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_basic.csv', 'wt') as f:
f.write(r.text)
See the response stream_basic.csv
3. Processed stream segmented into trip segments using GET /trips
import json
import requests
params = {'batch_id': 'Q7KwEQNiAUI_2020-09-11T14-12-20.088284', 'embed': 'address' } # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_basic.json', 'wt') as f:
f.write(r.text)
Example 4 - JSON Rnbg¶
React Native Background Geolocation (Rnbg) by Transistorsoft is a popular battery-efficient location tracking module for iOS and Android. You can configure the Rnbg SDK or demo app to post location data continuously to the geofilter stream endpoint. When building continuous location tracking apps, use Rnbg's batch mode to group multiple records into a single HTTP request.
Endpoint: /stream/json_rnbg
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON ->
{
"imei": [String], // if given, geofilter will use this as the device ID.
"location": { // this is an array of location objects in rnbg batch mode
"coords": {
"latitude": Double,
"longitude": Double,
"accuracy": Double,
"speed": Double,
"heading": Double,
"altitude": Double
},
"extras": { // <-- optional meta-data
"foo": "bar"
},
"activity": {
"type": still|on_foot|walking|running|in_vehicle|on_bicycle|unknown,
"confidence": [0-100%]
},
"geofence": { // <-- Present only if a geofence was triggered at this location
"identifier": String,
"action": String ENTER|EXIT
},
"battery": {
"level": Double,
"is_charging": Boolean
},
"timestamp": ISO-8601 UTC, // eg: "2015-05-05T04:31:54.123Z"
"uuid": String, // <-- Universally unique identifier
"event" String, // <-- motionchange|geofence|heartbeat
"is_moving": Boolean, // <-- The motion-state when recorded.
"odometer": Double/meters
}
}
Download JSON data example rnbg.json
import json
import requests
headers = {'Content-Type': 'application/json'}
with open('rnbg.json', 'rt') as f:
data = json.load(f)
api_url = 'http://<server public ip>/stream/json_rnbg'
r = requests.post(api_url, data=json.dumps(data), headers=headers)
>>> print(r.status_code)
202
>>> print(r.text)
{
"queue_id": "b119738f-a14b-4a39-b47d-0a19e4ed2ff9_55ae48cd-bdf8-4c83-8e8d-530dbf60ead9",
"status": 202
}
>>> print(r.headers['location'])
http://<server public ip>/stream/queue/b119738f-a14b-4a39-b47d-0a19e4ed2ff9_55ae48cd-bdf8-4c83-8e8d-530dbf60ead9
A successful POST returns HTTP 202 Accepted as the response code.
The response body or the Location header gives you the queue id.
The queued message is processed after a delay of 15 minutes or so. See Debounce.
You can check the status of a message in the queue using GET /stream/queue/<queue_id>
The message is fully processed when its status changes to processed.
Depending on Debounce and the number of records in the queue, it may take 15-30 minutes or more for the records to be parsed, cleaned and geocoded.
When the status changes to processed, you can download the records as geospatial timeseries data using any of the API methods below:
1. Processed stream as geospatial timeseries JSON using GET /stream
import json
import requests
params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733'} # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/stream', params=params)
with open('stream_rnbg.json', 'wt') as f:
f.write(r.text)
2. Processed stream as a CSV file using GET /stream/csv
import json
import requests
params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733'} # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/stream/csv', params=params)
with open('stream_rnbg.csv', 'wt') as f:
f.write(r.text)
3. Processed stream segmented into trip segments using GET /trips
import json
import requests
params = {'batch_id': 'PXNP2l7pUBs_2020-09-10T12-48-17.634733', 'embed': 'address' } # batch_id from `GET /stream/queue/<queue_id>`_
r = requests.get('http://<public server IP>/trips', params=params)
with open('trips_rnbg.json', 'wt') as f:
f.write(r.text)
API methods¶
POST /stream¶
Endpoint: /stream
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON object or array of objects per below schema
{
"data": String,
"dataformat": {
"format": String,
"fields": {
"timestamp": Array[Integer, Integer],
"longitude": Array[Integer, Integer],
"latitude": Array[Integer, Integer],
"device_id": Array[Integer, Integer]
},
},
"delimiter": Character,
"quotechar": Character,
"timeformat": String,
"timezone": String,
"batch_id": String
}
data: File content, UTF-8 encoded
- format: String. Allowed values are:
“delimited”: data contains delimited file content.
“fixed”: data contains fixed width file content.
“json_basic”: data is in JSON Basic format.
“json_rnbg”: data is in JSON Rnbg format.
- fields: A JSON object with the below keys; required only for delimited or fixed width format data.
timestamp: [column, width]
longitude: [column, width]
latitude: [column, width]
device_id: [column, width]
Values for the keys are specified as an array [column number, width].
Column numbers start at 0.
// Fixed width example; column index starts at 0.
{
  "timestamp": [20, 120],  // timestamp starts at column 20, with a width of 120 chars
  "longitude": [410, 30],  // longitude starts at column 410, with a width of 30 chars
  "latitude": [300, 30],   // latitude starts at column 300, with a width of 30 chars
  "device_id": [200, 20]   // device id starts at column 200, with a width of 20 chars
}
// Delimited example
{
  "timestamp": [0, 99],  // timestamp is in column 0; width is ignored
  "longitude": [4, 99],  // longitude is in column 4; width is ignored
  "latitude": [3, 99],   // latitude is in column 3; width is ignored
  "device_id": [2, 99]   // device id is in column 2; width is ignored
}
delimiter: Delimiter character. e.g., “,”
quotechar: Quote char for delimited files. Default double quote.
timeformat: “iso” or “unix” to denote iso8601 or unix epoch seconds respectively
timezone: Optional. API will convert all timestamps to UTC. Value in this field is used to preserve original timezone info. Timezone list
batch_id: Optional. Used to tag/query records submitted in a batch.
POST response: HTTP 202 Accepted. Response body gives queue_id.
{
  "queue_id": "<queue_id>",
  "status": 202
}
POST /stream/json_basic¶
This is a helper method for posting to /stream with JSON Basic format. See the example
POST /stream/json_rnbg¶
This is a helper method for posting to /stream with JSON Rnbg format. See the example
GET /stream/queue/<queue_id>¶
Check status of messages in the queue.
processed is the final status before you can download clean data.
Example response.
{
"batch_id": "PXNP2l7pUBs_2020-09-10T12-48-17.634733",
"end_time": "2020-09-10T12:48:21.420224+00:00",
"records_processed": 3352,
"start_time": "2020-09-10T12:48:17.634432+00:00",
"status": "parsed"
}
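Since processing is asynchronous, a client typically polls this endpoint until the status reaches processed and then reads the batch_id. A minimal sketch; the poll interval and timeout below are arbitrary client-side choices, and the get parameter exists only to make the helper easy to exercise without a server:

```python
import time

def wait_for_batch(base_url, queue_id, poll_secs=60, timeout_secs=3600, get=None):
    """Poll GET /stream/queue/<queue_id> until its status is 'processed',
    then return the batch_id for use with GET /stream, /stream/csv or /trips."""
    if get is None:                      # default to requests.get when available
        import requests
        get = requests.get
    url = f"{base_url}/stream/queue/{queue_id}"
    deadline = time.time() + timeout_secs
    while time.time() < deadline:
        body = get(url).json()
        if body.get("status") == "processed":
            return body["batch_id"]
        time.sleep(poll_secs)
    raise TimeoutError(f"queue message {queue_id} not processed "
                       f"within {timeout_secs}s")

# e.g. batch_id = wait_for_batch('http://<server public ip>', queue_id)
```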
GET /stream¶
Retrieve processed, cleaned data as a geospatial time series sorted on timestamp.
Parameters allowed in the HTTP GET request.
- include_filtered: Default false. Set to true to include filtered-out records (outliers, static navigation errors, etc.) in the output. The output file will then contain the below boolean fields:
filter_time: Invalid timestamp, e.g., a timestamp in the future due to clock error, satellite fix issues.
filter_accuracy: GPS accuracy less than 250 meters. This filter is applied when accuracy is supplied in the input data, e.g., when using the JSON Rnbg format.
filter_static: Static navigation errors.
filter_jumps: Jumps caused by GPS signal reflection, satellite fix issues.
start_time: ISO 8601 timestring. Retrieve records with timestamp >= start_time.
end_time: ISO 8601 timestring. Retrieve records with timestamp <= end_time.
batch_id: Filter records based on batch_id, retrieved using GET /stream/queue/<queue_id>
device_id: Filter records based on device_id, if supplied in the input data.
activity: Filter records based on activity type. Used with JSON Rnbg format.
trip_id: Filter records based on trip_id.
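Several of these filters can be combined in one request. A small illustrative helper (not part of the API; parameter names are the ones documented above) that drops unset filters before issuing the request:

```python
def stream_params(**filters):
    """Build the query parameters for GET /stream, dropping unset
    filters; keys are the documented parameter names."""
    return {k: v for k, v in filters.items() if v is not None}

# A combined query might look like this (values are placeholders):
query = stream_params(batch_id="batch-1",
                      device_id="dev-1",
                      start_time="2020-09-10T00:00:00Z",
                      end_time=None,               # unset filters are dropped
                      include_filtered="true")
# requests.get('http://<server public ip>/stream', params=query)
```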
HTTP response: HTTP 200.
- JSON data with the below fields:
Field: Description
id: global record id
row_id: record id within a batch
timestamp: ISO 8601 timestamp (UTC)
tz: original timezone
batch_id
trip_id
device_id
latitude: decimal, EPSG 4326
longitude: decimal, EPSG 4326
altitude
heading
speed
activity
activity_confidence
accuracy
filter_time: true if timestamp is in the future
filter_accuracy: true if accuracy < 250 m
filter_static: true if static navigation error
filter_jumps: true if GPS jump
address: OpenStreetMap GeoJSON
GET /stream/csv¶
Helper method to retrieve GET /stream JSON as a flattened csv file.
GET /trips¶
Retrieve processed geospatial timeseries data grouped into journey or trip records, with trip start and end times, trip distance and the trip route as a polyline/GeoJSON.
- Parameters allowed in the HTTP GET request:
trip_id: Retrieve only specified trip_id.
device_id: Retrieve trip records for a specific device.
batch_id: Retrieve trip records for a batch_id.
embed: Specify embed=address to return OpenStreetMap address records for the trip start and end points.
See an example HTTP request for trips
HTTP response: HTTP 200.
- An array of JSON objects with the below fields:
Field: Description
id: trip id
device_id
start_time: ISO 8601 timestamp (UTC)
end_time: ISO 8601 timestamp (UTC)
timezone_offset: original timezone
metres: trip distance in metres
route_geojson: route as a GeoJSON LineString
route_polyline: route as a Google polyline
address: OSM GeoJSON start/end address
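route_polyline is a Google encoded polyline, which can be decoded without a third-party dependency. Below is a standard implementation of Google's polyline decoding algorithm (generic, not specific to this API):

```python
def decode_polyline(encoded, precision=5):
    """Decode a Google encoded polyline string into a list of
    (latitude, longitude) tuples."""
    factor = 10 ** precision
    coords, index, lat, lng = [], 0, 0, 0
    while index < len(encoded):
        deltas = []
        for _ in range(2):               # one delta each for lat and lng
            shift = result = 0
            while True:
                b = ord(encoded[index]) - 63
                index += 1
                result |= (b & 0x1F) << shift
                shift += 5
                if b < 0x20:             # high bit clear: last 5-bit chunk
                    break
            # zig-zag decode: odd results are negative deltas
            deltas.append(~(result >> 1) if result & 1 else result >> 1)
        lat += deltas[0]
        lng += deltas[1]
        coords.append((lat / factor, lng / factor))
    return coords
```

For example, the polyline from Google's own documentation, "_p~iF~ps|U_ulLnnqC_mqNvxq`@", decodes to the three points (38.5, -120.2), (40.7, -120.95) and (43.252, -126.453).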
POST /tasks/schedules¶
Update task schedule and debounce delay.
Endpoint: /tasks/schedules
HTTP method: POST
HTTP Header:
Content-Type: application/json
HTTP POST body: JSON object or array of objects per below schema
{
  "debounce": Integer, // seconds to wait since the last data upload
  "crontab": String    // a valid Linux crontab entry
}
Example POST body
{
  "debounce": 60,           // debounce delay of 60 seconds, the default
  "crontab": "*/2 * * * *"  // run every 2 minutes, the default
}
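Submitting an update with Python might look like the sketch below; the debounce and crontab values are arbitrary examples, and the post parameter exists only to make the helper easy to exercise without a server:

```python
import json

# Hypothetical new settings: wait 120 s after the last upload and run
# the scheduler task every 5 minutes.
payload = {"debounce": 120, "crontab": "*/5 * * * *"}

def update_schedules(base_url, payload, post=None):
    """POST the new schedule as a JSON body to /tasks/schedules."""
    if post is None:                     # default to requests.post when available
        import requests
        post = requests.post
    return post(f"{base_url}/tasks/schedules", data=json.dumps(payload),
                headers={"Content-Type": "application/json"})

# update_schedules("http://<server public ip>", payload)
```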
GET /tasks/schedules¶
Retrieve scheduled background task details. An example response is below:
{
"args": "()",
"coalesce": "True", // tasks are merged into one when their schedules overlap
"id": "cleaner_master", // master task that initiates filter/clean tasks.
"kwargs": "{'enqueue': True, 'debounce': 60}", // debounce: time in seconds to wait since the last data upload before kicking off tasks.
"max_instances": "1",
"misfire_grace_time": "900",
"name": "cleaner_master",
"next_run_time": "2020-09-15 23:30:00+00:00", // next scheduled run time. Default: every 2 mins.
"trigger": "cron[month='*', day='*', day_of_week='*', hour='*', minute='*/2']" // crontab entry for the task. Default: every 2 mins.
}
Debounce¶
Filter functions work best when applied to a sufficiently large batch of geospatial data. For example, trip segmentation works by iterating through the dataset and identifying gaps (segment boundaries) between groups of records. If the segmentation task runs while you are still uploading a large batch of records, it will fail to segment the dataset accurately.
To reduce errors, the queue processor task waits for a gap in data uploads before applying filters. This debounce delay is set to 5 minutes in the current version.
When you have many files to process, continue to upload them using scp/sftp to the file processor inbox, or use HTTP POST. Filter functions are triggered when there is at least 5 minutes of idle time, which is usually 5 minutes after the last upload. It may take another 15-30 minutes or longer to completely process all records, depending on the number of messages to be processed.
To view the settings for debounce delay and task schedules use GET /tasks/schedules.
To update debounce delay and task schedule use POST /tasks/schedules