Skip to content

Commit

Permalink
Fix some kafka problems. (#1468)
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim authored Jan 9, 2024
1 parent 3a8c14c commit 244913d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 53 deletions.
3 changes: 2 additions & 1 deletion src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
#include <atomic>
#include <mutex>
#include "WFTaskError.h"
#include "WFKafkaClient.h"
#include "StringUtil.h"
#include "KafkaTaskImpl.inl"
#include "WFKafkaClient.h"

#define KAFKA_HEARTBEAT_INTERVAL (3 * 1000 * 1000)

Expand Down
3 changes: 1 addition & 2 deletions src/client/WFKafkaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
#include <string>
#include <vector>
#include <functional>
#include "WFTask.h"
#include "KafkaMessage.h"
#include "KafkaResult.h"
#include "KafkaTaskImpl.inl"


class WFKafkaTask;
class WFKafkaClient;
Expand Down
43 changes: 36 additions & 7 deletions src/protocol/KafkaDataTypes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,50 @@
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/


#include <errno.h>
#include <assert.h>
#include <algorithm>
#include "KafkaDataTypes.h"

#define MIN(x, y) ((x) <= (y) ? (x) : (y))

namespace protocol
{

#define MIN(x, y) ((x) <= (y) ? (x) : (y))
std::string KafkaConfig::get_sasl_info() const
{
std::string info;

if (strcasecmp(this->ptr->mechanisms, "plain") == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
else if (strncasecmp(this->ptr->mechanisms, "SCRAM", 5) == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}

return info;
}

static bool compare_member(const kafka_member_t *m1, const kafka_member_t *m2)
{
return strcmp(m1->member_id, m2->member_id) < 0;
}

static int compare_member(const void *p1, const void *p2)
inline void KafkaMetaSubscriber::sort_by_member()
{
kafka_member_t *member1 = (kafka_member_t *)p1;
kafka_member_t *member2 = (kafka_member_t *)p2;
return strcmp(member1->member_id, member2->member_id);
std::sort(this->member_vec.begin(), this->member_vec.end(), compare_member);
}

static bool operator<(const KafkaMetaSubscriber& s1, const KafkaMetaSubscriber& s2)
Expand Down Expand Up @@ -114,7 +143,7 @@ int KafkaCgroup::kafka_roundrobin_assignor(kafka_member_t **members,
int next = -1;

std::sort(subscribers->begin(), subscribers->end());
qsort(members, member_elements, sizeof (kafka_member_t *), compare_member);
std::sort(members, members + member_elements, compare_member);

for (const auto& subscriber : *subscribers)
{
Expand Down
41 changes: 4 additions & 37 deletions src/protocol/KafkaDataTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@
#ifndef _KAFKA_DATATYPES_H_
#define _KAFKA_DATATYPES_H_


#include <assert.h>
#include <algorithm>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <utility>
#include <string.h>
#include <vector>
#include <string>
#include <string.h>
#include <atomic>
#include <snappy.h>
#include <snappy-sinksource.h>
#include "list.h"
#include "rbtree.h"
#include "kafka_parser.h"


namespace protocol
{

Expand Down Expand Up @@ -553,30 +551,7 @@ class KafkaConfig
return kafka_sasl_set_password(password, this->ptr) == 0;
}

std::string get_sasl_info() const
{
std::string info;
if (strcasecmp(this->ptr->mechanisms, "plain") == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
else if (strncasecmp(this->ptr->mechanisms, "SCRAM", 5) == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}

return info;
}
std::string get_sasl_info() const;

bool new_client(kafka_sasl_t *sasl)
{
Expand Down Expand Up @@ -1284,15 +1259,7 @@ class KafkaMetaSubscriber
return &this->member_vec;
}

static bool cmp(const kafka_member_t *m1, const kafka_member_t *m2)
{
return strcmp(m1->member_id, m2->member_id) < 0;
}

void sort_by_member()
{
std::sort(this->member_vec.begin(), this->member_vec.end(), cmp);
}
void sort_by_member();

private:
KafkaMeta *meta;
Expand Down
12 changes: 6 additions & 6 deletions src/protocol/kafka_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/

#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <openssl/evp.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <openssl/evp.h>
#include "kafka_parser.h"

static kafka_api_version_t kafka_api_version_queryable[] = {
Expand Down Expand Up @@ -610,13 +610,13 @@ int kafka_parser_append_message(const void *buf, size_t *size,

if (s > parser->message_size - parser->cur_size)
{
memcpy(parser->msgbuf + parser->cur_size, buf,
memcpy((char *)parser->msgbuf + parser->cur_size, buf,
parser->message_size - parser->cur_size);
parser->cur_size = parser->message_size;
}
else
{
memcpy(parser->msgbuf + parser->cur_size, buf, s);
memcpy((char *)parser->msgbuf + parser->cur_size, buf, s);
parser->cur_size += s;
}

Expand Down

0 comments on commit 244913d

Please sign in to comment.