In the last series of posts we demonstrated ZeroMq as a technology that supports 'sockets on steroids', supporting multiple platforms as well as multiple languages. The examples to-date have been transmitting strings between senders and receivers. While interesting, to effectively create a distributed heterogeneous system we need to be capable of transmitting meaningful messages, preferably complex data structures rather than just strings. That's where Google's Protobuff comes into play: http://code.google.com/p/protobuf/
Building off our previously created Ubuntu 12.04 32-bit VM, let's start by installing the additional necessary packages;
$ sudo apt-get install libprotoc-dev
With the developer libraries installed, we can now extend our previous C++ example to transmit a ProtoBuff message.
We'll extend our Makefile to add the necessary libraries and a target (e.g. msgs) to generate the C++ files for the message.
$ cat Makefile
CC=g++
SRCS=main.cpp Messages.pb.cc
OBJS=$(subst .cpp,.o,$(SRCS))
INCLUDES += -I.
LIBS += -lpthread -lrt -lzmq -lprotobuf
.cpp.o:
$(CC) -c $<
main: msgs ${OBJS}
${CC} ${CFLAGS} -o $@ ${OBJS} ${LIBS}
msgs:
${SH} protoc -I. --cpp_out=. Messages.proto
clean:
${RM} ${OBJS} main *.pb.*
Oh, we should take a look at our simple Protobuff message file:
$ cat Messages.proto
message Person {
required int32 id=1;
required string name=2;
}
Finally, our extended main file:
$ cat main.cpp
#include
#include
#include
#include
#include
#include
#include "Messages.pb.h"
void* ctx=zmq_init(1);
char* EndPoint="tcp://127.0.0.1:8000";
static const int N=100;
static const int BufferSize=128;
void* sender(void*)
{
printf("(%s:%d) running\n",__FILE__,__LINE__);
void* pub=zmq_socket(ctx, ZMQ_PUB);
assert(pub);
int rc=zmq_bind(pub,EndPoint);
assert(rc==0);
Person p;
p.set_name("fatslowkid");
p.set_id(01);
for(int i=0; i
{
zmq_msg_t msg;
std::string S=p.SerializeAsString();
char* content=(char*)S.c_str();
int rc=zmq_msg_init_size(&msg, BufferSize);
assert(rc==0);
rc=zmq_msg_init_data(&msg, content, strlen(content), 0,0);
assert(rc==0);
rc=zmq_send(pub, &msg, 0);
assert(rc==0);
::usleep(100000);
}
}
void* receiver(void*)
{
printf("(%s:%d) running\n",__FILE__,__LINE__);
void* sub=zmq_socket(ctx, ZMQ_SUB);
assert(sub);
int rc=zmq_connect(sub,EndPoint);
assert(rc==0);
char* filter="";
rc=zmq_setsockopt(sub, ZMQ_SUBSCRIBE, filter, strlen(filter));
assert(rc==0);
for(int i=0; i
{
zmq_msg_t msg;
zmq_msg_init_size(&msg, BufferSize);
const int rc=zmq_recv (sub, &msg, 0);
char* content=(char*)zmq_msg_data(&msg);
Person p;
p.ParseFromString(content);
printf("(%s:%d) received: '%s'\n",__FILE__,__LINE__,p.name().c_str());
zmq_msg_close(&msg);
}
}
int main(int argc, char* argv[])
{
printf("(%s:%d) main process initializing\n",__FILE__,__LINE__);
int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf("(%s:%d) zmq version: %d.%d.%d\n",__FILE__,__LINE__,major,minor,patch);
pthread_t rId;
pthread_create(&rId, 0, receiver, 0);
pthread_t sId;
pthread_create(&sId, 0, sender, 0);
pthread_join(rId,0);
pthread_join(sId,0);
printf("(%s:%d) main process terminating\n",__FILE__,__LINE__);
}
Notice that we now transmit and receive Protobuf messages, serialized as strings. The value of this is that the serialization mechanism is multi-platform & multi-language support.
Cheers.