A Framework for Real-Time Processing of Sensor Data in the Cloud
Supun Kamburugamuve
Leif Christiansen
Geoffrey Fox
School of Informatics and Computing and Community Grids Laboratory
Indiana University, Bloomington IN 47408 USA
Abstract: In this paper we describe IoTCloud, a platform to connect smart devices to cloud services for real time data processing and control. A device connected to IoTCloud can communicate with real time data analysis frameworks deployed in the cloud via messaging. The platform design is scalable in terms of connecting devices, transferring data and processing data. A user can develop real time data processing algorithms in an abstract framework without concern for the underlying details of how the data is distributed and transferred. For this platform we primarily consider real time robotics applications such as autonomous robot navigation, where there are strict requirements on processing latency and a demand for scalable processing. To demonstrate the feasibility of the system, a robotic application is developed on top of the framework. The characteristics of the system and the robotics application are measured to show that data processing in central servers is feasible for real time robotics sensor applications.
1. Introduction
The availability of internet connections and low manufacturing costs have led to a boom in smart objects, devices with a tripartite construction consisting of a CPU, memory storage, and a wireless connection. These smart objects (or devices) are equipped with sensors that produce data and actuators that are capable of receiving commands. Such devices are widespread in all fields and their usage is expected to grow exponentially in the future. For these devices, central data processing has been shown to be advantageous due to numerous factors, including the ability to easily draw from vast stores of information, efficient allocation of computing resources and a proclivity for parallelization. Because of these factors, many devices may benefit from processing only some data locally and offloading the remaining processing to central servers. Among the aforementioned devices, and increasingly present in modern life, are robots. Robots such as the iRobot Roomba, a robot that can clean the floor, present affordable, automated aids for daily living. Additionally, Amazon and Google are researching and developing platforms for delivering consumer products using drones. Most of these robots have limited onboard processing power but still generate large amounts of data. A high definition video stream from a drone-mounted camera is a good example of a high volume data stream. Cloud based analysis of data coming from such robots is a challenging area due to strict latency requirements and high volumes of data production.
To process data coming from many smart devices, we need scalable data processing platforms. Because of its efficiency and agility, the cloud is an ideal computational platform for hosting data processing applications for smart devices. Cloud computing[1] refers to both applications delivered as services over the Internet and the hardware and system software in the datacenters that provide those services. Cloud computing enables computing as a utility and is gradually becoming the standard for computation, allowing systems and users to use Platform as a Service (PaaS), Infrastructure as a Service (IaaS), and Software as a Service (SaaS). The computational nodes are provisioned, configured and reconfigured dynamically in the cloud. These machines can be in the form of virtual machines or physical machines. Furthermore, sensor based applications can benefit from in-house private cloud environments hosted within organizations or from public clouds hosted by large organizations.
In order to process data generated by smart devices in a cloud environment, the data must be transmitted from the devices to the cloud in an efficient and scalable manner. The communication between cloud applications and the devices is essentially based on events, which suggests that the traditional request/response approach is not appropriate. For example, when using requests and responses, a device requiring real time control has to poll the applications continuously, which increases latency and network traffic. Transmission of events is well supported by publish-subscribe messaging[2], where a publisher makes information available to subscribers in an asynchronous fashion. Over time, publish-subscribe messaging has emerged as a distributed integration paradigm for deployment of scalable and loosely coupled systems. Subscribers have the ability to express their interest in an event, or a pattern of events, and are subsequently notified of any event, generated by a publisher, which matches their registered interest. An event is asynchronously propagated to all subscribers that registered interest in that given event. Publish-subscribe messaging decouples the message producers and consumers in the dimensions of time, space and synchronization. This decoupling favors the scalability of the message producing and consuming systems. Because of these features, publish-subscribe messaging is being proposed as a good fit for connecting smart devices to cloud applications.
Topic based and content based are two widely used schemes of pub-sub systems. In topic based systems the messages are published to topics, which are identified by keywords. The consumers subscribe to topics and receive messages coming to these topics. In content based systems the consumers subscribe to messages based on the properties of the messages. This means the content of each message has to be examined at the broker middleware to select a consumer among a possibly large set of consumers. Because of their simple design, topic based brokers tend to scale very well compared to content based brokers and introduce less overhead.
We can assume that for all our devices, data is sent to the cloud as a stream of events. It is important to process the data as a stream, before storing it, to achieve real time processing guarantees. Parallel processing of events coming from a single source can help to reduce the latency in most applications as well. The ability to connect a large number of devices creates a need for a scalable infrastructure to process the data. Distributed stream processing engines (DSPEs)[3-6] are a good fit for such requirements. A DSPE abstracts out the event delivery, propagation and processing semantics and greatly simplifies real time algorithm development. A DSPE can also act as a messaging fabric that distributes data to other data sinks like databases and file systems for batch processing and archival purposes, after some pre-processing of the data. We envision a cloud based data intensive computing architecture where stream based real time analysis and batch analysis are combined together to form a rich infrastructure for sensor applications. We propose the Cloud DIKW (Data, Information, Knowledge, Wisdom) based architecture for sensor data analysis in the cloud. The high level DIKW view of the system is shown in Figure 1.
Figure 1 DIKW View of the System
By combining all the above requirements, we have developed our IoTCloud platform, a distributed software platform capable of connecting devices to cloud services. IoTCloud uses topic based publish-subscribe messaging to transfer data between the devices and the cloud services, and uses a DSPE to process the data in the cloud. The platform supports two publish-subscribe brokers with different semantics that are suitable for different applications. We have developed a robotic application that runs on a private in-house cloud to demonstrate how to use the system, and we measured the characteristics of the system in order to show that we can do real time processing of device sensor data in a cloud environment in a scalable manner. The main contribution of our work is to show that scalable cloud based real time data processing can be used for robotics sensor applications.
Section 2 of the paper describes the related work in this area. Section 3 explains the architecture of the framework and section 4 the robotics application we have developed. Next, in section 5, we present a series of tests used to evaluate the system and discuss the observations. Finally, sections 6 and 7 present our conclusions and future work.
2. Related Work
To the best of our knowledge, connecting devices to cloud services for real time processing in a scalable manner is not addressed in the literature. Work related to cloud based control of robots is also largely lacking. Hassan et al.[7] present a content based publish/subscribe framework for connecting sensor data to cloud services. Content based pub-sub allows greater flexibility for application designers than topic based pub-sub systems. But content based pub-sub systems usually involve higher overhead than topic based systems, because the brokers have to inspect message content. Furthermore, content based pub-sub brokers are not popular and are not widely available as products.
Mires[8], TinySIP[9] and DV/DRP[10] are all publish/subscribe messaging middleware for WSNs. They address the different issues in connecting WSNs and communicating with sensors. MQTT-S[11] is an open topic-based pub-sub protocol defined for transferring data from sensors. The protocol enables data transfer between sensors and traditional networks. In our work we assume that sensor data is available to be transported to cloud services, and we handle the transfer of gathered data from devices to cloud services. For example, a device connected to our system can send data via a dedicated communication channel, the public Internet, etc. Also, many devices can be connected in WSNs using the above mentioned protocols or messaging systems, and our platform can transfer this data to cloud services for processing.
Reference architectures for integrating sensors and cloud services have been discussed in the literature[12, 13]. Both works explore the general architecture that can be used to connect sensors to cloud services and the potential issues. In our work we provide a framework that can be used to send sensor data from devices to the cloud, as well as show how to process the data within a generic framework. We also discuss how to transfer data from devices to cloud services and process it in a scalable way, topics that are not fully addressed in the above papers.
3. IoTCloud Architecture
The overall architecture of the platform is shown in Figure 2. Our architecture consists of three main layers:
1. Gateway Layer
2. Publish-Subscribe messaging layer
3. Cloud based big data processing layer
We consider a device as a set of sensors and actuators. Users develop a driver that can communicate with the device and deploy it in a gateway. This driver does not always have to connect to the device directly. For example, it can connect to the device via a TCP connection or through a message broker. The data generated by the driver application is sent to the cloud processing layer using publish-subscribe messaging brokers. The cloud processing layer processes the data and sends control messages back to the driver using the message brokers. The driver converts the information to a format that suits the device and communicates this to the device. The platform is implemented in the Java programming language.
3.1 Gateway: Drivers are deployed in gateways responsible for managing drivers. There can be multiple gateways in the system and each gateway has a unique id. A gateway master controls the gateways by issuing commands to deploy/un-deploy and start/stop drivers, etc. A gateway is connected to multiple message brokers, and these brokers can be in a cluster configuration. By default the platform supports RabbitMQ[14], ActiveMQ and Kafka[15] message brokers. Gateways manage the connections to the brokers and handle the load balancing of the device data to the brokers. Gateways update the master about the drivers deployed in them and about their own status. The master stores the state information in a ZooKeeper[16] cluster.
Figure 2 IoTCloud Architecture
3.2 Driver: The driver is the data bridge between a device and the cloud applications. The driver converts data coming from the device to a format that the cloud applications expect and vice versa. A driver has a name and a set of communication channels. When a driver is deployed, the running instance gets an instance id. This instance id is used for controlling the driver after the deployment. The same driver can be deployed multiple times and each of the instances receives a unique id. A driver can have multiple communication channels and each channel within a driver has a unique name. A communication channel connects the driver to the publish-subscribe messaging brokers. When a driver is deployed, its information is saved in ZooKeeper. The default structure of driver information in ZooKeeper is:
/iot/sensors/[driver_name]/[driver_instance_id]/[channel_name]
A ZooKeeper node (ZNode) with the driver instance id contains information about the driver, such as its status and metadata. ZNodes with the channel name contain the information about the channels. The framework allows shared and exclusive channels to be created. An exclusive channel can give faster communication between the drivers and the cloud processing. But in a large-scale deployment of drivers, exclusive channels can result in a large number of resources in the brokers. Some applications don't have strict latency requirements and can use shared channels, consuming fewer system resources.
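As an illustration, using the driver and channel names from the configuration in Figure 3 and a hypothetical instance id turtle_01, the Kinect channel of the TurtleBot driver would be registered under:
/iot/sensors/turtle/turtle_01/kinect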
3.3 Brokers: The platform specifically focuses on topic-based publish-subscribe brokers rather than content-based publish-subscribe brokers. We chose topic based brokers for several reasons: 1. stable, open source topic based brokers are available; 2. topic based brokers are simple to use and configure; 3. the overhead introduced by the broker is minimal compared to content based brokers. For this project the most important factors are 1 and 3, because our applications require low latency and topic based brokers are the ones readily available for use. The messaging layer needs to preserve the message ordering, which prevents multiple consumers from consuming messages of the same driver in parallel.
There are many open source brokers available that fulfill our needs for the messaging infrastructure. Such brokers include ActiveMQ[17], RabbitMQ[14], Kafka[15, 18], Kestrel, HornetQ, etc. Of these, ActiveMQ, RabbitMQ and Kafka are the most widely used topic based publish-subscribe brokers. Preliminary studies showed that ActiveMQ and RabbitMQ have identical functionality for our purposes and that the latter is capable of handling more load with less overhead, so we decided to use RabbitMQ. The Kafka broker has very good clustering capabilities and can handle parallel consumer reads for the same topic. So we decided to support both these brokers in our platform.
Each communication channel created in a driver is connected to a topic created in the message broker. The framework supports two mappings of channels to topics, hence creating two types of channels. In the first type, each channel is mapped to a unique topic in the broker. We call this type of channel an exclusive channel. In the other type, a set of channels share the same topic in the broker. This type of channel is called a shared channel. At the moment we use a very simple rule to map channels to a shared topic: the same channel from multiple instances of a driver deployed in one gateway is mapped to a single topic.
For a shared channel, the corresponding topic name has the format:
gateway_id.driver_name.queue_name
For an exclusive channel, the topic name has the format:
gateway_id.driver_name.driver_id.queue_name
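As a minimal sketch of this naming rule (the helper and class names are ours, not part of the platform's actual API), the mapping can be expressed in Java as:
public class ChannelTopics {
    // Shared channel: all instances of a driver deployed on one gateway share this topic.
    static String sharedTopic(String gatewayId, String driverName, String channelName) {
        return gatewayId + "." + driverName + "." + channelName;
    }

    // Exclusive channel: every driver instance gets its own topic for the channel.
    static String exclusiveTopic(String gatewayId, String driverName,
                                 String driverId, String channelName) {
        return gatewayId + "." + driverName + "." + driverId + "." + channelName;
    }
}
For example, sharedTopic("gateway_01", "turtle", "kinect") yields gateway_01.turtle.kinect, while the exclusive form adds the instance id between the driver name and the channel name.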
RabbitMQ: RabbitMQ is a message broker primarily supporting the Advanced Message Queuing Protocol (AMQP)[19]. Even though the core of RabbitMQ is designed to support AMQP, the broker has been extended to support other message protocols like STOMP and MQTT. RabbitMQ is written in the Erlang programming language and supports low latency, high throughput messaging. RabbitMQ has a rich API and architecture for developing consumers and publishers. RabbitMQ topics are easy to create and manage using its APIs. These topics are lightweight and can be created without much burden to the broker. We allow both shared channels and exclusive channels to be created for RabbitMQ. The metadata about the messages is sent using RabbitMQ message headers. The metadata includes the sensor id, the gateway id and custom properties.
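The following is a minimal sketch of how a driver-side publisher could attach such metadata with the standard RabbitMQ Java client. The host name, topic name and header keys are illustrative assumptions; the actual IoTCloud code may organize exchanges and queues differently.
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitPublishSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("server1");                        // hypothetical broker host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Exclusive-channel topic name: gateway_id.driver_name.driver_id.queue_name
        String topic = "gateway_01.turtle.turtle_01.kinect";
        channel.queueDeclare(topic, false, false, false, null);

        // Metadata (sensor id, gateway id, custom properties) travels in message headers.
        Map<String, Object> headers = new HashMap<>();
        headers.put("sensorID", "turtle_01");
        headers.put("gatewayID", "gateway_01");
        headers.put("time", System.currentTimeMillis());
        AMQP.BasicProperties props =
            new AMQP.BasicProperties.Builder().headers(headers).build();

        byte[] body = new byte[]{};        // placeholder for a compressed Kinect frame
        channel.basicPublish("", topic, props, body);   // default exchange, routed by queue name

        channel.close();
        connection.close();
    }
}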
Kafka: Kafka is a publish-subscribe message broker backed by a commit log. The messages sent by the producers are appended to a commit log and the consumers read the messages from this commit log. Kafka implements its own message protocol and does not support standard protocols like AMQP or MQTT. At the core of Kafka messaging is the concept of a topic. A topic is divided into multiple partitions and a message is sent to a single partition. In our platform, the partition for a message is chosen using a key that accompanies the message, so messages with the same key go to the same partition. Consumers consume messages from partitions. Partitions of a single topic can spread across a cluster of Kafka servers. Furthermore, a single partition is replicated in a Kafka cluster for reliability. Kafka guarantees ordering of messages within a partition but does not guarantee ordering across partitions. Because a topic consists of multiple partitions, consumers can read from the same topic in parallel without affecting the message ordering for a single message key. In the IoTCloud platform we use the driver id as the key for a message.
In IoTCloud we need to send metadata with a message, such as the driver id, the site id and some properties. Because Kafka only supports byte messages without any headers, we use a Thrift[20] based message format to send metadata about the message. Using the driver id as the key makes sure that the messages belonging to a single driver instance will always be in one partition. We use at most one consumer per partition to ensure the message ordering for a driver. Because Kafka topics can be partitioned, we get parallel read and write capability for shared channels. Because of this, the platform only supports shared channels for Kafka.
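A minimal sketch of a publisher that keys messages by the driver id, using the standard Kafka Java client, is shown below. The broker address, topic name and payload are illustrative assumptions, and the actual platform wraps the payload in the Thrift-based format described above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaPublishSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "server2:9090");   // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (Producer<String, byte[]> producer = new KafkaProducer<>(props)) {
            String topic = "gateway_01.turtle.kinect";   // shared-channel topic name
            String driverId = "turtle_01";               // key: keeps one driver in one partition
            byte[] payload = new byte[]{};               // placeholder for the Thrift-serialized message
            // Messages with the same key always land in the same partition, so per-driver
            // ordering is preserved even though consumers read partitions in parallel.
            producer.send(new ProducerRecord<>(topic, driverId, payload));
        }
    }
}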
3.4 Cloud Processing: As the primary cloud processing framework we use Apache Storm[6], which is an open source DSPE. There are many DSPEs available, but we chose Storm because of its scalability, performance, excellent development community support and the ability to use scripting languages to write its applications. Storm can be used to process the data and send responses back immediately, or it can be used to do some pre-processing of the data and store it for later processing by batch engines such as Apache Hadoop. The applications we have developed do not use batch processing at the moment, so we have not incorporated such engines into the platform yet, but our architecture permits integration of engines like Hadoop. We use FutureGrid[21] as our cloud platform for deploying the Storm cluster. FutureGrid has an OpenStack based cloud implementation and we provision VM images using the OpenStack tools.
Apache Storm: Storm is a distributed stream processing engine designed to process large amounts of streaming data in a scalable and efficient way. Real time data processing algorithms are written as Storm topologies. A topology defines a DAG structure for processing the streaming data coming from the devices as an unbounded stream of tuples. The DAG consists of a set of Spouts and Bolts that implement the data processing algorithm, and the tuples of the stream flow through the nodes (Spouts and Bolts) of the DAG. Spouts and Bolts are primarily written in Java, but other programming languages like Python and Ruby are permitted. Data enters a topology through Spouts and the processing happens in Bolts. The components in the DAG are connected to each other using stream (tuple) groupings. Pub-sub is a common pattern for ingesting data into a Storm topology. A Bolt can consume its connected input streams, do some processing on the tuples, and generate and emit new tuples to its output streams. Usually the last Bolts in the topology DAG write the results to a database or send the results to remote nodes using pub-sub messaging. The Spouts and Bolts of a topology can be run in parallel in different computation nodes.
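The following is a minimal, self-contained sketch of such a topology using the Storm Java API. Class names like KinectReceiveSpout are our own placeholders rather than the platform's actual components, and package names can differ slightly across Storm versions. It illustrates the spout-to-bolt DAG and a fields grouping on the driver id, which is how per-device ordering can be preserved when components run in parallel.
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class IoTTopologySketch {
    // Placeholder spout: the real IoTCloud spout consumes frames from RabbitMQ or Kafka.
    public static class KinectReceiveSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
        public void nextTuple() {
            // Emit a dummy frame tagged with a driver id and a timestamp.
            collector.emit(new Values(new byte[0], "turtle_01", System.currentTimeMillis()));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("frame", "driverID", "time"));
        }
    }

    // Placeholder bolt: stands in for the decompress-and-compute-velocity step.
    public static class ProcessBolt extends BaseBasicBolt {
        public void execute(Tuple input, BasicOutputCollector collector) {
            byte[] frame = input.getBinaryByField("frame");
            String driverId = input.getStringByField("driverID");
            collector.emit(new Values("control-for-" + frame.length + "-bytes",
                                      driverId, input.getLongByField("time")));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("control", "driverID", "time"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kinect_receive", new KinectReceiveSpout(), 4);
        // Grouping by driverID keeps all tuples of one device on one bolt instance,
        // preserving per-device ordering while different devices are processed in parallel.
        builder.setBolt("process", new ProcessBolt(), 4)
               .fieldsGrouping("kinect_receive", new Fields("driverID"));
        new LocalCluster().submitTopology("iotcloud-sketch", new Config(), builder.createTopology());
    }
}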
To ease the development of Storm topologies in our platform, we allow the external communication points of a Storm topology to be defined in a configuration file. Figure 3 shows an example of such a configuration file. The topology has two external communication channels. A "kinect_receive" spout is where we get the input data from devices and a "count_send" bolt is where we send output information back to the devices. We can use this configuration to build the outer layer of a topology automatically. The algorithm has to be written by the application developer.
We can run many instances of any of the components in a Storm topology in parallel. For example, to read data in parallel from many devices, we can spawn several instances of the kinect_receive spout in different nodes. This can be done for any bolt in the topology as well. The parallelism can be changed at runtime. This allows the system to scale with the addition of drivers.
zk.servers: ["server1:2181"]
zk.root: "/iot/sensors"
topology.name: "wordcount"
spouts:
    kinect_receive:
        broker: "rabbitmq"
        driver: "turtle"
        channel: "kinect"
        fields: ["frame", "driverID", "time"]
        properties:
            broker.zk.servers: "server1:2181"
            broker.zk.root: "/brokers"
bolts:
    count_send:
        broker: "rabbitmq"
        driver: "turtle"
        channel: "control"
        fields: ["control", "driverID", "time"]
        properties:
            metadata.broker.list: "server2:9090"
Figure 3 Topology Endpoint Configuration
3.5 Discovery: Because Storm is a parallel processing framework, it requires coordination among the processing units. For example, when a communication channel is created in the broker for a device, the parallel units responsible for communicating with that channel should pick a leader, because multiple units reading from the same channel can lead to data duplication and out of order processing, which is not desirable for most applications. The distributed processing units should also be able to detect when drivers come online and go offline. To adapt to such a distributed dynamic processing environment we need discovery and coordination, and we use Apache ZooKeeper[16] for both. When drivers come online, the information about the drivers is saved in ZooKeeper. The discovery component discovers this information and connects it to the cloud processors dynamically at runtime. This allows the processing layers to automatically distribute the load and adjust to changes on the data producer side.
A Storm topology is deployed with a number of parallel spouts and bolts that send and receive data from the pub-sub brokers. We can change the parallelism of a spout or a bolt at runtime as well. When a topology deploys its external communication components (spouts and bolts), it does not know the physical addresses of the topics or how many topics it has to listen to. So at the very beginning the topology does not have any active message listeners or message senders. The topology knows that it has to exchange messages with a set of drivers deployed in the gateways; it has information about ZooKeeper and about the drivers that it is interested in. It uses this information to dynamically discover the topics that it has to listen to and adds those consumers and producers to the topology at runtime.
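As a sketch of how such discovery could be implemented with Apache Curator on top of ZooKeeper, consider the example below. The watched path follows the registry layout from Section 3.2; the driver name "turtle" and the reaction logic are illustrative assumptions, not the platform's actual discovery code.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DriverDiscoverySketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "server1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Watch the driver registry; children appear and disappear as driver
        // instances come online and go offline.
        PathChildrenCache cache = new PathChildrenCache(client, "/iot/sensors/turtle", true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                System.out.println("Driver instance online: " + event.getData().getPath());
                // Here the topology would look up the instance's channels and attach
                // a consumer or producer for each corresponding broker topic.
            } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                System.out.println("Driver instance offline: " + event.getData().getPath());
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE);   // keep watching
    }
}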
3.6 Processing Parallelism: The processing parallelism at the endpoints of the topology is bound to the message brokers and how we can distribute the topics across the brokers. For the middle processing bolts, the maximum parallelism is not bounded and depends on the application. A Storm topology gets its messages through the spouts. The same spout can run multiple instances in parallel to read the messages coming from multiple devices connected to the system. A spout always reads the messages from a single channel of a device. If a processing algorithm requires input from multiple channels, the topology must have multiple spouts. A running instance of a spout can connect to multiple topics to read the messages, but all these topics must be connected to a channel with the same name and driver. When a spout needs to read from multiple topics, the topology distributes the topics equally among the running instances of the spout dynamically at runtime. The message flow through the Storm topology happens primarily using the driver ids. The bolts that communicate with the brokers know about all the topics in the system and can send a message to the appropriate topic by selecting the correct topic using the driver id.
RabbitMQ: There is a limit to the number of parallel spouts that we can run, determined by the number of topics created per channel. For shared channels, one topic is created per gateway for a given driver and channel, so the number of parallel spouts is bounded by the number of gateways. For exclusive channels, one topic is created per driver instance, so the number of parallel spouts is bounded by the number of driver instances.
Figure 4 RabbitMQ Exclusive Channels & Storm
In general we cannot do parallel reads from a topic due to the ordering constraints. Figure 4 shows how the exclusive channels created by a driver named sensor_01 are connected to the Storm topology. Here, the Storm topology runs only one instance for each spout reading from channel_01 and channel_02. Because we have 8 channels in 4 instances of the drivers, we need 8 topics in the broker. Because we only have 2 spouts and 2 bolts in the topology, each spout is connected to 2 topics and each bolt is communicating with 2 topics. Figure 5 shows the same scenario with shared channels. In this case we only have 4 topics, because the two drivers deployed in the same gateway are using the same topics.
Figure 5 RabbitMQ Shared Channels & Storm
Figure 6 Kafka Shared Channels & Storm
Kafka: Kafka topics are more heavyweight than RabbitMQ topics. For every topic in the system, Kafka has to create log files and index files in the file system for its partitions. If replication is enabled for fault tolerance, these files have to be replicated in the Kafka cluster. Kafka also supports parallel reads for a single topic. Because of these reasons we only support shared channels for Kafka. In Kafka the number of spouts possible depends on the number of partitions for a topic.
Figure 6 shows the topic distribution with Kafka for the same scenario as in Figure 4. In Figure 6 each Kafka topic has 2 partitions and we have 4 topics because the channels are shared. Because each topic has two partitions, the read and write parallelism in this case is equal to the exclusive channel scenario with RabbitMQ (Figure 4). But in practical scenarios we will have fewer partitions than devices connected per gateway. This makes the parallelism greater than with shared channels in RabbitMQ but less than with exclusive channels.
Figure 7 TurtleBot
4. TurtleBot Follower Application
In order to explore possible configurations for the IoTCloud framework, we used the Microsoft Kinect[22] and TurtleBot[23]. The Microsoft Kinect consists of an IR camera, an RGB camera, an IR emitter, and several auxiliary features. Our project was not concerned with the details of the hardware, but complete discussions of the Kinect's specifications and method of depth calculation are available. Currently there are numerous open-source projects and academic studies utilizing the Kinect, due to the sensor's affordability and host of applications. In addition, a well-documented robot incorporating the Kinect is already available: the TurtleBot by Willow Garage. It is because of these many resources that the Kinect and TurtleBot were chosen as subjects for the development of a sensor to cloud processing framework.
In our application the TurtleBot follows a large target in front of it by trying to maintain a constant distance to the target. Compressed depth images from the Kinect camera are sent to the cloud, and the processing topology calculates a point cloud of the TurtleBot's field of view. The algorithm uses the point cloud to calculate an average point, the centroid, of a hypothetical box in front of the TurtleBot. Shifts in the centroid are calculated and command messages, in the form of vectors, are sent back to the TurtleBot using its ROS[24] API. The TurtleBot then actuates these vectors in order to maintain a set distance from the centroid.
4.1 Reading Depth Frames from the Kinect: The initial step in developing our application utilizing the Kinect depth camera was finding a driver to read in the Kinect data stream. The TurtleBot is operated with ROS, the open-source robotics operating system, which has an available Kinect driver. The ROS Kinect driver is built on OpenKinect's libfreenect[25] driver, so in order to avoid any unnecessary overhead, libfreenect was used directly. libfreenect is an open-source Kinect driver that provides an interface to both the IR and RGB cameras, with methods to start a depth stream and handle frames. libfreenect was originally implemented in C++, although a Java JNA wrapper is now available.
4.2 Compression: In the course of the project several compression schemes were tested. In the early stages the LZ4, Snappy[26] and JZlib Java compression libraries were tested. Snappy achieved less compression but was faster than the other two. Ultimately, we chose a two-stage compression process using Mehrotra et al.'s[27] inversion technique as the first stage and Snappy as the second. Mehrotra et al.'s inversion technique takes advantage of the error endemic to the depth camera: the depth camera's accuracy decreases proportional to the inverse of the squared depth. Hence, multiple values may be encoded to the same number without any loss in fidelity[27]. Using this inversion technique, every two-byte disparity can be compressed to one byte. It is worth noting, however, that the inversion algorithm takes distance as an input, not disparity. Mehrotra et al. achieve a startling 5ms compression time for their whole 3-step process with little optimization. For the sake of expediency, our project used an existing Java compression library (Snappy) rather than Mehrotra et al.'s RLE/Golomb-Rice compression.
The last decision was whether to implement the prediction strategy mentioned in Mehrotra et al. The prediction strategy takes advantage of the homogeneous nature of the depths of objects, which translates into long runs of values in the depth data. The prediction strategy is simple and converts any run into a run of 0's. For an RLE this has a clear advantage, but when tested with Snappy the gain was negligible and thus not worth the added computation. Ultimately, we were able to achieve a compression ratio of 10:1 in a time of 10ms. This compares favorably to Mehrotra et al.'s 7:1 ratio in 5ms. The data compression happens in the laptop computer inside the TurtleBot. After the compression, the data is sent to a driver application that runs in an IoTCloud gateway. This gateway relays the information to the cloud.
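A simplified sketch of the two-stage idea is shown below. The 8-bit inverse-depth quantizer only illustrates the inversion principle and is not Mehrotra et al.'s exact coder; the range constants follow the 80 cm - 400 cm usable range of the Kinect, and the second stage uses the snappy-java library.
import java.io.IOException;
import org.xerial.snappy.Snappy;

public class DepthCompressionSketch {
    static final double D_MIN = 0.8;   // closest usable Kinect depth in metres
    static final double D_MAX = 4.0;   // farthest usable Kinect depth in metres

    // Illustrative inverse-depth quantizer: near depths (where the sensor is accurate)
    // get fine steps, far depths (where error grows with the square of the depth) get
    // coarse steps, so one byte per pixel loses little usable fidelity.
    static byte quantize(double depthMetres) {
        double d = Math.max(D_MIN, Math.min(D_MAX, depthMetres));
        double t = (1.0 / d - 1.0 / D_MAX) / (1.0 / D_MIN - 1.0 / D_MAX);
        return (byte) Math.round(255.0 * t);   // value 0..255 stored in a byte
    }

    // Stage 1: reduce each depth value to one byte. Stage 2: Snappy over the whole frame.
    public static byte[] compressFrame(double[] depthMetres) throws IOException {
        byte[] quantized = new byte[depthMetres.length];
        for (int i = 0; i < depthMetres.length; i++) {
            quantized[i] = quantize(depthMetres[i]);
        }
        return Snappy.compress(quantized);
    }
}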
4.3 Calculation of Velocity: The Storm topology for our scenario consists of 3 processing units. One spout receives the data, a bolt uncompresses this data and calculates the velocity vector required for the TurtleBot to move, and a last bolt sends these vectors to the TurtleBot.
All the literature indicates that the Kinect should stream each frame as 307,200 11-bit disparity values, with 2047 being sent to indicate an unreadable point. But upon inspection of the received disparity values, the highest value observed was 1024. When this value was treated as the unreadable flag, the depth map displayed appeared normal. Depth shadows were rendered correctly, along with the minimum and maximum readable distances. The code was then adjusted to expect only 10-bit disparity values and everything functioned normally. The full range of the Kinect, 80 cm - 400 cm, can be encoded with only 10-bit values. It is unclear whether the 10-bit values are a result of the Java libfreenect wrapper or faulty code, but our programs are fully functional and the issue was left unresolved. An explanation of this phenomenon would no doubt prove beneficial and may be a point of later investigation.
The processing bolt creates a point cloud from the depth frames it receives, using an approximation technique mentioned in [28]. The algorithm defines a hypothetical box in the TurtleBot's field of view. The average point of this box is calculated, and a velocity vector is generated for the TurtleBot to move towards or away from this average point. This way the TurtleBot always tries to keep a fixed distance to the object in front of it.
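A minimal sketch of this control step follows. The disparity-to-depth conversion is a commonly cited approximation from the OpenKinect documentation[28]; the box bounds, target distance and gain are illustrative assumptions rather than the values used in our application.
public class FollowerControlSketch {
    static final double TARGET_DISTANCE = 1.0;   // desired distance to the centroid (metres), assumed
    static final double GAIN = 0.5;              // proportional gain, assumed

    // Commonly cited OpenKinect approximation [28] for converting a raw disparity
    // value into a depth in metres.
    static double disparityToDepth(int disparity) {
        return 0.1236 * Math.tan(disparity / 2842.5 + 1.1863);
    }

    // Average the depths falling inside a crude "box" in front of the robot and emit a
    // forward velocity proportional to the distance error (positive = move forward).
    static double forwardVelocity(int[] disparities) {
        double sum = 0.0;
        int count = 0;
        for (int disparity : disparities) {
            double depth = disparityToDepth(disparity);
            if (depth > 0.8 && depth < 2.5) {   // assumed box bounds in metres
                sum += depth;
                count++;
            }
        }
        if (count == 0) {
            return 0.0;   // nothing in the box: stay still
        }
        double centroidDistance = sum / count;
        return GAIN * (centroidDistance - TARGET_DISTANCE);
    }
}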
4.4 Controlling the TurtleBot: The driver running in the gateway receives the velocity vectors from the processing application. It then converts these vectors to a format that the ROS API of the TurtleBot accepts. Ultimately the ROS API is used by the driver to control the TurtleBot. We use an available Java version of ROS for interfacing with ROS, which is primarily written in Python.
5. Results & Discussion
We primarily focused on the latency and the scalability of the system. A series of experiments were conducted to measure the latency and how well the system performs under the deployment of multiple sensors. We used FutureGrid as our cloud platform and deployed the setup on FutureGrid OpenStack medium flavors. An instance of the medium flavor has 2 VCPUs, 4GB of memory and 40GB of hard disk. We ran Storm Nimbus and ZooKeeper on 1 node, gateway servers on 2 nodes, Storm supervisors on 3 nodes and brokers on 2 nodes. Altogether our setup contained 8 virtual machines with moderate configurations.
In order to test the latency of the system, we deployed 4 driver applications on the two gateways that produce data at a constant rate. This data was relayed through the two brokers and injected into a Storm topology. The Storm topology passed the data back to the gateways; it was running 4 spout instances in parallel to receive the data and 4 bolts in parallel to send the data out. The round-trip latency was measured at the gateways for each message. This setup was repeated for different message sizes and message rates. We went up to 100 messages per second and increased the message size up to 1MB. Each driver sent 200 messages and we took the average across all the drivers. We tested the system with both RabbitMQ and Kafka brokers. For measuring the scalability, we progressively increased the number of drivers deployed in the gateways and observed how many devices can be handled by the system.
The TurtleBot application was deployed on FutureGrid. We could observe the TurtleBot smoothly following a human in front of it when this application was deployed. We ran the TurtleBot application through the Indiana University computer network and measured the latency observed.
5.1 Latency: Figure 8 shows the latency observed when running the tests through a RabbitMQ server. Up to 200KB messages, the latency remained at a considerably low value for all the message rates we tested. At 300KB messages the latency started to grow rapidly after a message rate of 50 messages per second.
Figure 8 Average Latency for different message sizes with RabbitMQ. The different lines are for different message sizes in bytes.
Figure 9 shows the average latency observed while running through the Kafka broker. Despite variations in latency, on average the system was running with a considerably low latency with Kafka. However, we frequently observed some drastically high latency values, and these values increased the average latency considerably. The Kafka broker is better suited to run on machines with high disk IO rates, and we ran our tests on computation nodes that do not have very good IO performance. However, other published performance results for Kafka, obtained on nodes with high disk IO, also show some large variations in latency. In our setup the Kafka broker latency started to increase much more quickly than with the RabbitMQ broker.
5.2 Jitter: For most real time applications, uniformity of the latency over time is very important. Figure 10 shows the variation in observed latencies for a particular message size and rate with the RabbitMQ broker. The variation in latency was minimal for message sizes up to 200KB; after that there was a large variation in the latency. The latency variation with Kafka is very high compared to the RabbitMQ broker and we are not including those results here.
Figure 9 Average Latency for different message sizes with Kafka. The different lines are for different message sizes in bytes.
Figure 10 Latency standard deviation with different message sizes and message rates for RabbitMQ. The different lines are for different message sizes in bytes.
5.3 Scalability: In the test we did for observing the scalability of the system, we deployed 1000 mock drivers in two gateways and measured the latency. These drivers generate 100 byte messages at a rate of 5 messages per second. We used low values for both message rate and size to make sure the system did not slow down due to a large amount of data being produced. Figure 11 shows the latency with RabbitMQ. The latency observed was a little higher than in the previous test with 4 drivers, but it was consistent up to 1000 drivers and stayed within a reasonable range. The increase in latency can be attributed to the increased use of resources. At 1000 sensors the latency started to increase. Because this test was done in shared channel mode, only 2 spouts were actively reading from the 2 queues created.
We did the same test with the Kafka broker. Because we partitioned each topic into 4, all 4 spouts were actively reading from the topics. This is the advantage of having a distributed broker like Kafka. The latency observed is shown in Figure 12. As expected, there were big variations in the observed latencies. We removed these large values and redrew the graph to see how they affect the average latency; Figure 12 also shows the graph with values > 200 removed. We can observe that the average latency is in a considerably low range after these very high values are removed. Kafka is a relatively new broker under development, and we believe its development community is working on fixing these issues with the broker; we expect these variations to reduce in future versions.
Figure 11 Latency with varying number of devices - RabbitMQ
Figure 12 Latency with varying number of devices - Kafka
All the tests were done for the best case scenario in terms of latency of Storm based analysis. A real application would involve much more complex processing and a more complex DAG structure for data processing; those processing latencies will add to the overall latency in real applications. Also, in our tests we sent and received the same message through the cloud. In real applications, the messages generated after the processing are usually small compared to the data messages, so we expect a reduction in latency as well.
5.4 TurtleBot: Because of the latency requirements, we used the RabbitMQ broker for the TurtleBot application. The TurtleBot was functioning properly under the latencies we observed. Figure 13 shows the latency values observed for 1500 Kinect frames. The average latency fluctuated between 25ms and 35ms. The TurtleBot was sending messages of size 60KB at a rate of 20 messages/sec. The best case latency without any processing for such messages is around 10ms. The network latency and the processing add around another 25ms to the latency. The processing includes both compression and decompression time of Kinect frames. There were some outliers reaching values such as 50ms. These were not frequent but occurred with some noticeable probability. We could not recognize any patterns in these high latency observations; possible reasons for the increases include network congestion, Java garbage collection and other users sharing the same network and resources in FutureGrid. We observed an average latency of 33.26 milliseconds with a standard deviation of 2.91.
Figure 13 Latency observed in the TurtleBot application
6. Conclusions
In this paper we introduced a scalable, distributed architecture for connecting devices to cloud services and processing data in real time. We also discussed a robotics application built on top of this framework. We investigated how to scale the system with topic based publish-subscribe messaging brokers and a distributed stream processing engine in the cloud. We measured the performance characteristics of the system and showed that we can achieve low latencies with moderate hardware in the cloud. The results also indicate that we can scale the architecture to hundreds of connected devices. Because of its low latencies, the framework with the RabbitMQ broker is suited for applications with real time requirements. Applications involving a massive number of devices without strict latency requirements can benefit from the scalability of Kafka brokers. The results also indicate that reasonably uniform behavior in message processing latencies can be maintained, which is an important factor for modeling most problems.
7. Future Work
As our platform evolves, we would like to extend our system to Cloud DIKW applications involving both real time analysis and batch analysis. A primary concern for real time applications is recovery from faults. A robot guided by a cloud application should keep working amidst application level and middleware level failures. We would like to explore different fault tolerance techniques for making our platform more robust. The discovery of devices is coarse grained at the moment and we would like to enable finer grained discovery of devices at the cloud processing layer. For example, selecting devices that meet specific criteria, such as geographical location, is important for some applications. We observed that there are variations in the latency of our applications, and some applications require the processing latency to be contained within hard limits. It will be interesting to look at methods for enabling such guarantees for our applications. Simultaneously, we are working to build new robotics applications based on our platform.
8. Acknowledgement
The authors would like to thank the Indiana University FutureGrid team for their support in setting up the system on FutureGrid (NSF award OCI-0910812). This work was partially supported by AFOSR award FA9550-13-1-0225 "Cloud-Based Perception and Control of Sensor Nets and Robot Swarms".
References
1.Armbrust, M., et al., A view of cloud computing.Communications of the ACM, 2010. 53(4): p. 50-58.
2.Eugster, P.T., et al., The many faces of publish/subscribe.ACM Computing Surveys (CSUR), 2003. 35(2): p. 114-131.
3.Abadi, D.J., et al. The Design of the Borealis StreamProcessing Engine. in CIDR. 2005.
4.Gedik, B., et al. SPADE: the System S declarative stream processing engine. in Proceedings of the 2008 ACM SIGMOD international conference on Management of data. 2008. ACM.
5.Neumeyer, L., et al. S4: Distributed stream computingplatform. in Data Mining Workshops (ICDMW), 2010 IEEE InternationalConference on. 2010. IEEE.
6.Anderson, Q., Storm Real-time Processing Cookbook. 2013: PacktPublishing Ltd.
7.Hassan, M.M., B. Song, and E.-N. Huh. A framework ofsensor-cloud integration opportunities and challenges. inProceedings of the 3rd international conference on Ubiquitousinformation management and communication. 2009. ACM.
8.Souto, E., et al., Mires: a publish/subscribe middleware forsensor networks. Personal and Ubiquitous Computing, 2006. 10(1): p.37-44.
9.Krishnamurthy, S. TinySIP: Providing seamless access tosensor-based services. in Mobile and Ubiquitous Systems-Workshops,2006. 3rd Annual International Conference on. 2006. IEEE.
10.Hall, C.P., A. Carzaniga, and A.L. Wolf, DV/DRP: Acontent-based networking protocol for sensor networks. 2006,Technical Report 2006/04, Faculty of Informatics, University ofLugano.
11.Hunkeler, U., H.L. Truong, and A. Stanford-Clark. MQTT-S—Apublish/subscribe protocol for Wireless Sensor Networks. inCommunication Systems Software and Middleware and Workshops, 2008.COMSWARE 2008. 3rd International Conference on. 2008. IEEE.
12.Dash, S.K., et al., Sensor-cloud: assimilation of wirelesssensor network and the cloud, in Advances in Computer Science andInformation Technology. Networks and Communications. 2012,Springer. p. 455-464.
13.Alamri, A., et al., A survey on sensor-cloud: architecture,applications, and approaches. International Journal of DistributedSensor Networks, 2013. 2013.
14.Videla, A. and J.J. Williams, RabbitMQ in action. 2012:Manning.
15.Kreps, J., N. Narkhede, and J. Rao. Kafka: A distributedmessaging system for log processing. in Proceedings of the NetDB.2011.
16.Hunt, P., et al. ZooKeeper: Wait-free Coordination forInternet-scale Systems. in USENIX Annual Technical Conference.2010.
17.Snyder, B., D. Bosnanac, and R. Davies, ActiveMQ in action.2011: Manning.
18.Goodhope, K., et al., Building LinkedIn's Real-time ActivityData Pipeline. IEEE Data Eng. Bull., 2012. 35(2): p. 33-45.
19.Vinoski, S., Advanced message queuing protocol. IEEE InternetComputing, 2006. 10(6): p. 87-89.
20.Agarwal, A., M. Slee, and M. Kwiatkowski, Thrift: Scalable cross-language services implementation. 2007, Tech. rep., Facebook (4 2007), http://thrift.apache.org/static/files/thrift-20070401.pdf.
21.Fox, G., et al., FutureGrid—A reconfigurable testbed forCloud, HPC and Grid Computing. Contemporary High PerformanceComputing: From Petascale toward Exascale, Computational Science.Chapman and Hall/CRC, 2013.
22.Zhang, Z., Microsoft kinect sensor and its effect.MultiMedia, IEEE, 2012. 19(2): p. 4-10.
23.Garage, W., TurtleBot. Website: http://turtlebot.com/, last visited 2011.
24.Quigley, M., et al. ROS: an open-source Robot OperatingSystem. in ICRA workshop on open source software. 2009.
25.openkinect. Open Kinect. 2014 [cited 2014]; Available from: http://openkinect.org/.
26.Google. snappy. 2014 [cited 2014]; Available from: https://code.google.com/p/snappy/.
27.Mehrotra, S., et al. Low-complexity, near-lossless coding ofdepth maps from kinect-like depth cameras. in Multimedia SignalProcessing (MMSP), 2011 IEEE 13th International Workshop on. 2011.IEEE.
28.openkinect. Imaging Information. 2014; Available from: http://openkinect.org/wiki/Imaging_Information.