bootstrap-server: 10.160.231.159:9093,10.160.231.58:9093,10.160.231.204:9093

Production:
topic: shell-loyalty-sichuan
username: oms_prod
password: R!y7GB9IDA

Test:
topic: shell-loyalty-test
username: oms_test
password: Fi51kN!yzS

The consumer group name can be customized; include "group" in the name, e.g. oms_group.
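For reference, here is a sketch of how the production values above plug into the client configuration (all values are copied from this note; the truststore settings are covered in the template that follows):

bootstrap.servers=10.160.231.159:9093,10.160.231.58:9093,10.160.231.204:9093
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="oms_prod" \
password="R!y7GB9IDA";
# Consumer group per the naming rule above:
group.id=oms_group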
#------------------------------ Java code reference; actual system requirements and technical implementation take precedence ----------------------------
# Specifies the broker information of the Kafka instance.
# ip:port indicates the connection address and port number of the instance.
# For details about how to obtain the information, see section "Collecting Connection Information."
# Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
acks=all
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
buffer.memory=33554432
retries=0
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
# Password of the SSL truststore file.
# This password is configured to access the JKS file generated by Java.
ssl.truststore.password=dms@kafka
# Indicates whether to verify the certificate domain name.
# Leaving it blank disables verification; for this instance it must be
# disabled, i.e. left blank.
ssl.endpoint.identification.algorithm=
# Location of the SSL truststore file
ssl.truststore.location=client.jks
# Set the JAAS username and password.
# These are the credentials entered when SASL_SSL was enabled during
# Kafka instance creation, or those set when the SASL_SSL user was created.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="username" \
password="password";
#------------------------------ Producer code
package com.dms.kafka.examples;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

public class DmsProducer<K, V> {
    public static final String PRODUCER_CONFIG_FILE_NAME = "dms.sdk.producer.properties";

    private Producer<K, V> producer;

    // Load producer properties from an explicit file path.
    DmsProducer(String path) {
        Properties props = new Properties();
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<>(props);
    }

    // Load producer properties from the classpath (dms.sdk.producer.properties).
    DmsProducer() {
        Properties props;
        try {
            props = loadFromClassPath(PRODUCER_CONFIG_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<>(props);
    }

    public void produce(String topic, V value) {
        produce(topic, null, null, value, null, null);
    }

    public void produce(String topic, Integer partition, K key, V value) {
        produce(topic, partition, key, value, null, null);
    }

    public void produce(ProducerRecord<K, V> record) {
        produce(record, null);
    }

    public void produce(ProducerRecord<K, V> record, Callback callback) {
        producer.send(record, callback);
    }

    // All overloads funnel into this method; partition, timestamp, and callback are optional.
    public void produce(String topic, Integer partition, K key, V value, Long timestamp,
            Callback callback) {
        if (timestamp == null) {
            produce(new ProducerRecord<>(topic, partition, key, value), callback);
        } else {
            produce(new ProducerRecord<>(topic, partition, timestamp, key, value), callback);
        }
    }

    public void produce(String topic, Integer partition, K key, V value, Long timestamp) {
        produce(topic, partition, key, value, timestamp, null);
    }

    public void produce(String topic, Integer partition, K key, V value, Callback callback) {
        produce(topic, partition, key, value, null, callback);
    }

    public void close() {
        producer.close();
    }

    private static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsProducer.class.getClassLoader();
        }
        return classLoader;
    }

    // Merge every copy of the config file found on the classpath into one Properties object.
    private static Properties loadFromClassPath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}
#------------------------------ Test code
package com.dms.kafka.examples;

import org.junit.Test;

public class DmsProducerTest {
    @Test
    public void testProducer() throws Exception {
        DmsProducer<String, String> producer = new DmsProducer<>();
        int partition = 0;
        try {
            for (int i = 0; i < 10; i++) {
                String key = null;
                String data = String.format("Demo message %d.", i);
                // Send asynchronously; the callback reports success or failure per record.
                // Replace "topic-0" with the actual topic, e.g. shell-loyalty-test.
                producer.produce("topic-0", partition, key, data, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("Message produced.");
                });
                System.out.println("Message content: " + data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
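For completeness, a minimal consumer-side sketch under the same SASL_SSL settings. This is not part of the original SDK sample: the class name, the hard-coded properties, and the group name oms_group (which simply follows the naming rule stated at the top of this note) are all assumptions.

package com.dms.kafka.examples;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DmsConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker list and SASL_SSL settings as the producer template above.
        props.put("bootstrap.servers", "ip1:port1,ip2:port2,ip3:port3");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"username\" password=\"password\";");
        props.put("ssl.truststore.location", "client.jks");
        props.put("ssl.truststore.password", "dms@kafka");
        props.put("ssl.endpoint.identification.algorithm", "");
        // Consumer group name must contain "group", e.g. oms_group (see note above).
        props.put("group.id", "oms_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("shell-loyalty-test"));
            // Single poll for illustration; production code would loop and commit offsets.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ", value=" + record.value());
            }
        }
    }
}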