成熟丰满熟妇高潮XXXXX,人妻无码AV中文系列久久兔费 ,国产精品一国产精品,国精品午夜福利视频不卡麻豆

您好,歡迎來到九壹網(wǎng)。
搜索
您的當前位置:首頁Flume 實戰(zhàn)(2)--Flume-ng-sdk源碼分析

Flume 實戰(zhàn)(2)--Flume-ng-sdk源碼分析

來源:九壹網(wǎng)

具體參考: 官方用戶手冊和開發(fā)指南

http://flume.apache.org/FlumeDeveloperGuide.html

*) 定位和簡單例子

1). Flume-ng-sdk是用于編寫往flume agent發(fā)送數(shù)據(jù)的client sdk
2). 簡單示例

RpcClient client = null;
try {
  client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414); 
  Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
  client.append(event);
} catch (EventDeliveryException e) {
  e.printStackTrace();
} finally {
  if ( client != null ) {
    client.close();
  }
}

*) Event設計和類層次結(jié)構(gòu)

1. Event類設計
在Flume中Event是個接口類

public interface Event {
  public Map<String, String> getHeaders();
  public void setHeaders(Map<String, String> headers);
  public byte[] getBody();
  public void setBody(byte[] body);
}

由代碼可得, Event由Header集合和消息負載兩部分構(gòu)成.

2. Builder設計模式
在org.apache.flume.event下, 有兩個Event的具體實現(xiàn)類: SimpleEvent, JSonEvent.
EventBuilder類顧名思義, 采用Builder的方式來組裝對象的成員, 并產(chǎn)生最終的對象.

public class EventBuilder {

  public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();
    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);
    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }
    return event;
  }

  public static Event withBody(byte[] body) {
    return withBody(body, null);
  }

  public static Event withBody(String body, Charset charset,
      Map<String, String> headers) {
    return withBody(body.getBytes(charset), headers);
  }

  public static Event withBody(String body, Charset charset) {
    return withBody(body, charset, null);
  }

}

java的訪問控制符: public/default/protected/private, default表示同package可見
不過另人意外的是, 其對應的SimpleEvent的構(gòu)造函數(shù)的修飾符是public, 即不是default, 也不是protected, 這點讓EventBuilder的引入有些失敗.把Builder模式, 用到極致的是Google Protocol Buffer(java), 其每個PB對象, 都是用相應的Builder類來組裝和生成. 采用這種Builder模式的用途是, 把一個對象元素的修改和讀取徹底分離, 使得一個PB對象,從誕生后就是一個immutable對象, 只能讀取其屬性信息, 而不能修改其屬性.

*) RpcClient設計和類層次結(jié)構(gòu)

1. RpcClient的接口定義:

public interface RpcClient {
  public int getBatchSize();
    public void append(Event event) throws EventDeliveryException;
    public void appendBatch(List<Event> events) throws EventDeliveryException;
    public boolean isActive();
    public void close() throws FlumeException;
}

2. AbstractRpcClient的抽象類定義:

public abstract class AbstractRpcClient implements RpcClient {

    protected int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
    protected long connectTimeout = RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    protected long requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;

    @Override
    public int getBatchSize(){
        return batchSize;
    }

    protected abstract void configure(Properties properties) throws FlumeException;

}

新增了一些常量定義, 和新的抽象函數(shù)configure(Properties prop);

3. RpcClient工廠類的使用
RpcClientFactory的定義

public class RpcClientFactory {

    public static RpcClient getInstance(Properties properties) throws FlumeException {
        // 1). 獲取具體rpcclient的類型信息
        properties.getProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE);
        // 2). 利用反射,獲取類的class
        Class<? extends AbstractRpcClient> clazz = (Class<? extends AbstractRpcClient>) Class.forName(...);
        // 3). 產(chǎn)生類對象
        RpcClient client = clazz.newInstance();
        // 4). 進行具體rpcclient實例的配置初始化
        client.configure(properties);
        // 5). 返回對象
        return client;
    }

}

RpcClientFactory借助靜態(tài)方法getInstance, 其依據(jù)Properties里的相應key/value來, 來產(chǎn)生不同的對象實例, 配置不同的屬性. 同時RpcClient的具體實例, 其構(gòu)造方法的訪問限定符都是protected, 這一點做的, 比之前EventBuilder設計和實現(xiàn)要規(guī)范和清晰.

clazz = Class.forName(...);
client = class.newInstance();
client.configure(...);

是種非常好的實踐代碼, 把面向?qū)ο蟮亩鄳B(tài)性用到極致

4. 具體的RpcClient類的實現(xiàn)
其SDK提供了兩大類, 具體的實現(xiàn)類ThriftRpcClient和AvroRpcClient
4.1. 對ThriftRpcClient的解讀
4.1.1 thrift idl的定義
idl文件(src/main/thrift/flume.thrift)的定義

namespace java org.apache.flume.thrift

struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}

enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}

service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}

分別對應源碼包org.apache.flume.thrift下

Status, ThriftFlumeEvent, ThriftSourceProtocol類
4.1.2 ThriftRpcClient的實現(xiàn)
ThriftRpcClient并不是簡單對ThriftSourceProtocol的客戶端的簡單封裝

public class ThriftRpcClient extends AbstractRpcClient {
  private ConnectionPoolManager connectionManager;
  private final ExecutorService callTimeoutPool;
  private final AtomicLong threadCounter;
  // ......
}

評注: 粗略觀察其類成員, 其借助線程池(ExecutorService)和連接池(ConnectionManager)管理, 來實現(xiàn)RpcClient的發(fā)送接口, 這樣append(), appendBatch()的接口都是線程安全的, 該客戶端的實例能用于多線程并發(fā)使用.

AvroRpcClient代碼結(jié)構(gòu)差不多, 先一筆帶過.

5. 兩個重要的實現(xiàn)類
FailOverRpcClient的源碼解析:
這邊采用裝飾模式(Decorator Pattern), FailOverRpcClient繼承自RpcClient, 同時又擁有實際的RpcClient實例, 只是在實際RpcClient基礎上, 添加了失敗后重試的能力.

FailOver是失敗后重試的機制, 通常借助帶嘗試次數(shù)的重試來實現(xiàn)
其append(Event e)方法中:

int tries = 0;
while (tries < maxTries) {
  try {
    tries++;
    localClient = getClient();
    localClient.append(event);
    return;
  } catch (EventDeliveryException e) {
    localClient.close();
    localClient = null;
  } catch (Exception e2) {
    throw new EventDeliveryException(
        "Failed to send event. Exception follows: ", e2);
   }
}    

這段代碼采用相對簡單的.

getNextClient()的實現(xiàn)如下:

for (int count = lastCheckedhost + 1; count < limit; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  } catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}
for(int count = 0; count <= lastCheckedhost; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  } catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}

LoadBalancingRpcClient的源碼解析:
LoadBalancingRpcClient顧名思義, 采用負載均衡的策略來實現(xiàn), 其還是采用遍歷(輪詢/隨機)+反饋的機制, 來動態(tài)的調(diào)整服務列表的候選順序.
在append(Event)方法中:

Iterator<HostInfo> it = selector.createHostIterator();
while (it.hasNext()) {
  HostInfo host = it.next();
  try {
    RpcClient client = getClient(host);
    client.append(event);
    eventSent = true;
    break;
  } catch (Exception ex) {
    selector.informFailure(host);
    LOGGER.warn("Failed to send event to host " + host, ex);
  }
}
if (!eventSent) {
  throw new EventDeliveryException("Unable to send event to any host");
}

selector.createHostIterator() 創(chuàng)建當前服務候選列表的一個快照, 同時遞進一個輪詢單元.
selector.informFailure(host) 是對失敗的服務進行降級處理

而HostSelector接口定義如下:

public interface HostSelector {
  void setHosts(List<HostInfo> hosts);
  Iterator<HostInfo> createHostIterator();
  void informFailure(HostInfo failedHost);
}

其具體實現(xiàn)類

#). RoundRobinHostSelector, 借助輪詢的方式來實現(xiàn)
#). RandomOrderHostSelector, 借助隨機的方式來實現(xiàn)
這兩個類, 都是借助OrderSelector<T>的實現(xiàn)類來實現(xiàn), OrderSelector封裝了對錯誤服務機器列表的屏蔽策略
該屏蔽策略如下所示:
失敗一次, 設置一個恢復時間點, 未到該恢復時間點, 則不允許獲取該機器ip/port
同時為了懲罰多次失敗, 減少獲取該服務機器的ip/port, 采用1000 * (1 << sequentialFails), 連續(xù)失敗次數(shù), 其恢復時間的間隔要加大.

*) Properties的屬性配置
基本的屬性配置

client.type = default (for avro) or thrift (for thrift)
hosts = h1 # default client accepts only 1 host
hosts.h1 = host1.example.org:41414 # host and port must both be specified
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)

FailOver支持的配置

client.type = default_failover
hosts = h1 h2 h3 # at least one is required, but 2 or more makes better sense
max-attempts = 3 # Must be >=0 (default: number of hosts

Balancing支持的配置

client.type = default_loadbalance
hosts = h1 h2 h3 # At least 2 hosts are required
backoff = false # Specifies whether the client should back-off from a failed host
maxBackoff = 0 # Max timeout in millis 
host-selector = round_robin # The host selection strategy used

*) 異常類定義

EventDeliveryException和FlumeException

因篇幅問題不能全部顯示,請點此查看更多更全內(nèi)容

Copyright ? 2019- 91gzw.com 版權(quán)所有 湘ICP備2023023988號-2

違法及侵權(quán)請聯(lián)系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市萬商天勤律師事務所王興未律師提供法律服務