Translate

quarta-feira, 20 de junho de 2012

Java - Limpeza automática de coleções

Um dos problemas principais em aplicações do tipo serviço é a alocação e devolução de memória, porque aplicações do tipo serviço tendem a ter um ciclo de vida de horas, dias ou meses por terem como premissa uma longa disponibilidade e robustez.

A depender da complexidade do serviço em relação ao volume de requisições à processar, é costume do arquiteto definir o uso de pilhas sofisticadas (coleções) para enfileirar e controlar o fluxo do processamento de requisições. Não obstante, estas pilhas podem naturalmente receber novos elementos, porém por uma eventual falha (ou não tratada) fará alguns de seus elementos ficarem empilhados "para sempre" até o fim do serviço. O pior é que a gc (garbage collector) nesta situação não fara a limpeza porque os elementos perdidos estão vinculados a um objeto de pilha (elemento).

No exemplo deste artigo, coloco a disposição especializações das coleções ConcurrentHashMap e ConcurrentLinkedQueue por se tratar das pilhas mais comuns numa arquitetura de serviço com paralelismo para alta produtividade. As especializações a seguir tem o mesmo propósito, expirar elementos após 120 segundos (2 minutos) em fila. Perceba na constante KEY_TIMEOUT_VALUE, encontrada em cada uma das especializações, que este tempo pode ser aumentado ou diminuído.

Importante: O mecanismo proposto como base expira elementos da pilha e destrói objetos envelopados por WTTimeObject através de anulamento. Porém, se o objeto núcleo armazenado na pilha for muito complexo, como é o caso de um Socket ou File, este será removido do empilhamento, mas ficará em aberto mesmo com o anulamento enquanto o serviço estiver ativo. Para este caso, recomendo um controle transacional conciente para o correto encerramento do objeto complexo.

WTTimeObject

Esta classe é um invólucro usado por ambas as especializações citadas anteriormente. O WTTimeObject contém atributos para a guarda do tempo de último acesso ao elemento núcleo empilhado (aquele que realmente se pretente armazenar) generalizado em <V>. Se você não entende de Generics em Java, terá dificuldades de compreender este artigo.

public class WTTimeObject<V> {

    private static long nextVal = 0L;
   
    private long lastAccessTime = 0L;
    private V value;
    private long objectId;
   
    public WTTimeObject() {
        this.objectId = this.getNextVal();
    }
   
    public WTTimeObject(V value) {

        this();

        this.lastAccessTime = System.currentTimeMillis();
        this.value = value;

    }

    public long getLastAccessTime() {
        return lastAccessTime;
    }

    public V getValue() {
        return this.getValue(true);
    }

   
    public V getValue(boolean doUpdateAccessTime) {
        if (doUpdateAccessTime) {
            this.lastAccessTime = System.currentTimeMillis();
        }
        return value;
    }

    public void setValue(V value) {
        this.lastAccessTime = System.currentTimeMillis();
        this.value = value;
    }
   
    public synchronized long getNextVal() {
       
        if (WTTimeObject.nextVal < Long.MAX_VALUE) {
            WTTimeObject.nextVal++;
        } else {
            WTTimeObject.nextVal=1;
        }
       
        return WTTimeObject.nextVal;
       
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (objectId ^ (objectId >>> 32));
        return result;
    }

    public boolean equals(Object obj) {

        if (this == obj)
            return true;

        if (obj == null)
            return false;

        if (getClass() != obj.getClass())
            return false;

        WTTimeObject<?> other = (WTTimeObject<?>) obj;

        if (objectId != other.objectId)
            return false;

        return true;

    }

}

WTConcurrentLinkedQueue

Esta é a especialização da coleção ConcurrentLinkedQueue, perceba que existe uma inner class na especialização chamada ClearExpiredRunnable que é do tipo Runnable, porque a cada adição a instância de ClearExpiredRunnable será acionada à verificar e eliminar elementos não acessados a mais de 120 segundos.

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WTConcurrentLinkedQueue<V> {

    public static final long CLEAR_EXPIRED_INTERVAL_VALUE = 60; // A cada 1 minuto verifica
    public static final long KEY_TIMEOUT_VALUE = 120; // Elementos mais velhos do que 2 minutos 

    private ConcurrentLinkedQueue<WTTimeObject<V>> queue;

    private long keyTimeout;
    private String id;
    private long clearExpiredTimeout;
    private ClearExpiredValues clearExpiredRunnable;

    public WTConcurrentLinkedQueue(String id) {
        this(id, KEY_TIMEOUT_VALUE);
    }

    public WTConcurrentLinkedQueue(String id, Long keyTimeout) {
        this.id = id;
        this.queue = new ConcurrentLinkedQueue<WTTimeObject<V>>();
        updateTimeoutOfClearExpired();
        this.keyTimeout = keyTimeout;
        clearExpiredRunnable = new ClearExpiredValues(this);
    }

    public boolean add(V value) {

        boolean result = false;

        result = this.queue.add(new WTTimeObject<V>(value));
   
        // Chama evento para excluir elementos expirados
        clearExpiredValues();
       
        return result;

    }

    public V peek() {

        WTTimeObject<V> result = null;

        result = this.queue.peek();

        if (result == null) {
            return null;
        } else {
            return result.getValue();
        }
    }
   
    public V poll() {

        WTTimeObject<V> result = null;

        result = this.queue.poll();

        if (result == null) {
            return null;
        } else {
            return result.getValue();
        }
    }
   
   
    public boolean remove(V value) {

        boolean result = false;
       
        WTTimeObject<V> timeObj = null;

        synchronized(queue) {
       
            Iterator<WTTimeObject<V>> it = this.queue.iterator();
   
            while (it.hasNext()) {
   
                timeObj = it.next();
   
                // A comparação deve ser feita com o conteúdo de DGTimeObject e nunca direto
                if (value instanceof String) {
                    if (timeObj.getValue(false).equals(value)) {
                        result = this.queue.remove(timeObj);
                        it.remove();
                        break;
                    }
                }else {
                    if (timeObj.getValue(false) == value) {
                        result = this.queue.remove(timeObj);
                        it.remove();
                        break;
                    }
                }
   
            }
           
        }

        return result;
       
    }

    public Iterator<WTTimeObject<V>> iterator() {
        return this.queue.iterator();
    }

    public int size() {
        return this.queue.size();
    }
   
    public void clearExpiredValues() {

        if (clearExpiredTimeout < System.currentTimeMillis()) {
            updateTimeoutOfClearExpired();
            Thread r = new Thread(clearExpiredRunnable);
            r.start();
        }

    }

    public void clear() {

        this.queue.clear();

    }
   
   
    class ClearExpiredValues implements Runnable {

        private WTConcurrentLinkedQueue<V> queue;

        public ClearExpiredValues(WTConcurrentLinkedQueue<V> queue) {
            this.queue = queue;
        }

        public void run() {

            int countRemove = 0;
           
            Iterator<WTTimeObject<V>> it = this.queue.iterator();

            WTTimeObject<V> value;

            while (it.hasNext()) {

                value = it.next();

                // Verifica se o objeto venceu
                if (value != null && value.getLastAccessTime() + (keyTimeout * 1000) < System.currentTimeMillis()) {

                    // ATENÇÃO: Nunca anular o valor dentro de value, porque o remove encontra
                    // o elemento para remoção através de value.getValue().
                    this.queue.remove(value.getValue());
                    it.remove();

                    value.setValue(null);
                    value = null;

                    countRemove++;

                }
   
            }

            if (countRemove > 0)
                System.out.println(String.format("Removido(s) %d elemento(s) vencidos de %s.", countRemove, id));

        }

    }

    public void updateTimeoutOfClearExpired() {
        clearExpiredTimeout = System.currentTimeMillis() + (CLEAR_EXPIRED_INTERVAL_VALUE * 1000);
    }   

}

WTConcurrentHashMap

Já esta é a outra especialização, agora da coleção ConcurrentHashMap, perceba que também existe uma inner class na especialização chamada ClearExpiredRunnable que é do tipo Runnable, porque a cada adição a instância de ClearExpiredRunnable será acionada à verificar e eliminar elementos não acessados a mais de 120 segundos.


import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class WTConcurrentHashMap<K, V> {

    public static final long CLEAR_EXPIRED_INTERVAL_VALUE = 60; // A cada 1 minuto verifica
    public static final long KEY_TIMEOUT_VALUE = 120; // Elementos mais velhos do que 2 minutos 

    private ConcurrentHashMap<K, WTTimeObject<V>> map;
    private long keyTimeout;
    private String id;
    private long clearExpiredTimeout;
    private ClearExpiredKeys clearExpiredRunnable;

    public WTConcurrentHashMap(String id) {
        this(id, KEY_TIMEOUT_VALUE);
    }

    public WTConcurrentHashMap(String id, Long keyTimeout) {
        this.id = id;
        map = new ConcurrentHashMap<K, WTTimeObject<V>>();
        updateTimeoutOfClearExpired();
        this.keyTimeout = keyTimeout;
        clearExpiredRunnable = new ClearExpiredKeys(map);
    }

    public V put(K key, V value) {

        WTTimeObject<V> result = null;

        synchronized(map) {
            result = map.put(key, new WTTimeObject<V>(value));
        }

        if (this.keyTimeout > 0L) {
            // Chama evento para excluir elementos expirados
            clearExpiredKeys();
        }
       
        if (result == null) {
            return null;
        } else {
            return value;
        }
    }

    public V get(K key) {

        WTTimeObject<V> result = null;

        synchronized(map) {
            result = map.get(key);
        }

        if (result == null) {
            return null;
        } else {
            return result.getValue();
        }
    }
   
    public V remove(K key) {

        WTTimeObject<V> result = null;

        synchronized(map) {
            result = map.remove(key);
        }

        if (result == null) {
            return null;
        } else {
            return result.getValue();
        }
    }
   
    public void clearExpiredKeys() {

        if (clearExpiredTimeout < System.currentTimeMillis()) {
            updateTimeoutOfClearExpired();
            Thread r = new Thread(clearExpiredRunnable);
            r.start();
        }

    }

    public void clear() {
        synchronized(map) {
            map.clear();
        }
    }
   
    public Set<K> keySet() {
        synchronized(map) {
            return map.keySet();
        }
    }

    public int size() {
        synchronized(map) {
            return map.size();
        }
    }
   
    class ClearExpiredKeys implements Runnable {

        private ConcurrentHashMap<K, WTTimeObject<V>> map;

        public ClearExpiredKeys(ConcurrentHashMap<K, WTTimeObject<V>> map) {
            this.map = map;
        }
       
        public void run() {

            int countRemove = 0;

            synchronized(map) {
               
                Enumeration<K> en = map.keys();
   
                K key;
                WTTimeObject<V> value;
   
                while (en.hasMoreElements()) {
   
                    key = en.nextElement();
   
                    value = map.get(key);
   
                    // Verifica se o objeto venceu
                    if (value != null && value.getLastAccessTime() + (keyTimeout * 1000) < System.currentTimeMillis()) {
   
                        value.setValue(null);
                        value = null;
                        map.remove(key);
                        countRemove++;
   
                    }
   
                }
               
            }

            if (countRemove > 0)
                System.out.println(String.format("Removido(s) %d elemento(s) vencidos de %s.", countRemove, id));

        }

    }
   
    public void updateTimeoutOfClearExpired() {
        clearExpiredTimeout = System.currentTimeMillis() + (CLEAR_EXPIRED_INTERVAL_VALUE * 1000);
    }

}


Testando as coleções especializadas

Para finalizar, vamos testar as coleções para observar a limpeza de elementos com mais de 120 segundos sem acesso:

        WTConcurrentLinkedQueue<String> map = new WTConcurrentLinkedQueue<String>("ColecaoTeste");
        WTConcurrentHashMap<Integer, String> map1 = new WTConcurrentHashMap<Integer, String>("ColecaoTeste");
        int contador = 0;

        while (true) {
            contador++;
            map.add("Teste " + contador);
            map1.put(contador, "Teste " + contador);
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }       




Nenhum comentário:

Postar um comentário