久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行flink中的kafka源碼分析

203次閱讀
沒有評論

共計 4956 個字符,預計需要花費 13 分鐘才能閱讀完成。

今天就跟大家聊聊有關如何進行 flink 中的 kafka 源碼分析,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據(jù)這篇文章可以有所收獲。

最近一直在弄 flink sql 相關的東西,第一階段的目標是從解決 kafka 的消費和寫入的問題。不過也有些同學并不是很了解,今天我們來詳細分析一下包的繼承層次。

flink 源碼如下:

public class KafkaTableSourceFactory implements StreamTableSourceFactory Row { private ConcurrentHashMap String, KafkaTableSource  kafkaTableSources = new ConcurrentHashMap ();
 @Override
 public Map String, String  requiredContext() { Map String, String  context = new HashMap ();
 context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
 context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
 return context;
 }
 @Override
 public List String  supportedProperties() { List String  properties = new ArrayList ();
 properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
 properties.add(KafkaConnectorDescriptor.TABLE_KEY);
 return properties;
 }
 @Override
 public StreamTableSource Row  createStreamTableSource(Map String, String  properties) {
 // 避免頻繁的觸發(fā)   是否需要加緩存
 KafkaTableSource kafkaTableSource;
 String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
 String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
 if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
 kafkaTableSource = builder
 .cluster(dataBase)
 .subject(table)
 .build();
 kafkaTableSources.put(dataBase + table,kafkaTableSource);
 } else { kafkaTableSource = kafkaTableSources.get(dataBase + table);
 }
 return kafkaTableSource;
 }
}
class Kafka08PBTableSource protected(topic: String,
 properties: Properties,
 schema: TableSchema,
 typeInformation: TypeInformation[Row],
 paramMap: util.LinkedHashMap[String, AnyRef],
 entryClass: String)
 extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) { override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = { this.setStartupMode(StartupMode.EARLIEST)
 new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()
 }
}

下面用戶自定義的 kafka 的 sink 類:

class Kafka08UDMPBTableSink (topic: String,
 properties: Properties,
 partitioner: Optional[FlinkKafkaPartitioner[Row]],
 paramMap: util.LinkedHashMap[String, AnyRef],
 serializationSchema: SerializationSchema[Row],
 fieldNames: Array[String],
 fieldTypes: Array[TypeInformation[_]]
 ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) { override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={ new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
 }
 override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema
 override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)
 override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = { super.configure(this.fieldNames, this.fieldTypes)
 }
 override def getFieldNames: Array[String]=this.fieldNames
 /** Returns the types of the table fields. */
 override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes

 override def emitDataStream(dataStream: DataStream[Row]): Unit = { val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)  dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))  } }
public class TrackRowDeserializationSchema implements SerializationSchema Row , DeserializationSchema Row  {
 private static final long serialVersionUID = -2885556750743978636L;
 /** Type information describing the input type. */
 private TypeInformation Row  typeInfo = null;
 private LinkedHashMap paraMap;
 private String inSchema;
 private String outSchema;
 private String inClass;
 private String outClass;
}
public class TrackRowFormatFactory extends TableFormatFactoryBase Row 
 implements SerializationSchemaFactory Row , DeserializationSchemaFactory Row  { public TrackRowFormatFactory() { super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
 }
 public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) { super(type, version, supportsSchemaDerivation);
 }
 @Override
 protected List String  supportedFormatProperties() { final List String  properties = new ArrayList ();
 properties.add(TrackValidator.FORMAT_IN_SCHEMA);
 properties.add(TrackValidator.FORMAT_IN_CLASS);
 properties.add(TrackValidator.FORMAT_OUT_CLASS);
 properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
 properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
 properties.add(TrackValidator.FORMAT_TYPE_VALUE);
 return properties;
 }
}

看完上述內容,你們對如何進行 flink 中的 kafka 源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業(yè)資訊頻道,感謝大家的支持。

正文完
 
丸趣
版權聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計4956字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網(wǎng)絡搜集發(fā)布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 北票市| 荃湾区| 巴林左旗| 湖口县| 当雄县| 夏河县| 科技| 资源县| 钟山县| 应城市| 南宫市| 南丹县| 富蕴县| 汝南县| 台北市| 绍兴市| 酒泉市| 万全县| 汨罗市| 淄博市| 云南省| 松江区| 嫩江县| 固始县| 慈利县| 长丰县| 卓资县| 连云港市| 巴彦淖尔市| 沐川县| 剑阁县| 平昌县| 汉川市| 盘锦市| 久治县| 滕州市| 余庆县| 汤原县| 台北市| 奉化市| 武陟县|