Tuesday, October 13, 2020

Google ProtoBuff + ZeroMq -- C++


In the last series of posts we demonstrated ZeroMq as a technology that supports 'sockets on steroids', supporting multiple platforms as well as multiple languages.  The examples to-date have been transmitting strings between senders and receivers.  While interesting, to effectively create a distributed heterogeneous system we need to be capable of transmitting meaningful messages, preferably complex data structures rather than just strings.  That's where Google's Protobuff comes into play: http://code.google.com/p/protobuf/


Building off our previously created Ubuntu 12.04 32-bit VM, let's start by installing the additional necessary packages;



$ sudo apt-get install libprotoc-dev


With the developer libraries installed, we can now extend our previous C++ example to transmit a ProtoBuff message.


We'll extend our Makefile to add the necessary libraries and a target (e.g. msgs) to generate the C++ files for the message.



$ cat Makefile 

CC=g++

SRCS=main.cpp Messages.pb.cc

OBJS=$(subst .cpp,.o,$(SRCS))

INCLUDES += -I.

LIBS += -lpthread -lrt -lzmq -lprotobuf

.cpp.o:

$(CC) -c $<

main: msgs ${OBJS} 

${CC} ${CFLAGS} -o $@ ${OBJS} ${LIBS}

msgs:

${SH} protoc -I. --cpp_out=. Messages.proto

clean:

${RM} ${OBJS} main *.pb.*


Oh, we should take a look at our simple Protobuff message file:


$ cat Messages.proto 

message Person {

  required int32 id=1;

  required string name=2;

}


Finally, our extended main file:


$ cat main.cpp 

#include

#include

#include

#include

#include

#include

#include "Messages.pb.h"

void* ctx=zmq_init(1);

char* EndPoint="tcp://127.0.0.1:8000";

static const int N=100;

static const int BufferSize=128;

void* sender(void*)

{

  printf("(%s:%d) running\n",__FILE__,__LINE__);

  void* pub=zmq_socket(ctx, ZMQ_PUB);

  assert(pub);

  int rc=zmq_bind(pub,EndPoint);

  assert(rc==0);

  Person p;

  p.set_name("fatslowkid");

  p.set_id(01);

  for(int i=0; i

  {

    zmq_msg_t msg;

    std::string S=p.SerializeAsString();

    char* content=(char*)S.c_str();

    int rc=zmq_msg_init_size(&msg, BufferSize);

    assert(rc==0);

    rc=zmq_msg_init_data(&msg, content, strlen(content), 0,0);

    assert(rc==0);

    rc=zmq_send(pub, &msg, 0);

    assert(rc==0);

    ::usleep(100000);

  }

}

void* receiver(void*)

{

  printf("(%s:%d) running\n",__FILE__,__LINE__);

  void* sub=zmq_socket(ctx, ZMQ_SUB);

  assert(sub);

  int rc=zmq_connect(sub,EndPoint);

  assert(rc==0);

  char* filter="";

  rc=zmq_setsockopt(sub, ZMQ_SUBSCRIBE, filter, strlen(filter));

  assert(rc==0);

  for(int i=0; i

  {

    zmq_msg_t msg;

    zmq_msg_init_size(&msg, BufferSize);

    const int rc=zmq_recv (sub, &msg, 0);

    char* content=(char*)zmq_msg_data(&msg);

    Person p;

    p.ParseFromString(content);

    printf("(%s:%d) received: '%s'\n",__FILE__,__LINE__,p.name().c_str());

    zmq_msg_close(&msg);

  }

}

int main(int argc, char* argv[])

{

  printf("(%s:%d) main process initializing\n",__FILE__,__LINE__);

  int major, minor, patch;

  zmq_version (&major, &minor, &patch);

  printf("(%s:%d) zmq version: %d.%d.%d\n",__FILE__,__LINE__,major,minor,patch);

  pthread_t rId;

  pthread_create(&rId, 0, receiver, 0);

  pthread_t sId;

  pthread_create(&sId, 0, sender, 0);

  pthread_join(rId,0);

  pthread_join(sId,0);

  printf("(%s:%d) main process terminating\n",__FILE__,__LINE__);

}


Notice that we now transmit and receive Protobuf messages, serialized as strings.  The value of this is that the serialization mechanism is multi-platform & multi-language support.


Cheers.

No comments:

Post a Comment