Commit 59bf1c8

Merge branch 'master' of https://github.com/kafka-dev/kafka
2 parents b7865e3 + 9706f45 commit 59bf1c8

73 files changed (+1862 / -2142 lines)

README.md

Lines changed: 6 additions & 0 deletions

@@ -12,3 +12,9 @@ Kafka is aimed at providing a publish-subscribe solution that can handle all act
 See our [web site](http://sna-projects.com/kafka) for more details on the project.
 
 Kafka is a new project, and we are interested in building the community; we would welcome any thoughts or patches. You can reach us [here](http://groups.google.com/group/kafka-dev).
+
+To get kafka code:
+  git clone git@github.com:kafka-dev/kafka.git kafka
+
+To run unit tests:
+  ant test (you need to make sure that scala 2.8.0 is in your PATH)

bin/kafka-console-consumer-log4j.properties

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+log4j.rootLogger=INFO, stderr
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
+

bin/kafka-console-consumer.sh

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+$base_dir/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@

bin/kafka-run-class.sh

Lines changed: 1 addition & 5 deletions

@@ -8,8 +8,6 @@ fi
 
 base_dir=$(dirname $0)/..
 
-#CLASSPATH=$CLASSPATH:bin
-
 for file in $base_dir/dist/*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
@@ -20,10 +18,8 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-CLASSPATH=dist:$CLASSPATH
-
 if [ -z "$KAFKA_OPTS" ]; then
-  KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
+  KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/dist/log4j.properties "
 fi
 
 if [ -z "$JAVA_HOME" ]; then

build.xml

Lines changed: 70 additions & 6 deletions

@@ -37,26 +37,56 @@
 
   <target name="all" depends="clean, jar" description="Build all artifacts." />
 
+  <target name="fsc" depends="clean, jar-fsc" description="Build all artifacts using the fast scala compiler." />
+
   <target name="clean" description="Delete generated files.">
     <delete dir="${dist.dir}" />
     <replace-dir dir="${javadoc.dir}" />
   </target>
 
-  <target name="build" depends="init" description="Compile main source tree">
+  <target name="build-fsc" depends="init" description="Compile main source tree using the fast scala compiler">
     <replace-dir dir="${classes.dir}" />
     <replace-dir dir="${testclasses.dir}" />
-    <scalac srcdir="${src.dir}"
+    <fsc srcdir="${src.dir}"
             destdir="${classes.dir}"
             classpathref="main-classpath"
            force="changed"
            target="jvm-1.5">
      <include name="**/*.scala" />
+    </fsc>
+    <fsc srcdir="${unittestsrc.dir}"
+         destdir="${testclasses.dir}"
+         classpathref="test-classpath"
+         force="changed"
+         target="jvm-1.5">
+      <include name="**/*.scala"/>
+    </fsc>
+    <fsc srcdir="${othertestsrc.dir}"
+         destdir="${testclasses.dir}"
+         classpathref="test-classpath"
+         force="changed"
+         target="jvm-1.5">
+      <include name="**/*.scala"/>
+    </fsc>
+    <copy file="${src.dir}/log4j.properties" todir="${dist.dir}" />
+    <copy file="${test.dir}/log4j.properties" todir="${testclasses.dir}" />
+  </target>
+
+  <target name="build" depends="init" description="Compile main source tree">
+    <replace-dir dir="${classes.dir}" />
+    <replace-dir dir="${testclasses.dir}" />
+    <scalac srcdir="${src.dir}"
+            destdir="${classes.dir}"
+            classpathref="main-classpath"
+            force="changed"
+            target="jvm-1.5">
+      <include name="**/*.scala" />
     </scalac>
     <scalac srcdir="${unittestsrc.dir}"
             destdir="${testclasses.dir}"
             classpathref="test-classpath"
             force="changed"
-           target="jvm-1.5">
+            target="jvm-1.5">
       <include name="**/*.scala"/>
     </scalac>
     <scalac srcdir="${othertestsrc.dir}"
@@ -66,8 +96,8 @@
             target="jvm-1.5">
       <include name="**/*.scala"/>
     </scalac>
-   <copy file="${src.dir}/log4j.properties" todir="${dist.dir}" />
-   <copy file="${test.dir}/log4j.properties" todir="${testclasses.dir}" />
+    <copy file="${src.dir}/log4j.properties" todir="${dist.dir}" />
+    <copy file="${test.dir}/log4j.properties" todir="${testclasses.dir}" />
   </target>
 
   <target name="jar" depends="build" description="Build jar file">
@@ -83,10 +113,44 @@
     </jar>
   </target>
 
+  <target name="jar-fsc" depends="build-fsc" description="Build jar file using the fast scala compiler">
+    <jar destfile="${dist.dir}/${name}-${curr.release}.jar">
+      <fileset dir="${classes.dir}">
+        <include name="**/*.*" />
+      </fileset>
+    </jar>
+    <jar destfile="${dist.dir}/${name}-test-${curr.release}.jar">
+      <fileset dir="${testclasses.dir}">
+        <include name="**/*.*" />
+      </fileset>
+    </jar>
+  </target>
+
   <target name="test" depends="jar" description="Run junit tests.">
     <replace-dir dir="${testreport.dir}" />
     <replace-dir dir="${testhtml.dir}" />
-    <junit printsummary="on" showoutput="false">
+    <junit printsummary="on" showoutput="true">
+      <classpath refid="test-classpath" />
+      <formatter type="xml" />
+      <batchtest fork="yes" todir="${testreport.dir}">
+        <fileset dir="${testclasses.dir}">
+          <include name="**/*Test.class" />
+          <exclude name="**/Abstract*.class" />
+        </fileset>
+      </batchtest>
+    </junit>
+    <junitreport todir="${testhtml.dir}">
+      <fileset dir="${testreport.dir}">
+        <include name="TEST-*.xml" />
+      </fileset>
+      <report todir="${testhtml.dir}" format="frames" />
+    </junitreport>
+  </target>
+
+  <target name="test-fsc" depends="jar-fsc" description="Run junit tests using the fast scala compiler">
+    <replace-dir dir="${testreport.dir}" />
+    <replace-dir dir="${testhtml.dir}" />
+    <junit printsummary="on" showoutput="true">
     <classpath refid="test-classpath" />
     <formatter type="xml" />
     <batchtest fork="yes" todir="${testreport.dir}">

clients/python/kafka.py

Lines changed: 40 additions & 14 deletions

@@ -14,29 +14,55 @@
 
 import socket
 import struct
+import binascii
+import sys
 
-class Message:
-    def __init__(self, topic, payload):
-        self.topic = topic
-        self.payload = payload
+PRODUCE_REQUEST_ID = 0
 
-    ## Message format is 4 byte length, 2 byte topic length, N byte topic and M byte payload
-    def encode(self):
-        return struct.pack('>i', len(self.payload) + len(self.topic) + 2) + \
-               struct.pack('>h', len(self.topic)) + self.topic + self.payload
+def encode_message(message):
+    # <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>
+    return struct.pack('>B', 0) + \
+           struct.pack('>i', binascii.crc32(message)) + \
+           message
+
+def encode_produce_request(topic, partition, messages):
+    # encode messages as <LEN: int><MESSAGE_BYTES>
+    encoded = [encode_message(message) for message in messages]
+    message_set = ''.join([struct.pack('>i', len(m)) + m for m in encoded])
+
+    # create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>
+    data = struct.pack('>H', PRODUCE_REQUEST_ID) + \
+           struct.pack('>H', len(topic)) + topic + \
+           struct.pack('>i', partition) + \
+           struct.pack('>i', len(message_set)) + message_set
+    return struct.pack('>i', len(data)) + data
 
 
 class KafkaProducer:
-    def __init__(self, topic, host, port):
+    def __init__(self, host, port):
         self.REQUEST_KEY = 0
-        self.topic = topic
         self.connection = socket.socket()
         self.connection.connect((host, port))
 
     def close(self):
         self.connection.close()
 
-    def send(self, message):
-        encoded = message.encode()
-        self.connection.send(struct.pack('>i', len(encoded) + 2) + struct.pack('>h', self.REQUEST_KEY) + encoded)
-
+    def send(self, messages, topic, partition = 0):
+        self.connection.sendall(encode_produce_request(topic, partition, messages))
+
+if __name__ == '__main__':
+    if len(sys.argv) < 4:
+        print >> sys.stderr, 'USAGE: python', sys.argv[0], 'host port topic'
+        sys.exit(1)
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    topic = sys.argv[3]
+
+    producer = KafkaProducer(host, port)
+
+    while True:
+        print 'Enter comma seperated messages: ',
+        line = sys.stdin.readline()
+        messages = line.split(',')
+        producer.send(messages, topic)
+        print 'Sent', len(messages), 'messages successfully'
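
Note (not part of this commit): the new wire format frames each message as <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>, and each framed message is length-prefixed inside the produce request. A minimal Python 2 sketch of the consumer-side counterpart, assuming the same framing; decode_message is a hypothetical helper, not something in kafka.py:

    import binascii
    import struct

    def encode_message(message):
        # producer-side framing, as added in kafka.py above:
        # <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>
        return struct.pack('>B', 0) + \
               struct.pack('>i', binascii.crc32(message)) + \
               message

    def decode_message(buf):
        # hypothetical reverse of encode_message: split off the magic byte
        # and CRC, then verify the checksum of the remaining payload
        magic, crc = struct.unpack('>Bi', buf[:5])
        payload = buf[5:]
        if crc != binascii.crc32(payload):
            raise ValueError('CRC mismatch')
        return magic, payload

    if __name__ == '__main__':
        magic, payload = decode_message(encode_message('hello'))
        print magic, payload   # prints: 0 hello

The same length-prefix pattern (struct.pack('>i', len(...)) followed by the bytes) is used both for each message inside the message set and for the request as a whole.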

config/server.properties

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ log.cleanup.interval.mins=1
 log.retention.hours=168
 
 #the number of messages to accept without flushing the log to disk
-log.flush.interval=1000
+log.flush.interval=1
 
 #set the following properties to use zookeeper
 
contrib/hadoop-consumer/README

Lines changed: 31 additions & 54 deletions

@@ -5,79 +5,56 @@ It requires the following inputs from a configuration file
 
 kafka.etl.topic : the topic to be fetched;
 
-kafka.nodes : a hdfs file containing kafka nodes description;
-              test/kafka-nodes.txt is an exmple;
-
-kafka.etl.config: a hdfs file containing topic configuration;
-              test/kafka-config.txt is an exmple;
-
-offsets.root : input directory containing offsets in text format;
-              they will be automatically generated in the first run;
+input : input directory containing topic offsets and
+        it can be generated by DataGenerator;
         the number of files in this directory determines the
-        number of mappers thus the number of fetch jobs;
+        number of mappers in the hadoop job;
 
-output.root : output directory containing kafka data and updated
-              offset files;
+output : output directory containing kafka data and updated
+         topic offsets;
 
-partition.granularity:
-              We assume all topic data contain a "time" field
-              and we partition data based on specified granularity.
-              Accepted values are: minute/hour/day. Data in the
-              same partition will go to the same reduce() call.
+kafka.request.limit : it is used to limit the number events fetched.
 
-KafkaETLJob is an abstract class which sets up job properties and
-files Hadoop job. Users need to implement methods to provide Mapper
-and Reducer classes to be used.
+KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
+It fetches kafka data from the server. It starts from provided offsets
+(specified by "input") and stops when it reaches the largest available offsets
+or the specified limit (specified by "kafka.request.limit").
 
-KafkaETLMapper is an abstract class which fetches kafka data from
-the server. It starts from provided offsets (by default, starts from
-smallest offsets available in server) and stops when it reaches
-the largest available offsets. Users need to implement methods to
-decode fetched data (to get timestamp field), and to determine job
-status and stopping conditions.
+KafkaETLJob contains some helper functions to initialize job configuration.
 
-KafkaETLReducer is an abstract class which is used to partition
-outputs. Users need to implement methods to generate output key
-and value. They can also turn off partitioning by setting the
-number of reducers to be 0.
+SimpleKafkaETLJob sets up job properties and files Hadoop job.
 
-We include a simple implementation SimpleKafkaETLJob which fetches
-test events (in text format) from server and store data in HDFS.
+SimpleKafkaETLMapper dumps kafka data into hdfs.
 
 HOW TO RUN:
-1. Complile the code using "ant jar".
+1. Complile using "ant" and you will see kafka-etl-contrib-<version>.jar
+   under $KAFKA_ROOT/dist.
 
-2. Generate test events in server:
+2. Produce test events in server and generate offset files
  1) Start kafka server [ Follow the quick start -
     http://sna-projects.com/kafka/quickstart.php ]
+
 2) Update test/test.properties to change the following parameters:
   kafka.etl.topic : topic name
   event.count : number of events to be generated
-  kafka.nodes : hdfs location of kafka nodes configuration
-                (test/kafka-nodes.txt is an example).
-  3) Generate events
-   ./bin/run-class.sh kafka.etl.impl.DataGenerator test/test.properties
-
+  kafka.server.uri : kafka server uri;
+  input : hdfs directory of offset files
+
+ 3) Produce test events to Kafka server and generate offset files
+   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
 
 3. Fetch generated topic into HDFS:
 1) Update test/test.properties to change the following parameters:
  hadoop.job.ugi : id and group
- kafka.etl.config: hdfs location of kafka configuration file
-                   (test/kafka-config.txt is an example)
- offsets.root : input location (you can provide a hdfs dir
-                where you have write permission and the job will
-                automatically generate starting offset files)
- output.root : output location (please provide a hdfs dir
-               where you have write permission)
- partition.granularity : partition granularity
-
- 2) Fetch data
- ./bin/run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
-
-
-
-
-
+ input : input location
+ output : output location
+ kafka.request.limit: limit the number of events to be fetched;
+                      -1 means no limitation.
+ hdfs.default.classpath.dir : hdfs location of jars
 
+2) copy jars into hdfs
+ ./copy-jars.sh ${hdfs.default.classpath.dir}
 
+2) Fetch data
+ ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
 
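
Note (not part of this commit): pulling the parameters above together, a test/test.properties for a small local run might look roughly like the sketch below. Every value is an illustrative placeholder; the authoritative key list and defaults are in the test/test.properties file shipped with the contrib module.

    # topic and test-data generation
    kafka.etl.topic=SimpleTestEvent
    kafka.server.uri=tcp://localhost:9092
    event.count=1000

    # hadoop job settings
    hadoop.job.ugi=hadoop,hadoop
    input=/tmp/kafka/etl/offsets
    output=/tmp/kafka/etl/output
    kafka.request.limit=-1
    hdfs.default.classpath.dir=/tmp/kafka/lib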
