This repository has been archived by the owner on Apr 1, 2021. It is now read-only.

Python37 fixes #18

Status: Open · wants to merge 3 commits into master
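The substantive change in this PR is a rename: Python 3.7 promoted `async` (and `await`) to reserved keywords, so any module that uses `async` as a parameter or attribute name fails to parse on 3.7 before it can even be imported. A minimal sketch of the breakage, using a hypothetical producer class rather than woof's actual code:

# On Python 3.7+ the old spelling is rejected at parse time:
#
#     def __init__(self, broker, async=False):   # SyntaxError
#         self.async = async                     # SyntaxError
#
# Renaming the identifier sidesteps the keyword entirely:

class Producer(object):
    def __init__(self, broker, async_commit=False):
        self.broker = broker
        self.async_commit = async_commit  # plain identifier, valid on 2.x and 3.x

producer = Producer("localhost:9092", async_commit=True)  # hypothetical broker address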
6 changes: 3 additions & 3 deletions requirements.txt
@@ -1,5 +1,5 @@
-gevent
-kafka-python==1.2.5
-kazoo==2.2.1
+gevent>=1.4
+kafka-python>=1.2.5
+kazoo>=2.2.1
 six==1.10.0
 wheel==0.24.0
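Loosening the pins from `==` to `>=` lets pip resolve newer releases with Python 3.7 support; the pinned kafka-python 1.2.5 and kazoo 2.2.1 predate 3.7. A quick check of the specifier semantics, assuming the `packaging` library is available:

from packaging.specifiers import SpecifierSet

print("1.2.5" in SpecifierSet("==1.2.5"))  # True  -- exact pin, nothing newer allowed
print("1.4.7" in SpecifierSet("==1.2.5"))  # False -- a 3.7-compatible release is rejected
print("1.4.7" in SpecifierSet(">=1.2.5"))  # True  -- the relaxed bound accepts it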
9 changes: 3 additions & 6 deletions setup.py
@@ -1,7 +1,6 @@
 import sys
 import os
-from setuptools import setup, Command
-
+from setuptools import setup

 here = os.path.abspath(os.path.dirname(__file__))
@@ -12,9 +11,6 @@
     README = f.read()


-
-
-
 setup(
     name="woof",
     version=__version__,
@@ -36,7 +32,8 @@
         "Programming Language :: Python :: 2",
         "Programming Language :: Python :: 2.6",
         "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: Implementation :: PyPy",
         "Topic :: Software Development :: Libraries :: Python Modules",
-    ]
+    ],
 )
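Worth noting: trove classifiers such as `Programming Language :: Python :: 3.7` are advisory metadata for PyPI; pip does not enforce them at install time. Gating installs on interpreter version would take `python_requires`, which this PR does not add — a hedged sketch of what that would look like:

from setuptools import setup

setup(
    name="woof",
    version="0.0.0",          # placeholder, not the package's real version
    python_requires=">=2.6",  # hypothetical: enforced by pip, unlike classifiers
)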
4 changes: 3 additions & 1 deletion woof/green_consumer.py
@@ -1,7 +1,9 @@
 from gevent import monkey

 monkey.patch_all()
-import logging, gevent, time
+import logging
+import gevent
+import time
 import signal
 import sys, traceback
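Splitting the comma-separated import is cosmetic (PEP 8), but the ordering around `monkey.patch_all()` is load-bearing: gevent must patch the standard library before any other module binds names like `time.sleep` or `socket.socket`. A standalone sketch of the pattern, not woof code:

from gevent import monkey
monkey.patch_all()   # must run before anything else imports the stdlib modules it patches

import time          # now resolves to the gevent-patched, cooperative version
import gevent

def worker(n):
    time.sleep(0.1)  # yields to the gevent hub instead of blocking the thread
    return n

jobs = [gevent.spawn(worker, i) for i in range(5)]
gevent.joinall(jobs)
print([job.value for job in jobs])  # [0, 1, 2, 3, 4]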
12 changes: 6 additions & 6 deletions woof/partitioned_producer.py
@@ -22,7 +22,7 @@ class PartitionedProducer(object):

     def __init__(self, broker,
                  partitioner=None,  # Note if the earlier hash is needed, need to explicitly pass dumb_hash
-                 async=False,
+                 async_commit=False,
                  req_acks=None,  # unused - here for legacy support
                  ack_timeout=None,  # unused - here for legacy support
                  codec=None,
@@ -35,7 +35,7 @@ def __init__(self, broker,
                  **kwargs):

         try:
-            self.async = async
+            self.async_commit = async_commit
             if partitioner is not None:
                 _partitioner = CustomPartitioner(partitioner)
             else:
@@ -61,7 +61,7 @@ def send(self, topic, key, *msg):
             self.prod.send(topic, key=key, value=_msg)

             # for async, flush will happen in the background
-            if not self.async:
+            if not self.async_commit:
                 self.prod.flush()

         except KafkaTimeoutError as e:
@@ -113,14 +113,14 @@ class CyclicPartitionedProducer(KafkaProducer):

     def __init__(self,
                  broker,
-                 async=True,
+                 async_commit=True,
                  key_serializer=make_kafka_safe,
                  value_serializer=make_kafka_safe,
                  random_start=True,
                  **kwargs):
         self.partition_cycles = {}
         self.random_start = random_start
-        self.async = async
+        self.async_commit = async_commit
         kwargs['api_version'] = kwargs.get('api_version',
                                            CURRENT_PROD_BROKER_VERSION)
         super(CyclicPartitionedProducer, self).__init__(
@@ -159,7 +159,7 @@ def send(self, topic, key, *msg):
                            value=_msg)

             # for async, flush will happen in the background
-            if not self.async:
+            if not self.async_commit:
                 self.prod.flush()

         except KafkaTimeoutError as e:
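The `async_commit` flag only decides whether `send()` is followed by a blocking `flush()`; in kafka-python the send itself is always asynchronous and buffered. A sketch of the two modes against a hypothetical broker:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # hypothetical broker address
future = producer.send("events", b"payload")  # always async: the record is only buffered

# async_commit=False behaves like this -- block until the buffer is drained:
producer.flush()

# async_commit=True skips the flush and lets the background I/O thread deliver;
# per-record delivery can still be confirmed through the returned future:
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)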
4 changes: 2 additions & 2 deletions woof/producer.py
@@ -19,7 +19,7 @@ def __init__(self,
                  key_serializer=make_kafka_safe,
                  value_serializer=make_kafka_safe,
                  retries=3,
-                 async=False,
+                 async_commit=False,
                  **kwargs):
         try:
             kwargs['api_version'] = kwargs.get('api_version',
@@ -29,7 +29,7 @@
                 value_serializer=value_serializer,
                 retries=retries,
                 **kwargs)
-            self.async = async
+            self.async_commit = async_commit
         except Exception as e:
             log.error("[feedproducer log] Constructor error ERROR %s \n",
                       str(e))
37 changes: 22 additions & 15 deletions woof/transactions.py
@@ -1,6 +1,7 @@
 import logging
 import socket
 import time
+import sys

 from kafka import KafkaProducer
 from kafka.errors import KafkaTimeoutError
@@ -14,24 +15,28 @@ def make_kafka_safe(raw_data):
     This function was written to avoid non-unicode
     string data produced to Kafka
     """
-    return raw_data
+    if sys.version_info[0] == 3 and isinstance(raw_data, str):
+        resp = raw_data.encode('utf-8')
+    else:
+        resp = raw_data
+    return resp


 class TransactionLogger(object):
     def __init__(self,
                  broker,
                  vertical,
                  host=socket.gethostname(),
-                 async=False,
+                 async_commit=False,
                  retries=1,
                  key_serializer=make_kafka_safe,
                  value_serializer=make_kafka_safe,
                  **kwargs):
         self.broker = broker
         self.this_host = host
         self.vertical = vertical
-        self.async = async
-        self.topic = _get_topic_from_vertical(vertical)
+        self.async_commit = async_commit
+        self.topic = self._get_topic_from_vertical(vertical)
         kwargs['api_version'] = kwargs.get('api_version',
                                            CURRENT_PROD_BROKER_VERSION)
         # thread safe producer, uses default murmur2 partitioner by default
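The new `make_kafka_safe` exists because kafka-python ultimately writes bytes: on Python 3, `str` is unicode text and must be encoded before hitting the wire, while on Python 2 (and for values already of type `bytes`) the data passes through untouched. Roughly, assuming Python 3:

make_kafka_safe("txn-42")      # -> b'txn-42'         (str encoded to UTF-8 bytes)
make_kafka_safe("détail")      # -> b'd\xc3\xa9tail'  (non-ASCII text survives encoding)
make_kafka_safe(b"raw-bytes")  # -> b'raw-bytes'      (bytes pass through unchanged)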
@@ -56,33 +61,36 @@ def New(self,
     def Modify(self,
                txn_id,
                amount="#",
-               skus=[],
+               skus=None,
                detail="#",
                userid="#",
                email="#",
                phone="#"):
+        skus = [] if skus is None else skus
         self._send_log("MODIFY", txn_id, amount, skus, detail, userid, email,
                        phone)

     def Cancel(self,
                txn_id,
                amount="#",
-               skus=[],
+               skus=None,
                detail="#",
                userid="#",
                email="#",
                phone="#"):
+        skus = [] if skus is None else skus
         self._send_log("CANCEL", txn_id, amount, skus, detail, userid, email,
                        phone)

     def Fulfil(self,
                txn_id,
                amount="#",
-               skus=[],
+               skus=None,
                detail="#",
                userid="#",
                email="#",
                phone="#"):
+        skus = [] if skus is None else skus
         self._send_log("FULFIL", txn_id, amount, skus, detail, userid, email,
                        phone)
@@ -134,14 +142,13 @@ def _format_message(self, verb, txn_id, amount, skus, detail, userid,
         """
         separator = '\t'

-        safe_skus = [make_kafka_safe(x) for x in skus]
+        safe_skus = [x for x in skus]
         skus_as_string = ",".join(safe_skus)

-        return separator.join([self.this_host, str(time.time(
-        )), verb, make_kafka_safe(txn_id), make_kafka_safe(
-            amount), skus_as_string, make_kafka_safe(detail), make_kafka_safe(
-            userid), make_kafka_safe(email), make_kafka_safe(phone)])
+        return separator.join(
+            [self.this_host, str(time.time()), verb, txn_id, amount,
+             skus_as_string, detail, userid, email, phone])

-
-def _get_topic_from_vertical(vertical):
-    return "_".join(["TRANSACTIONS", vertical])
+    @staticmethod
+    def _get_topic_from_vertical(vertical):
+        return "_".join(["TRANSACTIONS", vertical])
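Moving `_get_topic_from_vertical` onto the class as a `@staticmethod` removes the stray module-level helper and makes it callable both through the instance (the new `self._get_topic_from_vertical(vertical)` in `__init__`) and through the class. A quick check, trimmed to the relevant method:

class TransactionLogger(object):
    @staticmethod
    def _get_topic_from_vertical(vertical):
        return "_".join(["TRANSACTIONS", vertical])

print(TransactionLogger._get_topic_from_vertical("orders"))  # TRANSACTIONS_orders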