Sidekiq, una gema y solución a un problema concreto


Ruby & George April 07En esta ocasión vamos a hablar de Sidekiq, una gema muy interesante, en la que se me ha planteado un problema y voy a contar cómo lo he resuelto. El tema de la elección de Sidekiq ha sido por el tratamiento de los workers mediante threads y no mediante procesos forks, como es el caso de utilizar Resque. Si quieres ver ambas gemas, te dejo los links de Sidekiq y de Resque, pero veamos algunas aclaraciones antes.

Los Forks: cuando trabajamos con procesos Forks lo que estamos haciendo es crear una copia completa del proceso en si mismo y podríamos decir que copiamos el espacio de direcciones y de todos los descriptores.  Sin meterme en mucha más profundidad y como parte de una aclaración en las posibilidades que puedes tener, si el lenguaje de programación (MRI Ruby) no admite el nivel kernel level threading, entonces esta es la única manera de difundir el trabajo a través de múltiples núcleos, ya que cada proceso se consigue un núcleo diferente. También ganas un poco de estabilidad, pero por contra tienes la posibilidad de que si un proceso padre se cuelga los procesos hijos de este, quedan en estado zombies.

Los threads: podríamos decir que consumen menos recursos, ya que comparten el espacio de direcciones, la memoria y permiten tener una comunicación más fácil, si hacemos una comparación con la comunicación entre procesos forks. El cambio de contexto entre los threads dentro del mismo proceso también es generalmente mucho mejor que los forks. Dependiendo del tiempo de ejecución que se utiliza, cualquier problema que pueda ocurrir con los threads (por ejemplo, necesidad de utilizar grandes cantidades de memoria para una tarea) pueden ser manejados por el recolector de basura en su mayor parte. Finalmente una ventaja es con respecto a los procesos zombies, es que no hay que preocuparse, ya que mueren en el momento que el proceso muere, evitando los procesos zombies.

El problema que se me ha planteado es el siguiente:

  1. Un fichero CSV con datos a geolocalizar con geocoder, para cargar en la base de datos y debe ser interactivamente mediante un formulario.
  2. Una vez cargado el fichero de entrada CSV y colocados los datos sobre la base de datos, debe lanzar el proceso de geolocalización.
  3. El proceso de geolicacilzación debe analizar los datos y componer mediante los datos la geoposición.
  4. El worker que tengamos, debe saber si hay algún problema, el tipo de problema que ha tenido e informar de ello para que llegado el caso retomar el proceso de geolicalización.
  5. Hay que tener en cuenta que podemos lanzar el proceso manualmente mediante un botón, para retomarlo en cualquier momento por parte del usuario gestor de la aplicación de Geolocalización.
  6. Existe la posibilidad de poder hacer una descarga de los datos que disponemos en la base de datos y dar la información de cada uno de los datos y que ha sucedido.

Llegado a este punto inicial de los requisitos necesarios para trabajar con el desarrollo de la solución, veamos paso a paso todo el proceso. Comenzamos con la parte cliente en la que disponemos de un formulario para poder capturar el fichero de entrada CSV que queremos procesar y la llamada la hacemos por un lado a través del controlador y el volver a relanzar el proceso mediante un botón adicional en nuestro widget que será a través de un endpoint, ya que la aplicación es una solución RIA y estamos utilizando Netzke y Extjs. Por tanto, tenemos el widget, en el que tenemos una parte, en la que se encarga de disponer de una caja para el nombre del fichero y un botón para cuando pulsemos a examinar, que se encarga de nuestra ubicación en local del fichero a cargar CSV y una vez que hagamos el submit del formulario, entonces se pone en marcha el controlador para cargar e insertar en la base de datos, nuestros datos.

La llamada al método de nuestro controlador lo hace dentro nuestro componente, en la definición del método como una tools bar que hace lo siguiente:

def toolbar

[
xtype: :form, => en este caso lo definimos como un tipo formulario y forma parte de Netzke / Extjs para la definición del componente para entenderse con el cliente y servidor

url: controller.estate_geocoders_path,  => en este caso lo que estamos haciendo es la llamada para comunicarnos con el controlador y nuestro método en la clase EstateGeocodersController, que se lo dice el fichero routes.rb, la conexión con el método “import_csv_to_data” y en el fichero routes, tenemos definida la ruta siguiente : post ‘estate_geocoders’ => ‘estate_geocoders#import_csv_to_data’


hideLabel: true,
width: 510,

……………………more code

una vez subido nuestro fichero tenemos la llamada a nuestro controlador:

# encoding: UTF-8
require ‘csv’

class EstateGeocodersController < ApplicationController

def import_csv_to_data nuestro método de controlador en nuestro fichero de routes: post ‘estate_geocoders’ => ‘estate_geocoders#import_csv_to_data’

response = form_response do

uploaded_io = params[:file][:upload]

CSV.foreach(uploaded_io.tempfile, { headers: true, col_sep: “;”, header_converters: :symbol }) do |row|

begin

@estate = load_houses_from_csv_row row ======> iremos creando por cada fila leida un registro en la base de datos
rescue => err
   Exception.report_exception(err, “No se puede cargar ‘#{row}'”)
end

end

if @estate.errors.empty?
     ImportGeocoderEstateWorker.perform_async unless ImportGeocoderEstateWorker.running? ==>> desde aquí conectamos con nuestro worker una vez terminado el proceso de carga de los datos en la base de datos con una condición y es que no tengamos ya corriendo ningún proceso. El método ImportGeocoderEstateWorker.running? nos dice si tenemos worker corriendo o no. SI no tenemos worker corriendo entonces hacemos la llamada a ImportGeocoderEstateWorker.perform_async que corresponde al proceso worker de nuestro Sidekiq
    {
    success: true, on_refresh: nil
    }
else
    { success: false, errors: { reason: @estate.errors.full_messages } }
end

end

render :json => response.to_nifty_json

end

def load_houses_from_csv_row(row)

e = build_new_estate(row)
e.save

end

def build_new_estate(row)

EstateGeocoder.new do |e|

e.address = row[:direccion].titleize if row[:direccion].present?
e.codpostal = row[:codigo_postal].titleize if row[:codigo_postal].present?

……… more code

end

end

end

Ahora en nuestra definición de los métodos de worker, necesitamos una clase :

# encoding: UTF-8

class ImportGeocoderEstateWorker => esta es la clase para cuando hacemos la llamada 

                                     ImportGeocoderEstateWorker.perform_async unless ImportGeocoderEstateWorker.running?

include Sidekiq::Worker

sidekiq_options :retry => 24  => El número de reintentos que hará el worker

sidekiq_retry_in do |count| => Haremos reintentos cada hora en los casos de obtener problemas en las geolocalizaciones

3600

end

def perform

scope = EstateGeocoder.where(longitude: nil, latitude: nil).where(‘type_error is null or `limit` = 1’) => creamos nuestro scope de búsqueda de datos en la base de datos

scope.find_each(batch_size: 100) do |estate|

geocode(estate) => método para geolocalizar
sleep(34) => nos esperamos en cada ciclo para no saturar

end

end

def self.running?   ==>> este es el método importante, es el que controlamos si un proceso está encolado o está trabajando para que no exista la posibilidad, en nuestro caso, que cuando tengamos un worker corriendo y lo queremos volver a lanzar con nuestro botón manualmente, tengamos problemas de duplicidades. Son dos líneas sencillas en las que controlamos las colas con w.klass == self.name  sabemos si nuestro worker está encolado o no y con work[‘payload’][‘class’] == self.name sabremos si nuestro worker está activo o no.

Sidekiq::Queue.new.any? {|w| w.klass == self.name } ||
Sidekiq::Workers.new.any? {|name, work, started_at| work[‘payload’][‘class’] == self.name }

end

private

def geocode(estate)

begin
       geolocalization(estate) => llamada e nuestro método para geoposicionar los datos pasados como parámetro estate
rescue Geokit::TooManyQueriesError => err
     estate.last_sent = Time.now
     estate.limit = true
      ===== > Si entramos en esta parte de código quiere decir que  “Límite de geoposicionamientos en google alcanzado. Parando 1 hora”
     estate.save
rescue => err
    estate.longitude = estate.latitude = nil
    estate.last_sent = Time.now
    ====== > Si entramos en esta parte de código quiere decir que = “No se puede geoposicionar la dirección “
    estate.save
end

end

 

def geolocalization(estate)

if estate.address

full_address = ” #{estate.address}, #{estate.codpostal} #{estate.town}, #{estate.province}” => componemos la dirección completa a geolocalizar

loc = Geokit::Geocoders::GoogleGeocoder3.geocode full_address.parameterize(” “), bias: :es => le metemos la dirección completa a geolocalizar con el método parametrize y bias que en nuestro caso es para españa ya que pueden existir direcciones en otros paises coincidentes, por eso es importante especificar el país.
estate.longitude, estate.latitude = loc.lng, loc.lat => ya tenemos los parámetros de geoposicionamiento
estate.limit = false
estate.last_sent = Time.now
estate.type_error = nil
estate.save

end

end

end

Recuerda que también es importante tener corriendo el proceso de sidekiq que es el controla los workers y estados. Se lanza en línea de comandos como:

bundle exec sidekiq

Finalmente , he utilizado estas dos gemas dentro del Gemfile:

gem ‘geokit-rails3’
gem ‘sidekiq’

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s